??
?
??。
??
??相遇是缘分,既然来了就拿着小板凳坐下来聊一会儿,如果在文中有所收获,请不要忘记一键三连,??,你的鼓励是我创作的动力!
文章目录
- Java IO - BIO 详解
-
- 几个重要概念
- 传统的BIO通信方式简介
-
- 传统的BIO的问题
- 多线程方式 - 伪异步方式
- BIO深入分析通信模式
-
- 模拟20个客户20个客户端的并发请求:
- 多线程优化服务器端
- 看看服务器端的执行效果
- 问题根源
Java IO - BIO 详解
BIO就是: blocking IO。最容易理解和实现IO应用程序向操作系统要求网络IO操作时,应用程序将继续等待;另一方面,操作系统将等待,直到数据传输到监控端口;操作系统将数据发送到应用程序;最后,应用程序接收数据并解除等待状态。
几个重要概念
阻塞IO
和非阻塞IO
这两个概念是程序级别
主要描述程序请求操作系统IO如果IO如果资源没有准备好,程序应该如何处理: 前者等待;后者继续执行(并使用线程轮询,直到有IO资源准备好了)
同步IO
和非同步IO
这两个概念是操作系统级别
主要描述操作系统在收到程序请求时IO如果IO未准备好资源,如何应对程序问题: 直到IO资源准备好后;后者返回一个标记(让程序和你知道未来数据在哪里通知),当IO资源准备好后,用事件机制返还程序。
传统的BIO通信方式简介
过去,大多数网络通信模式都是阻塞模式,即:
- 客户端向服务器端发出请求后,客户端会一直等待(不做其他事情),直到服务器端返回结果或网络出现问题。
- 服务器端也是如此。当处理客户端A发送的请求时,另一个客户端B发送的请求将等待,直到服务器端的处理线程完成最后一个处理。
传统的BIO的问题
- 同时,服务器只能接受客户端A的请求信息;虽然客户端A和客户端B的请求同时进行,但客户端B发送的请求信息只能在服务器接受A的请求数据后才能接受。
- 由于服务器一次只能处理一个客户端请求,第二个请求只能在处理完成并返回(或异常)后处理。显然,这种处理方法不能用于高并发性。
多线程方式 - 伪异步方式
如果服务器只有一个线程,读者会直接提出我们可以使用多线程技术来解决这个问题:
- 服务器收到客户端X的请求后,(读取所有请求数据后)将该请求发送到独立线程,然后主线程继续接受客户端Y的请求。
- 在客户端的一侧,以使用子线程和服务器端进行通信。这样,客户端主线程的其他工作就不会受到影响。当服务器端有响应信息时,子线程将通过 主线程通知监控模式/观察模式(等设计模式)。
如下图所示:
但使用线程来解决这个问题实际上是有限的:
- 虽然在服务器端,请求处理交给了一个独立的线程,但操作系统通知accept()的方式仍然是单一的。也就是说,事实上,服务器在到数据报告后的业务处理过程可以是多线程的,但数据报告的接受仍然需要一个接一个以下示例代码和debug我们可以清楚地看到这个过程)
- 在linux在系统中,可创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max 命令检查可以创建的最大线程数。当然,这个值可以改变,但线程越多,CPU切换时间越长,处理真实业务的需求就越少。
- 创建一个线程消耗了大量的资源。JVM创建线程时,即使不做任何工作,JVM堆栈空间将被分配。这个空间的大小默认为128K,您可以通过-Xss调整参数。当然也可以用ThreadPoolExecutor线程池可以缓解线程的创建,但也会造成BlockingQueue积压任务的不断增加也消耗了大量资源。
- 此外,如果您的应用程序大量使用长连接,则线程不会关闭。这使得系统资源的消耗更容易失控。 所以,如果你真的想简单地使用线程来解决阻塞问题,那么你可以计算出一个服务器节点可以接受多少并发性。似乎简单地使用线程来解决这个问题并不是最好的方法。
BIO深入分析通信模式
BIO关键不在于是否使用多线程(包括线程池)来处理此请求,而在于是否使用多线程(包括线程池)来处理此请求accept()、read()操作点被堵塞。测试这个问题也很简单。我们模拟了20个客户端(20个线程模拟)JAVA同步计数器CountDownLatch,确保这20个客户初始化,然后同时向服务器发送请求,然后让我们观察一下Server这边接受信息的情况。
模拟20个客户20个客户端的并发请求:
客户端代码(SocketClientDaemon)
package testBSocket; import java.util.concurrent.CountDownLatch; public class SocketClientDaemon {
public static void main(String[] args) throws Exception {
Integer clientNumbr = 20;
CountDownLatch countDownLatch = new CountDownLatch(clientNumber);
//分别开始启动这20个客户端
for(int index = 0 ; index < clientNumber ; index++ , countDownLatch.countDown()) {
SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
new Thread(client).start();
}
//这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态
synchronized (SocketClientDaemon.class) {
SocketClientDaemon.class.wait();
}
}
}
客户端代码(SocketClientRequestThread模拟请求)
package testBSocket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
/** * 一个SocketClientRequestThread线程模拟一个客户端请求。 * @author yinwenjie */
public class SocketClientRequestThread implements Runnable {
static {
BasicConfigurator.configure();
}
/** * 日志 */
private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class);
private CountDownLatch countDownLatch;
/** * 这个线层的编号 * @param countDownLatch */
private Integer clientIndex;
/** * countDownLatch是java提供的同步计数器。 * 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性 * @param countDownLatch */
public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) {
this.countDownLatch = countDownLatch;
this.clientIndex = clientIndex;
}
@Override
public void run() {
Socket socket = null;
OutputStream clientRequest = null;
InputStream clientResponse = null;
try {
socket = new Socket("localhost",83);
clientRequest = socket.getOutputStream();
clientResponse = socket.getInputStream();
//等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求
this.countDownLatch.await();
//发送请求信息
clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。").getBytes());
clientRequest.flush();
//在这里等待,直到服务器返回信息
SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息");
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
int realLen;
String message = "";
//程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了)
while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
message += new String(contextBytes , 0 , realLen);
}
SocketClientRequestThread.LOGGER.info("接收到来自服务器的信息:" + message);
} catch (Exception e) {
SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
} finally {
try {
if(clientRequest != null) {
clientRequest.close();
}
if(clientResponse != null) {
clientResponse.close();
}
} catch (IOException e) {
SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
}
}
}
}
服务器端(SocketServer1)单个线程
package testBSocket;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
public class SocketServer1 {
static {
BasicConfigurator.configure();
}
/** * 日志 */
private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);
public static void main(String[] args) throws Exception{
ServerSocket serverSocket = new ServerSocket(83);
try {
while(true) {
Socket socket = serverSocket.accept();
//下面我们收取信息
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 2048;
byte[] contextBytes = new byte[maxLen];
//这里也会被阻塞,直到有数据准备好
int realLen = in.read(contextBytes, 0, maxLen);
//读取信息
String message = new String(contextBytes , 0 , realLen);
//下面打印信息
SocketServer1.LOGGER.info("服务器收到来自于端口: " + sourcePort + "的信息: " + message);
//下面开始发送信息
out.write("回发响应信息!".getBytes());
//关闭
out.close();
in.close();
socket.close();
}
} catch(Exception e) {
SocketServer1.LOGGER.error(e.getMessage(), e);
} finally {
if(serverSocket != null) {
serverSocket.close();
}
}
}
}
多线程来优化服务器端
客户端代码和上文一样,最主要是更改服务器端的代码:
package testBSocket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
public class SocketServer2 {
static {
BasicConfigurator.configure();
}
private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);
public static void main(String[] args) throws Exception{
ServerSocket serverSocket = new ServerSocket(83);
try {
while(true) {
Socket socket = serverSocket.accept();
//当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
//最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
SocketServerThread socketServerThread = new SocketServerThread(socket);
new Thread(socketServerThread).start();
}
} catch(Exception e) {
SocketServer2.LOGGER.error(e.getMessage(), e);
} finally {
if(serverSocket != null) {
serverSocket.close();
}
}
}
}
/** * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。 * 但还是改变不了socket被一个一个的做accept()的情况。 * @author yinwenjie */
class SocketServerThread implements Runnable {
/** * 日志 */
private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);
private Socket socket;
public SocketServerThread (Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream in = null;
OutputStream out = null;
try {
//下面我们收取信息
in = socket.getInputStream();
out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
//使用线程,同样无法解决read方法的阻塞问题,
//也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
int realLen = in.read(contextBytes, 0, maxLen);
//读取信息
String message = new String(contextBytes , 0 , realLen);
//下面打印信息
SocketServerThread.LOGGER.info("服务器收到来自于端口: " + sourcePort + "的信息: " + message);
//下面开始发送信息
out.write("回发响应信息!".getBytes());
} catch(Exception e) {
SocketServerThread.LOGGER.error(e.getMessage(), e);
} finally {
//试图关闭
try {
if(in != null) {
in.close();
}
if(out != null) {
out.close();
}
if(this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
SocketServerThread.LOGGER.error(e.getMessage(), e);
}
}
}
}
看看服务器端的执行效果
看一看服务器使用多线程处理时的情况:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qdh20TKf-1651325100426)(https://s2.51cto.com/images/20220424/1650785309688075.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)]
问题根源
那么重点的问题并不是“是否使用了多线程”,而是为什么accept()、read()方法会被阻塞。即: 异步IO模式 就是为了解决这样的并发性存在的。但是为了说清楚异步IO模式,在介绍IO模式的时候,要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。
API文档中对于 serverSocket.accept() 方法的使用描述:
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
serverSocket.accept()会被阻塞? 这里涉及到阻塞式同步IO的工作原理:
- 服务器线程发起一个accept动作,询问操作系统 是否有新的socket套接字信息从端口X发送过来。
- 注意,是询问操作系统。也就是说socket套接字的IO模式支持是基于操作系统的,那么自然同步IO/异步IO的支持就是需要操作系统级别的了。如下图:
如果操作系统没有发现有套接字从指定的端口X来,那么操作系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为什么accept()方法为什么会阻塞: 它内部的实现是使用的操作系统级别的同步IO。
Java系列文章摘自Java全栈知识体系,这是一个非常棒的Java网站,知识体系全面,由浅入深的讲解知识点,官网地址:https://pdai.tech/ 文章转载已取得站长本人同意,仅用于学习和知识分享,切勿商用,违者必究。网站上站长的联系方式,进群有福利哟~