使用netty实现并维护TCP长连接
- Netty是什么
-
- Netty的优点
- Netty为什么并发高?
- 创建TCP长连接实例
Netty是什么
Netty 是一个利用 Java 高级网络的能力隐藏了背后的复杂性,并提供了易于使用的能力 API 客户端/服务器框架。 Netty 它被广泛使用 Java 网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award,见https://www.java.net/dukeschoice/2011)。它像大公司一样活跃和成长于用户社区 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。
Netty的优点
1.并发高 2.传输快 3.封装好
Netty为什么并发高?
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)与开发的网络通信框架相比,BIO(Blocking I/O,阻塞IO),他的并发性能大大提高了
创建TCP长连接实例
- 这种类型的功能是创建客户端连接和创建链接的监控,而不是无限重连
mport java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class CreateTCPConnection { private String host; ///连接主机ip private Integer port; ////连接主机端口 public EventLoopGroup group; public Channel channel; public ChannelFuture f; public CreateTCPConnection(String host, Integer port) { this.host = host; this.port = port; } /** * 创建TCP长连接和实例化channel */ public void connect() { if(group!=null){ group.shutdownGracefully(); ///这里调用这个,看看它是否会释放 } group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) //1 设置reactor 线程 .option(ChannelOption.SO_KEEPALIVE, true) //1 设置通道选项 .channel(NioSocketChannel.class) //2 设置nio类型的channel .handler(new ClientChannelInitializer(CreateTCPConnection.this)); //3 装配流水线 //创建连接 f = b.connect(host, port); //检测并执行断线重连 f.addListener((ChannelFutureListener) channelFuture -> { if (!channelFuture.isSuccess()) { final EventLoop loop = channelFuture.channel().eventLoop(); loop.schedule(() -> { TdLog.error(host " : 服务端链接不上,重连操作开始..."); connect(); ////处理重连次数 }, 1L, TimeUnit.SECONDS); } else { channel = channelFuture.channel(); TdLog.info("服务链接成功..."); } }); } catch (Exception e) { e.printStackTrace(); } } }
- 客户端渠道配置类
import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; /** * 类描述:TCP通道配置 */ public class ClientChannelInitializer extends ChannelInitializer { private CreateTCPConnection createTCPConnection; public ClientChannelInitializer(CreateTCPConnection createTCPConnection) { this.createTCPConnection = createTCPConnection; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //心跳检测 pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); ///字符串编码(默认编码) pipeline.addLast("encoder", new StringEncoder()); //字符串解码(重写类,如不需要可使用默认) pipeline.addLast(new TCPBaseMessageDecoder()); ///客户逻辑 pipeline.addLast("handler", new NettyClientHandler(createTCPConnection)); } }
- 重写decode类
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.commons.collections.CollectionUtils; /** * TCP消息解码器 * @author 07408 * */ @TdClassLog public class TCPBaseMessageDecoder extends ByteToMessageDecoder { private List<byte[]> byteList =new ArrayList<>(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { //处理实际业务 while (in.isReadable()) { System.err.print((char)in.readByte()); } } }
- SimpleChannelInboundHandler实现类能管理并使用该客户端连接
import javanet.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 类描述:TCP客户端适配器
*/
public class NettyClientHandler extends SimpleChannelInboundHandler {
private CreateTCPConnection nettyClient;
private int heartNumber;
public NettyClientHandler(CreateTCPConnection createTCPConnection) {
nettyClient = createTCPConnection;
}
//这个方法接收不到数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg){}
/**
* 长连接读取数据方法-处理业务逻辑
* @param ctx
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
System.out.println(clientIP+" Server say : " + msg.toString());
}
/**
* 通道连接时调用-处理业务逻辑
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
TdLog.error(clientIP + " :通道已连接!");
}
/**
* 通道闲置触发-启动断线重连功能
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//使用过程中断线重连
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
TdLog.error("断线连接中...");
nettyClient.connect();
}, 1, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
/**
* 心跳方法
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
System.out.println("READER_IDLE");
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
/**发送心跳,保持长连接*/
byte[] buff = {'V','Z',1,0,0,0,0,0};
String cmd = new String(buff);
ctx.channel().writeAndFlush(cmd);
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println("ALL_IDLE");
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- 本地调试测试连接能否正常使用
public static void main(String[] args) throws InterruptedException {
CreateTCPConnection pu = new CreateTCPConnection("10.30.20.185", 8131);
pu.connect();
}