Netty Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
Netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO
Hello world
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class HelloServer { public static void main (String[] args) { new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler( new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } }) .bind(8080 ); } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class HelloClient { public static void main (String[] args) throws InterruptedException { new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )) .sync() .channel() .writeAndFlush("hello, world" ); } }
执行流程
channel 可以理解为数据的通道
msg 理解为流动的数据 ,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
handler 可以理解为数据的处理工序
工序有多道,合在一起就是 pipeline(传递途径),pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
pipeline 中有多个 handler,处理时会依次调用其中的 handler
handler 分 Inbound 和 Outbound 两类
eventLoop 可以理解为处理数据的工人
eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定 ,以后该 channel 中的 io 操作都由该 eventLoop 负责
eventLoop 既可以执行 io 操作,也可以进行任务处理 ,每个 eventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
eventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 eventLoop
组件 EventLoop 事件循环对象 EventLoop
EventLoop 本质是一个单线程执行器 (同时维护了一个 Selector ),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
它的继承关系如下
继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
继承自 netty 自己的 OrderedEventExecutor
提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup
事件循环组 EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup
实现了 Iterable 接口提供遍历 EventLoop 的能力
另有 next 方法获取集合中下一个 EventLoop
处理普通任务和定时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Slf4j public class TestEventLoop { public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup (2 ); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); group.next().submit(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("ok" ); }); group.next().scheduleAtFixedRate(() -> { log.debug("time" ); }, 0 , 1 , TimeUnit.SECONDS); log.debug("main" ); } }
输出结果如下
1 2 3 4 5 6 7 8 9 10 11 12 -- 可以看出EventLoopGroup对象的next方法可以循环获取EventLoop对象 io.netty.channel.nio.NioEventLoop@1d251891 io.netty.channel.nio.NioEventLoop@48140564 io.netty.channel.nio.NioEventLoop@1d251891 14:18:26.731 [main] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - main -- 普通任务和定时任务的执行结果 14:18:26.732 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time 14:18:27.746 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time 14:18:27.746 [nioEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - ok 14:18:28.747 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time 14:18:29.744 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time 14:18:30.742 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.TestEventLoop - time
处理IO任务
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j public class EventLoopServer { public static void main (String[] args) { new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); } }); } }) .bind(8080 ); } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class EventLoopClient { public static void main (String[] args) throws InterruptedException { Channel channel = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )) .sync() .channel(); System.out.println(channel); System.out.println("" ); } }
执行结果
启动服务端代码,并以debug模式启动客户端代码
客户端通过 channel.writeAndFlush() 方法向服务端写入数据 “hello” “world”
结果如下
1 2 14:14:30.026 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - hello 14:14:40.932 [nioEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - world
可以看出处理客户端输入的线程是同一个线程
这是由于当客户端一旦和服务端建立连接后, Channel 会和一个 EventLoop 绑定,后续 Channel 的所有请求都会由这个 EventLoop 来处理
分工处理任务
细化1:分为 boss 和 worker
我们可以通过 #group() 方法,将 EventLoopGroup 分为 boss 和 worker
boss 只处理 ServerSocketChannel 上的 accept 事件
woker 只处理 SockerChannel 上的读写事件 一般线程数设置为CPU核心数*2
1 2 3 4 5 6 new ServerBootstrap () .group(new NioEventLoopGroup (), new NioEventLoopGroup (2 )). ....;
通过多个客户端向服务端发送消息,结果如下
1 2 3 4 14:54:03.944 [nioEventLoopGroup-3-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1 14:54:13.397 [nioEventLoopGroup-3-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 2 14:54:20.785 [nioEventLoopGroup-3-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 3 14:55:28.642 [nioEventLoopGroup-3-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1
可以看出,一个 EventLoop 可以负责多个 Channel ,且 EventLoop 一旦与 Channel 绑定,则 一直负责 处理该 Channel 中的事件
细化2:处理耗时较长的任务
假如某个 EventGroup 绑定了多个 Channel,某个 Channel 中有一项任务耗时较长,那么该任务会阻塞住所有 Channel 中的读写任务
因此,我们可以通过给这个耗时任务重新分配一个 group ,这样可以避免该任务在 nio 线程上阻塞住其它 Channel 的读写任务
我们可以通过 Channel#addLast() 方法指定 handler 所对应的 EventGroup,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Slf4j public class EventLoopServer { public static void main (String[] args) { EventLoopGroup group = new DefaultEventLoopGroup (); new ServerBootstrap () .group(new NioEventLoopGroup (), new NioEventLoopGroup (2 )) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast("nio-handler" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); ctx.fireChannelRead(msg); } }).addLast(group, "time-handler" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); } }); } }) .bind(8080 ); } }
通过多个客户端向服务端发送消息,结果如下
1 2 3 4 5 6 15:03:57.878 [nioEventLoopGroup-4-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1 15:03:57.878 [defaultEventLoopGroup-2-3] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 1 15:04:16.904 [nioEventLoopGroup-4-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 2 15:04:16.905 [defaultEventLoopGroup-2-2] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 2 15:04:26.716 [nioEventLoopGroup-4-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 3 15:04:26.717 [defaultEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.EventLoopServer - 3
可以看出我们实现了使用不同 group 来处理耗时任务和普通任务
handler的切换 在分工处理中,我们分别使用了 DefaultEventLoopGroup 和 NioEventLoopGroup 来处理不同的任务,那么 handler 是如何在两者之间切换的呢?
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable () { public void run () { next.invokeChannelRead(m); } }); } }
EventExecutor#invokeChannelRead() 方法用于判断当前 handler 中的线程是否和 next.executor() 返回的 EventLoop 绑定的线程是同一个线程
如果二者绑定的是同一个线程则可以直接调用
否则,要把调用代码封装为任务对象,由下一个 handler 来执行
Channel Channel 的常用方法
close() 可以用来关闭Channel
closeFuture() 用来处理 Channel 的关闭
sync 方法作用是同步等待 Channel 关闭
而 addListener 方法是异步等待 Channel 关闭
pipeline() 方法用于添加处理器
write() 方法将数据写入
因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
只有当缓冲满了或者调用了flush()方法后 ,才会将数据通过 Channel 发送出去
writeAndFlush() 方法将数据写入并立即发送(刷出)
ChannelFuture的结果处理 客户端代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class ChannelClient { public static void main (String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); Channel channel = channelFuture.channel(); log.debug("==={}===" , channel); channel.writeAndFlush("hello world" ); } }
如果我们注释了 channelFuture.sync() 方法,那么当我们启动客户端时,服务端将无法收到消息
这是由于 #connect() 方法是一个异步非阻塞方法,当 main 线程执行 connect() 方法时,会启动一个 nio 线程来连接服务端
如果代码没有在 channelFuture.sync() 处阻塞的话,那么 main 线程会直接获取一个 channel ,在这个 channel 中写入数据,而实际上这个 channel 并没有连接到服务端
我们通过打印这个 channel 可以得到如下结果
1 [main] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0xe416df7f]===
可以看出这个 channel 并没有自身的 ip 信息和主机的 ip 信息
同步处理结果
我们可以采用 ChannelFuture#sync() 方法来阻塞住 main 线程
当 nio 线程成功连接服务端后,继续向下执行
此时我们可以通过打印 channel 的方式查看结果如下
1 15:49:14.767 [main] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0x60434a31, L:/127.0.0.1:3883 - R:localhost/127.0.0.1:8080]===
可以看出 channel 成功连接了服务端
需要注意的是,由于是同步处理结果 ,因此我们获取 channel 的线程依旧是主线程
异步处理结果
我们可以采用 ChannelFuture#addListener() 来异步处理结果
我们需要在方法中传入一个 ChannelFutureListener 回调对象,该对象会在 nio 线程成功连接服务端后处理返回结果
我们可以通过打印 channel 的方式查看结果如下
1 15:50:23.721 [nioEventLoopGroup-2-1] DEBUG com.chenxiniubi.netty.c2.ChannelClient - ===[id: 0x2e2a8216, L:/127.0.0.1:3691 - R:localhost/127.0.0.1:8080]===
需要注意的是,由于是异步处理结果 ,因此我们获取 channel 的线程依旧是nio线程
具体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf4j public class ChannelClient { public static void main (String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); channelFuture.sync(); Channel channel = channelFuture.channel(); log.debug("==={}===" , channel); channel.writeAndFlush("hello world" ); channelFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { Channel channel = channelFuture.channel(); log.debug("==={}===" , channel); channel.writeAndFlush("hello world" ); } }); } }
CloseFuture处理关闭Channel后的操作 客户端代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j public class ChannelCloseClient { public static void main (String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); channelFuture.sync(); Channel channel = channelFuture.channel(); new Thread (() -> { Scanner scanner = new Scanner (System.in); while (true ) { String input = scanner.nextLine(); if ("q" .equals(input)) { channel.close(); break ; } channel.writeAndFlush(input); } }, "input" ).start(); } }
现在我们通过客户端不断向服务端发送数据,知道键盘输入 “q” 为止
当 channel 关闭后,我们还有一些后续操作需要执行,该如何处理?
由于 Channel#close() 方法是一个异步方法,因此如果我们在 channel.close(); 后直接进行处理操作,其实是无效的
我们可以通过 ChannelFuture 来处理关闭后的操作,处理关闭后的操作依旧可以分为同步方式和异步方式
当后续操作执行完毕后,我们需要执行 NioEventLoopGroup#shutdownGracefully() 方法将整个组的线程停止
同步方式处理关闭
和 ChannelFuture 类似,我们可以采用 ChannelFuture#sync() 方法来阻塞住 main 线程
当 channel 关闭后,继续向下执行
我们可以通过打印的方式来查看是否成功阻塞 main 线程
1 2 3 log.debug("waiting close..." ); closeFuture.sync(); log.debug("处理关闭后的工作.." );
结果如下,当我们未输入 “q” 时,控制台如下所示
1 16:20:15 [DEBUG] [main] c.c.n.c.ChannelCloseClient - waiting close...
当我们输入 “q” 后
1 2 3 4 16:20:36 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x17d025eb, L:/127.0.0.1:5268 - R:localhost/127.0.0.1:8080] CLOSE 16:20:36 [DEBUG] [main] c.c.n.c.ChannelCloseClient - 处理关闭后的工作.. 16:20:36 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x17d025eb, L:/127.0.0.1:5268 ! R:localhost/127.0.0.1:8080] INACTIVE 16:20:36 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x17d025eb, L:/127.0.0.1:5268 ! R:localhost/127.0.0.1:8080] UNREGISTERED
可以看出,当 channel 关闭后,执行后续操作的线程是主线程
异步方式处理关闭
同样也采用 ChannelFuture#addListener() 方法来处理异步返回结果
当我们输入 “q” 时,控制台如下所示
1 2 3 4 16:22:55 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x495727ec, L:/127.0.0.1:5360 - R:localhost/127.0.0.1:8080] CLOSE 16:22:55 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.ChannelCloseClient - 处理关闭后的工作.. 16:22:55 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x495727ec, L:/127.0.0.1:5360 ! R:localhost/127.0.0.1:8080] INACTIVE 16:22:55 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x495727ec, L:/127.0.0.1:5360 ! R:localhost/127.0.0.1:8080] UNREGISTERED
可以看出,当 channel 关闭后,执行后续操作的线程是 nio 线程
具体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Slf4j public class ChannelCloseClient { public static void main (String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup (); ChannelFuture channelFuture = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); channelFuture.sync(); Channel channel = channelFuture.channel(); new Thread (() -> { Scanner scanner = new Scanner (System.in); while (true ) { String input = scanner.nextLine(); if ("q" .equals(input)) { channel.close(); break ; } channel.writeAndFlush(input); } }, "input" ).start(); ChannelFuture closeFuture = channel.closeFuture(); log.debug("waiting close..." ); closeFuture.sync(); log.debug("处理关闭后的工作.." ); group.shutdownGracefully(); closeFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { log.debug("处理关闭后的工作.." ); group.shutdownGracefully(); } }); } }
Future & Promise netty 中的 Future 与 jdk 中的 Future 同名 ,但是是两个接口
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称
jdk Future
netty Future
Promise
cancel
取消任务
-
-
isCanceled
任务是否取消
-
-
isDone
任务是否完成,不能区分成功失败
-
-
get
获取任务结果,阻塞等待
-
-
getNow
-
获取任务结果,非阻塞,还未产生结果时返回 null
-
await
-
等待任务结束,如果任务失败,不会抛异常 ,而是通过 isSuccess 判断
-
sync
-
等待任务结束,如果任务失败,抛出异常
-
isSuccess
-
判断任务是否成功
-
cause
-
获取失败信息,非阻塞,如果没有失败,返回null
-
addLinstener
-
添加回调,异步接收结果
-
setSuccess
-
-
设置成功结果
setFailure
-
-
设置失败结果
JDK Future代码测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j public class TestJdkFuture { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(2 ); Future<Integer> future = service.submit(() -> { log.debug("执行计算..." ); Thread.sleep(1000 ); return 30 ; }); log.debug("等待结果" ); log.debug("结果是: {}" , future.get()); } }
控制台输出结果如下
1 2 3 17:03:54 [DEBUG] [main] c.c.n.c.TestJdkFuture - 等待结果 17:03:54 [DEBUG] [pool-1-thread-1] c.c.n.c.TestJdkFuture - 执行计算... 17:03:55 [DEBUG] [main] c.c.n.c.TestJdkFuture - 结果是: 30
Netty Future代码测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j public class TestNettyFuture { public static void main (String[] args) throws Exception{ NioEventLoopGroup group = new NioEventLoopGroup (); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(() -> { log.debug("执行计算..." ); Thread.sleep(1000 ); return 70 ; }); log.debug("等待结果" ); log.debug("结果是: {}" , future.get()); future.addListener(new GenericFutureListener <Future<? super Integer>>() { @Override public void operationComplete (Future<? super Integer> future) throws Exception { log.debug("接收结果: " + future.getNow()); } }); } }
同步处理结果如下
1 2 3 17:05:26 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.TestNettyFuture - 执行计算... 17:05:26 [DEBUG] [main] c.c.n.c.TestNettyFuture - 等待结果 17:05:27 [DEBUG] [main] c.c.n.c.TestNettyFuture - 结果是: 70
异步处理结果如下
1 2 17:06:02 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.TestNettyFuture - 执行计算... 17:06:03 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.TestNettyFuture - 接收结果: 70
Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
可以通过Future对象的get方法 ,阻塞地获取返回结果
也可以通过getNow方法 ,获取结果,若还没有结果,则返回null,该方法是非阻塞的
还可以通过future.addListener方法 ,在Callable方法执行的线程中,异步获取返回结果
Promise代码测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public class TestPromise { public static void main (String[] args) throws ExecutionException, InterruptedException { EventLoop eventLoop = new NioEventLoopGroup ().next(); DefaultPromise<Integer> promise = new DefaultPromise <>(eventLoop); new Thread (() -> { log.debug("开始计算..." ); try { Thread.sleep(1000 ); promise.setSuccess(80 ); } catch (InterruptedException e) { e.printStackTrace(); promise.setFailure(e); } }).start(); log.debug("等待结果..." ); log.debug("结果是: {}" , promise.get()); } }
控制台输出结果如下
1 2 3 17:06:37 [DEBUG] [Thread-0] c.c.n.c.TestPromise - 开始计算... 17:06:37 [DEBUG] [main] c.c.n.c.TestPromise - 等待结果... 17:06:38 [DEBUG] [main] c.c.n.c.TestPromise - 结果是: 80
Handler & Pipeline 入站和出站
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Slf4j public class TestPipeline { public static void main (String[] args) { new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast("ch1" , new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1" ); ByteBuf content = (ByteBuf) msg; String name = content.toString(Charset.defaultCharset()); log.debug("收到消息: {}" , content); super .channelRead(ctx, name); } }); ch.pipeline().addLast("ch2" , new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2" ); log.debug("消息传递: {}, class: {}" , msg.toString(), msg.getClass()); super .channelRead(ctx, msg.toString()); } }); ch.pipeline().addLast("ch3" , new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("3" ); super .channelRead(ctx, msg); ch.writeAndFlush(ctx.alloc().buffer().writeBytes("hello world.." .getBytes())); } }); ch.pipeline().addLast("ch4" , new ChannelOutboundHandlerAdapter (){ @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("4" ); super .write(ctx, msg, promise); } }); ch.pipeline().addLast("ch5" , new ChannelOutboundHandlerAdapter (){ @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("5" ); super .write(ctx, msg, promise); } }); ch.pipeline().addLast("ch6" , new ChannelOutboundHandlerAdapter (){ @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("6" ); super .write(ctx, msg, promise); } }); } }) .bind(8080 ); } }
执行结果
我们启动 ChannelCloseClient 作为客户端,向服务端发送数据
控制台结果如下
1 2 3 4 5 6 7 8 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 1 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 收到消息: PooledUnsafeDirectByteBuf(ridx: 0, widx: 3, cap: 2048) 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 2 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 消息传递: qwe, class: class java.lang.String 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 3 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 6 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 5 10:55:05 [DEBUG] [nioEventLoopGroup-2-2] c.c.n.c.TestPipeline - 4
注意点:
handler 内部节点之间采用的数据结构是双向链表
我们的入站 handler 的处理顺序是 ch1 -> ch2 -> ch3,入站时,handler 从 head 往后调用
我们的出站 handler 的处理顺序是 ch6 -> ch5 -> ch4,出站时,handler 从 tail 往前调用
当我们注释了 ch.writeAndFlush(ctx.alloc().buffer().writeBytes("hello world..".getBytes())); 之后在客户端发送数据,是看不到出站 handler 的日志输出的,这是由于出站 handler 一般处理写操作,而入站 handler 一般处理读操作
super.channelRead(ctx, name); 的作用是给下一个入站 hander 传递数据,如果不写这一行内容,则入站链就断了,下一个入站 handler 接收不到数据
我们在 ch1 中传递的数据是一个 String 类型的数据,而在 ch2 中接收的数据也同样是 String 类型的,这点从控制台输出可以看出
NioSocketChannel#writeAndFlush() 方法是从 tail 往前 调用 hanlder,而 ChannelHandlerContext#writeAndFlush() 方法是从当前 handler 往前 调用hander
EmbeddedChannel EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class TestEmbeddedChannel { public static void main (String[] args) { ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1" ); super .channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2" ); super .channelRead(ctx, msg); } }; ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter () { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("3" ); super .write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter () { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("4" ); super .write(ctx, msg, promise); } }; EmbeddedChannel channel = new EmbeddedChannel (h1, h2, h3, h4); channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello" .getBytes(StandardCharsets.UTF_8))); channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello" .getBytes(StandardCharsets.UTF_8))); } }
测试结果如下
1 2 3 4 11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 1 11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 2 11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 4 11:14:06 [DEBUG] [main] c.c.n.c.TestEmbeddedChannel - 3
通过 EmbeddedChannel#writeInbound() 方法来测试入站操作
通过 EmbeddedChannel#writeOutbound() 方法来测试出站操作
ByteBuf 创建 1 2 3 4 5 6 7 8 9 10 11 12 public class TestByteBuf { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10 ); ByteBufferUtil.log(buffer); StringBuilder sb = new StringBuilder (); for (int i = 0 ; i < 20 ; i++) { sb.append("a" ); } buffer.writeBytes(sb.toString().getBytes()); ByteBufferUtil.log(buffer); } }
运行结果如下
1 2 3 4 5 6 7 8 9 read index:0 write index:0 capacity:10 read index:0 write index:20 capacity:64 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa| |00000010| 61 61 61 61 |aaaa | +--------+-------------------------------------------------+----------------+
ByteBuf 通过 ByteBufAllocator 选择 allocator 并调用对应的 buffer() 方法来创建的,默认使用直接内存 作为 ByteBuf ,容量为256个字节,可以指定初始容量的大小
当 ByteBuf 的容量无法容纳所有数据时, ByteBuf 会进行扩容操作
如果在 handler 中创建 ByteBuf ,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建
直接内存 vs 堆内存 通过该方法创建的 ByteBuf ,使用的是基于直接内存 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 );
可以使用下面的代码来创建池化基于堆 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16 );
也可以使用下面的代码来创建池化基于直接内存 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16 );Copy
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化 池化的最大意义在于可以重用 ByteBuf,优点有
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
1 -Dio.netty.allocator.type={unpooled|pooled}
4.1 以后,非 Android 平台默认启用池化实现 ,Android 平台启用非池化实现
4.1 之前,池化功能还不成熟,默认是非池化实现
验证池化和内存模型
我们采用之前创建 ByteBuf 的方式来查看 ByteBuf
1 2 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10 );System.out.println(buffer.getClass());
控制台输出如下
1 class io.netty.buffer.PooledUnsafeDirectByteBuf
可以看出 buffer 是一个 PooledUnsafeDirectByteBuf 的实例对象
其中 Pooled 代表采用了池化,Direct 代表采用了直接内存
接下来我们换一种创建 ByteBuf 的方式
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
控制台输出如下
1 class io.netty.buffer.PooledUnsafeHeapByteBuf
同理,buffer 采用了池化和堆内存模型
接下来我们为程序添加虚拟机参数 -Dio.netty.allocator.type=unpooled
重新执行以上两种创建 ByteBuf 的方式
得到结果分别如下
1 2 class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf
可以看出采用非池化的方式来创建 ByteBuf
组成 ByteBuf 主要有以下几个组成部分
最大容量与当前容量
在构造 ByteBuf 时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为 Integer.MAX_VALUE
当 ByteBuf 容量无法容纳所有数据时,会进行扩容操作,若超出最大容量 ,会抛出 java.lang.IndexOutOfBoundsException 异常
读写操作不同于 ByteBuffer 只用 position 进行控制,ByteBuf 分别由读指针和写指针两个指针控制进行读写操作时,无需进行模式的切换
读指针前的部分被称为废弃部分,是已经读过的内容
读指针与写指针之间的空间称为可读部分
写指针与当前容量之间的空间称为可写部分
写入 常用方法如下
方法签名
含义
备注
writeBoolean(boolean value)
写入 boolean 值
用一字节 01|00 代表 true|false
writeByte(int value)
写入 byte 值
writeShort(int value)
写入 short 值
writeInt(int value)
写入 int 值
Big Endian(大端写入 ),即 0x250,写入后 00 00 02 50
writeIntLE(int value)
写入 int 值
Little Endian(小端写入 ),即 0x250,写入后 50 02 00 00
writeLong(long value)
写入 long 值
writeChar(int value)
写入 char 值
writeFloat(float value)
写入 float 值
writeDouble(double value)
写入 double 值
writeBytes(ByteBuf src)
写入 netty 的 ByteBuf
writeBytes(byte[] src)
写入 byte[]
writeBytes(ByteBuffer src)
写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)
写入字符串
CharSequence为字符串类的父类,第二个参数为对应的字符集
注意
这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
网络传输中,默认习惯是 Big Endian ,使用 writeInt(int value)
扩容 当 ByteBuf 中的容量无法容纳写入的数据时,会进行扩容操作
扩容规则
如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
如果写入后数据大小超过 512 字节,则选择下一个 2^n
例如写入后大小为 513 字节,则扩容后 capacity 是 2^10=1024 字节(2^9=512 已经不够了)
扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常
实际操作中,我给 ByteBuf 分配了一个大小为16字节的空间,并往其中写入5个整型数据(20字节),但是日志输出扩容后大小为64
这里实际扩容的方式还是存在一些问题,后续看一下
读取 读取主要是通过一系列 read 方法进行读取,读取时会根据读取数据的字节数移动读指针
如果需要重复读取 ,需要调用 ByteBuf#markReaderIndex() 对读指针进行标记,并通过 ByteBuf#resetReaderIndex() 将读指针恢复到 mark 标记的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ByteBufStudy { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 , 20 ); buffer.writeBytes(new byte []{1 , 2 , 3 , 4 }); buffer.writeInt(5 ); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); ByteBufUtil.log(buffer); buffer.markReaderIndex(); System.out.println(buffer.readInt()); ByteBufUtil.log(buffer); buffer.resetReaderIndex(); ByteBufUtil.log(buffer); } }
结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 1 2 3 4 read index:4 write index:8 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 05 |.... | +--------+-------------------------------------------------+----------------+ 5 read index:8 write index:8 capacity:16 read index:4 write index:8 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 05 |.... | +--------+-------------------------------------------------+----------------+
还有以 get 开头的一系列方法,这些方法不会改变读指针的位置
释放 由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放 ,而不是等 GC 垃圾回收。
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
每个 ByteBuf 对象的初始计数为 1
调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read() 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传 ,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
tail节点的释放
我们追踪 TailContext#channelRead() 方法,可以看到以下一段代码
1 2 3 4 5 6 7 8 protected void onUnhandledInboundMessage (Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } }
它通过 ReferenceCountUtil 来实现释放 ByteBuf
我们接下看看看 ReferenceCountUtil 的 release() 方法
1 2 3 public static boolean release (Object msg) { return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false ; }
release() 方法通过判断 msg 是否实现了 ReferenceCounted 接口来判断传递的参数是否为 ByteBuf
切片 ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存 ,切片后的 ByteBuf 维护独立的 read,write 指针
得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用
修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class TestSlice { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10 ); buffer.writeBytes("abcdefghij" .getBytes()); ByteBufferUtil.log(buffer); ByteBuf f1 = buffer.slice(0 , 5 ); ByteBuf f2 = buffer.slice(5 , 5 ); f1.retain(); f2.retain(); ByteBufferUtil.log(f1); ByteBufferUtil.log(f2); f1.setByte(0 , 'q' ); ByteBufferUtil.log(f1); ByteBufferUtil.log(buffer); } }
控制台输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 read index:0 write index:10 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 66 67 68 69 6a |abcdefghij | +--------+-------------------------------------------------+----------------+ read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 |abcde | +--------+-------------------------------------------------+----------------+ read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 66 67 68 69 6a |fghij | +--------+-------------------------------------------------+----------------+ read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 71 62 63 64 65 |qbcde | +--------+-------------------------------------------------+----------------+ read index:0 write index:10 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 71 62 63 64 65 66 67 68 69 6a |qbcdefghij | +--------+-------------------------------------------------+----------------+
我们修改切片后 f1 中的第一个字母后,原有 buffer 中第一个字符也同样修改了,证明 f1 和 buffer 用的是同一块内存空间,并没有发生数据复制
优势
池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
读写指针分离 ,不需要像 ByteBuffer 一样切换读写模式
可以自动扩容
支持链式调用,使用更流畅
很多地方体现零拷贝,例如
slice、duplicate、CompositeByteBuf
应用 粘包 & 半包 粘包现象
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class HelloServer { static final Logger log = LoggerFactory.getLogger(HelloServer.class); public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (1 ); NioEventLoopGroup worker = new NioEventLoopGroup (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("connected {}" , ctx.channel()); super .channelActive(ctx); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { log.debug("disconnect {}" , ctx.channel()); super .channelInactive(ctx); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080 ); log.debug("{} binding..." , channelFuture.channel()); channelFuture.sync(); log.debug("{} bound..." , channelFuture.channel()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); log.debug("stopped" ); } } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class HelloClient { static final Logger log = LoggerFactory.getLogger(HelloClient.class); public static void main (String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connected..." ); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); for (int i = 0 ; i < 10 ; i++) { ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); ctx.writeAndFlush(buffer); } } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1" , 8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
结果分析
服务端控制台输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 16:20:43 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x3a0c5e7b] binding... 16:20:43 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x3a0c5e7b, L:/0:0:0:0:0:0:0:0:8080] bound... 16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] REGISTERED 16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] ACTIVE 16:20:50 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c.HelloServer - connected [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] 16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] READ: 160B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 16:20:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9658b218, L:/127.0.0.1:8080 - R:/127.0.0.1:7832] READ COMPLETE
理想情况下,我们通过客户端发送10次16字节字节数组,服务端应该也是接收的10次16字节的数组
而实际上服务端接收的是一个160字节的内容,这就是粘包
半包现象 我们在服务端添加以下代码,用于设置服务端的接收缓冲区
1 serverBootstrap.option(ChannelOption.SO_RCVBUF, 10 );
并通过客户端发送一条30字节的数据
1 2 3 4 5 ByteBuf buffer = ctx.alloc().buffer();for (int i = 0 ; i < 3 ; i++) { buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); } ctx.writeAndFlush(buffer);
服务端输出结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 16:28:06 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x9c9f8460] binding... 16:28:06 [DEBUG] [main] c.c.n.c.HelloServer - [id: 0x9c9f8460, L:/0:0:0:0:0:0:0:0:8080] bound... 16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] REGISTERED 16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] ACTIVE 16:28:08 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c.HelloServer - connected [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] 16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 |.......... | +--------+-------------------------------------------------+----------------+ 16:28:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE 16:28:14 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 0a 0b 0c 0d 0e 0f 00 01 02 03 |.......... | +--------+-------------------------------------------------+----------------+ 16:28:14 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE 16:28:19 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 04 05 06 07 08 09 0a 0b 0c 0d |.......... | +--------+-------------------------------------------------+----------------+ 16:28:19 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE 16:28:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ: 18B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 0e 0f 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d |................| |00000010| 0e 0f |.. | +--------+-------------------------------------------------+----------------+ 16:28:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3e5c142, L:/127.0.0.1:8080 - R:/127.0.0.1:8336] READ COMPLETE
可以看出由于服务端接收缓冲区大小不足,30字节的数据被分为多次读取
注意
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
即有可能某一个服务端读取的数据是20字节
现象分析 粘包
现象,发送 abc def,接收 abcdef
原因
应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
Nagle 算法:会造成粘包
半包
现象,发送 abcdef,接收 abc def
原因
应用层:接收方 ByteBuf 小于实际发送数据量
滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
本质是因为 TCP 是流式协议,消息无边界
解决方案 短连接 客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开 。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
粘包问题
客户端代码将建立连接抽象成一个方法,并通过多次调用该方法来实现和服务器的短连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static void send () { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connected..." ); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); ctx.writeAndFlush(buffer); ctx.channel().close(); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1" , 8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } }
1 2 3 for (int i = 0 ; i < 3 ; i++) { send(); }
我们查看服务端结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 16:43:27 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler - [id: 0x8daa051e, L:/127.0.0.1:8080 - R:/127.0.0.1:9028] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 16:43:27 [DEBUG] [nioEventLoopGroup-3-3] i.n.h.l.LoggingHandler - [id: 0x33e8a665, L:/127.0.0.1:8080 - R:/127.0.0.1:9094] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 16:43:27 [DEBUG] [nioEventLoopGroup-3-4] i.n.h.l.LoggingHandler - [id: 0xc075c498, L:/127.0.0.1:8080 - R:/127.0.0.1:9159] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+
可以看出我们确实是按理想情况接收了3次10个字节的数据,即可以解决粘包问题
半包问题
我们在服务端添加以下代码,用于调整 Netty 的缓冲区大小
1 serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator (16 , 16 , 16 ));
并在客户端发送18字节的数据
1 buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 });
服务端输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 16 :49 :23 [DEBUG] [nioEventLoopGroup-3 -4 ] i.n.h.l.LoggingHandler - [id: 0x5b4b1096 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :9593 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 16 :49 :23 [DEBUG] [nioEventLoopGroup-3 -4 ] i.n.h.l.LoggingHandler - [id: 0x5b4b1096 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :9593 ] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 10 11 |.. | +--------+-------------------------------------------------+----------------+ 16 :49 :23 [DEBUG] [nioEventLoopGroup-3 -5 ] i.n.h.l.LoggingHandler - [id: 0xb1e36c6d , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :9658 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 16 :49 :23 [DEBUG] [nioEventLoopGroup-3 -5 ] i.n.h.l.LoggingHandler - [id: 0xb1e36c6d , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :9658 ] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 10 11 |.. | +--------+-------------------------------------------------+----------------+ 16 :49 :23 [DEBUG] [nioEventLoopGroup-3 -6 ] i.n.h.l.LoggingHandler - [id: 0xc2a44645 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :9723 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 16 :49 :23 [DEBUG] [nioEventLoopGroup-3 -6 ] i.n.h.l.LoggingHandler - [id: 0xc2a44645 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :9723 ] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 10 11 |.. | +--------+-------------------------------------------------+----------------+
可以看出并没有解决半包问题
定长解码器 客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐 至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分 ,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到 FixedLengthFrameDecoder 对数据进行定长解码 ,具体使用方法如下
客户端代码
在客户端中,我们在这里通过一个 getFixLen() 方法来实现获得定长的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 public static byte [] getFixLen(char ch, int len, int maxLen) { byte [] ans = new byte [maxLen]; for (int i = 0 ; i < maxLen; i++) { if (i < len) ans[i] = (byte ) ch; else ans[i] = '_' ; } StringBuilder sb = new StringBuilder (); for (int i = 0 ; i < ans.length; i++) { sb.append((char ) ans[i]); } System.out.println(sb.toString()); return ans; }
并通过随机生成长度的方式,控制每次客户端写入的数据长度
1 2 3 4 5 6 7 8 9 10 ByteBuf buffer = ctx.alloc().buffer();char ch = 'a' ;Random r = new Random ();for (int i = 0 ; i < 5 ; i++) { byte [] fixLen = getFixLen(ch, r.nextInt(10 ) + 1 , 10 ); buffer.writeBytes(fixLen); ch++; } ctx.writeAndFlush(buffer); ctx.channel().close();
最后在客户端中添加一个日志的 handler
1 ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG));
服务端代码
而在服务端中,我们添加以下 handler ,用于解码定长的数据
1 ch.pipeline().addLast(new FixedLengthFrameDecoder (10 ));
执行结果
客户端输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 17:11:51 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.HelloClient2 - connected... 17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e] REGISTERED 17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e] CONNECT: /127.0.0.1:8080 17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e, L:/127.0.0.1:10399 - R:/127.0.0.1:8080] ACTIVE 17:11:51 [DEBUG] [nioEventLoopGroup-2-1] c.c.n.c.HelloClient2 - sending... aa________ bbbbbbbbbb cccccccccc dddd______ eeeeeee___ 17:11:51 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x50ece87e, L:/127.0.0.1:10399 - R:/127.0.0.1:8080] WRITE: 50B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 5f 5f 5f 5f 5f 5f 5f 5f 62 62 62 62 62 62 |aa________bbbbbb| |00000010| 62 62 62 62 63 63 63 63 63 63 63 63 63 63 64 64 |bbbbccccccccccdd| |00000020| 64 64 5f 5f 5f 5f 5f 5f 65 65 65 65 65 65 65 5f |dd______eeeeeee_| |00000030| 5f 5f |__ | +--------+-------------------------------------------------+----------------+
服务端输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 17 :11 :51 [DEBUG] [nioEventLoopGroup-3 -3 ] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10399 ] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 61 5f 5f 5f 5f 5f 5f 5f 5f |aa________ | +--------+-------------------------------------------------+----------------+ 17 :11 :51 [DEBUG] [nioEventLoopGroup-3 -3 ] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10399 ] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbb | +--------+-------------------------------------------------+----------------+ 17 :11 :51 [DEBUG] [nioEventLoopGroup-3 -3 ] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10399 ] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 63 63 63 63 63 63 63 63 63 63 |cccccccccc | +--------+-------------------------------------------------+----------------+ 17 :11 :51 [DEBUG] [nioEventLoopGroup-3 -3 ] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10399 ] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 64 64 64 64 5f 5f 5f 5f 5f 5f |dddd______ | +--------+-------------------------------------------------+----------------+ 17 :11 :51 [DEBUG] [nioEventLoopGroup-3 -3 ] i.n.h.l.LoggingHandler - [id: 0xfeb7a7ec , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10399 ] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 65 65 65 65 65 65 65 5f 5f 5f |eeeeeee___ | +--------+-------------------------------------------------+----------------+
可以看出虽然我们客户端是以一个50字节的粘包状态发送过去的,但是服务端可以重新将数据拆分为5个10字节的数据
该方法的缺点在于数据包的大小不好把握
长度定的太大,浪费
长度定的太小,对某些数据包又显得不够
行解码器 行解码器的是通过分隔符对数据进行拆分 来解决粘包半包问题的
LineBasedFrameDecoder(int maxLength):通过换行符来拆分数据
DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters):通过自定义分隔符来拆分数据
解码器需要定义最长长度,用于判断数据多久没有接收到分隔符,如果超过最大长度,会抛出 TooLongFrameException 异常
这里我们演示的是 LineBasedFrameDecoder 解码器
客户端代码
在客户端中,我们传入随即长度的字符串来作为数据,并约定最大长度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); ByteBuf buffer = ctx.alloc().buffer(); char ch = 'a' ; Random r = new Random (); int maxLength = 64 ; for (int i = 0 ; i < 5 ; i++) { StringBuilder sb = new StringBuilder (); for (int j = 0 ; j < (int )(r.nextInt(maxLength-2 )); j++) { sb.append(ch); } sb.append("\n" ); buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8)); ch++; } ctx.writeAndFlush(buffer); ctx.channel().close(); } });
服务端代码
在服务端中,我们添加 LineBasedFrameDecoder ,并指定最大长度
1 2 3 4 5 6 7 serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LineBasedFrameDecoder (64 )); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); } });
执行结果
分别启动服务端和客户端
客户端控制台输出如下,其中 . 代表的是 \n 换行符
1 2 3 4 5 6 7 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 61 61 61 61 0a 62 0a 0a 64 64 64 64 |aaaaaaaa.b..dddd| |00000010| 64 64 64 64 64 0a 65 65 65 65 65 65 65 65 65 65 |ddddd.eeeeeeeeee| |00000020| 65 65 65 0a |eee. | +--------+-------------------------------------------------+----------------+
服务端输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 61 61 61 61 |aaaaaaaa | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 62 |b | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 64 64 64 64 64 64 64 64 64 |ddddddddd | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 65 65 65 65 65 65 65 65 65 65 65 65 65 |eeeeeeeeeeeee | +--------+-------------------------------------------------+----------------+
缺点也是效率低,需要去遍历内容获取换行符
LTC解码器 在传送数据时可以在数据中添加一个用于表示有用数据长度的字段 ,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的
Netty 为我们提供了一个 LengthFieldBasedFrameDecoder 类来实现以上操作
其构造方法中有五个重要参数如下
maxFrameLength:表示数据的最大长度(包括附加信息、长度标识等内容)
lengthFieldOffset:数据长度标识的起始偏移量
lengthFieldLength:数据长度标识所占字节数
lengthAdjustment:长度标识和数据内容之间还有多少长度的其他内容
initialBytesToStrip:数据读取起点
以下例子来自于官方:
1 2 3 4 5 6 7 8 9 10 lengthFieldOffset = 1 (= the length of HDR1) // 长度的偏移量为1字节 lengthFieldLength = 2 // 长度标识为2字节 lengthAdjustment = 1 (= the length of HDR2) // 长度后还有1字节的其他数据 initialBytesToStrip = 3 (= the length of HDR1 + LEN) // 从第3个字节后开始读取数据 BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) +------+--------+------+----------------+ +------+----------------+ | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +------+--------+------+----------------+ +------+----------------+
实际代码测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class TestLengthFieldDecoder { public static void main (String[] args) { EmbeddedChannel channel = new EmbeddedChannel ( new LengthFieldBasedFrameDecoder (1024 , 0 , 4 , 1 , 5 ), new LoggingHandler (LogLevel.DEBUG) ); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); send(buffer, "Hello, world" ); send(buffer, "Game Start" ); channel.writeInbound(buffer); } private static void send (ByteBuf buffer, String content) { byte [] bytes = content.getBytes(); int length = bytes.length; buffer.writeInt(length); buffer.writeByte(0 ); buffer.writeBytes(bytes); } }
控制台输出结果如下
1 2 3 4 5 6 7 8 9 10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 2c 20 77 6f 72 6c 64 |Hello, world | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 47 61 6d 65 20 53 74 61 72 74 |Game Start | +--------+-------------------------------------------------+----------------+
协议设计与解析 网络传输中,我们需要遵循一定的协议来进行内容传输
Redis协议 以 redis 为例,当我们需要传输一条 set name zhangsan 的内容时,需要遵循 redis 的协议来进行传输
首先传输 *3 ,代表有三个字段(set、name、zhangsan),然后传输 $3 ,代表第一个字段长度为3,接着传输 set ,即可以正确解析,后面两个字段同理
我们用实际代码来进行演示
通过 LINE 来进行换行,输出遵循 redis 协议的 set name zhangsan
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class TestRedis { public static void main (String[] args) { final byte [] LINE = {13 , 10 }; new Bootstrap ().group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes("*3" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$3" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("set" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$4" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("name" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$8" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("zhangsan" .getBytes()); buffer.writeBytes(LINE); ctx.writeAndFlush(buffer); } }); } }) .connect(new InetSocketAddress ("localhost" , 6379 )); } }
在 redis 中,我们首先通过 get name 来获取结果,然后通过网络传输后,我们再次通过 get name 来获取结果,如下所示
1 2 3 4 127.0.0.1:6379> get name (nil) 127.0.0.1:6379> get name "zhangsan"
且 TestRedis 的控制台输出如下
1 2 3 4 5 6 7 8 9 10 11 12 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.| |00000010| 0a 6e 61 6d 65 0d 0a 24 38 0d 0a 7a 68 61 6e 67 |.name..$8..zhang| |00000020| 73 61 6e 0d 0a |san.. | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 2b 4f 4b 0d 0a |+OK.. | +--------+-------------------------------------------------+----------------+
其中 +OK 代表传输成功
Http协议 HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用 HttpServerCodec 作为服务器端的解码器与编码器,来处理HTTP请求
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Slf4j public class TestHttp { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.channel(NioServerSocketChannel.class); bootstrap.group(boss, worker); bootstrap.childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("====== class : {} =====" , msg.getClass()); super .channelRead(ctx, msg); } }); ch.pipeline().addLast(new SimpleChannelInboundHandler <HttpRequest>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception { log.debug(httpRequest.uri()); DefaultFullHttpResponse response = new DefaultFullHttpResponse (httpRequest.protocolVersion(), HttpResponseStatus.OK); byte [] content = "<h1>hello world</h1>" .getBytes(); response.headers().set(CONTENT_LENGTH, content.length); response.content().writeBytes(content); ctx.writeAndFlush(response); } }); } }); ChannelFuture channelFuture = bootstrap.bind(8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
执行结果
这里列举以下 log.debug() 的结果
1 2 10:40:59 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c6.TestHttp - ====== class : class io.netty.handler.codec.http.DefaultHttpRequest ===== 10:40:59 [DEBUG] [nioEventLoopGroup-3-1] c.c.n.c6.TestHttp - ====== class : class io.netty.handler.codec.http.LastHttpContent$1 =====
可以看出虽然我们只发了一次请求,但是 Netty 会默认解析成两个部分,分别标识的是请求头和请求体
我们可以通过以下方式来分别处理
1 2 3 4 5 if (msg instanceof HttpRequest) { } else if (msg instanceof HttpContent){ }
也可以通过使用 SimpleChannelInboundHandler 来处理具体的内容
此外,浏览器有点笨比,如果你不传入响应的长度,它会一直空转处理响应内容
因此在返回响应时,我们需要写入响应体的内容长度
自定义协议 自定义协议主要有以下几个部分组成
魔数 :用来在第一时间判定接收的数据是否为无效数据包
版本号 :可以支持协议的升级
序列化算法 :消息正文到底采用哪种序列化反序列化方式
如:json、protobuf、hessian、jdk
指令类型 :是登录、注册、单聊、群聊… 跟业务相关
请求序号 :为了双工通信,提供异步能力
正文长度
消息正文
编码器与解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Slf4j public class MessageCodec extends ByteToMessageCodec <Message> { @Override protected void encode (ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(new byte [] {0 , 6 , 1 , 8 }); byteBuf.writeByte(1 ); byteBuf.writeByte(0 ); byteBuf.writeByte(message.getMessageType()); byteBuf.writeInt(message.getSequenceId()); byteBuf.writeByte(0 ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos); oos.writeObject(message); byte [] bytes = bos.toByteArray(); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes); } @Override protected void decode (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int magicNum = byteBuf.readInt(); byte version = byteBuf.readByte(); byte serializedType = byteBuf.readByte(); byte msgType = byteBuf.readByte(); int sequenceId = byteBuf.readInt(); byteBuf.readByte(); int length = byteBuf.readInt(); byte [] content = new byte [length]; byteBuf.readBytes(content, 0 , length); ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (content)); Message message = (Message) ois.readObject(); list.add(message); log.debug("magicNum: {}" , magicNum); log.debug("version: {}" , version); log.debug("serializedType: {}" , serializedType); log.debug("msgType: {}" , msgType); log.debug("sequenceId: {}" , sequenceId); } }
这里我们的编码器和解码器类继承自 ByteToMessageCodec,泛型指定为我们实际的消息类,我们需要实现其 #encode() 和 #decode() 方法
编码器中,将自定义协议写入到 ByteBuf 中,如果需要写入一个对象,则需要将该对象序列化
自定义协议除正文信息外,其余信息长度和最好保证是 2^n,不足需要补齐
解码器中,需要将 ByteBuf 的内容取出写入到 list 中,传递给下一个解码器
编写测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class TestCodec { public static void main (String[] args) throws Exception { EmbeddedChannel channel = new EmbeddedChannel ( new LengthFieldBasedFrameDecoder (1024 , 12 , 4 , 0 , 0 ), new LoggingHandler (LogLevel.DEBUG), new MessageCodec () ); LoginRequestMessage message = new LoginRequestMessage ("zhangsan" , "123" ); channel.writeOutbound(message); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec ().encode(null , message, buffer); channel.writeInbound(buffer); } }
控制台输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 06 01 08 01 00 00 00 00 00 00 00 00 00 00 cc |................| |00000010| ac ed 00 05 73 72 00 28 63 6f 6d 2e 63 68 65 6e |....sr.(com.chen| |00000020| 78 69 69 69 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 |xiii.message.Log| |00000030| 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 |inRequestMessage| |00000040| 31 f3 3a 3f 09 31 1f d2 02 00 02 4c 00 08 70 61 |1.:?.1.....L..pa| |00000050| 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c |sswordt..Ljava/l| |00000060| 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 |ang/String;L..us| |00000070| 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 1c 63 |ernameq.~..xr..c| |00000080| 6f 6d 2e 63 68 65 6e 78 69 69 69 2e 6d 65 73 73 |om.chenxiii.mess| |00000090| 61 67 65 2e 4d 65 73 73 61 67 65 d6 b9 3e a7 6d |age.Message..>.m| |000000a0| c9 2f 2f 02 00 02 49 00 0b 6d 65 73 73 61 67 65 |.//...I..message| |000000b0| 54 79 70 65 49 00 0a 73 65 71 75 65 6e 63 65 49 |TypeI..sequenceI| |000000c0| 64 78 70 00 00 00 00 00 00 00 00 74 00 03 31 32 |dxp........t..12| |000000d0| 33 74 00 08 7a 68 61 6e 67 73 61 6e |3t..zhangsan | +--------+-------------------------------------------------+----------------+ 14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - magicNum: 393480 14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - version: 1 14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - serializedType: 0 14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - msgType: 0 14:22:59 [DEBUG] [main] c.c.n.c.MessageCodec - sequenceId: 0
在第一行中,00 06 01 08代表是我们的魔数,接下来的 01 代表的是版本号,00 代表的是序列化方式,00 代表的是指令方式,00 00 00 00 代表的是序列号,00 00 00 cc 代表后面传输内容的长度
粘包和半包
由于我们会在自定义协议中指定传输内容的长度,因此粘包现象不会出现
在这里我们主要讨论半包现象
将以上代码修改如下部分
1 2 3 4 5 6 7 8 9 10 11 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec ().encode(null , message, buffer);ByteBuf s1 = buffer.slice(0 , 100 );ByteBuf s2 = buffer.slice(100 , buffer.readableBytes() - 100 );s1.retain(); channel.writeInbound(s1); channel.writeInbound(s2);
需要注意的是
#writeInbound() 方法会将引用计数器-1,当计数器为0时会释放 ByteBuf 的内存,而 s1,s2 共享 buffer 的内存,若内存释放则两个都会释放,因此需要将计数器 +1
我们可以通过 #readableBytes() 方法来获取可读的内容长度
当我们将 channel.writeInbound(s2); 注释掉时,由于信息长度不完整,控制台不会输出信息
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 06 01 08 01 00 00 00 00 00 00 00 00 00 00 cc |................| |00000010| ac ed 00 05 73 72 00 28 63 6f 6d 2e 63 68 65 6e |....sr.(com.chen| |00000020| 78 69 69 69 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 |xiii.message.Log| |00000030| 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 |inRequestMessage| |00000040| 31 f3 3a 3f 09 31 1f d2 02 00 02 4c 00 08 70 61 |1.:?.1.....L..pa| |00000050| 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c |sswordt..Ljava/l| |00000060| 61 6e 67 2f |ang/ | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e 61 |String;L..userna| |00000010| 6d 65 71 00 7e 00 01 78 72 00 1c 63 6f 6d 2e 63 |meq.~..xr..com.c| |00000020| 68 65 6e 78 69 69 69 2e 6d 65 73 73 61 67 65 2e |henxiii.message.| |00000030| 4d 65 73 73 61 67 65 d6 b9 3e a7 6d c9 2f 2f 02 |Message..>.m.//.| |00000040| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType| |00000050| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.| |00000060| 00 00 00 00 00 00 00 74 00 03 31 32 33 74 00 08 |.......t..123t..| |00000070| 7a 68 61 6e 67 73 61 6e |zhangsan | +--------+-------------------------------------------------+----------------+
可以看出很好的解决了半包的问题
@Sharable 为了提高 handler 的复用,我们通常会将具体的 handler 抽象成一个个的对象来使用
1 2 3 4 5 6 7 LoggingHandler log = new LoggingHandler (LogLevel.DEBUG);LengthFieldBasedFrameDecoder decoder = new LengthFieldBasedFrameDecoder (1024 , 12 , 4 , 0 , 0 );EmbeddedChannel channel = new EmbeddedChannel ( log, decoder, new MessageCodec () );
这样做看起来并没有什么问题,但是考虑以下这种情况
我们的 EventLoop 同时监听多个 channel,而我们的 channel1 发送了一个数据包,由于数据包长度过大,一次不能发送,第一次只发送了半包
由于 channel1 发送的是半包,LengthFieldBasedFrameDecoder 检测出该数据包未完整,不会向下传播
此时,channel2 也发送了一个数据包,但是 LengthFieldBasedFrameDecoder 会将这个数据包和之前未完整的数据包整合,因此数据包发生了错误的传递
为了提高 handler 的复用率,同时又避免出现一些并发问题,Netty 中原生的 handler 中用 @Sharable 注解来标明,该 handler 能否在多个 channel 中共享。
此外,当我们给之前的 MessageCodec 类加上 @Sharable 注解后,启动服务器会有以下报错
ChannelHandler MessageCodec is not allowed to be shared
这是由于我们 MessageCodec 继承于 ByteToMessageCodec,其不可以加 @Sharable 注解,源码如下所示
1 2 3 4 5 protected void ensureNotSharable () { if (this .isSharable()) { throw new IllegalStateException ("ChannelHandler " + this .getClass().getName() + " is not allowed to be shared" ); } }
在这里我们继承 MessageToMessageCodec 并加上 @Sharable 注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Slf4j @ChannelHandler .Sharablepublic class MessageCodecSharable extends MessageToMessageCodec <ByteBuf, Message> { @Override protected void encode (ChannelHandlerContext ctx, Message message, List<Object> list) throws Exception { ByteBuf byteBuf = ctx.alloc().buffer(); byteBuf.writeBytes(new byte [] {0 , 6 , 1 , 8 }); byteBuf.writeByte(1 ); byteBuf.writeByte(0 ); byteBuf.writeByte(message.getMessageType()); byteBuf.writeInt(message.getSequenceId()); byteBuf.writeByte(0 ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos); oos.writeObject(message); byte [] bytes = bos.toByteArray(); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes); list.add(byteBuf); } @Override protected void decode (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int magicNum = byteBuf.readInt(); byte version = byteBuf.readByte(); byte serializedType = byteBuf.readByte(); byte msgType = byteBuf.readByte(); int sequenceId = byteBuf.readInt(); byteBuf.readByte(); int length = byteBuf.readInt(); byte [] content = new byte [length]; byteBuf.readBytes(content, 0 , length); ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (content)); Message message = (Message) ois.readObject(); list.add(message); log.debug("magicNum: {}" , magicNum); log.debug("version: {}" , version); log.debug("serializedType: {}" , serializedType); log.debug("msgType: {}" , msgType); log.debug("sequenceId: {}" , sequenceId); } }