资讯详情

消息中间件RocketMQ

消息中间件RocketMQ

??RocketMQ 它是阿里巴巴开源的分布式新闻中间部分。支持事务信息、顺序信息、批量信息、延迟信息、信息可追溯性等。它有几个不同于标准信息中间部分的概念,如Group、Topic、Queue等等。系统组成由Producer、Consumer、Broker、NameServer等。

  • :主要解决信息丢失、系统崩溃等问题
  • :解决不同重要性和能力水平系统之间的依赖,导致死亡
  • :当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
  • :有些在线链路不容易压测,可以通过堆积一定量的信息来压测
  • :远程调用不需要同步执行,可以有效提高响应时间

架构设计

部署模型

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-HLdplfIJ-1638173732530)(http://wb.paic.com.cn/file/uid/3892c9f51d984504b8fb579f6cded00a)]

rocketmq各种组件

NameServer

  • 一个Broker与Topic路由注册中心,支持Broker动态注册和发现
  • 底层由Netty它提供了路由管理、服务注册和服务发现的功能,是一个无状态节点
  • NameServer集群中的每个角色都是服务发现者(Producer、Broker、Consumer等等)都需要定方向NameServer报告自己的状态,以便互相发现,加班不报告,NameServer会把它从列表中剔除
  • NameServer可以部署多个,当多个NameServer在存在时,其他角色同时向他们报告信息,以确保高可用性,
  • NameServer集群之间没有交流,也没有主备的概念
  • NameServer内存储,NameServer中的Broker、Topic默认情况下,信息不会持他是无状态节点

NameServer主要包括两个功能:

  • Broker管理:接受Broker集群注册信息作为路由信息的基本数据保存;提供心跳检测 机制,检查Broker是否还活着。
  • 路由信息管理:每个NameServer中都保存着Broker客户端查询集群的整个路由信息和队列 信息。Producer和Conumser通过NameServer可获得整个Broker集群路由信息被消除 投递和消费利息。

Broker

  • Broker充当新闻中转角色,负责存储和转发新闻
  • Broker在RocketMQ该系统负责接收和存储从生产者那里发送的信息,并为消费者的要求做好准备。
  • Broker会定时向NameSrver提交自己的信息
  • 每个Broker节点启动时会经历NameServer列表,每一个NameServer建立长连接,注册自己的信息,然后定期报告

下图为Broker Server功能模块示意图。 [外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-klea5IaL-1638173732533)(http://wb.paic.com.cn/file/uid/59c60878446444ddb5930dacdf6b607f)]

  • Remoting Module:整个Broker负责处理来自实体的实体clients终端请求。而这个Broker实体由以下模具组成 块构成。
  • Client Manager:客户端管理器。负责接收和分析客户端(Producer/Consumer)请求,管理客户端。 如,维护Consumer的Topic订阅信息
  • Store Service:存储服务。提供方便简单的服务。API接口,将信息存储在物理硬盘和信息查询功能中。
  • HA Service:提供高可用服务Master Broker 和 Slave Broker数据同步功能之间。
  • Index Service:索引服务。根据具体情况Message key,对投递到Broker索引服务也提到了新闻 供根据Message Key快速查询消息的功能

Producer

  • 新闻制作人
  • 随机选择其中一个NameServer节点建立长连接,获得Topic包括路由信息Topic下的Queue,这些Queue分布在哪些Broker上等等)
  • 下一步是提供Topic服务的Master因为RocketMQ只有Master可以写信息),并定期向Master发送心跳

Consumer

  • 消费者消息
  • 通过NameServer集群获得Topic连接到相应路由信息的路由信息Broker上消费消息
  • 由于Master和Slave可以读取消息,所以Consumer会与Master和Slave建立消费消息的连接

