ByteBuffer

Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理

ByteBuffer入门案例

首先我们创建一个txt文件,其内容为:

1
1234567890abc

接着编写代码获取文件内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
public class TestByteBuffer {
public static void main(String[] args) {
// 获取Channel 这里获取的是FileChannel
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
// 准备缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true) {
// 从channel读取数据 向buffer写入
int len = channel.read(buffer);
log.info("读取到的字节数为{}", len);
if (len == -1) { // 读取结束
break;
}
buffer.flip(); // 切换至读模式
while (buffer.hasRemaining()) { // 判断是否还有未读取的数据
byte b = buffer.get();
log.info("读取到的字节为{}", (char) b);
}
buffer.clear(); // 切换至写模式 从头开始写
// 或者使用compact()方法 从未读的位置开始写
}
} catch (IOException e) {
}
}
}
  • 通过 ByteBuffer.allocate(); 来对Buffer空间进行分配
  • 通过 channel.read() 向buffer写入数据
    • 也可以通过 buffer.put() 方式写入数据
  • 通过 buffer.flip() 将 buffer 切换到读模式
  • 通过 buffer.clear() 将 buffer 切换到写模式
    • buffer.clear() 会将 position 初始化到0位置
    • buffer.compact() 会将 position 初始化到读位置

核心属性及方法

字节缓冲区的父类Buffer中有几个核心属性,如下

1
2
3
4
5
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
  • capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无法更改
  • limit:缓冲区的界限。位于 limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量
  • position:记录读写的索引
  • mark:记录当前position的值。
    • 可以通过 buffer.mark() 方式进行标记
    • 通过 buffer.reset() 方式将 position 初始化到**标记位置

写模式

写模式

从上图可以看出,在前三个位置已经写入数据,此时 position 为3,而由于当前是读模式,limit=capacity=13

我们可以通过 #put() 方法将一个 byte 类型的数据存入缓冲区中,并将 position+1

读模式

#flip() 方法可以实现缓冲区的模式切换,若当前为读模式,则切换为写模式,若当前为写模式,则切换为读模式

当我们切换为写模式时,position 会初始化到0位置,且 limit 会初始化到 position 之前的位置

在读模式下,可以通过 #get() 方法来获取缓冲区的内容,该方法会将 position+1

#get(int i) 方法会获取缓冲区内索引为i的内容,但该方法不会将 position+1

#rewind() 方法可以将 position、limit和capability恢复初始化

读模式

方法演示

#clear() 只是对position、limit、mark进行重置,而 #compact() 在对 position 进行设置,以及 limit、mark 进行重置的同时,还涉及到数据在内存中拷贝(会调用arraycopy)。所以compact比clear更耗性能。#compact() 能保存你未读取的数据,将新数据追加到为读取的数据之后;而 #clear() 则不行,若你调用了 #clear() ,则未读取的数据就无法再读取到了

用到的工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import java.nio.ByteBuffer;

import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.MathUtil.*;

