文章目录
- MQ 概述
-
- MQ 简介
- MQ 用途
-
- 流量削峰填谷
- 业务异步解耦
- 收集大量数据
- RocketMQ
-
- 为什么选择 RocketMQ?
- 主要概念
-
- Producer
- Consumer
- Name Server
- Broker Server
- 消息(Message)
- 主题(Topic)
- 队列(Queue)
- 工作原理
- 特性
- 性能问题
- 集群搭建方案
MQ 概述
MQ 简介
MQ
(Message Queue)这意味着新闻队列,通常是指提供新闻队列服务的中间部分。简单地说,它是接收、存储和转发信息的东西。
MQ 用途
MQ 有很多应用场景,一般分为 3 类
流量削峰填谷
暂存系统超载请求 MQ 在中间,然后在系统负消耗。一方面,为了提高资源利用率,防止系统被过度要求压垮。
业务异步解耦
一般是 A 服务完成后,发出通知,其他服务可以订阅这些信息来处理自己的逻辑。A 不关心订阅者的处理逻辑,也不需要等待其他服务同步完成。另一方面,订阅者不需要改变 A 服务代码。
收集大量数据
一般用在大数据行业,例如物联网行业内,传感器每时每刻都在产生数据,一个是数据量大,一个是写入频率高。传统的数据库不适合直接处理这样的数据。而MQ这种数据存储的临时介质非常适合,这主要是 Kafka 的天下。
RocketMQ
为什么选择 RocketMQ?
MQ 功能基本相同,市场上没有多少选择。我选择了 RocketMQ主要考虑以下几点:
- 吞吐量高,自然支持集群部署
- 开发语言是Java,中文文档,学习方便
- 阿里巴巴出品,通过双11级的高并发考验,消息支持 0 丢失配置。
主要概念
RocketMQ主要分 4 部分,Producer,Consumer,Name Server,Broker Server
Producer
也就是说,生产者负责发送消息。 NameServer 里拉取目标 Broker 然后发送信息 Broker,支持同步发送、异步发送、顺序发送、单向发送
Consumer
也就是说,消费者负责从消费者的角度提取和消费消息 2 获取消息的方式:拉取消费(Pull) 促进消费(Push)。同时提供2 消费模式:集群消费和广播消费。
Name Server
与微服务中的注册中心类似的名称服务器持有 Broker 一些信息供生产者和消费者选择Broker。
Broker Server
代理服务器负责存储和转发消息。内部还存储消息的元数据,包括消费者群体、廉价消费进度、主题、队列消息等
消息(Message)
实际消息的载体,包含实际消息以及一些元数据 例如,Topic、Tag等
主题(Topic)
收集一类新闻,是的 RocketMQ 基本单位的订阅
队列(Queue)
Topic进一步的内部划分实际上是 consumerqueue,主题一般包括一个或多个队列,并区分阅读和写作的逻辑区别。信息索引存储在队列中,实际消费信息从这里提取。在集群模式的同时,一个队列只能由同一消费者组中的一个消费者消费。
工作原理
- 启动 Name Server,Name Server 监控指定端口等待 Broker Server、Producer、Consumer 连接
- 启动 Broker Server,Broker Server连接 Name Server,在Name Server维护自己 Topic、Queue 以及负载等信息
- 启动 Producer,连接 Name Server,创建或获取 Topic,并从 Name Server 中选出 Topic 对应的 Broker Server地址以及 Queue 信息,然后像 Boker Server 发送消息。
- Broker Server 收到消息后,会计信息 consumerqueue 中的偏移、tag hash等待其他数据组装成消息,然后写入 commitlog 同时,在相应的文件中写入索引 consumerqueue 的文件中。
- 启动 Consumer,连接 Name Server,通过 topic以及从Name Server 拿到真实的 Broker Server 地址以及 Queue 信息。Queue 也就是 consumerqueue,内部包含 Topic 消息在 commitlog 根据文件中的偏移
消费进度
(集群消费模式下的消费进度 broker 由广播模式提供 consumer 自己提供)拉下一条消息消费(实际上是缓存后分批或分批消费),消费完成后返回 ACK,成功或其他状态。
特性
- 订阅发布
- 本质上是拉
- 顺序消费
- 确保同一队列有序
- 消息过滤
- tag 过滤
- 新闻属性过滤,类似SQL语言
- 消息可靠性
- 在单点故障下,异步刷会丢失少量消息,同步刷盘不会丢失消息
- 当磁盘损坏时,通过磁盘Raid10 或者主始终保证消息的可靠性(异步刷盘仍会丢失少量消息)
- 同步复制或刷盘下可靠性高,不会丢失消息,但性能低,实际使用需要权衡
- 至少一次
- RocketMQ 保证消息至少发送一次,但消息可能会重复发送,在某些特殊情况下会有大量重复消息
- 消息处理需要支持力等等
- 回溯消费
- 消费消息不直接删除,所以RocketMQ在某一时间点时间点后的消息配置,时间精确到毫秒
- 事务消息
- 发送事务信息,执行本地事务,成功提交事务信息,失败回滚事务信息,事务信息提交给消费者,如果事务信息没有提交或回滚,将自动
回查
- 发送事务信息,执行本地事务,成功提交事务信息,失败回滚事务信息,事务信息提交给消费者,如果事务信息没有提交或回滚,将自动
- 定时消息
- 在指定的延迟水平下发送支出配置的延迟消息(默认为1)s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level),消费者只有在消息延迟指定时间后才能看到。
- 消息重试
- 在消息消费失败的情况下,我希望重试消息每次都会延迟一定的时间(默认情况下,重试级别的时间配置参考定期消息的延迟级别,删除前两个级别)
- 消息重投
- 同步发送:向他人发送 broker 重新发送消息
- 异步发送:只在同一个 broker 下不断尝试
- 单向消息 Oneway:只发,不重试
- 流量控制
- 生产者流控,拒绝 send,并且
不会重试
,通常是 broker 新闻写入能力达到瓶颈。- commitlog 被锁超过 osPageCacheBusyimeOutMills 时,默认 1000ms
- 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控
- broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
- 消费者流控,会降低消息拉取频率,通常是消费过程耗时太长
- 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
- 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
- 生产者流控,拒绝 send,并且
- 死信队列
- 消息不断重试,达到最大重试次数后仍然失败则会进去私信队列,通常意味着业务逻辑或者消息本身存在问题,这时候需要开发人员介入排查失败原因。
性能问题
- 低配服务器下也能有上万的吞吐量
- RocketMQ 储存介质是文件,依赖操作系统提供的
mmap零拷贝
技术,将对文件操作转换为对内存的操作,极大的提高文件读写效率。 - 储存文件以及索引文件写入几乎都是顺序写入,降低了寻址时间。
- 消费消息时经过索引后会产生大量随机读取 commitlog 文件的操作,使用固态能显著提高读取效率。
集群搭建方案
多主多从(普通集群),多 master broker 保证负载,多 slave broker 提供稳定性。 Dledger(高可用集群),Raft选主,主节点负责维护 commitlog 一致,但性能有所牺牲