1.15 泛型
一般类型的本质是参数类型,即所操作的数据类型被指定为参数。
可以在此之前使用 Object 要实现一般、不同类型的处理,有两个缺点:
- 每次使用都需要强制转换为想要的类型
- 编译时,编译器不知道类型转换是否正常,只知道运行时不安全
根据《Java 编程思想的描述,泛型的动机在于:
泛型的出现有很多原因,最引人注目的原因之一就是创造
容器类
。
事实上,引入泛型的主要目标如下:
- 类型安全
- 提高泛型的主要目标是提高泛型 Java 安全类型的程序
- 编译期可以检查原因 Java 由于类型不正确 ClassCastException 异常
- 越早出错的原则越小
- 消除强制类型转换
- 泛型的附带优点之一是在使用时直接获得目标类型,消除许多强制类型转换
- 收入就是所需,这使得代码更可读,减少了出错的机会
- 潜在的性能收入
- 由于泛型的实现,不需要支持泛型(几乎) JVM 或类文件变更
- 所有的工作都在编译器中完成
- 编译器生成的代码与不使用泛型(强制类型转换)时编写的代码几乎相同,但更能保证类型的安全
泛型接口/类/方法
package top.cc; /** * @author chen.chao * @version 1.0 * @date 2020/4/4 13:51 * @description */ class List<E> implements Collection { private E e; @Override public E get() { return e; } } class Set implements Collection{ private Object e; @Override public Object get() { return e; } public <E> void setE(E e) { this.e = (Object) e; } } public interface Collection<E>{ E get(); }
泛继承、实现
- 父类使用泛型,子类要么指定具体类型参数,要么继续使用泛型
泛型的约束和局限性
- 只能使用包装器类型,不能使用基本数据类型;
- 类型查询只适用于原始类型,不适用于带类型参数的类型;
- 具有类型参数的泛型数组无法创建
Pair [] pairs = new Pair[10];//error
- 只能用反射来创建泛型数组
public static <T extends Comparable> T[] minmax(T… a){ T[] mm = (T[]) Array.newInstance(a.getClass().getComponentType(),个数); // …复制 }
通配符
-
?
代表未知类型,只能用于声明类型或方法参数,不能用于定义(指定类型参数) -
List<?> unknownList;
-
List<? extends Number> unknownNumberList;
-
List<? super Integer> unknownBaseLineIntegerList;
-
对于参数值为未知类型的容器类,只能读取元素,不能添加元素, 由于其类型未知,编译器无法识别添加元素的类型是否与容器的类型兼容,唯一的例外是null。
-
通配符类型 List<?> 与原始类型 List 和具体类型 List都不相同,List<?>表示这个list每个元素的类型都是一样的,但我们不知道这种类型是什么。注意,List<?>和List因为Object是最高层的超类,List表示元素可以是任何类型的对象,但是List<?>不是这个意思(未知类型)。
extends 指定的类型必须是自己或其子类型
-
List<? extends Fruit>
-
如果以此引用变量为参数,可以引入哪些引用?
-
本身及其子类
-
还包含通配符extends本身及子类
-
不能引入只含通配符,不含通配符extends 指定类 的引用
-
或者extends不是指定类及其子类,而是其父类
-
// Number “extends” Number (in this context)
-
List<? extends Number> foo3 = new ArrayList<? extends Number>();
-
// Integer extends Number
-
List<? extends Number> foo3 = new ArrayList<? extends Integer>();
-
// Double extends Number
-
List<? extends Number> foo3 = new ArrayList<? extends Double>();
-
若实现多个接口,则可使用&将接口分开
-
T extends Comparable & Serializable
-
List<? extends Number> list = new ArrayList();
- list.add(new Integer(1)); //error
-
list.add(new Float(1.2f)); //error
super 指定的类型必须是自己或父亲的类型
- 泛型通配符不能同时声明
PECS(读extends,写super)
-
producer-extends, consumer-super.
-
produce是指参数是producer,consumer是指参数是consumer。
-
用泛型类写数据时extends;
-
从泛型类读数据时使用super;
-
既要拿又要写,不需要通配符(即extends与super都不用)比如List。
-
若参数类型表示T生产者,则为<? extends T>;如果它表示T消费者,就使用它<? super E>。
-
Stack的pushAllE实例供产生参数Stack因此,参数类型为Iterable<? extends E>。
-
popAll的参数提供Stack因此,消费E的参数类型是Collection<? super E>。
public void pushAll(Iterable<? extends E> src) { for (E e : src) push(e); } public void popAll(Collection<? super E> dst) { - while (!isEmpty()) - dst.add(pop()); - } 在调用pushAll生产方法E 实例(produces E instanes),在调用popAll方法时dst消费了E 实例(consumes E instances)。Naftalin与Wadler将PECS称为Get and Put Principle。
- Collections#copy
public static <T> void copy(List<? super T> dest, List<? extends T> src) {
int srcSize = src.size();
if (srcSize > dest.size())
throw new IndexOutOfBoundsException("Source does not fit in dest");
if (srcSize < COPY_THRESHOLD ||
(src instanceof RandomAccess && dest instanceof RandomAccess)) {
for (int i=0; i<srcSize; i++)
dest.set(i, src.get(i));
} else {
ListIterator<? super T> di=dest.listIterator();
ListIterator<? extends T> si=src.listIterator();
for (int i=0; i<srcSize; i++) {
di.next();
di.set(si.next());
}
}
}
泛型擦除(编译时擦除)
-
编译器生成的bytecode是不包含泛型信息的,泛型类型信息将在编译处理是被擦除,这个过程即泛型擦除。
-
擦除类型变量,并替换为限定类型(无限定的变量用Object)。
-
比如 T extends Comparable
-
那么下面所有出现T的地方都会被替换为Comparable
-
如果调用时指定某个类,比如 Pair pair = new Pair<>();
-
那么Pair 中所有的T都替换为String
-
泛型擦除带来的问题:
- 无法使用具有不同类型参数的泛型进行方法重载
public void test(List<String> ls) {
System.out.println("Sting");
}
public void test(List<Integer> li) {
System.ut.println("Integer");
} // 编译出错
public interface Builder<K,V> {
void add(List<K> keyList);
void add(List<V> valueList);
}
-
泛型类的静态变量是共享的
-
另外,因为Java泛型的擦除并不是对所有使用泛型的地方都会擦除的,部分地方会保留泛型信息,在运行时可以获得类型参数。
1.16 IO
Unix IO模型
- 异步I/O 是指用户程序发起IO请求后,不等待数据,同时操作系统内核负责I/O操作把数据从内核拷贝到用户程序的缓冲区后通知应用程序。数据拷贝是由操作系统内核完成,用户程序从一开始就没有等待数据,发起请求后不参与任何IO操作,等内核通知完成。
- 同步I/O 就是非异步IO的情况,也就是用户程序要参与把数据拷贝到程序缓冲区(例如java的InputStream读字节流过程)。
- 同步IO里的非阻塞 是指用户程序发起IO操作请求后不等待数据,而是调用会立即返回一个标志信息告知条件不满足,数据未准备好,从而用户请求程序继续执行其它任务。执行完其它任务,用户程序会主动轮询查看IO操作条件是否满足,如果满足,则用户程序亲自参与拷贝数据动作。
- Unix IO模型的语境下,同步和异步的区别在于数据拷贝阶段是否需要完全由操作系统处理。阻塞和非阻塞操作是针对发起IO请求操作后是否有立刻返回一个标志信息而不让请求线程等待。
BIO NIO AIO介绍
- BIO:同步阻塞,每个客户端的连接会对应服务器的一个线程来同时支持多个用户的请求
- NIO:同步非阻塞,基于事件驱动思想设计,避免在bio场景中线程数量过多而造成瘫痪,Reactor,当socket有流可读或可写入socket时,操作系统会相应的通知引用程序进行处理,应用再将流读取到缓冲区或写入操作系统。 也就是说,这个时候,已经不是一个连接就要对应一个处理线程了,而是有效的请求,对应一个线程,当连接没有数据时,是没有工作线程来处理的
- AIO: 异步非阻塞,客户端的IO请求由OS完成后再通知服务器启动线程处理(需要OS支持)
- 进程向操作系统请求数据
- 操作系统把外部数据加载到内核的缓冲区中,
- 操作系统把内核的缓冲区拷贝到进程的缓冲区
- 进程获得数据完成自己的功能
- AIO需要操作系统支持
Java NIO属于同步非阻塞IO,即IO多路复用,单个线程可以支持多个IO 即询问时从IO没有完毕时直接阻塞,变成了立即返回一个是否完成IO的信号。
Java BIO 使用
package top.io.bio;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
/**
* @author chen.chao
* @version 1.0
* @date 2020/4/4 15:16
* @description
*/
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(12222);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 20, 2, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1024), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
while (true){
final Socket accept = serverSocket.accept();
threadPoolExecutor.submit(()->{
try (BufferedReader br = new BufferedReader(new InputStreamReader(accept.getInputStream()))){
String s = null;
while ((s=br.readLine())!=null){
System.out.println(s);
}
}catch (Exception e){
e.printStackTrace();
}
});
}
}
}
package top.io.bio;
import java.io.IOException;
import java.net.Socket;
import java.util.Date;
/**
* @author chen.chao
* @version 1.0
* @date 2020/4/4 15:24
* @description
*/
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
Socket socket = new Socket("127.0.0.1",12222);
socket.getOutputStream().write(String.format("ping %s\n",new Date().toString()).getBytes());
socket.getOutputStream().flush();
Thread.sleep(20000);
}
}
Java NIO 使用
-
传统的IO操作面向数据流,意味着每次从流中读一个或多个字节,直至完成,数据没有被缓存在任何地方。
-
NIO操作面向缓冲区,数据从Channel读取到Buffer缓冲区,随后在Buffer中处理数据。
-
BIO中的accept是没有客户端连接时阻塞,NIO的accept是没有客户端连接时立即返回。
-
NIO的三个重要组件:Buffer、Channel、Selector。
-
Buffer是用于容纳数据的缓冲区,Channel是与IO设备之间的连接,类似于流。
-
数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。
-
Selector是Channel的多路复用器。
Buffer(缓冲区)
- clear 是将position置为0,limit置为capacity;
- flip是将limit置为position,position置为0;
MappedByteBuffer(对应OS中的内存映射文件)
-
ByteBuffer有两种模式:直接/间接。间接模式就是HeapByteBuffer,即操作堆内存 (byte[])。
-
但是内存毕竟有限,如果我要发送一个1G的文件怎么办?不可能真的去分配1G的内存.这时就必须使用"直接"模式,即 MappedByteBuffer。
-
OS中内存映射文件是将一个文件映射为虚拟内存(文件没有真正加载到内存,只是作为虚存),不需要使用文件系统调用来读写数据,而是直接读写内存。
-
Java中是使用MappedByteBuffer来将文件映射为内存的。通常可以映射整个文件,如果文件比较大的话可以分段进行映射,只要指定文件的那个部分就可以。
-
优点:减少一次数据拷贝
-
之前是 进程空间<->内核的IO缓冲区<->文件
-
现在是 进程空间<->文件
-
MappedByteBuffer可以使用FileChannel.map方法获取。
-
它有更多的优点:
-
a. 读取快
-
b. 写入快
-
c. 随机读写
-
MappedByteBuffer使用虚拟内存,因此分配(map)的内存大小不受JVM的-Xmx参数限制,但是也是有大小限制的。
-
那么可用堆外内存到底是多少?,即默认堆外内存有多大:
-
① 如果我们没有通过-XX:MaxDirectMemorySize来指定最大的堆外内存。则
-
② 如果我们没通过-Dsun.nio.MaxDirectMemorySize指定了这个属性,且它不等于-1。则
-
③ 那么最大堆外内存的值来自于directMemory = Runtime.getRuntime().maxMemory(),这是一个native方法。
-
在我们使用CMS GC的情况下的实现如下:其实是新生代的最大值-一个survivor的大小+老生代的最大值,也就是我们设置的-Xmx的值里除去一个survivor的大小就是默认的堆外内存的大小了。
-
如果当文件过大,内存不足时,可以通过position参数重新map文件后面的内容。
-
MappedByteBuffer在处理大文件时的确性能很高,但也存在一些问题,如内存占用、文件关闭不确定,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。
DirectByteBuffer(堆外内存)
-
DirectByteBuffer继承自MappedByteBuffer,它们都是使用的堆外内存,不受JVM堆大小的限制,只是前者仅仅是分配内存,后者是将文件映射到内存中。
-
可以通过ByteBuf.allocateDirect方法获取。
-
堆外内存的特点(大对象;加快内存拷贝;减轻GC压力) - 对于大内存有良好的伸缩性(支持分配大块内存) - 对垃圾回收停顿的改善可以明显感觉到(堆外内存,减少GC对堆内存回收的压力) - 在进程间可以共享,减少虚拟机间的复制,加快复制速度(减少堆内内存拷贝到堆外内存的过程) - 还可以使用 池+堆外内存 的组合方式,来对生命周期较短,但涉及到I/O操作的对象进行堆外内存的再使用。( Netty中就使用了该方式 )
-
堆外内存的一些问题
- 1)堆外内存回收问题(不手工回收会导致内存溢出,手工回收就失去了Java的优势);
-
- 数据结构变得有些别扭。要么就是需要一个简单的数据结构以便于直接映射到堆外内存,要么就使用复杂的数据结构并序列化及反序列化到内存中。很明显使用序列化的话会比较头疼且存在性能瓶颈。使用序列化比使用堆对象的性能还差。
堆外内存的释放
-
java.nio.DirectByteBuffer对象在创建过程中会先通过Unsafe接口直接通过os::malloc来分配内存,然后将内存的起始地址和大小存到java.nio.DirectByteBuffer对象里,这样就可以直接操作这些内存。这些内存只有在DirectByteBuffer回收掉之后才有机会被回收,因此如果这些对象大部分都移到了old,但是一直没有触发CMS GC或者Full GC,那么悲剧将会发生,因为你的物理内存被他们耗尽了,因此为了避免这种悲剧的发生,通过-XX:MaxDirectMemorySize来指定最大的堆外内存大小,当使用达到了阈值的时候将调用System.gc来做一次full gc,以此来回收掉没有被使用的堆外内存。
-
GC方式:
-
存在于堆内的DirectByteBuffer对象很小,只存着基地址和大小等几个属性,和一个Cleaner,但它代表着后面所分配的一大段内存,是所谓的冰山对象。通过前面说的Cleaner,堆内的DirectByteBuffer对象被GC时,它背后的堆外内存也会被回收。
-
当新生代满了,就会发生minor gc;如果此时对象还没失效,就不会被回收;撑过几次minorgc后,对象被迁移到老生代;当老生代也满了,就会发生full gc。
-
这里可以看到一种尴尬的情况,因为DirectByteBuffer本身的个头很小,只要熬过了minor gc,即使已经失效了也能在老生代里舒服的呆着,不容易把老生代撑爆触发full gc,如果没有别的大块头进入老生代触发full gc,就一直在那耗着,占着一大片堆外内存不释放。
-
这时,就只能靠前面提到的申请额度超限时触发的System.gc()来救场了。但这道最后的保险其实也不很好,首先它会中断整个进程,然后它让当前线程睡了整整一百毫秒,而且如果gc没在一百毫秒内完成,它仍然会无情的抛出OOM异常。
-
那为什么System.gc()会释放DirectByteBuffer呢?
-
每个DirectByteBuffer关联着其对应的Cleaner,Cleaner是PhantomReference的子类,虚引用主要被用来跟踪对象被垃圾回收的状态,通过查看ReferenceQueue中是否包含对象所对应的虚引用来判断它是否即将被垃圾回收。
-
当GC时发现DirectByteBuffer除了PhantomReference外已不可达,就会把它放进 Reference类pending list静态变量里。然后另有一条ReferenceHandler线程,名字叫 "Reference Handler"的,关注着这个pending list,如果看到有对象类型是Cleaner,就会执行它的clean(),其他类型就放入应用构造Reference时传入的ReferenceQueue中,这样应用的代码可以从Queue里拖出这些理论上已死的对象,做爱做的事情——这是一种比finalizer更轻量更好的机制。
-
手工方式:
-
如果想立即释放掉一个MappedByteBuffer/DirectByteBuffer,因为JDK没有提供公开API,只能使用反射的方法去unmap;
-
或者使用Cleaner的clean方法。
public static void main(String[] args) {
try {
File f = File.createTempFile("Test", null);
f.deleteOnExit();
RandomAccessFile file = new RandomAccessFile(f, "rw");
file.setLength(1024);
FileChannel channel = file.getChannel();
MappedByteBuffer buffer = channel.map(
FileChannel.MapMode.READ_WRITE, 0, 1024);
channel.close();
file.close();
// 手动unmap
Method m = FileChannelImpl.class.getDeclaredMethod("unmap",
MappedByteBuffer.class);
m.setAccessible(true);
m.invoke(FileChannelImpl.class, buffer);
if (f.delete())
System.out.println("Temporary file deleted: " + f);
else
System.err.println("Not yet deleted: " + f);
} catch (Exception ex) {
ex.printStackTrace();
}
}
Channel(通道)
- Channel与IO设备的连接,与Stream是平级的概念。
流与通道的区别
-
1、流是单向的,通道是双向的,可读可写。
-
2、流读写是阻塞的,通道可以异步读写。
-
3、流中的数据可以选择性的先读到缓存中,通道的数据总是要先读到一个缓存中,或从缓存中写入
-
注意,FileChannel 不能设置为非阻塞模式。
分散与聚集
-
分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。
-
聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channel,因此,Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel。
Pipe
public class PipeTest {
public static void main(String[] args) {
Pipe pipe = null;
ExecutorService exec = Executors.newFixedThreadPool(2);
try {
pipe = Pipe.open();
final Pipe pipeTemp = pipe;
exec.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Pipe.SinkChannel sinkChannel = pipeTemp.sink();//向通道中写数据
while (true) {
TimeUnit.SECONDS.sleep(1);
String newData = "Pipe Test At Time " + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while (buf.hasRemaining()) {
System.out.println(buf);
sinkChannel.write(buf);
}
}
}
});
exec.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Pipe.SourceChannel sourceChannel = pipeTemp.source();//向通道中读数据
while (true) {
TimeUnit.SECONDS.sleep(1);
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
int bytesRead = sourceChannel.read(buf);
System.out.println("bytesRead=" + bytesRead);
while (bytesRead > 0) {
buf.flip();
byte b[] = new byte[bytesRead];
int i = 0;
while (buf.hasRemaining()) {
b[i] = buf.get();
System.out.printf("%X", b[i]);
i++;
}
String s = new String(b);
System.out.println("=================||" + s);
bytesRead = sourceChannel.read(buf);
}
}
}
});
} catch (IOException e) {
e.printStackTrace();
} finally {
exec.shutdown();
}
}
}
FileChannel与文件锁
-
在通道中我们可以对文件或者部分文件进行上锁。上锁和我们了解的线程锁差不多,都是为了保证数据的一致性。在文件通道FileChannel中可以对文件进行上锁,通过FileLock可以对文件进行锁的释放。
-
文件加锁是建立在文件通道(FileChannel)之上的,套接字通道(SockeChannel)不考虑文件加锁,因为它是不共享的。它对文件加锁有两种方式:
-
①lock
-
②tryLock
-
两种加锁方式默认都是对整个文件加锁,如果自己配置的话就可以控制加锁的文件范围:position是加锁的开始位置,size是加锁长度,shared是用于控制该锁是共享的还是独占的。
-
lock是阻塞式的,当有进程对锁进行读取时会等待锁的释放,在此期间它会一直等待;tryLock是非阻塞式的,它尝试获得锁,如果这个锁不能获得,那么它会立即返回。
-
release可以释放锁。
-
在一个进程中在锁没有释放之前是无法再次获得锁的
-
在java的NIO中,通道包下面有一个FileLock类,它主要是对文件锁工具的一个描述。在上一小节中对文件的锁获取其实是FileChannel获取的(lock与trylock是FileChannel的方法),它们返回一个FileLock对象。这个类的核心方法有如下这些:
-
boolean isShared() :判断锁是否为共享类型
-
abstract boolean isValid() :判断锁是否有效
-
boolean overlaps():判断此锁定是否与给定的锁定区域重叠
-
long position():返回文件内锁定区域中第一个字节的位置。
-
abstract void release() :释放锁
-
long size() :返回锁定区域的大小,以字节为单位
-
在文件锁中有3种方式可以释放文件锁:①锁类释放锁,调用FileLock的release方法; ②通道类关闭通道,调用FileChannel的close方法;③jvm虚拟机会在特定情况释放锁。
-
锁类型(独占式和共享式)
-
我们先区分一下在文件锁中两种锁的区别:①独占式的锁就想我们上面测试的那样,只要有一个进程获取了独占锁,那么别的进程只能等待。②共享锁在一个进程获取的情况下,别的进程还是可以读取被锁定的文件,但是别的进程不能写只能读。
Selector (Channel的多路复用器)
- Selector可以用单线程去管理多个Channel(多个连接)。
- 放在网络编程的环境下:Selector使用单线程,轮询客户端对应的Channel的请求,如果某个Channel需要进行IO,那么分配一个线程去执行IO操作。
- Selector可以去监听的请求有以下几类:
- 1、connect:客户端连接服务端事件,对应值为SelectionKey.OPCONNECT(8)
- 2、accept:服务端接收客户端连接事件,对应值为SelectionKey.OPACCEPT(16)
- 3、read:读事件,对应值为SelectionKey.OPREAD(1)
- 4、write:写事件,对应值为SelectionKey.OPWRITE(4)
- 每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据返回。
- SelectionKey是一个复合事件,绑定到某个selector对应的某个channel上,可能是多个事件的复合或单一事件。
Java NIO 实例(文件上传)
- 服务器主线程先创建Socket,并注册到selector,然后轮询selector。
- 1)如果有客户端需要进行连接,那么selector返回ACCEPT事件,主线程建立连接(accept),并将该客户端连接注册到selector,结束,继续轮询selector等待下一个客户端事件;
- 2)如果有已连接的客户端需要进行读写,那么selector返回READ/WRITE事件,主线程将该请求交给IO线程池中的某个线程执行操作,结束,继续轮询selector等待下一个客户端事件。
服务器
public class NIOTCPServer {
private ServerSocketChannel serverSocketChannel;
private final String FILE_PATH = "E:/uploads/";
private AtomicInteger i;
private final String RESPONSE_MSG = "服务器接收数据成功";
private Selector selector;
private ExecutorService acceptPool;
private ExecutorService readPool;
public NIOTCPServer() {
try {
serverSocketChannel = ServerSocketChannel.open();
//切换为非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9000));
//获得选择器
selector = Selector.open();
//将channel注册到selector上
//第二个参数是选择键,用于说明selector监控channel的状态
//可能的取值:SelectionKey.OP_READ OP_WRITE OP_CONNECT OP_ACCEPT
//监控的是channel的接收状态
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
acceptPool = new ThreadPoolExecutor(50, 100, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
readPool = new ThreadPoolExecutor(50, 100, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
i = new AtomicInteger(0);
System.out.println("服务器启动");
} catch (IOException e) {
e.printStackTrace();
}
}
public void receive() {
try {
//如果有一个及以上的客户端的数据准备就绪
while (selector.select() > 0) {
//获取当前选择器中所有注册的监听事件
for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
SelectionKey key = it.next();
//如果"接收"事件已就绪
if (key.isAcceptable()) {
//交由接收事件的处理器处理
acceptPool.submit(new ReceiveEventHander());
} else if (key.isReadable()) {
//如果"读取"事件已就绪
//交由读取事件的处理器处理
readPool.submit(new ReadEventHandler((SocketChannel) key.channel()));
}
//处理完毕后,需要取消当前的选择键
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
class ReceiveEventHander implements Runnable {
public ReceiveEventHander() {
}
@Override
public void run() {
SocketChannel client = null;
try {
client = serverSocketChannel.accept();
// 接收的客户端也要切换为非阻塞模式
client.configureBlocking(false);
// 监控客户端的读操作是否就绪
client.register(selector, SelectionKey.OP_READ);
System.out.println("服务器连接客户端:" + client.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
class ReadEventHandler implements Runnable {
private ByteBuffer buf;
private SocketChannel client;
public ReadEventHandler(SocketChannel client) {
this.client = client;
buf = ByteBuffer.allocate(1024);
}
@Override
public void run() {
FileChannel fileChannel = null;
try {
int index = 0;
synchronized (client) {
while (client.read(buf) != -1) {
if (fileChannel == null) {
index = i.getAndIncrement();
fileChannel = FileChannel.open(Paths.get(FILE_PATH, index + ".jpeg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
}
buf.flip();
fileChannel.write(buf);
buf.clear();
}
}
if (fileChannel != null) {
fileChannel.close();
System.out.println("服务器写来自客户端" + client + " 文件" + index + " 完毕");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
NIOTCPServer server = new NIOTCPServer();
server.receive();
}
}
客户端
public class NIOTCPClient {
private SocketChannel clientChannel;
private ByteBuffer buf;
public NIOTCPClient() {
try {
clientChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
//设置客户端为非阻塞模式
clientChannel.configureBlocking(false);
buf = ByteBuffer.allocate(1024);
} catch (IOException e) {
e.printStackTrace();
}
}
public void send(String fileName) {
try {
FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
while (fileChannel.read(buf) != -1) {
buf.flip();
clientChannel.write(buf);
buf.clear();
}
System.out.println("客户端已发送文件" + fileName);
fileChannel.close();
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(50, 100, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
Instant begin = Instant.now();
for (int i = 0; i < 200; i++) {
pool.submit(() -> {
NIOTCPClient client = new NIOTCPClient();
client.send("E:/1.jpeg");
});
}
pool.shutdown();
Instant end = Instant.now();
System.out.println(Duration.between(begin,end));
}
}
Java AIO 使用
-
对AIO来说,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。
-
AIO的四步:
-
1、进程向操作系统请求数据
-
2、操作系统把外部数据加载到内核的缓冲区中,
-
3、操作系统把内核的缓冲区拷贝到进程的缓冲区
-
4、进程获得数据完成自己的功能
-
JDK1.7主要增加了三个新的异步通道:
-
AsynchronousFileChannel: 用于文件异步读写;
-
AsynchronousSocketChannel: 客户端异步socket;
-
AsynchronousServerSocketChannel: 服务器异步socket。
-
因为AIO的实施需充分调用OS参与,IO需要操作系统支持、并发也同样需要操作系统的
-
在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open()静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法外,还提供了read和write方法。
-
在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的事件处理类是CompletionHandler<V,A>,这个接口定义了如下两个方法,分别在异步操作成功和失败时被回调。
-
void completed(V result, A attachment);
-
void failed(Throwable exc, A attachment);
public class AIOServer {
private static int PORT = 8080;
private static int BUFFER_SIZE = 1024;
private static String CHARSET = "utf-8"; //默认编码
private static CharsetDecoder decoder = Charset.forName(CHARSET).newDecoder(); //解码
private AsynchronousServerSocketChannel serverChannel;
public AIOServer() {
this.decoder = Charset.forName(CHARSET).newDecoder();
}
private void listen() throws Exception {
//打开一个服务通道
//绑定服务端口
this.serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT), 100);
this.serverChannel.accept(this, new AcceptHandler());
}
/**
* accept到一个请求时的回调
*/
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {
@Override
public void completed(final AsynchronousSocketChannel client, AIOServer server) {
try {
System.out.println("远程地址:" + client.getRemoteAddress());
//tcp各项参数
client.setOption(StandardSocketOptions.TCP_NODELAY, true);
client.setOption(StandardSocketOptions.SO_SNDBUF, 1024);
client.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
if (client.isOpen()) {
System.out.println("client.isOpen:" + client.getRemoteAddress());
final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
buffer.clear();
client.read(buffer, client, new ReadHandler(buffer));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
server.serverChannel.accept(server, this);// 监听新的请求,递归调用。
}
}
@Override
public void failed(Throwable e, AIOServer attachment) {
try {
e.printStackTrace();
} finally {
attachment.serverChannel.accept(attachment, this);// 监听新的请求,递归调用。
}
}
}
/**
* Read到请求数据的回调
*/
private class ReadHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {
private ByteBuffer buffer;
public ReadHandler(ByteBuffer buffer) {
this.buffer = buffer;
}
@Override
public void completed(Integer result, AsynchronousSocketChannel client) {
try {
if (result < 0) {// 客户端关闭了连接
AIOServer.close(client);
} else if (result == 0) {
System.out.println("空数据"); // 处理空数据
} else {
// 读取请求,处理客户端发送的数据
buffer.flip();
CharBuffer charBuffer = AIOServer.decoder.decode(buffer);
System.out.println(charBuffer.toString()); //接收请求
//响应操作,服务器响应结果
buffer.clear();
String res = "hellworld";
buffer = ByteBuffer.wrap(res.getBytes());
client.write(buffer, client, new WriteHandler(buffer));//Response:响应。
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
exc.printStackTrace();
AIOServer.close(attachment);
}
}
/**
* Write响应完请求的回调
*/
private class WriteHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {
private ByteBuffer buffer;
public WriteHandler(ByteBuffer buffer) {
this.buffer = buffer;
}
@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
buffer.clear();
AIOServer.close(attachment);
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
exc.printStackTrace();
AIOServer.close(attachment);
}
}
private static void close(AsynchronousSocketChannel client) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
System.out.println("正在启动服务...");
AIOServer AIOServer = new AIOServer();
AIOServer.listen();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
}
}
});
t.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Java NIO 源码
- 关于Selector源码过于难以理解,可以先放过。
Buffer
public abstract class Buffer {
/**
* The characteristics of Spliterators that traverse and split elements
* maintained in Buffers.
*/
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
// Used only by direct buffers
// NOTE: hoisted here for speed in JNI GetDirectBufferAddress
long address;
// Creates a new buffer with the given mark, position, limit, and capacity,
// after checking invariants.
//
Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark;
}
}
- }
- ByteBuffer有两种实现:HeapByteBuffer和DirectByteBuffer。
- ByteBuffer#allocate
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
- ByteBuffer#allocateDirect
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
HeapByteBuffer(间接模式)
- 底层基于byte数组。
初始化
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
}
- 调用的是ByteBuffer的初始化方法
ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
- ByteBuffer的独有成员变量:
- final byte[] hb; // Non-null only for heap buffers final int offset; boolean isReadOnly; // Valid only for heap buffers
get
public byte get() {
return hb[ix(nextGetIndex())];
}
final int nextGetIndex() { // package-private
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
- protected int ix(int i) { return i + offset; }
put
public ByteBuffer put(byte x) {
hb[ix(nextPutIndex())] = x;
return this;
}
final int nextPutIndex() { // package-private
if (position >= limit)
throw new BufferOverflowException();
return position++;
}
DirectByteBuffer(直接模式)
- 底层基于c++的malloc分配的堆外内存,是使用Unsafe类分配的,底层调用了native方法。
- 在创建DirectByteBuffer的同时,创建一个与其对应的cleaner,cleaner是一个虚引用。
- 回收堆外内存的几种情况:
- 1)程序员手工释放,需要使用sun的非公开API实现。
- 2)申请新的堆外内存而内存不足时,会进行调用Cleaner(作为一个Reference)的静态方法tryHandlePending(false),它又会调用cleaner的clean方法释放内存。
- 3)当DirectByteBuffer失去强引用,只有虚引用时,当等到某一次System.gc(full gc)(比如堆外内存达到XX:MaxDirectMemorySize)时,当DirectByteBuffer对象从pending状态 -> enqueue状态时,会触发Cleaner的clean(),而Cleaner的clean()的方法会实现通过unsafe对堆外内存的释放。
初始化
- 重要成员变量:
private final Cleaner cleaner;
- // Cached unsafe-access object protected static final Unsafe unsafe = Bits.unsafe();
- Unsafe中很多都是native方法,底层调用c++代码。
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
-
// 内存是否按页分配对齐 boolean pa = VM.isDirectMemoryPageAligned();
-
// 获取每页内存大小 int ps = Bits.pageSize();
-
// 分配内存的大小,如果是按页对齐方式,需要再加一页内存的容量 long size = Math.max(1L, (long)cap + (pa ? ps : 0));
-
// 用Bits类保存总分配内存(按页分配)的大小和实际内存的大小 Bits.reserveMemory(size, cap);
long base = 0; try {
-
// 在堆外内存的基地址,指定内存大小 base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0);
- // 计算堆外内存的基地址 if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }
-
第一行super调用的是其父类MappedByteBuffer的构造方法
MappedByteBuffer(int mark, int pos, int lim, int cap) { // package-private
super(mark, pos, lim, cap);
this.fd = null;
}
- 而它的super又调用了ByteBuffer的构造方法
ByteBuffer(int mark, int pos, int lim, int cap) { // package-private
this(mark, pos, lim, cap, null, 0);
}
-
Bits#reserveMemory
-
该方法用于在系统中保存总分配内存(按页分配)的大小和实际内存的大小。
-
总的来说,Bits.reserveMemory(size, cap)方法在可用堆外内存不足以分配给当前要创建的堆外内存大小时,会实现以下的步骤来尝试完成本次堆外内存的创建:
-
① 触发一次非堵塞的Reference#tryHandlePending(false)。该方法会将已经被JVM垃圾回收的DirectBuffer对象的堆外内存释放。
-
② 如果进行一次堆外内存资源回收后,还不够进行本次堆外内存分配的话,则进行 System.gc()。System.gc()会触发一个full gc,但你需要知道,调用System.gc()并不能够保证full gc马上就能被执行。所以在后面打代码中,会进行最多9次尝试,看是否有足够的可用堆外内存来分配堆外内存。并且每次尝试之前,都对延迟等待时间,已给JVM足够的时间去完成full gc操作。
-
这里之所以用使用full gc的很重要的一个原因是:System.gc()会对新生代和老生代都会进行内存回收,这样会比较彻底地回收DirectByteBuffer对象以及他们关联的堆外内存.
-
DirectByteBuffer对象本身其实是很小的,但是它后面可能关联了一个非常大的堆外内存,因此我们通常称之为冰山对象.
-
我们做young gc的时候会将新生代里的不可达的DirectByteBuffer对象及其堆外内存回收了,但是无法对old里的DirectByteBuffer对象及其堆外内存进行回收,这也是我们通常碰到的最大的问题。(并且堆外内存多用于生命期中等或较长的对象)
-
如果有大量的DirectByteBuffer对象移到了old,但是又一直没有做cms gc或者full gc,而只进行ygc,那么我们的物理内存可能被慢慢耗光,但是我们还不知道发生了什么,因为heap明明剩余的内存还很多(前提是我们禁用了System.gc – JVM参数DisableExplicitGC)。
-
注意,如果你设置了-XX:+DisableExplicitGC,将会禁用显示GC,这会使System.gc()调用无效。
-
③ 如果9次尝试后依旧没有足够的可用堆外内存来分配本次堆外内存,则抛出OutOfMemoryError("Direct buffer memory”)异常。
-
static void reserveMemory(long size, int cap) {
if (!memoryLimitSet && VM.isBooted()) { maxMemory = VM.maxDirectMemory(); memoryLimitSet = true; }
// optimist! if (tryReserveMemory(size, cap)) { return; }
final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess(); // 如果系统中内存( 即,堆外内存 )不够的话:
-
jlra.tryHandlePendingReference()会触发一次非堵塞的Reference#tryHandlePending(false)。该方法会将已经被JVM垃圾回收的DirectBuffer对象的堆外内存释放。
-
// retry while helping enqueue pending Reference objects // which includes executing pending Cleaner(s) which includes // Cleaner(s) that free direct buffer memory while (jlra.tryHandlePendingReference()) { if (tryReserveMemory(size, cap)) { return; } } // 如果在进行一次堆外内存资源回收后,还不够进行本次堆外内存分配的话,则 // trigger VM’s Reference processing System.gc();
// a retry loop with exponential back-off delays // (this gives VM some time to do it’s job) boolean interrupted = false; try { long sleepTime = 1; int sleeps = 0; while (true) { if (tryReserveMemory(size, cap)) { return; }
-
// 9 if (sleeps >= MAX_SLEEPS) { break; } if (!jlra.tryHandlePendingReference()) { try { Thread.sleep(sleepTime); sleepTime <<= 1; sleeps++; } catch (InterruptedException e) { interrupted = true; } } } // 如果9次尝试后依旧没有足够的可用堆外内存来分配本次堆外内存,则抛出OutOfMemoryError("Direct buffer memory”)异常。 // no luck throw new OutOfMemoryError(“Direct buffer memory”);
} finally { if (interrupted) { // don’t swallow interrupts Thread.currentThread().interrupt(); } } }
-
Reference#tryHandlePending
-
static boolean tryHandlePending(boolean waitForNotify) { Reference r; Cleaner c; try { synchronized (lock) { if (pending != null) { r = pending; // ‘instanceof’ might throw OutOfMemoryError sometimes // so do this before un-linking ‘r’ from the ‘pending’ chain… c = r instanceof Cleaner ? (Cleaner) r : null; // unlink ‘r’ from ‘pending’ chain pending = r.discovered; r.discovered = null; } else { // The waiting on the lock may cause an OutOfMemoryError // because it may try to allocate exception objects. if (waitForNotify) { lock.wait(); } // retry if waited return waitForNotify; } } } catch (OutOfMemoryError x) { // Give other threads CPU time so they hopefully drop some live references // and GC reclaims some space. // Also prevent CPU intensive spinning in case ‘r instanceof Cleaner’ above // persistently throws OOME for some time… Thread.yield(); // retry return true; } catch (InterruptedException x) { // retry return true; }
// Fast path for cleaners if (c != null) { c.clean(); return true; }
ReferenceQueue<? super Object> q = r.queue; if (q != ReferenceQueue.NULL) q.enqueue®; return true; }
Deallocator
- 后面是调用unsafe的分配堆外内存的方法,然后初始化了该DirectByteBuffer对应的cleaner。
- 注:在Cleaner 内部中通过一个列表,维护了一个针对每一个 directBuffer 的一个回收堆外内存的 线程对象(Runnable),回收操作是发生在 Cleaner 的 clean() 方法中。
private static class Deallocator
implements Runnable
{
private static Unsafe unsafe = Unsafe.getUnsafe();
private long address;
private long size;
private int capacity;
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
public void run() {
if (address == 0) {
// Paranoia
return;
}
unsafe.freeMemory(address);
address = 0;
Bits.unreserveMemory(size, capacity);
}
}
Cleaner(回收)
public class Cleaner extends PhantomReference<Object> {
private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue();
private static Cleaner first = null;
private Cleaner next = null;
private Cleaner prev = null;
private final Runnable thunk;
private static synchronized Cleaner add(Cleaner var0) {
if (first != null) {
var0.next = first;
first.prev = var0;
}
first = var0;
return var0;
}
private static synchronized boolean remove(Cleaner var0) {
if (var0.next == var0) {
return false;
} else {
if (first == var0) {
if (var0.next != null) {
first = var0.next;
} else {
first = var0.prev;
}
}
if (var0.next != null) {
var0.next.prev = var0.prev;
}
if (var0.prev != null) {
var0.prev.next = var0.next;
}
var0.next = var0;
var0.prev = var0;
return true;
}
}
private Cleaner(Object var1, Runnable var2) {
super(var1, dummyQueue);
this.thunk = var2;
}
// var0是DirectByteBuffer,var1是Deallocator线程对象
public static Cleaner create(Object var0, Runnable var1) {
return var1 == null ? null : add(new Cleaner(var0, var1));
}
public void clean() {
if (remove(this)) {
try {
// 回收该DirectByteBuffer对应的堆外内存
this.thunk.run();
} catch (final Throwable var2) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
if (System.err != null) {
(new Error("Cleaner terminated abnormally", var2)).printStackTrace();
}
System.exit(1);
return null;
}
});
}
}
}
}
- Cleaner的构造方法中又调用了父类虚引用的构造方法:
public PhantomReference(T referent, ReferenceQueue<? super T> q) {
super(referent, q);
}
get
public byte get() {
return ((unsafe.getByte(ix(nextGetIndex()))));
}
put
public ByteBuffer put(byte x) {
unsafe.putByte(ix(nextPutIndex()), ((x)));
return this;
}
FileChannel(阻塞式)
- FileChannel的read、write和map通过其实现类FileChannelImpl实现。
- FileChannelImpl的Oracle JDK没有提供源码,只能在OpenJDK中查看。
open
public static FileChannel open(Path path, OpenOption... options)
throws IOException
{
Set<OpenOption> set = new HashSet<OpenOption>(options.length);
Collections.addAll(set, options);
return open(path, set, NO_ATTRIBUTES);
}
public static FileChannel open(Path path,
Set<? extends OpenOption> options,
FileAttribute<?>... attrs)
throws IOExcepti