Netty

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

Netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO

Hello world

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class HelloServer {
public static void main(String[] args) {
// 1.启动器 负责组装netty组件 启动服务器
new ServerBootstrap()
// 2.BossEventLoop WorkerEventLoop(selector, thread) 组成NioEventLoop
.group(new NioEventLoopGroup())
// 3.选择服务器ServerSocketChannel的实现
.channel(NioServerSocketChannel.class)
// 4.boss 负责处理连接 worker(child)负责处理读写
// 这里是worker的处理逻辑 决定worker(child)能执行哪些操作
.childHandler(
// 5.channel代表和客户端数据读写的通道 initializer初始化负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 6.添加具体的Handler
ch.pipeline().addLast(new StringDecoder()); //将ByteBuf转换为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义handler
@Override
// 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
// 7.绑定监听端口
.bind(8080);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1.启动类
new Bootstrap()
// 2.添加EventLoop
.group(new NioEventLoopGroup())
// 3.选择客户端channel事件
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 5.连接服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 向服务器发送数据
.writeAndFlush("hello, world");

}
}

执行流程

执行流程

  • channel 可以理解为数据的通道
  • msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • handler 可以理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline(传递途径),pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • pipeline 中有多个 handler,处理时会依次调用其中的 handler
    • handler 分 Inbound 和 Outbound 两类
      • Inbound 入站
      • Outbound 出站
  • eventLoop 可以理解为处理数据的工人
    • eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 io 操作都由该 eventLoop 负责
    • eventLoop 既可以执行 io 操作,也可以进行任务处理,每个 eventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • eventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 eventLoop

组件

EventLoop

事件循环对象 EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

它的继承关系如下

  • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 继承自 netty 自己的 OrderedEventExecutor
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

事件循环组 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

处理普通任务和定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
// 1.创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2); // io事件 普通任务 定时任务
//EventLoopGroup group = new DefaultEventLoopGroup(); //普通任务 定时任务

// 2.获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

// 3.执行普通任务
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});

// 4.执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("time");
}, 0, 1, TimeUnit.SECONDS);

log.debug("main");
}
}

输出结果如下

1
2
3
4
5
6
7
8
9
10
11
12
-- 可以看出EventLoopGroup对象的next方法可以循环获取EventLoop对象
io.netty.channel.nio.NioEventLoop@1d251891
io.netty.channel.nio.NioEventLoop@48140564
io.netty.channel.nio.NioEventLoop@1d251891
14:18:26.731 [main] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - main
-- 普通任务和定时任务的执行结果
14:18:26.732 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time
14:18:27.746 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time
14:18:27.746 [nioEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - ok
14:18:28.747 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time
14:18:29.744 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time
14:18:30.742 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time

处理IO任务

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// msg: ByteBuf类型
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 1.启动类
Channel channel = new Bootstrap()
// 2.添加EventLoop
.group(new NioEventLoopGroup())
// 3.选择客户端channel事件
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 5.连接服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();

System.out.println(channel);
System.out.println(""); //debug

}
}

执行结果

启动服务端代码,并以debug模式启动客户端代码

客户端通过 channel.writeAndFlush() 方法向服务端写入数据 “hello” “world”

结果如下

1
2
14:14:30.026 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - hello
14:14:40.932 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - world

可以看出处理客户端输入的线程是同一个线程

这是由于当客户端一旦和服务端建立连接后, Channel 会和一个 EventLoop 绑定,后续 Channel 的所有请求都会由这个 EventLoop 来处理

分工处理任务

细化1:分为 boss 和 worker

我们可以通过 #group() 方法,将 EventLoopGroup 分为 boss 和 worker

boss 只处理 ServerSocketChannel 上的 accept 事件

woker 只处理 SockerChannel 上的读写事件 一般线程数设置为CPU核心数*2

1
2
3
4
5
6
new ServerBootstrap()
// 分为 boss 和 worker
// boss只负责ServerSocketChannel上的accept事件
// worker只负责SocketChannel上的读写事件 一般是CPU核心数*2 这里设置为2
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2)).
....;

通过多个客户端向服务端发送消息,结果如下

