Netty

优化

序列化

序列化和反序列化主要用在对消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据
  • 反序列化时,需要将传入的正文数据还原成 Java 对象

目前我们的代码仅支持 Java 自带的序列化和反序列化方式

1
2
3
4
5
6
7
8
9
// 序列化
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(message);
byte[] bytes = bos.toByteArray();

// 反序列化
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content));
Message message = (Message) ois.readObject();

为了后续能更好的扩展序列化的使用,我们抽象一个 Serializer 接口,专门用于处理序列化的方式

1
2
3
4
5
6
7
public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);

// 序列化方法
<T> byte[] serialize(T object);
}

并采用枚举类的方式,来实现不同的序列化方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 用于扩展序列化、反序列化算法
*/
public interface Serializer {

// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);

// 序列化方法
<T> byte[] serialize(T object);

enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}

@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},

Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
String json = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(json, clazz);
}

@Override
public <T> byte[] serialize(T object) {
String json = new Gson().toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
}

设置 Config 类,通过读取 application.properties 中的 serializer.algorithm 来选择序列化方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.Java;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}

修改之前 MessageCodecSharable 类中的编码方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

@Override
protected void encode(ChannelHandlerContext ctx, Message message, List<Object> list) throws Exception {
ByteBuf byteBuf = ctx.alloc().buffer();
// 设置4个字节的魔数
byteBuf.writeBytes(new byte[] {0, 6, 1, 8});
// 设置1个字节的版本号
byteBuf.writeByte(1);
// 设置1个字节的序列化方式
byteBuf.writeByte(Config.getSerializerAlgorithm().ordinal());
// 设置1个字节的指令方式
byteBuf.writeByte(message.getMessageType());
// 设置4个字节的请求序列号
byteBuf.writeInt(message.getSequenceId());
// 补齐字节 到2的次方
byteBuf.writeByte(0);

// 获得序列化后的msg
byte[] bytes = Config.getSerializerAlgorithm().serialize(message);

// 用4个字节标识正文长度
byteBuf.writeInt(bytes.length);
// 设置正文
byteBuf.writeBytes(bytes);
list.add(byteBuf);
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int magicNum = byteBuf.readInt(); // 魔数
byte version = byteBuf.readByte(); // 版本号
byte serializedType = byteBuf.readByte(); // 序列化方式
byte msgType = byteBuf.readByte(); // 指令方式
int sequenceId = byteBuf.readInt(); // 请求序列号
byteBuf.readByte(); // 补齐字符

int length = byteBuf.readInt(); // 长度
byte[] content = new byte[length]; // 内容
byteBuf.readBytes(content, 0, length);

// 找到反序列化时的算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializedType];
// 确定具体的消息类型
Class<?> messageClass = Message.getMessageClass(msgType);
Object message = algorithm.deserialize(messageClass, content);

list.add(message);

log.debug("magicNum: {}", magicNum);
log.debug("version: {}", version);
log.debug("serializedType: {}", serializedType);
log.debug("msgType: {}", msgType);
log.debug("sequenceId: {}", sequenceId);
}
}

最后我们编写测试类来对编码方式进行测试

1
2
3
4
5
6
7
8
9
10
public class TestSerializer {
public static void main(String[] args) {
MessageCodecSharable CODEC = new MessageCodecSharable();
LoggingHandler LOGGING = new LoggingHandler();
EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);

LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
channel.writeOutbound(message);
}
}

这里加了两个 LoggingHandler 是为了查看序列化前后的内容

