资讯详情

Rabbit mq 消息服务器(分布式中非常重要的服务器)

Rabbitmq概述

RabbitMQ使用场景

服务解耦

流量削峰

异步调用

使用步骤

VM版本:16 (Rabbitmq在容器中使用)

安装Docker(先克隆只有一个docker 虚拟机进入执行)

搭建Rabbitmq服务

六种工作模式

简单模式(对应只有一个消费者)

工作模式(对应个人消费者)

如何使消息持久(防止服务器消息丢失)

群发模式(发消息需要交换机)

路由模式

主题模式

如何实现SCC的配置更新


Rabbitmq概述

Rabbitmq就像邮局一样,投递员(生产者)给消息发信息Rabbitmq,Rabbitmq邮递员(线程)按照设定的道路发送给指定的收件人(消费者)。收件人收到信后,阅读理解(处理),最后给投递人写回信(响应),必须通过Rabbitmq,重复这样的循环

RabbitMQ使用场景

服务解耦

服务解耦这种情况在单个项目中,不能考虑这个问题,服务解耦是基于两个服务之间的调用,A服务于生成数据BCD所有这些都需要,所以我可以直接在A中调用BCD,从而实现数据传输,但在微服务中,服务不会像这样少,成千上万的服务,所以服务间耦合过高,维护成本过高,引入Rabbitmq中间商后,A把数据交给Rabbitmq,然后BCD等待服务,需要就去rabbitmq拿就可以了

流量削峰

例如,当我们的服务器只有一有一个时,瞬间qps如果达到3000,服务器的压力会急剧增加,即瞬时压力爆炸,然后通过Rabbitmq通过rabbitmq请求慢慢发送到服务器处理,例如qps从每秒3000通过rabbitmq后变成qps300而其他的在rabbitmq排队等待处理,虽然延长了时间,但可以减轻瞬时压力

异步调用

链路 A(接收请求)--- rabbitmq ----- B(处理请求),即A线程只负责接收和发送rabbitmq,之后,A的线程将被释放,并继续接收请求,rabbitmq负责生成消息队列,B服务只负责业务处理,B服务没处理完的可以在消息队列中等待,之后按顺序进行处理。

消息服务

消息队列

消息中间件

常用服务器:

1.Activemq 2.Rockermq(阿里) 3.Kafka(大数据) 4.tubemq(腾讯万亿级) 5.Rabbitmq(spring集成)

使用步骤

VM版本:16 (Rabbitmq在容器中使用)

VM网段:192.168.64.0

知识点:咋改网段 编辑 选择虚拟网络编辑器VMnet8 左下角子网IP 修改岂可

虚拟机:centos-8-2105 centos-7-1908随便选一个

课前资料设置的东西

2.安装三个工具:python pip ansible

3.设置两个脚本文件ip地址

ip-dhcp:自动获取

ip-static:手动获取

4.用vm打开对应的.vmx加载虚拟机镜像的文件

5.启动 按提示复制虚拟机

6.默认用户密码root

7.设置ip

./ip-dhcp   #执行脚本  ifconfig  看ip ifconfig ens33

没有网卡怎么办(执行以下两行代码)

nmcli n on  systemctl restart NetworkManager 

安装Docker(先克隆出来一个里面只有docker 的虚拟机进入执行)

1.可以从网上下载Docker离线包

https://download.docker.com/linux/static/stable/x86_64/docker-20.10.6.tgz

2.离线安装工具(简化安装,要下载)

https://github.com/Jrohy/docker-install/

3.安装,通过MBX软件,连接,并把下载好的以下文件一块放到/root/里1.阿里的yum安装源

    - docker-20.10.6.tgz
	- install.sh
	- docker.bash

4.执行安装

# 进入 docker-install 文件夹
cd docker-install

# 为 docker-install 添加执行权限
chmod +x install.sh

# 安装
./install.sh -f docker-20.10.6.tgz

5.由于国内网络的问题,需要配置加速器来加速

cat下面命令直接生成文件daemon.json

cat <<EOF > /etc/docker/daemon.json {   "registry-mirrors": [     "https://docker.mirrors.ustc.edu.cn",     "http://hub-mirror.c.163.com"   ],   "max-concurrent-downloads": 10,   "log-driver": "json-file",   "log-level": "warn",   "log-opts": {     "max-size": "10m",     "max-file": "3"     },   "data-root": "/var/lib/docker" } EOF

