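The fork-join framework is a new addition to the Java 7 concurrency library. It is based on the divide-and-conquer (DnQ) algorithm and is meant for computations over large data sets. DnQ is an ideal pattern for fine-grained parallel computation, where the data volume is large and each unit of work runs for a short time. The data is recursively split into tasks until each piece falls below a predefined threshold, and those tasks are executed by worker threads. Modern Java virtual machines map each Java thread to an operating-system thread or LWP (light-weight process), and the number of workers is usually set equal to the number of CPUs, so this approach can make full use of every CPU on multi-core hardware.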
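To make the pattern concrete, here is a minimal sketch of divide-and-conquer with a threshold. It targets the Java 7+ `java.util.concurrent` API rather than the jsr166y preview used later in this post. The task (summing a `long[]`), the class name `SumTask` and the `THRESHOLD` of 10,000 are assumptions chosen for illustration. The pool gets one worker per processor, as described above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch: sums a long[] by divide-and-conquer on a ForkJoinPool.
// SumTask and THRESHOLD are hypothetical names chosen for this example.
class SumTask extends RecursiveTask<Long> {
    // Assumed cut-off: ranges of at most this many elements are summed directly.
    static final int THRESHOLD = 10_000;

    private final long[] data;
    private final int lo; // inclusive
    private final int hi; // exclusive

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: sum sequentially on the current worker thread.
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Divide: split the range in half.
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                     // push the left half onto this worker's queue; an idle worker may steal it
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half, then combine
    }

    static long parallelSum(long[] data) {
        // One worker per available processor.
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println(parallelSum(data)); // prints 499999500000
    }
}
```

The task forks only the left half and computes the right half itself. This way the current worker does useful work instead of sitting idle while it waits for both halves.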
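I wrote a MergeSort test example in which the smallest sub-arrays are sorted with Arrays.sort() from the Java Collections Framework. On my own dual-core machine the improvement was not very pronounced. Arrays.sort is already highly efficient, while the framework adds overhead for coordinating threads and managing the worker pool, so the threshold has to be chosen to suit the data size. Here are the results (times are in nanoseconds, as measured with System.nanoTime()):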
java -Xms64m -Xmx128m -cp C:/forkjoin/jsr166y.zip;C:/workspace/java.tij forkjoin.SortTask
Number of processor 2
=================Sequential ===================
Sorting takes 2617701971 to complete
=================ForkJoin ====================
Sorting takes 2284940405 to complete
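You can see why the threshold matters by replaying the split rule of the SortTask class in this post without sorting anything. SortTask uses inclusive bounds, splits while hi - lo >= THRESHOLD, and takes the midpoint as (lo + hi) >>> 1. The helper `ThresholdPlanner` below is hypothetical and only counts leaf tasks and their sizes. With ARRAY_LENGTH = 10,000,000 and THRESHOLD = 3,000,000 it reports 4 leaf sorts of 2,500,000 elements each, which is enough work for two cores. Much smaller thresholds produce many more tasks, and each extra level of splitting adds another pass of merging plus more task-scheduling overhead.

```java
// Hypothetical helper: replays SortTask's split rule without sorting anything.
// Ranges are inclusive [lo, hi]; a range is split while hi - lo >= threshold.
final class ThresholdPlanner {

    /** Returns {leaf task count, smallest leaf size, largest leaf size}. */
    static long[] plan(int lo, int hi, int threshold) {
        if (threshold < 1 || hi < lo) {
            throw new IllegalArgumentException("need threshold >= 1 and a non-empty range");
        }
        long[] acc = {0, Long.MAX_VALUE, 0};
        walk(lo, hi, threshold, acc);
        return acc;
    }

    private static void walk(int lo, int hi, int threshold, long[] acc) {
        if (hi - lo < threshold) {             // same test as SortTask.compute()
            long size = (long) hi - lo + 1;    // inclusive bounds
            acc[0]++;
            acc[1] = Math.min(acc[1], size);
            acc[2] = Math.max(acc[2], size);
            return;
        }
        int mid = (lo + hi) >>> 1;             // same midpoint as SortTask
        walk(lo, mid, threshold, acc);
        walk(mid + 1, hi, threshold, acc);
    }

    public static void main(String[] args) {
        int n = 10000000; // SortTask.ARRAY_LENGTH
        for (int threshold : new int[] {3000000, 1000000, 100000, 10000}) {
            long[] p = plan(0, n - 1, threshold);
            System.out.println("THRESHOLD=" + threshold + ": " + p[0]
                    + " leaf sorts of " + p[1] + ".." + p[2] + " elements");
        }
    }
}
```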
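I couldn't find a machine with more cores, so anyone who has one is welcome to try it. Also see the article by Brian Goetz (author of Java Concurrency in Practice). His test examples show good speedups: up to 17x on a 32-CPU system, and typically about 3x on 4 cores or 5x on 8 cores.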
Java theory and practice: Stick a fork in it, Part 1 -
http://www.ibm.com/developerworks/java/library/j-jtp11137.html
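For comparison with those figures, here is the speedup S and the parallel efficiency E (speedup divided by the number of processors, p = 2) of the dual-core run above, worked out from the two printed times. Treat this as a rough figure only. The code that produced these numbers sorted two different random arrays and ran no warm-up rounds. Its main thread also busy-waited on isDone() while the pool was working.

```latex
S = \frac{T_{\mathrm{seq}}}{T_{\mathrm{fj}}}
  = \frac{2\,617\,701\,971\ \mathrm{ns}}{2\,284\,940\,405\ \mathrm{ns}}
  \approx 1.146,
\qquad
E = \frac{S}{p} \approx \frac{1.146}{2} \approx 0.573
```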
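package forkjoin;
// jsr166y preview of the fork-join framework; in Java 7 and later these
// classes are java.util.concurrent.RecursiveAction and ForkJoinPool.
import jsr166y.forkjoin.RecursiveAction;
import jsr166y.forkjoin.ForkJoinPool;
import java.util.Random;
import java.util.Arrays;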
public class SortTask extends RecursiveAction {
    final static int ARRAY_LENGTH = 10000000;
    // Ranges of at most THRESHOLD elements are sorted sequentially.
    final static int THRESHOLD = 3000000;
    final int[] array;
    final int lo; // first index of the range (inclusive)
    final int hi; // last index of the range (inclusive)

    public SortTask(int[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    // Sorts array[lo..hi] in place; Arrays.sort's toIndex is exclusive, hence hi + 1.
    private void sequentiallySort(int[] array, int lo, int hi) {
        Arrays.sort(array, lo, hi + 1);
    }

    // Merges the sorted runs array[lo..mid] and array[mid+1..hi] into array[lo..hi].
    private void merge(int[] array, int lo, int mid, int hi) {
        int[] units = new int[hi - lo + 1];
        int i = lo;
        int j = mid + 1;
        int k = 0;
        // Take the smaller head element while both runs are non-empty.
        while (i <= mid && j <= hi)
            units[k++] = (array[i] <= array[j]) ? array[i++] : array[j++];
        // Copy whatever remains of the run that was not exhausted.
        while (i <= mid)
            units[k++] = array[i++];
        while (j <= hi)
            units[k++] = array[j++];
        System.arraycopy(units, 0, array, lo, units.length);
    }

    protected void compute() {
        try {
            if (hi - lo < THRESHOLD)
                sequentiallySort(array, lo, hi);
            else {
                int mid = (lo + hi) >>> 1;
                // Sort both halves in parallel and wait for both to finish
                // (jsr166y's forkJoin; the Java 7 equivalent is invokeAll).
                forkJoin(new SortTask(array, lo, mid), new SortTask(array, mid + 1, hi));
                merge(array, lo, mid, hi);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
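
    /**
     * Sorts a random array sequentially with Arrays.sort, then with SortTask
     * on a ForkJoinPool, and prints both timings in nanoseconds.
     */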
    public static void main(String[] args) {
        int[] sample = new int[ARRAY_LENGTH];
        System.out.println("Number of processor "
                + Runtime.getRuntime().availableProcessors());
        Random seed = new Random(47);
        for (int i = 0; i < sample.length; i++) {
            sample[i] = seed.nextInt();
        }
        // Keep a copy of the same data for the fork-join run, so both sorts see identical input.
        int[] sample2 = sample.clone();
        long start = System.nanoTime();
        Arrays.sort(sample);
        long duration = System.nanoTime() - start;
        System.out.println("===============Sequential==================");
        System.out.println("Sorting takes " + duration + " to complete");
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime()
                .availableProcessors());
        SortTask st = new SortTask(sample2, 0, sample2.length - 1);
        start = System.nanoTime();
        // Block until the task completes instead of spinning on isDone():
        // a busy-wait loop keeps one CPU busy and competes with the workers.
        pool.invoke(st);
        duration = System.nanoTime() - start;
        System.out.println("===============ForkJoin==================");
        System.out.println("Sorting takes " + duration + " to complete");
    }
}
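On Java 7 or later, the framework ships in `java.util.concurrent`. Below is a sketch of the same merge sort against that API. It is an adaptation, not the original code. The class name `ParallelMergeSort` and the `THRESHOLD` of 65,536 are assumptions, and `invokeAll` stands in for jsr166y's `forkJoin`. The sort blocks in `pool.invoke` rather than busy-waiting. Its `main` method sorts clones of one array for several rounds, so both versions get the same input and later rounds run after JIT warm-up. With 10,000,000 elements it needs a larger heap than the -Xmx128m used above.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch of the post's SortTask ported to the Java 7+ java.util.concurrent API.
// ParallelMergeSort and its THRESHOLD value are assumptions, not the original code.
class ParallelMergeSort extends RecursiveAction {
    static final int THRESHOLD = 1 << 16; // assumed cut-off; tune per machine

    private final int[] array;
    private final int lo; // inclusive
    private final int hi; // inclusive, as in SortTask

    ParallelMergeSort(int[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            Arrays.sort(array, lo, hi + 1); // toIndex is exclusive
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks both halves and returns when both are done
        // (the role played by forkJoin in the jsr166y version).
        invokeAll(new ParallelMergeSort(array, lo, mid),
                  new ParallelMergeSort(array, mid + 1, hi));
        merge(mid);
    }

    // Merges the sorted runs array[lo..mid] and array[mid+1..hi].
    private void merge(int mid) {
        int[] buf = new int[hi - lo + 1];
        int i = lo, j = mid + 1, k = 0;
        while (i <= mid && j <= hi) {
            buf[k++] = array[i] <= array[j] ? array[i++] : array[j++];
        }
        while (i <= mid) buf[k++] = array[i++];
        while (j <= hi) buf[k++] = array[j++];
        System.arraycopy(buf, 0, array, lo, buf.length);
    }

    // Sorts the whole array on the given pool; invoke() blocks until done.
    static void sort(int[] array, ForkJoinPool pool) {
        if (array.length > 1) {
            pool.invoke(new ParallelMergeSort(array, 0, array.length - 1));
        }
    }

    public static void main(String[] args) {
        int[] original = new int[10000000];
        Random seed = new Random(47);
        for (int i = 0; i < original.length; i++) {
            original[i] = seed.nextInt();
        }
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        // Several rounds, so later ones run after JIT warm-up; both sorts get identical input.
        for (int round = 0; round < 3; round++) {
            int[] a = original.clone();
            int[] b = original.clone();
            long t0 = System.nanoTime();
            Arrays.sort(a);
            long t1 = System.nanoTime();
            sort(b, pool);
            long t2 = System.nanoTime();
            System.out.println("round " + round + ": sequential " + (t1 - t0)
                    + " ns, fork-join " + (t2 - t1) + " ns, equal: " + Arrays.equals(a, b));
        }
        pool.shutdown();
    }
}
```

Like SortTask, this version sorts the leaves with Arrays.sort and merges sequentially on the way back up. The final merge over the whole array therefore runs on a single thread no matter how many cores are available, which limits the achievable speedup.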