资讯详情

浅谈(Java)BIO通信

??

?

??

??

??相遇是缘分,既然来了就拿着小板凳坐下来聊一会儿,如果在文中有所收获,请不要忘记一键三连,??,你的鼓励是我创作的动力!


文章目录

  • Java IO - BIO 详解
    • 几个重要概念
    • 传统的BIO通信方式简介
      • 传统的BIO的问题
      • 多线程方式 - 伪异步方式
    • BIO深入分析通信模式
      • 模拟20个客户20个客户端的并发请求:
      • 多线程优化服务器端
      • 看看服务器端的执行效果
      • 问题根源

Java IO - BIO 详解

BIO就是: blocking IO。最容易理解和实现IO应用程序向操作系统要求网络IO操作时,应用程序将继续等待;另一方面,操作系统将等待,直到数据传输到监控端口;操作系统将数据发送到应用程序;最后,应用程序接收数据并解除等待状态。

几个重要概念

  • 阻塞IO非阻塞IO

这两个概念是程序级别主要描述程序请求操作系统IO如果IO如果资源没有准备好,程序应该如何处理: 前者等待;后者继续执行(并使用线程轮询,直到有IO资源准备好了)

  • 同步IO非同步IO

这两个概念是操作系统级别主要描述操作系统在收到程序请求时IO如果IO未准备好资源,如何应对程序问题: 直到IO资源准备好后;后者返回一个标记(让程序和你知道未来数据在哪里通知),当IO资源准备好后,用事件机制返还程序。

传统的BIO通信方式简介

过去,大多数网络通信模式都是阻塞模式,即:

  • 客户端向服务器端发出请求后,客户端会一直等待(不做其他事情),直到服务器端返回结果或网络出现问题。
  • 服务器端也是如此。当处理客户端A发送的请求时,另一个客户端B发送的请求将等待,直到服务器端的处理线程完成最后一个处理。

image.png

传统的BIO的问题

  • 同时,服务器只能接受客户端A的请求信息;虽然客户端A和客户端B的请求同时进行,但客户端B发送的请求信息只能在服务器接受A的请求数据后才能接受。
  • 由于服务器一次只能处理一个客户端请求,第二个请求只能在处理完成并返回(或异常)后处理。显然,这种处理方法不能用于高并发性。

多线程方式 - 伪异步方式

如果服务器只有一个线程,读者会直接提出我们可以使用多线程技术来解决这个问题:

  • 服务器收到客户端X的请求后,(读取所有请求数据后)将该请求发送到独立线程,然后主线程继续接受客户端Y的请求。
  • 在客户端的一侧,以使用子线程和服务器端进行通信。这样,客户端主线程的其他工作就不会受到影响。当服务器端有响应信息时,子线程将通过 主线程通知监控模式/观察模式(等设计模式)。

如下图所示:

但使用线程来解决这个问题实际上是有限的:

  • 虽然在服务器端,请求处理交给了一个独立的线程,但操作系统通知accept()的方式仍然是单一的。也就是说,事实上,服务器在到数据报告后的业务处理过程可以是多线程的,但数据报告的接受仍然需要一个接一个以下示例代码和debug我们可以清楚地看到这个过程)
  • 在linux在系统中,可创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max 命令检查可以创建的最大线程数。当然,这个值可以改变,但线程越多,CPU切换时间越长,处理真实业务的需求就越少。
  • 创建一个线程消耗了大量的资源。JVM创建线程时,即使不做任何工作,JVM堆栈空间将被分配。这个空间的大小默认为128K,您可以通过-Xss调整参数。当然也可以用ThreadPoolExecutor线程池可以缓解线程的创建,但也会造成BlockingQueue积压任务的不断增加也消耗了大量资源。
  • 此外,如果您的应用程序大量使用长连接,则线程不会关闭。这使得系统资源的消耗更容易失控。 所以,如果你真的想简单地使用线程来解决阻塞问题,那么你可以计算出一个服务器节点可以接受多少并发性。似乎简单地使用线程来解决这个问题并不是最好的方法。

BIO深入分析通信模式

BIO关键不在于是否使用多线程(包括线程池)来处理此请求,而在于是否使用多线程(包括线程池)来处理此请求accept()、read()操作点被堵塞。测试这个问题也很简单。我们模拟了20个客户端(20个线程模拟)JAVA同步计数器CountDownLatch,确保这20个客户初始化,然后同时向服务器发送请求,然后让我们观察一下Server这边接受信息的情况。

模拟20个客户20个客户端的并发请求:

客户端代码(SocketClientDaemon)

package testBSocket;  import java.util.concurrent.CountDownLatch;  public class SocketClientDaemon { 
             public static void main(String[] args) throws Exception { 
                 Integer clientNumbr = 20;
        CountDownLatch countDownLatch = new CountDownLatch(clientNumber);

        //分别开始启动这20个客户端
        for(int index = 0 ; index < clientNumber ; index++ , countDownLatch.countDown()) { 
        
            SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
            new Thread(client).start();
        }

        //这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态
        synchronized (SocketClientDaemon.class) { 
        
            SocketClientDaemon.class.wait();
        }
    }
} 

客户端代码(SocketClientRequestThread模拟请求)

package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

/** * 一个SocketClientRequestThread线程模拟一个客户端请求。 * @author yinwenjie */
public class SocketClientRequestThread implements Runnable { 
        

    static { 
        
        BasicConfigurator.configure();
    }