结果如下,通过 Json 序列化的方式传输的对象内容更小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
=================================== Java ===================================
14:37:36 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 220B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 06 01 08 01 00 00 00 00 00 00 00 00 00 00 cc |................|
|00000010| ac ed 00 05 73 72 00 28 63 6f 6d 2e 63 68 65 6e |....sr.(com.chen|
|00000020| 78 69 69 69 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 |xiii.message.Log|
|00000030| 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 |inRequestMessage|
|00000040| 31 f3 3a 3f 09 31 1f d2 02 00 02 4c 00 08 70 61 |1.:?.1.....L..pa|
|00000050| 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c |sswordt..Ljava/l|
|00000060| 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 |ang/String;L..us|
|00000070| 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 1c 63 |ernameq.~..xr..c|
|00000080| 6f 6d 2e 63 68 65 6e 78 69 69 69 2e 6d 65 73 73 |om.chenxiii.mess|
|00000090| 61 67 65 2e 4d 65 73 73 61 67 65 d6 b9 3e a7 6d |age.Message..>.m|
|000000a0| c9 2f 2f 02 00 02 49 00 0b 6d 65 73 73 61 67 65 |.//...I..message|
|000000b0| 54 79 70 65 49 00 0a 73 65 71 75 65 6e 63 65 49 |TypeI..sequenceI|
|000000c0| 64 78 70 00 00 00 00 00 00 00 00 74 00 03 31 32 |dxp........t..12|
|000000d0| 33 74 00 08 7a 68 61 6e 67 73 61 6e |3t..zhangsan |
+--------+-------------------------------------------------+----------------+
=================================== Json ===================================
14:30:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 87B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 06 01 08 01 01 00 00 00 00 00 00 00 00 00 47 |...............G|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 |":"123","sequenc|
|00000040| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000050| 79 70 65 22 3a 30 7d |ype":0} |
+--------+-------------------------------------------------+----------------+

参数优化

CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 的参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class TestTimeOut {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync(); // 断点1
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}
  • 客户端通过 Bootstrap#option() 方法来配置参数,配置参数作用于 SocketChannel
  • 服务器通过 ServerBootstrap 来配置参数,但是对于不同的 Channel 需要选择不同的方法
    • 通过 ServerBootstrap#option() 来配置 ServerSocketChannel 上的参数
    • 通过 ServerBootstrap#childOption() 来配置 SocketChannel 上的参数

源码分析

在 AbstractNioChannel#connect() 中有这么一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 获取配置的超时时间,即通过option方法传入的CONNECT_TIMEOUT_MILLIS值
int connectTimeoutMillis = AbstractNioChannel.this.config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
// 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
AbstractNioChannel.this.connectTimeoutFuture = AbstractNioChannel.this.eventLoop().schedule(new Runnable() {
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
// 这里的Promise和主线程返回结果的Promise是同一个对象
// 也就是说通过这里的Promise对象返回结果给主线程
if (connectPromise != null && !connectPromise.isDone() && connectPromise.tryFailure(new ConnectTimeoutException("connection timed out: " + remoteAddress))) {
AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
}

}
}, (long)connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

SO_BACKLOG

  • 第一次握手时,因为客户端与服务器之间的连接还未完全建立,连接会被放入半连接队列
  • 当完成三次握手以后,连接会被放入全连接队列中
  • 服务器处理 Accept 事件是在 TCP 三次握手,也就是建立连接之后。服务器会从全连接队列中获取连接并进行处理
三次握手和连接队列

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

源码调试

测试代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestBacklogServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.option(ChannelOption.SO_BACKLOG, 2)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {

}
}).bind(8080);
}
}

在 NioEventLoop#processSelectedKey() 中有如下代码

1
2
3
if ((readyOps & 17) != 0 || readyOps == 0) {
unsafe.read(); // debug
}

其中 readOpts & 17 表示接收到 accept 事件,然后通过 unsafe.read() 从全连接队列中获取客户端连接

我们在unsafe.read() 打上断点,让服务端无法从全连接队列中处理客户端连接

最后,我们打开三个客户端去连接服务端,结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

其余参数

SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
  • 该参数用于指定接收方与发送方的滑动窗口大小

ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来配置 ByteBuf 是池化还是非池化,是直接内存还是堆内存
  • 设置的是 handler 内部分配的 ByteBuf 类型
  • 网络 IO 读取数据时,直接内存比堆内存效率要高,此时 Netty 会采用直接内存

RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 Netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

RPC 简单框架

准备工作

