the journey is the reward...

常用链接

统计

最新评论

Java线程池的瑕疵,For java util concurrent threadpool Since jdk1.5

    java.util.concurrent的作者是Doug Lea : 世界上对Java影响力最大的个人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算器科学系的老大爷。",他可是并发编程的大师级人物哦!
    Since jdk1.5,在java.util.concurrent包下的线程池模型是基于queue的,threadpool只有一个,而queue却有多个LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可参见java.util.concurrent.Executors.注意:我下面的问题是针对LinkedBlockingQueue的,参考的src为jdk1.6.
    Threadpool通过以下的3个属性来标志池中的线程数:
corePoolSize(类似minimumPoolSize),poolSize(当前池中的线程数),maximumPoolSize(最大的线程数).
这3个属性表达的意思是每次新创建或结束一个线程poolSize++/--,在最忙的情况下threadpool创建的线程数不能超过maximumPoolSize,
当空闲的情况下poolSize应该降到corePoolSize,当然threadpool如果从创建时它就从来没有处理过一次请求的话,那么poolSize当然为0.
    通过以上2段的说明下面我要引出我所要讲的问题:
我们来看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}
它表达的主体意思是:如果当前的poolSize<corePoolSize,那么就增加线程直到poolSize==corePoolSize.
如果poolSize已经到达corePoolSize,那么就把command(task) put to workQueue,如果workQueue为LinkedBlockingQueue的话,
那么只有当workQueue offer commands达到workQueue.capacity后,threadpool才会继续增加线程直到maximumPoolSize.
1.*****如果LinkedBlockingQueue.capacity被设置为Integer.MAX_VALUE,那么池中的线程几乎不可能到达maximumPoolSize.*****
所以你如果使用了Executors.newFixedThreadPool的话,那么maximumPoolSize和corePoolSize是一样的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果这样new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的话,
上述的使用都将导致maximumPoolSize是无效的,也就是说线程池中的线程数不会超出corePoolSize.
这个也让那些tomcat6的开发人员可能也郁闷了,他们不得不改写LinkedBlockingQueue,以tomcat-6.0.20-src为例:
org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method: 
 public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
            parent = tp;
            this.endpoint = ep;
        }
       
        public boolean offer(Runnable o) {
            //we can't do any checks
            if (parent==null) return super.offer(o);
            //we are maxed out on threads, simply queue the object
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
            //we have idle threads, just add it to the queue
            //this is an approximation, so it could use some tuning
            if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
            //if we have less threads than maximum force creation of a new thread
            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
            //if we reached here, we need to add it to the queue
            return super.offer(o);
        } 

org.apache.tomcat.util.net.NioEndpoint.start()-->
   TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
                     TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                     executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
                     taskqueue.setParent( (ThreadPoolExecutor) executor, this);
2.*****如果把LinkedBlockingQueue.capacity设置为一个适当的值远小于Integer.MAX_VALUE,那么只有put到queue的任务数到达LinkedBlockingQueue的capacity后,才会继续增加池中的线程,使得poolSize超出corePoolSize但不超过maximumPoolSize,这个时候来增加线程数是不是有点晚了呢??????*****.
这样一来reject(command)也可能随之而来了,LinkedBlockingQueue.capacity设置为何值又是个头疼的问题.
所以ThreadPoolExecutor+LinkedBlockingQueue表达的意思是首先会增加线程数到corePoolSize,但只有queue的任务容量到达最大capacity后,才会继续在corePoolSize的基数上增加线程来处理任务,直到maximumPoolSize.
    但为什么我们不能这样呢:将LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,让task尽可能的得到处理,同时在忙的情况下,增加池中的线程充到maximumPoolSize来尽快的处理这些任务.即便是把LinkedBlockingQueue.capacity设置为一个适当的值<<<远小于Integer.MAX_VALUE,也不一定非得在任务数到达LinkedBlockingQueue的capacity之后才去增加线程使poolSize超出corePoolSize趋向maximumPoolSize.
    所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue组合的缺点也就出来了:如果我们想让线程池尽可能多的处理大量的任务的话,我们会把LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,但是如果这样的话池中的线程数量就不能充到最大maximumPoolSize,也就不能充分发挥线程池的最大处理能力.如果我们把LinkedBlockingQueue.capacity设置为一个较小的值,那么线程池中的线程数量会充到最大maximumPoolSize,但是如果池中的线程都忙的话,线程池又会reject请求的任务,因为队列已满.
    如果我们把LinkedBlockingQueue.capacity设置为一个较大的值但不是Integer.MAX_VALUE,那么等到线程池的线程数量准备开始超出corePoolSize时,也就是任务队列满了,这个时候才去增加线程的话,请求任务的执行会有一定的延时,也就是没有得到及时的处理.
    其实也就是说ThreadPoolExecutor缺乏灵敏的线程调度机制,没有根据当前任务的执行情况,是忙,还是闲,以及队列中的待处理任务的数量级进行动态的调配线程数,使得它的处理效率受到影响.
