资讯详情

大聪明教你学Java | RabbitMQ 的工作原理及其简单操作

前言

来自二线城市的程序员拒绝过江东,致力于用猥琐的方法解决繁琐的问题,使复杂的问题易于理解。 赞美,关注,留言,留言~

我们在上一个博客中解释了如何 Win10 安装部署在环境中 RabbitMQ,我相信你已经建立了自己的一套 RabbitMQ 所以今天就来说说环境, Java 代码对 RabbitMQ 的简单操作。

传送门:大聪明教你学习Java | Win10 安装部署在环境中 RabbitMQ

Java 代码对 RabbitMQ 的简单操作

RabbitMQ 工作原理和基本概念

在谈代码之前,让我们简单了解一下 RabbitMQ 工作原理(图片来自百度图片,侵删~)??

在这里插入图片描述 当我们谈队列,我们通常会想到三个角色:生产者和消费者 、消息队列。生产者将消息发送到消息队列,消费者从消息队列中获取消息进行处理。但是对于 RabbitMQ 在此基础上,它进行了抽象,并引入了交换器 Exchange 交换器是生产者和消息队列之间的中间桥梁,起着消息路由的作用,即生产者不直接与消息队列相关,而是先将消息发送给交换器,然后从交换器路由到相应的消息队列。从上图可以看出,生产者并没有通过建立和建立直接向消息队列发送消息 Exchange(交换器) Channel(信道)发送消息 Exchange,Exchange 根据路由规则,将消息转发给指定的消息队列。消息队列存储消息,等待消费者取出消息,消费者建立与消息队列相连 Channel,从消息队列中获取消息。

让我们简单介绍一下 RabbitMQ 一些基本概念:

  1. Channel(信道):多路复用连接中的独立双向数据流通道。信道是基于真实的 TCP 重用连接中的虚拟连接 TCP 连接的通道。
  2. Producer(生产者):向消息队列发布消息的客户端应用程序。
  3. Consumer(消费者):从消息队列获取消息的客户端应用程序。
  4. Queue(消息队列):存储消息的一种数据结构,它可以用来保存消息,直到消息发送给消费者。一个消息可以投入一个或多个队列。消息一直在队列中,等待消费者连接到这个队列,拿走消息。需要注意的是,当多个消费者订阅同一个消费者时, Queue,这时 Queue 消息将平均分配给多个消费者,而不是每个消费者收到并处理所有消息,每个消息只能由一个订阅者接收。
  5. Exchange(交换器):用于接收生产者发送的消息,并按照路由规则将其转发到消息队列。交换器用于转发消息,不存储消息 ,假如没有消息队列(Queue)绑定到交换器(Exchange)如果直接丢弃生产者发送的信息。

Java 代码对 RabbitMQ 的简单操作

通过以上内容,让我们 RabbitMQ 我们对工作原理和基本概念有了基本的了解。接下来,让我们来看看如何使用它。 Java 代码操作 RabbitMQ。

先介绍一下 Maven 依赖??

<!-- rabbitmq 依赖 --> <dependency>     <groupId>com.rabbitmq</groupId>     <artifactId>amqp-client</artifactId>     <version>5.8.0</version> </dependency>  <!-- io依赖 --> <dependency>     <groupId>commons-io</groupId>     <artifactId>commons-io</artifactId>     <version>2.6</version> </dependency> 
p>接下来创建一个生产者类,向 RabbitMQ 中发送一条消息👇

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** * RabbitMQ 生产者 * @description: Producer * @author: 庄霸.liziye * @create: 2022-06-22 10:36 **/
public class Producer { 
        

    public static final String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws IOException, TimeoutException { 
        
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //RabbitMQ 安装的服务器ip地址
        factory.setHost("127.0.0.1");
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /** * 生成一个队列 * 参数: * 1 队列名称 * 2 队列里的消息是否持久化,默认情况消息存储在内存中 * 3 该队列是否供多个消费者进行消费,是否进行消息共享 * 4 是否自动删除 * 5 其他参数 */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello, I am the Producer!";
        /** * 发送一个消费 * 参数: * 1 发送到哪个交换机 * 2 路由的key值 本次是队列名称 * 3 其他参数信息 * 4 发送消息的消息体 */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

接下来我们运行代码,会看到控制台输出了“消息发送完毕”的提示文字👇

下面我们再登录一下 RabbitMQ 管理页面,就会在 Queue 选项卡下看到我们刚刚建立的消息队列👇

接下来我们再创建一个消费者,从消息队列中把消息取出来👇

import com.rabbitmq.client.*;

/** * RabbitMQ 消费者 * @description: Consumer * @author: 庄霸.liziye * @create: 2022-06-22 10:58 **/
public class Consumer { 
        

    public static final String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws Exception{ 
        
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP,连接rabbitmq的队列
        factory.setHost("127.0.0.1");
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");

        /** * 消费者接受消息 channel.basicConsume() * 参数: * 1 消费哪个队列 * 2 消费成功后是否要自动应答 * 3 消费者未成功消费的回调 * 4 消费者取录消费的回调 */
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{ 
        
            String message= new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{ 
        
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

我们运行一下消费者的代码,就可以看到刚刚发送到消息队列中的消息被成功取出来了✌

小结

本人经验有限,有些地方可能讲的没有特别到位,如果您在阅读的时候想到了什么问题,欢迎在评论区留言,我们后续再一一探讨🙇‍

希望各位小伙伴动动自己可爱的小手,来一波点赞+关注 (✿◡‿◡) 让更多小伙伴看到这篇文章~ 蟹蟹呦(●’◡’●)

如果文章中有错误,欢迎大家留言指正;若您有更好、更独到的理解,欢迎您在留言区留下您的宝贵想法。

你在被打击时,记起你的珍贵,抵抗恶意; 你在迷茫时,坚信你的珍贵,抛开蜚语;

标签: 侵水变送器水侵变送器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台