    /** * 日志 */
    private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class);

    private CountDownLatch countDownLatch;

    /** * 这个线层的编号 * @param countDownLatch */
    private Integer clientIndex;

    /** * countDownLatch是java提供的同步计数器。 * 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性 * @param countDownLatch */
    public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) { 
        
        this.countDownLatch = countDownLatch;
        this.clientIndex = clientIndex;
    }

    @Override
    public void run() { 
        
        Socket socket = null;
        OutputStream clientRequest = null;
        InputStream clientResponse = null;

        try { 
        
            socket = new Socket("localhost",83);
            clientRequest = socket.getOutputStream();
            clientResponse = socket.getInputStream();

            //等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求
            this.countDownLatch.await();

            //发送请求信息
            clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。").getBytes());
            clientRequest.flush();

            //在这里等待,直到服务器返回信息
            SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息");
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            String message = "";
            //程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了)
            while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) { 
        
                message += new String(contextBytes , 0 , realLen);
            }
            SocketClientRequestThread.LOGGER.info("接收到来自服务器的信息:" + message);
        } catch (Exception e) { 
        
            SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
        } finally { 
        
            try { 
        
                if(clientRequest != null) { 
        
                    clientRequest.close();
                }
                if(clientResponse != null) { 
        
                    clientResponse.close();
                }
            } catch (IOException e) { 
        
                SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
}

服务器端(SocketServer1)单个线程

package testBSocket;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer1 { 
        

    static { 
        
        BasicConfigurator.configure();
    }

    /** * 日志 */
    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);

    public static void main(String[] args) throws Exception{ 
        
        ServerSocket serverSocket = new ServerSocket(83);

        try { 
        
            while(true) { 
        
                Socket socket = serverSocket.accept();

                //下面我们收取信息
                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                //这里也会被阻塞,直到有数据准备好
                int realLen = in.read(contextBytes, 0, maxLen);
                //读取信息
                String message = new String(contextBytes , 0 , realLen);

                //下面打印信息
                SocketServer1.LOGGER.info("服务器收到来自于端口: " + sourcePort + "的信息: " + message);

                //下面开始发送信息
                out.write("回发响应信息!".getBytes());

                //关闭
                out.close();
                in.close();
                socket.close();
            }
        } catch(Exception e) { 
        
            SocketServer1.LOGGER.error(e.getMessage(), e);
        } finally { 
        
            if(serverSocket != null) { 
        
                serverSocket.close();
            }
        }
    }
}

多线程来优化服务器端

客户端代码和上文一样,最主要是更改服务器端的代码:

package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer2 { 
        

    static { 
        
        BasicConfigurator.configure();
    }

    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);

    public static void main(String[] args) throws Exception{ 
        
        ServerSocket serverSocket = new ServerSocket(83);

        try { 
        
            while(true) { 
        
                Socket socket = serverSocket.accept();
                //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                //最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) { 
        
            SocketServer2.LOGGER.error(e.getMessage(), e);
        } finally { 
        
            if(serverSocket != null) { 
        
                serverSocket.close();
            }
        }
    }
}

/** * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。 * 但还是改变不了socket被一个一个的做accept()的情况。 * @author yinwenjie */
class SocketServerThread implements Runnable { 
        

    /** * 日志 */
    private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);

    private Socket socket;

    public SocketServerThread (Socket socket) { 
        
        this.socket = socket;
    }

    @Override
    public void run() { 
        
        InputStream in = null;
        OutputStream out = null;
        try { 
        
            //下面我们收取信息
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            //使用线程,同样无法解决read方法的阻塞问题,
            //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
            int realLen = in.read(contextBytes, 0, maxLen);
            //读取信息
            String message = new String(contextBytes , 0 , realLen);

            //下面打印信息
            SocketServerThread.LOGGER.info("服务器收到来自于端口: " + sourcePort + "的信息: " + message);

            //下面开始发送信息
            out.write("回发响应信息!".getBytes());
        } catch(Exception e) { 
        
            SocketServerThread.LOGGER.error(e.getMessage(), e);
        } finally { 
        
            //试图关闭
            try { 
        
                if(in != null) { 
        
                    in.close();
                }
                if(out != null) { 
        
                    out.close();
                }
                if(this.socket != null) { 
        
                    this.socket.close();
                }
            } catch (IOException e) { 
        
                SocketServerThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
} 

看看服务器端的执行效果

看一看服务器使用多线程处理时的情况:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qdh20TKf-1651325100426)(https://s2.51cto.com/images/20220424/1650785309688075.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)]

问题根源

那么重点的问题并不是“是否使用了多线程”,而是为什么accept()、read()方法会被阻塞。即: 异步IO模式 就是为了解决这样的并发性存在的。但是为了说清楚异步IO模式,在介绍IO模式的时候,要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。

API文档中对于 serverSocket.accept() 方法的使用描述:

Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.

serverSocket.accept()会被阻塞? 这里涉及到阻塞式同步IO的工作原理:

  • 服务器线程发起一个accept动作,询问操作系统 是否有新的socket套接字信息从端口X发送过来。

  • 注意,是询问操作系统。也就是说socket套接字的IO模式支持是基于操作系统的,那么自然同步IO/异步IO的支持就是需要操作系统级别的了。如下图:

如果操作系统没有发现有套接字从指定的端口X来,那么操作系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为什么accept()方法为什么会阻塞: 它内部的实现是使用的操作系统级别的同步IO。

Java系列文章摘自Java全栈知识体系,这是一个非常棒的Java网站,知识体系全面,由浅入深的讲解知识点,官网地址:https://pdai.tech/ 文章转载已取得站长本人同意,仅用于学习和知识分享,切勿商用,违者必究。网站上站长的联系方式,进群有福利哟~


标签: rp5n连接电缆

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台