资讯详情

通用接口开放平台设计与实现——(13)消息服务端之消息处理器

在此之前,我们介绍了服务端消息处理链中处理器的配置,实际上相当于技术框架。今天,我们将介绍处理特定业务信息的处理器。

处理框架由几个主要部件组成: 1.新闻主题:我们根据业务需要将事件转化为新闻主题。新闻主题相当于对不同类型的新闻进行分类,这是我们业务处理的基本支持部分。 2.消息处理器工厂:根据消息主题找到相应的处理器完整类名,然后通过反射技术创建特定的消息处理器对象。 3.消息处理器:在真正处理特定业务逻辑的地方,消息分为两类,消息处理器也有两类。一方面,它可以提取公共部分形成父亲,另一方面,它可以被特定的业务逻辑处理器继承。

下面逐一介绍。

消息主题

如上所述,新闻主题是基础数据,实际上需要在平台管理中单独定义和维护,主要属性如下: code:主题编码与业务事件编码一致,可以独特识别一类新闻; handler:处理器存储与消息主题对应的特定逻辑处理器的完整路径(包名和类名),然后处理器工厂根据该属性反射处理器的实例; responseTopicCode:响应主题代码,对于请求消息,需要配置默认响应主题代码,以实现消息响应。

/** * 消息主题 * @author wqliu * @date 2021-08-21 * */ @Data @EqualsAndHashCode(callSuper = true) @Accessors(chain = true) @TableName("ip_api_message_topic") public class ApiMessageTopic extends BaseEntity { 
              private static final long serialVersionUID = 1L;      /** * 编码 */     @TableField("code")     private String code;      /** * 名称 */     @TableField("name")     private String name;      /** * 处理器 */     @TableField("handler")     private String handler;       /** * 响应主题编码 */     @TableField("response_topic_code")     private String responseTopicCode;      /** * 分类 */     @TableField("category")     private String category;      /** * 状态 */     @TableField("status")     private String status;      /** * 备注 */     @TableField("remark")     private String/span> remark; /** * 排序号 */ @TableField("order_no") private String orderNo; } 

消息处理器工厂

使用了设计模式中的简单工厂模式,根据消息主题编码拿到处理器的完整路径,通过反射实现处理器的实例化。

/** * 消息处理器工厂 * @author wqliu * @date 2021-10-13 9:07 **/
public  class MessageHandlerFactory { 
        
    private MessageHandlerFactory(){ 
        };

    public static MessageHandler createHandler(String topic){ 
        
        //使用反射技术获取类
        Class<MessageHandler> messageHandler=null;
        try { 
        
            //根据消息主题获取对应的消息处理类名
            ApiMessageTopicService service = SpringUtil.getBean(ApiMessageTopicService.class);
            String handlerName = service.getHandlerByCode(topic);
            messageHandler = (Class<MessageHandler>) Class.forName(handlerName);
            //返回消息处理类的实例
            return messageHandler.newInstance();
        }
        catch (CustomException e){ 
        
            throw new MessageException("S101",e.getMessage());
        }catch (Exception e){ 
        
            throw new MessageException("S102","消息处理器不存在");
        }
    }
}

在我们设计的消息处理链条的技术框架中,请求消息处理器和响应消息处理器,都是通过如下代码来实现具体业务消息处理器的实例化的。

     //转具体的消息处理器进行处理
     ResponseMessageHandler handler = (ResponseMessageHandler)MessageHandlerFactory.createHandler(topic);
     handler.handleMessage(message,ctx.channel());

消息处理器

因为消息分了两类,并且对于请求消息和响应消息的处理逻辑是不同的,相应的,消息处理器也有两个,分别是请求消息处理器RequestMessageHandler和响应消息处理器ResponseMessageHandler,并且可以提取这两个处理器的公用操作,形成一个抽象父类MessageHandler。 ​

具体的业务消息处理器,则会继承RequestMessageHandler或ResponseMessageHandler,覆写其中的部分方法即可。 ​

我们先从比较简单的响应消息处理器说起。 ​

响应消息处理器

主要干三件事 这里的验证跟技术框架中的基本验证有所不同,主要是验证以下内容:消息主题验证(是否存在及是否可用)、应用验证(是否存在及是否可用)、权限验证、时效性验证。 ​

根据请求消息标识,查找消息日志,然后填充其响应部分,后面会详细说下消息日志的设计与实现。

预留了一个messageOperation方法,当具体的消息处理器有额外的个性化逻辑需要处理时,只需要覆盖该方法即可,这里实际是设计模式中的模板方法的应用

/** * 响应消息处理器 * @author wqliu * @date 2022-1-8 11:07 **/
public class ResponseMessageHandler  extends MessageHandler{ 
        