1
2
3
4
14:54:03.944 [nioEventLoopGroup-3-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1
14:54:13.397 [nioEventLoopGroup-3-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 2
14:54:20.785 [nioEventLoopGroup-3-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 3
14:55:28.642 [nioEventLoopGroup-3-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1

可以看出,一个 EventLoop 可以负责多个 Channel ,且 EventLoop 一旦与 Channel 绑定,则一直负责处理该 Channel 中的事件

细化2:处理耗时较长的任务

假如某个 EventGroup 绑定了多个 Channel,某个 Channel 中有一项任务耗时较长,那么该任务会阻塞住所有 Channel 中的读写任务

因此,我们可以通过给这个耗时任务重新分配一个 group ,这样可以避免该任务在 nio 线程上阻塞住其它 Channel 的读写任务

我们可以通过 Channel#addLast() 方法指定 handler 所对应的 EventGroup,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
// 增加自定义的group 用于处理耗时较长的任务
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// 分为 boss 和 worker
// boss只负责ServerSocketChannel上的accept事件
// worker只负责SocketChannel上的读写事件 一般是CPU核心数*2 这里设置为2
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("nio-handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// msg: ByteBuf类型
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); //将消息传递给下一个Handler
}
}).addLast(group, "time-handler", new ChannelInboundHandlerAdapter() {
// 将耗时任务指定给自定的group
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}

通过多个客户端向服务端发送消息,结果如下

1
2
3
4
5
6
15:03:57.878 [nioEventLoopGroup-4-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1
15:03:57.878 [defaultEventLoopGroup-2-3] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1
15:04:16.904 [nioEventLoopGroup-4-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 2
15:04:16.905 [defaultEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 2
15:04:26.716 [nioEventLoopGroup-4-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 3
15:04:26.717 [defaultEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 3

可以看出我们实现了使用不同 group 来处理耗时任务和普通任务

handler的切换

在分工处理中,我们分别使用了 DefaultEventLoopGroup 和 NioEventLoopGroup 来处理不同的任务,那么 handler 是如何在两者之间切换的呢?

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();

// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}

EventExecutor#invokeChannelRead() 方法用于判断当前 handler 中的线程是否和 next.executor() 返回的 EventLoop 绑定的线程是同一个线程

如果二者绑定的是同一个线程则可以直接调用

否则,要把调用代码封装为任务对象,由下一个 handler 来执行

Channel

Channel 的常用方法

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)

ChannelFuture的结果处理

客户端代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class ChannelClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
// channelFuture.sync();
Channel channel = channelFuture.channel();
log.debug("==={}===", channel);
channel.writeAndFlush("hello world");
}
}

如果我们注释了 channelFuture.sync() 方法,那么当我们启动客户端时,服务端将无法收到消息

这是由于 #connect() 方法是一个异步非阻塞方法,当 main 线程执行 connect() 方法时,会启动一个 nio 线程来连接服务端

如果代码没有在 channelFuture.sync() 处阻塞的话,那么 main 线程会直接获取一个 channel ,在这个 channel 中写入数据,而实际上这个 channel 并没有连接到服务端

我们通过打印这个 channel 可以得到如下结果

1
[main] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0xe416df7f]===

可以看出这个 channel 并没有自身的 ip 信息和主机的 ip 信息

同步处理结果

我们可以采用 ChannelFuture#sync() 方法来阻塞住 main 线程

当 nio 线程成功连接服务端后,继续向下执行

此时我们可以通过打印 channel 的方式查看结果如下

1
15:49:14.767 [main] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0x60434a31, L:/127.0.0.1:3883 - R:localhost/127.0.0.1:8080]===

可以看出 channel 成功连接了服务端

需要注意的是,由于是同步处理结果,因此我们获取 channel 的线程依旧是主线程

异步处理结果

我们可以采用 ChannelFuture#addListener() 来异步处理结果

我们需要在方法中传入一个 ChannelFutureListener 回调对象,该对象会在 nio 线程成功连接服务端后处理返回结果

我们可以通过打印 channel 的方式查看结果如下

1
15:50:23.721 [nioEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0x2e2a8216, L:/127.0.0.1:3691 - R:localhost/127.0.0.1:8080]===

需要注意的是,由于是异步处理结果,因此我们获取 channel 的线程依旧是nio线程

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class ChannelClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 异步非阻塞 main发起调用 但真正执行connect是一个nio线程
.connect(new InetSocketAddress("localhost", 8080));

// 1.使用sync()方法同步处理结果
channelFuture.sync(); //阻塞住当前线程知道nio线程连接建立完毕
// 无阻塞获取channel
// [main] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0xe416df7f]===
Channel channel = channelFuture.channel(); //此时获取channel的是主线程
log.debug("==={}===", channel);
channel.writeAndFlush("hello world");


// 2.使用addListener(回调对象)方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在nio线程连接建立完成后调用
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel(); //此时获取channel的是某个nio线程
// [nioEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0x2e2a8216, L:/127.0.0.1:3691 - R:localhost/127.0.0.1:8080]===
log.debug("==={}===", channel);
channel.writeAndFlush("hello world");
}
});
}
}

CloseFuture处理关闭Channel后的操作

客户端代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class ChannelCloseClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));

channelFuture.sync();
Channel channel = channelFuture.channel();

new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String input = scanner.nextLine();
if ("q".equals(input)) {
channel.close(); // close()方法是一个异步方法
//log.debug("处理之后的工作");
break;
}
channel.writeAndFlush(input);
}
}, "input").start();
}
}

现在我们通过客户端不断向服务端发送数据,知道键盘输入 “q” 为止

当 channel 关闭后,我们还有一些后续操作需要执行,该如何处理?

由于 Channel#close() 方法是一个异步方法,因此如果我们在 channel.close(); 后直接进行处理操作,其实是无效的

我们可以通过 ChannelFuture 来处理关闭后的操作,处理关闭后的操作依旧可以分为同步方式和异步方式

当后续操作执行完毕后,我们需要执行 NioEventLoopGroup#shutdownGracefully() 方法将整个组的线程停止

同步方式处理关闭

和 ChannelFuture 类似,我们可以采用 ChannelFuture#sync() 方法来阻塞住 main 线程

当 channel 关闭后,继续向下执行

我们可以通过打印的方式来查看是否成功阻塞 main 线程

1
2
3
log.debug("waiting close...");
closeFuture.sync();
log.debug("处理关闭后的工作..");

结果如下,当我们未输入 “q” 时,控制台如下所示

1
16:20:15 [DEBUG] [main] c.c.n.c.ChannelCloseClient - waiting close...

当我们输入 “q” 后

1
2
3
4
16:20:36 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x17d025eb, L:/127.0.0.1:5268 - R:localhost/127.0.0.1:8080] CLOSE
16:20:36 [DEBUG] [main] c.c.n.c.ChannelCloseClient - 处理关闭后的工作..
16:20:36 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x17d025eb, L:/127.0.0.1:5268 ! R:localhost/127.0.0.1:8080] INACTIVE
16:20:36 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x17d025eb, L:/127.0.0.1:5268 ! R:localhost/127.0.0.1:8080] UNREGISTERED

可以看出,当 channel 关闭后,执行后续操作的线程是主线程

异步方式处理关闭

同样也采用 ChannelFuture#addListener() 方法来处理异步返回结果

当我们输入 “q” 时,控制台如下所示

1
2
3
4
16:22:55 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x495727ec, L:/127.0.0.1:5360 - R:localhost/127.0.0.1:8080] CLOSE
16:22:55 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.ChannelCloseClient - 处理关闭后的工作..
16:22:55 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x495727ec, L:/127.0.0.1:5360 ! R:localhost/127.0.0.1:8080] INACTIVE
16:22:55 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x495727ec, L:/127.0.0.1:5360 ! R:localhost/127.0.0.1:8080] UNREGISTERED

