netty3.2.3源码分析--ServerBootstrap启动分析

Posted on 2010-12-01 21:37 alex_zheng 阅读(4374) 评论(0)  编辑  收藏 所属分类: java
这里首先分析下ServerBootstrap的启动过程,在netty中,channel可以看成是socketchannel的抽象
channelpipeline里存放着channelhandler,channelpipeline根据不同的channelevent触发对应的操作
如channel的open,bind,connect等
下面以TelnetServer为例来一步步看server启动
// Entry point of the TelnetServer example: builds a ServerBootstrap on top of an
// NIO server channel factory, installs the pipeline factory and binds port 8080.
public static void main(String[] args) throws Exception {
        
// Configure the server.
        
// Per the article, constructing NioServerSocketChannelFactory also creates a
// NioServerSocketPipelineSink, which handles the downstream events
// (the constructor itself is not shown in this excerpt).
        ServerBootstrap bootstrap = new ServerBootstrap(
                
new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        
// Set up the event pipeline factory.
        bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());

        
// Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(8080));
    }

直接看serverbootstrap的bind方法
// ServerBootstrap.bind: builds the boss pipeline (Binder plus optional parent
// handler) and asks the channel factory for a new server channel.
// NOTE: the excerpt stops after the channel is created; the rest of the method,
// including its return statement, is not shown here.
public Channel bind(final SocketAddress localAddress) {
        
if (localAddress == null) {
            
throw new NullPointerException("localAddress");
        }
        
// Queue of ChannelFuture; the Binder offers the future of the bind operation
// into it (see Binder.channelOpen).
        final BlockingQueue<ChannelFuture> futureQueue =
            
new LinkedBlockingQueue<ChannelFuture>();
        
// Binder extends SimpleChannelUpstreamHandler and handles channelOpen.
        ChannelHandler binder = new Binder(localAddress, futureQueue);
    
       
// In the TelnetServer example no parent handler is set, so this is null.
        ChannelHandler parentHandler = getParentHandler();
         
// Create the pipeline (a DefaultChannelPipeline per the article).
         
// Before the port is bound, the pipeline holds only the binder, an upstream handler.
        ChannelPipeline bossPipeline = pipeline();  
        
// addLast wraps the handler in a DefaultChannelHandlerContext (per the article;
        
// addLast itself is not shown). The handler contexts form a linked list.
       
// At this point that list contains only the binder.
        bossPipeline.addLast("binder", binder);
        
if (parentHandler != null) {
            bossPipeline.addLast(
"userHandler", parentHandler);
        }
        
// Everything starts here; getFactory() is the NioServerSocketChannelFactory
// passed to the ServerBootstrap constructor.
        Channel channel = getFactory().newChannel(bossPipeline);
    
    }

NioServerSocketChannelFactory.newChannel(ChannelPipeline pipeline)如下
 // Creates the server channel for the given pipeline, passing along this
 // factory and its sink.
 public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
        
// Create a NioServerSocketChannel; at this point the pipeline holds the binder
// and the sink is the NioServerSocketPipelineSink.
        return new NioServerSocketChannel(this, pipeline, sink);
 }

在NioServerSocketChannel的构造函数中,可以看到这么一句fireChannelOpen(this);,该方法是Channels类的静态方法
 
// Sends an upstream OPEN=TRUE state event through the channel's pipeline,
// first notifying the parent channel if there is one.
public static void fireChannelOpen(Channel channel) {
        
// Notify the parent handler.
        if (channel.getParent() != null) {
            fireChildChannelStateChanged(channel.getParent(), channel);
        }
        
// This calls the pipeline's sendUpstream method (DefaultChannelPipeline here).
        channel.getPipeline().sendUpstream(
                
new UpstreamChannelStateEvent(
                        channel, ChannelState.OPEN, Boolean.TRUE));
    }
DefaultChannelPipeline.sendUpstream(ChannelEvent e)
// Dispatches an upstream event starting from the head of the pipeline; the
// event is discarded with a warning if no upstream context is found.
public void sendUpstream(ChannelEvent e) {
        
// During bootstrap this.head is the binder's context.
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        
if (head == null) {
            logger.warn(
                    
"The pipeline contains no upstream handlers; discarding: " + e);
            
return;
        }

        sendUpstream(head, e);
    }

执行
 
// Invokes handleUpstream on the handler of the given context; any Throwable
// is routed to notifyHandlerException.
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        
try {
        
// Here ctx.getHandler() is the binder, i.e. a SimpleChannelUpstreamHandler.
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } 
catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }
这里会在SimpleChannelUpstreamHandler.handleUpstream(ctx, e);中调用binder的channelOpen
// Binder.channelOpen: stores the bootstrap's pipeline factory in the channel
// config, forwards the event upstream, then starts the bind and offers its
// future into futureQueue.
public void channelOpen(
                ChannelHandlerContext ctx,
                ChannelStateEvent evt) {

            
try {
                
// Store the pipeline factory in the server channel's config.
                
// It is read back when accepted connections are dispatched (see
// registerAcceptedChannel); in this example it is the TelnetServerPipelineFactory.
                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
        
            } 
finally {
                
                ctx.sendUpstream(evt);
            }
            
// Bind the channel; per the article this ends up in
// Channels.bind(Channel channel, SocketAddress localAddress).
            boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
            
assert finished;
        }
Channels.bind方法如下:
 
// Channels.bind: sends a downstream BOUND state event carrying localAddress
// through the channel's pipeline and returns the future attached to it.
// Throws NullPointerException if localAddress is null.
public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
        
if (localAddress == null) {
            
throw new NullPointerException("localAddress");
        }
        ChannelFuture future 
= future(channel);
        
// Calls the pipeline's sendDownstream with a BOUND state event.
        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
                channel, future, ChannelState.BOUND, localAddress));
        
return future;
}
DefaultChannelPipeline的senddownstream
 
// Dispatches a downstream event starting from the tail of the pipeline. When
// no downstream context exists, the event is passed straight to the sink.
public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail 
= getActualDownstreamContext(this.tail);
        
if (tail == null) {
            
try {
                // No downstream handler: the sink receives the event directly.
                getSink().eventSunk(
this, e);
                
return;
            } 
catch (Throwable t) {
                notifyHandlerException(e, t);
                
return;
            }
        }

        sendDownstream(tail, e);
    }
从getActualDownstreamContext返回的是null,所以上面会执行 getSink().eventSunk(this, e);
// Walks backwards (via prev) from ctx to the first context that can handle
// downstream events; returns null if ctx is null or no such context exists.
DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
        
if (ctx == null) {
            
return null;
        }

        DefaultChannelHandlerContext realCtx 
= ctx;

        
// The Binder is an upstream handler, so during bootstrap this returns null.
        while (!realCtx.canHandleDownstream()) {
            realCtx 
= realCtx.prev;
            
if (realCtx == null) {
                
return null;
            }
        }

        
return realCtx;
    }
sendDownstream将执行 getSink().eventSunk(this, e);
getSink()获得的是NioServerSocketPipelineSink,
// Sink entry point: routes the event by the concrete type of its channel,
// server socket channel vs. accepted socket channel. Other channel types
// are ignored.
public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) 
throws Exception {
        Channel channel 
= e.getChannel();
        
if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);
        } 
else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);
        }
    }

 
// Handles state events for the server socket channel: OPEN=FALSE closes it,
// BOUND with a non-null value binds it, BOUND with null closes it.
// Events that are not ChannelStateEvents are ignored.
private void handleServerSocket(ChannelEvent e) {
        
if (!(e instanceof ChannelStateEvent)) {
            
return;
        }

        ChannelStateEvent event 
= (ChannelStateEvent) e;
        NioServerSocketChannel channel 
=
            (NioServerSocketChannel) event.getChannel();
        ChannelFuture future 
= event.getFuture();
        ChannelState state 
= event.getState();
        Object value 
= event.getValue();
       
// For the event created in Channels.bind
// (new DownstreamChannelStateEvent(channel, future, ChannelState.BOUND, localAddress))
// the state is BOUND and the value is the local address.
        switch (state) {
        
case OPEN:
            
if (Boolean.FALSE.equals(value)) {
                close(channel, future);
            }
            
break;
        
case BOUND:
            
if (value != null) {
                
// The socket address is bound here.
                bind(channel, future, (SocketAddress) value);
            } 
else {
                close(channel, future);
            }
            
break;
        }
    }