    /** * 消息处理 * * @param message 消息 * @param channel 通道 */
    public void handleMessage(ResponseMessage responseMessage, Channel channel) { 
        


        //验证框架
        validateFramework(responseMessage);

        // 更新消息日志
        apiMessageLogService.updateResponsePart(responseMessage);

        //特殊处理
        messageOperation(responseMessage, channel);
    }


    /** * 响应消息处理 * * @param message * @param channel */
    protected void messageOperation(ResponseMessage message, Channel channel) { 
        

    }


}

请求消息处理器

请求消息处理器比响应处理器要复杂不少,除了要验证消息、创建消息日志以及预留的业务逻辑方法外,还需要实现以下两个重要逻辑:

  1. 向发送请求的客户端,发送响应消息,并更新相应的日志记录的响应部分
  2. 消息复制及转发,找到订阅该消息主题的所有客户端,推送消息。

/** * 请求消息处理器 * * @author wqliu * @date 2022-1-8 10:42 **/
@Slf4j
public class RequestMessageHandler extends MessageHandler { 
        
    /** * 消息处理 * * @param message 消息 * @param channel 通道 */
    public void handleMessage(RequestMessage requestMessage, Channel channel) { 
        

        // 记录消息请求日志
        apiMessageLogService.createRequestPart(requestMessage);

        //验证框架
        validateFramework(requestMessage);

        //将请求消息状态默认设置为无需发送
        apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(),requestMessage.getId());


        //特殊处理
        messageOperation(requestMessage, channel);

        //发送响应
        sendResponse(channel, MessageResponseResultEnum.SUCCESS.name(), "", requestMessage.getId(),
                requestMessage.getTopic());
        //消息处理(复制及转发)
        repostMessage(requestMessage);
    }




    /** * 消息复制及转发 * * @param requestMessage */
    protected void repostMessage(RequestMessage requestMessage) { 
        
        //查找订阅
        String topic = requestMessage.getTopic();
        List<String> subscriberList = apiMessageSubscriptionService.getSubscriberList(topic);
        if (CollectionUtils.isEmpty(subscriberList)) { 
        
            return;
        }
        //创建请求消息
        RequestMessage message = new RequestMessage();
        //TODO:为方便测试,使用同一个客户端作为消息的生产者和消费者,消息中心在转发消息时,将消息主题附加.temp后缀
        message.setTopic(topic+".temp");
        message.setContent(requestMessage.getContent());
        message.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());
        //设置状态为等待发送
        message.setStatus(MessageStatusEnum.WAIT_REQUEST.name());
        //遍历订阅者,发送消息
        subscriberList.stream().forEach(appCode -> { 
        
            //通过原型模式复制消息,更改响应应用编码
            RequestMessage newMessage = message.clone();
            newMessage.setResponseAppCode(appCode);
            //TODO:数据权限过滤
            boolean hasDataPermission = dataPermissionFilter(requestMessage, appCode);
            if (hasDataPermission) { 
        
                //发送消息
                sendMessage(newMessage);
            }

        });

    }


    /** * 发送消息 * * @param message */
    protected void sendMessage(RequestMessage message) { 
        
        String appCode = message.getResponseAppCode();

        //获取对接模式
        String integrationModel = apiAppService.getIntegrationModelByAppCode(appCode);
        if (integrationModel.equals(IntegrationModelEnum.CLIENT.name())) { 
        
            //客户端模式
            Channel channel = MessageServerHolder.appChannelMap.get(appCode);
            // log.info("服务端发送业务消息至订阅者{}:{}",appCode, JSON.toJSONString(message));
            ChannelFuture channelFuture = channel.writeAndFlush(message);
            channelFuture.addListener(new ChannelFutureListener() { 
        
                @Override
                public void operationComplete(ChannelFuture future) throws Exception { 
        
                    //设置状态
                    message.setStatus(MessageStatusEnum.REQUESTED.name());
                    message.setSendCount(message.getSendCount() + 1);

                    // 记录消息请求日志
                    apiMessageLogService.createRequestPart(message);
                }
            });
        } else if (integrationModel.equals(IntegrationModelEnum.INTERFACE.name())) { 
        
            //状态设置为待处理
            message.setStatus(MessageStatusEnum.WAIT_HANDLE.name());

            // 记录消息请求日志
            apiMessageLogService.createRequestPart(message);
        }

    }


    /** * 请求消息处理 * * @param message * @param channel */
    protected void messageOperation(RequestMessage message, Channel channel) { 
        

    }


}

其中消息复制与转发部分,相关的逻辑比较复杂,下篇我们专门来说。 ​

公用处理器

请求与响应处理器的父类,主要是公共部分复用,主要完成了数据验证和发送响应等工作。

/** * 消息处理父类 * * @author wqliu * @date 2021-10-13 9:07 **/
@Slf4j
public  class MessageHandler { 
        

    protected AppConfig appConfig = SpringUtil.getBean(AppConfig.class);