可以看出,当 channel 关闭后,执行后续操作的线程是 nio 线程

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
public class ChannelCloseClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));

channelFuture.sync();
Channel channel = channelFuture.channel();

new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String input = scanner.nextLine();
if ("q".equals(input)) {
channel.close(); // close()方法是一个异步方法
//log.debug("处理之后的工作");
break;
}
channel.writeAndFlush(input);
}
}, "input").start();

// 1.同步方式处理关闭
ChannelFuture closeFuture = channel.closeFuture();
log.debug("waiting close...");
closeFuture.sync();
// 16:12:48 [DEBUG] [main] c.c.n.c.ChannelCloseClient - 处理关闭后的工作..
log.debug("处理关闭后的工作..");
group.shutdownGracefully();

// 2.异步方式处理关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
//16:13:23 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.ChannelCloseClient - 处理关闭后的工作..
log.debug("处理关闭后的工作..");
group.shutdownGracefully();
}
});

}
}

Future & Promise

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

JDK Future代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> future = service.submit(() -> {
log.debug("执行计算...");
Thread.sleep(1000);
return 30;
});
// 同步处理结果
log.debug("等待结果");
log.debug("结果是: {}", future.get());
}
}

控制台输出结果如下

1
2
3
17:03:54 [DEBUG] [main] c.c.n.c.TestJdkFuture - 等待结果
17:03:54 [DEBUG] [pool-1-thread-1] c.c.n.c.TestJdkFuture - 执行计算...
17:03:55 [DEBUG] [main] c.c.n.c.TestJdkFuture - 结果是: 30

Netty Future代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws Exception{
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(() -> {
log.debug("执行计算...");
Thread.sleep(1000);
return 70;
});

// 同步处理结果
log.debug("等待结果");
log.debug("结果是: {}", future.get());

// 异步处理结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果: " + future.getNow());
}
});
}
}

同步处理结果如下

1
2
3
17:05:26 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.TestNettyFuture - 执行计算...
17:05:26 [DEBUG] [main] c.c.n.c.TestNettyFuture - 等待结果
17:05:27 [DEBUG] [main] c.c.n.c.TestNettyFuture - 结果是: 70

异步处理结果如下

1
2
17:06:02 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.TestNettyFuture - 执行计算...
17:06:03 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.TestNettyFuture - 接收结果: 70

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到

  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果

Promise代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class TestPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.准备EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 2.主动创建promise,结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(() -> {
// 3.任意一个线程执行完毕后向promise填充结果
log.debug("开始计算...");
try {
Thread.sleep(1000);
promise.setSuccess(80);
} catch (InterruptedException e) {
e.printStackTrace();
promise.setFailure(e);
}
}).start();

// 4.接收结果的线程
log.debug("等待结果...");
log.debug("结果是: {}", promise.get());
}
}

控制台输出结果如下

1
2
3
17:06:37 [DEBUG] [Thread-0] c.c.n.c.TestPromise - 开始计算...
17:06:37 [DEBUG] [main] c.c.n.c.TestPromise - 等待结果...
17:06:38 [DEBUG] [main] c.c.n.c.TestPromise - 结果是: 80

Handler & Pipeline

入站和出站

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("ch1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
ByteBuf content = (ByteBuf) msg;
String name = content.toString(Charset.defaultCharset());
log.debug("收到消息: {}" , content);
super.channelRead(ctx, name);
}
});
ch.pipeline().addLast("ch2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
log.debug("消息传递: {}, class: {}", msg.toString(), msg.getClass());
super.channelRead(ctx, msg.toString());
}
});
ch.pipeline().addLast("ch3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
super.channelRead(ctx, msg);
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("hello world..".getBytes()));
}
});

ch.pipeline().addLast("ch4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
ch.pipeline().addLast("ch5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
ch.pipeline().addLast("ch6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});

}
})
.bind(8080);
}
}

执行结果

我们启动 ChannelCloseClient 作为客户端,向服务端发送数据

控制台结果如下

1
2
3
4
5
6
7
8
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 1
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 收到消息: PooledUnsafeDirectByteBuf(ridx: 0, widx: 3, cap: 2048)
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 2
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 消息传递: qwe, class: class java.lang.String
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 3
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 6
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 5
10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 4

注意点:

  • handler 内部节点之间采用的数据结构是双向链表
  • 我们的入站 handler 的处理顺序是 ch1 -> ch2 -> ch3,入站时,handler 从 head 往后调用
  • 我们的出站 handler 的处理顺序是 ch6 -> ch5 -> ch4,出站时,handler 从 tail 往前调用
  • 当我们注释了 ch.writeAndFlush(ctx.alloc().buffer().writeBytes("hello world..".getBytes())); 之后在客户端发送数据,是看不到出站 handler 的日志输出的,这是由于出站 handler 一般处理写操作,而入站 handler 一般处理读操作
  • super.channelRead(ctx, name); 的作用是给下一个入站 hander 传递数据,如果不写这一行内容,则入站链就断了,下一个入站 handler 接收不到数据
  • 我们在 ch1 中传递的数据是一个 String 类型的数据,而在 ch2 中接收的数据也同样是 String 类型的,这点从控制台输出可以看出
  • NioSocketChannel#writeAndFlush() 方法是从 tail 往前调用 hanlder,而 ChannelHandlerContext#writeAndFlush() 方法是从当前 handler 往前调用hander

入站和出站

EmbeddedChannel

EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};

ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};

ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};

ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};

// 用于测试Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);

// 执行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}

测试结果如下

1
2
3
4
11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 1
11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 2
11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 4
11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 3

通过 EmbeddedChannel#writeInbound() 方法来测试入站操作

通过 EmbeddedChannel#writeOutbound() 方法来测试出站操作

ByteBuf

创建

1
2
3
4
5
6
7
8
9
10
11
12
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
ByteBufferUtil.log(buffer);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 20; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes());
ByteBufferUtil.log(buffer);
}
}

运行结果如下

1
2
3
4
5
6
7
8
9
read index:0 write index:0 capacity:10

read index:0 write index:20 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 |aaaa |
+--------+-------------------------------------------------+----------------+

ByteBuf 通过 ByteBufAllocator 选择 allocator 并调用对应的 buffer() 方法来创建的,默认使用直接内存作为 ByteBuf ,容量为256个字节,可以指定初始容量的大小

当 ByteBuf 的容量无法容纳所有数据时, ByteBuf 会进行扩容操作

如果在 handler 中创建 ByteBuf ,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

直接内存 vs 堆内存

通过该方法创建的 ByteBuf ,使用的是基于直接内存的 ByteBuf

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的代码来创建池化基于堆的 ByteBuf

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);Copy
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化 vs 非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

1
-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现

验证池化和内存模型

我们采用之前创建 ByteBuf 的方式来查看 ByteBuf

1
2
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
System.out.println(buffer.getClass());

控制台输出如下

1
class io.netty.buffer.PooledUnsafeDirectByteBuf

可以看出 buffer 是一个 PooledUnsafeDirectByteBuf 的实例对象

其中 Pooled 代表采用了池化,Direct 代表采用了直接内存

接下来我们换一种创建 ByteBuf 的方式

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();

控制台输出如下

1
class io.netty.buffer.PooledUnsafeHeapByteBuf

同理,buffer 采用了池化和堆内存模型

接下来我们为程序添加虚拟机参数 -Dio.netty.allocator.type=unpooled

添加虚拟机参数

重新执行以上两种创建 ByteBuf 的方式

得到结果分别如下

1
2
class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf

可以看出采用非池化的方式来创建 ByteBuf

组成

ByteBuf 主要有以下几个组成部分

  • 最大容量与当前容量
    • 在构造 ByteBuf 时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为 Integer.MAX_VALUE
    • 当 ByteBuf 容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出 java.lang.IndexOutOfBoundsException 异常
  • 读写操作不同于 ByteBuffer 只用 position 进行控制,ByteBuf 分别由读指针和写指针两个指针控制进行读写操作时,无需进行模式的切换
    • 读指针前的部分被称为废弃部分,是已经读过的内容
    • 读指针与写指针之间的空间称为可读部分
    • 写指针与当前容量之间的空间称为可写部分

ByteBuf的组成

写入

常用方法如下

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian(大端写入),即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串 CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

扩容

当 ByteBuf 中的容量无法容纳写入的数据时,会进行扩容操作

扩容规则

  • 如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
    • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
  • 如果写入后数据大小超过 512 字节,则选择下一个 2^n
    • 例如写入后大小为 513 字节,则扩容后 capacity 是 2^10=1024 字节(2^9=512 已经不够了)
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

实际操作中,我给 ByteBuf 分配了一个大小为16字节的空间,并往其中写入5个整型数据(20字节),但是日志输出扩容后大小为64

这里实际扩容的方式还是存在一些问题,后续看一下

读取

读取主要是通过一系列 read 方法进行读取,读取时会根据读取数据的字节数移动读指针

如果需要重复读取,需要调用 ByteBuf#markReaderIndex() 对读指针进行标记,并通过 ByteBuf#resetReaderIndex() 将读指针恢复到 mark 标记的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);

// 读取4个字节
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
ByteBufUtil.log(buffer);

// 通过mark与reset实现重复读取
buffer.markReaderIndex();
System.out.println(buffer.readInt());
ByteBufUtil.log(buffer);

// 恢复到mark标记处
buffer.resetReaderIndex();
ByteBufUtil.log(buffer);
}
}

结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1
2
3
4
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置

释放

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read() 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 处理原则
    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则
    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则
    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

tail节点的释放

我们追踪 TailContext#channelRead() 方法,可以看到以下一段代码

1
2
3
4
5
6
7
8
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}

}

它通过 ReferenceCountUtil 来实现释放 ByteBuf

我们接下看看看 ReferenceCountUtil 的 release() 方法

1
2
3
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

release() 方法通过判断 msg 是否实现了 ReferenceCounted 接口来判断传递的参数是否为 ByteBuf

切片

ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

ByteBuf的切片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TestSlice {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes("abcdefghij".getBytes());
ByteBufferUtil.log(buffer);

// 在切片过程中 没有发生数据复制
ByteBuf f1 = buffer.slice(0, 5);
ByteBuf f2 = buffer.slice(5, 5);
f1.retain();
f2.retain();
ByteBufferUtil.log(f1);
ByteBufferUtil.log(f2);

f1.setByte(0, 'q');
ByteBufferUtil.log(f1);
ByteBufferUtil.log(buffer);
}
}

控制台输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 66 67 68 69 6a |abcdefghij |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 |abcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 67 68 69 6a |fghij |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 71 62 63 64 65 |qbcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 71 62 63 64 65 66 67 68 69 6a |qbcdefghij |
+--------+-------------------------------------------------+----------------+

我们修改切片后 f1 中的第一个字母后,原有 buffer 中第一个字符也同样修改了,证明 f1 和 buffer 用的是同一块内存空间,并没有发生数据复制

优势

  • 池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如
    • slice、duplicate、CompositeByteBuf

应用

粘包 & 半包

粘包现象

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class HelloServer {
static final Logger log = LoggerFactory.getLogger(HelloServer.class);

public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接建立时会执行该方法
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时会执行该方法
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
// 关闭channel
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stopped");
}
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class HelloClient {
static final Logger log = LoggerFactory.getLogger(HelloClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connected...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}

结果分析

服务端控制台输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
16:20:43 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x3a0c5e7b] binding...
16:20:43 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x3a0c5e7b, L:/0:0:0:0:0:0:0:0:8080] bound...
16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] REGISTERED
16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] ACTIVE
16:20:50 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c.HelloServer - connected [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832]
16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] READ COMPLETE

