对Fork/Join的个人理解要点:
- fork/join将大的任务分割小的任务,直到小的任务可以使用最简单、直接或者同步的方式处理。
- 最小的任务将无法分解
- 每一个任务不是线程的实例
- 每一个工作线程将是一个隐式的线程实例
- 每一个工作线程都会维护自身的一个双向队列(支持FIFO/LIFO);在任务产生的子任务,会被push进当前工作线程所维护deque队列中,进入队列头部。
- 当一个工作线程的双向队列中暂无任务时,它会从随机的工作线程的双向队列的尾部获取一个入队最久的子任务(称之为窃取),take()方式获取,先进先出的规则(FIFO)。
- 当一个工作线程遇到一个join的操作,假如可能的话,它会处理其他的任务,直到目标任务被通知需要处理掉。
- 当一个工作者线程没有任务可以处理,并且不能从其他工作者线程中窃取的时,它会后退(通过yields,sleeps,或者优先级的调整),稍后重试,直到所有工作线程都会处于空闲状态,所有线程都会阻塞,等到另外的任务在顶层被调用。
Brian Goetz 认为"使用传统的线程池来实现 fork-join 也具有挑战性,因为 fork-join任务将线程生命周期的大部分时间花费在等待其他任务上。这种行为会造成线程饥饿死锁(thread starvation deadlock),除非小心选择参数以限制创建的任务数量,或者池本身非常大。传统的线程池是为相互独立的任务设计的,而且设计中也考虑了潜在的阻塞、粗粒度任务 — fork-join 解决方案不会产生这两种情况。对于传统线程池的细粒度任务,也存在所有工作线程共享的任务队列发生争用的情况。"
下面谈一谈工作窃取(work stealing)。在Fork/Join中,工作窃取采用了一个被当做栈(Stack)使用的双端队列WorkQueue。双端队列WorkQueue,支持在两端插入和移除元素,和单独的队列(Queue)相比,多了一端。在Fork/Join中工作线程中被当做栈(Stack)来使用,在头部push插入数据,pop获取数。而尾部,可以供需要窃取的工作线程(take()方法)使用。与单向队列相比,减少争用,可以提高性能。
WorkQueue的定义:
static final class WorkQueue {
......
}
它既没有继承自Deque又没有继承Queue接口,而是自己独立写了一个双端队列,数组实现。很显然,数组的读取性能要强于链表。
看一下ForkJoinPool的默认构造函数:
public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
defaultForkJoinWorkerThreadFactory, null, false);
}
public static interface ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
在ForkJoinPool代码初始化时,默认情况下:
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
默认情况下,根据当前CPU的数量建立一个ForkJoinWorkerThreadFactory工厂,CPU数量个ForkJoinWorkerThread工作线程。
仔细看一下ForkJoinWorkerThread代码,工作线程继承自Thread:
很显然,onStart 和 onTermination为钩子函数,可以被重写,但,需要构造一个新的ForkJoinPool.ForkJoinWorkerThreadFactory来配合使用。比如:
public static void main(String[] args) {
ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new CustomedForkJoinWorkerThread(pool);
}
};
ForkJoinPool joinPool = new ForkJoinPool(Runtime.getRuntime()
.availableProcessors(), factory, null, false);
// some code here ...
}
private static final class CustomedForkJoinWorkerThread extends
ForkJoinWorkerThread {
protected CustomedForkJoinWorkerThread(ForkJoinPool pool) {
super(pool);
}
@Override
protected void onStart() {
super.onStart();
System.out.println("准备初始化资源...");
}
@Override
protected void onTermination(Throwable exception) {
System.out.println("开始清理资源...");
super.onTermination(exception);
}
}
接着看一下pool.runWorker(this)方法:
final void runWorker(ForkJoinWorkerThread wt) {
WorkQueue w = wt.workQueue;
w.growArray(false);
w.seed = hashId(Thread.currentThread().getId());
do {} while (w.runTask(scan(w)));
}
初始化队列,设置其seed为当前线程ID的哈希值。然后循环执行,当没有任务可获取,自然就退出了。而scan()很复杂,大概功能,从当前队列中获取元素,当前队列为空时,从其他工作线程所持有的队列中窃取一个。都没有时,只能返回null,进而阻止线程活动。
嗯,有时间会再深入WorkQueue队列一些。