发送方:
1, 不关注队列是否存在,如果交换机直接这边生成了,也没有在这边做绑定啥的,那么mq那里没有交换机。只有在发送请求之前,它才会出来。如果消费者服务在出来之前启动,它将报告错误。如果不能绑定交换机,则挂断过程(如果消费者也生成交换机并使用它bing绑定,交换机会立即出来,绑定成功) 2.如果没有交换机,就不会报错,也不会影响启动。只有调用时才会报错,找不到交换机 3.如果已经机已经存在,交换机的参数将在发送时从新的定义中报错。此定义的属性将用于发送,但mq中的交换机不会变。所以会报错,发送跟交换机的属性不一样,重试几次之后,就不会发送了,发送失败。 消费方:
1.如果绑定的交换机不存在,就会报错,过程也起不来 2.如果绑定不是默认交换机,交换机被删除,则该队列将成为绑定默认队列。如果生成了交换机,则该队列仍然是默认的绑定。如果再次绑定但改变了队列信息,则会报告错误,并说队列信息不同。如果信息没有改变并再次绑定,该队列将重新绑定到交换机 3.如果队列存在(交换机存在),则从新绑定rootingkey也就是说,如果只改变队列信息,就会报错两个队列信息
//解绑 队列和交换机 @Test public void unBind() throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueUnbind("spring.test.*", "spring.test.exchange", "*.*"); }
// 设置到broker确认(即是否到达exchange) springbootConnectionFactory.setPublisherConfirmType( CachingConnectionFactory.ConfirmType.CORRELATED); // 没有队列接收就不会触发这个,只有在异常情况下才会触发 springbootConnectionFactory.setPublisherReturns(true); factory.addChannelListener(rabbitChannelListener); factory.addConnectionListener(rabbitConnectionListener); factory.setRecoveryListener(rabbitRecoveryListener); // 交换器无法根据自己的类型和路由键找到合格的队列处理方法 true:RabbitMQ会调用 // Basic.Return命令将消息返还给生产者 false:RabbitMQ直接丢弃消息 rabbitTemplate.setMandatory(true);
这里有三个监听器: ChannelListener 用于创建和销毁监控通道
@Service public class RabbitChannelListener implements ChannelListener { @Override public void onCreate(Channel channel, boolean b) { log.info("=====onCreate channel: {}, transactional: {}", channel, b); } @Override public void onShutDown(ShutdownSignalException signal) { // 可根据isHardError判断是channel断开还是connection断开 if (signal.isHardError()) { AMQImpl.Connection.Close close = (AMQImpl.Connection.Close) signal.getReason(); log.warn("Connection onShutDown replyCode: {}, methodId: {}, classId: {}, replyText: {}", close.getReplyCode(), close.getMethodId(), close.getClassId(), close.getReplyText()); } else { AMQImpl.Channel.Close close = (AMQImpl.Channel.Close) signal.getReason(); log.warn("Channel onShutDown replyCode: {}, methodId: {}, classId: {}, replyText: {}", close.getReplyCode(), close.getMethodId(), close.getClassId(), close.getReplyText()); } } }
ConnectionListener 用于创建和关闭监控连接
public class RabbitConnectionListener implements ConnectionListener { @Override public void onCreate(Connection connection) { log.info("================onCreate: {}", connection); } @Override public void onClose(Connection connection) { log.info("================onClose: {}", connection); } @Override public void onShutDown(ShutdownSignalException signal) { log.info("================onShutDown: {}", signal); } }
RecoveryListener 监听自动重连的情况,这个listener没有测试什么场景会出现
public class RabbitRecoveryListener implements RecoveryListener { @Override public void handleRecovery(Recoverable recoverable) { log.info("================handleRecovery: {}", recoverable); } @Override public void handleRecoveryStarted(Recoverable recoverable) { log.info("================handleRecoveryStarted: {}", recoverable); } }
当生产者和消费者在同一服务中时,官方文件特别提醒他们不要使用相同的服务factory 一、可创建两个factory 2.//使用单独的发送连接,避免消费者因各种原因被阻塞 rabbitTemplate.setUsePublisherConnection(true); 关于失败重连模式 1.也可调用相应的类对象实现重连。java client 方法 ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); ///设置网络异常重连 factory.setNetworkRecoveryInterval(10000) 没10s ,重试一次 factory.setTopologyRecoveryEnabled(true);//设置重新声明交换器、队列等信息。 2.捕获异常,判断异常信息,重新连接最简单的方法。try...,except,(catch)。 下面设置ssl里面的factory就是使用的这种
设置 ssl:
@Bean(name = "cachingConnectionFactory") public ConnectionFactory cachingConnectionFactory(MqProperties mqProperties) { log.info("mq ssl open:{}", mqProperties.getMqSslOpen()); CachingConnectionFactory springbootConectionFactory = new CachingConnectionFactory( getCommonConnectionFactory()); try { Map<String, String> map = getMqInfo(mqProperties); springbootConnectionFactory.setHost(map.get("mqIp")); springbootConnectionFactory.setPort(Integer.parseInt(map.get("mqPort"))); springbootConnectionFactory.setUsername(map.get("mqUser")); springbootConnectionFactory.setPassword(map.get("mqPwd")); log.info("mqConnectionFactory created, mq address is :{},port is {}", springbootConnectionFactory.getHost(), springbootConnectionFactory.getPort()); Connection connection = springbootConnectionFactory.createConnection(); connection.close(); return springbootConnectionFactory; } catch (Exception e) { log.info("create rabbitMq connection error:{}", e.getMessage()); } new Thread(() -> System.exit(1)).start(); return null; } private com.rabbitmq.client.ConnectionFactory getCommonConnectionFactory() { com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = new com.rabbitmq.client.ConnectionFactory(); rabbitConnectionFactory.setVirtualHost("/"); rabbitConnectionFactory.useSslProtocol(getSslContext().orElseThrow(RuntimeException::new)); return rabbitConnectionFactory; } private Optional<SSLContext> getSslContext(MqProperties mqProperties) { ClassLoader classLoader = MqProperties.class.getClassLoader(); if (classLoader == null) { return Optional.empty(); } try (InputStream trustStoreStream = classLoader.getResourceAsStream("XXXX.jks")) { return sslContext(trustStoreStream, mqProperties.getMqTrustPass(), mqProperties.getProtocol()); } catch (Exception e) { log.error("getSslContext error:", e); } return Optional.empty(); } private Optional<SSLContext> sslContext(InputStream trustStoreStream, String mqTrustPass, String protocol) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException { char[] trustPassphrase = 解密出这个值(mqTrustPass).toCharArray(); KeyStore tks = KeyStore.getInstance("JKS"); tks.load(trustStoreStream, trustPassphrase); TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); // 初始化SSL上下文 SSLContext sslContext = SSLContext.getInstance(protocol); sslContext.init(null, tmf.getTrustManagers(), null); return Optional.of(sslContext); } @Bean public RabbitTemplate rabbitTemplate(@Qualifier("cachingConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> { if (!ack) { log.error("correlationData:{} ack:false cause:{} ", correlationData, cause); } })); return rabbitTemplate; }