理想情况下,我们通过客户端发送10次16字节字节数组,服务端应该也是接收的10次16字节的数组

而实际上服务端接收的是一个160字节的内容,这就是粘包

半包现象

我们在服务端添加以下代码,用于设置服务端的接收缓冲区

1
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);

并通过客户端发送一条30字节的数据

1
2
3
4
5
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 3; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);

服务端输出结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
16:28:06 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x9c9f8460] binding...
16:28:06 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x9c9f8460, L:/0:0:0:0:0:0:0:0:8080] bound...
16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] REGISTERED
16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] ACTIVE
16:28:08 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c.HelloServer - connected [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336]
16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 |.......... |
+--------+-------------------------------------------------+----------------+
16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE
16:28:14 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 0a 0b 0c 0d 0e 0f 00 01 02 03 |.......... |
+--------+-------------------------------------------------+----------------+
16:28:14 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE
16:28:19 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d |.......... |
+--------+-------------------------------------------------+----------------+
16:28:19 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE
16:28:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 18B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 0e 0f 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d |................|
|00000010| 0e 0f |.. |
+--------+-------------------------------------------------+----------------+
16:28:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE

可以看出由于服务端接收缓冲区大小不足,30字节的数据被分为多次读取

注意

serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍

即有可能某一个服务端读取的数据是20字节

现象分析

粘包

  • 现象,发送 abc def,接收 abcdef
  • 原因
    • 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
    • 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
    • Nagle 算法:会造成粘包

半包

  • 现象,发送 abcdef,接收 abc def
  • 原因
    • 应用层:接收方 ByteBuf 小于实际发送数据量
    • 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
    • MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包

本质是因为 TCP 是流式协议,消息无边界

解决方案

短连接

客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

粘包问题

客户端代码将建立连接抽象成一个方法,并通过多次调用该方法来实现和服务器的短连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connected...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
1
2
3
for (int i = 0; i < 3; i++) {
send();
}

我们查看服务端结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
16:43:27 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler - [id: 0x8daa051e, L:/127.0.0.1:8080 - R:/127.0.0.1:9028] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
16:43:27 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0x33e8a665, L:/127.0.0.1:8080 - R:/127.0.0.1:9094] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
16:43:27 [DEBUG] [nioEventLoopGroup-3-4] i.n.h.l.LoggingHandler - [id: 0xc075c498, L:/127.0.0.1:8080 - R:/127.0.0.1:9159] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+

可以看出我们确实是按理想情况接收了3次10个字节的数据,即可以解决粘包问题

半包问题

我们在服务端添加以下代码,用于调整 Netty 的缓冲区大小

1
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));

并在客户端发送18字节的数据

1
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17});

服务端输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
16:49:23 [DEBUG] [nioEventLoopGroup-3-4] i.n.h.l.LoggingHandler - [id: 0x5b4b1096, L:/127.0.0.1:8080 - R:/127.0.0.1:9593] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
16:49:23 [DEBUG] [nioEventLoopGroup-3-4] i.n.h.l.LoggingHandler - [id: 0x5b4b1096, L:/127.0.0.1:8080 - R:/127.0.0.1:9593] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+
16:49:23 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xb1e36c6d, L:/127.0.0.1:8080 - R:/127.0.0.1:9658] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
16:49:23 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xb1e36c6d, L:/127.0.0.1:8080 - R:/127.0.0.1:9658] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+
16:49:23 [DEBUG] [nioEventLoopGroup-3-6] i.n.h.l.LoggingHandler - [id: 0xc2a44645, L:/127.0.0.1:8080 - R:/127.0.0.1:9723] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
16:49:23 [DEBUG] [nioEventLoopGroup-3-6] i.n.h.l.LoggingHandler - [id: 0xc2a44645, L:/127.0.0.1:8080 - R:/127.0.0.1:9723] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+

可以看出并没有解决半包问题

定长解码器

客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度

服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到 FixedLengthFrameDecoder 对数据进行定长解码,具体使用方法如下

客户端代码

在客户端中,我们在这里通过一个 getFixLen() 方法来实现获得定长的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
public static byte[] getFixLen(char ch, int len, int maxLen) {
byte[] ans = new byte[maxLen];
for (int i = 0; i < maxLen; i++) {
if (i < len) ans[i] = (byte) ch;
else ans[i] = '_';
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < ans.length; i++) {
sb.append((char) ans[i]);
}
System.out.println(sb.toString());
return ans;
}

并通过随机生成长度的方式,控制每次客户端写入的数据长度

1
2
3
4
5
6
7
8
9
10
ByteBuf buffer = ctx.alloc().buffer();
char ch = 'a';
Random r = new Random();
for (int i = 0; i < 5; i++) {
byte[] fixLen = getFixLen(ch, r.nextInt(10) + 1, 10);
buffer.writeBytes(fixLen);
ch++;
}
ctx.writeAndFlush(buffer);
ctx.channel().close();

最后在客户端中添加一个日志的 handler

1
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

服务端代码

而在服务端中,我们添加以下 handler ,用于解码定长的数据

1
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));

执行结果

客户端输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
17:11:51 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.HelloClient2 - connected...
17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e] REGISTERED
17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e] CONNECT: /127.0.0.1:8080
17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e, L:/127.0.0.1:10399 - R:/127.0.0.1:8080] ACTIVE
17:11:51 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.HelloClient2 - sending...
aa________
bbbbbbbbbb
cccccccccc
dddd______
eeeeeee___
17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e, L:/127.0.0.1:10399 - R:/127.0.0.1:8080] WRITE: 50B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 5f 5f 5f 5f 5f 5f 5f 5f 62 62 62 62 62 62 |aa________bbbbbb|
|00000010| 62 62 62 62 63 63 63 63 63 63 63 63 63 63 64 64 |bbbbccccccccccdd|
|00000020| 64 64 5f 5f 5f 5f 5f 5f 65 65 65 65 65 65 65 5f |dd______eeeeeee_|
|00000030| 5f 5f |__ |
+--------+-------------------------------------------------+----------------+