6.重新加载docker配置 重启docker

7.测试 docker info

搭建Rabbitmq服务

  • 从docker-base再克隆一个虚拟机: rabbitmq
  • 设置ip:   运行            ./ip-static
  • 将镜像文件复制到/root 下(若本地没有则要去下载镜像)
    • docker pull rabbitmq:management
  • 关闭防火墙
  1.         systemctl stop firewalld
  2.         systemctl disable firewall
  3.         # 重启 docker 系统服务   systemctl restart docker
  • 导入镜像: docker load -i rabbit-image.gz

        

  • 配置管理员用户名与密码

    • mkdir /etc/rabbitmq vim /etc/rabbitmq/rabbitmq.conf

    • # 在文件中添加两行配置: default_user = admin default_pass = admin

  •    通过docker启动镜像  
  • docker run -d --name rabbit \
    -p 5672:5672 \
    -p 15672:15672 \
    -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
    -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
    --restart=always \
    rabbitmq:management
    
    

        消息服务器不会长久的储存,只要消费者处理完,消息服务器中的消息会被删除

六种工作模式

简单模式(对应只有一个消费者)

第一步:新建一个maven 什么依赖都不用添加

第二步:添加依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>     <groupId>com.tedu</groupId>     <artifactId>rabbitmq</artifactId>     <version>0.0.1-SNAPSHOT</version>     <dependencies>

        <dependency>               <groupId>com.rabbitmq</groupId>             <artifactId>amqp-client</artifactId>             <version>5.4.3</version>         </dependency>

        <dependency>             <groupId>org.slf4j</groupId>             <artifactId>slf4j-api</artifactId>             <version>1.8.0-alpha2</version>         </dependency>       

      <dependency>             <groupId>org.slf4j</groupId>             <artifactId>slf4j-log4j12</artifactId>             <version>1.8.0-alpha2</version>         </dependency>     </dependencies>

    <build>         <plugins>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <version>3.8.0</version>                 <configuration>                     <source>1.8</source>                     <target>1.8</target>                 </configuration>             </plugin>         </plugins>     </build> </project>

3.创建生产者类发送消息

第一步:连接

第二步:创建通信的通道(channel对象)

第三步:在服务器上创建队列

第四步:发送消息

第五步:关闭资源

package m1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f= new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);  /*5672是收发消息   15672是一个管理控制台*/
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();      /*连接*/
        Channel channel = con.createChannel();  /*通信的通道*/

        //在服务器上创建队列  hello world
        /*队列已经存在不会去重复创建
        * 使用通信的通道channel去在服务器上操作
        * */
        channel.queueDeclare("helloworld", false, false, false, null);
            /*    第一个boolean 是否是一个持久队列
            *     第二个boolean 是否是一个排他队列或独占队列   多个消费者能否共享这一个队列
            *     第三个boolean 是否自动删除   若没有消费者的情况下队列自动删除
            *     null值   其他的参数属性  例如要带一个   map 键值对
            *
            *
            * */
        //发送消息
        channel.basicPublish("", "helloworld", null, "HelloWorld824".getBytes());

        /**
         * 空串参数:是一个默认的交换机(exchange)
         * null 消息的其他参数属性
         *
         *
         * */
        //断开连接
        channel.close();
        con.close();



    }
}

4.创建消费者接收处理数据

package m1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel channel = con.createChannel();
        //创建队列
        channel.queueDeclare("helloworld", false, false, false, null);
        //创建回调对象
        // DeliverCallback deliverCallback1 = (consumerTag,message)->{};
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery/*这个就是massage*/) throws IOException {
                byte[] body = delivery.getBody();
                String s1 = new String(body);
                System.out.println("收到"+s1);
            }
        };

        CancelCallback cancelCallback = consumerTag -> {};  //取消接收消息时执行
        //从hello world队列接收消息,收到的消息会传递到回调对象进行处理
        channel.basicConsume("helloworld", true,deliverCallback,cancelCallback);//helloworld 回传到 message
                                            /**
                                             * autoAck -acknowledgment
                                             * false
                                             * true
                                             * true自动确认*/






    }
}

