Client和server通过Channel连接,然后通过ByteBuf进行传输。每个Channel有自己的Pipeline,Pipeline上面可以添加和定义Handler和Event。
Channel类
1 package io.netty.channel;
2 import io.netty.buffer.ByteBuf;
3 import io.netty.buffer.MessageBuf;
4 import io.netty.channel.socket.DatagramChannel;
5 import io.netty.channel.socket.ServerSocketChannel;
6 import io.netty.channel.socket.SocketChannel;
7 import io.netty.util.AttributeMap;
8 import java.net.InetSocketAddress;
9 import java.net.SocketAddress;
10 import java.nio.channels.SelectionKey;
11 public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFutureFactory, Comparable<Channel> {
12 Integer id();
13 EventLoop eventLoop();
14 Channel parent();
15 ChannelConfig config();
16 ChannelPipeline pipeline();
17 boolean isOpen();
18 boolean isRegistered();
19 boolean isActive();
20 ChannelMetadata metadata();
21 ByteBuf outboundByteBuffer();
22 <T> MessageBuf<T> outboundMessageBuffer();
23 SocketAddress localAddress();
24 SocketAddress remoteAddress();
25 ChannelFuture closeFuture();
26 Unsafe unsafe();
27 interface Unsafe {
28 ChannelHandlerContext directOutboundContext();
29 ChannelFuture voidFuture();
30 SocketAddress localAddress();
31 SocketAddress remoteAddress();
32 void register(EventLoop eventLoop, ChannelFuture future);
33 void bind(SocketAddress localAddress, ChannelFuture future);
34 void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
35 void disconnect(ChannelFuture future);
36 void close(ChannelFuture future);
37 void closeForcibly();
38 void deregister(ChannelFuture future);
39 void flush(ChannelFuture future);
40 void flushNow();
41 void suspendRead();
42 void resumeRead();
43 }
44 }
45
Channel类UML图
Netty 4.0中,定义了Channel接口,这个接口用于连接网络的socket传输,或者具有I/O操作的组件连接。这里的I/O操作有,read,write,bind,connect
Channel接口为用户提供了:
1. Channel的当前状态,比如:Channel是否open,或者Channel是否已经连接。
2. Channel的参数,比如:接受的buffer大小。
3. Channel支持的I/O操作,比如:read,write,connect,bind。
4. 注册在Channel上的ChannelPipeline,ChannelPipeline用于处理所有的I/O事件和请求。
Channel类的几个重要方法
ChannelFuture closeFuture();
所有在Netty中的I/O操作都是异步的,这就意味着任何的I/O调用都会立即返回,但是无法保证所有被调用的I/O操作到最后能够成功执行完成。closeFuture() 返回一个ChannelFuture对象, 并且告诉I/O的调用者,这个I/O调用的最后状态是succeeded,failed 或者 canceled。
void register(EventLoop eventLoop, ChannelFuture future);
在Channel中注册EventLoop和对应的ChannelFuture。
void deregister(ChannelFuture future);
在Channel中取消ChannelFuture的注册。
在Channel的层次结构中,Channel子类的实现取决于传输的具体实现。比如SocketChannel,能够被ServerSocketChannel接受,并且SocketChannel中的getParent()方法会返回ServerSocketChannel。开发者可以实现Channel接口,共享Socket连接,比如SSH。
ChannelPipeLine接口
1 public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker {
2 MessageBuf<Object> inboundMessageBuffer();
3 ByteBuf inboundByteBuffer();
4 MessageBuf<Object> outboundMessageBuffer();
5 ByteBuf outboundByteBuffer();
6 ChannelPipeline addFirst(String name, ChannelHandler handler);
7 ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
8 ChannelPipeline addLast(String name, ChannelHandler handler);
9 ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
10 ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
11 ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
12 ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
13 ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
14 ChannelPipeline addFirst(ChannelHandler
handlers);
15 ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler
handlers);
16 ChannelPipeline addLast(ChannelHandler
handlers);
17 ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler
handlers);
18 void remove(ChannelHandler handler);
19 ChannelHandler remove(String name);
20 <T
extends ChannelHandler> T remove(Class<T> handlerType);
21 ChannelHandler removeFirst();
22 ChannelHandler removeLast();
23 void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
24 ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
25 <T
extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
26 ChannelHandler first();
27 ChannelHandlerContext firstContext();
28 ChannelHandler last();
29 ChannelHandlerContext lastContext();
30 ChannelHandler get(String name);
31 <T
extends ChannelHandler> T get(Class<T> handlerType);
32 ChannelHandlerContext context(ChannelHandler handler);
33 ChannelHandlerContext context(String name);
34 ChannelHandlerContext context(Class<?
extends ChannelHandler> handlerType);
35 Channel channel();
36 List<String> names();
37 Map<String, ChannelHandler> toMap();
38 }
ChannelPipeline类UML图
ChannelHandler接口用于处理和拦截Channel接口中的ChannelEvents。Netty中的ChannelPipeline概念使用了Intecepting Filter Patter模式来实现,这样做的好处是允许用户可以完全控制Event的处理,并且控制ChannelHandlers在ChannelPipeline的交互。
当一个全新的Channel创建的时候,都必须创建一个ChannelPipeline,并使Channel和ChannelPipeline想关联。这种关联关系式永久性的,这意味着,一旦一个ChannelPipeleine和Channel关联了,那么这个Channel就在也无法关联其他ChannelPipeline,也无法取消与当前ChannelPipeline的关联。
【官方推荐】使用Channel接口中的pipeleine()方法来创建一个ChannelPipelien,而不要用new去实例一个ChannePipeline类
ChannelPipeline pipeline = Channel.pipeline();
Pipeline中的事件流
ChannelPipeline中的事件流
图中显示了一个典型的ChannelHandler或者ChannelPipeline对于ChannelEvent的处理流程。ChannelHandler接口有两个子类,分别是ChannelUpstreamHandler(ChannelInboundHandler)和ChannelDownstreamHandler(ChannelOutBoundstreamHandler)。这两个之类用于处理每一个ChannelEvent,然后由ChannelHandlerContext.sendUpstream(ChannelEvent)和ChannelHandlerContext.sendDownstream(ChannelEvent)将每一个ChannelEvent转发到最近的handler。根据upstream和downstream的不同,每个Event的处理也会有所不同。
如事件流图中的左边显示,Upstream Handlers会从低至上的处理一个Upstream Event。Inbound的数据有图中底部的Netty Internal I/O threads生成。通过调用InputStream.readByte(byte[])方法,可以从一个远程的服务器读取inbound data。如果一个upstream event达到upstream handler的顶部,那么这个upstream event最终将被丢弃掉。
如事件流图中的右边显示,Dpstream Handlers会从低至上的处理一个Upstream Event。Downstream Handler会生成和传输outbount数据流,比如一个写操作。当一个Downstream Event达到Downstream Handler的底部,那么与之相关的Channal中的I/O thread对对其进行处理。Channel中的I/Othread会执行真正的操作,例如OutputStream.write(byte[])。
假设我们创建了这么一个ChannelPipeline,
ChannelPipelien p = Channel.pipeline();
p.addLast(“1”, new UpstreamHandlerA());
p.addList(“2”, new UpstreamHandlerB());
p.addList(“3”, new DownstreamHandlerA());
p.addList(“4”, new DownstreamHandlerB());
p.addList(“5”, new UpstreamHandlerX());
在ChannelPipeline的栈中,upstream的执行顺序是1,2,而downstream的执行顺序是4,3。
生产Pipeline
在一条Pipeline中,一般会有一个或者多个ChannelHandler用于接收I/O事件(read)或者请求I/O操作(write,close)。一个典型的服务器会有如下的一个ChannelPipeline用于处理不同的ChannelHandler。
ChannelPipelien p = Channel.pipeline();
p.addLast(“decoder”, new MyProtocalDecoder());
p.addList(“encoder”, new MyProtocalEncoder());
p.addList(“executor”, new ExectionHandler());
p.addList(“handler”, new MyBusinessLogicHandler());
1. Protocal Decoder – 将二进制数据(如ByteBuf)装换成Java对象。
2. Protocal Encoder – 将Java对象装换成二进制数据。
3. ExecutionHandler – 使用一个线程模型。
4. BusinessLogicHandler – 执行一个具体的业务逻辑(如数据库访问)
由于ChannelPipeline是线程安全的,所以ChannelHandler可以再任何时候从ChannelPipeline中被添加或者删除。例如,可以插入一个Handler用于处理被加密过的敏感数据信息,在处理之后,删除掉这个Handler。
备注:因为笔者开始写Netty源码分析的时候,Netty 4.0还是处于Alpha阶段,之后的API可能还会有改动,笔者将会及时更改。使用开源已经有好几年的时间了,一直没有时间和精力来具体研究某个开源项目的具体实现,这次是第一次写开源项目的源码分析,如果文中有错误的地方,欢迎读者可以留言指出。对于转载的读者,请注明文章的出处。
希望和广大的开发者/开源爱好者进行交流,欢迎大家的留言和讨论。
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;