public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(StringUtil.NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(StringUtil.NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(StringUtil.NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

方法演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestByteBufferReadWrite {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 0x61);
debugAll(buffer); // 结果1
buffer.put(new byte[]{0x62, 0x63, 0x64});
debugAll(buffer); // 结果2

//System.out.println(buffer.get()); // 0

buffer.flip(); //切换为读模式 此时position变为0 limit变为4
System.out.println(buffer.get());
debugAll(buffer); // 结果3

buffer.compact(); // 切换为写模式 将之前读取的移除 未读取的前移
debugAll(buffer); // 此时3位置依然是64 但是由于position为3 下次写入时会覆盖64 结果4

buffer.put(new byte[]{0x65, 0x66});
debugAll(buffer); // 结果5
}
}

结果1如下:成功在缓冲区中存入一个数据,且 position+1

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 00 00 |a......... |
+--------+-------------------------------------------------+----------------+

结果2如下:成功在缓冲区中存入三个数据,且 position+3

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+

结果3如下:flip切换为读模式后,limit=4,获取缓冲区 position=0 位置的内容,且将 position+1

1
2
3
4
5
6
7
8
97
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+

结果4如下:由于61已读,因此从缓冲区中移除,剩余内容向前拷贝,position=3的位置(64)下次写入时覆盖

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [3], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 64 00 00 00 00 00 00 |bcdd...... |
+--------+-------------------------------------------------+----------------+

结果5如下:可以看出64已经被覆盖了,且 position=5

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 65 66 00 00 00 00 00 |bcdef..... |
+--------+-------------------------------------------------+----------------+

字符串和ByteBuffer的转换

  • 直接转换方式
  • 通过 Charset 转换
  • 通过 #wrap() 转换

直接转换方式

1
2
3
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("hello".getBytes());
debugAll(buffer);

结果如下

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........|
+--------+-------------------------------------------------+----------------+

通过 Charset 转换

1
2
ByteBuffer utf_buffer = StandardCharsets.UTF_8.encode("hello"); // 会自动切换到读模式 且长度和hello一致
debugAll(utf_buffer);

结果如下:#encode() 方法会自动切换到读模式,且可以看出缓冲区大小和字符串长度一致

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+

通过 #wrap() 转换

1
2
ByteBuffer wrap_buffer = ByteBuffer.wrap("hello".getBytes()); // 同Charset
debugAll(wrap_buffer);

结果如下:和 Charset 方法一致

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+

ByteBuffer转字符串

1
StandardCharsets.UTF_8.decode(utf_buffer).toString()

需要注意的是,当我们使用直接转换方式时,我们首先需要使用 #flip() 方法将缓冲区转换为读模式

粘包和半包

场景

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔

但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

  • Hello,world\n
  • I’m Zhangsan\n
  • How are you?\n

变成了下面的两个 byteBuffer (粘包,半包)

  • Hello,world\nI’m Zhangsan\nHo
  • w are you?\n

出现原因

粘包

发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去

半包

接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象

解决办法

  • 通过get(index)方法遍历ByteBuffer,遇到分隔符时进行处理。

    注意

    :get(index)不会改变position的值

    • 记录该段数据长度,以便于申请对应大小的缓冲区
    • 将缓冲区的数据通过get()方法写入到target中
  • 调用compact方法切换模式,因为缓冲区中可能还有未读的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TestByteBufferExam {
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI’m Zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}

private static void split(ByteBuffer source) {
source.flip(); //切换为读模式
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整的记录
if (source.get(i) == '\n') {
int length = i - source.position() + 1;
// 将记录存入新的ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
// 从 source 读,向 target 写
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); //切换为写模式 由于可能存在未读到的字符 因此不能采用clear()
}
}

网络编程

阻塞

  • 阻塞模式下,部分方法会使线程暂停运行
    • ServerSocketChanner.accept() 会在没有连接建立时暂停线程
    • SocketChannel.read() 会在通道中没有数据可读时让线程暂停
    • 阻塞期间线程暂停,不会占用CPU
  • 单线程下,阻塞方法之间互相影响,无法实现正常工作

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 2. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 3. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 4. 连接集合 存在多个客户端连接服务器
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 5. accept 建立与客户端的连接 SocketChannel用来和客户端通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法 线程停止运行
log.debug("connected..., {}", sc);

channels.add(sc);
// 6. 接收客户端发送的数据
for (SocketChannel channel : channels) {
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法 线程停止运行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read... {}", channel);
}
}
}
}

客户端代码

1
2
3
4
5
6
7
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting.."); //debug
}
}

运行过程

首先我们启动服务端代码,可以在控制台看到输出如下

#accept()阻塞

可以看出此时进程在 SocketChannel sc = ssc.accept(); 处阻塞,无法向下执行。

当有新的客户端与服务端建立连接时,线程会继续向下执行。

此时,我们以 debug 模式启动客户端,控制台输出如下

#read()阻塞

通过控制台我们可以了解到此时连接服务端的客户端端口为 4309

在这里我们可以看到此时进程在 channel.read(buffer); 处阻塞

