资讯详情

使用netty实现并维护TCP长连接

使用netty实现并维护TCP长连接

  • Netty是什么
    • Netty的优点
    • Netty为什么并发高?
  • 创建TCP长连接实例

Netty是什么

Netty 是一个利用 Java 高级网络的能力隐藏了背后的复杂性,并提供了易于使用的能力 API 客户端/服务器框架。 Netty 它被广泛使用 Java 网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award,见https://www.java.net/dukeschoice/2011)。它像大公司一样活跃和成长于用户社区 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。

Netty的优点

1.并发高 2.传输快 3.封装好

Netty为什么并发高?

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)与开发的网络通信框架相比,BIO(Blocking I/O,阻塞IO),他的并发性能大大提高了

创建TCP长连接实例

  1. 这种类型的功能是创建客户端连接和创建链接的监控,而不是无限重连
mport java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;  public class CreateTCPConnection {      private String host; ///连接主机ip     private Integer port; ////连接主机端口     public EventLoopGroup group;      public Channel channel;     public ChannelFuture f;      public CreateTCPConnection(String host, Integer port) {         this.host = host;         this.port = port;     }      /**      * 创建TCP长连接和实例化channel      */     public void connect() {         if(group!=null){             group.shutdownGracefully();  ///这里调用这个,看看它是否会释放         }         group = new NioEventLoopGroup();         try {             Bootstrap b = new Bootstrap();             b.group(group) //1 设置reactor 线程                     .option(ChannelOption.SO_KEEPALIVE, true) //1 设置通道选项                     .channel(NioSocketChannel.class) //2 设置nio类型的channel                     .handler(new ClientChannelInitializer(CreateTCPConnection.this));  //3 装配流水线             //创建连接             f = b.connect(host, port);              //检测并执行断线重连             f.addListener((ChannelFutureListener) channelFuture -> {                 if (!channelFuture.isSuccess()) {                     final EventLoop loop = channelFuture.channel().eventLoop();                     loop.schedule(() -> {                         TdLog.error(host   " : 服务端链接不上,重连操作开始...");                         connect();                         ////处理重连次数                     }, 1L, TimeUnit.SECONDS);                 } else {                     channel = channelFuture.channel();                     TdLog.info("服务链接成功...");                 }             });         } catch (Exception e) {             e.printStackTrace();         }     }  } 
  1. 客户端渠道配置类
import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;  /**  * 类描述:TCP通道配置  */ public class ClientChannelInitializer extends ChannelInitializer {      private CreateTCPConnection createTCPConnection;      public ClientChannelInitializer(CreateTCPConnection createTCPConnection) {         this.createTCPConnection = createTCPConnection;     }      @Override     protected void initChannel(Channel ch) throws Exception {         ChannelPipeline pipeline = ch.pipeline();         //心跳检测         pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS));         ///字符串编码(默认编码)         pipeline.addLast("encoder", new StringEncoder());         //字符串解码(重写类,如不需要可使用默认)         pipeline.addLast(new TCPBaseMessageDecoder());         ///客户逻辑         pipeline.addLast("handler", new NettyClientHandler(createTCPConnection));     } } 
  1. 重写decode类
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.commons.collections.CollectionUtils;  /**  * TCP消息解码器  * @author 07408  *  */ @TdClassLog public class TCPBaseMessageDecoder extends ByteToMessageDecoder {     private List<byte[]> byteList =new ArrayList<>();     @Override     protected void  decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {             //处理实际业务             while (in.isReadable()) {                 System.err.print((char)in.readByte());             }     } } 
  1. SimpleChannelInboundHandler实现类能管理并使用该客户端连接
import javanet.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;


/**
 * 类描述:TCP客户端适配器
 */
public class NettyClientHandler extends SimpleChannelInboundHandler {

    private CreateTCPConnection nettyClient;
    private int heartNumber;

    public NettyClientHandler(CreateTCPConnection createTCPConnection) {
        nettyClient = createTCPConnection;
    }

    //这个方法接收不到数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg){}

    /**
     * 长连接读取数据方法-处理业务逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIP = insocket.getAddress().getHostAddress();
        System.out.println(clientIP+" Server say : " + msg.toString());
    }

    /**
     * 通道连接时调用-处理业务逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIP = insocket.getAddress().getHostAddress();
        TdLog.error(clientIP + " :通道已连接!");
    }

    /**
     * 通道闲置触发-启动断线重连功能
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        //使用过程中断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> {
            TdLog.error("断线连接中...");
            nettyClient.connect();
        }, 1, TimeUnit.SECONDS);
        ctx.fireChannelInactive();
    }

    /**
     * 心跳方法
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                System.out.println("READER_IDLE");
            } else if (event.state().equals(IdleState.WRITER_IDLE)) {
                /**发送心跳,保持长连接*/
                byte[] buff = {'V','Z',1,0,0,0,0,0};
                String cmd = new String(buff);
                ctx.channel().writeAndFlush(cmd);
                
            } else if (event.state().equals(IdleState.ALL_IDLE)) {
                System.out.println("ALL_IDLE");
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
  1. 本地调试测试连接能否正常使用
public static void main(String[] args) throws InterruptedException {
   CreateTCPConnection pu = new CreateTCPConnection("10.30.20.185", 8131);
    pu.connect();
}

标签: l型连接器连接

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台