服务端输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
17:11:51 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec, L:/127.0.0.1:8080 - R:/127.0.0.1:10399] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 5f 5f 5f 5f 5f 5f 5f 5f |aa________ |
+--------+-------------------------------------------------+----------------+
17:11:51 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec, L:/127.0.0.1:8080 - R:/127.0.0.1:10399] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbb |
+--------+-------------------------------------------------+----------------+
17:11:51 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec, L:/127.0.0.1:8080 - R:/127.0.0.1:10399] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 63 63 63 63 63 63 63 |cccccccccc |
+--------+-------------------------------------------------+----------------+
17:11:51 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec, L:/127.0.0.1:8080 - R:/127.0.0.1:10399] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 64 64 5f 5f 5f 5f 5f 5f |dddd______ |
+--------+-------------------------------------------------+----------------+
17:11:51 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec, L:/127.0.0.1:8080 - R:/127.0.0.1:10399] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 65 65 65 65 65 65 65 5f 5f 5f |eeeeeee___ |
+--------+-------------------------------------------------+----------------+

可以看出虽然我们客户端是以一个50字节的粘包状态发送过去的,但是服务端可以重新将数据拆分为5个10字节的数据

该方法的缺点在于数据包的大小不好把握

  • 长度定的太大,浪费
  • 长度定的太小,对某些数据包又显得不够
行解码器

行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的

  • LineBasedFrameDecoder(int maxLength):通过换行符来拆分数据
  • DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters):通过自定义分隔符来拆分数据
  • 解码器需要定义最长长度,用于判断数据多久没有接收到分隔符,如果超过最大长度,会抛出 TooLongFrameException 异常

这里我们演示的是 LineBasedFrameDecoder 解码器

客户端代码

在客户端中,我们传入随即长度的字符串来作为数据,并约定最大长度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();

char ch = 'a';
Random r = new Random();
int maxLength = 64; // 最大长度
for (int i = 0; i < 5; i++) {
StringBuilder sb = new StringBuilder();
for (int j = 0; j < (int)(r.nextInt(maxLength-2)); j++) {
sb.append(ch);
}
// 数据以 \n 结尾
sb.append("\n");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
ch++;
}

ctx.writeAndFlush(buffer);
ctx.channel().close();
}
});

服务端代码

在服务端中,我们添加 LineBasedFrameDecoder ,并指定最大长度

1
2
3
4
5
6
7
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LineBasedFrameDecoder(64));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});

执行结果

分别启动服务端和客户端

客户端控制台输出如下,其中 . 代表的是 \n 换行符

1
2
3
4
5
6
7
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 0a 62 0a 0a 64 64 64 64 |aaaaaaaa.b..dddd|
|00000010| 64 64 64 64 64 0a 65 65 65 65 65 65 65 65 65 65 |ddddd.eeeeeeeeee|
|00000020| 65 65 65 0a |eee. |
+--------+-------------------------------------------------+----------------+

服务端输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 |aaaaaaaa |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 |b |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 64 64 64 64 64 64 64 |ddddddddd |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 65 65 65 65 65 65 65 65 65 65 65 65 65 |eeeeeeeeeeeee |
+--------+-------------------------------------------------+----------------+

缺点也是效率低,需要去遍历内容获取换行符

LTC解码器

在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的

Netty 为我们提供了一个 LengthFieldBasedFrameDecoder 类来实现以上操作

其构造方法中有五个重要参数如下

  1. maxFrameLength:表示数据的最大长度(包括附加信息、长度标识等内容)
  2. lengthFieldOffset:数据长度标识的起始偏移量
  3. lengthFieldLength:数据长度标识所占字节数
  4. lengthAdjustment:长度标识和数据内容之间还有多少长度的其他内容
  5. initialBytesToStrip:数据读取起点

参数说明

以下例子来自于官方:

1
2
3
4
5
6
7
8
9
10
lengthFieldOffset   = 1 (= the length of HDR1)  		// 长度的偏移量为1字节
lengthFieldLength = 2 // 长度标识为2字节
lengthAdjustment = 1 (= the length of HDR2) // 长度后还有1字节的其他数据
initialBytesToStrip = 3 (= the length of HDR1 + LEN) // 从第3个字节后开始读取数据

BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+

实际代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestLengthFieldDecoder {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 0, 4, 1, 5),
new LoggingHandler(LogLevel.DEBUG)
);

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello, world");
send(buffer, "Game Start");
channel.writeInbound(buffer);
}

private static void send(ByteBuf buffer, String content) {
byte[] bytes = content.getBytes();
int length = bytes.length;
buffer.writeInt(length);
buffer.writeByte(0);
buffer.writeBytes(bytes);
}
}

控制台输出结果如下

1
2
3
4
5
6
7
8
9
10
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 20 77 6f 72 6c 64 |Hello, world |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 61 6d 65 20 53 74 61 72 74 |Game Start |
+--------+-------------------------------------------------+----------------+

协议设计与解析

网络传输中,我们需要遵循一定的协议来进行内容传输

Redis协议

以 redis 为例,当我们需要传输一条 set name zhangsan 的内容时,需要遵循 redis 的协议来进行传输

首先传输 *3 ,代表有三个字段(set、name、zhangsan),然后传输 $3 ,代表第一个字段长度为3,接着传输 set ,即可以正确解析,后面两个字段同理

我们用实际代码来进行演示

通过 LINE 来进行换行,输出遵循 redis 协议的 set name zhangsan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TestRedis {
public static void main(String[] args) {
final byte[] LINE = {13, 10}; // \n
new Bootstrap().group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$8".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("zhangsan".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new InetSocketAddress("localhost", 6379));
}
}

在 redis 中,我们首先通过 get name 来获取结果,然后通过网络传输后,我们再次通过 get name 来获取结果,如下所示

