[本文是我对Java Concurrency In Practice C08的归纳和总结. 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正. ]
task和线程池执行机制之间隐式的耦合
前面曾提到过, 线程池的应用解耦了task的提交和执行. 事实上, 这有所夸大, 因为不是所有的task都适用于所有的执行机制, 某些task要求在特定的线程池中执行:
1. 非独立task, 指的是依赖于其他task的任务.
2. 要求在单线程中运行的task. 某些task不是线程安全的, 无法并发运行. Executors.newSingleThreadExecutor()方法返回的线程池只包含单个线程, 提交给该线程池的task将缓存在一个无界队列中, 线程池中所包含的单个线程将依次从队列中取出task运行.
3. 响应时间敏感的task. 某些task要求必须在极短的时间内开始执行, 比如GUI应用中处理用户点击操作的task. 假如提交给某一线程池的task既包含long-running task, 也包含响应时间敏感的task, 那么响应时间敏感的task可能无法在极短的时间内得到执行.
4. 使用了ThreadLocal类的task. 线程池的标准实现可能会在空闲时销毁多余的线程, 繁忙时创建更多的线程, 更有可能重用线程. 所以使用了ThreadLocal的task不应该提交给线程池运行, 除非ThreadLocal的使用只限定在单个task内, 不用于多个task之间通信.
线程饥饿死锁
如果提交给线程池运行的task之间不是相互独立的, 就有可能出现线程饥饿死锁. 比如提交给SingleThreadExecutor执行的2个task, task A在执行过程中需要等待task B的执行结果才能继续, 而此时没有多余的线程用于执行task B, 如此就发生了线程饥饿死锁.
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Runnable taskB = new Runnable() {
@Override
public void run() {
//...
}
};
Runnable taskA = new Runnable() {
@Override
public void run() {
Future<?> future = executor.submit(taskB);
try {
System.out.println("waiting for taskB complete");
// get方法将阻塞, 直到taskB执行完成
// 但是由于线程池中只有一个线程, 而该线程已经被taskA占用, 所以taskB将没有机会执行.
// 此时就发生了线程饥饿死锁
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
//...
}
};
executor.submit(taskA);
}
}
不仅SingleThreadExecutor执行相互依赖的task时会发生死锁, 其他线程池执行相互依赖的task时也可能发生死锁:
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newFixedThreadPool(3);
// 设定await在Barrier对象上的线程数达到4个时, 其await方法才释放
final CyclicBarrier barrier = new CyclicBarrier(4);
// 重复提交4个task, 每个task都await在barrier对象上
// barrier的await方法将一直阻塞, 直到4个线程都到达await点.
// 但是线程池中只有3个线程, 不可能出现4个线程都达到await点的情形, 所以依然会发生死锁
for (int i = 0; i < 4; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("waiting for other tasks arriving at common point");
barrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
}
避免相互依赖的task提交给同一线程池执行时发生死锁的唯一方法是: 线程池中的线程足够多.
确定线程池的size
如果线程池的size过大, 将造成内存等资源的浪费, 甚至使得资源耗尽. 如果线程池的size过小, 将造成CPU的利用率不高. 确定合适的size需要考虑:CPU数, 内存, 是计算密集型task还是I/O密集型task, 是否需要获取稀缺资源(比如数据库连接)等.
对于计算密集型task, 合适的size大约为CPU数量+1. 对于I/O占较大比例的task, 合适的size可以通过以下公式确定: size = CPU数量 * CPU利用率 * (1 + I/O时间比例). Runtime.getRuntime().availableProcessors()返回CPU的个数.
当然, 实际开发中size还受到内存, 文件句柄, socket, 数据库连接数等稀缺资源的约束. 将总的稀缺资源除以每一个task使用的资源数, 能得到线程数的上限.
循环并行化
如果循环体所进行的操作是相互独立的, 这样的循环可以并发的运行:
// 循环操作
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
// 将相互独立的循环操作转变为并发操作
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements) {
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
如果希望同时提交一系列task, 并且等待它们执行完毕, 可以调用ExecutorService.invokeAll方法.
如果希望task执行完毕之后就获取其执行结果, 可以使用CompletionService.