1.IoHandler#sessionCreated/#sessionOpened区别
1.sessionCreated:Invoked from an I/O processor thread when a new connection has been created.Because this method is supposed to be called from the same thread that handles I/O of multiple sessions, please implement this method to perform tasks that consumes minimal amount of time such as socket parameter and user-defined session attribute initialization.
该方法是在I/O processor线程触发.->该方法尽量要执行耗时短的操作.
2.sessionOpened:Invoked when a connection has been opened. This method is invoked after {@link #sessionCreated(IoSession)}. The biggest difference from {@link #sessionCreated(IoSession)} is that it's invoked from other thread than an I/O processor thread once thread model is configured properly.
该方法是在sessionCreated后触发.和前者的区别在于一旦配置了合适的线程模型(ExecutorFilter)则该方法不是在I/O processor线程执行.(反之的意思是如果没有配置线程模型,则该方法也是在I/O processor执行).
{@link ExecutorFilter#sessionOpened}
landon:1.如果没有配置线程模型则所有IoHandler的处理都是在某个特定的processor线程执行的->也就是单线程执行的.->所以sessionCreated这里的逻辑耗时很长的话则严重影响NioProcesor其他链接成功的session.
2.同理本人认为如果没有配置线程模型的话,则IoHandler的其他逻辑如果耗时长的话也会影响到其他session的处理.
3.测试:在IoHandler的逻辑代码出直接sleep(30s)->看是否有影响其他session.
2.ExecutorFilter源码
// 继承IoFilterAdapter,而IoFilterAdapter实现了IoFilter接口
// 因此可添加至FilterChain
// 其核心在于如果某事件出现在了该filter的io事件监听列表之内则直接在内部的线程池内执行,而不是processor线程
public class ExecutorFilter extends IoFilterAdapter {
// 处理的事件类型列表
// IoEventType包括SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE
private EnumSet<IoEventType> eventTypes;
/** *//** 关联的线程池 */
private Executor executor;
/** *//** 线程池是否由filter管理声明周期(该值为true的时候会在destroy时关闭) */
private boolean manageableExecutor;
/** *//** 默认的线程池max_pool_size */
private static final int DEFAULT_MAX_POOL_SIZE = 16;
/** *//** 线程池启动时的线程数目 */
private static final int BASE_THREAD_NUMBER = 0;
/** *//** 空闲线程的keepalive_time,秒 */
private static final long DEFAULT_KEEPALIVE_TIME = 30;
/** *//**
* 线程池被管理的默认值true
**/
private static final boolean MANAGEABLE_EXECUTOR = true;
// 线程池不被管理的默认值 false
private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
/** *//** 线程池处理的默认事件列表,注意没有SESSION_CREATED */
private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
/** *//**
*
* 实例创建,默认的线程池为OrderedThreadPoolExecutor(以下默认的均为OrderedThreadPoolExecutor),初始线程为0
* 最大线程数目为16,空闲线程keepalive_time为30s.
* 线程工厂为默认的线程工厂
*
*/
public ExecutorFilter() {
// 创建默认的线程池
Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
// 初始化该filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
*
* 指定线程池最大线程数目
*
*/
public ExecutorFilter(int maximumPoolSize) {
// Create a new default Executor
Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
* 指定线程池初始线程大小和最大线程数目
*
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
* 指定初始线程数目,最大线程数目以及空闲keepaliveTime
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
*
* 指定初始线程数目/最大线程数目/空闲keepAliveTime/事件队列处理器
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
IoEventQueueHandler queueHandler) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
Executors.defaultThreadFactory(), queueHandler);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
*
* 指定初始线程数目/最大线程数目/空闲keepAliveTime/线程工厂
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
*
* 指定初始线程数目/最大线程数目/空闲keepAliveTime/线程工厂/事件队列处理器
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
// Create a new default Executor
Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
threadFactory, queueHandler);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR);
}
/** *//**
* 指定监听的io事件列表
*/
public ExecutorFilter(IoEventType eventTypes) {
// Create a new default Executor
Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 指定线程池线程最大数目/监听的io事件列表
*/
public ExecutorFilter(int maximumPoolSize, IoEventType eventTypes) {
// Create a new default Executor
Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 指定线程池线程初始数目/最大数目/监听的io事件列表
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType eventTypes) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 指定线程池线程初始数目/最大数目/空闲keepAliveTime/监听的io事件列表
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
IoEventType eventTypes) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
Executors.defaultThreadFactory(), null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 指定线程池线程初始数目/最大数目/空闲keepAliveTime/事件队列处理器/监听的io事件列表
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
IoEventQueueHandler queueHandler, IoEventType eventTypes) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
Executors.defaultThreadFactory(), queueHandler);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 指定线程池线程初始数目/最大数目/空闲keepAliveTime/线程工厂/监听的io事件列表
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventType eventTypes) {
// Create a new default Executor
Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
null);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 指定线程池线程初始数目/最大数目/空闲keepAliveTime/线程工厂/事件队列处理器/监听的io事件列表
*/
public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType eventTypes) {
// Create a new default Executor
Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
threadFactory, queueHandler);
// Initialize the filter
init(executor, MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 外界指定线程池,NOT_MANAGEABLE_EXECUTOR表明由外部自动手动管理线程池声明周期
*/
public ExecutorFilter(Executor executor) {
// Initialize the filter
init(executor, NOT_MANAGEABLE_EXECUTOR);
}
/** *//**
* 外界指定线程池/监听io事件列表,NOT_MANAGEABLE_EXECUTOR表明由外部自动手动管理线程池声明周期
*/
public ExecutorFilter(Executor executor, IoEventType eventTypes) {
// Initialize the filter
init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
}
/** *//**
* 创建默认的OrderedThreadPoolExecutor
*/
private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
// Create a new Executor
Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
threadFactory, queueHandler);
return executor;
}
/** *//**
*
* 初始化处理的事件类型
* 1.拷贝至EnumSet
* 2.检查是否包含SESSION_CREATED,如果是则抛出异常
*/
private void initEventTypes(IoEventType eventTypes) {
if ((eventTypes == null) || (eventTypes.length == 0)) {
eventTypes = DEFAULT_EVENT_SET;
}
// Copy the list of handled events in the event set
this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
// Check that we don't have the SESSION_CREATED event in the set
if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
this.eventTypes = null;
throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
}
}
/** *//**
*
* 初始化
* 1.初始化处理的事件类型
* 2.关联线程池赋值
* 3.设置manageableExecutor,如果该值为true,则表明executor的周期可被管理
* (destroy销毁的时候会进行shutdown)
*
*/
private void init(Executor executor, boolean manageableExecutor, IoEventType eventTypes) {
if (executor == null) {
throw new IllegalArgumentException("executor");
}
initEventTypes(eventTypes);
this.executor = executor;
this.manageableExecutor = manageableExecutor;
}
/** *//**
* 如果是自管理线程池生命周期则destroy的时候执行关闭
*/
@Override
public void destroy() {
if (manageableExecutor) {
((ExecutorService) executor).shutdown();
}
}
/** *//**
* 返回filter用的线程池
*/
public final Executor getExecutor() {
return executor;
}
/** *//**
* 触发事件,直接由线程池执行{@link IoFilterEvent#fire}
*/
protected void fireEvent(IoFilterEvent event) {
executor.execute(event);
}
/** *//**
* {@inheritDoc}
*/
@Override
public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
if (parent.contains(this)) {
throw new IllegalArgumentException(
"You can't add the same filter instance more than once. Create another instance and add it.");
}
}
/** *//**
* 执行sessionOpened.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void sessionOpened(NextFilter nextFilter, IoSession session) {
if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
fireEvent(event);
} else {
nextFilter.sessionOpened(session);
}
}
/** *//**
* 执行sessionClosed.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void sessionClosed(NextFilter nextFilter, IoSession session) {
if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
fireEvent(event);
} else {
nextFilter.sessionClosed(session);
}
}
/** *//**
* 执行sessionIdle.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
fireEvent(event);
} else {
nextFilter.sessionIdle(session, status);
}
}
/** *//**
* 执行exceptionCaught.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
fireEvent(event);
} else {
nextFilter.exceptionCaught(session, cause);
}
}
/** *//**
* 执行messageReceived.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
fireEvent(event);
} else {
nextFilter.messageReceived(session, message);
}
}
/** *//**
* 执行messageSent.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
fireEvent(event);
} else {
nextFilter.messageSent(session, writeRequest);
}
}
/** *//**
* 执行filterWrite.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
if (eventTypes.contains(IoEventType.WRITE)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
fireEvent(event);
} else {
nextFilter.filterWrite(session, writeRequest);
}
}
/** *//**
* 执行filterClose.如果监听了该事件则由在线程池中运行,否则在processor线程执行继续传递
*/
@Override
public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
if (eventTypes.contains(IoEventType.CLOSE)) {
IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
fireEvent(event);
} else {
nextFilter.filterClose(session);
}
}
}
3.OrderedThreadPoolExecutor源码 1.构造源码
public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
IoEventQueueHandler eventQueueHandler) {
// 从这里可以看到,其实现类似于{@link Executors.newCachedThreadPool},不过其可以设置corePoolSize和maximumPoolSize.
// 不过其execute方法是自实现的,否则如果用父类的则会出现问题,即在任务繁忙的时候会出现任务被拒绝
// 因为其把任务放到了session的任务队列中.即没有由线程池本身来保存
// 另外可以看到初始化的corePoolSize和maximumPoolSize分别传了0和1.这是为了更好的处理异常,因为super不能try/catch
super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
new SynchronousQueue<Runnable>(), threadFactory,
new AbortPolicy());
// 校验corePoolSize/maximumPoolSize
if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
throw new IllegalArgumentException("maximumPoolSize: "
+ maximumPoolSize);
}
// 设置corePoolSize/maximumPoolSize
super.setCorePoolSize(corePoolSize);
super.setMaximumPoolSize(maximumPoolSize);
// The queueHandler might be null.
if (eventQueueHandler == null) {
this.eventQueueHandler = IoEventQueueHandler.NOOP;
} else {
this.eventQueueHandler = eventQueueHandler;
}
} 2.execute源码,其覆写了该方法
/** *//**
* 执行任务,调用线程执行.
* 将task插入session的任务队列是保证order的条件;因为session的所有io事件任务都会被放到session的任务队列
* 而该session身上挂的这些任务则会被线程池的某个线程依次处理完毕,所以保证了顺序和单线程
* 因为线程池中的线程在fetchSession的时候,是从waitingSessions这个阻塞队列获取的,保证了并发的顺序.即
* 同一时刻只能被某一单线程执行
*/
@Override
public void execute(Runnable task) {
// 如果shutdown,则拒绝执行任务
if (shutdown) {
rejectTask(task);
}
// 检查该任务是否是IoEvent类型,不是则抛出异常
checkTaskType(task);
// 将任务转为IoEvent
IoEvent event = (IoEvent) task;
// 获取关联的session
IoSession session = event.getSession();
// 获取session的任务队列,如果没有则新建SessionTasksQueue至TASKS_QUEUE属性
SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
// 获取session的任务队列
Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
boolean offerSession;
// 调用eventQueueHandler.accept方法,表示事件被队列accept
// IoEventQueueThrottle实现则返回true
boolean offerEvent = eventQueueHandler.accept(this, event);
if (offerEvent) {
// 表示消息被accepted
// 同步
synchronized (tasksQueue) {
// 插入session的任务队列
tasksQueue.offer(event);
// 判断该session的任务队列是否已完成,如果完成则表明此时任务队列为空
if (sessionTasksQueue.processingCompleted) {
sessionTasksQueue.processingCompleted = false;
offerSession = true;
} else {
offerSession = false;
}
if (LOGGER.isDebugEnabled()) {
print(tasksQueue, event);
}
}
} else {
offerSession = false;
}
if (offerSession) {
// 因为此时的session任务队列为空则表示该任务立即被执行了.所以可以将该session放入waitingSessions.
// 即等待任务执行结束
waitingSessions.offer(session);
}
// 没有空闲的worker时添加worker线程
addWorkerIfNecessary();
if (offerEvent) {
// io事件被插入队列时触发
eventQueueHandler.offered(this, event);
}
} 4.UnorderedThreadPoolExecutor源码 1.构造源码
public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
IoEventQueueHandler queueHandler) {
// 这里指定了corePoolSize/maximumPoolSize和LinkedBlockingQueue
// 即超出corePooliSize后的任务会被放到无界阻塞队列
super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(),
threadFactory, new AbortPolicy());
if (corePoolSize < 0) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException("maximumPoolSize: "
+ maximumPoolSize);
}
if (queueHandler == null) {
queueHandler = IoEventQueueHandler.NOOP;
}
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.queueHandler = queueHandler;
} 2.execute方法源码,其覆写了该方法
// 这个直接将任务放到了工作队列
// 而Worker线程获取任务的时候是直接从该队列获取.这样则无法保证某一session的所有io事件任务在同一线程执行
// 所以是unorder
public void execute(Runnable task) {
if (shutdown) {
rejectTask(task);
}
checkTaskType(task);
IoEvent e = (IoEvent) task;
boolean offeredEvent = queueHandler.accept(this, e);
if (offeredEvent) {
// 直接放到workQueue,即LinkedBlockingQueue
getQueue().offer(e);
}
addWorkerIfNecessary();
if (offeredEvent) {
queueHandler.offered(this, e);
}
}5.总结:
1.ExecutorFilter提供了业务逻辑的执行线程,可以将应用层业务逻辑通过配置该filter在配置的线程池内执行
2.OrderThreadPoolExecutor保证了单线程执行session的业务以及io事件的执行顺序.
3.UnorderedThreadPoolExecutor则无法保证单线程执行,所以要注意线程安全的问题.
posted on 2014-02-03 23:38
landon 阅读(2323)
评论(2) 编辑 收藏 所属分类:
Sources