1
2
3
4
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> get name
"zhangsan"

且 TestRedis 的控制台输出如下

1
2
3
4
5
6
7
8
9
10
11
12
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 38 0d 0a 7a 68 61 6e 67 |.name..$8..zhang|
|00000020| 73 61 6e 0d 0a |san.. |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2b 4f 4b 0d 0a |+OK.. |
+--------+-------------------------------------------------+----------------+

其中 +OK 代表传输成功

Http协议

HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用 HttpServerCodec 作为服务器端的解码器与编码器,来处理HTTP请求

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Slf4j
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(boss, worker);
bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("====== class : {} =====", msg.getClass());
super.channelRead(ctx, msg);
}
});
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
// 获取请求
log.debug(httpRequest.uri());

// 返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.OK);

byte[] content = "<h1>hello world</h1>".getBytes();
response.headers().set(CONTENT_LENGTH, content.length);
response.content().writeBytes(content);

// 写回响应
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

执行结果

这里列举以下 log.debug() 的结果

1
2
10:40:59 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c6.TestHttp - ====== class : class io.netty.handler.codec.http.DefaultHttpRequest =====
10:40:59 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c6.TestHttp - ====== class : class io.netty.handler.codec.http.LastHttpContent$1 =====

可以看出虽然我们只发了一次请求,但是 Netty 会默认解析成两个部分,分别标识的是请求头和请求体

我们可以通过以下方式来分别处理

1
2
3
4
5
if (msg instanceof HttpRequest) {  // 请求行 请求头

} else if (msg instanceof HttpContent){ // 请求体

}

也可以通过使用 SimpleChannelInboundHandler 来处理具体的内容

此外,浏览器有点笨比,如果你不传入响应的长度,它会一直空转处理响应内容

因此在返回响应时,我们需要写入响应体的内容长度

自定义协议

自定义协议主要有以下几个部分组成

  • 魔数:用来在第一时间判定接收的数据是否为无效数据包
  • 版本号:可以支持协议的升级
  • 序列化算法:消息正文到底采用哪种序列化反序列化方式
    • 如:json、protobuf、hessian、jdk
  • 指令类型:是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号:为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

编码器与解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
// 设置4个字节的魔数
byteBuf.writeBytes(new byte[] {0, 6, 1, 8});
// 设置1个字节的版本号
byteBuf.writeByte(1);
// 设置1个字节的序列化方式
byteBuf.writeByte(0);
// 设置1个字节的指令方式
byteBuf.writeByte(message.getMessageType());
// 设置4个字节的请求序列号
byteBuf.writeInt(message.getSequenceId());
// 补齐字节 到2的次方
byteBuf.writeByte(0);

// 获得序列化后的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(message);
byte[] bytes = bos.toByteArray();

// 用4个字节标识正文长度
byteBuf.writeInt(bytes.length);
// 设置正文
byteBuf.writeBytes(bytes);
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int magicNum = byteBuf.readInt(); // 魔数
byte version = byteBuf.readByte(); // 版本号
byte serializedType = byteBuf.readByte(); // 序列化方式
byte msgType = byteBuf.readByte(); // 指令方式
int sequenceId = byteBuf.readInt(); // 请求序列号
byteBuf.readByte(); // 补齐字符

int length = byteBuf.readInt(); // 长度
byte[] content = new byte[length]; // 内容
byteBuf.readBytes(content, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content));
Message message = (Message) ois.readObject();

list.add(message);

log.debug("magicNum: {}", magicNum);
log.debug("version: {}", version);
log.debug("serializedType: {}", serializedType);
log.debug("msgType: {}", msgType);
log.debug("sequenceId: {}", sequenceId);
}
}

这里我们的编码器和解码器类继承自 ByteToMessageCodec,泛型指定为我们实际的消息类,我们需要实现其 #encode()#decode() 方法

编码器中,将自定义协议写入到 ByteBuf 中,如果需要写入一个对象,则需要将该对象序列化

自定义协议除正文信息外,其余信息长度和最好保证是 2^n,不足需要补齐

解码器中,需要将 ByteBuf 的内容取出写入到 list 中,传递给下一个解码器

编写测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new LoggingHandler(LogLevel.DEBUG),
new MessageCodec()
);

// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
channel.writeOutbound(message);

// decode
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buffer);
// 入站
channel.writeInbound(buffer);

}
}

