资讯详情

浅谈消息队列

浅谈新闻队列

1、为什么使用消息队列、优缺点和常用消息队列比较

这里抛出三个问题

  • 为何使用消息队列?
  • 消息队列的优缺点是什么?
  • Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么区别,适合什么场景?

剖析

1. 为什么使用消息队列?

消息队列的使用场景是什么,项目中的具体场景是什么,这个场景中的消息队列是什么?

先说消息队列中常见的使用场景。其实场景很多,但核心有 3 个:

(1)解耦

看看这样的场景。A 向系统发送数据 BCD 通过接口调用发送三个系统。 E 如果系统也想要这个数据呢? C 系统现在不需要了呢?A 系统负责人几乎崩溃了…

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-aVhB8TID-1657002585874)(http://ww1.sinaimg.cn/large/a5fa4a8dgy1gdl8sp93ouj20ql0dswgf.jpg)]

在这个场景中,A 系统与各种其他乱七八糟的系统严重耦合,A 系统生成关键数据,许多系统需要 A 该系统发送数据。A 系统要时刻考虑 BCDE 四个系统挂了怎么办?要不要重发,要不要留消息?头发都白了!

如果使用 MQ,A 系统生成数据并发送到 MQ 在里面,哪个系统需要自己数据 MQ 里面消费。如果新系统需要数据,直接从 MQ 在里面消费;如果系统不需要此数据,则取消正确的数据 MQ 消费新闻。这样下来,A 系统根本不需要考虑向谁发送数据,不需要维护代码,也不需要考虑人们是否呼叫成功、失败和加班。

img

总结:

通过一个 MQ,Pub/Sub 发布订阅消息这样的模型,A 该系统与其他系统完全解耦。

  • 传统模式的缺点:系统之间的耦合太强,如上图所示,系统A直接在代码中调用系统B和系统C的代码,如果未来D系统接入,系统A也需要修改代码,太麻烦了!
  • 中间件模式:将消息写入消息队列,需要消息的系统从消息队列中订阅,使系统A不需要任何修改。
(2)异步

再看一个场景,A 系统需要在个要求,需要在自己的本地写库,也需要在 BCD 三个系统写库,本地写库 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 300 450 200 = 953ms,接近 1s,用户觉得自己做了什么,慢死。用户通过浏览器发起请求,等待 1s,这几乎是不可接受的。

一般互联网企业,对于用户的直接操作,一般要求是每个要求都必须在 200 ms 对用户几乎没有感知。

如果使用 MQ,那么 A 连续发送系统 3 条消息到 MQ 假如在队列中耗时 5ms,A 从接受请求到返回响应用户,系统的总时间是 3 5 = 8ms,对实对于用户来说,感觉就是点按钮,8ms 以后直接回来,爽!网站做得真好,真快!

总结:

  • 传统模式的缺点:一些不必要的业务逻辑以同步的方式运行,过于耗时。
  • 中间件模式:将消息写入消息队列,异步操作不必要的业务逻辑,以加快响应速度
(3)削峰

每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量 50 一个。结果每次都到 12:00 ~ 13:00 ,每秒并发请求的数量突然增加 5k 但是系统是直接基于的。 MySQL的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。

一般的 MySQL,扛到每秒 2k 如果每秒请求到达,请求几乎是一样的。 5k 可以直接把 MySQL 被杀,导致系统崩溃,用户无法再使用系统。

但是高峰期一过,下午就成了低峰期,可能是 1w 用户同时在网站上操作,每秒请求的数量可能是 50 对整个系统几乎没有压力。

如果使用 MQ,每秒 5k 个请求写入 MQ,A 每秒最多处理系统 2k 因为 MySQL 每秒最多处理 2k 个。A 系统从 MQ 慢慢拉请求,每秒拉请求 2k 不要超过你每秒能处理的最大请求数 ok,这样下来,即使峰期,A 系统永远不会挂断。而 MQ 每秒钟 5k 请求进来,就 2k 一个请求出去,导致中午高峰(1) 几十万甚至几百万的请求可能在一个小时内积压 MQ 中。

这短暂的高峰积压是 ok 是的,因为高峰期过后,每秒钟 50 个请求进 MQ,但是 A 系统仍将遵循每秒 2k 处理一个请求的速度。因此,只要高峰期过去,A 该系统将迅速解决积压消息。

总结:

  • 传统模式的缺点:并发量大时,所有请求都直接与数据库联系,导致数据库连接异常
  • 中间件模式:系统A根据数据库可以处理的并发量慢慢从消息队列中提取消息。在生产过程中,允许在这个短高峰期积压。

2. 消息队列的优缺点是什么?

上面已经说过优点,就是在特殊场景下有相应的好处,

缺点如下:

  • 系统引入的外部依赖越多,就越容易挂断。你就是这样 A 系统调用 BCD 三个系统的界面很好,人 ABCD 四个系统很好,没问题,你只是加了一个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整个系统崩溃了,你不就完了吗?如何保证消息队列的高可用性,可以点击此处查看。
  • 硬生生加个 MQ 进来,如何保证消息不重复消费?如何处理消息丢失?如何保证消息传递的顺序?头大头大,问题多,疼。
  • A 系统处理后直接返回成功,人们认为你的请求成功;但问题是,如果是,如果 BCD 三个系统,BD 两个系统成功写库,结果 C 系统写库失败了,怎么整理?你的数据不一致。

所以新闻队列实际上是一个非常复杂的架构,你介绍它有很多好处,但也必须做各种额外的技术解决方案和架构来避免它的缺点,完成后,你会发现,妈妈,系统的复杂性提高了数量级,可能很复杂 10 倍。但在关键时刻,还是要用的。

3. Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么区别,适合什么场景?

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-mvy3KWbr-1657002585878)(http://ww1.sinaimg.cn/large/a5fa4a8dgy1gdl8pf8do0j20sn0ogwfp.jpg)]

综上所述,经过各种比较,我个人倾向于是:

引入一般的业务系统MQ,大家最早都用ActiveMQ,但是现在大家用的不多了,社区也不是很活跃,没有经过大规模吞吐量场景的验证,大家就算了。我个人不推荐这个;

后来大家都开始用了RabbitMQ,但是确实erlang语言阻止了大量的java对于公司来说,工程师进行深入的研究和控制几乎是不可控的,但人是开源的,支持稳定,活动性高;

然而,越来越多的公司会使用它们RocketMQ,这真的很好,但我提醒自己,如果社区突然变黄,我对我公司的技术实力绝对有信心,我建议使用它RocketMQ,否则,回去诚实实用RabbitMQ嗯,人是活跃的开源社区,永远不会变黄

因此,中小企业技术实力一般,技术挑战不是特别高RabbitMQ这是一个不错的选择;大公司,基础设施研发实力强,使用RocketMQ这是个不错的选择

实时计算、日志采集等场景在大数据领域使用Kafka是行业标准,绝对没问题,社区活动度很高,绝对不会变黄,更何况几乎是世界领域的事实规范

二、如何保证消息队列的高可用性?

剖析

MQ如何保证高可用性?

1. RabbitMQ高可用

RabbitMQ它更具代表性,因为它是基于主从的高可用性,我们以他为例来解释第一个MQ如何实现高可用性?

(1)基本架构

  • :信息队列服务流程包括两部分:Exchange和Queue
  • :消息队列交换机,按照一定的规则转发到某个队列,对消息进行过滤
  • :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方
  • :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ
  • :消息消费者,即消费方客户端,接收MQ转发的消息
(2)高可用架构

rabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式

① 单机模式

就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式

② 普通集群模式

就是在多个联通的服务器上安装不同的RabbitMQ的服务,这些服务器上的RabbitMQ服务组成一个个节点,通过RabbitMQ内部提供的命令或者配置来构建集群,形成了RabbitMQ的普通集群模式

  • 当用户向服务注册一个队列,该队列会随机保存到某一个服务节点上,然后将对应的元数据同步到各个不同的服务节点上

  • RabbitMQ的普通集群模式中,每个RabbitMQ都保存有相同的元数据

  • 用户只需要链接到任一一个服务节点中,就可以监听消费到对应队列上的消息数据

  • 但是RabbitMQ的实际数据却不是保存在每个RabbitMQ的服务节点中,这就意味着用户可能联系的是RabbitMQ服务节点C,但是C上并没有对应的实际数据,也就是说RabbitMQ服务节点C,并不能提供消息供用户来消费,那么RabbitMQ的普通集群模式如何解决这个问题呢?

  • RabbitMQ服务节点C发现自己本服务节点并没有对应的实际数据后,因为每个服务节点上都会保存相同的元数据,所以服务节点C会根据元数据,向服务节点B(该服务节点上有实际数据可供消费)请求实际数据,然后提供给用户进行消费

  • 这样给用户的感觉就是,在RabbitMQ的普通集群模式中,用户连接任一服务节点都可以消费到消息

提高消费的吞吐量

  • 为了请求RabbitMQ的实际数据以提供给用户,可能会在RabbitMQ内部服务节点之间进行频繁的进行数据交互,这样的交互比较耗费资源

  • 当其中一个RabbitMQ的服务节点宕机了,那么该节点上的实际数据就会丢失,用户再次请求时,就会请求不到数据,系统的功能就会出现异常

③ 镜像集群服务

镜像集群模式和普通集群模式大体是一样的,不一样的是:

  • 生产者向任一服务节点注册队列,该队列相关信息会同步到其他节点上
  • 任一消费者向任一节点请求消费,可以直接获取到消费的消息,因为每个节点上都有相同的实际数据
  • 任一节点宕机,不影响消息在其他节点上进行消费

镜像集群模式是怎么开启的呢?这里简单说下,在普通集群模式的基础上,我们可以通过web控制端来配置数据的同步策略,可以配置同步所有的节点,也可以配置同步到指定数量的服务节点

虽然镜像集群模式能够解决普通集群模式的缺点,当任一节点宕机了,不能正常提供服务了,也不影响该消息的正常消费,但是其本身也有相应的缺点:

  1. 性能开销非常大,因为要同步消息到对应的节点,这个会造成网络之间的数据量的频繁交互,对于网络带宽的消耗和压力都是比较重的
  2. 没有扩展可言,rabbitMQ是集群,不是分布式的,所以当某个Queue负载过重,我们并不能通过新增节点来缓解压力,因为所以节点上的数据都是相同的,这样就没办法进行扩展了

对于镜像集群而言,当某个queue负载过重,可能会导致集群雪崩,那么如何来减少集群雪崩呢?我们可以通过HA的同步策略来实现

HA的同步策略如下:

HA-mode HA-params 说明
all 镜像队列将会在整个集群中复制。当一个新的节点加入后,也会在这个节点上复制一份。
exactly count 镜像队列将会在集群上复制count份。如果集群数量少于count时候,队列会复制到所有节点上。 如果大于Count集群,有一个节点crash后,新进入节点也不会做新的镜像。(可以阻止集群雪崩)
nodes node name 镜像队列会在node name中复制。如果这个名称不是集群中的一个,这不会触发错误。 如果在这个node list中没有一个节点在线,那么这个queue会被声明在client连接的节点。

2. Kafka的高可用

(1)基础架构

  • : 消息生产者,发布消息到 kafka 集群的终端或服务。
  • : kafka 集群中包含的服务器。
  • : 每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
  • : partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
  • : 从 kafka 集群中消费消息的终端或服务。
  • : high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  • : partition 的副本,保障 partition 的高可用。
  • : replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
  • : replica 中的一个角色,从 leader 中复制数据。
  • : kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
  • : kafka 通过 zookeeper 来存储集群的 meta 信息。

Kafka由多个 broker 组成,每个 broker 是一个节点;每创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。

这就是,就是说一个 topic 的数据,是

实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

(2)高可用架构

Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。

比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。

Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。为什么只能读写 leader?很简单,,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

这么搞,就有所谓的了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。

的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)

