消息队列
课程目标
1.MQ相关概念
1.1 什么是MQ
MQ(message queue),从字面上看,本质是一个,FIFO 先入先出,但存储在队列中的内容是
message 而已,还是一种跨进程的通信机制,用于上下游传递消息
1.2 为什么使用MQ
流量削峰
- 如果处理能力有限,1W它可以立即响应,现在由于活动,,因此,消息可以到达消息队列,但用户下单的时间比以前长
应用解耦
异步处理
- 例如 A 调用 B,B 执行需要很长时间,但是 A 需要知道 B 什么时候能完成,
- 以前有两种方式:
- A 调用一段时间 B 的查询 api 查询
- 或者 A 提供一个 callback api,B 执行后调用 api 通知 A 服务
- ,解决这个问题很方便
- A 调用 B 服务结束后,只需监控 B 处理完成的信息
- 当 B 处理完成后,将发送消息 MQ,MQ 将此消息转发给 A 服务
1.3 常用MQ优缺点
ActiveMQ
-
:单机吞吐量万级,及时性 ms 等级,高可用性,基于主从架构实现高可用性,数据丢失概率低
-
:现在官方社区是对的 ActiveMQ 5.x 维护越来越少,高吞吐量场景使用越来越少
Kafka
- :
- 性能优异,单机写入 TPS 大约一百万条/秒,最大的优势是。
- 时效性 ms 可用性很高
- kafka 它是分布式的,多个数据副本,少数机器停机,,不会导致不可用
- 消费者采 用 Pull 获取消息的方式, 消息有序, 通过控制,可以确保所有消息都被消费
- 第三方优秀Kafka Web 管理界面 Kafka-Manager
- 日志领域成熟,被多家公司和多个开源项目使用
- :
- ,实时计算和大数据领域大规模使用
- ;
RocketMQ
- RocketMQ 阿里巴巴的开源产品使用 Java 语言实现,在设计时,并做出了自己的一 一些改进。广泛应用于订单、交易、充值、流量计算、消息推送、日志流处理等领域。binglog 分发等场景
- :
- ,可用性很高
- 分布式架构,
- MQ 功能完善,分布式,扩展性好
- ,性能不会因为积累而下降,源代码是 java 我们可以自己阅读源码
- :
- ,目前是 java 及 c ,其中 c 不成熟
- ,没有在 MQ 实现核心 JMS 等界面,有些系统需要修改大量代码才能迁移
RabbitMQ
- 2007 年发布,是一个 在高级消息队列协议的基础上完成,可重复使用的企业消息系统是
- :
- 由于 erlang 语言的,性能较好;
- MQ 功能齐全,强壮、稳定、易于使用 使用,跨平台, 如:
- Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等
- 支持 AJAX 文档齐全;开源提供的管理界面很好,使用方便,;更新频率相当高
- :
- 商业版收费,学习成本高
1.4 MQ的选择
label>小/中型项目:RabbitMQ
RabbitMQ > RocketMQ > kafka
RabbitMQ、RocketMQ、kafka都支持持久化
kafka = RocketMQ > RabbitMQ
kafka = RocketMQ > RabbitMQ
2.RabbitMQ
是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
2.1 前置概念
2.1.1 AMQP核心概念
虚拟主机()或()
- 被称为 虚拟主机()
- RabbitMQ server 可以说就是一个消息队列服务器实体()
- 当中可以有多个用户,而用户只能在虚拟主机的粒度进行权限控制,所以RabbitMQ中需要多个虚拟主机
交换机()
- 它。它可以被理解成具有路由表的路由程序。()
- 交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。
- Exchange的类型
- :直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
- :这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
- :交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
- Headers:消息头交换机使用消息头的属性进行消息路由,相当于模糊匹配(like header%),一对多
队列()
- 队列是消息载体,每个消息都会被投入到一个或多个队列
- 试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求()。
绑定器()
- 作用:把exchange和queue按照路由规则绑定起来
- 将交换器和队列连接起来,并且封装消息的路由信息
2.1.2 程序中连接与消息使用的两个关键概念
- 与RabbitMQ Server建立的一个连接
- 由ConnectionFactory创建
- 每个connection只与一个物理的Server进行连接,此连接是基于Socket进行连接的
- 消息通道(主要进行相关定义,发送消息,获取消息,事务处理等)
- 在客户端的,每个channel代表一个会话任务
2.2 七种工作模式
- 简单模式:一个生产者,一个消费者
- work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一
- 订阅模式:一个生产者发送的消息会被多个消费者获取
- 路由模式: 发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
- topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词
- RPC模式:使用RabbitMQ构建RPC系统:客户端和可伸缩RPC服务器
- 发布确认:与发布者进行可靠的发布确认
2.3 安装
# 1.拉取镜像
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker pull rabbitmq:3.8-management
# 2.查看镜像
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker images
# 3.启动
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
a5ca877169d4eda31a9c404f94565c118f1cc64de2cbb398ff17a66e72c682cd
# 4.查看启动情况
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a5ca877169d4 rabbitmq:3.8-management "docker-entrypoint.s…" 4 seconds ago Up 3 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq
问题
- 防火墙问题
- 启动web管理界面
# 进入rabbitmq
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker exec -it rabbitmq bash
# 启动web管理界面
root@a5ca877169d4:/# rabbitmq-plugins enable rabbitmq_management
2.3.1 添加新用户
# 创建账号
root@a5ca877169d4:/# rabbitmqctl add_user admin admin
Adding user "admin" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
# 设置用户角色
root@a5ca877169d4:/# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
# 设置用户权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 用户 admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
root@a5ca877169d4:/# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
# 查看当前用户和角色
root@a5ca877169d4:/# rabbitmqctl list_users
Listing users ...
user tags
admin [administrator]
guest [administrator]
2.3 Hello World
-
在本教程的这一部分中,我们将用 Java 编写两个程序;
-
发送单个消息的生产者和接收消息并将其打印出来的消费者
-
在下图中,“P”是我们的生产者,“C”是我们的消费者
- 中间的盒子是一个队列——RabbitMQ 代表消费者保留的消息缓冲区
2.3.1 依赖
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
2.3.2 消息生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("IP");
factory.setUsername("admin");
factory.setPassword("admin");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 也就是是否用完就删除 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 - 延迟、死信等 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
/** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息,传递的消息携带的properties * 4.发送消息的消息体 */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
2.3.3 消息消费者
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("IP");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/** * 消费者消费消息 - 接受消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 * 4.消息被取消时的回调 */
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
2.4 Work Queues
2.4.1 抽取工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("IP");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2.4.2 启动两个工作线程来接收消息
import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
// 这是一个工作线程,相当于之前的消费者
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.basicQos(1);
//消息接受
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
//消息被取消
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};
System.out.println("C1 消费者启动等待消费.................. ");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
2.4.3 启动一个发送线程
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完成:" + message);
}
}
}
2.5 小结
server:服务器
v-host:名称上来说,虚拟主机
exchange:交换机(消息通常是发送到交换机)
queue:队列(存放消息的)
bind:将交换机和队列进行绑定
2.5.1 消息队列持久化
创建一个队列的时候,可以是非持久化的,也可以是持久化的
- 非持久化:rabbitmq如果重启,该队列就会被删除
- 持久化:重启不影响
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
2.5.2 消息的持久化
消息持久化,可以去预防消息丢失,需要设置
MessageProperties.PERSISTENT_TEXT_PLAIN
注意:配置了消息持久化,并不能够完全保证消息不丢失,只能保证消息到了队列中不消失
消息丢失的方式:
- 发送方发送到消息队列的时候,丢了
- 交换机到队列中,丢了
后续的解决方案,参考确认Confirm,Return
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
2.5.3 ACK
默认情况下,消费者,就会进行自动应答(ACK)
如果一个消费者处理消息时间长,或者异常了,所以需要手动Ack
- 自动Ack
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// true 表示自动ack:autoAck
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
- 手动Ack
// 1.channel.basicConsume 将true改为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
// 2.在finally 中手动Ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
2.5.4 消费不均匀
工作模式中,默认轮询,会出现一些问题,比如:两个消费者,亮跟大哥,实现功能(10个),
亮:1,3,5,7,9
大哥:2,4,6,8,10
但是大哥实现的快,出现一个问题,大哥3个小时完成了,摸鱼5小时,亮做了8小时
所以我们可以设置一个预抓取值,官网给的是一次抓取一个任务(每次抓取一个消息,当上一个消息没有ack的时候,不抓取新的消息)
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意:这里的预抓取值给的是1,这个值太小(消费者能力比较强,就会等待),但是具体还是看服务器性能跟消息复杂度,通常情况下100-300
2.6 Publish/Subscribe()
Exchange的类型
- :交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
- :直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
- :这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
- Headers:消息头交换机使用消息头的属性进行消息路由,相当于模糊匹配(like header%),一对多
2.6.1 操作
- 发送方
public class EmitLog { // 交换机名称 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { // 创建交换机,并设置交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Scanner scanner = new Scanner(System.in); while (true) { System.out.print("请输入要发送的消息:"); String msg = scanner.next(); channel.