rocketmq工作流程

  • 启动NameServer,NameServer启动后,开始监控端口,等待Broker、Producer、 Consumer连接。

  • 启动Broker时,Broker会与所有的NameServer每30秒建立并保持长连接 向NameServer定期发送心跳包。

  • 在发送消息之前,可以先创建Topic,创建Topic需要指定时间Topic存储在哪些内容Broker当然,在创作中Topic时也会将Topic与Broker的关系写入到NameServer然而,这一步是可选的,也可以在发送信息时自动创建Topic。

  • Producer发送消息,启动时跟随NameServer其中一个集群建立了长连接,从NameServer获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP Port)映射关系。然后根据算法策略从队选择一个Queue,在队列所在Broker从而建立长连接Broker发消息。当然,在获得路由信息后,Producer先将路由信息缓存到本地,然后每30秒从NameServer更新路由信息。

  • Consumer跟Producer和其中一个类似NameServer建立长连接,获取订阅Topic路由信息,然后根据算法策略从路由信息中获取所消费Queue,然后直接跟Broker建立长连接,开始消费新闻。Consumer获取路由信息后,也会每30秒从NameServer更新路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker生存状态。

实现原理

??RocketMQ由NameServer注册中心集群,Producer生产者集群,Consumer消费者集群和一些Broker(RocketMQ其结构原理如下:

  • Broker启动时去所有的地方NameServer每30次注册并保持长连接s发送心跳
  • Producer发消息时从NameServer获取Broker根据负载平衡算法,选择服务器发送消息
  • Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉消息消费

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-4nUxzTB8-1638173732536)(http://wb.paic.com.cn/file/uid/c88b814f7c0041a9a1f7b6796657321e)]

核心概念

Message(消息)

??消息载体。Message发送或消费时必须指定Topic。Message有一个可选的Tag用于过滤新闻,还可以添加额外的键值对。

Topic(主题)

??消息的逻辑分类必须在发送消息的逻辑分类topic只有这样,这条消息才能发送到这里topic上面。消费消息时指定这个。topic消费。是逻辑分类。

Queue(队列)

??1个Topic分为N个Queue,数量置数量。message其实是存储的queue上去,消费者也消费queue上面的消息。多说,比如一个topic4个queue,有5个Consumer都在消费这个topic,那么会有一consumer浪费掉了,因为负载均衡策略,每个consumer消费1个queue,5>4,溢出1个,这个会不工作。

Tag(标签)

  Tag 是 Topic 的进一步细分,顾名思义,标签。每个发送的时候消息都能打tag,消费的时候可以根据tag进行过滤,选择性消费。

Producer Group(生产者组)

  消息生产者组。标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不完全支持事务消息(阉割了事务回查的代码)。

Consumer Group(消费者组)

  消息消费者组。标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。

注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。

消息标识(MessageId/Key)

  RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个msgId,当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。

  • msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode +当前时间 + AutomicInteger自增计数器
  • offsetMsgId:由broker端生成,其生成规则为: brokerIp + 物理分区的offset(Queue中的偏移量)
  • key:由用户指定的业务相关的唯一标识

消费进度Offset

  消费进度offset是用来记录每个Queue的不同消费组的消费进度的。根据消费进度记录器的不同,可以 分为两种模式:本地模式和远程模式。

1、offset本地管理模式

  当消费模式为广播消费时,offset使用本地模式存储。因为每条消息会被所有的消费者消费,每个消费 者管理自己的消费进度,各个消费者之间不存在消费进度的交集。

  Consumer在广播消费模式下offset相关数据以json的形式持久化到Consumer本地磁盘文件中,默认文 件路径为当前用户主目录下的.rocketmq_offsets/consumerClientId/groupName/Offsets.json 。 其中consumerClientId为当前消费者实例id,默认为ip@DEFAULT;groupName为消费者组名称。

2、offset远程管理模式

  当消费模式为集群消费时,offset使用远程模式管理。因为所有Cosnumer实例对消息采用的是均衡消 费,所有Consumer共享Queue的消费进度。

  Consumer在集群消费模式下offset相关数据以json的形式持久化到Broker磁盘文件中,文件路径为当前 用户主目录下的

  Broker启动时会加载这个文件,并写入到一个双层Map(ConsumerOffsetManager)。外层map的key 为topic@group,value为内层map。内层map的key为queueId,value为offset。当发生Rebalance时, 新的Consumer会从该Map中获取到相应的数据来继续消费。 集群模式下offset采用远程管理模式,主要是为了保证Rebalance机制。

3、offset作用

  消费者是如何从最开始持续消费消息的?消费者要消费的第一条消息的起始位置是用户自己通过consumer.setConsumeFromWhere()方法指定的,在Consumer启动后,其要消费的第一条消息的起始位置常用的有三种,这三种位置可以通过枚举类型常量设置。这个枚举类型为ConsumeFromWhere。

  当消费完一批消息后,Consumer会提交其消费进度offset给Broker,Broker在收到消费进度后会将其更 新到那个双层Map(ConsumerOffsetManager)及consumerOffset.json文件中,然后向该Consumer进 行ACK,而ACK内容中包含三项数据:当前消费队列的最小offset(minOffset)、最大 offset(maxOffset)、及下次消费的起始offset(nextBeginOffset)

public enum ConsumeFromWhere { 
        
    /** * 从queue的当前最后一条消息开始消费 */
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    /** * 从queue的第一条消息开始消费 */
    CONSUME_FROM_FIRST_OFFSET,
    /** * 从指定的具体时间戳位置的消息开始消费。 * */
    CONSUME_FROM_TIMESTAMP,
}

rocketmq核心组件启动流程

nameserv启动流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DlnF7F5g-1638173732540)(http://wb.paic.com.cn/file/uid/b69d2887e1504216b508186cb2ea5049)]

broker启动流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3I3NguL4-1638173732543)(http://wb.paic.com.cn/file/uid/2ec4597270d145eda3c32d421eb69458)]

producer启动流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hx5XmbGz-1638173732545)(http://wb.paic.com.cn/file/uid/41faefb45cc448a2bc676c4f81d27be1)]

consumer启动流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m1I4B7j9-1638173732547)(http://wb.paic.com.cn/file/uid/be7d7b47bea14cd98c17ae478d7da72e)]

消息发送

1、消息类型

rocketmq中有如下几种消息类型

  • 普通消息
  • 顺序消息
  • 延时消息
  • 事务消息

普通消息

Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZvnFbB4V-1638173732549)(http://wb.paic.com.cn/file/uid/923bc49337d5440f82b86acaf73498ed)]

  同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OHcGeKB4-1638173732551)(http://wb.paic.com.cn/file/uid/53edee2646c74373ab4239668f0641e0)]   异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xitJODk3-1638173732553)(http://wb.paic.com.cn/file/uid/90d33c53894f4a57a4c8324ab69e15b1)]   单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。

顺序消息

什么是顺序消息

  顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

  默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

  • :一个queue内所有的消息按照先进先出的顺序进行发布和消费
  • :一个Topic内所有的消息按照先进先出的顺序进行发布和消费
为什么需要顺序消息

  现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态: 未支付、已支付、发货中、发货成功、发货失败。 根据以上订单状态,生产者从时序上可以生成如下几个消息:

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BS0HHl96-1638173732555)(http://wb.paic.com.cn/file/uid/c97d7b33843d4e3c8136423603fd789a)]

  这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方 式,是无法保证顺序是正确的。

  基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 Queue中,然后消费者采用一定队列选择算法策略(例如,一个线程独立处理一个queue,保证处理消息的顺序 性),能够保证消费的顺序性。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1yi2Xd8Y-1638173732557)(http://wb.paic.com.cn/file/uid/a520597a756d46d39883248ae76ee755)]

示例代码

        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 若为全局有序,则需要设置Queue数量为1
        // producer.setDefaultTopicQueueNums(1);

        producer.start();

        for (int i = 0; i < 100; i++) { 
        
            // 为了演示简单,使用整型数作为orderId
            Integer orderNo = i;
            byte[] body = ("Hello," + i).getBytes();
            Message msg = new Message("TopicA", "TagA", body);
            // 将orderId作为消息key
            msg.setKeys(orderNo.toString());
            // send()的第三个参数值会传递给选择器的select()的第三个参数
            // 该send()为同步发送
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() { 
        

                // 具体的选择算法在该方法中定义
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
        
                    // 使用消息key作为选择的选择算法
                    String keys = msg.getKeys();
                    Integer orderNo = Integer.valueOf(keys);

                    int index = orderNo % mqs.size();
                    return mqs.get(index);
                }
            }, orderNo);

            System.out.println(sendResult);
        }
        producer.shutdown();

