java.util.concurrent的作者是Doug Lea : 世界上对Java影响力最大的个人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算器科学系的老大爷。",他可是并发编程的大师级人物哦!
Since jdk1.5,在java.util.concurrent包下的线程池模型是基于queue的,threadpool只有一个,而queue却有多个LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可参见java.util.concurrent.Executors.注意:我下面的问题是针对LinkedBlockingQueue的,参考的src为jdk1.6.
Threadpool通过以下的3个属性来标志池中的线程数:
corePoolSize(类似minimumPoolSize),poolSize(当前池中的线程数),maximumPoolSize(最大的线程数).
这3个属性表达的意思是每次新创建或结束一个线程poolSize++/--,在最忙的情况下threadpool创建的线程数不能超过maximumPoolSize,
当空闲的情况下poolSize应该降到corePoolSize,当然threadpool如果从创建时它就从来没有处理过一次请求的话,那么poolSize当然为0.
通过以上2段的说明下面我要引出我所要讲的问题:
我们来看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
它表达的主体意思是:如果当前的poolSize<corePoolSize,那么就增加线程直到poolSize==corePoolSize.
如果poolSize已经到达corePoolSize,那么就把command(task) put to workQueue,如果workQueue为LinkedBlockingQueue的话,
那么只有当workQueue offer commands达到workQueue.capacity后,threadpool才会继续增加线程直到maximumPoolSize.
1.*****如果LinkedBlockingQueue.capacity被设置为Integer.MAX_VALUE,那么池中的线程几乎不可能到达maximumPoolSize.*****
所以你如果使用了Executors.newFixedThreadPool的话,那么maximumPoolSize和corePoolSize是一样的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果这样new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的话,
上述的使用都将导致maximumPoolSize是无效的,也就是说线程池中的线程数不会超出corePoolSize.
这个也让那些tomcat6的开发人员可能也郁闷了,他们不得不改写LinkedBlockingQueue,以tomcat-6.0.20-src为例:
org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method:
public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
parent = tp;
this.endpoint = ep;
}
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
//this is an approximation, so it could use some tuning
if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
org.apache.tomcat.util.net.NioEndpoint.start()-->
TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor, this);
2.*****如果把LinkedBlockingQueue.capacity设置为一个适当的值远小于Integer.MAX_VALUE,那么只有put到queue的任务数到达LinkedBlockingQueue的capacity后,才会继续增加池中的线程,使得poolSize超出corePoolSize但不超过maximumPoolSize,这个时候来增加线程数是不是有点晚了呢??????*****.
这样一来reject(command)也可能随之而来了,LinkedBlockingQueue.capacity设置为何值又是个头疼的问题.
所以ThreadPoolExecutor+LinkedBlockingQueue表达的意思是首先会增加线程数到corePoolSize,但只有queue的任务容量到达最大capacity后,才会继续在corePoolSize的基数上增加线程来处理任务,直到maximumPoolSize.
但为什么我们不能这样呢:将LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,让task尽可能的得到处理,同时在忙的情况下,增加池中的线程充到maximumPoolSize来尽快的处理这些任务.即便是把LinkedBlockingQueue.capacity设置为一个适当的值<<<远小于Integer.MAX_VALUE,也不一定非得在任务数到达LinkedBlockingQueue的capacity之后才去增加线程使poolSize超出corePoolSize趋向maximumPoolSize.
所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue组合的缺点也就出来了:如果我们想让线程池尽可能多的处理大量的任务的话,我们会把LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,但是如果这样的话池中的线程数量就不能充到最大maximumPoolSize,也就不能充分发挥线程池的最大处理能力.如果我们把LinkedBlockingQueue.capacity设置为一个较小的值,那么线程池中的线程数量会充到最大maximumPoolSize,但是如果池中的线程都忙的话,线程池又会reject请求的任务,因为队列已满.
如果我们把LinkedBlockingQueue.capacity设置为一个较大的值但不是Integer.MAX_VALUE,那么等到线程池的线程数量准备开始超出corePoolSize时,也就是任务队列满了,这个时候才去增加线程的话,请求任务的执行会有一定的延时,也就是没有得到及时的处理.
其实也就是说ThreadPoolExecutor缺乏灵敏的线程调度机制,没有根据当前任务的执行情况,是忙,还是闲,以及队列中的待处理任务的数量级进行动态的调配线程数,使得它的处理效率受到影响.
那么什么是忙的情况的判断呢?
busy[1]:如果poolSize==corePoolSize,并且现在忙着执行任务的线程数(currentBusyWorkers)等于poolSize.[而不管现在put到queue的任务数是否到达queue.capacity]
busy[2].1:如果poolSize==corePoolSize,并且put到queue的任务数已到达queue.capacity.[queue.capacity是针对有任务队列极限限制的情况]
busy[2].2:线程池的基本目标是尽可能的快速处理大量的请求任务,那么就不一定非得在put到queue的任务数到达queue的capacity之后才判断为忙的情况,只要queue中现有的任务数(task_counter)与poolSize或者maximumPoolSize存在一定的比例时就可以判断为忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,这样queue.capacity这个限制可以取消了.
在上述busy[1],busy[2]这2种情况下都应增加线程数,直至maximumPoolSize,使请求的任务得到最快的处理.
前面讲的是忙的时候ThreadPoolExecutor+LinkedBlockingQueue在处理上的瑕疵,那么空闲的时候又要如何呢?
如果corePoolSize<poolSize<maximumPoolSize,那么线程等待keepAliveTime之后应该降为corePoolSize,嘿嘿,这个就真的成了bug了哦,一个很难发现的bug,poolSize是被降下来了,可是很可能降过了头<corePoolSize,甚至降为0也有可能.
ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
/*queue is empty,这里timeout之后,return null,之后call workerCanExit() return true.*/
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}//end getTask.
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}//end workerCanExit.
在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值没有变化,
ThreadPoolExecutor.Worker.run()将结束-->ThreadPoolExecutor.Worker.workerDone-->这个时候才将poolSize--,可惜晚了,在多线程的环境下,poolSize的值将变为小于corePoolSize,而不是等于corePoolSize!!!!!!
例如:如果poolSize(6)大于corePoolSize(5),那么同时timeout的就不一定是一条线程,而是多条,它们都有可能退出run,使得poolSize--减过了corePoolSize.
提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
它表达的意思是在空闲的时候让线程等待keepAliveTime,timeout后使得poolSize能够降为0.[其实我是希望它降为minimumPoolSize,特别是在服务器的环境下,我们需要线程池保持一定数量的线程来及时处理"零零碎碎的,断断续续的,一股一波的,不是很有压力的"请求],当然你可以把corePoolSize当作minimumPoolSize,而不调用该方法.
针对上述java util concurrent线程池的瑕疵,我对java util concurrent线程池模型进行了修正,特别是在"忙"(busy[1],busy[2])的情况下的任务处理进行了优化,使得线程池尽可能快的处理尽可能多的任务.
下面提供了高效的线程池的源码购买:
java版threadpool:
http://item.taobao.com/auction/item_detail-0db2-9078a9045826f273dcea80aa490f1a8b.jhtml
c [not c++]版threadpool in windows NT:
http://item.taobao.com/auction/item_detail-0db2-28e37cb6776a1bc526ef5a27aa411e71.jhtml