资讯详情

RabbitMQ从概念到使用、从Docker安装到RabbitMQ整合Springboot【1.5w字保姆级教学】

文章目录

    • 一、前言
    • 二、RabbitMQ作用
      • 1. 异步处理
      • 2. 应用解耦
      • 3. 流量控制
    • 三、RabbitMQ概念
      • 1. RabbitMQ简介
      • 2. 核心概念
    • 四、JMS与AMQP比较
    • 五、RabbitMQ运行机制
      • 1. direct
      • 2. fanout
      • 3. topic
    • 六、Docker安装RabbitMQ
    • 七、整合Springboot
      • 1. 引入依赖
      • 2. 注解添加到主启动类
      • 3. 编写配置文件
    • 八、测试创建交换机、队列、绑定关系
      • 1. 测试创建Direct交换机
      • 2. 打开交换机界面查看
      • 3. 创建Queue
      • 4. 打开队列界面查看
      • 5. 绑定交换机和队列
      • 6. 打开交换机中的绑定界面
    • 九、测试发送消息
      • 1. 测试发送消息
      • 2. Queue列表中查看消息
      • 3. 手动确认消息
      • 4. 测试发送对象
      • 5. 查看发送消息的对象
      • 6. 写信发送对象转JSON配置类
      • 7. 再次发送9、4中的代码,查询对象是否正常显示
    • 十、测试接收信息
      • 1. 创建接收信息的方法
      • 2. 模拟发送消息
      • 3. 查看收到的信息
      • 4. 在思考
      • 5. 多个服务监控同一个队列
      • 6. 模拟发送十条消息,检查将被服务接收
      • 7. 查看多服务接收消息结果
    • 十一、总结

一、前言

先说新闻中间件: 使用消息中间件高效可靠的信息传输机制基于数据通信,进行平台无关的数据交流和分布式系统集成。通过提供消息传输和消息排队模型,它可以在分布式环境下扩展通信过程。(百度百科)

我们常见的中间件有很多种,比如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka 这三款。Redis也可以在一定程度上使用list或者Stream实现消息队列,但不能算中间件!

如果你对如何选择感兴趣,你可以看看这篇文章:四大MQ选型

今天小编带大家一起学习RabbitMQ,从入门到精通,从无到有!!小编没有使用Windows安装很麻烦,所以使用Docker没有安装Docker看看小编的另一篇文章:Linux安装Docker

其实小编也是通过雷神的课件和讲解,自己整理一下,供以后学习和参考,在此感谢尚硅谷雷神哈!

小编觉得在说概念之前,应该知道他的作用,然后再系统的学习概念等!

二、RabbitMQ作用

其实功能还是挺多的,但主要有以下三点:

  • 异步处理
  • 应用解耦
  • 流量控制

让我们一个个简单描述一下哈,我们还是拿用了一万次的例子和图例哈!

1. 异步处理

用户站注册成功后,用户需要向用户发送电子邮件和信息,以提示其注册成功(事实上,没有必要,但例子还可以,小边自己的理解哈!)。传统的做法是:在后台将注册信息保存到数据库中,然后向用户发送短信。

在这里插入图片描述 我们看到这是非常耗时的,事实上,保存完成后,你可以登录,短信和电子邮件接收一段时间没有问题!或者发送失败,用户没有收到,这没有问题,用户已经登录,不管你是否发送短信,我们说对吧!

既然有问题,我们就有消息队列来解决这个问题: 我们可以册信息数据库后,我们可以将要发送注册邮件和短信的消息写入消息队列,然后通知用户注册成功。邮件和短信将由订阅消息的应用程序异步执行。这个耗时的问题就解决了!

2. 应用解耦

在大型电子商务项目中,订单系统和库存系统将被分为两个不同的应用程序。然后在服务和服务之间呼叫。通常,订单系统会在用户下订单后呼叫库存系统,然后返回用户以显示订单的成功。 但也有问题。如果库存系统挂断,会导致订单失败;如果你是用户,你会判断这个产品不好,以后不用了!

别担心,这个用户,我们会帮你解决的。

现在有同学会问,怎么解决? 别担心,小编会告诉你的!错误的原因是库存系统挂断,改变处理的请求没有处理,因此订单失败;我们引入消息队列,将订单消息写入消息队列,然后库存系统订阅我们的消息队列;然后库存系统到消息队列获取消息,处理订单,完成库存减少操作;如果失败,也会有重试机制,真的挂了,也可以持久化,等到库存系统活了再继续处理!!不影响用户体验的目的!

3. 流量控制

看名字就能知道,一定是很大的并发情况才会出现,不用想就是秒杀时刻了! 假设一瓶茅台有2万人抢劫,这是因为我们的系统可能会被打败。所以我们把超过一定程度并发量将超过的请求放在消息队列中,然后减缓系统压力,然后慢慢处理;虽然用户体验可能会减少,但秒杀就是这样。只有一些人能成功。我们应该确保系统能够正常运行!