事务消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ERAYKUJH-1638173732559)(http://wb.paic.com.cn/file/uid/a92bb73210604f4983394f7359f381a8)]

事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。实现原理如下:

  • 生产者先发送一条半事务消息到MQ
  • MQ收到消息后返回ack确认
  • 生产者开始执行本地事务
  • 如果事务执行成功发送commit到MQ,失败发送rollback
  • 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查
  • 生产者查询事务执行最终状态
  • 根据查询事务状态再次提交二次确认

如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Hk2nsWes-1638173732560)(http://wb.paic.com.cn/file/uid/be2bf9a950c5449ebfc2e9509dcd8ce4)]

延时消息

  当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。   采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是订单超时未支付自动取消

  延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在RocketMQ服务端的MessageStoreConfig 类中的如下变量中

    // 延时等级分类
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的 延时等级可在broker配置文件中自定义

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJUUi29x-1638173732562)(http://wb.paic.com.cn/file/uid/487c4467a98f4fbeb97b93c525294037)]   Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程:

  • 修改消息的Topic为SCHEDULE_TOPIC_XXXX
  • 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)。
  • 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的 Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到 commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息 被发送到Broker时的时间戳。
  • 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中 一条消息从生产到被消费,将会经历三个阶段:

  Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。

  延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。

        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 100; i++) { 
        
            byte[] body = ("Hello," + i).getBytes();
            Message msg = new Message("TopicB", "TagB", body);
            // 指定消息延迟等级为3级,即延迟10s
            msg.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(msg);
            // 输出消息被发送的时间
            System.out.print(new SimpleDateFormat("hh:mm:ss").format(new Date())+","+sendResult);
        }
        producer.shutdown();