在原有 Message 类上添加 RPC 请求码和 RPC 响应码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
public abstract class Message implements Serializable {

// 省略旧的代码

public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

static {
// ...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}

}

RPC 消息请求类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;

public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}

@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}

RPC 消息响应类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;

@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}

服务端基础框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

// rpc 请求消息处理器,待实现
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

客户端基本框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

// rpc 响应消息处理器,待实现
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}

通过接口获取对象实例的简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ServicesFactory {

static Properties properties;
static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
Class<?> interfaceClass = Class.forName(name);
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.newInstance());
}
}
} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}

public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}

相关配置

1
2
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

RpcRequestMessageHandler 的设计

重点:通过反射调用方法

当客户端通过网络请求向服务端发送调用方法信息时,服务端需要执行方法并将方法的返回信息传递给客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {
// 返回给客户端的内容
RpcResponseMessage response = new RpcResponseMessage();
// 保证和客户端发送内容id一致
response.setSequenceId(message.getSequenceId());
try {
// 方法调用 -> 反射
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
Object invoke = method.invoke(service, message.getParameterValue());
response.setReturnValue(invoke);
} catch (Exception e) {
// 远程调用失败
e.printStackTrace();
response.setExceptionValue(new Exception("远程调用错误:" + e.getCause().getMessage()));
}
// 将返回结果写回客户端
ctx.writeAndFlush(response);
}
}

通过以上方法,可以成功实现在客户端远程调用方法

客户端调用测试如下

1
2
3
4
5
6
7
8
9
10
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.writeAndFlush(new RpcRequestMessage(
1,
"com.chenxiniubi.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
));
channel.closeFuture().sync();

RpcClientManager 的设计

重点:重构客户端代码实现远程调用

重点:通过代理类简化远程调用的过程

从以上测试用例来看,我们远程调用方法过于复杂,而实际情况中,我们希望调用方法更为简洁

1
2
HelloService service = getXxx();
service.sayHello("张三");

接下来我们需要对客户端进行重构

首先,我们后续通过 channel 向服务端发送信息,需要将 channel 抽离出来作为一个对象供后续使用

而在原有的客户端代码上,channel.closeFuture().sync(); 会阻塞在关闭 channel 上,因此需要对此重构,以异步的方式关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 初始化channel
*/
private static Channel channel;
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}

为了保证 channel 的唯一性,需要保证采用单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 单例 Channel 获取
*
* @return
*/
public static Channel getChannel() {
if (channel != null) {
return channel;
}
synchronized (LOCK) {
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}

最后,为了保证用户能简介的调用方法

我们将创建消息对象和发送消息对象交给一个代理类来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static <T> T getProxyInstance(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
int id = SequenceIdGenerator.nextId();
// 将方法调用转换为消息对象
RpcRequestMessage msg = new RpcRequestMessage(
id,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 将消息对象发出去
getChannel().writeAndFlush(msg);
// 暂时返回null
return null;
});
return (T) o;
}

此时我们已经可以实现简单的调用远程方法

1
2
3
4
5
public static void main(String[] args) {
HelloService service = getProxyInstance(HelloService.class);
System.out.println(service.sayHello("张三"));
System.out.println(service.sayHello("李四"));
}

RpcResponseMessageHandler 的设计

重点:通过 Promise 实现线程之间数据的传输

我们在客户端远程调用方法后,服务端会给我们传递一个 response 返回,里面包含我们所需要的信息

而我们通过 RpcResponseMessageHandler 来接收消息,是在 NIO 线程中接收消息,并不是在主线程中接收消息

因此需要还需要将消息传递给主线程,这里采用的是 Promise 的方式

当我们的 RpcClientManager 发送消息后,创建一个空的 Promise,等待返回结果

而 RpcResponseMessageHandler 会在这个空的 Promise 写入内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
// 定义一个Map 保存了sequenceId对应的Promise
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
// 为了避免Map内容无限扩大,我们需要在给某个Promise传递数据后删除
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
if (promise != null) {
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
// 如果不包含异常则说明成功调用
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
}
}

