Java并发基础实践--分而治之
本系列的第三篇文章将以实现一个极简单的查找最大数的任务为例,分别给出了四个版本:1.顺序执行;2.基于传统的Thread.join();3.基于并发工具包的Future;4.基于JDK 7引入的Fork/Join框架。(2013.10.25最后更新) 分而治之(Divide-and-Conquer)是解决复杂问题的常用方法。在并发应用中,可将一个复杂算法分解为若干子问题,然后使用独立的线程去执行解决子问题的程序,之后再将各个子问题的结果逐级进行合并以得到最终结果。在特定环境中,这种方式可能较好地提高程序的执行效率。方案1:顺序执行 找出对给定整形数组中的最大值,使用的方法很简单,就是逐一遍历每个元素,将当前元素与当前最大值进行比较,若当前元素的值更大,则将该值作为新的当前最大值,再去与下一个元素进行比较,如此反复。在编写并发程序来实现这个算法之前,本文将先给出一个顺序执行的实现版本,但依然利用了分而治之的思想。即,先将给定数列分割成若干较小的子数列,找出各个子数列的最大值,将这些最大值组成一个新的数列,然后再使用同样的方法对这个新数列进行分割与最大值合并,...依次类推,直至找到最大值。 代码清单1中的MaxNumberFinder是本文的基础类:1.getMaxNumber()方法展示了如何查找一个数列中的最大值;2.使用工具方法getNumberArray()/createNumberArray()可以创建指定长度的随机整数数列;3.方法findMaxNumber()展示了将一个数列分割为子数列(子数列的最大长度不超过指定值THRESHOLD),并查找子数列的最大值,以及将子数列的最大值组成各级新的中间数列去查找其最大值。清单1
public class MaxNumberFinder {
public static final int THRESHOLD = 50;
private static final Random random = new Random();
public static int[] getNumberArray() {
return createNumberArray(3000);
}
private static int[] createNumberArray(int capacity) {
int[] numbers = new int[capacity];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = random.nextInt(capacity);
}
return numbers;
}
public static int getMaxNumber(int[] numbers) {
if (numbers.length == 0) {
return Integer.MIN_VALUE;
}
int max = numbers[0];
for (int i = 1; i < numbers.length; i++) {
if (numbers[i] > max) {
max = numbers[i];
}
}
return max;
}
private static int findMaxNumber(int[] numbers) {
// interim max number array
int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
maxNumbers[i / THRESHOLD] = getMaxNumber(subNumbers);
}
if (numbers.length % THRESHOLD != 0) {
int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
maxNumbers[maxNumbers.length - 1] = getMaxNumber(lastSubNumbers);
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return getMaxNumber(maxNumbers);
}
}
}
方案2:基于Thread.join() 基于方法MaxNumberFinder.findMaxNumber()的实现,在每次分割之后得到的数列,以及合并得到的中间最大值的数列,都可以使用独立的线程去分别查找它们的最大值,如代码清单2所示。清单2
public class MaxNumberFinderOnThread {
private static int findMaxNumber(int[] numbers) {
// interim max number array
final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
// the threads for searching max value in sub number array
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
final int bufIndex = i / THRESHOLD;
Thread bufThread = new Thread(new Runnable() {
@Override
public void run() {
maxNumbers[bufIndex] = MaxNumberFinder.getMaxNumber(subNumbers);
}
});
bufThread.start();
threads.add(bufThread);
}
if (numbers.length % THRESHOLD != 0) {
final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
final int lastIndex = (numbers.length - 1) / THRESHOLD;
Thread lastThread = new Thread(new Runnable() {
@Override
public void run() {
maxNumbers[lastIndex] = MaxNumberFinder.getMaxNumber(lastSubNumbers);
}
});
threads.add(lastThread);
lastThread.start();
}
// waiting for all of jobs are finished
for (int i = 0, size = threads.size(); i < size; i++) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return MaxNumberFinder.getMaxNumber(maxNumbers);
}
}
}
为了能够将同一级线程所找的最大值合并成一个新的中间最大值数列,必须要等待这一组线程全部执行完毕。而通过分别调用每个线程实例中join()方法即可满足这一要求。 必须注意的是,若数列的很长,而每次最多处理的数列较短(即,THRESHOLD值较小),该方案将会产生较多的线程,消耗大量内存。另外,还会有较多的高层线程在等待低层线程的执行结果,这可能会大大影响整个任务的执行效率。方案3:基于Future 方案2使用的是旧有API,根据本系列上一篇中所提及的并发工具包中的Future,同样可以实现这一功能。只需要将使用Thread/Runnable的地方,相应地替换成使用Future/Callable即可,如代码清单3所示。清单3
public class MaxNumberFinderOnFuture {
private static ExecutorService executor = Executors.newCachedThreadPool();
private static int findMaxNumber(int[] numbers) {
final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
Future<Integer> bufFuture = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return MaxNumberFinder.getMaxNumber(subNumbers);
}
});
futures.add(bufFuture);
}
if (numbers.length % THRESHOLD != 0) {
final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
Future<Integer> lastFuture = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return MaxNumberFinder.getMaxNumber(lastSubNumbers);
}
});
futures.add(lastFuture);
}
// retrieve results from Futures one by one,
// get() method will be blocked if the searching isn't finished
for (int i = 0, size = futures.size(); i < size; i++) {
try {
maxNumbers[i] = futures.get(i).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return MaxNumberFinder.getMaxNumber(maxNumbers);
}
}
}
这一版的findMaxNumber()实现中,没有显示地调用Thread.join()方法去等待线程的执行结果。但在任务执行完成之前,调用Future.get()时会被阻塞,在此处的效果就与Thread.join()相同。方案4:基于Fork/Join 方案2与方案3不一定能够提高执行效率。如果该应用程序运行在单核处理器上,或者它没有利用上多核处理器中的多个内核,那么它的执行时间很有可能要长于方案1所使用的顺序执行方案,因为线程的创建、调度、上下文切换都会产生额外的开销。 为了更好地适应已经十分普遍的多核处理器场景,JDK 7引入了Fork/Join框架。如代码清单4所示,基于该框架提供的RecursiveTask,我们就可以直接地对任务进行分割与合并,程序本身也更为清晰简洁。清单4
public class NumberFinderOnForkJoin extends RecursiveTask<Integer> {
private static final long serialVersionUID = -5871813408961649666L;
private int[] numbers = null;
private int maxNumber = Integer.MIN_VALUE;
public NumberFinderOnForkJoin(int[] numbers) {
this.numbers = numbers;
}
@Override
public Integer compute() {
if (numbers.length <= THRESHOLD) {
maxNumber = NumberFinder.getMaxNumber(numbers);
} else {
int[] leftNumbers = new int[numbers.length / 2];
System.arraycopy(numbers, 0, leftNumbers, 0, leftNumbers.length);
int[] rightNumbers = new int[numbers.length - numbers.length / 2];
System.arraycopy(numbers, leftNumbers.length, rightNumbers, 0, rightNumbers.length);
// divide the task into two sub-tasks.
NumberFinderOnForkJoin leftTask = new NumberFinderOnForkJoin(leftNumbers);
NumberFinderOnForkJoin rightTask = new NumberFinderOnForkJoin(rightNumbers);
invokeAll(leftTask, rightTask);
maxNumber = Math.max(leftTask.maxNumber, rightTask.maxNumber);
}
return maxNumber;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
NumberFinderOnForkJoin task = new NumberFinderOnForkJoin(NumberFinder.getNumberArray());
ForkJoinPool pool = new ForkJoinPool();
pool.submit(task);
System.out.println(task.get());
pool.shutdown();
}
}
稍稍说明一下,方案4对数列进行分割的方法与前三个方案都不同。方案4是使用的二分法,而前三个方案都是按从前往后的顺序依次取出不长于THRESHOLD的子数列。但这两种方法没有本质上的区别,最多只是分割的次数略有不同罢了。例如,一个长度为9的数列,将THRESHOLD的值设为3。若用二分法,第一轮会分割出4个子数列,其长度分别为3,2,3和1,后面还需要进行一轮分割;但若用顺序法,分割将得到的3个子数列,其长度均为3,之后不需要再进行分割了。小结 分解任务,各个击破,是应对复杂问题的惯用伎俩。在资源充足的情况下,应该尽可能地利用空闲的计算资源。Java并发工具包提供了适应多核环境的运行框架,使应用程序能更高效地利用多核处理器。 但对执行方案的选定,包括THRESHOLD的值,依然要基于性能测试。对于本文的例子,在我的测试环境中,方案1其实是最高的。在并发执行方案中,方案2会明显慢于方案3和方案4,而方案3与方案4之间则难分伯仲。