    protected ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);

    protected ApiMessageSubscriptionService apiMessageSubscriptionService= SpringUtil.getBean(ApiMessageSubscriptionService.class);


    protected ApiAppService apiAppService = SpringUtil.getBean(ApiAppService.class);

    protected ApiMessageTopicService apiMessageTopicService=SpringUtil.getBean(ApiMessageTopicService.class);


    protected ApiMessagePermissionService apiMessagePermissionService=
            SpringUtil.getBean(ApiMessagePermissionService.class);

    protected ApiDataPermissionService apiDataPermissionService=   SpringUtil.getBean(ApiDataPermissionService.class);


    /** * 数据权限通配符 */
    public static final String DATA_PERMISSION_ALL = "*";

    /** * 验证框架 */
    protected  void validateFramework(BaseMessage message){ 
        

        // 消息主题验证(是否存在及是否可用)
        validateTopic(message.getTopic());
        // 应用验证(是否存在及是否可用)
        validateAppCode(message.getPublishAppCode());
        // 权限验证
        validatePermission(message.getPublishAppCode(),message.getTopic());
        // 时效性验证
        validatePublishTimeValid(message.getPublishTime());
    }

    /** * 发送响应 * * @param channel * @param result * @param errorCode * @param errorMessage * @param requestMessageId */
    public void sendResponse(Channel channel, String result, String errorMessage,
                             String requestMessageId, String topic) { 
        


        // 组织响应消息
        ResponseMessage messageResponse = new ResponseMessage();
        //默认从消息主题实体类中获取响应主题编码,可被子类覆写
        messageResponse.setTopic(this.getResponseTopicCode(topic));
        messageResponse.setContent(this.getResponseContent());

        messageResponse.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());
        messageResponse.setResult(result);
        messageResponse.setErrorMessage(errorMessage);
        messageResponse.setRequestMessageId(requestMessageId);


        // 发送响应给请求方
        channel.writeAndFlush(messageResponse);

        // 更新消息日志
        apiMessageLogService.updateResponsePart(messageResponse);

    }




    /** * 验证权限 * * @param publishAppCode 应用程序编码 * @param topicCode 主题编码 */
    protected void validatePermission(String publishAppCode, String topicCode) { 
        

        boolean hasPermission = apiMessagePermissionService.checkPermission(publishAppCode, topicCode);
        if(hasPermission==false){ 
        
            throw new MessageException("301", "应用无权限");
        }

    }

    /** * 验证时效性 * * @param publishTimeString 发布时间字符串 */
    protected void validatePublishTimeValid(String publishTimeString) { 
        


        // 数据验证环节已验证可转换,此处不再处理转换异常
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date publishTime = null;
        try { 
        
            publishTime = dateFormat.parse(publishTimeString);
        } catch (ParseException e) { 
        
            // 前序环节已验证过日期格式,此处不会抛出异常,仅为编译通过
        }

        // 获取系统当前时间
        Date currentTime = new Date();
        // 比较时间
        long diff = Math.abs(currentTime.getTime() - publishTime.getTime());
        //允许最大的时间差,单位毫秒
        int maxTimeSpan = 10*60*1000;
        if (diff  > maxTimeSpan)
        { 
        
            // 请求时间超出合理范围(10分钟)
            throw new MessageException("S401", "发布时间超出合理范围");
        }
    }

    /** * 验证应用 * * @param publishAppCode 应用编码 */
    protected void validateAppCode(String publishAppCode) { 
        
        try { 
        
            ApiApp app =apiAppService.getByCode(publishAppCode);
            if(app.getStatus().equals(StatusEnum.DEAD.name())){ 
        
                throw new MessageException("S202", "应用被停用");

            }
        }catch (Exception ex){ 
        
            throw new MessageException("S201", "应用标识无效");
        }
    }

    /** * 验证主题编码 * * @param topicCode 主题编码 */
    protected void validateTopic(String topicCode) { 
        
        try { 
        
            ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
            if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){ 
        
                throw new MessageException("S102", "消息主题不可用");
            }
        }catch (Exception ex){ 
        
            throw new MessageException("S101", "消息主题不存在");
        }

    }



    /** * 获取响应消息主题 * * @return */
    protected String getResponseTopicCode(String topic) { 
        
        //默认从消息主题实体类中获取
        ApiMessageTopicService service = SpringUtil.getBean(ApiMessageTopicService.class);
        String topicCode = service.getResponseTopicCodeByCode(topic);
        return topicCode;
    }

    /** * 获取响应消息内容 * * @return */
    protected String getResponseContent() { 
        
        return StringUtils.EMPTY;
    }


    /** * 数据权限过滤 * @param message 消息 * @param appCode 响应方应用编码 * @return true,有权限,false 无权限 */
    protected boolean dataPermissionFilter(RequestMessage message,String appCode){ 
        
        //默认返回true,不进行数据权限控制,可被需要进行数据权限控制的子类覆写
        return true;
    }

}

标签: abs轮速传感器s102端盖

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台