资讯详情

通用接口开放平台设计与实现——(17)消息服务端之消息发送器

之前,我们介绍了服务器。无论是请求消息还是响应消息,我们都将消息主题传输到工厂,并建立相应的处理器进行处理。

同样,我们也可以使用相同的处理模式来发送消息。

有一个问题需要思考。发送器和消息的关系是什么?

发送器相当于业务信息的包装。新闻参数是通过发送器属性设置的,还是在调用发送方法时作为方法参数?

事实上,这两种方法都能达到我们的目的,具体分析如下:

对于发送机对应的新闻主题,我们希望具体的发送机直接内置固化,不能交给外部,否则会造成混乱,因此采用结构参数设置。

对于消息标志,有两种场景,一种是新消息,此时消息标志直接使用要求消息结构方法生成的标志,另一种是消息重新发送,此时,我们需要阅读原始消息标志,即在重新发送过程中保持消息不变,以便服务端可以根据消息标志重新发送。因此,在调用方法时可以引入消息标参数。

对于新闻内容,也有两种情况,一种是相应的业务事件,包括识别、事件,另一种是相应的技术框架部分的操作,如登录,需要户和密钥json对象传入。

综上所述,我们设置了两个属性,即新闻主题和新闻内容,其中新闻主题是私有属性,只允许通过构造函数传输,新闻内容设置了保护方法,允许被子调用。

处理框架由几个主要部件组成: 1.新闻主题:我们根据业务需要将事件转化为新闻主题。新闻主题相当于对不同类型的新闻进行分类,这是我们业务处理的基本支持部分。 2.新闻发送器工厂:根据新闻主题找到相应的发送器完整类别,然后通过反射技术创建特定的新闻发送器对象。 3.新闻发送器:在真正处理特定业务逻辑的地方,新闻分为两类,新闻发送器也有两类。一方面,它可以提取公共部分形成父亲,另一方面,它可以被特定的业务逻辑处理器继承。

下面逐一介绍。

消息主题

如上所述,新闻主题是基础数据,我们需要在原始基础上添加一个sender属性,存储信息发送器的完整类别

/** * 消息主题 * @author wqliu * @date 2021-08-21 * */ @Data @EqualsAndHashCode(callSuper = true) @Accessors(chain = true) @TableName("ip_api_message_topic") public class ApiMessageTopic extends BaseEntity { 
             ……     /** * 处理器 */     @TableField("handler")     private String handler;               /** * 发送器 */     @TableField("sender")     private String sender;       /** * 响应主题编码 */     @TableField("response_topic_code")     private String responseTopicCode;     ……   } 

新闻发送器工厂

采用设计模式中的简单工厂模式,根据新闻主题编码获取发送机的完整路径,通过反射实现发送机的实例化。

/** * 消息发送器工厂 * @author wqliu * @date 2022-01-29 16:30 **/ public  class MessageSenderFactory { 
             private MessageSenderFactory(){ 
        };      public static MessageSender createSender(String topic){ 
                 ////使用反射技术获取类别         Class<MessageSender> messageSender=null; try { 
          //根据消息主题获取对应的消息处理类名 ApiMessageTopicService service = SpringUtil.getBean(ApiMessageTopicService.class); String senderName = service.getSenderByCode(topic); messageSender = (Class<MessageSender>) Class.forName(senderName); //返回消息发送器类的实例 return messageSender.newInstance(); } catch (CustomException e){ 
          throw new MessageException("S101",e.getMessage()); }catch (Exception e){ 
          throw new MessageException("S102","消息发送器不存在"); } } } 

消息发送器

消息发送有两个维度,按消息类型分为请求和响应,按发送场景分为新创建和重发,组合出4种情况,但按照我们对于消息传输可靠性的设计思路,由消息发送方对消息进行重发,因此实际并不需要对响应消息重发。 1.新创建消息 请求消息:根据订阅列表查询要发送的目的地。 响应消息:将请求消息中的发送方作为目的地。 ​

2.重发消息 只会对请求消息进行重发,并且基于消息日志构建,可以直接从记录中获取到要发送给谁。

因为消息分了两类,并且对于请求消息和响应消息的处理逻辑是不同的,相应的,消息发送器也有两个,分别是请求消息发送器RequestMessageSender和响应消息发送器ResponseMessageSender,并且可以提取这两个发送器的公用操作,形成一个抽象父类MessageSemder。 ​

具体的业务消息发送器,则会继承RequestMessageSenderr或ResponseMessageSender,覆写其中的部分方法即可。 ​

公用处理器

请求与响应发送器的父类,主要是公共部分复用,定义公共属性消息主题与消息内容。

/** * 消息发送器基类 * @author wqliu * @date 2021-10-5 14:02 */
@Slf4j
public  class MessageSender { 
        

    /** * 消息主题 */
    private String topic;
    /** * 消息内容 */
    private String content;

    public MessageSender(String topic){ 
        
        this.topic=topic;
    }

    /** * 获取消息主题 */
    public String getTopic() { 
        
        return this.topic;
    }


    /** * 设置消息内容 * @param content */
    public void setContent(String content) { 
        
        this.content = content;
    }

    /** * 获取消息内容 */
    public String getContent() { 
        
        return this.content;
    }

    /** * 应用程序配置 */
    protected AppConfig appConfig= SpringUtil.getBean(AppConfig.class);

    /** * 消息日志服务 */
    protected ApiMessageLogService apiMessageLogService= SpringUtil.getBean(ApiMessageLogService.class);

}

请求消息发送器