我们通过手动方式在客户端写入数据传输到服务端

数据传输

此时的控制台输出如下

读取数据

服务端读取数据后,重新在 SocketChannel sc = ssc.accept(); 处阻塞

我们重新对服务端传输数据 “hi”,此时服务端没有变化

这是由于服务端阻塞在 ServerSocketChanner.accept() 时,需要有新的客户端连接才能继续执行线程

此时我们再以 debug 的方式启动另一个客户端,此时控制台日志如下

新的客户端连接

可以看出此时线程已经成功读取由 4309 端口客户端发送的信息”hi”

而由于 13625 端口的客户端未向服务端发送信息,服务端再次阻塞

综上,单线程阻塞模式下,由于服务端阻塞方法之间相互影响,服务端对客户端的处理结果并不理想

非阻塞

  • 通过 ServerSocketChannel#configureBlocking(false) 将 ServerSocketChannel 设置为非阻塞模式,若没有客户端与服务端建立连接,方法返回 null
  • 通过 SocketChannel#configureBlocking(false) 将 SocketChannel 设置为非阻塞模式,若通道中没有数据可读时,方法返回0
  • 非阻塞模式下,即使没有连接建立和可读数据,线程仍然会运行,浪费CPU
  • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 2. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 将ServerSocketChannel切换为非阻塞模式
// 3. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 4. 连接集合 存在多个客户端连接服务器
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 5. accept 建立与客户端的连接 SocketChannel用来和客户端通信
SocketChannel sc = ssc.accept(); // 非阻塞 如果没有建立连接 返回null
if (sc != null) {
log.debug("connected..., {}", sc);
sc.configureBlocking(false); // 将SocketChannel切换为非阻塞模式
channels.add(sc);
}
// 6. 接收客户端发送的数据
for (SocketChannel channel : channels) {
int content = channel.read(buffer);// 非阻塞 如果没有读取到数据 返回0
if (content > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read... {}", channel);
}
}
}
}
}

客户端代码

1
2
3
4
5
6
7
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting..");
}
}

运行过程

首先我们启动服务端,并以 debug 模式启动三个客户端,控制台如下

非阻塞模式启动

可以看出三台客户端启动分别占据 9120,9127,9133 三个端口,且启动过程中没有阻塞

此时我们通过端口为 9133 的客户端发送一条数据 “Java”,通过端口为 9127 的客户端发送一条数据 “Javascript”,控制台如下

客户端发送数据

可以看出客户端发送数据也并没有受到影响

综上,通过非阻塞模式,服务端处理客户端的连接和信息接收有所改进,但是由于服务端是通过 while(true) 来时刻监听客户端的连接和发送数据,而大部分时间并没有客户端连接和发送数据,这就导致 CPU 资源被浪费

Selector

单线程配合 Selector 完成对多个 channel 可读写事件的监控,称为多路复用

多路复用仅针对网络IO,普通文件IO没法利用多路复用

如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 可以保证

  • 有可连接事件时才去连接
  • 有可读事件时才去读取
  • 有可写入事件时采取写入

accept事件

  • 通过 SelectionKey#register() 方法来向 Selector 注册 channel,通道必须为非阻塞模式
  • 通过 SelectionKey#interestOps() 方法来设置 key 关注的事件
    • accept:在有连接请求时触发
    • connect:客户端在连接建立时触发
    • read:可读事件
    • write:可写事件
  • Selector#select() 方法会在没有事件时阻塞

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1.创建selector 管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

// 2.建立selector和channel的联系(注册)
// 可以通过SelectionKey获取事件以及事件发生的channel
SelectionKey sscKey = ssc.register(selector, 0, null);
// key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key: {}", sscKey);

ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3.select方法 没有事件发生 线程阻塞 由事件发生线程恢复运行
selector.select();
// 4.处理事件 selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
log.debug("key: {}", key);
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
log.debug("SocketChannel: {}", sc);
}
}
}
}

运行过程

启动服务器,并以 debug 模式启动一个客户端,控制台如下

selector