那么什么是忙的情况的判断呢? 
busy[1]:如果poolSize==corePoolSize,并且现在忙着执行任务的线程数(currentBusyWorkers)等于poolSize.[而不管现在put到queue的任务数是否到达queue.capacity]
busy[2].1:如果poolSize==corePoolSize,并且put到queue的任务数已到达queue.capacity.[queue.capacity是针对有任务队列极限限制的情况]
busy[2].2:线程池的基本目标是尽可能的快速处理大量的请求任务,那么就不一定非得在put到queue的任务数到达queue的capacity之后才判断为忙的情况,只要queue中现有的任务数(task_counter)与poolSize或者maximumPoolSize存在一定的比例时就可以判断为忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,这样queue.capacity这个限制可以取消了.
在上述busy[1],busy[2]这2种情况下都应增加线程数,直至maximumPoolSize,使请求的任务得到最快的处理.

前面讲的是忙的时候ThreadPoolExecutor+LinkedBlockingQueue在处理上的瑕疵,那么空闲的时候又要如何呢?
如果corePoolSize<poolSize<maximumPoolSize,那么线程等待keepAliveTime之后应该降为corePoolSize,嘿嘿,这个就真的成了bug了哦,一个很难发现的bug,poolSize是被降下来了,可是很可能降过了头<corePoolSize,甚至降为0也有可能.
ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
      /*queue is empty,这里timeout之后,return null,之后call workerCanExit() return true.*/
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
}//end getTask.
private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
}//end workerCanExit.

在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值没有变化,
ThreadPoolExecutor.Worker.run()将结束-->ThreadPoolExecutor.Worker.workerDone-->这个时候才将poolSize--,可惜晚了,在多线程的环境下,poolSize的值将变为小于corePoolSize,而不是等于corePoolSize!!!!!!
例如:如果poolSize(6)大于corePoolSize(5),那么同时timeout的就不一定是一条线程,而是多条,它们都有可能退出run,使得poolSize--减过了corePoolSize.
    提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
它表达的意思是在空闲的时候让线程等待keepAliveTime,timeout后使得poolSize能够降为0.[其实我是希望它降为minimumPoolSize,特别是在服务器的环境下,我们需要线程池保持一定数量的线程来及时处理"零零碎碎的,断断续续的,一股一波的,不是很有压力的"请求],当然你可以把corePoolSize当作minimumPoolSize,而不调用该方法.
    针对上述java util concurrent线程池的瑕疵,我对java util concurrent线程池模型进行了修正,特别是在"忙"(busy[1],busy[2])的情况下的任务处理进行了优化,使得线程池尽可能快的处理尽可能多的任务.
下面提供了高效的线程池的源码购买:
java版threadpool:
http://item.taobao.com/auction/item_detail-0db2-9078a9045826f273dcea80aa490f1a8b.jhtml
c [not c++]版threadpool in windows NT:
http://item.taobao.com/auction/item_detail-0db2-28e37cb6776a1bc526ef5a27aa411e71.jhtml

posted on 2010-02-20 20:15 adapterofcoms 阅读(1723) 评论(1)  编辑  收藏 所属分类: java techs

评论

# re: Java线程池的瑕疵,For java util concurrent threadpool Since jdk1.5 2010-10-19 19:30 myfavorite

您好,请教线程池启动后如何监控线程池状态?  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航: