资讯详情

RabbitMQ从入门到实践

RabbitMQ

中间件介绍

中间件是什么?

Middleware

中间件是什么?

自20世纪80年代以来,中国企业逐渐进行信息建设。由于方法和系统的不成熟和企业业务市场需求的不断变化,企业可以同时运行多个不同的业务系统,可以基于不同的操作系统、不同的数据库和异构的网络环境。现在的问题是,如何将这些信息系统结合成一个有机的协作整体,真正实现企业跨平台、分布式应用。中间件是解决办法,它用自己的复杂性换取了企业应用的简单性。

中间件(Middleware)它是操作系统和应用程序之间的软件,也有人认为它应该是操作系统的一部分。当人们使用中间件时,通常是一组中间件集成在一起,形成一个平台(包括开发平台和操作平台),但必须有一个通信中间件,即中间件 平台 通信,这个定义也限制了中间件只能在分布式系统中使用,也可以与支持软件和使用软件区分开来

为什么需要使用消息中间部件?

具体来说,中间部分屏蔽了底部操作系统的复杂性,使程序开发人员面对简单统一的开发环境,降低程序设计的复杂性,专注于自己的业务,不再需要重复不同系统软件上的程序移植,大大减轻了技术负担,中间部分给应用系统,不仅简单的开发,缩短了开发周期,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

中间件特点

为了解决分布异构问题,人们提出了中间件(middleware)概念。平台(硬件和操作系统)与应用程序之间的通用服务,如下图所示,具有标准的程序接口和协议。对于不同的操作系统和硬件平台,它们可以实现各种符合接口的协议规范。

中间件也很难严格定义,但中间件应具有以下特点:

  1. 满足大量应用的需要
  2. 在各种硬件和 OS平台
  3. 支持跨网络、硬件和分布计算 OS平台的透明应用或服务交互
  4. 支持标准协议
  5. 支持标准接口

由于标准接口对可移植性和标准协议对可操作性的重要性,中间件已成为许多标准化工作的主要组成部分。对于应用软件开发,中间件远比操作系统和网络服务更重要,中间件提供的程序界面定义了一个相对稳定的高层应用环境,无论如何更新底层计算机硬件和系统软件,只要中间件升级,保持中间件外部界面定义,应用软件几乎不需要修改,从而保护企业在应用软件开发和维护方面的重大投资。

简单来说,中间件有一个很大的特点,就是一个脱离具体设计目标,具有普遍独立功能需求的模块。这使得中间件必须可替代。如果中间件在系统设计中是不可替代的,而不是架构和框架设计的问题,那么这个中间件可能是别处的中间件,也可能是这个系统的引擎。

中间件技术何时在项目中使用?

在项目结构和重建中,我们需要仔细考虑和思考使用任何技术和结构的变化,因为任何技术的整合和变化都可能是人员、技术和成本的增加。一些互联网公司或项目些互联网公司或项目。如果你只是一家初创公司,建议使用单体结构,最多添加一个缓存中间部件,不要盲目追求新的或所谓的高性能,追求的背后必须是业务和项目的驱动,因为一旦追求意味着你的学习成本,公司的人员结构和服务器成本,维护和运行成本将会增加,所以你需要仔细选择和考虑。

但作为一个开放的人员,必须有能力学习中间技术和思考,否则很容易当项目发展到一个阶段掌握估计或面试,会给自己带来很多麻烦,在这个时代这些技术并不新鲜,如果掌握和挖掘最关键或花时间和经验讨论和研究。

概述中间件技术和架构

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-MJu4Ayhh-1658582837876)(C:\Users\yumo\AppData\Roaming\Typora\typora-user-images\image-20220707123338487.png)]

单体架构

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-UpMpHMKA-1658582837877)(https://cdn.jsdelivr.net/gh/oddfar/static/img/RabbitMQ/image-20210625223559218.png)]

在企业发展中,大多数初始结构采用单体结构模式,该结构的典型特点是将所有业务和模块、源代码、静态资源文件放在项目中,如果模块升级或迭代发生小变化,将重新编译和重新部署项目。问题是:

  1. 耦合度太高
  2. 不易维护
  3. 服务器成本高
  4. 升级架构的复杂性也会增加

这样就有了后续的分布式架构系统。如下所示

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-eLKLfAMO-1658582837877)(https://cdn.jsdelivr.net/gh/oddfar/static/img/RabbitMQ/image-20210625223659487.png)]

分布式系统:一般来说,它是由多个服务(服务或系统)协同处理的请求

与单体结构不同,单体结构是一个请求 jvm调度线程(确切的 tomcat线程)分配线程 Thread处理请求直至释放,而分布式系统是:一个请求由多个系统共同完成,jvm环境可能是独立的。如果生活中的隐喻,单一的结构就像建造一个小房子可以很快完成,如果你想建造一个鸟巢或一个大建筑,你必须协调和分布每个链接,这样目的也是部署和思考项目发展的问题。不难看出,分布式架构系统的特点和问题如下:

  1. 学习成本高,技术栈太多
  2. 运维成本和服务器成本增加
  3. 人员成本也会增加
  4. 项目负荷也会上升
  5. 错误和容错性也会成倍增加
  6. 选择占用的服务器端口和通信成本高
  7. 可以选择安全考虑和因素强迫 RMI/MQ相关服务器端通信

  1. 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少

    确切地说,服务资源的合理分配不会造成服务器资源的浪费

  2. 独立维护和部署系统,降低耦合,可插拔

  3. 系统架构和技术栈的选择可以变得灵活(而不是简单的选择) java)

  4. 弹性部署不会导致平台因部署而瘫痪和停止服务