可以看出我们向 selector 注册后返回的 key 和当发生 accept 事件后返回的 key 是一致的

如果,我们对事件不进行处理,即 ServerSocketChannel#accept() 方法不执行,那么 Selector#select() 方法将不会阻塞

一个事件要么被处理,要么被取消,否则线程将不会阻塞

取消事件采用 ServerSocketChannel#canel() 方法

read事件

  • 通过 SelectionKey#isAcceptable()SelectionKey#isConnectable()SelectionKey#isReadable()SelectionKey#isWritable() 来分别对事件进行判断,实现不同的业务逻辑

  • 处理完一个事件后,需要将事件从 selectedKeys 集合移除

  • 客户端断开时,需要进行 read 事件处理

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1.创建selector 管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

// 2.建立selector和channel的联系(注册)
// 可以通过SelectionKey获取事件以及事件发生的channel
SelectionKey sscKey = ssc.register(selector, 0, null);
// key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key: {}", sscKey);

ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3.select方法 没有事件发生 线程阻塞 由事件发生线程恢复运行
selector.select();
// 4.处理事件 selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
log.debug("key: {}", key);
// 5.区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("SocketChannel: {}", sc);
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int content = channel.read(buffer);
if (content == -1) {
// 客户端正常断开
log.debug("客户端正常断开...");
key.cancel();
} else {
buffer.flip();
debugRead(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 当客户端强制断开的时候 会有一个read事件(从selectedKey中删除)
}
}
// 6.移除key
iter.remove();
}
}
}
}

过程分析

selector 中有两个集合,一个存放的是注册到 selector 中的 channel 集合 registerKeys,一个是触发事件的 channel 集合 selectedKeys

当我们使用 SelectionKey#register() 向 selector 注册 channel 时,会将一个 key 存入 registerKeys 集合中

监听accept事件

此时我们将一个监听 accept 事件的 key@45fe3ee3 注册到 selector 中

当客户端连接时,即监听到 accept 事件时,会将 registerKeys 中的这个 key 复制一份存入到 selectedKeys

同理,当将 SocketChannel 注册到 selector 后指定监听 read 事件时,也会将一个监听 read 事件的 key@7fbe847c 注册到 registerKeys 中

当客户端向服务端发送数据时,服务端会监听到 read 事件,此时会遍历 selectedKeys 集合,并执行相应 key 的业务逻辑

监听read事件

由于 selectedKeys 在处理完事件后,并不会将 key 从中移除

若我们不主动将 key 从 selectedKeys 中移除,那么在客户端发送数据时,服务端遍历 selectedKeys 时会有一个 key@45fe3ee3 和一个 key@7fbe847c ,但由于我们已经处理过 key@45fe3ee3 的 accept 事件,执行 ServerSocketChannel#accept()返回一个 null, 此时会产生空指针异常

因此,当我们处理完一个事件后,需要将这个 key 从 selectedKeys 中移除

需要注意的是,当我们强制终止客户端时,会产生 IOException 异常,我们需要对这个异常进行捕获处理

由于客户端在强制断开和正常断开时,都会产生一个 read 事件,我们需要对这个 read 事件进行处理

因此在 catch 块中,采用 cancel() 方式将事件取消

同理,当客户端正常断开时,SocketChannel#read() 的返回结果为-1,此时将事件取消即可

消息边界处理

当缓冲区大小为4字节,传输2个汉字时,服务端通过 decode 解码会出现乱码

1
2
3
ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));

这是因为 UTF-8 字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好 字被拆分为了前半部分和后半部分发送,解码时就会出现问题。

消息边界问题

  1. 消息长度大于缓冲区大小,缓冲区无法直接传输消息
  2. 消息长度小于缓冲区大小,缓冲区发送半包或粘包现象

消息边界问题

解决方式

  1. 固定消息长度。服务端和客户端约定按一个固定大小传输数据,若客户端发送数据长度较小,则需要填充至固定长度,产生带宽浪费
  2. 分隔符拆分。通过约定分隔符来对数据进行拆分,需要对数据一个个字符进行匹配比较,效率低
  3. 采用 TLV 格式,即 Type、Length、Value。在消息头中存放后面内容的长度。缺点是 buffer 需要提前分配,如果内容过大则会影响server吞吐量