2、消息的发送过程

Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程:

  • Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求
  • NameServer返回该Topic的路由表及Broker列表
  • Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息
  • Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
  • Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue

3、消息发送queue选择算法

rocketmq底层默认queue选择算法为轮询方式,并且rocketmq提供一些可选则的队列选择算法

SelectMessageQueueByHash      // 哈希取模算法
SelectMessageQueueByRandom   // 随机数算法
SelectMessageQueueByMachineRoom    // rocketmq暂无实现,需要用户自己去实现具体逻辑
自定义队列选择算法

public class SelectMessageQueueByHash implements MessageQueueSelector { 
        

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
        
        int value = arg.hashCode() % mqs.size();
        if (value < 0) { 
        
            value = Math.abs(value);
        }
        return mqs.get(value);
    }
}

    // 自定义选择队列算法
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() { 
        
    
    // 具体的选择算法在该方法中定义
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
        
            // 使用消息key作为选择的选择算法
            String keys = msg.getKeys();
            Integer orderNo = Integer.valueOf(keys);
        
            int index = orderNo % mqs.size();
            return mqs.get(index);
        }
    }, null);

4、消息发送源码解析

        // 创建一个producer,参数为Producer Group名称
        DefaultMQProducer producer = new DefaultMQProducer("sync-product-group");
        // 指定nameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 设置当发送失败时重试发送的次数,默认为2次
        producer.setRetryTimesWhenSendFailed(3);
        // 设置发送超时时限为5s,默认3s
        producer.setSendMsgTimeout(5000);
        // 生产者启动
        producer.start();
        // 生产并发送100条消息
        for (int i = 0; i < 100; i++) { 
        
            byte[] body = ("hello-" + i).getBytes();
            Message msg = new Message("testTopic", "testTag", body);
            // 为消息指定key
            msg.setKeys("key-" + i);
            // 同步发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // 关闭producer
        producer.shutdown();

消息对应的Topic信息以及具体内容被封装在了Message中,并交由DefaultMQProducer,调用send()进行发送。DefaultMQProducer 只是一个面向调用方的代理,真正的生产者是DefaultMQProducerImpl,而消息发送的具体实现,便在DefaultMQProducerImpl中的这个方法内:

private SendResult sendDefaultImpl(//
            Message msg,//
            final CommunicationMode communicationMode,//
            final SendCallback sendCallback, final long timeout//
    ) 

  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

首先当然是进行本地查表,本地路由信息存放在topicPublishInfoTable中。但是如果本地没有,则会向NameSrv发起请求,获取路由信息,更新本地路由表。接着再次尝试从本地路由表中获取路由信息。

// 本地查表
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 本地缓存中没有,便向NameSrv发起请求,更新本地路由缓存。
if (null == topicPublishInfo || !topicPublishInfo.ok()) { 
        
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

如果比较幸运,从NameSrv上查询到了,此处便会直接返回所找到的路由信息:topicPublishInfo。但是如果Topic事先没有在任何Broker上进行配置,那么Broker在向NameSrv注册路由信息时便不会带上该Topic的路由,所以生产者也就无法从NameSrv中查询到该Topic的路由了。

if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) { 
        
    return topicPublishInfo;
} else { 
        
    // 再次查询Topic路由
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
}

对于这种没有事先配置Topic的情况,RocketMQ不会直接抛出错误,而是会走到上面的else分支里,再次调用 updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer),从NameSrv 获取路由信息。

   public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                                                      DefaultMQProducer defaultMQProducer) { 
        
        ......
        TopicRouteData topicRouteData;
        if (isDefault && defaultMQProducer != null) { 
        
            // 查询默认Topic TBW102的路由信息
            topicRouteData =
                    this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
                            defaultMQProducer.getCreateTopicKey(), 1000 * 3);
        }
        ......
        // 克隆一份,放到路由表中
        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
        ......
        this.topicRouteTable.put(topic, cloneTopicRouteData);
}

