I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0

 1.IoHandler#sessionCreated/#sessionOpened区别
      1.sessionCreated:Invoked from an I/O processor thread when a new connection has been created.Because this method is supposed to be called from the same thread that handles I/O of multiple sessions, please implement this method to perform tasks that consumes minimal amount of time such as socket parameter and user-defined session attribute initialization.
     该方法是在I/O processor线程触发.->该方法尽量要执行耗时短的操作.
      2.sessionOpened:Invoked when a connection has been opened.  This method is invoked after {@link #sessionCreated(IoSession)}.  The biggest difference from {@link #sessionCreated(IoSession)} is that it's invoked from other thread than an I/O processor thread once thread model is configured properly.
     该方法是在sessionCreated后触发.和前者的区别在于一旦配置了合适的线程模型(ExecutorFilter)则该方法不是在I/O processor线程执行.(反之的意思是如果没有配置线程模型,则该方法也是在I/O processor执行).
 {@link ExecutorFilter#sessionOpened}

     landon:1.如果没有配置线程模型则所有IoHandler的处理都是在某个特定的processor线程执行的->也就是单线程执行的.->所以sessionCreated这里的逻辑耗时很长的话则严重影响NioProcesor其他链接成功的session.
             2.同理本人认为如果没有配置线程模型的话,则IoHandler的其他逻辑如果耗时长的话也会影响到其他session的处理.
             3.测试:在IoHandler的逻辑代码出直接sleep(30s)->看是否有影响其他session.

2.ExecutorFilter源码

    

// 继承IoFilterAdapter,而IoFilterAdapter实现了IoFilter接口
    
// 因此可添加至FilterChain
    
// 其核心在于如果某事件出现在了该filter的io事件监听列表之内则直接在内部的线程池内执行,而不是processor线程
    public class ExecutorFilter extends IoFilterAdapter {
    
// 处理的事件类型列表
    
// IoEventType包括SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE
    private EnumSet<IoEventType> eventTypes;

    
/** 关联的线程池 */
    
private Executor executor;

    
/** 线程池是否由filter管理声明周期(该值为true的时候会在destroy时关闭) */
    
private boolean manageableExecutor;

    
/** 默认的线程池max_pool_size */
    
private static final int DEFAULT_MAX_POOL_SIZE = 16;

    
/** 线程池启动时的线程数目 */
    
private static final int BASE_THREAD_NUMBER = 0;

    
/** 空闲线程的keepalive_time,秒 */
    
private static final long DEFAULT_KEEPALIVE_TIME = 30;

    
/** 
     * 线程池被管理的默认值true
     **/

    
private static final boolean MANAGEABLE_EXECUTOR = true;

    
// 线程池不被管理的默认值 false
    private static final boolean NOT_MANAGEABLE_EXECUTOR = false;

    
/** 线程池处理的默认事件列表,注意没有SESSION_CREATED */
    
private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
            IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
            IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED }
;

    
/**
     * 
     * 实例创建,默认的线程池为OrderedThreadPoolExecutor(以下默认的均为OrderedThreadPoolExecutor),初始线程为0
     * 最大线程数目为16,空闲线程keepalive_time为30s.
     * 线程工厂为默认的线程工厂
     *
     
*/

    
public ExecutorFilter() {
        
// 创建默认的线程池
        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        
// 初始化该filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 
     * 指定线程池最大线程数目
     * 
     
*/

    
public ExecutorFilter(int maximumPoolSize) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 指定线程池初始线程大小和最大线程数目
     * 
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 指定初始线程数目,最大线程数目以及空闲keepaliveTime
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 
     * 指定初始线程数目/最大线程数目/空闲keepAliveTime/事件队列处理器
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            IoEventQueueHandler queueHandler) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                Executors.defaultThreadFactory(), queueHandler);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 
     * 指定初始线程数目/最大线程数目/空闲keepAliveTime/线程工厂
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            ThreadFactory threadFactory) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
                