这里演示第二种方式

过程分析

我们需要考虑两个内容

  1. ByteBuffer 容量不够时,我们需要对 ByteBuffer 进行扩容,且将新的 ByteBuffer 替换原有的 ByteBuffer
  2. ByteBuffer的作用域
    • 如果 ByteBuffer 的作用域仅仅是在 if-else 中,那么当传输内容较大时,buffer 中没有分隔符,那么在下一次传输时,采用的是新的 buffer ,之前的内容就会丢失
    • 如果 ByteBuffer 的作用域是全局作用域,那么如果我们同时有多个客户端传输数据,则所有数据存在一个 buffer 中,无法区分
    • 综上,我们需要为每个 channel 维护一个独立的 ByteBuffer

当我们在注册 channel 时,采用的方法为 SocketChannel#register(Selector sel, int ops, Object att) ,我们可以在第三个参数中传入 buffer ,将其作为一个附件和 key 相关联,这样便解决了第二个问题,每个 channel 都维护了一个独立的 buffer

1
2
3
4
// 由于每个客户端需要独立的 ByteBuffer 因此将ByteBuffer也注册到key中
ByteBuffer buffer = ByteBuffer.allocate(16);
// 将buffer作为附件 关联到selectedKey中
SelectionKey scKey = sc.register(selector, 0, buffer);

当触发 read 事件时,我们可以从 selectedKeys 中获取 buffer

1
2
3
SocketChannel channel = (SocketChannel) key.channel();
// 获取key上的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();

当需要分割的数据大于缓冲区大小时,我们需要对 buffer 进行扩容。如果调用 ByteBuffer#compact() 后,position 的位置和 limit 的位置相等,说明当前 buffer 空间已经满了,需要对 buffer 容量进行扩容,此时创建新的 buffer,其容量大小为原来的两倍并将原来 buffer 的内容拷贝到新的 buffer 中。最后需要将新的 buffer 关联到 key 上。

1
2
3
4
5
6
7
if (buffer.position() == buffer.limit()) {
// 说明当前byteBuffer满了 需要扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip(); //切换为写模式
newBuffer.put(buffer);
key.attach(newBuffer); //替换附件
}

完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Slf4j
public class Server {
private static void split(ByteBuffer source) {

source.flip(); //切换为读模式

for (int i = 0; i < source.limit(); i++) {
// 找到一条完整的记录
if (source.get(i) == '\n') {
int length = i - source.position() + 1;
// 将记录存入新的ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
// 从 source 读,向 target 写
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); //切换为写模式 由于可能存在未读到的字符 因此不能采用clear()
}
public static void main(String[] args) throws IOException {
// 1.创建selector 管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

// 2.建立selector和channel的联系(注册)
// 可以通过SelectionKey获取事件以及事件发生的channel
SelectionKey sscKey = ssc.register(selector, 0, null);
// key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key: {}", sscKey);

ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3.select方法 没有事件发生 线程阻塞 由事件发生线程恢复运行
selector.select();
// 4.处理事件 selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
log.debug("key: {}", key);
// 5.区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
// 由于每个客户端需要独立的 ByteBuffer 因此将ByteBuffer也注册到key中
ByteBuffer buffer = ByteBuffer.allocate(16);
// 将buffer作为附件 关联到selectedKey中
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("SocketChannel: {}", sc);
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
// 获取key上的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int content = channel.read(buffer);
if (content == -1) {
// 客户端正常断开
log.debug("客户端正常断开...");
key.cancel();
} else {
split(buffer);
if (buffer.position() == buffer.limit()) {
// 说明当前byteBuffer满了 需要扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip(); //切换为写模式
newBuffer.put(buffer);
key.attach(newBuffer); //替换附件
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 当客户端强制断开的时候 会有一个read事件(从selectedKey中删除)
}
}
// 6.移除key
iter.remove();
}
}
}
}