控制台输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 06 01 08 01 00 00 00 00 00 00 00 00 00 00 cc |................|
|00000010| ac ed 00 05 73 72 00 28 63 6f 6d 2e 63 68 65 6e |....sr.(com.chen|
|00000020| 78 69 69 69 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 |xiii.message.Log|
|00000030| 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 |inRequestMessage|
|00000040| 31 f3 3a 3f 09 31 1f d2 02 00 02 4c 00 08 70 61 |1.:?.1.....L..pa|
|00000050| 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c |sswordt..Ljava/l|
|00000060| 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 |ang/String;L..us|
|00000070| 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 1c 63 |ernameq.~..xr..c|
|00000080| 6f 6d 2e 63 68 65 6e 78 69 69 69 2e 6d 65 73 73 |om.chenxiii.mess|
|00000090| 61 67 65 2e 4d 65 73 73 61 67 65 d6 b9 3e a7 6d |age.Message..>.m|
|000000a0| c9 2f 2f 02 00 02 49 00 0b 6d 65 73 73 61 67 65 |.//...I..message|
|000000b0| 54 79 70 65 49 00 0a 73 65 71 75 65 6e 63 65 49 |TypeI..sequenceI|
|000000c0| 64 78 70 00 00 00 00 00 00 00 00 74 00 03 31 32 |dxp........t..12|
|000000d0| 33 74 00 08 7a 68 61 6e 67 73 61 6e |3t..zhangsan |
+--------+-------------------------------------------------+----------------+
14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - magicNum: 393480
14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - version: 1
14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - serializedType: 0
14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - msgType: 0
14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - sequenceId: 0

在第一行中,00 06 01 08代表是我们的魔数,接下来的 01 代表的是版本号,00 代表的是序列化方式,00 代表的是指令方式,00 00 00 00 代表的是序列号,00 00 00 cc 代表后面传输内容的长度

粘包和半包

由于我们会在自定义协议中指定传输内容的长度,因此粘包现象不会出现

在这里我们主要讨论半包现象

将以上代码修改如下部分

1
2
3
4
5
6
7
8
9
10
11
// decode
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // 1
new MessageCodec().encode(null, message, buffer);
// 入站
//channel.writeInbound(buffer);

ByteBuf s1 = buffer.slice(0, 100);
ByteBuf s2 = buffer.slice(100, buffer.readableBytes() - 100);
s1.retain(); // 2
channel.writeInbound(s1); // 1
channel.writeInbound(s2); // 0

需要注意的是

  • #writeInbound() 方法会将引用计数器-1,当计数器为0时会释放 ByteBuf 的内存,而 s1,s2 共享 buffer 的内存,若内存释放则两个都会释放,因此需要将计数器 +1
  • 我们可以通过 #readableBytes() 方法来获取可读的内容长度
  • 当我们将 channel.writeInbound(s2); 注释掉时,由于信息长度不完整,控制台不会输出信息

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 06 01 08 01 00 00 00 00 00 00 00 00 00 00 cc |................|
|00000010| ac ed 00 05 73 72 00 28 63 6f 6d 2e 63 68 65 6e |....sr.(com.chen|
|00000020| 78 69 69 69 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 |xiii.message.Log|
|00000030| 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 |inRequestMessage|
|00000040| 31 f3 3a 3f 09 31 1f d2 02 00 02 4c 00 08 70 61 |1.:?.1.....L..pa|
|00000050| 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c |sswordt..Ljava/l|
|00000060| 61 6e 67 2f |ang/ |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e 61 |String;L..userna|
|00000010| 6d 65 71 00 7e 00 01 78 72 00 1c 63 6f 6d 2e 63 |meq.~..xr..com.c|
|00000020| 68 65 6e 78 69 69 69 2e 6d 65 73 73 61 67 65 2e |henxiii.message.|
|00000030| 4d 65 73 73 61 67 65 d6 b9 3e a7 6d c9 2f 2f 02 |Message..>.m.//.|
|00000040| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|00000050| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|00000060| 00 00 00 00 00 00 00 74 00 03 31 32 33 74 00 08 |.......t..123t..|
|00000070| 7a 68 61 6e 67 73 61 6e |zhangsan |
+--------+-------------------------------------------------+----------------+

可以看出很好的解决了半包的问题

@Sharable

为了提高 handler 的复用,我们通常会将具体的 handler 抽象成一个个的对象来使用

1
2
3
4
5
6
7
LoggingHandler log = new LoggingHandler(LogLevel.DEBUG);
LengthFieldBasedFrameDecoder decoder = new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0);
EmbeddedChannel channel = new EmbeddedChannel(
log,
decoder,
new MessageCodec()
);

这样做看起来并没有什么问题,但是考虑以下这种情况

  1. 我们的 EventLoop 同时监听多个 channel,而我们的 channel1 发送了一个数据包,由于数据包长度过大,一次不能发送,第一次只发送了半包
  2. 由于 channel1 发送的是半包,LengthFieldBasedFrameDecoder 检测出该数据包未完整,不会向下传播
  3. 此时,channel2 也发送了一个数据包,但是 LengthFieldBasedFrameDecoder 会将这个数据包和之前未完整的数据包整合,因此数据包发生了错误的传递

为了提高 handler 的复用率,同时又避免出现一些并发问题,Netty 中原生的 handler 中用 @Sharable 注解来标明,该 handler 能否在多个 channel 中共享。

此外,当我们给之前的 MessageCodec 类加上 @Sharable 注解后,启动服务器会有以下报错

ChannelHandler MessageCodec is not allowed to be shared

这是由于我们 MessageCodec 继承于 ByteToMessageCodec,其不可以加 @Sharable 注解,源码如下所示

1
2
3
4
5
protected void ensureNotSharable() {
if (this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}

在这里我们继承 MessageToMessageCodec 并加上 @Sharable 注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

@Override
protected void encode(ChannelHandlerContext ctx, Message message, List<Object> list) throws Exception {
ByteBuf byteBuf = ctx.alloc().buffer();
// 设置4个字节的魔数
byteBuf.writeBytes(new byte[] {0, 6, 1, 8});
// 设置1个字节的版本号
byteBuf.writeByte(1);
// 设置1个字节的序列化方式
byteBuf.writeByte(0);
// 设置1个字节的指令方式
byteBuf.writeByte(message.getMessageType());
// 设置4个字节的请求序列号
byteBuf.writeInt(message.getSequenceId());
// 补齐字节 到2的次方
byteBuf.writeByte(0);

// 获得序列化后的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(message);
byte[] bytes = bos.toByteArray();

// 用4个字节标识正文长度
byteBuf.writeInt(bytes.length);
// 设置正文
byteBuf.writeBytes(bytes);
list.add(byteBuf);
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int magicNum = byteBuf.readInt(); // 魔数
byte version = byteBuf.readByte(); // 版本号
byte serializedType = byteBuf.readByte(); // 序列化方式
byte msgType = byteBuf.readByte(); // 指令方式
int sequenceId = byteBuf.readInt(); // 请求序列号
byteBuf.readByte(); // 补齐字符

int length = byteBuf.readInt(); // 长度
byte[] content = new byte[length]; // 内容
byteBuf.readBytes(content, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content));
Message message = (Message) ois.readObject();

list.add(message);

log.debug("magicNum: {}", magicNum);
log.debug("version: {}", version);
log.debug("serializedType: {}", serializedType);
log.debug("msgType: {}", msgType);
log.debug("sequenceId: {}", sequenceId);
}
}