Chan Chen Coding...

Netty 4.0 源码分析(三):Channel和ChannelPipeline

Clientserver通过Channel连接,然后通过ByteBuf进行传输。每个Channel有自己的PipelinePipeline上面可以添加和定义HandlerEvent

 

Channel

 1 package io.netty.channel;
 2
 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.MessageBuf;
 4 import io.netty.channel.socket.DatagramChannel;
 5 import io.netty.channel.socket.ServerSocketChannel;
 6 import io.netty.channel.socket.SocketChannel;
 7 import io.netty.util.AttributeMap;
 8 import java.net.InetSocketAddress;
 9 import java.net.SocketAddress;
10 import java.nio.channels.SelectionKey;
11 public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFutureFactory, Comparable<Channel> {
12     Integer id();
13     EventLoop eventLoop();
14     Channel parent();
15     ChannelConfig config();
16     ChannelPipeline pipeline();
17     boolean isOpen();
18     boolean isRegistered();
19     boolean isActive();
20     ChannelMetadata metadata();
21     ByteBuf outboundByteBuffer();
22     <T> MessageBuf<T> outboundMessageBuffer();
23     SocketAddress localAddress();
24     SocketAddress remoteAddress();
25     ChannelFuture closeFuture();
26     Unsafe unsafe();
27     interface Unsafe {
28         ChannelHandlerContext directOutboundContext();
29         ChannelFuture voidFuture();
30         SocketAddress localAddress();
31         SocketAddress remoteAddress();
32         void register(EventLoop eventLoop, ChannelFuture future);
33         void bind(SocketAddress localAddress, ChannelFuture future);
34         void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
35         void disconnect(ChannelFuture future);
36         void close(ChannelFuture future);
37         void closeForcibly();
38         void deregister(ChannelFuture future);
39         void flush(ChannelFuture future);
40         void flushNow();
41         void suspendRead();
42         void resumeRead();
43     }
44 }
45 


ChannelUML


Netty 4.0
中,定义了Channel接口,这个接口用于连接网络的socket传输,或者具有I/O操作的组件连接。这里的I/O操作有,read,write,bind,connect

 

Channel接口为用户提供了:

1.   Channel的当前状态,比如:Channel是否open,或者Channel是否已经连接。

2.   Channel的参数,比如:接受的buffer大小。

3.   Channel支持的I/O操作,比如:read,write,connect,bind

4.   注册在Channel上的ChannelPipelineChannelPipeline用于处理所有的I/O事件和请求。

 

Channel类的几个重要方法

ChannelFuture closeFuture();

所有在Netty中的I/O操作都是异步的,这就意味着任何的I/O调用都会立即返回,但是无法保证所有被调用的I/O操作到最后能够成功执行完成。closeFuture() 返回一个ChannelFuture对象, 并且告诉I/O的调用者,这个I/O调用的最后状态是succeeded,failed 或者 canceled

 

void register(EventLoop eventLoop, ChannelFuture future);

Channel中注册EventLoop和对应的ChannelFuture

 

void deregister(ChannelFuture future);

Channel中取消ChannelFuture的注册。

 

 

Channel的层次结构中,Channel子类的实现取决于传输的具体实现。比如SocketChannel,能够被ServerSocketChannel接受,并且SocketChannel中的getParent()方法会返回ServerSocketChannel。开发者可以实现Channel接口,共享Socket连接,比如SSH

 

ChannelPipeLine接口

 1 public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker {
 2     MessageBuf<Object> inboundMessageBuffer();
 3     ByteBuf inboundByteBuffer();
 4     MessageBuf<Object> outboundMessageBuffer();
 5     ByteBuf outboundByteBuffer();
 6     ChannelPipeline addFirst(String name, ChannelHandler handler);
 7     ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
 8     ChannelPipeline addLast(String name, ChannelHandler handler);
 9     ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
10     ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
11     ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
12     ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
13     ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
14     ChannelPipeline addFirst(ChannelHandler handlers);
15     ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler handlers);
16     ChannelPipeline addLast(ChannelHandler handlers);
17     ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler handlers);
18     void remove(ChannelHandler handler);
19     ChannelHandler remove(String name);
20     <T extends ChannelHandler> T remove(Class<T> handlerType);
21     ChannelHandler removeFirst();
22     ChannelHandler removeLast();
23     void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
24     ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
25     <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
26     ChannelHandler first();
27     ChannelHandlerContext firstContext();
28     ChannelHandler last();
29     ChannelHandlerContext lastContext();
30     ChannelHandler get(String name);
31     <T extends ChannelHandler> T get(Class<T> handlerType);
32     ChannelHandlerContext context(ChannelHandler handler);
33     ChannelHandlerContext context(String name);
34     ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
35     Channel channel();
36     List<String> names();
37     Map<String, ChannelHandler> toMap();
38 }

 

 


 