ByteBuffer的大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
  • 分配思路可以参考
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

write事件

服务器向客户端传输数据时,由于网络的传输能力有限,无法一次性将 buffer 的内容写入 channel 传输,往往需要分多次写入

  1. 需要传输时,我们首先进行一次写操作,将 buffer 的内容写入 channel 中

    1
    2
    int write = sc.write(buffer);
    System.out.println(write);
  2. 如果 buffer 还有剩余内容没有写入 channel 中,则关注 write 事件,并将 buffer 作为附件挂载到 key 上

    1
    2
    3
    4
    if (buffer.hasRemaining()) {
    key.interestOps(key.interestOps() + SelectionKey.OP_WRITE); //保持原有关注事件的基础上 关注 write 事件
    key.attach(buffer);
    }
  3. 此时会监听 write 事件,当 buffer 中的内容未完全写完时,会重复执行 write 事件

  4. 当 buffer 中的内容写完时,则将 key 上的 buffer 附件移除,并取消注册 write 事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    if (key.isWritable()) {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel sc = (SocketChannel) key.channel();

    int write = sc.write(buffer);
    System.out.println(write);

    if (!buffer.hasRemaining()) {
    key.attach(null); //需要清除buffer
    key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); //不需要关注write事件
    }
    }

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

ssc.bind(new InetSocketAddress(8080));

while (true) {
selector.select();
Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
while (itr.hasNext()) {
SelectionKey key = itr.next();
itr.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, 0, null);
key.interestOps(SelectionKey.OP_READ);
// 1.向客户端发送大量请求
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300000000; i++) {
sb.append("a");
}
ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());
// 2.返回值代表实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
// 3.判断是否还有剩余内容
if (buffer.hasRemaining()) {
// 4.关注可写事件
// 表示即关注原有事件(read) 同时也关注write事件
key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
// 5.把未写完的数据挂到 key 上
key.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();

int write = sc.write(buffer);
System.out.println(write);

// 6.清除
if (!buffer.hasRemaining()) {
key.attach(null); //需要清除buffer
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); //不需要关注write事件
}
}
}
}

}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));

int count = 0;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}

多线程

单线程的缺陷:当所有客户端都由一个线程来处理时,如果某一个客户端的任务执行时间过长,那么其它所有客户端的任务都将会被阻塞

因此需要充分利用 CPU ,通过多线程来实现服务端

  • 单线程分配一个 Selector(Boss):专门用于处理 Accept 事件
  • 其余线程各自分配一个选择器 Selector(Worker):轮询的方式处理 Read 事件

实现思路

  • 创建一个负责处理 Accept 事件的 Boss 线程,与多个负责处理 Read 事件的 Worker 线程

  • Boss 线程执行的操作

    • 接受并处理 Accepet 事件,当 Accept 事件发生后,调用 Worker#register(SocketChannel socket) 方法,让 Worker 去处理 Read 事件,其中需要**根据标识 index 去判断将任务分配给哪个 Worker **

      1
      2
      3
      4
      5
      6
      7
      8
      9
      // 创建固定数量的workder
      Worker[] workers = new Worker[2];
      for (int i = 0; i < workers.length; i++) {
      Worker worker = new Worker("worker-" + i);
      }
      AtomicInteger index = new AtomicInteger(0);

      // 轮询的方式分配任务
      workers[index.getAndIncrement() % workers.length].register(sc);
    • Worker#register(SocketChannel socket) 方法会通过同步队列完成 Boss 线程与 Worker 线程之间的通信,让 SocketChannel 的注册任务被 Worker 线程执行。添加任务后需要调用 selector.wakeup() 来唤醒被阻塞的 Selector

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      // #register() 方法
      // 向任务队列中添加任务
      queue.add(() -> {
      try {
      sc.register(selector, SelectionKey.OP_READ, null);
      } catch (ClosedChannelException e) {
      e.printStackTrace();
      }
      });
      selector.wakeup(); // 唤醒selector

      // #run()方法
      // 执行任务 sc.register(selector, SelectionKey.OP_READ, null);
      Runnable task = queue.poll();
      if (task != null) {
      task.run();
      }
    • 为什么要用同步队列和 selector.wakeup()?这是因为 Selector#selector() 方法会阻塞线程,该方法会影响 SocketChannel 的注册。我们需要将 Selector#selector() 和 SocketChannel 的注册在同一个线程中执行。通过任务队列将注册任务存放在队列当中,通过 selector.wakeup() 来唤醒阻塞的 Worker 线程执行任务队列中的任务,则可以实现 Read 事件的监听,在下一次循环的时候仍然阻塞在 Selector#select() 方法上监听 read 事件。

  • Worker线程从任务队列中获取注册任务,注册 read 事件的监听

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));