RpcClientManager 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static <T> T getProxyInstance(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
int id = SequenceIdGenerator.nextId();
// 将方法调用转换为消息对象
RpcRequestMessage msg = new RpcRequestMessage(
id,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 将消息对象发出去
getChannel().writeAndFlush(msg);

// 新建一个空的Promise 用于保存返回内容 指定promise对象异步接收结果线程
DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
RpcResponseMessageHandler.PROMISES.put(id, promise);

// 等待返回结果
promise.await();

if (promise.isSuccess()) {
return promise.getNow();
} else {
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}

全部代码

RpcServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});

Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("server error", e);
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}

RpcClientManager:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@Slf4j
public class RpcClientManager {
private static Channel channel;
private static final Object LOCK = new Object();

public static void main(String[] args) {
HelloService service = getProxyInstance(HelloService.class);
System.out.println(service.sayHello("张三"));
System.out.println(service.sayHello("李四"));

}

public static <T> T getProxyInstance(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
int id = SequenceIdGenerator.nextId();
// 将方法调用转换为消息对象
RpcRequestMessage msg = new RpcRequestMessage(
id,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 将消息对象发出去
getChannel().writeAndFlush(msg);

// 新建一个空的Promise 用于保存返回内容 指定promise对象异步接收结果线程
DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
RpcResponseMessageHandler.PROMISES.put(id, promise);

// 等待返回结果
promise.await();

if (promise.isSuccess()) {
return promise.getNow();
} else {
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}


/**
* 单例 Channel 获取
*
* @return
*/
public static Channel getChannel() {
if (channel != null) {
return channel;
}
synchronized (LOCK) {
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}

/**
* 初始化channel
*/
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}

RpcRequestMessageHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {
// 返回给客户端的内容
RpcResponseMessage response = new RpcResponseMessage();
// 保证和客户端发送内容id一致
response.setSequenceId(message.getSequenceId());
try {
// 方法调用 -> 反射
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
Object invoke = method.invoke(service, message.getParameterValue());
response.setReturnValue(invoke);
} catch (Exception e) {
// 远程调用失败
e.printStackTrace();
response.setExceptionValue(new Exception("远程调用错误:" + e.getCause().getMessage()));
}
// 将返回结果写回客户端
ctx.writeAndFlush(response);
}
}

RpcResponseMessageHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
if (promise != null) {
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
}
}

源码

启动流程

启动流程可分为以下几个部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();

//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();

//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

//4 启动 nio boss 线程执行接下来的操作

//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  1. 获得选择器 Selector ,Netty 中使用 NioEventloopGroup 中的 NioEventloop 封装了线程和选择器
  2. 创建 NioServerSocketChannel,该Channel作为附件添加到 ServerSocketChannel
  3. 创建 ServerSocketChannel,将其设置为非阻塞模式,并注册到Selector中,**此时未关注事件,但是添加了附件 **NioServerSocketChannel
  4. 绑定端口
  5. 通过 interestOps 设置感兴趣的事件

接下来我们看源码

源码入口 io.netty.bootstrap.ServerBootstrap#bind

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

创建 NioServerSocketChannel

我们看 dobind() 方法

1
2
3
4
5
private ChannelFuture doBind(final SocketAddress localAddress) {
// 这里执行了 NioServerSocketChannel 的初始化过程
final ChannelFuture regFuture = this.initAndRegister();
...
}

接着看 initAndRegister() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 在这里创建了 NioServerSocketChannel
channel = this.channelFactory.newChannel();
// 为 NioServerSocketChannel 注册 handler
this.init(channel);
} catch (Throwable var3) {
...
}
...
return regFuture;
}

接着我们看 ReflectiveChannelFactory#newChannel() 方法,它会调用构造器来初始化 NioServerSocketChannel

1
2
3
4
5
6
7
public T newChannel() {
try {
return (Channel)this.constructor.newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2);
}
}

