1.ThreadPoolExecutor#execute(Runnable command) public void execute(Runnable command) {
// 如果任务为空,则直接抛出空指针异常
if (command == null)
throw new NullPointerException();
// 1.如果线程池线程数目UnderCorePoolSize且RUNNING则直接添加worker线程并启动
// 2.如果超过了corePoolSize或者addIfUnderCorePoolSize失败则
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 如果线程池是RUNNING状态且可将任务command加入workQueue(即不违反容量限制)
if (runState == RUNNING && workQueue.offer(command)) {
// 因为是并发执行.如果此时发现线程池状态不再是RUNNING(可能执行了类似shutdown的操作)或者线程池中已无Worker线程
if (runState != RUNNING || poolSize == 0)
//1.如果线程池状态不再是RUNNING且此时command依然在队列中,即还未执行则直接拒绝.
// 2.否则如果线程池状态 < STOP,即可能是SHUTDOWN状态且任务队列中依然有任务且工作线程的数目不足corePoolSize,则额外添加一个Worker线程并启动
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
// 如果在UnderMaximumPoolSize下增加worker线程失败则执行拒绝策略,直接调用RejectedExecutionHandler#rejectedExecution
reject(command); // is shutdown or saturated
}
} 2. addIfUnderCorePoolSize(Runnable firstTask) // poolSize < corePoolSize && RUNNING的情况下添加worker线程并启动worker线程
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
// 锁
mainLock.lock();
try {
// 初始poolSize为0,runState为0,即RUNNING
// RUNNING = 0 / SHUTDOWN = 1 / STOP = 2
// TERMINATED = 3
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
} 3.addThread(Runnable firstTask)
private Thread addThread(Runnable firstTask) {
// 初始化Worker,传入firstTask
Worker w = new Worker(firstTask);
// 利用线程工厂新建线程,注意这里传入的参数是w
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
// 添加至workers
workers.add(w);
// ++poolSize
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
} 4.Workerprivate final class Worker implements Runnable {
/** *//**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/** *//**
* Initial task to run before entering run loop. Possibly null.
*/
private Runnable firstTask;
/** *//**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
volatile long completedTasks;
/** *//**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
/** *//**
* 中断线程如果没有正在运行任务(可能在等待任务)
* {@link ThreadPoolExecutor#interruptIdleWorkers}
* {@link ThreadPoolExecutor#getTask}
* {@link ThreadPoolExecutor#shutdown}
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
// 注意只有该方法是被其他线程调用才会执行interrupt.
// 1.个人认为如果是当前自身线程执行到这里的时候,说明getTask返回了null.线程就会结束了.
// 2.Worker线程在自身任务的执行中调用此方法时没有作用的.即恰恰说明了运行时不被中断.(因为不太可能存在这样的类似业务,内部线程自己在运行任务的时候中断自己.没有任何作用.你懂的.这压根就是错误的做法)
// 3.还有一个很重要的原因是:这里加了运行锁.即如果此时有任务正在运行则独占runLock,则其他线程必须等待任务完毕释放锁才可以进行interrupt.
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/** *//**
* Interrupts thread even if running a task.
*/
void interruptNow() {
// 直接进行中断,无论是内部线程还是其他线程
// 无论是否正在运行任务
// 没有获得锁
// 此时如果线程正在等待任务或者任务执行过程中阻塞都可以被中断
// 个人认为该方法也肯定是由外部线程进行调用的,而非内部的线程,你懂的.用了也没有作用.
thread.interrupt();
}
/** *//**
* 运行任务在beforeExecute/afterExecute之间
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/**//*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
// 这段代码乍看起来可能有些奇怪.个人认为是因为多线程的原因,如线程池调用了shutdownNow方法.
// 1.如果线程池是RUNNING/SHUTDOWN且之前被中断过,则清除中断状态(interrupted) 2.再次检查如果执行了shutdownNow的话,则会直接interrupt thread.而此时的中断状态可能被清除了.->需要需要再次调用interrupt重置中断状态.(还需要仔细考证)
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
// 任务执行前的一些业务,空实现,子类可覆盖
// 任务完成或者任务执行出现异常则可通过afterExecute(空实现)追踪
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
// 任务计数
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
/** *//**
* Work线程主任务循环
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
// 1.如果第一个任务不为null则一定会执行第一个任务
// 2.如果getTask为null则线程结束.
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
// 跳出while,线程即结束
// 1.completedTaskCount 计数
// 2.workers.remove(w) 从workers移除
// 3.--poolSize,如果poolSize为0则tryTerminate
workerDone(this);
}
}
} 5.Runnable getTask() Runnable getTask() {
for (;;) {
try {
int state = runState;
// 线程池运行状态为STOP或者TERMINATED,直接返回null,则Worker线程跳出while,终止
if (state > SHUTDOWN)
return null;
Runnable r;
// 如果线程池运行状态恰好是SHUTDOWN,则继续从队列获取任务(队列为空则返回null),也在该状态下如果线程池不为空则一直获取任务
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
// RUNNING状态下,poolSize超出了corePoolSize 或者allowCoreThreadTimeOut(允许核心线程超时) {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean value)}
// 在keepAliveTime时间内等待可用的元素,等待时可被中断.如果超时则返回null.
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
// Running状态下,poolSize未超出corPoolSize且不允许核心线程超时,则在元素变得可用之前一直等待,可被中断
r = workQueue.take();
if (r != null)
return r;
// 如果此时返回的任务为null且worker线程可退出(该方法其实是重复校验,因为是并发执行.所以可能任务队列已经有了任务等条件出现)
if (workerCanExit()) {
// 如果此时线程池状态不是RUNNING
if (runState >= SHUTDOWN) // Wake up others
// 唤醒可能阻塞的任务,{@link Worker#interruptIfIdle}
interruptIdleWorkers();
// 返回null,结束任务
return null;
}
// Else retry
// 继续for-循环
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
} 6.workerCanExit()// 判断worker线程是否可退出
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
// 运行状态为STOP或者TERMINATED
// 或者任务队列为空
// 或者池中至少有一个线程且允许核心线程超时
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
} 7.tryTerminate()// 尝试终止
private void tryTerminate() {
// 如果当前池中没有线程
if (poolSize == 0) {
int state = runState;
// 如果当前运行状态时是Running/SHUTDOWN且任务队列不为空
if (state < STOP && !workQueue.isEmpty()) {
// 重新设置为运行状态
state = RUNNING; // disable termination check below
// 添加一个firstTask为null的worker并启动.因为队列不为空则可以getTask
Thread t = addThread(null);
if (t != null)
t.start();
}
// 如果运行状态为STOP或者SHUTDOWN则置状态为TERMINATED并唤醒等待终止的线程 {@link #awaitTermination(long timeout, TimeUnit unit)}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
terminated();// 此方法暂时未实现
}
}
} 8.awaitTermination(long timeout, TimeUnit unit) // 等待线程池终止 {@link #tryTerminate()}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 注这是一个无限循环,直到线程池终止或者超时
for (;;) {
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
// {@link Condition#long awaitNanos(long nanosTimeout)}
// 此方法返回的是一个估算(nanosTimeout - awaitTime),如果小于等于0则表示没有剩余时间,即超时.不过如果返回值是一个正值的话且线程池未终止的话->所以由将返回值继续传入了参数->确保肯定会发生超时而导致nanos<=0而跳出循环
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
} 9.shutdown()public void shutdown() {
// 检查是否有shutdown的权限
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (security != null) { // Check if caller can modify our threads
// 检查所有的worker线程是否有修改线程的权限
for (Worker w : workers)
security.checkAccess(w.thread);
}
int state = runState;
// 设置线程池当前状态是RUNNING,则设置为SHUTDOWN状态
if (state < SHUTDOWN)
runState = SHUTDOWN;
try {
// 尝试打断空闲的worker线程
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
// 如果出现异常,则还原状态
runState = state;
// tryTerminate() here would be a no-op 这个注释的意思是出现了这个异常,tryTerminate是不起作用的.因为tryTerminate的条件是poolSize == 0.但是异常说明interruptIfIdle失败则不可能poolSize == 0.
// 继续向上抛出异常,这个异常是SecurityException
throw se;
}
// 尝试终止(队列为空的时候直接终止)
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
} 10.shutdownNow()public List<Runnable> shutdownNow() {
// 检查shutdown权限以及修改工作线程的权限
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
int state = runState;
// 置状态为STOP(可能未RUNNING或者SHUTDOWN)
if (state < STOP)
runState = STOP;
try {
for (Worker w : workers) {
// 直接中断
w.interruptNow();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
// 将队列中的所有可用元素添加list中并返回
List<Runnable> tasks = drainQueue();
// 尝试终止
tryTerminate(); // Terminate now if pool and queue empty
return tasks;
} finally {
mainLock.unlock();
}
} 11.总结:
1.corePoolSize/maximumPoolSize/keepAliveTime/workQueue/threadFactory/rejectedExecutionHandler 为线程池6大参数.
2.corePoolSize:当线程池poolSize少于corePoolSize时,则会新增worker线程.
3.线程池数目超过corePoolSize则向workQueue offer 任务.如果offer失败则在maximumPoolSize下新增worker线程;如果超过了maximumPoolSize,则执行拒绝策略.
4.keepAliveTime:poolSize超过了corePoolSize时(或者允许core thread timeout),此参数指明workQueue pool的超时时间,超时则返回null,即表示当前线程空闲.(workerCanExit中有判断workQueue为空的条件)然后worker线程结束(被回收).
5.Worker有两个方法interruptIfIdle,这个方法会先获得运行锁,即如果当前有任务运行(占有锁),则其他线程无法中断.只有执行完workQueue的任务才会结束并释放锁.(shutdown);而另一个方法interruptNow则是不管任何条件,直接interrupt.
posted on 2013-12-26 11:43
landon 阅读(1662)
评论(2) 编辑 收藏 所属分类:
Sources