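Netty Optimization
Serialization
Serialization and deserialization are mainly used to convert the message body.
When serializing, a Java object is turned into the bytes to be transmitted.
When deserializing, the incoming body bytes are restored into a Java object.
At the moment our code only supports Java's built-in serialization and deserialization.

    // Serialize: Java object -> byte[]
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(message);
    byte[] bytes = bos.toByteArray();

    // Deserialize: byte[] -> Java object
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content));
    Message message = (Message) ois.readObject();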
To make it easier to add other serialization methods later, we abstract a Serializer interface dedicated to serialization.

    public interface Serializer {
        // Deserialize bytes into an instance of clazz
        <T> T deserialize(Class<T> clazz, byte[] bytes);

        // Serialize an object into bytes
        <T> byte[] serialize(T object);
    }

The different serialization methods are then implemented as constants of an enum.

    import com.google.gson.Gson;
    import java.io.*;
    import java.nio.charset.StandardCharsets;

    public interface Serializer {
        // Deserialize bytes into an instance of clazz
        <T> T deserialize(Class<T> clazz, byte[] bytes);

        // Serialize an object into bytes
        <T> byte[] serialize(T object);

        enum Algorithm implements Serializer {
            // JDK built-in serialization
            Java {
                @Override
                @SuppressWarnings("unchecked")
                public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                    try {
                        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                        return (T) ois.readObject();
                    } catch (IOException | ClassNotFoundException e) {
                        throw new RuntimeException("Deserialization failed", e);
                    }
                }

                @Override
                public <T> byte[] serialize(T object) {
                    try {
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        ObjectOutputStream oos = new ObjectOutputStream(bos);
                        oos.writeObject(object);
                        return bos.toByteArray();
                    } catch (IOException e) {
                        throw new RuntimeException("Serialization failed", e);
                    }
                }
            },

            // JSON serialization via Gson
            Json {
                @Override
                public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                    String json = new String(bytes, StandardCharsets.UTF_8);
                    return new Gson().fromJson(json, clazz);
                }

                @Override
                public <T> byte[] serialize(T object) {
                    String json = new Gson().toJson(object);
                    return json.getBytes(StandardCharsets.UTF_8);
                }
            }
        }
    }
Add a Config class that selects the serialization method by reading serializer.algorithm from application.properties.

    public abstract class Config {
        static Properties properties;

        static {
            // Load application.properties from the classpath once
            try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
                properties = new Properties();
                properties.load(in);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        public static int getServerPort() {
            String value = properties.getProperty("server.port");
            if (value == null) {
                return 8080;
            } else {
                return Integer.parseInt(value);
            }
        }

        public static Serializer.Algorithm getSerializerAlgorithm() {
            String value = properties.getProperty("serializer.algorithm");
            if (value == null) {
                // Default to JDK serialization when nothing is configured
                return Serializer.Algorithm.Java;
            } else {
                // The value must match an enum constant name exactly (case-sensitive)
                return Serializer.Algorithm.valueOf(value);
            }
        }
    }
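One detail of the lookup above: Enum.valueOf throws IllegalArgumentException when the configured text does not match a constant name exactly. The following is a minimal JDK-only sketch of a more forgiving lookup. The class SerializerConfigSketch, its nested Algorithm enum (a stand-in for Serializer.Algorithm), and the helpers resolve and load are hypothetical names used only for illustration, and falling back to Java on a bad value is an assumption about the desired behavior.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class SerializerConfigSketch {
    // Stand-in for Serializer.Algorithm; only the constant names matter here.
    enum Algorithm { Java, Json }

    // Resolve the configured algorithm, falling back to Java when the key is
    // missing or the value does not match a constant name exactly.
    static Algorithm resolve(Properties props) {
        String value = props.getProperty("serializer.algorithm");
        if (value == null) {
            return Algorithm.Java;
        }
        try {
            return Algorithm.valueOf(value.trim());
        } catch (IllegalArgumentException e) {
            return Algorithm.Java;
        }
    }

    // Parse properties from text so the sketch needs no file on the classpath.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(resolve(load("serializer.algorithm=Json")));
        System.out.println(resolve(load("serializer.algorithm=json")));
        System.out.println(resolve(load("server.port=8080")));
    }
}
```

Whether a silent fallback or a startup failure is preferable depends on the deployment; failing fast makes typos in the properties file easier to spot.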
Modify the encoding and decoding in the earlier MessageCodecSharable class so that it uses the configured serializer.

    @Slf4j
    @ChannelHandler.Sharable
    public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

        @Override
        protected void encode(ChannelHandlerContext ctx, Message message, List<Object> list) throws Exception {
            ByteBuf byteBuf = ctx.alloc().buffer();
            // 4 bytes: magic number
            byteBuf.writeBytes(new byte[]{0, 6, 1, 8});
            // 1 byte: protocol version
            byteBuf.writeByte(1);
            // 1 byte: serialization algorithm (ordinal of Serializer.Algorithm)
            byteBuf.writeByte(Config.getSerializerAlgorithm().ordinal());
            // 1 byte: message type
            byteBuf.writeByte(message.getMessageType());
            // 4 bytes: sequence id
            byteBuf.writeInt(message.getSequenceId());
            // 1 byte: padding, so the header is 16 bytes long
            byteBuf.writeByte(0);
            // Serialize the body with the configured algorithm
            byte[] bytes = Config.getSerializerAlgorithm().serialize(message);
            // 4 bytes: body length, followed by the body itself
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
            list.add(byteBuf);
        }

        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            int magicNum = byteBuf.readInt();
            byte version = byteBuf.readByte();
            byte serializedType = byteBuf.readByte();
            byte msgType = byteBuf.readByte();
            int sequenceId = byteBuf.readInt();
            // Skip the padding byte
            byteBuf.readByte();
            int length = byteBuf.readInt();
            byte[] content = new byte[length];
            byteBuf.readBytes(content, 0, length);

            // Pick the algorithm the sender used and the concrete message class
            Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializedType];
            Class<?> messageClass = Message.getMessageClass(msgType);
            Object message = algorithm.deserialize(messageClass, content);
            list.add(message);

            log.debug("magicNum: {}", magicNum);
            log.debug("version: {}", version);
            log.debug("serializedType: {}", serializedType);
            log.debug("msgType: {}", msgType);
            log.debug("sequenceId: {}", sequenceId);
        }
    }
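The frame written by the codec has a fixed 16-byte header followed by the body. To make the byte layout easier to check without Netty on the classpath, here is a rough sketch of the same layout using java.nio.ByteBuffer, which is big-endian by default like ByteBuf. The class FrameLayoutSketch and its methods encode and decodeBody are hypothetical names for illustration only, not part of the project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameLayoutSketch {
    static final int MAGIC = 0x00060108; // the bytes 0, 6, 1, 8
    static final int HEADER_LENGTH = 16;

    // Build one frame: 16-byte header followed by the body.
    static byte[] encode(int serializerOrdinal, int messageType, int sequenceId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(MAGIC);                 // offset 0:  magic number (4 bytes)
        buf.put((byte) 1);                 // offset 4:  version
        buf.put((byte) serializerOrdinal); // offset 5:  serialization algorithm
        buf.put((byte) messageType);       // offset 6:  message type
        buf.putInt(sequenceId);            // offset 7:  sequence id (4 bytes)
        buf.put((byte) 0);                 // offset 11: padding
        buf.putInt(body.length);           // offset 12: body length (4 bytes)
        buf.put(body);                     // offset 16: body
        return buf.array();
    }

    // Validate the magic number and return the body of a frame.
    static byte[] decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        buf.position(12); // skip version, algorithm, type, sequence id, padding
        byte[] body = new byte[buf.getInt()];
        buf.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] frame = encode(1, 0, 42, body);
        System.out.println(frame.length);
        System.out.println(new String(decodeBody(frame), StandardCharsets.UTF_8));
    }
}
```

Because the length field sits at offset 12 and is 4 bytes wide, a length-field-based frame decoder in front of the codec can split the byte stream into whole frames before decode runs.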
Finally, we write a test class to exercise the encoding.

    public class TestSerializer {
        public static void main(String[] args) {
            MessageCodecSharable CODEC = new MessageCodecSharable();
            LoggingHandler LOGGING = new LoggingHandler();
            // Outbound messages pass CODEC first, then the first LOGGING handler
            EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);
            LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
            channel.writeOutbound(message);
        }
    }
Two LoggingHandler instances are added so that the content can be inspected both before and after serialization.
The results are shown below. The same object takes 220 bytes with Java serialization and 87 bytes with JSON serialization, so the JSON payload is considerably smaller.

    =================================== Java ===================================
    14:37:36 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 220B
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 00 06 01 08 01 00 00 00 00 00 00 00 00 00 00 cc |................|
    |00000010| ac ed 00 05 73 72 00 28 63 6f 6d 2e 63 68 65 6e |....sr.(com.chen|
    |00000020| 78 69 69 69 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 |xiii.message.Log|
    |00000030| 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 |inRequestMessage|
    |00000040| 31 f3 3a 3f 09 31 1f d2 02 00 02 4c 00 08 70 61 |1.:?.1.....L..pa|
    |00000050| 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c |sswordt..Ljava/l|
    |00000060| 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 |ang/String;L..us|
    |00000070| 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 1c 63 |ernameq.~..xr..c|
    |00000080| 6f 6d 2e 63 68 65 6e 78 69 69 69 2e 6d 65 73 73 |om.chenxiii.mess|
    |00000090| 61 67 65 2e 4d 65 73 73 61 67 65 d6 b9 3e a7 6d |age.Message..>.m|
    |000000a0| c9 2f 2f 02 00 02 49 00 0b 6d 65 73 73 61 67 65 |.//...I..message|
    |000000b0| 54 79 70 65 49 00 0a 73 65 71 75 65 6e 63 65 49 |TypeI..sequenceI|
    |000000c0| 64 78 70 00 00 00 00 00 00 00 00 74 00 03 31 32 |dxp........t..12|
    |000000d0| 33 74 00 08 7a 68 61 6e 67 73 61 6e             |3t..zhangsan    |
    +--------+-------------------------------------------------+----------------+

    =================================== Json ===================================
    14:30:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 87B
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 00 06 01 08 01 01 00 00 00 00 00 00 00 00 00 47 |...............G|
    |00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
    |00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
    |00000030| 22 3a 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 |":"123","sequenc|
    |00000040| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
    |00000050| 79 70 65 22 3a 30 7d                            |ype":0}         |
    +--------+-------------------------------------------------+----------------+
Parameter Tuning
CONNECT_TIMEOUT_MILLIS
This is a SocketChannel parameter.
It applies when the client establishes a connection: if the connection cannot be made within the given number of milliseconds, a timeout exception is thrown.

    @Slf4j
    public class TestTimeOut {
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(group)
                        // Fail the connect attempt if it takes longer than 5 seconds
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                        .channel(NioSocketChannel.class)
                        .handler(new LoggingHandler());
                ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
                future.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
                log.debug("timeout");
            } finally {
                group.shutdownGracefully();
            }
        }
    }
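The client configures parameters with Bootstrap#option(); they apply to the SocketChannel.
The server configures parameters through ServerBootstrap, but a different method is needed depending on the channel:
ServerBootstrap#option() configures parameters on the ServerSocketChannel.
ServerBootstrap#childOption() configures parameters on each accepted SocketChannel.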
Source code analysis
AbstractNioChannel.AbstractNioUnsafe#connect() contains the following code:

    int connectTimeoutMillis = AbstractNioChannel.this.config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        // Schedule a task on the channel's event loop that fires after the timeout
        AbstractNioChannel.this.connectTimeoutFuture = AbstractNioChannel.this.eventLoop().schedule(new Runnable() {
            public void run() {
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                // If the connect has not completed by now, fail the promise and close the channel
                if (connectPromise != null && !connectPromise.isDone()
                        && connectPromise.tryFailure(new ConnectTimeoutException("connection timed out: " + remoteAddress))) {
                    AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                }
            }
        }, (long) connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
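The idea in the snippet above is to schedule a task that fails the pending promise if it is still incomplete when the timer fires. A JDK-only sketch of the same pattern follows, with CompletableFuture and ScheduledExecutorService standing in for Netty's ChannelPromise and event loop. The class ConnectTimeoutSketch and its methods are hypothetical and only illustrate the technique; this is not Netty's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ConnectTimeoutSketch {
    // Returns a promise that fails with TimeoutException unless someone
    // completes it within timeoutMillis; a non-positive timeout disables the timer.
    static CompletableFuture<String> withTimeout(ScheduledExecutorService loop, long timeoutMillis) {
        CompletableFuture<String> connectPromise = new CompletableFuture<>();
        if (timeoutMillis > 0) {
            ScheduledFuture<?> timer = loop.schedule(() -> {
                // No effect if the promise was already completed
                connectPromise.completeExceptionally(new TimeoutException("connection timed out"));
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // Once the promise completes either way, the timer is no longer needed
            connectPromise.whenComplete((value, error) -> timer.cancel(false));
        }
        return connectPromise;
    }

    // True when a promise that nobody completes ends in a TimeoutException.
    static boolean timesOut(long timeoutMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            withTimeout(loop, timeoutMillis).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    // True when completing the promise before the deadline wins over the timer.
    static boolean completesInTime(long timeoutMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> promise = withTimeout(loop, timeoutMillis);
            promise.complete("connected");
            return "connected".equals(promise.join());
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(50));
        System.out.println(completesInTime(5000));
    }
}
```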
SO_BACKLOG
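After the first handshake, the connection between client and server is not yet fully established, so it is placed in the half-open (SYN) queue.
Once the three-way handshake completes, the connection is moved to the accept queue of fully established connections.
The server handles the accept event after the TCP three-way handshake, that is, after the connection has been established. It takes connections from the accept queue and processes them.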
Debugging the source
The test code is as follows:

    public class TestBacklogServer {
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup())
                    // Keep the accept queue very small so it fills up quickly
                    .option(ChannelOption.SO_BACKLOG, 2)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        }
                    }).bind(8080);
        }
    }
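NioEventLoop#processSelectedKey() contains the following code:

    if ((readyOps & 17) != 0 || readyOps == 0) {
        unsafe.read();
    }

Here 17 is OP_READ | OP_ACCEPT (1 | 16), so on the server channel a non-zero readyOps & 17 means an accept event has arrived; unsafe.read() then takes the client connection from the accept queue.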
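The constant 17 in the decompiled condition can be verified against the JDK's SelectionKey constants. A small sketch, where the class ReadyOpsSketch and the method shouldRead are hypothetical names that simply mirror the condition:

```java
import java.nio.channels.SelectionKey;

class ReadyOpsSketch {
    // OP_READ is 1 and OP_ACCEPT is 16, so the combined mask is 17.
    static final int READ_OR_ACCEPT = SelectionKey.OP_READ | SelectionKey.OP_ACCEPT;

    // Mirrors the condition in the decompiled snippet.
    static boolean shouldRead(int readyOps) {
        return (readyOps & READ_OR_ACCEPT) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(READ_OR_ACCEPT);
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT));
        System.out.println(shouldRead(SelectionKey.OP_WRITE));
    }
}
```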
We set a breakpoint on unsafe.read() so that the server cannot take client connections out of the accept queue.
Finally, we start three clients and connect them to the server. Once the queue is full, a further client fails with the following error:

    io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
    Caused by: java.net.ConnectException: Connection refused: no further information
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
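Other parameters
SO_SNDBUF & SO_RCVBUF
SO_SNDBUF is a SocketChannel parameter.
SO_RCVBUF can be set on either a SocketChannel or a ServerSocketChannel (setting it on the ServerSocketChannel is recommended).
These parameters specify the sliding window sizes of the sender and the receiver.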
ALLOCATOR
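This is a SocketChannel parameter.
It configures whether ByteBuf instances are pooled or unpooled, and whether they use direct memory or heap memory.
It determines the type of ByteBuf allocated inside handlers.
When reading data from network I/O, direct memory is more efficient than heap memory, so Netty uses direct memory in that case.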
RCVBUF_ALLOCATOR
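This is a SocketChannel parameter.
It controls the size of Netty's receive buffer.
It is responsible for allocating buffers for inbound data and decides the size of the inbound buffer (which can be adjusted dynamically). It always uses direct memory; whether the buffer is pooled or unpooled is decided by the allocator.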
A Simple RPC Framework
Preparation
Add an RPC request code and an RPC response code to the existing Message class.

    @Data
    public abstract class Message implements Serializable {

        // ... existing fields and message types omitted

        public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
        public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

        static {
            // Register the RPC message classes next to the existing ones
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    }
RPC request message class

    @Getter
    @ToString(callSuper = true)
    public class RpcRequestMessage extends Message {

        // Fully qualified name of the interface to call; the server looks up the implementation by it
        private String interfaceName;

        // Name of the method to call on that interface
        private String methodName;

        // Return type of the method
        private Class<?> returnType;

        // Parameter types of the method
        private Class<?>[] parameterTypes;

        // Argument values of the call
        private Object[] parameterValue;

        public RpcRequestMessage(int sequenceId, String interfaceName, String methodName,
                                 Class<?> returnType, Class<?>[] parameterTypes, Object[] parameterValue) {
            super.setSequenceId(sequenceId);
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.returnType = returnType;
            this.parameterTypes = parameterTypes;
            this.parameterValue = parameterValue;
        }

        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_REQUEST;
        }
    }
RPC response message class

    @Data
    @ToString(callSuper = true)
    public class RpcResponseMessage extends Message {

        // Return value of a successful call
        private Object returnValue;

        // Exception raised by a failed call
        private Exception exceptionValue;

        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_RESPONSE;
        }
    }
Basic server skeleton

    @Slf4j
    public class RpcServer {
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

            // Handler for RPC requests
            RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        ch.pipeline().addLast(RPC_HANDLER);
                    }
                });
                Channel channel = serverBootstrap.bind(8080).sync().channel();
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error", e);
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
Basic client skeleton

    @Slf4j
    public class RpcClient {
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

            // Handler for RPC responses
            RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        ch.pipeline().addLast(RPC_HANDLER);
                    }
                });
                Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
                channel.closeFuture().sync();
            } catch (Exception e) {
                log.error("client error", e);
            } finally {
                group.shutdownGracefully();
            }
        }
    }
A simple way to obtain an implementation instance from its interface

    public class ServicesFactory {
        static Properties properties;
        // Maps each service interface to its single implementation instance
        static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

        static {
            try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
                properties = new Properties();
                properties.load(in);
                Set<String> names = properties.stringPropertyNames();
                for (String name : names) {
                    // Keys ending in "Service" are treated as interface names,
                    // and their values as the implementing class names
                    if (name.endsWith("Service")) {
                        Class<?> interfaceClass = Class.forName(name);
                        Class<?> instanceClass = Class.forName(properties.getProperty(name));
                        map.put(interfaceClass, instanceClass.newInstance());
                    }
                }
            } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        @SuppressWarnings("unchecked")
        public static <T> T getService(Class<T> interfaceClass) {
            return (T) map.get(interfaceClass);
        }
    }
Related configuration

    serializer.algorithm=Json
    cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
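The registry pattern used by ServicesFactory (a properties key names an interface, its value names the implementation) can be tried in isolation. Below is a minimal JDK-only sketch under a few assumptions: ServiceRegistrySketch, its nested HelloService and HelloServiceImpl, and the methods load and demo are hypothetical stand-ins, and the properties are built in memory rather than read from application.properties. It also uses getDeclaredConstructor().newInstance(), since Class#newInstance has been deprecated since Java 9.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

class ServiceRegistrySketch {
    interface HelloService {
        String sayHello(String name);
    }

    static class HelloServiceImpl implements HelloService {
        public String sayHello(String name) {
            return "Hello, " + name;
        }
    }

    // Instantiate one implementation per "...Service" key found in the properties.
    static Map<Class<?>, Object> load(Properties props) {
        Map<Class<?>, Object> services = new ConcurrentHashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.endsWith("Service")) {
                continue; // unrelated settings such as serializer.algorithm
            }
            try {
                Class<?> interfaceClass = Class.forName(key);
                Class<?> implClass = Class.forName(props.getProperty(key));
                if (!interfaceClass.isAssignableFrom(implClass)) {
                    throw new IllegalStateException(implClass + " does not implement " + interfaceClass);
                }
                services.put(interfaceClass, implClass.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot load service " + key, e);
            }
        }
        return services;
    }

    static String demo() {
        Properties props = new Properties();
        props.setProperty("serializer.algorithm", "Json");
        props.setProperty(HelloService.class.getName(), HelloServiceImpl.class.getName());
        Map<Class<?>, Object> services = load(props);
        // Class#cast avoids an unchecked cast when reading from the map
        HelloService service = HelloService.class.cast(services.get(HelloService.class));
        return service.sayHello("zhangsan");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```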
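Designing RpcRequestMessageHandler
Key point: invoke the method through reflection.
When the client sends method-call information to the server over the network, the server has to execute the method and pass its return value back to the client.

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {
            RpcResponseMessage response = new RpcResponseMessage();
            // Echo the sequence id so the client can match the response to its request
            response.setSequenceId(message.getSequenceId());
            try {
                // Look up the implementation registered for the requested interface
                Object service = ServicesFactory.getService(Class.forName(message.getInterfaceName()));
                // Find the method by name and parameter types
                Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
                // Invoke it and store the result in the response
                Object invoke = method.invoke(service, message.getParameterValue());
                response.setReturnValue(invoke);
            } catch (Exception e) {
                e.printStackTrace();
                // Only InvocationTargetException is guaranteed to carry a cause; fall back to e itself
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                response.setExceptionValue(new Exception("Remote call error: " + cause.getMessage()));
            }
            // Send the response back to the client
            ctx.writeAndFlush(response);
        }
    }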
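The reflective dispatch at the heart of the handler can be exercised without Netty. The sketch below is JDK-only and rests on stated assumptions: ReflectiveCallSketch, its nested HelloService and HelloServiceImpl, and the method call are hypothetical, and returning an error string is just a compact way to show the two failure paths. It looks the method up on the interface rather than on the implementation class, which also works when the implementation class is not public.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectiveCallSketch {
    interface HelloService {
        String sayHello(String name);
    }

    static class HelloServiceImpl implements HelloService {
        public String sayHello(String name) {
            if (name == null) {
                throw new IllegalArgumentException("name is required");
            }
            return "Hello, " + name;
        }
    }

    // Invoke methodName on service; return the result or a short error description.
    static Object call(Object service, Class<?> interfaceClass, String methodName,
                       Class<?>[] parameterTypes, Object[] args) {
        try {
            Method method = interfaceClass.getMethod(methodName, parameterTypes);
            return method.invoke(service, args);
        } catch (InvocationTargetException e) {
            // The target method itself threw; the real exception is the cause
            return "remote call failed: " + e.getCause().getMessage();
        } catch (ReflectiveOperationException e) {
            // Lookup or access failed before the method ran; there is no cause to unwrap
            return "remote call failed: " + e;
        }
    }

    public static void main(String[] args) {
        HelloService service = new HelloServiceImpl();
        Class<?>[] types = {String.class};
        System.out.println(call(service, HelloService.class, "sayHello", types, new Object[]{"zhangsan"}));
        System.out.println(call(service, HelloService.class, "sayHello", types, new Object[]{null}));
        System.out.println(call(service, HelloService.class, "missing", types, new Object[]{"x"}));
    }
}
```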
With the approach above, the client can successfully call a method remotely.
The client-side test call looks like this:

    Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
    channel.writeAndFlush(new RpcRequestMessage(
            1,
            "com.chenxiniubi.server.service.HelloService",
            "sayHello",
            String.class,
            new Class[]{String.class},
            new Object[]{"张三"}
    ));
    channel.closeFuture().sync();
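Designing RpcClientManager
Key point: refactor the client code to implement remote calls.
Key point: use a proxy class to simplify the remote call.
As the test case above shows, making a remote call this way is too cumbersome. In practice we want the call to be much more concise:

    HelloService service = getXxx();
    service.sayHello("张三");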
Next, we need to refactor the client.
First, since later code sends messages to the server through the channel, the channel has to be extracted into an object that can be reused.
In the original client code, channel.closeFuture().sync(); blocks until the channel is closed, so this has to be restructured to handle the close asynchronously.

    // volatile is needed for the double-checked locking in getChannel()
    private static volatile Channel channel;

    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProtocolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            // Do not block here; release the event loop group when the channel closes
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
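To guarantee that there is only one channel, it is created with the singleton pattern (double-checked locking).

    public static Channel getChannel() {
        // Fast path: already initialized, no locking needed
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) {
            // Check again: another thread may have initialized it while we waited for the lock
            if (channel != null) {
                return channel;
            }
            initChannel();
            return channel;
        }
    }

For this double-checked locking to be safe, the channel field should be declared volatile.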
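Double-checked locking is only safe in Java when the shared field is volatile. The JDK-only sketch below illustrates the pattern with a plain Object standing in for the Netty Channel. The class LazyChannelHolderSketch, the INIT_COUNT counter, and initCountAfterConcurrentAccess are hypothetical names added purely to make the single initialization observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyChannelHolderSketch {
    private static final Object LOCK = new Object();
    private static final AtomicInteger INIT_COUNT = new AtomicInteger();
    // volatile ensures other threads see a fully initialized value
    private static volatile Object channel;

    static Object getChannel() {
        Object local = channel;
        if (local != null) {
            return local; // fast path without locking
        }
        synchronized (LOCK) {
            if (channel == null) {
                // Stand-in for the expensive connect step; runs at most once
                INIT_COUNT.incrementAndGet();
                channel = new Object();
            }
            return channel;
        }
    }

    // Call getChannel() from several threads and report how often init ran.
    static int initCountAfterConcurrentAccess(int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(LazyChannelHolderSketch::getChannel);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return INIT_COUNT.get();
    }

    public static void main(String[] args) {
        System.out.println(initCountAfterConcurrentAccess(8));
        System.out.println(getChannel() == getChannel());
    }
}
```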
Finally, so that users can call methods concisely, we hand the work of creating the message object and sending it over to a proxy class.

    public static <T> T getProxyInstance(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        // Every call on the proxy is routed to this invocation handler
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. Turn the method call into a message object
            int id = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    id,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. Send the message
            getChannel().writeAndFlush(msg);
            // 3. No result is returned yet
            return null;
        });
        return (T) o;
    }
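At this point a remote method can be called in a simple way. Note that the proxy still returns null, because the response is not yet handed back to the caller.

    public static void main(String[] args) {
        HelloService service = getProxyInstance(HelloService.class);
        System.out.println(service.sayHello("张三"));
        System.out.println(service.sayHello("李四"));
    }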
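To see the dynamic proxy mechanism on its own, the following JDK-only sketch replaces the network send with an in-memory list. The class ProxySketch, its nested HelloService, the SENT list, and proxyFor are hypothetical names for illustration; recording a string is merely a stand-in for building and sending an RpcRequestMessage.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProxySketch {
    interface HelloService {
        String sayHello(String name);
    }

    // Stand-in for the channel: every intercepted call is recorded here.
    static final List<String> SENT = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> serviceClass) {
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),
                new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    // Describe the call instead of sending it over the network
                    SENT.add(serviceClass.getSimpleName() + "#" + method.getName() + Arrays.toString(args));
                    // Returning null is only valid for reference return types;
                    // a method returning a primitive would fail on unboxing
                    return null;
                });
    }

    public static void main(String[] args) {
        HelloService service = proxyFor(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
        System.out.println(SENT);
    }
}
```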
Designing RpcResponseMessageHandler
Key point: use a Promise to pass data between threads.
After the client makes a remote call, the server sends back a response that contains the information we need.
That response is received by RpcResponseMessageHandler on an NIO thread, not on the main thread.
The message therefore still has to be handed over to the main thread, and a Promise is used for that.
After RpcClientManager sends a message, it creates an empty Promise and waits for the result.
RpcResponseMessageHandler then fills that empty Promise with the result.

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

        // Pending calls, keyed by sequence id
        public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
            // Take the promise out of the map so that it does not accumulate
            Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
            if (promise != null) {
                Object returnValue = msg.getReturnValue();
                Exception exceptionValue = msg.getExceptionValue();
                if (exceptionValue != null) {
                    // The remote call failed
                    promise.setFailure(exceptionValue);
                } else {
                    // The remote call succeeded
                    promise.setSuccess(returnValue);
                }
            }
        }
    }
The RpcClientManager code is as follows:

    public static <T> T getProxyInstance(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. Turn the method call into a message object
            int id = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    id,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            Channel ch = getChannel();
            // 2. Prepare an empty promise for the result; its listeners run on the channel's event loop
            DefaultPromise<Object> promise = new DefaultPromise<>(ch.eventLoop());
            // Register the promise before sending, so a fast response cannot arrive first and be dropped
            RpcResponseMessageHandler.PROMISES.put(id, promise);
            // 3. Send the message
            ch.writeAndFlush(msg);
            // 4. Wait for the result
            promise.await();
            if (promise.isSuccess()) {
                // The call completed normally
                return promise.getNow();
            } else {
                // The call failed
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }
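The handoff between the calling thread and the I/O thread can be sketched with JDK classes only, using CompletableFuture in place of Netty's Promise. Everything in this sketch is a hypothetical stand-in: PendingCallsSketch, the PENDING map (playing the role of PROMISES), onResponse (playing the role of the response handler), and the helper thread that simulates the server. It registers the pending entry before "sending" and waits with a timeout, which is one way to avoid blocking forever if no response arrives.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class PendingCallsSketch {
    // Pending calls keyed by sequence id
    static final Map<Integer, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();
    static final AtomicInteger NEXT_ID = new AtomicInteger();

    // Runs on the "I/O thread" when a response with this sequence id arrives.
    static void onResponse(int sequenceId, Object value, Throwable error) {
        CompletableFuture<Object> promise = PENDING.remove(sequenceId);
        if (promise == null) {
            return; // unknown id, or the caller already gave up
        }
        if (error != null) {
            promise.completeExceptionally(error);
        } else {
            promise.complete(value);
        }
    }

    // Runs on the calling thread: register, "send", then wait for the result.
    static Object call(String argument) {
        int id = NEXT_ID.incrementAndGet();
        CompletableFuture<Object> promise = new CompletableFuture<>();
        PENDING.put(id, promise); // register before sending
        // Simulated server: answers from a different thread
        new Thread(() -> onResponse(id, "Hello, " + argument, null)).start();
        try {
            return promise.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            PENDING.remove(id); // do not leak the entry on failure
            throw new IllegalStateException("remote call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("zhangsan"));
        System.out.println(PENDING.isEmpty());
    }
}
```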
Full code
RpcServer:

    @Slf4j
    public class RpcServer {
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            LoggingHandler LOGGING = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder());
                        ch.pipeline().addLast(LOGGING);
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        ch.pipeline().addLast(RPC_HANDLER);
                    }
                });
                Channel channel = serverBootstrap.bind(8080).sync().channel();
                channel.closeFuture().sync();
            } catch (Exception e) {
                log.error("server error", e);
            } finally {
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
    }
RpcClientManager:

    @Slf4j
    public class RpcClientManager {
        // volatile is needed for the double-checked locking in getChannel()
        private static volatile Channel channel;
        private static final Object LOCK = new Object();

        public static void main(String[] args) {
            HelloService service = getProxyInstance(HelloService.class);
            System.out.println(service.sayHello("张三"));
            System.out.println(service.sayHello("李四"));
        }

        // Create a proxy for the given service interface
        public static <T> T getProxyInstance(Class<T> serviceClass) {
            ClassLoader loader = serviceClass.getClassLoader();
            Class<?>[] interfaces = new Class[]{serviceClass};
            Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
                // 1. Turn the method call into a message object
                int id = SequenceIdGenerator.nextId();
                RpcRequestMessage msg = new RpcRequestMessage(
                        id,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args
                );
                Channel ch = getChannel();
                // 2. Prepare an empty promise for the result
                DefaultPromise<Object> promise = new DefaultPromise<>(ch.eventLoop());
                // Register the promise before sending, so a fast response cannot arrive first and be dropped
                RpcResponseMessageHandler.PROMISES.put(id, promise);
                // 3. Send the message
                ch.writeAndFlush(msg);
                // 4. Wait for the result
                promise.await();
                if (promise.isSuccess()) {
                    return promise.getNow();
                } else {
                    throw new RuntimeException(promise.cause());
                }
            });
            return (T) o;
        }

        // Get the single shared channel, creating it on first use
        public static Channel getChannel() {
            if (channel != null) {
                return channel;
            }
            synchronized (LOCK) {
                if (channel != null) {
                    return channel;
                }
                initChannel();
                return channel;
            }
        }

        // Initialize the channel
        private static void initChannel() {
            NioEventLoopGroup group = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            try {
                channel = bootstrap.connect("localhost", 8080).sync().channel();
                // Release the event loop group asynchronously when the channel closes
                channel.closeFuture().addListener(future -> {
                    group.shutdownGracefully();
                });
            } catch (Exception e) {
                log.error("client error", e);
            }
        }
    }
RpcRequestMessageHandler:

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {
            RpcResponseMessage response = new RpcResponseMessage();
            response.setSequenceId(message.getSequenceId());
            try {
                // Look up the implementation registered for the requested interface
                Object service = ServicesFactory.getService(Class.forName(message.getInterfaceName()));
                Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
                Object invoke = method.invoke(service, message.getParameterValue());
                response.setReturnValue(invoke);
            } catch (Exception e) {
                e.printStackTrace();
                // Only InvocationTargetException is guaranteed to carry a cause; fall back to e itself
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                response.setExceptionValue(new Exception("Remote call error: " + cause.getMessage()));
            }
            ctx.writeAndFlush(response);
        }
    }
RpcResponseMessageHandler:

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

        // Pending calls, keyed by sequence id
        public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
            Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
            if (promise != null) {
                Object returnValue = msg.getReturnValue();
                Exception exceptionValue = msg.getExceptionValue();
                if (exceptionValue != null) {
                    promise.setFailure(exceptionValue);
                } else {
                    promise.setSuccess(returnValue);
                }
            }
        }
    }
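Source Code
Startup flow
The startup flow can be divided into the following parts:

    // 1. Netty wraps the thread and the selector in the NioEventLoop of a NioEventLoopGroup
    Selector selector = Selector.open();

    // 2. Create the NioServerSocketChannel, which will be used as the attachment
    NioServerSocketChannel attachment = new NioServerSocketChannel();

    // 3. Create the native ServerSocketChannel and switch it to non-blocking mode
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);

    // 4. Register with the selector: no interest ops yet, but with the attachment
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

    // 5. Bind the port
    serverSocketChannel.bind(new InetSocketAddress(8080));

    // 6. Declare interest in accept events
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);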
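Obtain the Selector. In Netty, the NioEventLoop inside a NioEventLoopGroup wraps both the thread and the selector.
Create the NioServerSocketChannel; this channel is attached to the ServerSocketChannel as an attachment.
Create the ServerSocketChannel, set it to non-blocking mode, and register it with the Selector. At this point no events are of interest yet, but the NioServerSocketChannel attachment has been added.
Bind the port.
Set the events of interest through interestOps.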
Next, let's look at the source code.
Entry point: io.netty.bootstrap.ServerBootstrap#bind
Key code: io.netty.bootstrap.AbstractBootstrap#doBind
Creating the NioServerSocketChannel
We start with the doBind() method:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register the channel
        final ChannelFuture regFuture = this.initAndRegister();
        ...
    }
Next, look at the initAndRegister() method:

    final ChannelFuture initAndRegister() {
        Channel channel = null;

        try {
            // Create the channel through the factory
            channel = this.channelFactory.newChannel();
            // Initialize it
            this.init(channel);
        } catch (Throwable var3) {
            ...
        }
        ...
        return regFuture;
    }
Then we look at ReflectiveChannelFactory#newChannel(), which calls the constructor to create the NioServerSocketChannel:

    public T newChannel() {
        try {
            // Create the channel reflectively through its no-argument constructor
            return this.constructor.newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2);
        }
    }
See the NioServerSocketChannel constructor below; with it, the NioServerSocketChannel object has been created.

    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // Open the native ServerSocketChannel
            return provider.openServerSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a server socket.", var2);
        }
    }

    // For comparison, java.nio.channels.ServerSocketChannel#open() makes the same call
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
Next, look at the ServerBootstrap#init() method.
It adds a handler to the NioServerSocketChannel.
This handler does not run immediately, however; it runs after registration.

    void init(Channel channel) {
        ...
        // Add an initializer handler to the NioServerSocketChannel's pipeline
        p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
            // Runs only after the channel has been registered
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = ServerBootstrap.this.config.handler();
                if (handler != null) {
                    pipeline.addLast(new ChannelHandler[]{handler});
                }

                ch.eventLoop().execute(new Runnable() {
                    public void run() {
                        pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                    }
                });
            }
        }});
    }
register
In the initAndRegister() method:

    final ChannelFuture initAndRegister() {
        Channel channel = null;

        try {
            channel = this.channelFactory.newChannel();
            this.init(channel);
        } catch (Throwable var3) {
            ...
        }

        // Register the channel with the event loop group
        ChannelFuture regFuture = this.config().group().register(channel);
        ...
        return regFuture;
    }
Following the call chain, execution arrives at AbstractChannel.AbstractUnsafe#register():

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // Are we already on the channel's event loop (NIO) thread?
        if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            try {
                // Otherwise hand the registration over to the NIO thread
                eventLoop.execute(new Runnable() {
                    public void run() {
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
            }
        }
    }
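The "run inline if already on the loop thread, otherwise submit to it" pattern shown above is what moves registration from the main thread onto the NIO thread. A JDK-only sketch of that dispatch rule follows, with a single-thread executor standing in for the event loop. The class EventLoopDispatchSketch, the thread name sketch-loop, and the method threadUsedForRegistration are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopDispatchSketch {
    private volatile Thread loopThread;

    // A single-thread executor plays the role of the event loop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "sketch-loop");
        loopThread = thread;
        return thread;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run the task inline when already on the loop thread, otherwise hand it over.
    void execute(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Name of the thread on which the "registration" work actually ran.
    static String threadUsedForRegistration() {
        EventLoopDispatchSketch loop = new EventLoopDispatchSketch();
        CompletableFuture<String> result = new CompletableFuture<>();
        // The outer call comes from the caller's thread and is handed over;
        // the inner call is already on the loop thread and runs inline.
        loop.execute(() -> loop.execute(() -> result.complete(Thread.currentThread().getName())));
        try {
            return result.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsedForRegistration());
    }
}
```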
We continue by following AbstractUnsafe.this.register0(promise);

    private void register0(ChannelPromise promise) {
        ...
        // Perform the actual registration
        AbstractChannel.this.doRegister();
        ...
    }
Next, look at AbstractNioChannel#doRegister().
It registers the ServerSocketChannel with the selector, with the NioServerSocketChannel as its attachment.

    protected void doRegister() throws Exception {
        boolean selected = false;

        while (true) {
            try {
                // javaChannel() is the native ServerSocketChannel; interest ops are 0 for now,
                // and "this" (the NioServerSocketChannel) is the attachment
                this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }

                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }
Once registration is complete, the handler added to the NioServerSocketChannel earlier is executed.
It adds another handler to the pipeline, whose job is to establish connections when accept events occur.
Everything below runs on the NIO thread.

    void init(Channel channel) {
        ...
        p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
            // Runs after registration
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = ServerBootstrap.this.config.handler();
                if (handler != null) {
                    pipeline.addLast(new ChannelHandler[]{handler});
                }

                ch.eventLoop().execute(new Runnable() {
                    public void run() {
                        // Add the acceptor that sets up connections after accept events
                        pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                    }
                });
            }
        }});
    }
doBind
After the NioServerSocketChannel has been initialized, the port has to be bound.

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = this.initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        } else if (regFuture.isDone()) {
            // Registration already finished: bind right away
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration still in progress: bind from a listener once it completes
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
Tracing doBind0() eventually leads to AbstractChannel.AbstractUnsafe#bind().
```java
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
    // Remember whether the channel was already active before binding
    boolean wasActive = AbstractChannel.this.isActive();
    try {
        AbstractChannel.this.doBind(localAddress);
    } catch (Throwable var5) {
        this.safeSetFailure(promise, var5);
        this.closeIfClosed();
        return;
    }

    if (!wasActive && AbstractChannel.this.isActive()) {
        this.invokeLater(new Runnable() {
            public void run() {
                AbstractChannel.this.pipeline.fireChannelActive();
            }
        });
    }

    this.safeSetSuccess(promise);
    ...
}
```
NioServerSocketChannel#doBind() checks the current JDK version.
this.javaChannel() returns the underlying JDK ServerSocketChannel object.
```java
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        this.javaChannel().bind(localAddress, this.config.getBacklog());
    } else {
        this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
    }
}
```
At this point the port is bound.
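For reference, the JDK 7+ branch above is a plain ServerSocketChannel.bind(address, backlog) call. The sketch below shows that call in isolation. The class name BindWithBacklogSketch, the loopback address, port 0, and the backlog of 128 are all choices made for the demo, not values taken from Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of the JDK call that doBind() delegates to.
final class BindWithBacklogSketch {

    /** Opens a server channel, binds it, and returns the port that was actually assigned. */
    static int bindAndGetPort(int port, int backlog) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            // Same two-argument bind as in the JDK >= 7 branch of doBind()
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), backlog);
            return ((InetSocketAddress) channel.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Port 0 asks the OS for any free port; 128 is an arbitrary backlog for the demo.
        System.out.println("bound to port " + bindAndGetPort(0, 128));
    }
}
```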
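Binding the accept event
We look at AbstractUnsafe#bind() again.
Its call to fireChannelActive() triggers the channelActive event on every handler in the NioServerSocketChannel pipeline.
At this point the pipeline contains only three handlers: head, acceptor, and tail.
The channelActive() methods of acceptor and tail do no significant work, so head is the one that matters.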
```java
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
    // Remember whether the channel was already active before binding
    boolean wasActive = AbstractChannel.this.isActive();
    try {
        AbstractChannel.this.doBind(localAddress);
    } catch (Throwable var5) {
        this.safeSetFailure(promise, var5);
        this.closeIfClosed();
        return;
    }

    if (!wasActive && AbstractChannel.this.isActive()) {
        this.invokeLater(new Runnable() {
            public void run() {
                // Propagate channelActive through head, acceptor, and tail
                AbstractChannel.this.pipeline.fireChannelActive();
            }
        });
    }

    this.safeSetSuccess(promise);
    ...
}
```
Next we look at the head handler's method, DefaultChannelPipeline.HeadContext#channelActive().
```java
public void channelActive(ChannelHandlerContext ctx) {
    // Pass the event on to the next handler
    ctx.fireChannelActive();
    // With auto-read enabled, this leads to doBeginRead()
    this.readIfIsAutoRead();
}
```
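Finally we arrive at AbstractNioChannel#doBeginRead().
This is where the channel registers interest in the OP_ACCEPT event (for NioServerSocketChannel, readInterestOp is OP_ACCEPT).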
```java
protected void doBeginRead() throws Exception {
    SelectionKey selectionKey = this.selectionKey;
    if (selectionKey.isValid()) {
        this.readPending = true;
        int interestOps = selectionKey.interestOps();
        // Add readInterestOp only if it is not already in the interest set
        if ((interestOps & this.readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | this.readInterestOp);
        }
    }
}
```
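Putting doRegister(), doBind(), and doBeginRead() together, the sequence is: register with an empty interest set, bind, then switch on OP_ACCEPT. Here is a small JDK-only sketch of that sequence. The class name DeferredAcceptInterestSketch, the string attachment, and the loopback bind to port 0 are assumptions made for illustration. Netty attaches the channel object itself and runs these steps on its event loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of "register with 0, bind, then enable OP_ACCEPT".
final class DeferredAcceptInterestSketch {

    /** Returns the key's interest set before and after the "begin read" step. */
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);

            // Step 1 (doRegister): interest set 0, so no events are selected yet.
            SelectionKey key = channel.register(selector, 0, "attachment");
            int before = key.interestOps();

            // Step 2 (doBind): bind the port.
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Step 3 (doBeginRead): add OP_ACCEPT only if it is not set already.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenBeginRead();
        System.out.println(ops[0] + " -> " + ops[1]); // 0 -> 16 (OP_ACCEPT is 1 << 4)
    }
}
```

Deferring the interest op means the selector cannot report an accept event before the pipeline is fully set up and the port is bound.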
EventLoop
An EventLoop has three important parts: a selector, a thread, and a task queue.
Creating the selector
Reading the source, however, we find that NioEventLoop holds two selectors.
```java
private Selector selector;
private Selector unwrappedSelector;
```
Both are initialized in the NioEventLoop constructor.
```java
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler);
    this.provider = (SelectorProvider) ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = (SelectStrategy) ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // openSelector() returns both the wrapped and the raw JDK selector
    NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
```
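So why are two selectors needed?
The JDK selector keeps its selectedKeys in a hash-based set, which is relatively inefficient to iterate.
Netty wraps the selector so that selectedKeys can be traversed as an array, which makes iteration faster.
The original selector (unwrappedSelector) is kept in NioEventLoop because some of its functionality is still needed; for example, doRegister() above registers the channel on unwrappedSelector.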
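To make the hash-set versus array difference concrete, here is a minimal sketch of an array-backed set that supports only what a select loop needs: append, iterate in insertion order, and reset. ArrayBackedKeySet is a hypothetical, simplified class written for this post. It is not Netty's actual implementation, and it does not show how Netty installs its key set into the JDK selector.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, simplified array-backed key set (illustration only).
final class ArrayBackedKeySet<T> extends AbstractSet<T> {
    private Object[] keys = new Object[4];
    private int size;

    /** Appends the key; no duplicate check, the caller adds each key at most once per round. */
    @Override
    public boolean add(T key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size * 2); // grow by doubling
        }
        keys[size++] = key;
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    /** Clears the used slots so the same array can serve the next select round. */
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int index;

            @Override
            public boolean hasNext() {
                return index < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[index++]; // plain index walk, no hashing or bucket traversal
            }
        };
    }

    public static void main(String[] args) {
        ArrayBackedKeySet<String> selectedKeys = new ArrayBackedKeySet<>();
        for (String key : new String[] {"k1", "k2", "k3", "k4", "k5"}) {
            selectedKeys.add(key);
        }
        System.out.println(selectedKeys.size()); // 5
        selectedKeys.reset();
        System.out.println(selectedKeys.isEmpty()); // true
    }
}
```

Iteration touches a contiguous array and adding a key is a single store, which is the efficiency gain the paragraph above refers to.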
Starting the thread
The thread is started the first time execute() is called.
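```java
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;

public class TestEventLoop {
    public static void main(String[] args) {
        EventLoop eventLoop = new NioEventLoopGroup().next();
        // The first execute() call starts the event loop thread
        eventLoop.execute(() -> {
            System.out.println();
        });
    }
}
```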
Let's look at the execute() method.
```java
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = this.inEventLoop();
    // Put the task into the task queue
    this.addTask(task);
    if (!inEventLoop) {
        // Submitted from another thread: make sure the loop thread is running
        this.startThread();
        if (this.isShutdown()) {
            boolean reject = false;

            try {
                if (this.removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException var6) {
            }

            if (reject) {
                reject();
            }
        }
    }

    if (!this.addTaskWakesUp && immediate) {
        this.wakeup(inEventLoop);
    }
}
```
Next comes startThread(), which guards against starting the thread more than once.
```java
private void startThread() {
    // State 1 means "not started", 2 means "started"; only one caller wins the CAS
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        boolean success = false;

        try {
            this.doStartThread();
            success = true;
        } finally {
            if (!success) {
                // Starting failed: roll the state back to "not started"
                STATE_UPDATER.compareAndSet(this, 2, 1);
            }
        }
    }
}
```
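The same start-once guard can be reproduced with the JDK alone. The sketch below is a hypothetical StartOnceSketch class that uses an AtomicIntegerFieldUpdater in the same way and counts how many threads were actually started. The counter and the empty worker thread exist only for the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical sketch of a CAS-guarded, start-at-most-once thread.
final class StartOnceSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final AtomicIntegerFieldUpdater<StartOnceSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StartOnceSketch.class, "state");

    private volatile int state = ST_NOT_STARTED;
    final AtomicInteger threadsStarted = new AtomicInteger(); // demo-only counter

    void startThread() {
        // The cheap volatile read filters most callers; the CAS picks exactly one winner.
        if (state == ST_NOT_STARTED && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    // Roll back so that a later call can try again.
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }

    private void doStartThread() {
        threadsStarted.incrementAndGet();
        Thread worker = new Thread(() -> { }, "sketch-event-loop"); // empty body for the demo
        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) throws InterruptedException {
        StartOnceSketch loop = new StartOnceSketch();
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(loop::startThread);
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        System.out.println(loop.threadsStarted.get()); // 1
    }
}
```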
wakeup()
When another thread submits a task, the event loop thread must be woken from its blocking select().
```java
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = this.inEventLoop();
    this.addTask(task);
    if (!inEventLoop) {
        this.startThread();
        if (this.isShutdown()) {
            boolean reject = false;

            try {
                if (this.removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException var6) {
            }

            if (reject) {
                reject();
            }
        }
    }

    if (!this.addTaskWakesUp && immediate) {
        // Wake the loop thread so that it picks up the new task
        this.wakeup(inEventLoop);
    }
}
```
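Here is the implementation of wakeup().
The selector is only woken when a thread other than the event loop thread submits a task.
Because Selector.wakeup() is a heavyweight operation, and one call is enough when several threads submit at the same time, an atomic getAndSet on nextWakeupNanos (a CAS-style operation) keeps it from being called repeatedly.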
```java
protected void wakeup(boolean inEventLoop) {
    // -1 marks the loop as already awake; only the first caller to flip it wakes the selector
    if (!inEventLoop && this.nextWakeupNanos.getAndSet(-1L) != -1L) {
        this.selector.wakeup();
    }
}
```
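The effect of the getAndSet guard is easiest to see by counting. In the sketch below, WakeupGuardSketch is a hypothetical class in which a counter stands in for selector.wakeup(). aboutToSelect() is an assumed helper representing the loop thread publishing its deadline before it blocks.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a counter replaces the real selector.wakeup() call.
final class WakeupGuardSketch {
    private static final long AWAKE = -1L;

    // Deadline the loop sleeps until, or AWAKE when it is not blocked in select().
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    final AtomicInteger selectorWakeups = new AtomicInteger();

    /** Called by the loop thread right before it blocks in select(). */
    void aboutToSelect(long deadlineNanos) {
        nextWakeupNanos.set(deadlineNanos);
    }

    void wakeup(boolean inEventLoop) {
        // getAndSet returns the previous value, so only the first caller sees "not awake".
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selectorWakeups.incrementAndGet(); // stand-in for selector.wakeup()
        }
    }

    public static void main(String[] args) {
        WakeupGuardSketch loop = new WakeupGuardSketch();
        loop.aboutToSelect(1_000L);
        loop.wakeup(false);
        loop.wakeup(false);
        loop.wakeup(false);
        loop.wakeup(true); // the loop thread itself never needs a wakeup
        System.out.println(loop.selectorWakeups.get()); // 1

        loop.aboutToSelect(2_000L); // next select round
        loop.wakeup(false);
        System.out.println(loop.selectorWakeups.get()); // 2
    }
}
```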
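Empty polling
On Linux, the JDK selector can run into empty polling: select() keeps returning immediately even though no events are ready, so the loop spins.
Let's look at NioEventLoop#run().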
```java
// Abridged decompiled excerpt: the surrounding loop is omitted, and
// var34 / label791 are decompiler artifacts.
protected void run() {
    int selectCnt = 0;

    if (!this.hasTasks()) {
        // Block in select() only when no tasks are queued
        strategy = this.select(curDeadlineNanos);
    }

    // Count every pass through select
    ++selectCnt;

    if (!ranTasks && strategy <= 0) {
        // Nothing was selected and no task ran: possibly an empty poll
        if (this.unexpectedSelectorWakeup(selectCnt)) {
            selectCnt = 0;
            var34 = false;
        } else {
            var34 = false;
        }
        break label791;
    }
}

private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        // Woken by an interrupt (logging elided in this excerpt)
        return true;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // Too many premature returns in a row: replace the selector
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, this.selector);
        this.rebuildSelector();
        return true;
    } else {
        return false;
    }
}
```
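The workaround above can be sketched with plain JDK NIO: count consecutive premature returns, and once a threshold is reached, open a fresh selector, move every registration over, and close the old one. SelectorRebuildSketch and its methods are hypothetical names. The threshold of 512 is what I understand Netty's default to be, so treat it as an assumption. The demo simulates premature returns rather than reproducing the real bug.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of "rebuild the selector after too many empty polls".
final class SelectorRebuildSketch {
    static final int REBUILD_THRESHOLD = 512; // assumed default, see lead-in

    private Selector selector;
    private int selectCnt;

    SelectorRebuildSketch(Selector selector) {
        this.selector = selector;
    }

    Selector selector() {
        return selector;
    }

    /** Records a select() that returned with nothing to do; returns true if the selector was rebuilt. */
    boolean onPrematureReturn() {
        if (++selectCnt >= REBUILD_THRESHOLD) {
            rebuildSelector();
            selectCnt = 0;
            return true;
        }
        return false;
    }

    /** A select() that found events or ran tasks resets the counter. */
    void onNormalReturn() {
        selectCnt = 0;
    }

    private void rebuildSelector() {
        try {
            Selector newSelector = Selector.open();
            for (SelectionKey key : selector.keys()) {
                if (!key.isValid()) {
                    continue;
                }
                SelectableChannel channel = key.channel();
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                key.cancel(); // drop the old registration
                channel.register(newSelector, interestOps, attachment);
            }
            selector.close();
            selector = newSelector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo: returns the key count on the rebuilt selector, or -1 if no rebuild happened. */
    static int demoKeysAfterRebuild() {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);
            Selector original = Selector.open();
            channel.register(original, SelectionKey.OP_ACCEPT);
            SelectorRebuildSketch loop = new SelectorRebuildSketch(original);
            boolean rebuilt = false;
            for (int i = 0; i < REBUILD_THRESHOLD; i++) {
                rebuilt = loop.onPrematureReturn(); // simulated empty polls
            }
            try (Selector current = loop.selector()) {
                boolean swapped = rebuilt && current != original && !original.isOpen();
                return swapped ? current.keys().size() : -1;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoKeysAfterRebuild()); // 1
    }
}
```

Rebuilding does not fix the underlying bug. It only moves the registrations to a selector that is not in the spinning state.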