ChannelPipelineUML


ChannelHandler
接口用于处理和拦截Channel接口中的ChannelEventsNetty中的ChannelPipeline概念使用了Intecepting Filter Patter模式来实现,这样做的好处是允许用户可以完全控制Event的处理,并且控制ChannelHandlersChannelPipeline的交互。

 

当一个全新的Channel创建的时候,都必须创建一个ChannelPipeline,并使ChannelChannelPipeline想关联。这种关联关系式永久性的,这意味着,一旦一个ChannelPipeleineChannel关联了,那么这个Channel就在也无法关联其他ChannelPipeline,也无法取消与当前ChannelPipeline的关联。

【官方推荐】使用Channel接口中的pipeleine()方法来创建一个ChannelPipelien,而不要用new去实例一个ChannePipeline

ChannelPipeline pipeline = Channel.pipeline();

 

Pipeline中的事件流


ChannelPipeline中的事件流

图中显示了一个典型的ChannelHandler或者ChannelPipeline对于ChannelEvent的处理流程。ChannelHandler接口有两个子类,分别是ChannelUpstreamHandler(ChannelInboundHandler)ChannelDownstreamHandler(ChannelOutBoundstreamHandler)。这两个之类用于处理每一个ChannelEvent,然后由ChannelHandlerContext.sendUpstream(ChannelEvent)ChannelHandlerContext.sendDownstream(ChannelEvent)将每一个ChannelEvent转发到最近的handler。根据upstreamdownstream的不同,每个Event的处理也会有所不同。

 

如事件流图中的左边显示,Upstream Handlers会从低至上的处理一个Upstream EventInbound的数据有图中底部的Netty Internal I/O threads生成。通过调用InputStream.readByte(byte[])方法,可以从一个远程的服务器读取inbound data。如果一个upstream event达到upstream handler的顶部,那么这个upstream event最终将被丢弃掉。

 

如事件流图中的右边显示,Dpstream Handlers会从低至上的处理一个Upstream EventDownstream Handler会生成和传输outbount数据流,比如一个写操作。当一个Downstream Event达到Downstream Handler的底部,那么与之相关的Channal中的I/O thread对对其进行处理。Channel中的I/Othread会执行真正的操作,例如OutputStream.write(byte[])

 

 

假设我们创建了这么一个ChannelPipeline

ChannelPipelien p = Channel.pipeline();

p.addLast(“1”, new UpstreamHandlerA());

p.addList(“2”, new UpstreamHandlerB());

p.addList(“3”, new DownstreamHandlerA());

p.addList(“4”, new DownstreamHandlerB());

p.addList(“5”, new UpstreamHandlerX());

ChannelPipeline的栈中,upstream的执行顺序是12,而downstream的执行顺序是43

 

生产Pipeline

在一条Pipeline中,一般会有一个或者多个ChannelHandler用于接收I/O事件(read)或者请求I/O操作(writeclose)。一个典型的服务器会有如下的一个ChannelPipeline用于处理不同的ChannelHandler

ChannelPipelien p = Channel.pipeline();

p.addLast(“decoder”, new MyProtocalDecoder());

p.addList(“encoder”, new MyProtocalEncoder());

p.addList(“executor”, new ExectionHandler());

p.addList(“handler”, new MyBusinessLogicHandler());

 

1.   Protocal Decoder – 将二进制数据(如ByteBuf)装换成Java对象。

2.   Protocal Encoder – Java对象装换成二进制数据。

3.   ExecutionHandler – 使用一个线程模型。

4.   BusinessLogicHandler – 执行一个具体的业务逻辑(如数据库访问)

 

由于ChannelPipeline是线程安全的,所以ChannelHandler可以再任何时候从ChannelPipeline中被添加或者删除。例如,可以插入一个Handler用于处理被加密过的敏感数据信息,在处理之后,删除掉这个Handler

 
备注:因为笔者开始写Netty源码分析的时候,Netty 4.0还是处于Alpha阶段,之后的API可能还会有改动,笔者将会及时更改。使用开源已经有好几年的时间了,一直没有时间和精力来具体研究某个开源项目的具体实现,这次是第一次写开源项目的源码分析,如果文中有错误的地方,欢迎读者可以留言指出。对于转载的读者,请注明文章的出处。
希望和广大的开发者/开源爱好者进行交流,欢迎大家的留言和讨论。

 



-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;

posted on 2012-11-25 14:53 Chan Chen 阅读(9145) 评论(0)  编辑  收藏 所属分类: Netty


只有注册用户登录后才能发表评论。


网站导航: