资讯详情

rabbitmq

发送方:

1, 不关注队列是否存在,如果交换机直接这边生成了,也没有在这边做绑定啥的,那么mq那里没有交换机。只有在发送请求之前,它才会出来。如果消费者服务在出来之前启动,它将报告错误。如果不能绑定交换机,则挂断过程(如果消费者也生成交换机并使用它bing绑定,交换机会立即出来,绑定成功) 2.如果没有交换机,就不会报错,也不会影响启动。只有调用时才会报错,找不到交换机 3.如果已经机已经存在,交换机的参数将在发送时从新的定义中报错。此定义的属性将用于发送,但mq中的交换机不会变。所以会报错,发送跟交换机的属性不一样,重试几次之后,就不会发送了,发送失败。 消费方:

1.如果绑定的交换机不存在,就会报错,过程也起不来 2.如果绑定不是默认交换机,交换机被删除,则该队列将成为绑定默认队列。如果生成了交换机,则该队列仍然是默认的绑定。如果再次绑定但改变了队列信息,则会报告错误,并说队列信息不同。如果信息没有改变并再次绑定,该队列将重新绑定到交换机 3.如果队列存在(交换机存在),则从新绑定rootingkey也就是说,如果只改变队列信息,就会报错两个队列信息

    //解绑 队列和交换机     @Test     public void unBind() throws Exception {         Connection connection = ConnectionUtil.getConnection();         Channel channel = connection.createChannel();         channel.queueUnbind("spring.test.*", "spring.test.exchange", "*.*");     }
    // 设置到broker确认(即是否到达exchange)              springbootConnectionFactory.setPublisherConfirmType(                         CachingConnectionFactory.ConfirmType.CORRELATED);     // 没有队列接收就不会触发这个,只有在异常情况下才会触发     springbootConnectionFactory.setPublisherReturns(true);      factory.addChannelListener(rabbitChannelListener);     factory.addConnectionListener(rabbitConnectionListener);     factory.setRecoveryListener(rabbitRecoveryListener);      // 交换器无法根据自己的类型和路由键找到合格的队列处理方法 true:RabbitMQ会调用      // Basic.Return命令将消息返还给生产者 false:RabbitMQ直接丢弃消息     rabbitTemplate.setMandatory(true);
这里有三个监听器: ChannelListener 用于创建和销毁监控通道 
     @Service     public class RabbitChannelListener implements ChannelListener {         @Override         public void onCreate(Channel channel, boolean b) {             log.info("=====onCreate channel: {}, transactional: {}", channel, b);         }          @Override         public void onShutDown(ShutdownSignalException signal) {             // 可根据isHardError判断是channel断开还是connection断开             if (signal.isHardError()) {                 AMQImpl.Connection.Close close = (AMQImpl.Connection.Close) signal.getReason();                 log.warn("Connection onShutDown replyCode: {}, methodId: {}, classId: {}, replyText: {}",                         close.getReplyCode(), close.getMethodId(), close.getClassId(), close.getReplyText());             } else {                 AMQImpl.Channel.Close close = (AMQImpl.Channel.Close) signal.getReason();                 log.warn("Channel onShutDown replyCode: {}, methodId: {}, classId: {}, replyText: {}",                         close.getReplyCode(), close.getMethodId(), close.getClassId(), close.getReplyText());             }         }     }

ConnectionListener 用于创建和关闭监控连接

    public class RabbitConnectionListener implements ConnectionListener {         @Override         public void onCreate(Connection connection) {             log.info("================onCreate: {}", connection);         }          @Override         public void onClose(Connection connection) {             log.info("================onClose: {}", connection);         }          @Override         public void onShutDown(ShutdownSignalException signal) {             log.info("================onShutDown: {}", signal);         }     }
RecoveryListener 监听自动重连的情况,这个listener没有测试什么场景会出现
    public class RabbitRecoveryListener implements RecoveryListener {         @Override         public void handleRecovery(Recoverable recoverable) {             log.info("================handleRecovery: {}", recoverable);         }          @Override         public void handleRecoveryStarted(Recoverable recoverable) {             log.info("================handleRecoveryStarted: {}", recoverable);         }     }

当生产者和消费者在同一服务中时,官方文件特别提醒他们不要使用相同的服务factory 一、可创建两个factory 2.//使用单独的发送连接,避免消费者因各种原因被阻塞 rabbitTemplate.setUsePublisherConnection(true); 关于失败重连模式 1.也可调用相应的类对象实现重连。java client 方法 ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); ///设置网络异常重连 factory.setNetworkRecoveryInterval(10000) 没10s ,重试一次 factory.setTopologyRecoveryEnabled(true);//设置重新声明交换器、队列等信息。 2.捕获异常,判断异常信息,重新连接最简单的方法。try...,except,(catch)。 下面设置ssl里面的factory就是使用的这种

设置 ssl:

    @Bean(name = "cachingConnectionFactory")     public ConnectionFactory cachingConnectionFactory(MqProperties mqProperties) {         log.info("mq ssl open:{}", mqProperties.getMqSslOpen());         CachingConnectionFactory springbootConectionFactory = new CachingConnectionFactory(
                getCommonConnectionFactory());
        try {
            Map<String, String> map = getMqInfo(mqProperties);
            springbootConnectionFactory.setHost(map.get("mqIp"));
            springbootConnectionFactory.setPort(Integer.parseInt(map.get("mqPort")));
            springbootConnectionFactory.setUsername(map.get("mqUser"));
            springbootConnectionFactory.setPassword(map.get("mqPwd"));
            log.info("mqConnectionFactory created, mq address is :{},port is {}", springbootConnectionFactory.getHost(),
                    springbootConnectionFactory.getPort());
            Connection connection = springbootConnectionFactory.createConnection();
            connection.close();
            return springbootConnectionFactory;
        } catch (Exception e) {
            log.info("create rabbitMq connection error:{}", e.getMessage());
        }
        new Thread(() -> System.exit(1)).start();
        return null;
    }

    private com.rabbitmq.client.ConnectionFactory getCommonConnectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = new com.rabbitmq.client.ConnectionFactory();
        rabbitConnectionFactory.setVirtualHost("/");
        rabbitConnectionFactory.useSslProtocol(getSslContext().orElseThrow(RuntimeException::new));
        return rabbitConnectionFactory;
    }

    private Optional<SSLContext> getSslContext(MqProperties mqProperties) {
        ClassLoader classLoader = MqProperties.class.getClassLoader();
        if (classLoader == null) {
            return Optional.empty();
        }
        try (InputStream trustStoreStream = classLoader.getResourceAsStream("XXXX.jks")) {
            return sslContext(trustStoreStream, mqProperties.getMqTrustPass(), mqProperties.getProtocol());
        } catch (Exception e) {
            log.error("getSslContext error:", e);
        }
        return Optional.empty();
    }

    private Optional<SSLContext> sslContext(InputStream trustStoreStream, String mqTrustPass, String protocol)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException {
        char[] trustPassphrase = 解密出这个值(mqTrustPass).toCharArray();
        KeyStore tks = KeyStore.getInstance("JKS");
        tks.load(trustStoreStream, trustPassphrase);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);
        // 初始化SSL上下文
        SSLContext sslContext = SSLContext.getInstance(protocol);
        sslContext.init(null, tmf.getTrustManagers(), null);
        return Optional.of(sslContext);
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(@Qualifier("cachingConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("correlationData:{} ack:false cause:{} ", correlationData, cause);
            }
        }));
        return rabbitTemplate;
    }

标签: sunx电涡流位移传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台