资讯详情

RabbitMQ 补偿机制、消息幂等性解决方案

1. 场景

先看几个面试题:

  1. 如何保证信息的可靠性投递?即如何确定信息发送是否成功?
  2. 如果失败如何处理(补偿机制)?
  3. 如何保证消息不被重复消费?或者,消息消费时如何保证权力的等效性?

2.信息的可靠性投递

消息确认

主要包括新闻确认,因为在发送消息的过程中,我们无法确认消息是否可以通过,一旦消息丢失,我们就无法处理,所以我们需要确认消息,以避免消息丢失。

2.1 生产者确认

我们知道生产者和消费者是完全隔离的,不做任何配置,生产者不知道消息是否真的到达 RabbitMQ,也就是说,新闻发布操作不会向生产者返回任何新闻。

那么如何保证我们的消息发布呢??有以下常用机制。

因为之前的文章已经介绍了上面,这里就不一一介绍了,一般采用的方法是

原理:生产者将信道设置为 confirm 一旦信道进入模式 confirm 该信道上发布的所有消息都将被指定为唯一的模式 ID(从 1 从这个开始) id 在生产者和 RabbitMQ 之间确认消息。

这里的能够识别唯一的信息,触发回调时获得值,处理相应的错误,建立相应的信息补偿机制。(记住这是唯一的 ID,而且是全局唯一的,雪花算法可以用于分布式系统。

confirm 该模型最大的优点是它可以异步。一旦发布消息,生产者应用程序可以在等待信道返回确认时继续发送下一条消息。当消息最终确认时,生产者应用程序可以通过回调处理确认消息 RabbitMQ 因为内部错误导致消息丢失,会发送一个 nack 消息,生产者应用程序也可以在回调方法中处理 nack 消息决定下一步处理。

注:这里描述的场景都是这样的也就是说,不考虑失败通知(ReturnCallback)的情况。

因为现在每个人的发展基本上都是通过 Spring Boot 的方式进行开发,所以,这里直接提供其基本配置类参考,如下:

/**  * @description : 消息生产者  */ @Component @Slf4j public class RabbitmqProducer {       @Autowired     private RabbitTemplate rabbitTemplate;       public void sendMessage(Map<String, Object> headers, Object message, String messageId, String exchangeName, String key) {         // 定制消息头         MessageHeaders messageHeaders = new MessageHeaders(headers);         // 创建消息         Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);         /* 确认的回调 确认消息是否到达 Broker 服务器 事实上,它是否到达交换器          * 发送时指定的交换器不存在 ack 就是 false 代表消息无法到达          */         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {             log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);             if (!ack) {                 System.out.println("相应的消息补偿机制");             }         });         // 在实际中ID 应该是全局唯一 唯一能识别消息的 当消息无法到达时,触发ConfirmCallback回调方法可获得该值,处理相应的错误         CorrelationData correlationData = new CorrelationData(messageId);         rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData);     } }

复制代码

2.2 消费者确认

说到生产者如何保证消息的确认,消费者也需要确认。

前面还说,目前的编码是基于 Spring 是的,对于消费者来说,也有接收消息的配置类,如下:

/**  * @description : 消息消费者  */ @Component @Slf4j public class RabbitmqConsumer {       @RabbitListener(bindings = @QueueBinding(             value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE),             exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE),             key = RabbitInfo.ROUTING_KEY)     )     @RabbitHandler     public void onMessage(Message message, Channel channel) throws Exception {         MessageHeaders headers = message.getHeaders();         // 获取消息头信息和消息体         log.info("msgInfo:{} ; payload:{} ", headers.get("msgInfo"), message.getPayload());     } }

复制代码

对于上面接收消息的配置并没有做任何配置,当我们发送消息的时候,消费者接收消息并进行对应的逻辑处理,并且,但也有配置,如下:

spring:   rabbitmq:     addresses: 127.0.0.1:5672     # RabbitMQ 默认用户名和密码都是guest 而虚拟主机的名称是 "/"     # 若配置其它虚拟主机地址,需要提前创建控制台或图形界面 图形界面地址 http://主机地址:15672     username: admin     password: admin     virtual-host: /     listener:       simple:         # 为确保信息能够正确消费,建议将签收模式设置为手工签收,并在代码中手工签收         acknowledge-mode: manual         # 侦听器调用线程的最小数量         concurrency: 10         # 侦听器调用最大数量的线程         max-concurrency: 50  

复制代码

也就是上面的acknowledge-mode,他有三个值,如下:

  1. 当为NONE当时,即默认值,即autoAck=true,此时,消费者在收到消息后会自动确认,MQ 队列中的消息将被删除;
  2. 当为MANUAL我们需要手动确认,即channel.basicAck,如下:
  3. 当为AUTO经测试发现和发现时NONE没什么不同 。

这时,有人会说,如果我们的业务代码在默认情况下抛出异常(自动确认)怎么办?

Spring 做法是抛出异常,消息不会被抛出 ack,如下:

1/0此时将始终打印日志,如下:

再去看 MQ 状态如下:客户端:

如何理解这个时候可能会导致重复消费?

如果我的业务代码没有事务,或者在种方法在参数传输过程中没有事务控制,当异常业务代码存储之前,那么消息实际上没有确认,仍在队列中,因此,当下一个程序启动,将再次消费消息,尽管业务代码异常。

自动确认将在消息发送给消费者后立即确认,但可能会丢失消息。如果消费者端业务代码出现异常,即消费者端未能成功处理消息,则

如果消息已经被处理,但后续代码抛出异常,使用 Spring 如果管理,消费端业务代码进行回滚,这也同样造成了实际意义的消息丢失。

3. 补偿机制

在回到前面的问题,如何确定消息是否发送成功?  确实能帮我们解决这个问题,但如果生产者就是接收不到 ack 这个指令怎么办,比如消费者处理时间太长或者网络超时,等等情况,导致生产者一直接收不到这个 ack ,此时怎么办?

生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有得到响应的消息,可以设置一个定时重发的补偿机制:通过消息落库 + 定时任务来实现。

怎么做?这里讲讲思路,如下:

  1. 发送消息之前,先把消息入库,我这里的表设计如下:
   CREATE TABLE `t_cap_published_message` (
     `id` varchar(40) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '标识。',
     `version` varchar(20) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '版本',
     `exchange` varchar(200) COLLATE utf8mb4_bin DEFAULT '' COMMENT '交换机。',
     `topic` varchar(200) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '话题。',
     `content` longtext COLLATE utf8mb4_bin NOT NULL COMMENT '消息内容。',
     `retries` int(11) NOT NULL COMMENT '重试次数,一般为 3 次。',
     `expiry` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '过期时间。',
     `status` varchar(40) COLLATE utf8mb4_bin NOT NULL COMMENT '状态,成功则消息ack成功,其他状态都要重试。',
     `created_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间。',
     `last_modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间,可以用作数据版本。',
     PRIMARY KEY (`id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='发布的消息。';

复制代码

  1. 入库之后在发送消息;
  2. 如果在规定时间不能 ack 或者 ack=false ,即 confirmCallback 回调的 ack=false ,则按照定时规则重新发送消息;
  3. 然后对于发布成功的消息,如果业务操作完成,实际上它的作用已经发挥完成,一段时间对数据库做清理即可,根据业务的具体情况。

4. 消息幂等性

首先我们要知道什么是幂等性,比如一个转账系统,A 要转给 B 100 元,当 A 发出消息后,B 接收成功,然后给 MQ 确认的时候出现网络波动,MQ 并没有接收到 ack 确认,那 MQ 为了保证消息被消费,就会继续给消费者投递之前的消息,如果再重复投递 5 次,则 B 在处理 5 次,加上之前的一次,B 的余额增加了 600 元,很明显是不合理的。

所以幂等性简单来说就是: 重复调用多次产生的业务结果与调用一次产生的业务结果相同 ;

为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ 服务端是没有这种控制的,因为它不知道你是不是就要把一条消息发送两次,所以只能在消费端控制。

回到前面生产者确认模式中讲到了一个  ,我们可以通过他来保证消息的幂等性,如下:

  1. 消费者获取到消息后先根据这个全局  去查询 redis/db 是否存在该消息;
  2. 如果不存在,则正常消费,消费完毕后写入 redis/db;
  3. 如果存在,则证明消息被消费过,直接丢弃,不做处理。

原文 https://xie.infoq.cn/article/abe7691508f57c91906d9a160

标签: a160贴片三极管

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台