
1. Source package structure


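2. Testing the threads

     1. Start the echoserver Main class in mina-example (echoserver#Main).

     2. Start jconsole and inspect the threads:

         1. NioSocketAcceptor
            Runnable: AbstractPollingIoAcceptor$Acceptor

     3. Start com.game.landon.entrance.EchoClient and connect it to the running echoserver.

         -> Inspecting the threads again shows one more:
         2. NioProcessor
            Runnable: AbstractPollingIoProcessor$Processor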

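     4. Where the two thread pools come from:
          1. AbstractIoService(IoSessionConfig sessionConfig, Executor executor)
               The constructor checks:
               if (executor == null) {
                    this.executor = Executors.newCachedThreadPool();
               }
               See the source for details.

          2. SimpleIoProcessorPool
               Its constructor performs the same check.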
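
The null check above can be sketched in isolation. This is a minimal illustration, not MINA's code: the class name DefaultExecutorHolder, the ownsExecutor() accessor and the dispose() method are assumptions made for the example. It only shows the pattern of falling back to a cached thread pool when the caller passes no Executor, and remembering who owns the pool.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder class illustrating the "executor == null" fallback.
final class DefaultExecutorHolder {
    private final Executor executor;
    private final boolean createdExecutor; // true when we built the pool ourselves

    DefaultExecutorHolder(Executor executor) {
        if (executor == null) {
            // No executor supplied: fall back to a cached thread pool.
            this.executor = Executors.newCachedThreadPool();
            this.createdExecutor = true;
        } else {
            this.executor = executor;
            this.createdExecutor = false;
        }
    }

    Executor getExecutor() { return executor; }

    boolean ownsExecutor() { return createdExecutor; }

    // Only shut down a pool that this object created.
    void dispose() {
        if (createdExecutor) {
            ((ExecutorService) executor).shutdownNow();
        }
    }

    public static void main(String[] args) {
        DefaultExecutorHolder holder = new DefaultExecutorHolder(null);
        System.out.println("owns executor: " + holder.ownsExecutor());
        holder.dispose();
    }
}
```

Tracking ownership matters because a pool handed in by the caller may be shared with other services and should not be shut down by this one.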

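3. Basic internal flow
     1. SocketAcceptor acceptor = new NioSocketAcceptor();

          1. Initializes the NioSocketAcceptor thread pool {@link AbstractIoService}
          2. Initializes the NioProcessor thread pool {@link SimpleIoProcessorPool}
          3. NioSocketAcceptor#init
             Initializes the selector: selector = Selector.open();

     2. acceptor.bind(new InetSocketAddress(PORT));
          1. AbstractPollingIoAcceptor#bindInternal
               #startupAcceptor
                -> The NioSocketAcceptor thread pool executes the Acceptor task.

          // 1. startupAcceptor checks whether an Acceptor task already exists and creates one only if it does not.
          // 2. The Acceptor task ends once every bound port has been unbound; at that point the Acceptor reference is also set to null.
          // 3. Each NioSocketAcceptor has exactly one selector and exactly one Acceptor task, hence one Acceptor thread. The Acceptor is therefore effectively single-threaded, even though it runs on a CachedThreadPool.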
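
The three notes above can be sketched as follows. This is an illustrative model rather than MINA's implementation: the class SingleAcceptorSketch, its counter and the latch that stands in for the select() loop are all assumptions. It shows that repeated startup calls submit only one long-running task to a cached thread pool, and that the reference is cleared when the task ends.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an acceptor that lazily starts one long-running task.
final class SingleAcceptorSketch {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final AtomicReference<Runnable> acceptorRef = new AtomicReference<>();
    private final AtomicInteger submitted = new AtomicInteger();
    private final CountDownLatch unbound = new CountDownLatch(1);

    // Called on every bind(); only the first call submits a task.
    void startupAcceptor() {
        if (acceptorRef.get() != null) {
            return; // an Acceptor task already exists
        }
        Runnable acceptor = new Runnable() {
            @Override
            public void run() {
                try {
                    unbound.await(); // stands in for the select() loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    acceptorRef.set(null); // cleared when the task ends
                }
            }
        };
        if (acceptorRef.compareAndSet(null, acceptor)) {
            submitted.incrementAndGet();
            executor.execute(acceptor);
        }
    }

    int submittedTasks() { return submitted.get(); }

    boolean isRunning() { return acceptorRef.get() != null; }

    // Simulates unbinding every port, then waits for the task to finish.
    boolean unbindAllAndWait() {
        unbound.countDown();
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        SingleAcceptorSketch sketch = new SingleAcceptorSketch();
        for (int i = 0; i < 3; i++) {
            sketch.startupAcceptor();
        }
        System.out.println("tasks submitted: " + sketch.submittedTasks());
        sketch.unbindAllAndWait();
    }
}
```

Because the pool only ever receives one task from this object, the cached thread pool ends up using a single thread for it, no matter how many threads it could create.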


     3. Acceptor#run
          1. while (selectable)
               1. selector.select

               2. #registerHandles
                    -> NioSocketAcceptor#open
                    Sets the socket options and registers OP_ACCEPT with the selector.

               3. #processHandles
                    1. NioSocketAcceptor#accept -> returns a NioSocketSession
                    2. SimpleIoProcessorPool#add
                         1. Binds the session to one NioProcessor in the pool based on the session id {@link SimpleIoProcessorPool#getProcessor}
                         2. AbstractPollingIoProcessor#add
                         3. AbstractPollingIoProcessor#startupProcessor
                              -> The NioProcessor thread pool executes the Processor task.
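
The register/select/accept steps in the outline above map onto plain java.nio calls. The following is a stripped-down sketch under stated assumptions: the class AcceptLoopSketch and its acceptOnce() method are invented for illustration, the server binds to an ephemeral loopback port, and the accepted channel is simply closed instead of being handed to a processor pool as MINA does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical, stripped-down acceptor: one selector, one OP_ACCEPT registration.
final class AcceptLoopSketch {
    // Binds to an ephemeral loopback port, connects one client, and returns
    // how many connections a single select pass accepted.
    static int acceptOnce() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // "registerHandles": configure the channel and register OP_ACCEPT.
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(loopback, 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                // "selector.select": wait until the pending connection is ready.
                if (selector.select(5000) > 0) {
                    // "processHandles": accept on every ready key.
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel ready = (ServerSocketChannel) key.channel();
                            try (SocketChannel session = ready.accept()) {
                                if (session != null) {
                                    accepted++; // MINA would pass this on to a processor
                                }
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted connections: " + acceptOnce());
    }
}
```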

          // What the code below shows:
          // 1. The Processor task is created lazily, and each NioProcessor object owns at most one Processor at a time.
          // 2. A NioProcessor submits a Processor task to the thread pool only when none is registered. The task loops
          //    until the processor has no sessions left, so each Processor occupies one thread of the pool,
          //    i.e. each NioProcessor object corresponds to one pool thread.
          // 3. A session is bound to one fixed NioProcessor (by modulo), so all processing for a given session is
          //    single-threaded: it runs on that NioProcessor's one Processor thread.
          // 4. The constructor public NioSocketAcceptor(int processorCount) sets the number of processors, which in
          //    effect sets how many threads of the CachedThreadPool are used by processors.
          //    (Default: private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1)
          // 5. Each NioProcessor has exactly one selector.
        private void startupProcessor() {
            Processor processor = processorRef.get();

            if (processor == null) {
                processor = new Processor();

                if (processorRef.compareAndSet(null, processor)) {
                    executor.execute(new NamePreservingRunnable(processor, threadName));
                }
            }

            // Just stop the select() and start it again, so that the processor
            // can be activated immediately.
            wakeup();
        }
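
The modulo binding mentioned in the notes can be sketched on its own. This is an assumption-laden illustration, not SimpleIoProcessorPool itself: the class ProcessorPoolSketch is hypothetical, processors are represented by plain names, and Math.floorMod is my choice for keeping the index non-negative. Only the DEFAULT_SIZE formula is taken from the notes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pool: sessions are pinned to a processor by id modulo pool size.
final class ProcessorPoolSketch {
    // Same default as quoted in the notes: one more than the CPU count.
    static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;

    private final List<String> processors = new ArrayList<>();

    ProcessorPoolSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size: " + size);
        }
        for (int i = 0; i < size; i++) {
            processors.add("NioProcessor-" + (i + 1));
        }
    }

    // The same session id always maps to the same processor.
    String getProcessor(long sessionId) {
        int index = (int) Math.floorMod(sessionId, (long) processors.size());
        return processors.get(index);
    }

    public static void main(String[] args) {
        ProcessorPoolSketch pool = new ProcessorPoolSketch(DEFAULT_SIZE);
        for (long id = 0; id < 5; id++) {
            System.out.println("session " + id + " -> " + pool.getProcessor(id));
        }
    }
}
```

Since the mapping depends only on the session id and the pool size, every event for one session lands on the same processor thread, which is what makes per-session handling single-threaded.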

 

    /**
     * This private class is used to accept incoming connection from
     * clients. It's an infinite loop, which can be stopped when all
     * the registered handles have been removed (unbound).
     */
    private class Acceptor implements Runnable
    {
        public void run()
        {
            int nHandles = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            // Release the lock
            lock.release();

            while ( selectable )
            {
                try
                {
                    int selected = select( SELECT_TIMEOUT );

                    nHandles += registerHandles();

                    if ( nHandles == 0 )
                    {
                        try
                        {
                            lock.acquire();

                            if ( registerQueue.isEmpty() && cancelQueue.isEmpty() )
                            {
                                acceptor = null;
                                break;
                            }
                        }
                        finally
                        {
                            lock.release();
                        }
                    }

                    if ( selected > 0 )
                    {
                        processReadySessions( selectedHandles() );
                    }

                    long currentTime = System.currentTimeMillis();
                    flushSessions( currentTime );
                    nHandles -= unregisterHandles();

                    notifyIdleSessions( currentTime );
                }
                catch ( ClosedSelectorException cse )
                {
                    // If the selector has been closed, we can exit the loop
                    break;
                }
                catch ( Exception e )
                {
                    ExceptionMonitor.getInstance().exceptionCaught( e );

                    try
                    {
                        Thread.sleep( 1000 );
                    }
                    catch ( InterruptedException e1 )
                    {
                    }
                }
            }

            if ( selectable && isDisposing() )
            {
                selectable = false;
                try
                {
                    destroy();
                }
                catch ( Exception e )
                {
                    ExceptionMonitor.getInstance().exceptionCaught( e );
                }
                finally
                {
                    disposalFuture.setValue( true );
                }
            }
        }
    }

  
     4. Processor#run
          1. for (;;)
               1. select(SELECT_TIMEOUT)
               2. #handleNewSessions
                    1. AbstractPollingIoProcessor#addNow
                    2. AbstractPollingIoProcessor#init
                         NioProcessor#init -> registers session.getChannel() with the selector for OP_READ

               3. #updateTrafficMask

               4. #process
                    // process reads / process writes

               5. #flush
               6. #notifyIdleSessions
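
The select / handleNewSessions / process steps above can be tried out without any sockets. The sketch below is an illustration under stated assumptions: the class ProcessorLoopSketch and its methods are hypothetical, a java.nio.channels.Pipe stands in for a client connection, and only reads are handled. It shows new sessions being queued from outside and registered for OP_READ inside the loop that owns the selector.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical single-threaded processor loop built on a selector and a queue.
final class ProcessorLoopSketch {
    private final Selector selector;
    private final Queue<Pipe.SourceChannel> newSessions = new ConcurrentLinkedQueue<>();

    private ProcessorLoopSketch(Selector selector) {
        this.selector = selector;
    }

    // Mirrors add(): enqueue, then wake the selector so the loop reacts promptly.
    void add(Pipe.SourceChannel session) {
        newSessions.add(session);
        selector.wakeup();
    }

    // Mirrors handleNewSessions(): register queued channels for OP_READ.
    private int handleNewSessions() throws IOException {
        int added = 0;
        Pipe.SourceChannel session;
        while ((session = newSessions.poll()) != null) {
            session.configureBlocking(false);
            session.register(selector, SelectionKey.OP_READ);
            added++;
        }
        return added;
    }

    // Mirrors process(): read whatever is available on the selected keys.
    private String process() throws IOException {
        StringBuilder out = new StringBuilder();
        ByteBuffer buffer = ByteBuffer.allocate(64);
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isReadable()) {
                buffer.clear();
                int n = ((Pipe.SourceChannel) key.channel()).read(buffer);
                if (n > 0) {
                    out.append(new String(buffer.array(), 0, n, StandardCharsets.UTF_8));
                }
            }
        }
        selector.selectedKeys().clear();
        return out.toString();
    }

    // Runs a few loop iterations until one message has been read, then cleans up.
    static String demo() {
        try (Selector selector = Selector.open()) {
            ProcessorLoopSketch processor = new ProcessorLoopSketch(selector);
            Pipe pipe = Pipe.open();
            processor.add(pipe.source());
            pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));

            String received = "";
            for (int i = 0; i < 10 && received.isEmpty(); i++) {
                int selected = selector.select(1000);
                processor.handleNewSessions();
                if (selected > 0) {
                    received = processor.process();
                }
            }
            pipe.sink().close();
            pipe.source().close();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + demo());
    }
}
```

Registering channels only from the loop thread keeps all selector bookkeeping on one thread; other threads communicate with it through the queue and wakeup().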

    /**
     * The main loop. This is the place in charge to poll the Selector, and to
     * process the active sessions. It's done in
     * - handle the newly created sessions
     * -
     */
    private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);

            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle session when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    long t0 = System.currentTimeMillis();
                    int selected = select(SELECT_TIMEOUT);
                    long t1 = System.currentTimeMillis();
                    long delta = (t1 - t0);

                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                        // Last chance : the select() may have been
                        // interrupted because we have had an closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");

                            // we can reselect immediately
                            // set back the flag to false
                            wakeupCalled.getAndSet(false);

                            continue;
                        } else {
                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                            // Ok, we are hit by the nasty epoll
                            // spinning.
                            // Basically, there is a race condition
                            // which causes a closing file descriptor not to be
                            // considered as available as a selected channel, but
                            // it stopped the select. The next time we will
                            // call select(), it will exit immediately for the same
                            // reason, and do so forever, consuming 100%
                            // CPU.
                            // We have to destroy the selector, and
                            // register all the socket on a new one.
                            registerNewSelector();
                        }

                        // Set back the flag to false
                        wakeupCalled.getAndSet(false);

                        // and continue the loop
                        continue;
                    }

                    // Manage newly created session first
                    nSessions += handleNewSessions();

                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        //LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                        process();
                    }

                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);

                    // And manage removed sessions
                    nSessions -= removeSessions();

                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);

                    // Get a chance to exit the infinite loop if there are no
                    // more sessions on this Processor
                    if (nSessions == 0) {
                        processorRef.set(null);

                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            // newSessions.add() precedes startupProcessor
                            assert (processorRef.get() != this);
                            break;
                        }

                        assert (processorRef.get() != this);

                        if (!processorRef.compareAndSet(null, this)) {
                            // startupProcessor won race, so must exit processor
                            assert (processorRef.get() != this);
                            break;
                        }

                        assert (processorRef.get() == this);
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<S> i = allSessions(); i.hasNext();) {
                            scheduleRemove(i.next());
                        }

                        wakeup();
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (disposing) {
                        doDispose();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }
    }
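
The condition that guards the selector rebuild in the loop above can be read as a small predicate. The following sketch restates it in isolation; the class SpinDetectionSketch, the method name and the named constant are assumptions for illustration, while the three-part test and the 100 ms threshold come from the listing.

```java
// Hypothetical helper restating the spurious-wakeup check from Processor#run.
final class SpinDetectionSketch {
    // Threshold in milliseconds, as used in the listing (delta < 100).
    static final long SUSPICIOUS_DELTA_MILLIS = 100;

    // A select() that returned nothing, was not woken up on purpose, and came
    // back much sooner than its timeout is treated as suspicious.
    static boolean looksLikeSpuriousWakeup(int selected, boolean wakeupCalled, long deltaMillis) {
        return selected == 0 && !wakeupCalled && deltaMillis < SUSPICIOUS_DELTA_MILLIS;
    }

    public static void main(String[] args) {
        System.out.println(looksLikeSpuriousWakeup(0, false, 3));
        System.out.println(looksLikeSpuriousWakeup(0, true, 3));
    }
}
```

When the predicate holds, the listing first checks for a broken connection and otherwise re-registers every channel on a fresh selector, which is the workaround for the epoll spinning described in its comments.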

 

 
4. Related class diagrams

    1. IoService class diagram

    2. IoProcessor class diagram

    3. IoSession class diagram

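5. Summary:
    This post is only an introduction. It focuses on the two thread pools inside mina2: the acceptor pool and the processor pool. For NIO background, see my earlier article: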

http://www.blogjava.net/landon/archive/2013/08/16/402947.html


 

posted on 2013-11-18 17:24 landon  Category: Sources

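FeedBack:
# re: apache-mina-2.07 source notes 1 - first look
2013-11-18 18:41 | foo
These notes are really hard to read.

# re: apache-mina-2.07 source notes 1 - first look
2013-11-19 14:26 | landon
Fair, they are a bit. As long as I can follow them myself, that's enough. Haha. @foo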
  