// 创建固定数量的workder
Worker[] workers = new Worker[2];
for (int i = 0; i < workers.length; i++) {
Worker worker = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger(0);

while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected... {}", sc.getRemoteAddress());
// 关联 selector
log.debug("before register...{}", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
log.debug("after register...{}", sc.getRemoteAddress());

}
}
}
}

static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;

private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

// 初始化线程和selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
selector = Selector.open();
thread = new Thread(this, name);
thread.start();
start = true;
}

// 向任务队列中添加任务
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup(); // 唤醒selector
}

@Override
public void run() {
while (true) {
try {
selector.select();

// 执行任务 sc.register(selector, SelectionKey.OP_READ, null);
Runnable task = queue.poll();
if (task != null) {
task.run();
}

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read...{}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

客户端代码

1
2
3
4
5
6
7
8
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
System.in.read();
}
}

NIO 和 BIO

Stream和Channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行
    • 虽然Stream是单向流动的,但是它也是全双工的

IO模型

阻塞IO

阻塞IO

用户线程进行 read 操作时,需要等待操作系统执行实际的 read 操作,此期间用户线程是被阻塞的,无法执行其他操作

非阻塞IO

非阻塞IO

用户线程在一个循环中一直调用 read 方法,若内核空间中还没有数据可读,立即返回

用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果

只是在等待阶段非阻塞

多次在用户程序空间和内核空间之间切换,效率依旧不理想

多路复用

多路复用

当没有事件是,调用select方法会被阻塞,一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用

多路复用和阻塞IO的区别?

  • 阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后,才能去处理read事件
    • 例:c1发起 read 请求,c2发起 accept 请求,c3发起 read 请求,则需要分别对这三个请求进行处理
  • 多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行
    • 例:c1发起 read 请求,c2发起 accept 请求,c3发起 read 请求,三个请求会一起被发送到用户程序空间,并执行相应的操作

异步非阻塞IO

异步非阻塞io

线程1调用方法后理解返回,不会被阻塞也不需要立即获取结果

当方法的运行结果出来以后,由线程2将结果返回给线程1

零拷贝

详细参考:https://xiaolincoding.com/os/8_network_system/zero_copy.html#%E4%B8%BA%E4%BB%80%E4%B9%88%E8%A6%81%E6%9C%89-dma-%E6%8A%80%E6%9C%AF

零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

1
2
3
4
5
6
7
8
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);Copy

内部工作流如下

IO模型内部流程

  • Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  • 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA

  • 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝

  • 接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10)

    • 底层对应 HeapByteBuffer,使用的还是 Java 内存
  • ByteBuffer.

    allocateDirect

    (10)

    • 底层对应DirectByteBuffer,使用的是操作系统内存

优化1

大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用

  • 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
      • 当引用的对象ByteBuffer被垃圾回收以后,虚引用对象Cleaner就会被放入引用队列中,然后调用Cleaner的clean方法来释放直接内存
      • DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化1

以下两种方式都是零拷贝,即无需将数据拷贝到用户缓冲区中(JVM内存中)

底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

优化2

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
  • 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

这种方法下

  • 只发生了1次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化2

linux 2.4 对上述方法再次进行了优化

优化3

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  • 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次