在此之前,我们介绍了客户端消息处理链中处理器的配置,实际上相当于技术框架。今天,我们将介绍处理特定业务信息的处理器。
处理框架由几个主要部件组成: 1.新闻主题:我们根据业务需要将事件转化为新闻主题。新闻主题相当于对不同类型的新闻进行分类,这是我们业务处理的基本支持部分。 2.消息处理器工厂:根据消息主题找到相应的处理器完整类名,然后通过反射技术创建特定的消息处理器对象。 3.消息处理器:在真正处理特定业务逻辑的地方,消息分为两类,消息处理器也有两类。一方面,它可以提取公共部分形成父亲,另一方面,它可以被特定的业务逻辑处理器继承。
下面逐一介绍。
消息主题
上面说了,消息主题是基础数据,实际在平台管理中需要单独定义和维护的,主要属性有以下几个: code:主题编码与业务事件编码一致,可以独特识别一类新闻; handler:处理器存储与消息主题对应的特定逻辑处理器的完整路径(包名和类名),然后处理器工厂根据该属性反射处理器的实例; responseTopicCode:响应主题代码,对于请求消息,需要配置默认响应主题代码,以实现消息响应。
/** * 消息主题 * @author wqliu * @date 2021-08-21 * */ @Data @EqualsAndHashCode(callSuper = true) @Accessors(chain = true) @TableName("ip_api_message_topic") public class ApiMessageTopic extends BaseEntity {
private static final long serialVersionUID = 1L; /** * 编码 */ @TableField("code") private String code; /** * 名称 */ @TableField("name") private String name; /** * 处理器 */ @TableField("handler") private String handler; /** * 响应主题编码 */ @TableField("response_topic_code") private String responseTopicCode; /** * 分类 */ @TableField("category") private String category; /** * 状态 */ @TableField("status") private String status; /** * 备注 */ @TableField("remark") private String/span> remark; /** * 排序号 */ @TableField("order_no") private String orderNo; }
消息处理器工厂
使用了设计模式中的简单工厂模式,根据消息主题编码拿到处理器的完整路径,通过反射实现处理器的实例化。
/** * 消息处理器工厂 * @author wqliu * @date 2021-10-13 9:07 **/
public class MessageHandlerFactory {
private MessageHandlerFactory(){
};
public static MessageHandler createHandler(String topic){
//使用反射技术获取类
Class<MessageHandler> messageHandler=null;
try {
//根据消息主题获取对应的消息处理类名
ApiMessageTopicService service = SpringUtil.getBean(ApiMessageTopicService.class);
String handlerName = service.getHandlerByCode(topic);
messageHandler = (Class<MessageHandler>) Class.forName(handlerName);
//返回消息处理类的实例
return messageHandler.newInstance();
}
catch (CustomException e){
throw new MessageException("S101",e.getMessage());
}catch (Exception e){
throw new MessageException("S102","消息处理器不存在");
}
}
}
在我们设计的消息处理链条的技术框架中,请求消息处理器和响应消息处理器,都是通过如下代码来实现具体业务消息处理器的实例化的。
//转具体的消息处理器进行处理
ResponseMessageHandler handler = (ResponseMessageHandler)MessageHandlerFactory.createHandler(topic);
handler.handleMessage(message,ctx.channel());
消息处理器
因为消息分了两类,并且对于请求消息和响应消息的处理逻辑是不同的,相应的,消息处理器也有两个,分别是请求消息处理器RequestMessageHandler和响应消息处理器ResponseMessageHandler,并且可以提取这两个处理器的公用操作,形成一个抽象父类MessageHandler。
具体的业务消息处理器,则会继承RequestMessageHandler或ResponseMessageHandler,覆写其中的部分方法即可。
我们先从比较简单的响应消息处理器说起。
响应消息处理器
主要干三件事 这里的验证比服务端简单得多,只需要验证消息主题。
根据请求消息标识,查找消息日志,然后填充其响应部分,后面会详细说下消息日志的设计与实现。
预留了一个messageOperation方法,当具体的消息处理器有额外的个性化逻辑需要处理时,只需要覆盖该方法即可,这里实际是设计模式中的模板方法的应用。
/** * 响应消息处理器 * @author wqliu * @date 2022-1-8 11:07 **/
@Slf4j
public class ResponseMessageHandler extends MessageHandler{
/** * 消息处理 * * @param message 消息 * @param channel 通道 */
public void handleMessage(ResponseMessage responseMessage, Channel channel) {
// 消息主题验证(是否存在及是否可用)
validateTopic(responseMessage.getTopic());
// 更新消息日志
apiMessageLogService.updateResponsePart(responseMessage);
//特殊处理
messageOperation(responseMessage, channel);
}
/** * 响应消息处理 * * @param message * @param channel */
protected void messageOperation(ResponseMessage message, Channel channel) {
}
}
请求消息处理器
请求消息处理器与响应处理相比,除了要验证消息、创建消息日志以及预留的业务逻辑方法外,多了一步,向发送请求的客户端,发送响应消息。
/** * 请求消息处理器 * * @author wqliu * @date 2022-1-8 10:42 **/
@Slf4j
public class RequestMessageHandler extends MessageHandler {
/** * 消息处理 * * @param message 消息 * @param channel 通道 */
public void handleMessage(RequestMessage requestMessage, Channel channel) {
// 记录消息请求日志
apiMessageLogService.createRequestPart(requestMessage);
// 消息主题验证(是否存在及是否可用)
validateTopic(requestMessage.getTopic());
//特殊处理
messageOperation(requestMessage, channel);
//发送响应至消息发送者
sendResponse(requestMessage, channel);
}
private void sendResponse(RequestMessage requestMessage, Channel channel) {
//获取响应消息的消息主题
String responseTopicCode = getResponseTopicCode(requestMessage.getTopic());
//根据消息主题构建发送器
ResponseMessageSender responseMessageSender = (ResponseMessageSender) MessageSenderFactory.createSender(responseTopicCode);
//发送消息
responseMessageSender.sendMessage(channel, requestMessage);
}
/** * 获取响应消息主题 * * @return */
protected String getResponseTopicCode(String topic) {
//默认从消息主题实体类中获取
return apiMessageTopicService.getResponseTopicCodeByCode(topic);
}
/** * 请求消息处理 * * @param message * @param channel */
protected void messageOperation(RequestMessage message, Channel channel) {
}
}
公用处理器
请求与响应处理器的父类,主要是公共部分复用,主要完成了数据验证工作。
/** * 消息处理抽象类 * * @author wqliu * @date 2021-10-13 9:07 **/
public class MessageHandler {
protected ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);
protected ApiMessageTopicService apiMessageTopicService=SpringUtil.getBean(ApiMessageTopicService.class);
/** * 验证主题编码 * * @param topicCode 主题编码 */
protected void validateTopic(String topicCode) {
try {
ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){
throw new MessageException("S102", "消息主题不可用");
}
}catch (Exception ex){
throw new MessageException("S101", "消息主题不存在");
}
}
}