这次调用 updateTopicRouteInfoFromNameServer()时,传入的参数 isDefault 为true,那么代码自然就进入了上面的 if 分支中。这里依旧是调用getDefaultTopicRouteInfoFromNameServer( defaultMQProducer.getCreateTopicKey(),1000*3) 从NameSrv查询Topic路由,不过此次不是查询消息所属Topic的路由信息,而是查询RocketMQ设置的一个默认Topic的路由,该默认Topic为 ,这个Topic就是用来创建其他Topic所用的。

如果某Broker配置了 autoCreateTopicEnable,允许自动创建Topic,那么在该Broker启动后,便会向自己的路由表中插入TBW102这个Topic,并注册到NameSrv,表明处理该Topic类型的消息。

所以当消息所属的Topic,暂且叫Topic X吧,它本身没有在任何Broker上配置的时候,生产者就会查询Topic TBW102的路由信息,暂时作为Topic X的的路由,并插入到本地路由表中。当TopicX利用该路由发送到 Broker后,Broker发现自己并没有该Topic信息后,便会创建好该Topic,并更新到NameSrv中,表明后续接收TopicX的消息。

获取topic路由大致流程如下:

1、先从本地缓存的路由表中查询;

2、没有找到的话,便向NameSrv发起请求,更新本地路由表,再次查询。

3、如果仍然没有查询到,表明Topic没有事先配置,则用Topic TBW102向NameSrv发起查 询,返回TBW102的路由信息,暂时作为Topic的路由。

前面提到每个Topic的路由信息中可能包含若干Queue,那么这些Queue是从哪来的呢?不管怎么样,这些Queue的信息肯定是NameSrv返回的。生产者从NameSrv拉取的路由信息为TopicRouteData,我们不妨先来看下它的结构:

public class TopicRouteData extends RemotingSerializable { 
        
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    ......
}

queueDatas 中包含了Topic对应的所有Queue信息,其中QueueData结构如下:

public class QueueData implements Comparable<QueueData> { 
        
    private String brokerName;
    private int readQueueNums;
    private  

标签: 7b0bs41变送器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台