参考 NioServerSocketChannel 的构造器,至此 NioServerSocketChannel 对象建立完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
// =================== ServerSocketChannel ====================
// 参考 java.nio.channels.ServerSocketChannel#open() 方法
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}

接着看 ServerBootstrap#init() 方法

会给 NioServerSocketChannel 添加 handler

然而这个 handler 不会马上执行,而是等到 register 后执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void init(Channel channel) {
...
// ChannelHandler 这个 handler 只会调用一次
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}

ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}

register

initAndRegister() 方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 在这里创建了 NioServerSocketChannel
channel = this.channelFactory.newChannel();
// 为 NioServerSocketChannel 注册 handler
this.init(channel);
} catch (Throwable var3) {
...
}
// 在这里 register
ChannelFuture regFuture = this.config().group().register(channel);
...
return regFuture;
}

通过不断追踪方法的执行,可以看到方法来到 AbstractChannel#register() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...
// 判断当前线程是否为 NIO 线程
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
// 由于是主线程所以会执行以下内容
try {
// 在这里进行了线程切换,后续内容由NIO线程执行
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
//...
}
}
}

我们继续追踪 AbstractUnsafe.this.register0(promise);

1
2
3
4
5
6
private void register0(ChannelPromise promise) {
...
// 关键代码
AbstractChannel.this.doRegister();
...
}

接着看 AbstractNioChannel#doRegister() 方法

NioServerSocketChannel 作为 ServerSocketChannel附件注册到 selector 上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
// 在这儿里把 NioServerSocketChannel 作为 ServerSocketChannel 的附件注册到 selector 上
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}

注册完成后,会执行之前注册在 NioServerSocketChannel 中的 handler

向其中注册一个 handler,作用是在 accept 事件发生后建立连接

以下过程都是在 nio 线程中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ServerBootstrap#init()
void init(Channel channel) {
...
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
// 向其中再次加入一个 accept handler
// 作用是在 accept 事件发生后建立连接
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}

dobind

在初始化完 NioServerSocketChannel 之后,需要绑定端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// AbstractBootstrap#dobind()
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建和注册NioServerSocketChannel 返回一个异步结果
final ChannelFuture regFuture = this.initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
// 绑定端口号
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}

}
});
return promise;
}
}

追踪 dobind0() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// AbstractChannel#bind()
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
//...
try {
// 绑定端口
AbstractChannel.this.doBind(localAddress);
} catch (Throwable var5) {
this.safeSetFailure(promise, var5);
this.closeIfClosed();
return;
}
if (!wasActive && AbstractChannel.this.isActive()) {
this.invokeLater(new Runnable() {
public void run() {
AbstractChannel.this.pipeline.fireChannelActive();
}
});
}
this.safeSetSuccess(promise);
...
}

NioServerSocketChannel 会判断当前 JDK 的版本

this.javaChannel() 返回的是 ServerSocketChannel 对象

1
2
3
4
5
6
7
8
// NioServerSocketChannel#dobind()
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
this.javaChannel().bind(localAddress, this.config.getBacklog());
} else {
this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
}
}

至此端口绑定完成

绑定 accept 事件

我们还是关注 AbstractChannel#bind() 方法

其中 fireChannelActive() 方法会执行 NioServerSocketChannel 上注册的所有 handler 的 active 事件

此时有的 handler 只有 head,acceptor,tail

其中 acceptor 和 tail 的 channelActive() 并没有做什么工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AbstractChannel#bind()
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
//...
try {
// 绑定端口
AbstractChannel.this.doBind(localAddress);
} catch (Throwable var5) {
this.safeSetFailure(promise, var5);
this.closeIfClosed();
return;
}
if (!wasActive && AbstractChannel.this.isActive()) {
this.invokeLater(new Runnable() {
public void run() {
// 会执行所有 handler 的 active 事件
AbstractChannel.this.pipeline.fireChannelActive();
}
});
}
this.safeSetSuccess(promise);
...
}

我们关注 DefaultChannelPipeline#channelActive() 方法

1
2
3
4
5
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 关键代码
this.readIfIsAutoRead();
}

