资讯详情

RabbitMQ实现延迟队列的方式

1.背景

最近,做类似拍卖系统的上架功能。卖方将货物上架后,如到期时间为24小时或48小时。如果没有投标人或买方,货物将自动下架到用户的电子邮件中。例如,如果电子商务用户下订单,30分钟内未付款,订单将自动取消库存. 要实现这些类似的业务场景,你想到的最简单的方法就像我想的那样.看看它是否与你想象的基本一致.

1.1 MySQL 轮询

写入数据MySQL然后程序轮询,比如1分钟或者几分钟轮询一次,看架单是否到期,到期后下架。.

简单粗暴,在业务中可以重用MySQL,不需要额外维护其他中间件。 数据量小的时候没问题。 ,数据量大后,第一个查询慢,第二个对数据库有不必要的压力.

无脑轮询,浪费资源,即时性差.

1.2 Redis 轮询实现

基于Redis 轮询的实现显然比较MySQL显得聪明。因为操作Redis比操作速度快MySQL速度快很多.架单到期message写到zset,利用可排序的特点,将剩余的超时时间排在第一位。如果数据达到超时条件,则执行业务逻辑.

简单粗暴,在业务中可以重用Redis其他中间件不需要额外维护。速度相对MySQL快.

1.由于存储成本高,存储成本高Redis所有的数据都在内存中,内存相对MySQL占用磁盘的成本很高.数据量太大会对Redis内存要求高.

2.数据消费zset若中断,则消息丢失,部分数据无法完全处理。zset没有像MQ的ack机制.

1.3 Redis 订阅Key过期事件

因为Redis可以key设置过期时间Redis自动删除key.同时Redis支持订阅key的过期事件,拿到事件则再操作业务。

以上优缺点Redis 实现轮询基本相同。这个事件没有通知机制。ack,如果通知到了,但是业务服务挂了,下次继续监控就不到消息。

那么这些日常生活中能想到的方法都想到了。实现这样的业务场景还有哪些可靠的方法呢?在保证即时性能的同时,我们也希望保证容错性,不希望因异常情况而发送或尽量避免数据丢失。

最后,我用的是基础RabbitMQ实现这个业务场景. 因为MQ第一个有Ack机制,确保消息在异常情况下可以重新消费, 第二就是MQ削峰能力强.

2.RabbitMQ队列设置TTL 死信队列

在RabbitMQ我们可以给某个queue绑定一个exchange来作为这个queue死信队列。向死信队列发送触发消息有几种情况:

1.消息被reject或者nack且没有设置requeue重入队列

2.消息TTL或者队列设置TTL,时间过期

3.超过队列长度的消息被丢弃

我们可以看到第二项,为队列设置TTL或者设置消息TTL.一旦过期,将交付到死信队列。只要我们消耗死信队列,我们就可以完成延迟队列业务的需求.

3.RabbitMQ消息设置TTL 死信队列

以上设置队列TTL类似地,我们可以单独设置消息TTL.有时候,我们不想设置相同的队列TTL,这个时候我们可以这样做.

如果设置了队列和消息TTL,所以两者最小TTL过期时间为准. 这两种情况都遵循队列【先进先出,后进后出】的原则.

这样会带来一个问题,那就是过期消息【队头堵塞】.以栗子为例。你认为消费顺序如何?

想象输出结果: 1s、50s、100s

实际输出结果: 100s、50s、1s

这不对!!!如何与我想象中的延迟队列不同,显然1s的比100s和50s到期快,但最后一个被消费了。那么1s数据会延迟很长时间. 这显然不符合我们的业务场景。谁先过期,谁就消费。这是我们的初衷。因为过期时间没有必要有强制顺序。

原理是: 消息队列不会对所有消息做定时器,或者起一个线程/进程查看,哪个消息过期了没.这样对MQ资源消耗太大,撑不住。如何实现过期原则? 数据出队时,做出判断。如果过期,将被丢弃到死新队列,否则队头将被堵塞等待。即使你背后有一个s其实消息已经过期了,但是队长100s消息还没有过期,那么接下来的1s你得等消息.

那么如何理解延迟队列,谁先过期消费谁呢? RabbitMQ到底能不能实现这种功能。答案是肯定的。 RabbitMQ官方维护了一个插件:rabbitmq-delayed-message-exchange

Github地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

4.RabbitMQ的Delay插件

RabbitMQ打开这个插件可以定义一个Delay的Exchange交换机,用户发送的信息message,将数据发送到上的插件将延迟数据message投递到Queue中。只要有message超时将被送到Queue。最后,用户可以从队列中消费消息,无需使用TTl 延迟队列实现了死信队列的方式。

1.定义一个Exchange, type类型为=>x-delayed-message .且Header头部加标签=>x-delayed-type(direct)

2.发送消息时,在消息header中传递TTL过期时间: x-delay => 60000 (ms,单位 毫秒)

此时消费的顺序是: 1s、50s、100s

 

1.该插件虽然能够实现延迟队列的效果,但是官方说明了该插件的局限性。 就是队列的数据量不能太大, 例如堆积了几十W或者上百万的数据。这个插件存在局限性,用在生产环境需要谨慎判断自己消息的堆积数据量. 

2.可以使用Promethues监控当前堆积的消息数量(消息还未超时,没投递到Queue的消息的数量)   指标名称:

erlang_mnesia_tablewise_size{table="rabbit_delayed_messagerabbit@rabbitmq-server"}

 3.如果数据量比较大,那么可以考虑使用RocketMQ原生支持的延迟队列. 

标签: ld50s激光传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

 锐单商城 - 一站式电子元器件采购平台  

 深圳锐单电子有限公司