的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

3. RocketMQ高可用

(1)基础架构

主要组成结构:

  • ​ 消息发送者

    ​ Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • ​ 消息消费者

    ​ Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

  • ​ 主要负责消息的存储、投递和查询以及服务高可用保证。

    ​ Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对 应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

  • ​ 管理Broker和路由信息

    ​ NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

补充说明:

  • ​ 区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息

  • 消息标签。RocketMQ支持给在发送的时候给topic打tag,因此消费时可根据不同tag进行不同逻辑处理。

  • :相当于是Topic的分区;用于并行发送和接收消息

  • : 用来表示一个发送消息应用,一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以 是一台机器的多个迕程,或者一个迕程的多个 Producer 对象。一个 Producer Group 可以发送多个 Topic 消息,Producer Group 作用如下:

    1. 标识一类 Producer
    2. 可以通过运维工具查询某个发送消息应用下的多个 Producer 实例
    3. 发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态
  • : 用来表示一个消费消息应用,一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可 以是多个进程,或者是一个进程的多个 Consumer 对象。一个 Consumer Group 下的多个 Consumer 以均摊方式消费消息,如果设置为广播方式,那举返个 Consumer Group 下的每个实例都消费全量数据。

(2)高可用架构
① 单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

② 多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
③ 多Master多Slave模式(异步)

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
④ 多Master多Slave模式(同步)

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