请求消息发送器比较复杂,会在以下两个场景下使用

  1. 服务端收到生产者的业务消息时,需要找到订阅该消息主题的所有消费者,推送消息。
  2. 通过消息日志实现的消息重发功能。

这两个场景的逻辑,特别是输入数据是不同的,场景1只需要消息发送内容就行了;场景2则是已经明确知道要发给谁,消息内容是什么,并且必须保证原消息标识不变。 ​

场景1使用void sendMessage(String content)方法来实现,内部需要查找订阅列表及实现数据权限过滤,并将处理拿到的数据(要发给谁,发什么内容),调用场景2的方法去执行发送工作。 场景2使用void sendMessage(String appCode,String content, String id)方法来实现,实现真正的消息发送,内部实现了消息日志的处理 ​

/** * 消息发送器基类 * @author wqliu * @date 2021-10-5 14:02 */
@Slf4j
public  class RequestMessageSender extends MessageSender { 
        



    protected ApiMessageSubscriptionService apiMessageSubscriptionService= SpringUtil.getBean(ApiMessageSubscriptionService.class);

    protected  ApiDataPermissionService apiDataPermissionService=SpringUtil.getBean(ApiDataPermissionService.class);

    /** * 数据权限通配符 */
    public static final String DATA_PERMISSION_ALL = "*";


    public RequestMessageSender(String topic){ 
        
        super(topic);

    }
    /** * 发送消息 * @param topic 消息主题 * @param content 消息内容 * @param id 消息标识 */
    public  void sendMessage(String appCode,String content, String id)
    { 
        

        // 生成请求消息
        RequestMessage message = new RequestMessage();
        // 使用已有ID重置默认生成的ID,用于关联消息
        if(StringUtils.isNotBlank(id)){ 
        
            message.setId(id);
        }
        //设置相关属性
        message.setTopic(super.getTopic());
        message.setContent(content);
        message.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());

        //获取发送通道
        Channel channel= MessageServerHolder.getChannel(appCode);
        if(channel!=null && channel.isActive()){ 
        
            ChannelFuture channelFuture = channel.writeAndFlush(message);
            channelFuture.addListener(new ChannelFutureListener() { 
        
                @Override
                public void operationComplete(ChannelFuture future) throws Exception { 
        

                    if(StringUtils.isBlank(id)){ 
        
                        //设置状态
                        message.setStatus(MessageStatusEnum.REQUESTED.name());
                        message.setSendCount(message.getSendCount() + 1);
                        //创建日志
                        apiMessageLogService.createRequestPart(message);
                    }else { 
        
                        // 更新发送次数
                        apiMessageLogService.incrementSendCount(id);
                    }
                }
            });

        }else{ 
        
            //创建日志
            apiMessageLogService.createRequestPart(message);
        }

    }

    /** * 发送请求消息 * @param topic 消息主题 * @param content 消息内容 * @param id 消息标识 */
    public  void sendMessage(String content)
    { 
        
        //查找是否有消息订阅者,无则直接终止后续处理
        List<String> subscriberList = apiMessageSubscriptionService.getSubscriberList(super.getTopic());
        if (CollectionUtils.isEmpty(subscriberList)) { 
        
            return;
        }
        //遍历订阅者,发送消息
        subscriberList.stream().forEach(appCode -> { 
        
            //数据权限过滤
            boolean hasDataPermission = dataPermissionFilter(content, appCode);
            if (hasDataPermission) { 
        
                //发送消息
                sendMessage(appCode,content,null);
            }
        });
    }



    /** * 数据权限过滤 * @param content 消息内容,通常是业务实体标识 * @param appCode 应用编码 * @return true,有权限,false 无权限 */
    protected boolean dataPermissionFilter(String content,String appCode){ 
        
        //默认返回true,不进行数据权限控制,可被需要进行数据权限控制的子类覆写
        return true;
    }


}

预留了数据权限过滤方法dataPermissionFilter,默认不进行数据权限控制,可被需要进行数据权限控制的子类覆写。 ​

响应消息发送器

响应消息发送器不需要转发消息,相对简单一些,预留了一个设置消息响应内容setResponseContent用于子类覆写,例如登录场景下,需要根据登录请求消息携带的账号密钥进行身份验证,然后根据验证构建不同的结果。

/** * @author wqliu * @date 2022-1-31 8:14 **/
@Data
public class ResponseMessageSender extends MessageSender{ 
        

    private String result;

    private String errorCode;

    private String errorMessage;


    public ResponseMessageSender(String topic) { 
        
       super(topic);
       //默认设置响应结果为成功
       this.result= MessageResponseResultEnum.SUCCESS.name();
    }



    /** * 发送响应 * * @param channel 通道 * @param requestMessage 请求消息 */
    public void sendMessage(Channel channel, RequestMessage requestMessage) { 
        

        // 组织响应消息
        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());
        responseMessage.setRequestMessageId(requestMessage.getId());
        //设置主题
        responseMessage.setTopic(this.getTopic());
        //设置默认值
        responseMessage.setResult(this.result);
        responseMessage.setErrorCode(this.errorCode);
        responseMessage.setErrorMessage(this.errorMessage);
        //设置响应
        setResponseContent(requestMessage,responseMessage,channel);
        // 发送响应给请求方
        channel.writeAndFlush(responseMessage);

        // 更新消息日志
        apiMessageLogService.updateResponsePart(responseMessage);

    }

    /** * 设置响应消息内容 * * @param requestMessage 请求消息 * @param responseMessage 响应消息 * @param channel 通道 */
    protected void setResponseContent(RequestMessage requestMessage,ResponseMessage responseMessage,Channel channel) { 
        

    }


}

标签: abs轮速传感器s102端盖

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台