null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 
     * 指定初始线程数目/最大线程数目/空闲keepAliveTime/线程工厂/事件队列处理器
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
        
// Create a new default Executor
        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                threadFactory, queueHandler);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR);
    }


    
/**
     * 指定监听的io事件列表
     
*/

    
public ExecutorFilter(IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 指定线程池线程最大数目/监听的io事件列表
     
*/

    
public ExecutorFilter(int maximumPoolSize, IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 指定线程池线程初始数目/最大数目/监听的io事件列表
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 指定线程池线程初始数目/最大数目/空闲keepAliveTime/监听的io事件列表
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                Executors.defaultThreadFactory(), null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 指定线程池线程初始数目/最大数目/空闲keepAliveTime/事件队列处理器/监听的io事件列表
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            IoEventQueueHandler queueHandler, IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                Executors.defaultThreadFactory(), queueHandler);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 指定线程池线程初始数目/最大数目/空闲keepAliveTime/线程工厂/监听的io事件列表
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            ThreadFactory threadFactory, IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
                
null);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 指定线程池线程初始数目/最大数目/空闲keepAliveTime/线程工厂/事件队列处理器/监听的io事件列表
     
*/

    
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType eventTypes) {
        
// Create a new default Executor
        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                threadFactory, queueHandler);

        
// Initialize the filter
        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 外界指定线程池,NOT_MANAGEABLE_EXECUTOR表明由外部自动手动管理线程池声明周期
     
*/

    
public ExecutorFilter(Executor executor) {
        
// Initialize the filter
        init(executor, NOT_MANAGEABLE_EXECUTOR);
    }


    
/**
     * 外界指定线程池/监听io事件列表,NOT_MANAGEABLE_EXECUTOR表明由外部自动手动管理线程池声明周期
     
*/

    
public ExecutorFilter(Executor executor, IoEventType eventTypes) {
        
// Initialize the filter
        init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
    }


    
/**
     * 创建默认的OrderedThreadPoolExecutor
     
*/

    
private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
        
// Create a new Executor
        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                threadFactory, queueHandler);

        
return executor;
    }


    
/**
     * 
     * 初始化处理的事件类型
     * 1.拷贝至EnumSet
     * 2.检查是否包含SESSION_CREATED,如果是则抛出异常
     
*/

    
private void initEventTypes(IoEventType eventTypes) {
        
if ((eventTypes == null|| (eventTypes.length == 0)) {
            eventTypes = DEFAULT_EVENT_SET;
        }


        
// Copy the list of handled events in the event set
        this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);

        
// Check that we don't have the SESSION_CREATED event in the set
        if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
            
this.eventTypes = null;
            
throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
        }

    }


    
/**
     * 
     * 初始化
     * 1.初始化处理的事件类型
     * 2.关联线程池赋值
     * 3.设置manageableExecutor,如果该值为true,则表明executor的周期可被管理
     * (destroy销毁的时候会进行shutdown)
     * 
     
*/

    
private void init(Executor executor, boolean manageableExecutor, IoEventType eventTypes) {
        
if (executor == null{
            
throw new IllegalArgumentException("executor");
        }


        initEventTypes(eventTypes);
        
this.executor = executor;
        
this.manageableExecutor = manageableExecutor;
    }


    
/**
     * 如果是自管理线程池生命周期则destroy的时候执行关闭
     
*/

    @Override
    
public void destroy() {
        
if (manageableExecutor) {
            ((ExecutorService) executor).shutdown();
        }

    }


    
/**
     * 返回filter用的线程池
     
*/

    
public final Executor getExecutor() {
        
return executor;
    }


    
/**
     * 触发事件,直接由线程池执行{@link IoFilterEvent#fire}
     
*/

    
protected void fireEvent(IoFilterEvent event) {
        executor.execute(event);
    }


    
/**
     * {@inheritDoc}
     
*/

    @Override
    
public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
        
if (parent.contains(this)) {
            
throw new IllegalArgumentException(
                    
"You can't add the same filter instance more than once.  Create another instance and add it.");
        }

    }


    
/**
     * 执行sessionOpened.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void sessionOpened(NextFilter nextFilter, IoSession session) {
        
if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
            fireEvent(event);
        }
 else {
            nextFilter.sessionOpened(session);
        }

    }


    
/**
     * 执行sessionClosed.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void sessionClosed(NextFilter nextFilter, IoSession session) {
        
if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
            fireEvent(event);
        }
 else {
            nextFilter.sessionClosed(session);
        }

    }


    
/**
     * 执行sessionIdle.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
        
if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
            fireEvent(event);
        }
 else {
            nextFilter.sessionIdle(session, status);
        }

    }


    
/**
     * 执行exceptionCaught.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
        
if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
            fireEvent(event);
        }
 else {
            nextFilter.exceptionCaught(session, cause);
        }

    }


    
/**
     * 执行messageReceived.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
        
if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
            fireEvent(event);
        }
 else {
            nextFilter.messageReceived(session, message);
        }

    }


    
/**
     * 执行messageSent.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
        
if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
            fireEvent(event);
        }
 else {
            nextFilter.messageSent(session, writeRequest);
        }

    }


    
/**
     * 执行filterWrite.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
        
if (eventTypes.contains(IoEventType.WRITE)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
            fireEvent(event);
        }
 else {
            nextFilter.filterWrite(session, writeRequest);
        }

    }


    
/**
     * 执行filterClose.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
     
*/

    @Override
    
public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
        
if (eventTypes.contains(IoEventType.CLOSE)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
            fireEvent(event);
        }
 else {
            nextFilter.filterClose(session);
        }

    }

}

 


 3.OrderedThreadPoolExecutor源码
            1.构造源码

public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
            IoEventQueueHandler eventQueueHandler) {
        
// 从这里可以看到,其实现类似于{@link Executors.newCachedThreadPool},不过其可以设置corePoolSize和maximumPoolSize.
        
// 不过其execute方法是自实现的,否则如果用父类的则会出现问题,即在任务繁忙的时候会出现任务被拒绝
        
// 因为其把任务放到了session的任务队列中.即没有由线程池本身来保存
        
// 另外可以看到初始化的corePoolSize和maximumPoolSize分别传了0和1.这是为了更好的处理异常,因为super不能try/catch
        super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
                
new SynchronousQueue<Runnable>(), threadFactory,
                
new AbortPolicy());

        
// 校验corePoolSize/maximumPoolSize
        if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
            
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
        }


        
