前言
今天继续分析 Netty 编解码器,这次我们必须自己实现自定义编码器、解码器和编码器。
基于换行的自定义解码器
LineBasedFrameDecoder 类
LineBasedFrameDecoder 类是基于换行的,这意味着只要在接收数据时遇到换行符\n
或者回车换行符\r\n
最后,数据已被接收并处理。
LineBasedFrameDecoder 类继承自 ByteToMessageDecoder,并重写了 decode
方法。
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
/** 帧的最大长度限制 */ private final int maxLength; /** 帧超长时是否抛出异常 */ private final boolean failFast; private final boolean stripDelimiter; /** 若超长为True,表示需要丢弃输入的数据 */ private boolean discarding; private int discardedBytes; /** 最后扫描位置 */ private int offset; public LineBasedFrameDecoder(final int maxLength) {
this(maxLength, true, false); } public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength; this.failFast = failFast; this.stripDelimiter = stripDelimiter; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws
Exception
{
Object decoded
=
decode
(ctx
, in
)
;
if
(decoded
!=
null
)
{
out
.
add
(decoded
)
;
}
}
protected
Object
decode
(
ChannelHandlerContext ctx
,
ByteBuf buffer
)
throws
Exception
{
final
int eol
=
findEndOfLine
(buffer
)
;
if
(
!discarding
)
{
if
(eol
>=
0
)
{
final
ByteBuf frame
;
final
int length
= eol
- buffer
.
readerIndex
(
)
;
final
int delimLength
= buffer
.
getByte
(eol
)
==
'\r'
?
2
:
1
;
if
(length
> maxLength
)
{
buffer
.
readerIndex
(eol
+ delimLength
)
;
fail
(ctx
, length
)
;
return
null
;
}
if
(stripDelimiter
)
{
frame
= buffer
.
readRetainedSlice
(length
)
; buffer
.
skipBytes
(delimLength
)
;
}
else
{
frame
= buffer
.
readRetainedSlice
(length
+ delimLength
)
;
}
return frame
;
}
else
{
final
int length
= buffer
.
readableBytes
(
)
;
if
(length
> maxLength
)
{
discardedBytes
= length
; buffer
.
readerIndex
(buffer
.
writerIndex
(
)
)
; discarding
=
true
; offset
=
0
;
if
(failFast
)
{
fail
(ctx
,
"over "
+ discardedBytes
)
;
}
}
return
null
;
}
}
else
{
if
(eol
>=
0
)
{
final
int length
= discardedBytes
+ eol
- buffer
.
readerIndex
(
)
;
final
int delimLength
= buffer
.
getByte
(eol
)
==
'\r'
?
2
:
1
; buffer
.
readerIndex
(eol
+ delimLength
)
; discardedBytes
=
0
; discarding
=
false
;
if
(
!failFast
)
{
fail
(ctx
, length
)
;
}
}
else
{
discardedBytes
+= buffer
.
readableBytes
(
)
; buffer
.
readerIndex
(buffer
.
writerIndex
(
)
)
;
// 跳过缓冲区中的所有内容,需要再次将offset 设置为0 offset
=
0
;
}
return
null
;
}
}
private
void
fail
(
final
ChannelHandlerContext ctx
,
int length
)
{
fail
(ctx
,
String
.
valueOf
(length
)
)
;
}
private
void
fail
(
final
ChannelHandlerContext ctx
,
String length
)
{
ctx
.
fireExceptionCaught
(
new
TooLongFrameException
(
"frame length ("
+ length
+
") exceeds the allowed maximum ("
+ maxLength
+
')'
)
)
;
}
/** * 返回找到的行尾缓冲区的索引 * 如果在缓冲区中未找到行尾,则返回 -1 */
private
int
findEndOfLine
(
final
ByteBuf buffer
)
{
int totalLength
= buffer
.
readableBytes
(
)
;
int i
= buffer
.
forEachByte
(buffer
.
readerIndex
(
)
+ offset
, totalLength
- offset
,
ByteProcessor
.FIND_LF
)
;
if
(i
>=
0
)
{
offset
=
0
;
// 判断是否是回车符
if
(i
>
0
&& buffer
.
getByte
(i
-
1
)
==
'\r'
)
{
i
--
;
}
}
else
{
offset
= totalLength
;
}
return i
;
}
}
从上述代码可以看出,LineBasedFrameDecoder
是通过查找回车换行符来找到数据结束的标志的。
定义解码器
定义了解码器MyLineBasedFrameDecoder
,该解码器继承自LineBasedFrameDecoder
,因此可以使用LineBasedFrameDecoder
上的所有功能。
代码如下:
public class MyLineBasedFrameDecoder extends LineBasedFrameDecoder {
private final static int MAX_LENGTH = 1024; // 帧的最大长度
public MyLineBasedFrameDecoder() {
super(MAX_LENGTH);
}
}
在上述代码中,通过MAX_LENGTH
常量,来限制解码器帧的大小。超过该常量值的限制的话,则会抛出TooLongFrameException
异常。
定义 ChannelHandler
ChannelHandler 定义如下:
public class MyLineBasedFrameDecoderServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 接收msg消息,此处已经无需解码了
System.out.println("Client -> Server: " + msg);
}
}
MyLineBasedFrameDecoderServerHandler
业务非常简单,把收到的消息打印出来即可。
定义 ChannelInitializer
定义一个 ChannelInitializer 用于容纳解码器 MyLineBasedFrameDecoder 和 MyLineBasedFrameDecoderServerHandler,代码如下:
public class MyLineBasedFrameDecoderChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
// 基于换行符号
channel.pipeline().addLast(new MyLineBasedFrameDecoder());
// 解码转String
channel.pipeline().addLast(new StringDecoder());
// 自定义ChannelHandler
channel.pipeline().addLast(new MyLineBasedFrameDecoderServerHandler());
}
}
注意添加到ChannelPipeline
的ChannelHandler
的顺序,MyLineBasedFrameDecoder 在前,MyLineBasedFrameDecoderServerHandler 在后,意味着数据先经过MyLineBasedFrameDecoder
解码,然后再交给MyLineBasedFrameDecoderServerHandler
处理。
StringDecoder
实现将数据转换为字符串。
编写服务器
定义服务器 MyLineBasedFrameDecoderServer
代码如下:
public class MyLineBasedFrameDecoderServer {
public static int DEFAULT_PORT = 8023;
public static void main(String[] args) throws Exception {
int port = DEFAULT_PORT;
// 多线程事件循环器
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker
try {
// 启动NIO服务的引导程序类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 设置EventLoopGroup
.channel(NioServerSocketChannel.class) // 指明新的Channel的类型
.childHandler(new MyLineBasedFrameDecoderChannelInitializer()) // 指定ChannelHandler
.option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync();
System.out.println("MyLineBasedFrameDecoderServer已启动,端口:" + port);
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
// 优雅的关闭
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
MyLineBasedFrameDecoderServer 中唯一需要注意的是,在 ServerBootstrap 中指定MyLineBasedFrameDecoderChannelInitializer
,这样服务器就能应用咱们自定义的编码器和ChannelHandler
了。
编写客户端
为了测试服务器,编写了一个简单的 TCP 客户端,代码如下:
public class TcpClient {
public static void main(String[] args) throws IOException {
Socket socket = null;
OutputStream out = null;
try {
socket = new Socket("localhost", 8023);
out = socket.getOutputStream();
// 请求服务器
String lines = "床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";
byte[] outputBytes = lines.getBytes("UTF-8");
out.write(outputBytes);
out.flush();
} finally {
// 关闭连接
out.close();
socket.close();
}
}
}
上述客户端在启动后会发送一段文本,而后关闭连接。该文本每句用回车换行符\r\n
结尾,这样服务器就能一句一句地解析了。
测试
先启动服务器,观察控制台,可以看到如下输出的内容:
MyLineBasedFrameDecoderServer已启动,端口:8023
然后启动客户端。启动完成之后,再次观察服务器的控制台,可以看到如下输出内容:
MyLineBasedFrameDecoderServer已启动,端口:8023
Client -> Server: 床前明月光
Client -> Server: 疑是地上霜
Client -> Server: 举头望明月
Client -> Server: 低头思故乡
上述的输出内容说明,MyLineBasedFrameDecoderServerHandler
接收到了 4 条数据。那么为啥客户端发送了 1 条数据,到这里就变成了 4 条了呢?这是因为在前面介绍的MyLineBasedFrameDecoderChannelInitializer
中,MyLineBasedFrameDecoder
先被添加到ChannelPipeline
,然后才添加到MyLineBasedFrameDecoderServerHandler
,意味着数据先经过解码,再交给MyLineBasedFrameDecoderServerHandler
处理,而在数据解码过程中,MyLineBasedFrameDecoderServerHandler
是按照换行解码的,而客户端所发送的数据里面又包含了 4 个回车换行符,因此,数据被解码为了 4 条。
自定义编码器
定义消息通信协议
消息通信协议是连接客户端和服务器的
密语
,只有熟知双方的通信协议,客户端和服务器才能正常识别消息的内容。常见的消息通信协议有 HTTP、MQTT、XMPP、STOMP、AMQP和 RTMP等。
下图展示了消息通信协议的内容格式:
类型 | 名称 | 字节序列 | 取值范围 | 备注 |
---|---|---|---|---|
消息头 | msgType | 0 | 0x00-0xff | 消息类型 |
消息头 | len | 1-4 | 0-2147483647 | 消息体长度 |
消息体 | body | 变长 | 0- | 消息体 |
从上述协议中可以看出,消息主要是由消息头和消息体组成,并说明如下:
- msgType 表示消息的类型。在本节示例中,请求用
EMGW_LOGIN_REQ(0x00)
,响应用EMGW_LOGIN_RES(0x01)
表示。 - len 表示消息体的长度。
- body 表示消息体。
定义了如下MsgType
枚举类型来表示消息类型:
public enum MsgType {
EMGW_LOGIN_REQ((byte) 0x00),
EMGW_LOGIN_RES((byte) 0x01);
private byte value;
public byte getValue() {
return value;
}
private MsgType(byte value) {
this.value = value;
}
}
消息头类 MsgHeader
定义如下:
public class MsgHeader {
private byte msgType; // 消息类型
private int len; // 长度
public MsgHeader() {
}
public MsgHeader(byte msgType, int len) {
this.msgType = msgType;
this.len = len;
}
public byte getMsgType() {
return msgType;
}
public void setMsgType(byte msgType) {
this.msgType = msgType;
}
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
}
消息类 Msg 定义如下:
public class Msg {
private MsgHeader msgHeader = new MsgHeader();
private String body;
public MsgHeader getMsgHeader() {
return msgHeader;
}
public void setMsgHeader(MsgHeader msgHeader) {
this.msgHeader = msgHeader;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
定义编码器
public class MyEncoder extends MessageToByteEncoder<Msg> {
@Override
protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception {
if (msg == null | msg.getMsgHeader() == null) {
throw new Exception("The encode message is null");
}
// 获取消息头
MsgHeader header = msg.getMsgHeader();
// 获取消息体
String body = msg.getBody();
byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
// 计算消息体的长度
int bodySize = bodyBytes.length;
System.out.printf("MyEncoder header: %s, body: %s", header.getMsgType(), body);
out.writeByte(MsgType.EMGW_LOGIN_RES.getValue());
out.writeInt(bodySize);
out.writeBytes(bodyBytes);
}
}
MyEncoder
会将 Msg 消息转为 ByteBuf 类型。
定义解码器
public class MyDecoder extends LengthFieldBasedFrameDecoder {
private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_LENGTH = 4;
private static final int LENGTH_FIELD_OFFSET = 1;
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private static final int HEADER_SIZE = 5;
private byte msgType; // 消息类型
private int len; // 长度
public MyDecoder() {
super(MAX_FRAME_LENGTH,
LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);<