netty3.2.3源码分析--服务器端发送数据分析

Posted on 2010-12-04 14:54 alex_zheng 阅读(1273) 评论(0)  编辑  收藏 所属分类: java
上一篇分析了服务器端如何读取客户端发送的数据,这篇来看服务器端如何把数据发送给客户端。服务器往外发送数据时,事件由pipeline中的downstream handler按从下到上的顺序依次处理。
发送从 ChannelFuture future = e.getChannel().write(response) 开始,随后执行Channels类下的write方法:
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
        ChannelFuture future 
= future(channel);
        channel.getPipeline().sendDownstream(
                
new DownstreamMessageEvent(channel, future, message, remoteAddress));
        
return future;
 }

telnet pipeline中最下面一个downstream handler是StringEncoder,最后执行的是OneToOneEncoder的handleDownstream:
public void handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent evt) 
throws Exception {
        
if (!(evt instanceof MessageEvent)) {
            ctx.sendDownstream(evt);
            
return;
        }

        MessageEvent e 
= (MessageEvent) evt;
        Object originalMessage 
= e.getMessage();
        Object encodedMessage 
= encode(ctx, e.getChannel(), originalMessage);
        
if (originalMessage == encodedMessage) {
            ctx.sendDownstream(evt);
        } 
else if (encodedMessage != null) {
            
//这里写encode数据,DefaultChannelPipeline的sendDownstream
            write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
        }
    }
DefaultChannelPipeline的sendDownstream方法
public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext prev 
= getActualDownstreamContext(this.prev);
            
if (prev == null) {
                
try {
                    
//因为stringencoder是唯一一个downstreamhandler,这里执行NioServerSocketPipelineSink.eventSunk
                    getSink().eventSunk(DefaultChannelPipeline.this, e);
                } 
catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } 
else {
                DefaultChannelPipeline.
this.sendDownstream(prev, e);
            }
        }
eventSunk方法会执行handleAcceptedSocket方法:
private void handleAcceptedSocket(ChannelEvent e) {
        
if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event 
= (ChannelStateEvent) e;
            NioSocketChannel channel 
= (NioSocketChannel) event.getChannel();
            ChannelFuture future 
= event.getFuture();
            ChannelState state 
= event.getState();
            Object value 
= event.getValue();

            
switch (state) {
            
case OPEN:
                
if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                
break;
            
case BOUND:
            
case CONNECTED:
                
if (value == null) {
                    channel.worker.close(channel, future);
                }
                
break;
            
case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                
break;
            }
        } 
else if (e instanceof MessageEvent) {
            MessageEvent event 
= (MessageEvent) e;
            NioSocketChannel channel 
= (NioSocketChannel) event.getChannel();
            
//放入writerequestqueue队列
            boolean offered = channel.writeBuffer.offer(event);
            
assert offered;
            
//执行nioworker的writeFromUserCode,之后执行write0方法
            channel.worker.writeFromUserCode(channel);
        }
    }

/**
 * Writes the events queued on {@code channel.writeBuffer} to {@code channel.socket}.
 *
 * <p>While holding {@code channel.writeLock}, the loop resumes the channel's current
 * write event (or polls the next one from the queue), acquires a send buffer for its
 * message and tries up to {@code writeSpinCount} times to transfer it to the socket.
 * <ul>
 *   <li>Buffer finished: it is released, the channel's current event/buffer are cleared,
 *       the event's future is marked successful and the loop moves to the next event.</li>
 *   <li>Buffer not finished: {@code addOpWrite} and {@code channel.writeSuspended} are
 *       set, progress is reported if any bytes went out, and the loop stops.</li>
 *   <li>Queue empty: {@code removeOpWrite} is set, {@code channel.writeSuspended} is
 *       cleared and the loop stops.</li>
 *   <li>{@code AsynchronousCloseException}: ignored. Any other {@code Throwable}: the
 *       buffer is released, the future is failed, an exception event is fired, and for
 *       an {@code IOException} the channel is closed.</li>
 * </ul>
 * After the lock is released a write-complete event carrying the total bytes written is
 * fired, and - unless the channel was closed here - the write interest is set or
 * cleared according to the flags above.
 *
 * @param channel the channel whose pending writes are flushed
 */
private void write0(NioSocketChannel channel) {
        
boolean open = true;
        
boolean addOpWrite = false;
        
boolean removeOpWrite = false;

        
long writtenBytes = 0;

        
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        
final SocketChannel ch = channel.socket;
        
// The channel's write queue; this is the queue handleAcceptedSocket offers MessageEvents to.
        final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
        // Maximum number of transfer attempts per pass (the article's author notes the default is 16).
        
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        
synchronized (channel.writeLock) {
            channel.inWriteNowLoop 
= true;
            
for (;;) {
                // Resume the event left unfinished by an earlier pass, if there is one.
                MessageEvent evt 
= channel.currentWriteEvent;
                SendBuffer buf;
                
if (evt == null) {
            
// Take the next write event from the write buffer; stop when the queue is empty.
                    if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                        removeOpWrite 
= true;
                        channel.writeSuspended 
= false;
                        
break;
                    }
                    
                    channel.currentWriteBuffer 
= buf = sendBufferPool.acquire(evt.getMessage());
                } 
else {
                    buf 
= channel.currentWriteBuffer;
                }

                ChannelFuture future 
= evt.getFuture();
                
try {
                    
long localWrittenBytes = 0;
                    
for (int i = writeSpinCount; i > 0; i --) {
                        
// Send the data to the client; per the article this runs PooledSendBuffer.transferTo.
                        localWrittenBytes = buf.transferTo(ch);
                        
// Stop spinning as soon as any bytes were transferred.
if (localWrittenBytes != 0) {
                            writtenBytes 
+= localWrittenBytes;
                            
break;
                        }
                        
// Nothing was transferred but the buffer reports finished - no point retrying.
if (buf.finished()) {
                            
break;
                        }
                    }

                    
if (buf.finished()) {
                        
// Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent 
= null;
                        channel.currentWriteBuffer 
= null;
                        evt 
= null;
                        buf 
= null;
                        future.setSuccess();
                    } 
else {
                        
// Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended 
= true;

                        
if (localWrittenBytes > 0) {
                            
// Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        
break;
                    }
                } 
catch (AsynchronousCloseException e) {
                    
// Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    // Drop the failed event so it is not retried, then report the failure.
                    buf.release();
                    channel.currentWriteEvent 
= null;
                    channel.currentWriteBuffer 
= null;
                    buf 
= null;
                    evt 
= null;
                    future.setFailure(t);
                    fireExceptionCaught(channel, t);
                    
// An I/O error closes the channel; 'open' then suppresses the interest-op update below.
if (t instanceof IOException) {
                        open 
= false;
                        close(channel, succeededFuture(channel));
                    }
                }
            }
            channel.inWriteNowLoop 
= false;
        }
        
// Fire the write-complete event. Per the article it travels through DefaultChannelPipeline's sendUpstream and ends in SimpleChannelUpstreamHandler.writeComplete;
        
// the article also notes that no upstream handler in its example pipeline overrides writeComplete, so the event is simply passed along.
        fireWriteComplete(channel, writtenBytes);

        
if (open) {
            
if (addOpWrite) {
                setOpWrite(channel);
            } 
else if (removeOpWrite) {
                clearOpWrite(channel);
            }
        }
    }

只有注册用户登录后才能发表评论。


网站导航:
 

posts - 10, comments - 9, trackbacks - 0, articles - 15

Copyright © alex_zheng