三、RabbitMQ概念

1. RabbitMQ简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)实现开源。 RabbitMQ 是部署最广泛的开源新闻代理。 RabbitMQ拥有数万用户是最受欢迎的开源新闻代理之一。T-Mobile到Runtastic,RabbitMQ用于世界各地的小型初创企业和大型企业。 RabbitMQ它是轻量级的,易于部署在本地和云中。它支持各种新闻传输协议。RabbitMQ可部署在分布式和联合配置中,以满足高规模、高可用性的需求。 RabbitMQ在许多操作系统和云环境中运行,并为最流行的语言提供了广泛的开发工具。

2. 核心概念

新闻,新闻不具名,它由消息头和消息体组成。消息体不透明,而消息头由一系列可选属性组成, 这些属性包括routing-key(路由键),priority(与其他消息相比),delivery-mode(指出消息可以 可持续存储)等。

新闻制作人也是一个向交换器发布新闻的客户端应用程序。

用于接收制造商发送的信息,并将信息路由到服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略不同Queue 消息队列用于保存消息,直到发送给消费者。它不仅是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列。消息一直在队列中,等待消费者连接到队列并取走。

绑定,用于消除队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Exchange 和Queue的绑定可以是多对多的关系

网络连接,比如一个TCP连接。

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 类似docker容器和容器之间是相互隔离的,一个坏了,不耽误另一个使用

表示消息队列服务器实体。

四、JMS与AMQP比较

JMS(Java Message Service) AMQP(Advanced Message Queuing Protocol)
定义 Java api 网络线级协议
跨语言
跨平台
Model 提供两种消息模型:(1)、Peer-2-Peer(2)、Pub/sub 提供了五种消息模型:(1)、direct exchange(2)、fanout exchange(3)、topic change(4)、headers exchange(5)、system exchange本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分;
支持消息类型 多种消息类型:TextMessageMapMessageBytesMessageStreamMessageObjectMessageMessage (只有消息头和属性) byte[]当实际应用时,有复杂的消息,可以将消息序列化后发送。
实现中间件 ActiveMQ、HornetMQ RabbitMQ
综合评价 JMS 定义了JAVA API层面的标准;在java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差; AMQP定义了wire-level层的协议标准;天然具有跨平台、跨语言特性

五、RabbitMQ运行机制

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了Exchange 和Binding的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列

  • direct
  • fanout
  • topic
  • headers(不建议使用)

1. direct

消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"a1.b1",则只转发 routing key 标记为"a1.b1"的消息,不会转发"a1.b2”,也不会转发"a1.b3" 等等。它是完全匹配、单播的模式

2. fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的、广播

3. topic

topic是升级版的fanout模式,做了选择权,并不是全都会接受,符合条件才会收到!topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。 它同样也会识别两个通配符:符号#和符号*#匹配0个或多个单词,* 匹配一个单词。

六、Docker安装RabbitMQ

直接输入命令,docker会帮助我们自动去拉去镜像的:

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 \
	-p 15671:15671 -p 15672:15672 rabbitmq:management

我们查询是否运行成功

docker ps

我们在windows上进行测试是否能够打开界面: 输入:http://192.168.17.130:15672/ 用户名密码都是:guest

进入界面:

七、整合Springboot

1. 引入依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <!--自定义消息转化器Jackson2JsonMessageConverter所需依赖-->
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
 </dependency>

2. 主启动类上添加注解

@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication { 
        

    public static void main(String[] args) { 
        
        SpringApplication.run(GulimallOrderApplication.class, args);
    }

}

3. 编写配置文件

# 指定rabbitmq服务器主机
spring.rabbitmq.host=192.168.17.130
# 账号密码端口号都默认配置了,我们无需配置

八、测试创建交换机、队列、绑定关系

1. 测试创建Direct交换机

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createExchange() { 
        
    // 第一个参数为交换机名字,第二个参数为是否持久化,第三个参数为不使用交换机时删除
    DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
    amqpAdmin.declareExchange(directExchange);
    System.out.println("交换机创建成功");
}

2. 打开交换机界面查看

3. 创建Queue

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createQueue() { 
        
    /** * 第一个参数为队列名字, * 第二个参数为是否持久化, * 第三个参数为是否排他(true:一个连接只能有一个队列,false:一个连接可以有多个(推荐)) * 第四个参数为不使用队列时自动删除 */
    Queue queue = new Queue("hello-java-queue",true,false,false);
    amqpAdmin.declareQueue(queue);
    System.out.println("队列创建成功");
}

4. 打开队列界面查看

5. 绑定交换机和队列

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createBinding() { 
        
    /** * 第一个参数为目的地,就是交换机或者队列的名字 * 第二个参数为目的地类型,交换机还是队列 * 第三个参数为交换机, * 第四个参数为路由键,匹配的名称 */
    Binding binding = new Binding("hello-java-queue",
            Binding.DestinationType.QUEUE,
            "hello-java-exchange",
            "hello.java",null);
    amqpAdmin.declareBinding(binding);
    System.out.println("绑定成功");
}

6. 打开交换机中的绑定界面

点击交换机 绑定列表

九、测试发消息

1. 测试发送消息

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){ 
        
	// 消息类型为object 发送对象也是可以的
    String msg = "这是一条消息";
    // 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
    System.out.println("消息发送成功");
}

2. Queue列表中查看消息

3. 手动确认消息

点击我们的队列:

进入详细界面,下滑找到Get messages

在次点击队列,消息消失:

4. 测试发送对象

@Data
// 必须序列化,不然报错
public class User implements Serializable { 
        
    private String name;
    private Integer age;
}
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){ 
        
    User user = new User();
    user.setAge(22);
    user.setName("王振军");
    // 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
    System.out.println("消息发送成功");
}

5. 查看消息发送的对象

6. 书写消息发送对象转JSON配置类

@Configuration
public class MyRabbitmqConfig { 
        

    @Bean
    public MessageConverter messageConverter(){ 
        
        return new Jackson2JsonMessageConverter();
    }
}

7. 再次发送九、4中的代码,查询是否正常显示对象

十、测试收消息

1. 创建接收信息的方法

方法所在的类必须交给了IOC管理,我们直接写在service里面。代码如下:

@Service
public class TestService { 
        
	// queues是监听的队列名字,可以是多个
    @RabbitListener(queues = { 
        "hello-java-queue"})
    public void reciveMessage(Object message){ 
        
        System.out.println("接受的信息" + message);
    }
}

2. 模拟发送消息

还是用上面的方法进行发送一个对象!

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){ 
        
    User user = new User();
    user.setAge(22);
    user.setName("王振军");
    // 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
    System.out.println("消息发送成功");
}

3. 接收消息查看

接受的信息:(Body:'{"name":"王振军","age":22}' 
MessageProperties [headers={ 
        __TypeId__=com.atguigu.gulimall.order.entity.User}, 
contentType=application/json, contentEncoding=UTF-8, contentLength=0, 
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, 
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, 
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])

4. 在思考

我们发现刚刚返回的是详细信息,我们可以指定消息的类型,就是发送消息发的对象是什么,我们就可以直接接收就行!看代码:

@Service
public class TestService { 
        

    @RabbitListener(queues = { 
        "hello-java-queue"})
    public void reciveMessage(Message message, User user){ 
        
        System.out.println("接受的信息:" + message);
        System.out.println("发送的信息:" + user);
    }
}

在此发送消息,我们看一下控制台:

接受的信息:(Body:'{"name":"王振军","age":22}' 
MessageProperties [headers={ 
        __TypeId__=com.atguigu.gulimall.order.entity.User}, 
contentType=application/json, contentEncoding=UTF-8, contentLength=0, 
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, 
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, 
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
发送的信息:User(name=王振军, age=22)

这样就很清晰了哈!

拓展: 接收的还有第三个参数就是通道,每一个连接只会有一个通道哈!,大家可以自己测试一下,打印看看一下!!

public void reciveMessage(Message message, User user, Channel channel)

5. 多个服务监听同一条队列

右击已存在服务,复制一份配置不同端口:

现在有两个服务监听同一个队列!!

6. 模拟发送十条消息,查看会被那个服务接收

调整测试发消息代码:

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){ 
        
    for (int i = 0;i < 10; i++) { 
        
        User user = new User();
        user.setAge(i);
        user.setName("王振军" + i);
        // 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", user);
        System.out.println("消息发送成功");
    }
}

接收消息的代码:

@Service
public class TestService { 
        

    @RabbitListener(queues = { 
        "hello-java-queue"})
    public void reciveMessage(Message message, User user){ 
        

        System.out.println("接收的信息:" + user);
    }
}

7. 多服务接收消息结果查看

我们看到9000服务接收了1,4,7消息 9010服务接收了0,3,6,9消息 总结: 我们可以发现一个消息只会被接收一次!

还有就是发了10条消息,只有7条被接收了,其余的呢?

别急小编来告诉大家,这是因为我们测试是使用SpringBoot的测试类进行的,有的部分消息被测试的接收了!大家不信可以看一下测试的控制台,找一下:

我们看到消息的2,5,8在这里呢!!

拓展:

十一、总结

这样我们就对RabbitMQ有了新的认识,从入门也算走上了实践!后面有时间小编再把消息的可靠性发出来,也就是进阶版!!

在次感谢雷神的课程哈,看到这里,小伙伴们点个赞呗,小编整理不易呀!!谢谢大家了!!


有缘人才可以看得到的哦!!!

点击访问!小编自己的网站,里面也是有很多好的文章哦!

标签: sub连接器78p

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台