资讯详情

Netty 源码分析系列(十五)自定义解码器、编码器、编解码器

前言

今天继续分析 Netty 编解码器,这次我们必须自己实现自定义编码器、解码器和编码器。

基于换行的自定义解码器

LineBasedFrameDecoder 类

LineBasedFrameDecoder 类是基于换行的,这意味着只要在接收数据时遇到换行符\n或者回车换行符\r\n最后,数据已被接收并处理。

LineBasedFrameDecoder 类继承自 ByteToMessageDecoder,并重写了 decode方法。

public class LineBasedFrameDecoder extends ByteToMessageDecoder { 
              /** 帧的最大长度限制 */     private final int maxLength;     /** 帧超长时是否抛出异常 */     private final boolean failFast;     private final boolean stripDelimiter;      /** 若超长为True,表示需要丢弃输入的数据 */     private boolean discarding;     private int discardedBytes;      /** 最后扫描位置 */     private int offset;         public LineBasedFrameDecoder(final int maxLength) { 
                 this(maxLength, true, false);     }      public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) { 
                 this.maxLength = maxLength;         this.failFast = failFast;         this.stripDelimiter = stripDelimiter;     }      @Override     protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws 
       
        Exception 
        { 
          
        Object decoded 
        = 
        decode
        (ctx
        , in
        )
        ; 
        if 
        (decoded 
        != 
        null
        ) 
        { 
          out
        .
        add
        (decoded
        )
        ; 
        } 
        } 
        protected 
        Object 
        decode
        (
        ChannelHandlerContext ctx
        , 
        ByteBuf buffer
        ) 
        throws 
        Exception 
        { 
          
        final 
        int eol 
        = 
        findEndOfLine
        (buffer
        )
        ; 
        if 
        (
        !discarding
        ) 
        { 
          
        if 
        (eol 
        >= 
        0
        ) 
        { 
          
        final 
        ByteBuf frame
        ; 
        final 
        int length 
        = eol 
        - buffer
        .
        readerIndex
        (
        )
        ; 
        final 
        int delimLength 
        = buffer
        .
        getByte
        (eol
        ) 
        == 
        '\r'
        ? 
        2 
        : 
        1
        ; 
        if 
        (length 
        > maxLength
        ) 
        { 
          buffer
        .
        readerIndex
        (eol 
        + delimLength
        )
        ; 
        fail
        (ctx
        , length
        )
        ; 
        return 
        null
        ; 
        } 
        if 
        (stripDelimiter
        ) 
        { 
          frame 
        = buffer
        .
        readRetainedSlice
        (length
        )
        ; buffer
        .
        skipBytes
        (delimLength
        )
        ; 
        } 
        else 
        { 
          frame 
        = buffer
        .
        readRetainedSlice
        (length 
        + delimLength
        )
        ; 
        } 
        return frame
        ; 
        } 
        else 
        { 
          
        final 
        int length 
        = buffer
        .
        readableBytes
        (
        )
        ; 
        if 
        (length 
        > maxLength
        ) 
        { 
          discardedBytes 
        = length
        ; buffer
        .
        readerIndex
        (buffer
        .
        writerIndex
        (
        )
        )
        ; discarding 
        = 
        true
        ; offset 
        = 
        0
        ; 
        if 
        (failFast
        ) 
        { 
          
        fail
        (ctx
        , 
        "over " 
        + discardedBytes
        )
        ; 
        } 
        } 
        return 
        null
        ; 
        } 
        } 
        else 
        { 
          
        if 
        (eol 
        >= 
        0
        ) 
        { 
          
        final 
        int length 
        = discardedBytes 
        + eol 
        - buffer
        .
        readerIndex
        (
        )
        ; 
        final 
        int delimLength 
        = buffer
        .
        getByte
        (eol
        ) 
        == 
        '\r'
        ? 
        2 
        : 
        1
        ; buffer
        .
        readerIndex
        (eol 
        + delimLength
        )
        ; discardedBytes 
        = 
        0
        ; discarding 
        = 
        false
        ; 
        if 
        (
        !failFast
        ) 
        { 
          
        fail
        (ctx
        , length
        )
        ; 
        } 
        } 
        else 
        { 
          discardedBytes 
        += buffer
        .
        readableBytes
        (
        )
        ; buffer
        .
        readerIndex
        (buffer
        .
        writerIndex
        (
        )
        )
        ; 
        // 跳过缓冲区中的所有内容,需要再次将offset 设置为0 offset 
        = 
        0
        ; 
        } 
        return 
        null
        ; 
        } 
        } 
        private 
        void 
        fail
        (
        final 
        ChannelHandlerContext ctx
        , 
        int length
        ) 
        { 
          
        fail
        (ctx
        , 
        String
        .
        valueOf
        (length
        )
        )
        ; 
        } 
        private 
        void 
        fail
        (
        final 
        ChannelHandlerContext ctx
        , 
        String length
        ) 
        { 
          ctx
        .
        fireExceptionCaught
        ( 
        new 
        TooLongFrameException
        ( 
        "frame length (" 
        + length 
        + 
        ") exceeds the allowed maximum (" 
        + maxLength 
        + 
        ')'
        )
        )
        ; 
        } 
        /** * 返回找到的行尾缓冲区的索引 * 如果在缓冲区中未找到行尾,则返回 -1 */ 
        private 
        int 
        findEndOfLine
        (
        final 
        ByteBuf buffer
        ) 
        { 
          
        int totalLength 
        = buffer
        .
        readableBytes
        (
        )
        ; 
        int i 
        = buffer
        .
        forEachByte
        (buffer
        .
        readerIndex
        (
        ) 
        + offset
        , totalLength 
        - offset
        , 
        ByteProcessor
        .FIND_LF
        )
        ; 
        if 
        (i 
        >= 
        0
        ) 
        { 
          offset 
        = 
        0
        ; 
        // 判断是否是回车符 
        if 
        (i 
        > 
        0 
        && buffer
        .
        getByte
        (i 
        - 
        1
        ) 
        == 
        '\r'
        ) 
        { 
          i
        --
        ; 
        } 
        } 
        else 
        { 
          offset 
        = totalLength
        ; 
        } 
        return i
        ; 
        } 
        } 
       

从上述代码可以看出,LineBasedFrameDecoder是通过查找回车换行符来找到数据结束的标志的。

定义解码器

定义了解码器MyLineBasedFrameDecoder,该解码器继承自LineBasedFrameDecoder,因此可以使用LineBasedFrameDecoder上的所有功能。

代码如下:

public class MyLineBasedFrameDecoder extends LineBasedFrameDecoder { 
        

	private final static int MAX_LENGTH = 1024; // 帧的最大长度

	public MyLineBasedFrameDecoder() { 
        
		super(MAX_LENGTH);
	}

}

在上述代码中,通过MAX_LENGTH常量,来限制解码器帧的大小。超过该常量值的限制的话,则会抛出TooLongFrameException异常。

定义 ChannelHandler

ChannelHandler 定义如下:

public class MyLineBasedFrameDecoderServerHandler extends ChannelInboundHandlerAdapter { 
        

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) { 
        
		// 接收msg消息,此处已经无需解码了
		System.out.println("Client -> Server: " + msg);
	}
}

MyLineBasedFrameDecoderServerHandler业务非常简单,把收到的消息打印出来即可。

定义 ChannelInitializer

定义一个 ChannelInitializer 用于容纳解码器 MyLineBasedFrameDecoder 和 MyLineBasedFrameDecoderServerHandler,代码如下:

public class MyLineBasedFrameDecoderChannelInitializer extends ChannelInitializer<SocketChannel> { 
        

	@Override
	protected void initChannel(SocketChannel channel) { 
        
		// 基于换行符号
		channel.pipeline().addLast(new MyLineBasedFrameDecoder());

		// 解码转String
		channel.pipeline().addLast(new StringDecoder());

		// 自定义ChannelHandler
		channel.pipeline().addLast(new MyLineBasedFrameDecoderServerHandler());
	}
}

注意添加到ChannelPipelineChannelHandler的顺序,MyLineBasedFrameDecoder 在前,MyLineBasedFrameDecoderServerHandler 在后,意味着数据先经过MyLineBasedFrameDecoder解码,然后再交给MyLineBasedFrameDecoderServerHandler处理。

StringDecoder实现将数据转换为字符串。

编写服务器

定义服务器 MyLineBasedFrameDecoderServer代码如下:

public class MyLineBasedFrameDecoderServer { 
        

    public static int DEFAULT_PORT = 8023;

    public static void main(String[] args) throws Exception { 
        
        int port = DEFAULT_PORT;

        // 多线程事件循环器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker

        try { 
        
            // 启动NIO服务的引导程序类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // 设置EventLoopGroup
                    .channel(NioServerSocketChannel.class) // 指明新的Channel的类型
                    .childHandler(new MyLineBasedFrameDecoderChannelInitializer()) // 指定ChannelHandler
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项
            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync();
            System.out.println("MyLineBasedFrameDecoderServer已启动,端口:" + port);
            // 等待服务器 socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally { 
        
            // 优雅的关闭
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

MyLineBasedFrameDecoderServer 中唯一需要注意的是,在 ServerBootstrap 中指定MyLineBasedFrameDecoderChannelInitializer,这样服务器就能应用咱们自定义的编码器和ChannelHandler了。

编写客户端

为了测试服务器,编写了一个简单的 TCP 客户端,代码如下:

public class TcpClient { 
        

    public static void main(String[] args) throws IOException { 
        
        Socket socket = null;
        OutputStream out = null;
        try { 
        
            socket = new Socket("localhost", 8023);
            out = socket.getOutputStream();
            // 请求服务器 
            String lines = "床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";
            byte[] outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();
        } finally { 
        
            // 关闭连接 
            out.close();
            socket.close();
        }
    }
}

上述客户端在启动后会发送一段文本,而后关闭连接。该文本每句用回车换行符\r\n结尾,这样服务器就能一句一句地解析了。

测试

先启动服务器,观察控制台,可以看到如下输出的内容:

MyLineBasedFrameDecoderServer已启动,端口:8023

然后启动客户端。启动完成之后,再次观察服务器的控制台,可以看到如下输出内容:

MyLineBasedFrameDecoderServer已启动,端口:8023
Client -> Server: 床前明月光
Client -> Server: 疑是地上霜
Client -> Server: 举头望明月
Client -> Server: 低头思故乡

上述的输出内容说明,MyLineBasedFrameDecoderServerHandler接收到了 4 条数据。那么为啥客户端发送了 1 条数据,到这里就变成了 4 条了呢?这是因为在前面介绍的MyLineBasedFrameDecoderChannelInitializer中,MyLineBasedFrameDecoder先被添加到ChannelPipeline,然后才添加到MyLineBasedFrameDecoderServerHandler,意味着数据先经过解码,再交给MyLineBasedFrameDecoderServerHandler处理,而在数据解码过程中,MyLineBasedFrameDecoderServerHandler是按照换行解码的,而客户端所发送的数据里面又包含了 4 个回车换行符,因此,数据被解码为了 4 条。

自定义编码器

定义消息通信协议

消息通信协议是连接客户端和服务器的密语,只有熟知双方的通信协议,客户端和服务器才能正常识别消息的内容。常见的消息通信协议有 HTTP、MQTT、XMPP、STOMP、AMQP和 RTMP等。

下图展示了消息通信协议的内容格式:

类型 名称 字节序列 取值范围 备注
消息头 msgType 0 0x00-0xff 消息类型
消息头 len 1-4 0-2147483647 消息体长度
消息体 body 变长 0- 消息体

从上述协议中可以看出,消息主要是由消息头和消息体组成,并说明如下:

  • msgType 表示消息的类型。在本节示例中,请求用EMGW_LOGIN_REQ(0x00),响应用EMGW_LOGIN_RES(0x01)表示。
  • len 表示消息体的长度。
  • body 表示消息体。

定义了如下MsgType枚举类型来表示消息类型:

public enum MsgType { 
        
    EMGW_LOGIN_REQ((byte) 0x00),
    EMGW_LOGIN_RES((byte) 0x01);

    private byte value;

    public byte getValue() { 
        
        return value;
    }

    private MsgType(byte value) { 
        
        this.value = value;
    }
}

消息头类 MsgHeader定义如下:

public class MsgHeader { 
        
    private byte msgType; // 消息类型
    private int len; // 长度

    public MsgHeader() { 
        
    }

    public MsgHeader(byte msgType, int len) { 
        
        this.msgType = msgType;
        this.len = len;
    }

    public byte getMsgType() { 
        
        return msgType;
    }

    public void setMsgType(byte msgType) { 
        
        this.msgType = msgType;
    }

    public int getLen() { 
        
        return len;
    }

    public void setLen(int len) { 
        
        this.len = len;
    }

}

消息类 Msg 定义如下:

public class Msg { 
        

    private MsgHeader msgHeader = new MsgHeader();
    private String body;

    public MsgHeader getMsgHeader() { 
        
        return msgHeader;
    }

    public void setMsgHeader(MsgHeader msgHeader) { 
        
        this.msgHeader = msgHeader;
    }

    public String getBody() { 
        
        return body;
    }

    public void setBody(String body) { 
        
        this.body = body;
    }

}

定义编码器

public class MyEncoder extends MessageToByteEncoder<Msg> { 
        

	@Override
	protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception { 
        
		if (msg == null | msg.getMsgHeader() == null) { 
        
			throw new Exception("The encode message is null");
		}

		// 获取消息头
		MsgHeader header = msg.getMsgHeader();

		// 获取消息体
		String body = msg.getBody();
		byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));

		// 计算消息体的长度
		int bodySize = bodyBytes.length;

		System.out.printf("MyEncoder header: %s, body: %s", header.getMsgType(), body);

		out.writeByte(MsgType.EMGW_LOGIN_RES.getValue());
		out.writeInt(bodySize);
		out.writeBytes(bodyBytes);
	}

}

MyEncoder会将 Msg 消息转为 ByteBuf 类型。

定义解码器

public class MyDecoder extends LengthFieldBasedFrameDecoder { 
        
	
	private static final int MAX_FRAME_LENGTH = 1024 * 1024;
	private static final int LENGTH_FIELD_LENGTH = 4;
	private static final int LENGTH_FIELD_OFFSET = 1;
	private static final int LENGTH_ADJUSTMENT = 0;
	private static final int INITIAL_BYTES_TO_STRIP = 0;
	
	private static final int HEADER_SIZE = 5;
	private byte msgType; // 消息类型
	private int len; // 长度

	public MyDecoder() { 
        
		super(MAX_FRAME_LENGTH,
				LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
				LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);<

标签: 016lf连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台