三、如何保证消息不被重复消费(如何保证消息消费时的幂等性)

分析

既然是消费消息,那肯定要考虑考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是MQ领域的基本问题,其实本质上还是使用消息队列如何保证幂等性,这个是你架构里要考虑的一个问题。

剖析

  • 首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是你自己保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。

    kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。

    但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

  • 其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

    举个例子,假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

    一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性

    幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

  • 那所以第二个问题来了,怎么保证消息队列消费的幂等性?

    其实还是得结合业务来思考,我这里给几个思路:

    (1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧

    (2)比如你是业务是写入redis,那没问题了,反正每次都是set,天然幂等性。

    (3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

    (3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

    (4)使用消息记录表,每次消费先查询消息记录表中是否有该消息的消费记录(可根据几个唯一字段判断),根据消息记录表来对重复消费做处理,非重复消费就将其记录到消息记录表中。

    (5)还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

如何保证MQ的消费是幂等性的,需要结合具体的业务来看

四、如何保证消息的可靠性传输(如何处理消息丢失的问题)

分析

用mq有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是刚才说的重复消费和幂等性问题。不能少,就是说这数据别搞丢了。那这个问题你必须得考虑一下。

如果说你这个是用mq来传递非常核心的消息,比如说计费,扣费的一些消息,因为我以前设计和研发过一个公司非常核心的广告平台,计费系统,计费系统是很重的一个业务,操作是很耗时的。所以说广告系统整体的架构里面,实际上是将计费做成异步化的,然后中间就是加了一个MQ。

我们当时为了确保说这个MQ传递过程中绝对不会把计费消息给弄丢,花了很多的精力。广告主投放了一个广告,明明说好了,用户点击一次扣费1块钱。结果要是用户动不动点击了一次,扣费的时候搞的消息丢了,我们公司就会不断的少几块钱,几块钱,积少成多,这个就对公司是一个很大的损失。

剖析

为什么消息会丢失

消息从生产到消费可以经历三个阶段:生产阶段、存储阶段和消费阶段。

  • 生产阶段:在这个阶段,从消息在Producer创建出来,经过网络传输发送到Broker端。
  • 存储阶段: 消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • 消费阶段:Consumer从Broker上拉取消息,经过网络 传输发送在Consumer上。

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

保证消息的可靠性传输

  • 生产阶段:消息队列通常使用确认机制,来保证消息可靠传递:当你代码调用发送消息的方法,消息队列的客户端会把消息发送到Broker,Broker接受到消息会返回客户端一个确认。只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送的确认响应后,会自动重试,如果重试再失败,就会一返回值或者异常方式返回给客户端。所以在编写发送消息的代码,需要正确处理消息发送返回值或者异常,保证这个阶段消息不丢失。

  • 存储阶段:如果对消息可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。对于单个节点Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘再给Producer返回确认响应。如果是Broker集群,需要将Broker集群配置成:至少两个以上节点收到消息,再给客户端发送确认响应。

  • 消费阶段:消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递。Consumer收到消息后,需在执行消费逻辑后在发送确认消息。

  • 生产阶段,需要捕获消息发送错误,并重发消息
  • 存储阶段,通过配置刷盘和复制参数,让消息写入多个副本的磁盘上,来确保消息不会因为某个Broker宕机或者磁盘损坏而丢失。
  • 消费阶段:需要在处理完全部消费业务逻辑后,再发送确认消息。
RabbitMQ保证消息可靠性传输
生产阶段

生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。

解决方法:

存储阶段

如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。

解决方法:

设置消息持久化到磁盘。设置持久化有两个步骤:

  • 创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。

    /** * 声明队列 * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) * queue: 队列名称 * durable: 是否持久化,true的话,重启服务后该队列依旧存在 * exclusive: 队列是否独占此连接 * autoDelete: 队列不再使用时是否自动删除此队列 * argum

标签: 6j20高温电阻合金丝材

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台