对应的NioServerSocketPipelineSink.bind方法
// NioServerSocketPipelineSink.bind: binds the underlying server socket,
// completes the future, fires channelBound and submits a Boss task to the boss
// executor. On failure the future is failed and exceptionCaught is fired; if
// the socket was bound but the boss was not started, the channel is closed.
private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {

        
boolean bound = false;
        
boolean bossStarted = false;
        
try {
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound 
= true;

            future.setSuccess();

            
// Fire the channelBound upstream event.
            fireChannelBound(channel, channel.getLocalAddress());

            Executor bossExecutor 
=
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            bossExecutor.execute(
                    
new IoWorkerRunnable(
                            
new ThreadRenamingRunnable(
                                    
new Boss(channel),
                                    
"New I/O server boss #" + id +
                                    
" (" + channel + ')')));
            bossStarted 
= true;
        } 
catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        } 
finally {
            
// Bound but boss not running: nothing would accept, so close the channel.
if (!bossStarted && bound) {
                close(channel, future);
            }
        }
    }
先来看Channels.fireChannelBound方法做了什么
 
// Sends an upstream BOUND state event carrying localAddress through the
// channel's pipeline.
public static void fireChannelBound(Channel channel, SocketAddress localAddress) {
        
// During bootstrap the channel's DefaultChannelPipeline holds only the binder.
        
// Per the article, SimpleChannelUpstreamHandler.handleUpstream turns this
// event into a channelBound(ctx, evt) call.
        
        channel.getPipeline().sendUpstream(
                
new UpstreamChannelStateEvent(
                        channel, ChannelState.BOUND, localAddress));
    }

接着看bind方法
// Fragment repeated from the sink's bind method: fetch the boss executor from
// the channel factory and submit a renamed Boss task to it.
Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
           
// Run a Boss task on the boss executor.
           
// That boss hands new client connections to the workers (see Boss.run and
// registerAcceptedChannel). NOTE: the article states the worker count is
// CPU count * 2; that is not visible in this excerpt.
            bossExecutor.execute(
                    
new IoWorkerRunnable(
                            
new ThreadRenamingRunnable(
                                    
new Boss(channel),
                                    
"New I/O server boss #" + id +
                                    
" (" + channel + ')')));         
在new Boss的时候,注册channel的accept事件
// Boss constructor: opens a selector and registers the server socket channel
// on it for OP_ACCEPT. If registration fails, the selector is closed before
// the exception propagates.
Boss(NioServerSocketChannel channel) throws IOException {
            
this.channel = channel;

            selector 
= Selector.open();

            
boolean registered = false;
            
try {
               
                channel.socket.register(selector, SelectionKey.OP_ACCEPT);
                registered 
= true;
            } 
finally {
                
// Registration threw: release the selector that was just opened.
if (!registered) {
                    closeSelector();
                }
            }

            // Publish the selector on the channel as well.
            channel.selector 
= selector;
        }

最终调用Boss.run()

public void run() {
            
//获得当前boss线程,mainreactor
            final Thread currentThread = Thread.currentThread();

            channel.shutdownLock.lock();
            
try {
                
for (;;) {
                    
try {
                        
if (selector.select(1000> 0) {
                            selector.selectedKeys().clear();
                        }
                        
//接收新的客户端连接
                        SocketChannel acceptedSocket = channel.socket.accept();
                        
if (acceptedSocket != null) {
                            
//分派当前连接给workerexcutor,即subreactor
                            registerAcceptedChannel(acceptedSocket, currentThread);
                        }
                    }
                }
            } 
finally {
                channel.shutdownLock.unlock();
                closeSelector();
            }
        }

// Wraps an accepted socket in a NioAcceptedSocketChannel, using a pipeline
// obtained from the server channel's pipeline factory, and registers it with
// the next worker.
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            
try {
                
// Obtain the user's pipeline. The factory was stored by the first statement
                
// of Binder.channelOpen: evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
                
// Until now the only pipeline involved was the boss pipeline holding just the Binder.
                
// From here on, every NioAcceptedSocketChannel gets a pipeline from
// TelnetServerPipelineFactory.
                ChannelPipeline pipeline =
                    channel.getConfig().getPipelineFactory().getPipeline();
                
// The NioWorker plays the sub-reactor role.
                NioWorker worker = nextWorker();
                worker.register(
new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.
this, acceptedSocket,
                        worker, currentThread), 
null);
            } 
            // NOTE: the catch clause of this try is not shown in this excerpt.
        }

