本文是构建高性能、高可用性新闻推送的经典案例 Spring Cloud 环境下基于 Netty 搭建 websocket 实现集群。目录如下:
1、背景
2、websocket
3、netty
3.1 socket
3.2 Java IO模型
3.3 netty
3.3.1 概念:
3.3.2 三大特点:
3.3.3 主从Reactor架构图
3.3.4 应用场景
4、springboot环境下使用netty搭建websocket
4.1 架构图的系统设计
4.2 架构中的六个经典问题
4.3 引入pom依赖和yml配置
4.4 Websocket 初始化器
4.5 websocket 通道初始化器
4.6 websocket 入站处理器
4.7 channel任务队列中任务队列的线程任务
4.8 websocket启动程序
4.9 问题六解决方案
4.10 前端代码
4.11 wesocket在nginx中配置
4.12 效果图
5、总结
在实际的工作开发中,我们不可避免地会遇到一些需要网页与服务器端连接(至少看起来像连接)的需求,如微信网页版本的聊天应用程序,如频繁更新页面数据(实时数据,如天气、电流和电压,pm2.监控系统页面或股票看盘页面等。例如,服务器读取MySQL或者redis或者主动将第三方数据推送到浏览器客户端等业务场景。我们通常使用以下技术:
:使用前端教师ajax定期启动服务器http无论数据是否更新,请求都会立即返回数据。缺点是,一方面,如果后端数据木更新,那么这次http请求是无用的。另一方面,在高并发条件下,短链接的频繁创建和销毁,以及客户数量过多造成的过度无用http所有要求都会对服务器和带宽造成压力,短轮查询只适用于客户端连接少、并发量低的场景;
:利用comet不断向服务器发起请求,服务器暂时挂起请求,直到有新数据才返回。与短轮查询相比,请求次数减少,但仍不适用于高并发场景;
:服务端推送(Server Send Event),在客户端发起一次请求后会保持该连接,服务器端基于该连接持续向客户端发送数据,从HTML5开始加入。
:这也是一种保持长连接和双向的技术HTML5开始加入并不完全基于HTTP,双向通信场景适用于频繁和大流量,是服务器推送消息功能的最佳实践。websocket最好的方法是。
网上的很多netty搭建websocket博客文章不够全面,有很多问题没有解决办法,我通过实际工作经验,经常遇到问题总结方法,下面会说!
websocket在单个TCP全双工通信协议在连接上进行。也就是说,保持长连接的技术是双向的。
websocket协议本身就是建立的http对于协议上的升级协议,客户端首先向服务器端建立连接,即连接本身http协议只包含在头部信息中websocket一旦协议的相关信息http连接建立后,服务器端读取这些websocket协议的相关信息将协议升级为websocket协议。WebSocket使客户端和服务器之间的数据交换更容易,使服务器能够主动将数据推送到客户端。在WebSocket API在中间,浏览器和服务器只需握手一次,就可以直接创建连续连接,并进行双向数据传输。
3.1 socket
(1)也就是说,两个设备之间的信息通过相应的操作发送和接收API调度计算机硬件资源,利用管道(网线)进行数据交互。相关技术点像ISO七层模型,TCP网络编程的基础,如三次握手/四次挥手,不再赘述。
(2)对TCP/IP包装协议,Socket它本身不是协议,而是一个调用接口(API),通过Socket只有启动系统调用操作系统内核,才能使用TCP/IP协议。
(3),计算机中指Input/Output,即输入输出,实质上IO分为两种,一种是磁盘IO,磁盘上的数据读取到用户空间,所以数据转移操作实际上是一次I/O操作,即文件I/O。一种是网络IO,当客户端和服务端相互通信时,我们称之为网络交互io(网络通信)
3.2 Java IO模型
有BIO(同步阻塞IO)、NIO(同步非阻塞IO)、AIO(异步IO),netty就是一个NIO高性能框架。
(1)BIO:同步阻塞IO该模型适用于连接数量少、固定的架构。该方法对服务器资源要求较高,并发局限于应用程序。服务器实现模式为连接线程,即客户端连接 在连接请求时,服务器端需要启动一个线程进行处理。如果连接不做任何事情,它将被创建 成为不必要的线程,可通过线程池机制(实现多客户连接服务器)进行改进。
(2)NIO:同步非阻塞IO该模型适用于连接数据多、连接短的架构,如聊天服务器、弹幕系统、服务器间通信等;服务器实现模式是一个线程处理多个连接(一个要求一个线程)Selector 、Channel 、Buffer三大组件。
①:用一个线程处理多个客户端的连接selector选择器,selector用于监控多个渠道上是否发生事件(如连接请求、数据到达等)。如果发生事件,获取事件并相应处理每个事件,因此可以使用单线程监控多个客户端渠道。
②:Channel管道和Java IO中的Stream(流)几乎是一个等级。Stream它是单向的,例如:InputStream, OutputStream.而Channel是双向的,同时进行读写数据,而流只能读或者写。可以实现异步读写数据,可以从缓冲区读数据,也可以写数据到缓冲区。
③:Buffer本质上是一个可以读写的内存块,可以理解为容器对象,底层有一个数组,通过buffer为了实现非阻塞机制,对象提供了一组方法,可以轻松使用内存块。缓冲区内置了一些机制,可以跟踪和记录缓冲区的状态变化。Channel它提供了从文件和网络读取数据的渠道,但读取和写入的数据必须通过buffer。
(3)AIO:异步IO该模型用于连接数量多、连接长(重操作)的架构,如相册服务器 调用OS编程复杂,参与并发操作。在Linux底层用epoll(一种轮询模型),aio多包一层包装,aio的api更好用。Windows上的aio它是自己实现的,不是轮询模型,而是事件模型。完成端口比linxu上的aio效率高。
3.3 netty
3.3.1 概念:
netty网络应用程序框架由开源异步事件驱动,用于快速开发可维护的高性能协议服务器和客户端。
3.3.2 三大特点:
①高并发:Netty 是一款基于 NIO(Nonblocking IO,非阻塞IO)开发由异步事件驱动的高性能网络通信框架,nio使用了select模型(多路复用器技术)使系统能够同时处理单线程下的多个客户端请求。Netty使用了Reactor模型,Reactor模型有三种多线程模型,netty是在主从 Reactor 在一定程度上改进了多线程模型。
Netty有两个线程组,一个作为bossGroup线程组负责客户端接收,一个接一个workerGroup线程组负责工作线程(与客户端)IO操作和任务操作等),Netty 的所有 IO 异步非阻塞操作,通过 Future-Listener 机制,通过通知机制方便用户主动获取或获取 IO 操作结果。他的并发性能有了很大的提高。
②传输快:Netty 传输依赖于零拷贝特性,实现了更高效的传输。零拷贝需要核心(kernel)数据直接从磁盘文件复制到Socket缓冲区(连接字)不需要通过应用程序。零拷贝减少了不必要的内存拷贝,不仅提高了应用程序的性能,还减少了内核和用户态之间的上下文切换。
③封装好:Netty 封装了 NIO 许多操作细节提供了易于使用的调用接口。
3.3.3 主从Reactor架构图
说明:①Reactor响应式编程(事件驱动模型):一般有一个主循环和一个任务队列,所有事件只管往队列里塞,主循环则从队列里取出并处理。
如果不依赖于多路复用处理多个任务就会需要多线程(与连接数对等) ,但是依赖于多路复用,这个循环就可以在单线程的情况下处理多个连接。无论是哪个连接发生了什么事件,都会被主循环从队列取出并处理(可能用回调函数处理等) ,也就是说程序的走向由事件驱动.
②mainReactor:主Reactor负责 单线程就可以接受所有客户端连接
③subReactor:子Reactor负责 多线程处理客户端的读写IO事件
④ThreadPool:线程池负责 处理业务耗时的操作
3.3.4 应用场景
①现在物联网的应用无处不在,大量的项目都牵涉到应用传感器和服务器端的数据通信,Netty作为基础通信组件进行网络编程。
②现在互联网系统讲究的都是高并发、分布式、微服务,各类消息满天飞,Netty在这类架构里面的应用可谓是如鱼得水,如果你对当前的各种应用服务器不爽,那么完全可以基于Netty来实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。
现在非常多的开源软件都是基于netty开发的,例如阿里分布式服务框架 Dubbo 的 RPC 框架,淘宝的消息中间件 RocketMQ;
③游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。地图服务器之间可以方便的通过 Netty 进行高性能的通信。
④大数据:开源集群运算框架 Spark;分布式计算框架 Storm;
4.1 系统设计架构图
4.2 架构中存在的六大经典问题
:客户端和服务端单独通信,怎么实现?
:单机中websocekt主动向所有客户端推送消息如何实现?在集群中如何实现?
:单机如何统计同时在线的客户数量?websocket集群如何统计在线的客户数量呢?
:由于客户端和websocket服务器集群中的某个节点建立长连接是随机的,如何解决服务端向某个或某些部分客户端推送消息?
:websocket服务端周期性向客户端推送消息,单机或集群中如何实现?
:websocket集群中一个客户端向其他客户端主动发送消息,如何实现? 福利来啦!!以上所有问题,已在代码中全部解决并实践!!!
4.3 引入pom依赖和yml配置
(1)pom依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
</dependencies>
(2)yml配置
websocket:
port: 7000 #端口
url: /msg #访问url
(3) 客户端和服务端交互的消息体
package com.wander.netty.websocket.yeelight;
import lombok.Data;
import java.io.Serializable;
@Data
public class MessageRequest implements Serializable {
private Long unionId;
private Integer current = 1;
private Integer size = 10;
}
4.4 Websocket 初始化器
/**
* @Author WDYin
* @Date 2021/6/10
* @Description websocket初始化器
**/
@Slf4j
@Component
public class WebsocketInitialization {
@Resource
private WebsocketChannelInitializer websocketChannelInitializer;
@Value("${websocket.port}")
private Integer port;
@Async
public void init() throws InterruptedException {
//bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//workerGroup工作线程组,主要负责网络IO读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//启动辅助类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//bootstrap绑定两个线程组
serverBootstrap.group(bossGroup, workerGroup);
//设置通道为NioChannel
serverBootstrap.channel(NioServerSocketChannel.class);
//可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
//设置自定义的通道初始化器,用于入站操作
serverBootstrap.childHandler(websocketChannelInitializer);
//启动服务器,本质是Java程序发起系统调用,然后内核底层起了一个处于监听状态的服务,生成一个文件描述符FD
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//异步
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
4.5 websocket 通道初始化器
/**
* @Author WDYin
* @Date 2021/6/10
* @Description websocket通道初始化器
**/
@Component
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private WebSocketHandler webSocketHandler;
@Value("${websocket.url}")
private String websocketUrl;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取pipeline通道
ChannelPipeline pipeline = socketChannel.pipeline();
//因为基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
说明
1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/* 说明
1. 对应websocket ,它的数据是以 帧(frame) 形式传递
2. 可以看到WebSocketFrame 下面有六个子类
3. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri
4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
5. 是通过一个 状态码 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
//自定义的handler ,处理业务逻辑
pipeline.addLast(webSocketHandler);
}
}
4.6 websocket 入站处理器
/**
* @Author WDYin
* @Date 2021/6/10
* @Description websocket处理器
**/
@Slf4j
@Component
@ChannelHandler.Sharable//保证处理器,在整个生命周期中就是以单例的形式存在,方便统计客户端的在线数量
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//通道map,存储channel,用于群发消息,以及统计客户端的在线数量,解决问题问题三,如果是集群环境使用redis的hash数据类型存储即可
private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
//任务map,存储future,用于停止队列任务
private static Map<String, Future> futureMap = new ConcurrentHashMap<>();
//存储channel的id和用户主键的映射,客户端保证用户主键传入的是唯一值,解决问题四,如果是集群中需要换成redis的hash数据类型存储即可
private static Map<String, Long> clientMap = new ConcurrentHashMap<>();
/**
* 客户端发送给服务端的消息
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
try {
//接受客户端发送的消息
MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);
//每个channel都有id,asLongText是全局channel唯一id
String key = ctx.channel().id().asLongText();
//存储channel的id和用户的主键
clientMap.put(key, messageRequest.getUnionId());
log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");
if (!channelMap.containsKey(key)) {
//使用channel中的任务队列,做周期循环推送客户端消息,解决问题二和问题五
Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
//存储客户端和服务的通信的Chanel
channelMap.put(key, ctx.channel());
//存储每个channel中的future,保证每个channel中有一个定时任务在执行
futureMap.put(key, future);
} else {
//每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
}
} catch (Exception e) {
log.error("websocket服务器推送消息发生错误:", e);
}
}
/**
* 客户端连接时候的操作
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
}
/**
* 客户端掉线时的操作
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String key = ctx.channel().id().asLongText();
//移除通信过的channel
channelMap.remove(key);
//移除和用户绑定的channel
clientMap.remove(key);
//关闭掉线客户端的future
Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
future.cancel(true);
futureMap.remove(key);
});
log.info("一个客户端移除......" + ctx.channel().remoteAddress());
ctx.close(); //关闭连接
}
/**
* 发生异常时执行的操作
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
String key = ctx.channel().id().asLongText();
//移除通信过的channel
channelMap.remove(key);
//移除和用户绑定的channel
clientMap.remove(key);
//移除定时任务
Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
future.cancel(true);
futureMap.remove(key);
});
//关闭长连接
ctx.close();
log.info("异常发生 " + cause.getMessage());
}
public static Map<String, Channel> getChannelMap() {
return channelMap;
}
public static Map<String, Future> getFutureMap() {
return futureMap;
}
public static Map<String, Long> getClientMap() {
return clientMap;
}
}
4.7 channel中任务队列的线程任务
/**
* @Author WDYin
* @Date 2021/8/10
* @Description websocket程序
**/
@Slf4j
@Component
public class WebsocketApplication {
@Resource
private WebsocketInitialization websocketInitialization;
@PostConstruct
public void start() {
try {
log.info(Thread.currentThread().getName() + ":websocket启动中......");
websocketInitialization.init();
log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
} catch (Exception e) {
log.error("websocket发生错误:",e);
}
}
}
4.8 websocket启动程序
/**
* @Author WDYin
* @Date 2021/8/10
* @Description websocket程序
**/
@Slf4j
@Component
public class WebsocketApplication {
@Resource
private WebsocketInitialization websocketInitialization;
@PostConstruct
public void start() {
try {
log.info(Thread.currentThread().getName() + ":websocket启动中......");
websocketInitialization.init();
log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
} catch (Exception e) {
log.error("websocket发生错误:",e);
}
}
}
4.9 问题六解决方案
/**
* @Author WDYin
* @Date 2021/9/12
* @Description
**/
@RequestMapping("index")
@Controller
public class WebsocketController {
/**
*
* @param id 用户主键
* @param idList 要把消息发送给其他用户的主键
*/
@RequestMapping("hello1")
private void hello(Long id, List<Long> idList){
//获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
//获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
Map<String, Long> clientMap = WebSocketHandler.getClientMap();
//解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
clientMap.forEach((k,v)->{
if (idList.contains(v)){
Channel channel = channelMap.get(k);
channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName()+"服务器时间" + LocalDateTime.now() + "wdy")));
}
});
}
}
4.10 前端代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判断当前浏览器是否支持websocket
if(window.WebSocket) {
socket = new WebSocket("ws://localhost:7000/msg");
//相当于channelReado, ev 收到服务器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
//相当于连接开启(感知到连接开启)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "连接开启了.."
}
//相当于连接关闭(感知到连接关闭)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭了.."
}
} else {
alert("当前浏览器不支持websocket")
}
//发送消息到服务器
function send(websocketMessage) {
if(!window.socket) { //先判断socket是否创建好
return;
}
if(socket.readyState == WebSocket.OPEN) {
//通过socket 发送消息
socket.send(websocketMessage)
} else {
alert("连接没有开启");
}
}
</script>
<form onsubmit="return false">
<textarea name="websocketMessage" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发生消息" onclick="send(this.form.websocketMessage.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
4.11 wesocket在nginx中配置
nginx.conf中的配置
#第一步:
upstream websocket-router {
server 192.168.1.31:7000 max_fails=10 weight=1 fail_timeout=5s;
keepalive 1000;
}
#第二步:
server {
listen 80; #监听80端口
server_name websocket.wdy.com; #域名配置
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
location / {
client_max_body_size 100M;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header Upgrade $http_upgrade; #支持wss
proxy_set_header Connection "upgrade"; #支持wssi
proxy_pass http://websocket-router; #代理路由
root html;
index index.html index.htm;
}
}
4.12 效果图
springboot启动后,用浏览器打开前端页面:
本文主要讲述了网络编程相关的IO模型以及NIO框架 netty 如何搭建websocket。本文作者:王德印,欢迎复制下方链接关注博主动态。
链接:https://blog.csdn.net/qq_41889508/article/details/105953114
如果看到这里,说明你喜欢这篇文章,请 转发、点赞。微信搜索「web_resource」,关注后回复「进群」或者扫描下方二维码即可进入无广告交流群。
↓扫描二维码进群↓
推荐阅读
1. GitHub 上有什么好玩的项目?
2. Linux 运维必备 150 个命令汇总
3. SpringSecurity + JWT 实现单点登录
4. 100 道 Linux 常见面试题