基于消息中间件的分布式系统架构

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-iw5p2vvz-1658582837878)(https://cdn.jsdelivr.net/gh/oddfar/static/img/RabbitMQ/image-20210625224024821.png)]

从上图可以看出,消息中间件是

  1. 利用可靠的信息传输机制直接通信系统和系统
  2. 在分布式系统环境下,通过提供消息传递和消息的派对机制,可以扩展通信过程

新闻中间件应用场景

  1. 跨系统数据传输
  2. 高并发流量削峰
  3. 并发和异步处理数据
  4. 大数据分析与传递
  5. 分布式事务

例如,当您有一个数据需要迁移或请求并发送过多时,例如,您有10 W 我们可以在这些订单存储之前,将订单请求积累到消息队列中,使其稳定可靠地存储和执行

新闻队列介绍

MQ 的相关概念

什么是MQ

MQ(message queue),从字面上看,本质是一个队列,FIFO 先入先出,但存储在队列中的内容是 message 它仍然是一种跨过程通信机制,用于上下游传输信息。在互联网架构中,MQ 逻辑解耦是上下游非常常见的一种 物理解耦新闻通信服务。使用了 MQ 之后消息发送上游只需要依靠 MQ,不 依赖其他服务。

我们可以将消息队列视为存储消息的容器。当我们需要使用消息时,我们可以直接从容器中取出消息供自己使用。

Message queue

消息队列是分布式系统的重要组成部分一。使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。

我们知道队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。

为什么要用MQ

举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:

使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。

生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。

从上图可以看到,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。

消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。

另外,为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型。

将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。

因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完。

以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅。

使用消息总线,可以很方便解决这个问题, A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样B 服务也不用 做这些操作。A 服务还能及时的得到异步处理成功的消息。

使用MQ带来的一些问题

  • 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了
  • 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
  • 上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了

JMS VS AMQP

JMS
JMS 简介

JMS(JAVA Message Service,java 消息服务)是 java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

JMS 两种消息模型

使用**队列(Queue)*作为消息通信载体;满足*生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

发布订阅模型(Pub/Sub) 使用**主题(Topic)*作为消息通信载体,类似于*广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,

JMS 五种不同的消息正文格式

JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  • StreamMessage – Java 原始值的数据流
  • MapMessage–一套名称-值对
  • TextMessage–一个字符串对象
  • ObjectMessage–一个序列化的 Java 对象
  • BytesMessage–一个字节的数据流
AMQP

AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 (二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

JMS VS AMQP
对比方向 JMS AMQP
定义 Java API 协议
跨语言
跨平台
支持消息类型 提供两种消息模型:①Peer-2-Peer;②Pub/sub 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分;
支持消息类型 支持多种消息类型 ,我们在上面提到过 byte[](二进制)

  • AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
  • JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
  • 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。

MQ 的分类

优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较 低的概率丢失数据

缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为的消息中间件, 以其 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥 着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

:性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是。时效性 ms 级可用性非 常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采 用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持: 功能 较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,

RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场 景。

优点:,可用性非常高,分布式架构,,MQ 功能较为完善,还是分 布式的,扩展性好,支,不会因为堆积导致性能下降,源码是 java 我们可以自己阅 读源码,定制自己公司的 MQ

缺点:,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,

优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高

官网更新:https://www.rabbitmq.com/news.html(opens new window)

缺点:商业版需要收费,学习成本较高

MQ 的选择

Kafka 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集 和传输,适合产生的互联网服务的数据收集业务。建议可以选用,如果有功能, 肯定是首选 kafka 了。

天生为领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削 峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务 场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

结合 erlang 语言本身的并发优势,性能好,管理界面用起来十分 方便,如果你的,中小型公司优先选择功能比较完备的 RabbitMQ。

对比方向 概要
吞吐量 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。
可用性 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
时效性 RabbitMQ 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他三个都是 ms 级。
功能支持 除了 Kafka,其他三个功能都较为完备。 Kafka 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
消息丢失 ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。

  • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做 erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
  • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用 RocketMQ 挺好的
  • Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

RabbitMQ

RabbitMQ 的概念

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ 的具体特点可以概括为以下几点:

  • RabbitMQ使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
  • 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们讲 RabbitMQ 核心概念的时候详细介绍到。
  • 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
  • RabbitMQ几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript等。
  • RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。在安装 RabbitMQ 的时候会介绍到,安装好 RabbitMQ 就自带管理界面。
  • RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。感觉这个有点类似 Dubbo 的 SPI机制。

官网:https://www.rabbitmq.com/#features(opens new window)

核心概念

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

下图—— RabbitMQ 的整体模型架构。

  • 生产者

    产生数据发送消息的程序是生产者

  • 交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定

    • Queue(队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
    • 中消息只能存储在 中,这一点和 这种消息中间件相反。Kafka 将消息存储在 这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
    • ,这时队列中的消息会被平均分摊(Round-Robin,即)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
    • 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。
  • 消费者

    消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

RabbitMQ核心部分

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PDZMdK5A-1658582837881)(https://www.yumoyumo.top/wp-content/uploads/2022/07/image-20220707130721105.png)]

  1. 简单模式
  2. 工作模式
  3. 发布订阅模式
  4. 路由模式
  5. 主题模式
  6. 发布确认模式

各个名词介绍

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rWH8n19Q-1658582837881)(https://www.yumoyumo.top/wp-content/uploads/2022/07/image-20220705115204334.png)]

  • 生产者

  • 消费者

  • 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

  • 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

  • publisher/consumer 和 broker 之间的 TCP 连接

  • 信道

    如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。

  • 交换机

    message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

  • 队列

    消息最终被送到这里等待 consumer 取走

  • exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

安装

安装RabbitMQ

官网下载地址:https://www.rabbitmq.com/download.html(opens new window)

这里我们选择的版本号(注意这两版本要求)

  • rabbitmq-server-3.8.8-1.el7.noarch.rpm

    GitHub:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8(opens new window)

    加载下载:https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.8-1.el7.noarch.rpm(opens new window)

  • erlang-21.3.8.21-1.el7.x86_64.rpm

    官网:https://www.erlang-solutions.com/downloads/

    加速:https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.21-1.el7.x86_64.rpm(opens new window)

Red Hat 8, CentOS 8 和 modern Fedora 版本,把 “el7” 替换成 “el8”

上传到 /usr/local/software 目录下(如果没有 software 需要自己创建)

rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server

Web管理界面及授权操作

默认情况下,是没有安装web端的客户端插件,需要安装才可以生效

rabbitmq-plugins enable rabbitmq_management

安装完毕以后,重启服务即可

systemctl restart rabbitmq-server

访问 http://IP地址:15672 ,用默认账号密码(guest)登录,出现权限问题

默认情况只能在 localhost 本机下访问,所以需要添加一个远程登录的用户

# 创建账号和密码
rabbitmqctl add_user admin 123456

# 设置用户角色
rabbitmqctl set_user_tags admin administrator

# 为用户添加资源权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 添加配置、写、读权限

用户级别:

  1. :可以登录控制台、查看所有信息、可以对 rabbitmq 进行管理
  2. :监控者 登录控制台,查看所有信息
  3. :策略制定者 登录控制台,指定策略
  4. :普通管理员 登录控制台

再次登录,用 admin 用户

查看用户:rabbitmqctl list_users

关闭应用的命令为:rabbitmqctl stop_app

清除的命令为:rabbitmqctl reset

重新启动命令为:rabbitmqctl start_app

Docker 安装

官网:https://registry.hub.docker.com/_/rabbitmq/(opens new window)

docker run -id --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

简单案例

Hello world

我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者

在下图中,“ P” 是我们的生产者,“ C” 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-I5UTvXNj-1658582837882)(https://www.yumoyumo.top/wp-content/uploads/2022/07/image-20220718003243493.png)]

连接的时候,需要开启 5672 端口

pom.xml

<!--指定 jdk 编译版本-->
<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
    <!--rabbitmq 依赖客户端-->
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.15.0</version>
    </dependency>

    <!--操作文件流的一个依赖-->
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.11.0</version>
    </dependency>

</dependencies>

发送消息

package org.example.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** * @Author: yumo * @Description: 生产者 : 发消息 * @DateTime: 2022/7/7 18:50 **/
public class Producer { 
        

    //队列名称
    private final static String QUEUE_NAME = "hello";


    public static void main(String[] args) throws Exception { 
        
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接RabbitMQ的队列
        factory.setHost("120.76.98.24");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 也就是是否用完就删除 * 3.该队列是否只供一个消费者进行消费 true 表示只供一个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("消息发送完毕");

    }
}

获取“生产者”发出的消息

package org.example.rabbitmq.one;

import com.rabbitmq.client.*;

/** * @Author: yumo * @Description: 消费者 : 消费 * @DateTime: 2022/7/7 19:00 **/
public class Consumer { 
        

    //队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception { 
        
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("120.76.98.24");
        factory.setUsername("admin");
        factory.setPassword("123456");
        //建立连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("等待接收消息.........");

        //消息被成功消费后的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> { 
        
            System.out.println(new String(message.getBody()));
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> { 
        
            System.out.println("消息消费被中断");
        };
        /** * 消费者消费消息 - 接受消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消息被成功消费后的回调 * 4.消费者被取消时的回调 */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

Work Queues

Work Queues——工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

轮询分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

package org.example.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** * @Author: yumo * @Description: TODO * @DateTime: 2022/7/7 19:16 **/
public class RabbitMqUtils { 
        
    
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception { 
        
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("120.76.98.24");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

标签: k5连接器q18j5a连接器cp114差压变送器cyb11w微压力变送器sc连接器挂掉的原因

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台