这里调用NioWorker.register
void register(NioSocketChannel channel, ChannelFuture future) {

        
boolean server = !(channel instanceof NioClientSocketChannel);
        
//初始化新的task,将当前accept的socketchannel绑定到nioworker的selecortkey的attch
        Runnable registerTask = new RegisterTask(channel, future, server);
        Selector selector;

        
synchronized (startStopLock) {
            
if (!started) {
                
// Open a selector if this worker didn't start yet.
                try {
                    
this.selector = selector = Selector.open();
                } 
catch (Throwable t) {
                    
throw new ChannelException(
                            
"Failed to create a selector.", t);
                }

                
// Start the worker thread with the new Selector.
                String threadName =
                    (server 
? "New I/O server worker #"
                            : 
"New I/O client worker #"+ bossId + '-' + id;

                
boolean success = false;
                
try {
                
//启动一个线程来处理该连接
                    executor.execute(
                            
new IoWorkerRunnable(
                                    
new ThreadRenamingRunnable(this, threadName)));
                    success 
= true;
                } 
finally {
                    
if (!success) {
                        
// Release the Selector if the execution fails.
                        try {
                            selector.close();
                        } 
catch (Throwable t) {
                            logger.warn(
"Failed to close a selector.", t);
                        }
                        
this.selector = selector = null;
                        
// The method will return to the caller at this point.
                    }
                }
            } 
else {
                
// Use the existing selector if this worker has been started.
                selector = this.selector;
            }

            
assert selector != null && selector.isOpen();

            started 
= true;
            
//加入到任务队列
            boolean offered = registerTaskQueue.offer(registerTask);
            
assert offered;
        }

        
if (wakenUp.compareAndSet(falsetrue)) {
            selector.wakeup();
        }
    }
来看registertask的run方法
// RegisterTask.run: registers the channel with the worker's selector.
// If either address is missing, the future (if any) is failed and the channel
// is closed. For non-server (client) channels, bound/connected events are
// fired afterwards.
public void run() {
            SocketAddress localAddress = channel.getLocalAddress();
            SocketAddress remoteAddress = channel.getRemoteAddress();
            if (localAddress == null || remoteAddress == null) {
                if (future != null) {
                    future.setFailure(new ClosedChannelException());
                }
                close(channel, succeededFuture(channel));
                return;
            }

            try {
                if (server) {
                    // Accepted sockets are switched to non-blocking mode here.
                    channel.socket.configureBlocking(false);
                }

                synchronized (channel.interestOpsLock) {
                    // Register the accepted socket channel with the selector using its
                    // raw interest ops (the read event, per the article); the channel
                    // itself is the key's attachment.
                    channel.socket.register(
                            selector, channel.getRawInterestOps(), channel);
                }
                if (future != null) {
                    channel.setConnected();
                    future.setSuccess();
                }
            } catch (IOException e) {
                if (future != null) {
                    future.setFailure(e);
                }
                close(channel, succeededFuture(channel));
                // A ClosedChannelException is not rethrown; anything else is wrapped.
                if (!(e instanceof ClosedChannelException)) {
                    throw new ChannelException(
                            "Failed to register a socket to the selector.", e);
                }
            }

            if (!server) {
                if (!((NioClientSocketChannel) channel).boundManually) {
                    fireChannelBound(channel, localAddress);
                }
                fireChannelConnected(channel, remoteAddress);
            }
        }

其中executor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable(this, threadName)));
这里的this指向当前的nioworker,调用nioworker.run
// NioWorker.run: the worker's selector loop. Each round resets wakenUp,
// selects, then processes the register-task queue, the write-task queue and
// the selected keys. The loop exits (closing the selector) once no keys remain
// and either a one-round grace period has passed or the executor is shut down.
public void run() {
        
// Remember the thread that runs this NioWorker.
        thread = Thread.currentThread();

        
boolean shutdown = false;
        Selector selector 
= this.selector;
        
for (;;) {
            wakenUp.set(
false);

            
if (CONSTRAINT_LEVEL != 0) {
                selectorGuard.writeLock().lock();
                    
// This empty synchronization block prevents the selector
                    
// from acquiring its lock.
                selectorGuard.writeLock().unlock();
            }

            
try {
                SelectorUtil.select(selector);

                
// 'wakenUp.compareAndSet(false, true)' is always evaluated
                
// before calling 'selector.wakeup()' to reduce the wake-up
                
// overhead. (Selector.wakeup() is an expensive operation.)
                
//
                
// However, there is a race condition in this approach.
                
// The race condition is triggered when 'wakenUp' is set to
                
// true too early.
                
//
                
// 'wakenUp' is set to true too early if:
                
// 1) Selector is waken up between 'wakenUp.set(false)' and
                
//    'selector.select()'. (BAD)
                
// 2) Selector is waken up between 'selector.select()' and
                
//    'if (wakenUp.get()) {  }'. (OK)
                
//
                
// In the first case, 'wakenUp' is set to true and the
                
// following 'selector.select()' will wake up immediately.
                
// Until 'wakenUp' is set to false again in the next round,
                
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                
// any attempt to wake up the Selector will fail, too, causing
                
// the following 'selector.select()' call to block
                
// unnecessarily.
                
//
                
// To fix this problem, we wake up the selector again if wakenUp
                
// is true immediately after selector.select().
                
// It is inefficient in that it wakes up the selector for both
                
// the first case (BAD - wake-up required) and the second case
                
// (OK - no wake-up required).

                
if (wakenUp.get()) {
                    selector.wakeup();
                }

                cancelledKeys 
= 0;
                processRegisterTaskQueue();
                processWriteTaskQueue();
                processSelectedKeys(selector.selectedKeys());

                
// Exit the loop when there's nothing to handle.
                
// The shutdown flag is used to delay the shutdown of this
                
// loop to avoid excessive Selector creation when
                
// connections are registered in a one-by-one manner instead of
                
// concurrent manner.
                if (selector.keys().isEmpty()) {
                    
if (shutdown ||
                        executor 
instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

                        
synchronized (startStopLock) {
                            
// Re-check under the lock: a registration may have been queued meanwhile.
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                                started 
= false;
                                
try {
                                    selector.close();
                                } 
catch (IOException e) {
                                    logger.warn(
                                            
"Failed to close a selector.", e);
                                } 
finally {
                                    
this.selector = null;
                                }
                                
break;
                            } 
else {
                                shutdown 
= false;
                            }
                        }
                    } 
else {
                        
// Give one more second.
                        shutdown = true;
                    }
                } 
else {
                    shutdown 
= false;
                }
            } 
catch (Throwable t) {
                logger.warn(
                        
"Unexpected exception in the selector loop.", t);

                
// Prevent possible consecutive immediate failures that lead to
                
// excessive CPU consumption.
                try {
                    Thread.sleep(
1000);
                } 
catch (InterruptedException e) {
                    
// Ignore.
                }
            }
        }
    }


来看processSelectedKeys(selector.selectedKeys());
 private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
        
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k 
= i.next();
            i.remove();
            
try {
                
int readyOps = k.readyOps();
                
//可读,处理downstream
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    
if (!read(k)) {
                        
// Connection already closed - no need to handle write.
                        continue;
                    }
                }
                
//可写,处理upstream
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);
                }
            } 
catch (CancelledKeyException e) {
                close(k);
            }

            
if (cleanUpCancelledKeys()) {
                
break// break the loop to avoid ConcurrentModificationException
            }
        }
    }
从这个过程来看,在netty中,boss线程用来侦听socket的连接,然后分派该连接给nioworker;在nioworker中有注册任务队列和写任务队列,nioworker线程负责从这些队列中取出任务执行,并处理selector上已就绪的读写事件

只有注册用户登录后才能发表评论。


网站导航:
 

posts - 10, comments - 9, trackbacks - 0, articles - 15

Copyright © alex_zheng