最后我们看到 AbstractNioChannel#doBeginRead() 方法

在这里我们关注了 OP_ACCEPT 事件

1
2
3
4
5
6
7
8
9
10
11
protected void doBeginRead() throws Exception {
SelectionKey selectionKey = this.selectionKey;
if (selectionKey.isValid()) {
this.readPending = true;
int interestOps = selectionKey.interestOps();
if ((interestOps & this.readInterestOp) == 0) {
// 关注 OP_ACCEPT 事件
selectionKey.interestOps(interestOps | this.readInterestOp);
}
}
}

EventLoop

EventLoop 中有三个重要的组成:selector,线程,任务队列

selector的创建

但是我们看源码的时候会发现,NioEventLoop 中有两个 selector

1
2
private Selector selector;
private Selector unwrappedSelector;

它们会在 NioEventLoop 的构造方法中初始化

1
2
3
4
5
6
7
8
9
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler);
this.provider = (SelectorProvider)ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = (SelectStrategy)ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 初始化
NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

那么为什么需要两个 selector 呢?

这是由于原有的 selector 是通过哈希表的方式来遍历 selectedKeys,这样的效率不理想

netty 重新包装了 selector,使其可以通过数组的方式来遍历 selectedKeys,提高了遍历效率

同样为了保证原有的 selector 的部分功能,并没有将其从 NioEventLoop 中移除

线程的启动

当我们首次调用 execute() 方法时,线程会启动

1
2
3
4
5
6
7
8
public class TestEventLoop {
public static void main(String[] args) {
EventLoop eventLoop = new NioEventLoopGroup().next();
eventLoop.execute(() -> { // debug
System.out.println();
});
}
}

我们查看 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是否为 nio 线程
// 第一次启动时是主线程 所以为false
boolean inEventLoop = this.inEventLoop();
this.addTask(task);
if (!inEventLoop) {
// 此时需要启动nio线程
this.startThread();
if (this.isShutdown()) {
boolean reject = false;

try {
if (this.removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException var6) {
}

if (reject) {
reject();
}
}
}

if (!this.addTaskWakesUp && immediate) {
this.wakeup(inEventLoop);
}

}

接着我们看 startThread() 方法,该方法用于避免线程重复启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void startThread() {
// 判断当前是否启动
// 将状态由 未启动1->启动2
if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
boolean success = false;

try {
// 真正的启动线程过程
this.doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, 2, 1);
}

}
}

}

wakeup()

当线程提交任务时,需要从 select 阻塞状态中唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = this.inEventLoop();
this.addTask(task);
if (!inEventLoop) {
this.startThread();
if (this.isShutdown()) {
boolean reject = false;

try {
if (this.removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException var6) {
}

if (reject) {
reject();
}
}
}

if (!this.addTaskWakesUp && immediate) {
// 唤醒阻塞线程
this.wakeup(inEventLoop);
}

}

wakeup() 方法的具体实现

  • 其余线程提交任务时才会调用 wakeup() 方法
  • 由于 wakeup() 是一个重量级的操作,当多个线程访问时其实只需要执行一次,因此使用 CAS 来进行处理,避免被频繁调用
1
2
3
4
5
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && this.nextWakeupNanos.getAndSet(-1L) != -1L) {
this.selector.wakeup();
}
}

空轮询

jdk 在 linux 情况下会出现空轮询

我们看 NioEventLoop#run() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected void run() {
int selectCnt = 0;
// ....
if (!this.hasTasks()) {
// 可能会发生空轮询
strategy = this.select(curDeadlineNanos);
}
++selectCnt;
// ...
if (!ranTasks && strategy <= 0) {
if (this.unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
var34 = false;
} else {
var34 = false;
}
break label791;
}
// ...
}

private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
//...
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 设置阈值大于0且selectCnt值已经大于阈值 说明发生空轮询
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, this.selector);
// 重新创建一个 selector 替换原有的 selector
this.rebuildSelector();
return true;
} else {
return false;
}
}