工作模式(对应对个消费者)

多个消费者从同一个队列进行获取消息,可以并行的处理多条消息,处理速度翻倍

合理分发:

1.设置autoAck = false  

2.如何设置qos:       qos -pre fetch -预抓取   c.basicQos(1);必须在手动确认模式下才会生效)

如何使消息持久化(防止服务器端的消息丢失)

不一定所有的数据都要持久化,例如日志是可以丢失的,而订单一定不能丢失

1.队列持久化

当队列被创建后是无法被更改的,要么删除,要么创建一个名字不相同的队列

2.消息持久化

package m2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f= new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);  /*5672是收发消息   15672是一个管理控制台*/
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();      /*连接*/
        Channel channel = con.createChannel();  /*通信的通道*/

        //创建队列
        channel.queueDeclare("task_queue", true, false, false, null);
        //循环在控制台发送消息
        while (true){
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_BASIC, s.getBytes());
                                                                        /*持久化信息 改成这个,消息会存在磁盘上*/
        }
    }
}
package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel channel = con.createChannel();
        //创建队列
        channel.queueDeclare("task_queue", true, false, false, null);


        //创建回调对象
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                String str = new String(delivery.getBody());
                System.out.println("收到"+str);
                for (int i = 0; i < str.length(); i++) {
                    if (str.charAt(i) == '.'){

                        //模拟耗时消息,遍历字符串,每遇到一个‘.’字符暂停1秒
                        try {
                            Thread.sleep(1000);

                        }catch (Exception e){

                        }
                    }
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);/*(回执[其实就是Tag,需要通过消息对象进行获取,Tag是一段long型的数字],是否确认收到过的所有消息[一般为false])*/
                System.out.println("消息处理完成=================================================================消息处理完成");

            }
        };
        CancelCallback cancelCallback = consumerTag -> {};

        //预抓取,只收一条,处理完之前不收下一条
        //只在手动确认模式才有效
        channel.basicQos(1);
        //接收消息
        channel.basicConsume("task_queue", false, deliverCallback, cancelCallback); /*修改为手动确认模式一开始第二个参数为true*/

    }
}

群发模式(发消息就需要交换机)

所有消费者得到同一个消息,每个消费者都要有一个自己的队列,队列要求与交换机进行绑定,交换机只会发给与自己绑定的队列

使用的Fanout交换机(扇形交换机)

路由模式

每个消费者都有自己的队列,这个队列是随机命名,direct交换机,通过关键词来进行匹配队列发送

主题模式

使用的Taotal交换机,其次使用的关键词变为xxxx.xxxx.xxx()

如何实现SCC的配置更新

Spring cloud config + Bus组件(写好的代码)

要在对应的服务中添加该组件

1.添加bus依赖

        1.rabbitmq

        2.bus

        3.bus去操作rabbitmq时用到binder-rabbit

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-bus</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

  amqp是消息服务的协议(数据格式)集成了rabbitmq的依赖

2.配置中心还有去添加一个actuator依赖

2.添加rabbit-mq连接配置

        09修改yml

        2.3.4修改config目录的三个文件,再上传到远程仓库

4.配置中心暴露bus-refresh刷新路径   m.e.w.e.i = bus-refresh 

     

1.BUS配置刷新

2.Bus发送刷新指令,其他模块接收指令执行配置刷新操作,Rabbitmq主题模式

链路跟踪

sleuth +zipkin

sleuth 用来产生链路跟踪日志 会产生日志数据

执行添加依赖,0配置就可以产生依赖

zipkin 对链路跟踪日志进行分析处理 最后用图形进行展示

a-b-c-d  默认日志只有10%会发给zipkin

a,asuidhiasd,asuidhiasd,true

b,asuidhiasd,,true

c,asuidhiasd,asuidhiasd,true

d,asuidhiasd,asuidhiasd,true

 

sleuth 通过 RABBITMQ发送到zipkin

修改2,3,4,6

1.添加zipkin client 客户端 依赖

2.在06添加rabbitmq依赖

3.修改06的application

4.修改config目录的是哪个文件添加zipkin发送方式

zipkin需要自己下载,之后用cmd

标签: mbx连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

 锐单商城 - 一站式电子元器件采购平台  

 深圳锐单电子有限公司