if ((maximumPoolSize == 0|| (maximumPoolSize < corePoolSize)) {
            
throw new IllegalArgumentException("maximumPoolSize: "
                    
+ maximumPoolSize);
        }


        
// 设置corePoolSize/maximumPoolSize
        super.setCorePoolSize(corePoolSize);
        
super.setMaximumPoolSize(maximumPoolSize);

        
// The queueHandler might be null.
        if (eventQueueHandler == null{
            
this.eventQueueHandler = IoEventQueueHandler.NOOP;
        }
 else {
            
this.eventQueueHandler = eventQueueHandler;
        }

    }


        2.execute源码,其覆写了该方法

        
    /**
     * 执行任务,调用线程执行.
     * 将task插入session的任务队列是保证order的条件;因为session的所有io事件任务都会被放到session的任务队列
     * 而该session身上挂的这些任务则会被线程池的某个线程依次处理完毕,所以保证了顺序和单线程
     * 因为线程池中的线程在fetchSession的时候,是从waitingSessions这个阻塞队列获取的,保证了并发的顺序.即
     * 同一时刻只能被某一单线程执行
     
*/

    @Override
    
public void execute(Runnable task) {
        
// 如果shutdown,则拒绝执行任务
        if (shutdown) {
            rejectTask(task);
        }


        
// 检查该任务是否是IoEvent类型,不是则抛出异常
        checkTaskType(task);

        
// 将任务转为IoEvent
        IoEvent event = (IoEvent) task;

        
// 获取关联的session
        IoSession session = event.getSession();

        
// 获取session的任务队列,如果没有则新建SessionTasksQueue至TASKS_QUEUE属性
        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
        
// 获取session的任务队列
        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;

        
boolean offerSession;

        
// 调用eventQueueHandler.accept方法,表示事件被队列accept
        
// IoEventQueueThrottle实现则返回true
        boolean offerEvent = eventQueueHandler.accept(this, event);

        
if (offerEvent) {
            
// 表示消息被accepted
            
// 同步
            synchronized (tasksQueue) {
                
// 插入session的任务队列
                tasksQueue.offer(event);

                
// 判断该session的任务队列是否已完成,如果完成则表明此时任务队列为空
                if (sessionTasksQueue.processingCompleted) {
                    sessionTasksQueue.processingCompleted = false;
                    offerSession = true;
      
          }
 else {
                    offerSession = false;
                }


                
if (LOGGER.isDebugEnabled()) {
                    print(tasksQueue, event);
                }

            }

        }
 else {
            offerSession = false;
        }


        
if (offerSession) {
            
// 因为此时的session任务队列为空则表示该任务立即被执行了.所以可以将该session放入waitingSessions.
            
// 即等待任务执行结束
            waitingSessions.offer(session);
        }


        
// 没有空闲的worker时添加worker线程
        addWorkerIfNecessary();

        
if (offerEvent) {
            
// io事件被插入队列时触发
            eventQueueHandler.offered(this, event);
        }

    }

    
4.UnorderedThreadPoolExecutor源码
      1.构造源码

public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
            IoEventQueueHandler queueHandler) {
        
// 这里指定了corePoolSize/maximumPoolSize和LinkedBlockingQueue
        
// 即超出corePooliSize后的任务会被放到无界阻塞队列
        super(01, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(),
                threadFactory, new AbortPolicy());
        
if (corePoolSize < 0{
            
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
        }


        
if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
            
throw new IllegalArgumentException("maximumPoolSize: "
                    
+ maximumPoolSize);
        }


        
if (queueHandler == null{
            queueHandler 
= IoEventQueueHandler.NOOP;
        }


        
this.corePoolSize = corePoolSize;
        
this.maximumPoolSize = maximumPoolSize;
        
this.queueHandler = queueHandler;
    }


    2.execute方法源码,其覆写了该方法


        
// 这个直接将任务放到了工作队列
    
// 而Worker线程获取任务的时候是直接从该队列获取.这样则无法保证某一session的所有io事件任务在同一线程执行
    
// 所以是unorder
    public void execute(Runnable task) {
        
if (shutdown) {
            rejectTask(task);
        }


        checkTaskType(task);

        IoEvent e 
= (IoEvent) task;
        
boolean offeredEvent = queueHandler.accept(this, e);
        
if (offeredEvent) {
            
// 直接放到workQueue,即LinkedBlockingQueue
            getQueue().offer(e);
        }


        addWorkerIfNecessary();

        
if (offeredEvent) {
            queueHandler.offered(
this, e);
        }

    }


5.总结:
    1.ExecutorFilter提供了业务逻辑的执行线程,可以将应用层业务逻辑通过配置该filter在配置的线程池内执行
     2.OrderThreadPoolExecutor保证了单线程执行session的业务以及io事件的执行顺序.
      3.UnorderedThreadPoolExecutor则无法保证单线程执行,所以要注意线程安全的问题.

        
posted on 2014-02-03 23:38 landon 阅读(2326) 评论(2)  编辑  收藏 所属分类: Sources

FeedBack:
# re: apache-mina-2.07源码笔记5-thread model
2014-02-04 06:44 | 鹏达锁业
转眼间,一年又过去了,博主新年快乐
  回复  更多评论
  
# re: apache-mina-2.07源码笔记5-thread model
2014-02-10 15:11 | 电驴资源
楼主厉害,学习了  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: