[本文是我对Java Concurrency In Practice 7.1的归纳和总结. 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正. ]
启动线程之后, 大多数时候我们等待线程运行完成后自动结束. 但是有时我们希望可以提前终止线程的运行:
1. 用户申请取消时. 比如用户点击了取消按钮.
2. 时间限制的任务. 有些任务具有时间限制, 如果在一定的时间内仍然没有得到想要的结果, 我们可能希望终止该任务的运行.
3. 发生特定的事件时. 比如多个线程同时在不同的位置搜索某一文件, 当其中一个线程搜索到了想要的文件, 应该终止其他仍在运行的线程.
4. 发生错误时. 比如发生了磁盘已满的错误, 需要向磁盘写入数据的线程应该提前终止.
5. 应用或者服务被关闭时.
java没有直接规定如何安全的提前终止线程的运行, 相反, 提供了不具约束力的协商式机制: 线程A可以请求线程B中断, 但是是否响应, 何时响应, 如何响应中断请求, 由线程B自己决定. 每个线程对象都有一个boolean型的中断标记, 其他线程请求目标线程中断时, 会将目标线程的中断标记设置为true, 然后由目标线程自己决定如何处理. 所以中断线程时, 我们需要知道目标线程的中断机制. 如果我们不知道目标线程会怎样处理中断请求, 不要贸然请求其中断. Thread类中与中断标记相关的方法有:
public class Thread {
// 请求线程中断, 该方法会将线程的中断标记设置为true. 如何处理中断由目标线程决定
public void interrupt() { ... }
// 返回中断标记的值
public boolean isInterrupted() { ... }
// 这个方法的命名很让人蛋疼. 该静态方法用于重置中断标记(将其设置为false), 并返回重置之前的值
public static boolean interrupted() { ... }
...
}
设置自定义flag结束线程
在深入了解java的中断机制之前, 我们先看一个通过设置自定义的flag结束线程的例子:
public class PrimeGenerator implements Runnable {
private final List<BigInteger> primes = new ArrayList<BigInteger>();
/**
* 自定义的flag, 为保证线程可见性, 将其声明为volatile
*/
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
// 每次循环之前检查cancelled标记的值, 如果cancelled为true, 循环终止, 线程也就运行结束了
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
public static void main(String[] args) {
PrimeGenerator generator = new PrimeGenerator();
Thread t = new Thread(generator);
t.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 通过调用cancel方法, 将自定义的cancelled标记设置为true, 从而使得线程t运行终止
generator.cancel();
System.out.println(generator.get().size());
}
}
自定义flag结束线程存在的问题
PrimeGenerator自定义了cancelled标记, 在继续下一次循环之前, 轮询该标记的值. 当cancelled标记为true时, 循环不再继续.
这种方式在PrimeGenerator中可以起到期望的作用, 但使用这种方式结束线程存在潜在的问题: 假如循环中执行了阻塞操作, 那么即使cancelled标记被设置为true, run方法却没有机会去检查cancelled标记的值, 所以线程将迟迟无法结束:
class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
// 当队列已满时, put方法将会阻塞. 一旦put方法阻塞, 且没有其他线程从队列中取数据时, 阻塞将一直持续下去
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException consumed) {
//...
}
}
public void cancel() {
cancelled = true;
}
public static void main(String[] args) {
// 设置队列的最大容量为10
BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<BigInteger>(10);
BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
producer.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.cancel();
}
}
在主线程中启动BrokenPrimeProducer线程, 1s后调用其cancel方法. 队列的最大容量被设定为10, 1s后队列肯定已满, 也就是说BrokenPrimeProducer线程将在put方法上阻塞, 没有机会去检查cancelled标记, 从而导致BrokenPrimeProducer线程无法结束.
可中断的阻塞方法
java API中的大多数阻塞方法都是可中断的, 如Thread.sleep, Object.wait, BlockingQueue.put等. 可中断的阻塞方法有一个共同的特点: 声明抛出InterruptedException异常. 可中断的阻塞方法在阻塞期间会周期性检查当前线程的中断标记, 如果发现当前线程的中断标记为true, 就重置中断标记后提前从阻塞状态返回, 并抛出InterruptedException异常.
据此我们可以改进BrokenPrimeProducer类:
class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
// 每次循环前检查当前线程的中断标记, 如果中断标记为设定为true, 则循环结束
// 就算当前线程阻塞在put方法上, 在阻塞期间也会周期性检查中断标记, 一旦发现中断标记为true, 就会从阻塞状态中返回, 并抛出InterruptedException异常
while (!Thread.currentThread().isInterrupted()) {
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException consumed) {
System.out.println("InterruptedException happened");
}
}
public void cancel() {
// interrupt方法会将当前线程的中断标记设置为true
interrupt();
}
public static void main(String[] args) {
// 设置队列的最大容量为10
BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<BigInteger>(10);
PrimeProducer producer = new PrimeProducer(primes);
producer.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.cancel();
}
}
通过Future终止线程运行
有时我们将task提交给线程池运行, 由于我们不知道task会由线程池中的哪一个线程运行, 也不知道线程池中的线程会怎样处理中断, 所以无法直接调用Thread对象的interrupt方法提前终止线程的运行. 但是ExecutorService类的submit, invokeAll等方法会返回表示task未决结果的Future对象, 调用Future对象的cancel方法, 可以取消task的运行. Future类中与取消有关的方法有:
1. boolean cancel(boolean mayInterruptIfRunning). 该方法尝试取消task的执行. 如果task已经完成, 或已取消, 或由于某些原因无法取消, 则尝试失败, 返回false.
如果task尚未启动, 则成功调用其Future对象的cancel方法将导致其永不启动.
mayInterruptIfRunning如果为true, 且此时task正在某个线程中运行, 那么该线程的中断标记将被设置为true.
当mayInterruptIfRunning为false时, 如果task没有启动则不再启动, 如果task已经启动, 则尝试失败.
如果task没有处理中断, mayInterruptIfRunning应该为false.
此方法返回后, isDone方法将始终返回true. 如果此方法返回true, 对isCancelled方法的后续调用将始终返回true.
2. boolean isDone(). 如果task已经完成, 该方法返回true. 完成的情况包括正常完成, task被取消, 异常终止等.
3. boolean isCancelled(). 如果task正常完成前被取消, 该方法返回true.
前面提到, 如果不知道线程会怎样处理中断, 就不应该调用该线程的interrupt方法, 那么调用Future的cancel方法, 并将mayInterruptIfRunning参数设置为true是否合适? 线程池中用于执行task的线程会将中断的处理委托给task, 所以这样做是合适的(前提是task正确处理了中断).
使用Future取消task的例子:
/**
* 执行一项任务, 如果指定时间内没有正常完成, 就取消该任务
*/
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
// 如果线程池中的线程执行任务过程中该线程发生了中断, 那么调用task的get方法将会抛出InterruptedException异常.
// 对于InterruptedException, 按照之前总结的方法处理即可. 此例将其抛给上层.
task.get(timeout, unit);
} catch (TimeoutException e) {
// 如果发生TimeoutException异常, 表明执行时间超时, 此时取消该任务即可
} catch (ExecutionException e) {
// 发生其他异常时, 不仅要取消任务的执行, 也应该重抛该异常
throw launderThrowable(e.getCause());
} finally {
task.cancel(true);
}
}
线程的中断方式总结:
1. 可以通过设置自定义标记结束线程. 但是这样方式在包含阻塞方法的任务中不适用.
2. interrupt线程. 前提是知道目标线程会怎样处理interrupt请求.
3. 如果是提交给线程池运行的任务, 可以调用Future.cancel.