Java Tutorials -- Concurrency
近一段时间在使用Thinking in Java(4th, English)和Java Concurrency in Practice学习Java并发编程。不得不说官方的Java Tutorias是很好的Java并发编程入门级教程,故将它其中的Concurrency一章翻译在了此处。与我翻译Java Tutorias中Generics一章时的目的相同,只是对自己近一段时间学习的回顾罢了,也希望对其它朋友能有所助益。(2007.11.29最后更新)
课程: 并发
计算机用户们将他们的系统能够在同一时刻做一件以上的事情视为一种当然。他们猜想着,他们在使用字处理器的同时其它的应用程序正在下载文件,管理打印队列和传输音频流。例如,音频流应用程序必须在同一时刻从网络上读取数字音频数据,解压它们,管理重放功能,并更新它们的显示方式。
Java平台彻底地被设计为支持并发的编程,它有着在Java程序设计语言和Java字节类库中的基本并发支持。从5.0开始,Java平台也将包含着高级的并发API。该课程介绍了该平台的基本并发编程支持,并概述了java.util.concurrent包中的一些高级API。
进程与线程
在并发编程中,有两种基本的执行单元:进程和线程。在Java程序设计语言中,并发编程几乎只关心线程。当然,进程也是重要的。
计算机系统一般都有很多活跃的进程与线程。甚至在只有一个执行内核的系统中也是如此。在一个给定的时刻,确实只能有一个线程是在执行。通过一种称之为"分片"的操作系统特性,进程与线程共享着单核的处理时间。
计算机系统拥有多个处理器或有多个执行内核的单处理器正在变得越来越普遍。这将很大地提升系统在处理进程与线程的并发执行时的能力。
进程
一个进程拥有一个自我包括的执行环境。一个进程一般拥有一个完整的,内置的基本运行时资源;特别地,每个进程都拥有它自己的内存空间。
进程经常被视为程序或应用的同义词。然而,用户所看到的单个应用可能实际上是一组相互协作的进程。为了便于进程之间的通信,大多数操作系统支持内部进程通信(Inter Process Communication, IPC),例如管道与套接字。IPC不仅被用于同一个系统中进程之间的通信,还可以处理不同系统中进程之间的通信。
Java虚拟机的大多数实现都是作为单个进程的。一个Java应用程序可以使用ProcessBuilder对象来创建额外的进程。多进程应用程序已经超出了本课程的范围。
线程
线程有时候被称之为轻量级进程。进程和线程都能提供一个执行环境,但创建一个线程所需要的资源少于创建一个进程。
线程存在于一个进程中--每个进程至少有一个线程。线程分享进程的资源,包括内存和被打开的文件。这样做是为了高效,但在通信方面有着潜在的问题。
多线程执行是Java平台的一个本质特性。每个应用程序至少拥有一个线程--或者说是多个,如果你把像内存管理和信号处理这样的系统进程算进来的话。但从应用程序员的角度来看,你仅以一个线程开始,该线程被称之为主线程。该线程拥有创建其它线程的能力,我们将在下一节中使用例子证明这一点。
线程对象
每个线程都关联着一个Thread类的实例。为了创建并发应用,有两种使用Thread对象的基本策略。
* 为了直接的控制线程的创建与管理,当应用每次需要启动一个同步任务时就实例化一个Thread类。
* 从你的应用中将线程管理抽象出来,将应用的任务传递给一个执行器(Executor)。
本节将描述Thread对象的使用,执行器将与其它的高级并发对象们一起讨论。
定义与启动一个线程
一个应用要创建一个Thread的实例就必须提供那些要在该线程中运行的代码。有两种方法去做这些:
* 提供一个Runnable对象。Runnable接口只定义了一个方法--run,它要包含那些将在该线程中执行的代码。Runnable对象将被传入Thread的构造器,如例子HelloRunnable如示:
public class HelloRunnable implements Runnable {
public void run() {
System.out.println("Hello from a thread!");
}
public static void main(String args[]) {
(new Thread(new HelloRunnable())).start();
}
}
* 继承Thread类。Thread类本身也实现了Runnable接口,可是它的run方法什么都没有做。一个应用可以继承Thread类,并提供它自己的run方法实现,就如HelloThread类所做的:
public class HelloThread extends Thread {
public void run() {
System.out.println("Hello from a thread!");
}
public static void main(String args[]) {
(new HelloThread()).start();
}
}
注意,为了启动新的线程,这两个例子都调用了Thread.start方法。
你应该使用哪一种方式呢?第一种方式使用了Runnable对象,这更加通用,因为Runnable对象还可以继承Thread之外的其它类。第二种方式可以很方便的使用在简单应用中,但这样的话,你的任务类必须被限制为Thread的一个子类。本课程关注于第一种方式,该方法将Runnable任务从执行该任务的Thread对象中分离出来。这种方法不仅更富弹性,而且它还适用于后面将要提到的高级线程管理API中。
Thread类为线程的管理定义了一组很有有的方法。它们包括一些能够提供关于该线程信息的静态方法,以及当该线程调用这些方法时能够影响到该线程的状态。另一些方法则是被管理该线程和Thread对象的其它的线程调用。
使用sleep方法暂停执行
Thread.sleep方法使当前运行的线程暂停执行一段指定的时间周期。这是使相同应用中的其它线程,或是相同计算机系统中的其它应用能够获得处理器时间的有效方法。sleep方法也被用于"缓步",如接下来的例子所示,并且等待其它负有任务的线程,这些线程被认为有时间需求。
sleep方法有两个相互重载的版本:一个指定了睡眠时间的毫秒数;另一个指定了睡眠的纳秒数。然而,这些睡眠时间都无法得到精确的保证,因为受底层操作系统所提供的机制的限制。而且,睡眠周期会由于中断而被停止,我们将在下面的章节中看到。在任何情况下,你都不能猜想调用sleep方法后都能精确地在指定时期周期内暂停线程。
示例SeelpMessages使用sleep方法在每4秒的间隔内打印信息:
public class SleepMessages {
public static void main(String args[]) throws InterruptedException {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
for (int i = 0; i < importantInfo.length; i++) {
//Pause for 4 seconds
Thread.sleep(4000);
//Print a message
System.out.println(importantInfo[i]);
}
}
}
注意main方法声明了它会抛出InterruptedException异常。当其它的线程中断了正处于睡眠的当前线程时,sleep方法就会抛出该异常。由于该应用并没有定义其它的线程去造成中断,所以它没必要去捕获InterruptedException。
中断
中断指明了一个线程应该停止它正在做的事情并做些其它的事情。它使程序员要仔细地考虑让一个线程如何回应中断,但十分普通的做法是让这个线程停终止。这是本课程特别强调的用法。
一个线程以调用Thread对象中的interrupt方法的方式将中断信号传递给另一个线程而使它中断。为了使中断机制能正确地工作,被中断的线程必须支持它自己的中断。
支持中断
一个线程如何支持它自己的中断呢?这取决于它当前正在干什么?如果该线程正在调用那些会抛出InterruptedException的方法,那么当它捕获了该异常之后就只会从run方法内返回。例如,假设SleepMessages示例中打印信息的循环就在该线程的Runnable对象的run方法中。再做如下修改,使它能够中断:
for (int i = 0; i < importantInfo.length; i++) {
//Pause for 4 seconds
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
//We've been interrupted: no more messages.
return;
}
//Print a message
System.out.println(importantInfo[i]);
}
许多会抛出InterruptedException异常的方法,如sleep,都被设计成当它们收到一个中断时就取消它们当前的操作并立即返回。
如果该线程长时间运行且没有调用会抛出InterruptedException异常的方法,那又会怎样呢?它必须周期性地调用Thread.interrupted方法,如果收到了一个中断,该方法将返回true。例如:
for (int i = 0; i < inputs.length; i++) {
heavyCrunch(inputs[i]);
if (Thread.interrupted()) {
//We've been interrupted: no more crunching.
return;
}
}
在这个简单的例子中,这些代码仅是简单地测试该线程是否收到了中断,如果收到了就退出。在更复杂的例子中,它可能为了更有意义些而抛出一个InterruptedException异常:
if (Thread.interrupted()) {
throw new InterruptedException();
}
这就能使处理中断的代码被集中在catch语句块中。
中断状态标志
被认为是中断状态的内部标志用于中断机制的实现。调用Thread.interrupt方法会设置这个标记。当一个线程调用静态方法Thread.interrupted去检查中断时,中断状态就被清理了。用于一个线程查询另一个线程中断状态的非静态方法Thread.isInterrupted不会改变中断状态标志。
按照惯例,任何通过抛出InterruptedException异常而退出的方法都会在它退出时清理中断状态。然而,总存在着这样的可能性,中断状态会由于其它线程调用interrupt方法而立即被再次设置。
Joins
join允许一个线程等待另一个线程完成。如果Thread对象t的线程当前正在执行,
t.join();
上述语句将导致当前线程暂停执行只到t的线程终止为止。join的一个重载版本允许程序员指定等待的周期。然而,与sleep方法一样,join的时长依赖于操作系统,所以你不应该设想join将准确地等待你所指定的时长。
像sleep方法一样,在由于InterruptedException异常而退出时,join方法也要应对中断。
SimpleThreads示例
下面的例子汇集了本节一些概念。SimpleThreads类由两个线程组成。第一个线程就是每个Java应用都有的主线程。主线程从一个Runnable对象,MessageLoop,创建一个新的线程,并等待它结束。如果MessageLoop线程花的时间太长了,主线程就会中断它。
MessageLoop线程打印出一系列的信息。如果在打印出所以信息之前就被中断了,MessageLoop线程将会打印一条信息并退出。
public class SimpleThreads {
//Display a message, preceded by the name of the current thread
static void threadMessage(String message) {
String threadName = Thread.currentThread().getName();
System.out.format("%s: %s%n", threadName, message);
}
private static class MessageLoop implements Runnable {
public void run() {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
try {
for (int i = 0; i < importantInfo.length; i++) {
//Pause for 4 seconds
Thread.sleep(4000);
//Print a message
threadMessage(importantInfo[i]);
}
} catch (InterruptedException e) {
threadMessage("I wasn't done!");
}
}
}
public static void main(String args[]) throws InterruptedException {
//Delay, in milliseconds before we interrupt MessageLoop
//thread (default one hour).
long patience = 1000 * 60 * 60;
//If command line argument present, gives patience in seconds.
if (args.length > 0) {
try {
patience = Long.parseLong(args[0]) * 1000;
} catch (NumberFormatException e) {
System.err.println("Argument must be an integer.");
System.exit(1);
}
}
threadMessage("Starting MessageLoop thread");
long startTime = System.currentTimeMillis();
Thread t = new Thread(new MessageLoop());
t.start();
threadMessage("Waiting for MessageLoop thread to finish");
//loop until MessageLoop thread exits
while (t.isAlive()) {
threadMessage("Still waiting...");
//Wait maximum of 1 second for MessageLoop thread to
//finish.
t.join(1000);
if (((System.currentTimeMillis() - startTime) > patience) &&
t.isAlive()) {
threadMessage("Tired of waiting!");
t.interrupt();
//Shouldn't be long now -- wait indefinitely
t.join();
}
}
threadMessage("Finally!");
}
}
同步
线程通信主要是通过访问共享的字段以及这些字段所涉及的对象引用。这种通信的形式十分的高效,但它可能造成两种错误:线程干涉和内存一致性错误。用于阻止这些错误的工具就是同步。
* 线程干预介绍了当多个线程访问共享数据时产生的错误。
* 内存一致性错误介绍了指由对共享内存不一致的查看而导致的错误。
* 同步方法介绍了一种能够有效地防止线程干预和内存一致性错误的常用方法。
* 隐含锁和同步介绍了一种更通用的同步方法,并介绍了同步是如何基于隐含锁的。
* 原子访问介绍这种通用的不会受其它线程干预的操作概念。
线程干预
考虑这个叫Counter的简单类
class Counter {
private int c = 0;
public void increment() {
c++;
}
public void decrement() {
c--;
}
public int value() {
return c;
}
}
Counter被设计成让每次调用increment方法后c就加1,而每次调用decrement方法后c就减1。然而,如果一个Counter对象被多个线程所引用,那么线程之前的干预可能不会使所期望的事情发生。
当在不同线程中的两个操作交叉地作用于同一数据时,干预就发生了。这就是说两个操作由多步组成,并且步调之间相互重叠。
看起来作用于Counter实例的操作不可能是交叉的,因为这两个关于c变量的操作都是一元的简单语句。可是,如此简单的语句也能够被虚拟机解释成多个步骤。我们不用检查虚拟机所做的特定步骤--我们足以知道一元表达式c++可被分解成如下三步:
1. 获取c的当前值。
2. 将这个被取出的值加1。
3. 将被加的值再放回c变量中。
表达式c--也能被进行相同地分解,除了将第二步的加替换为减。
猜想在线程B调用decrement方法时,线程A调用了increment方法。如果c的初始值为0,它们交叉的动作可能是如下顺序:
1. 线程A:取出c。
2. 线程B:取出c。
3. 线程A:将取出的值加1,结果为1。
4. 线程B:将取出的值减一,结果为-1。
5. 线程A:将结果存于c中,c现在为1。
6. 线程B:将结果存于c中,c现在为-1。
线程A的结果丢失了,被线程B覆盖了。这个特殊的交叉只是一种可能性。在不同的环境下,可能是线程B的结果丢失,也可能根本就没有发生任何错误。因为它们是不可能预知的,所以很难发现并修正线程干预缺陷。
内存一致性错误
当不同的线程观察到本应相同但实际上不同的数据时,内存一致性错误就发生了。导致内存一致性错误的原因十分复杂并且超出了本教程的范围。幸运地是,应用程序员并不需要了解这些原因的细节。所需要的就是一个避免它们的策略。
避免内存一致性错误的关键就是要理解"happens-before"的关系。这个关系保证了由一个特定的语句所写的内存对其它特定的语句都是可见。为了解它,可以考虑下面的例子。假设一个简单的int型字段的定义与初始化:
int counter = 0;
counter字段被两个线程,A和B,共享。假设线程A增加counter的值:
counter++;
很短的时间之后,线程B打印出counter的值:
System.out.println(counter);
如果这两条语句是在同一个线程中执行,那就是可以很肯定地猜测被打印出的会是"1"。但如果在不同的线程中执行这两条语句,被打印出的值可能正好是"0",因为没有什么能保证线程A对counter的改变能被线程B看到--除非应用程序员在这两条语句之间建立了"happens-before"关系。
有多种方式能够创建"happens-before"关系。其中之一就是同步,我们将在接下来的一节中看到它。
我们已经看到了两种建立"happens-before"关系的方法。
* 当一条语句调用Thread.start方法,一个新的线程执行的每条语句都有"happens-before"关系的语句与那些也有着"happens-before"关系。这些代码的作用就是使新线程的创建对于其它的新线程是可见的。
* 当一个线程终止并在另一个线程中调用Thread.join导致返回,然后所有的由已终止的线程执行的语句伴着随后成功join的所有语句都有"happens-before"关系。那么在该线程中的代码所产生的影响对于join进来的线程就是可见的。
要看创建"happens-before"关系的一列方法,可以参考java.util.concurrent包的摘要页面。
同步方法
Java设计程序需要提供两种基本的同步常用法:同步方法和同步语句。其中更为复杂的一种,同步语句,将在下一节讲述。本节是关于同步方法的。
使一个方法是可同步的,只要简单地将关键字synchronized加到它的声明中:
public class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}
如果count是SynchronizedCounter的一个实例,那么使这些方法同步将有两个作用:
* 第一,对同一个对象中的同步方法进行交叉的调用就不可能了。当一个线程正在调用一个对象中的一个同步方法时,所有其它的调用该对象的同步方法的线程将被阻塞,直到第一个线程结束对该对象的工作。
* 第二,当同步方法存在,它就会与在同一对象中后序调用的方法自动地建立"happens-before"关系。
注意,构造器不能是可同步--对一个构造器使用关键字synchronized是一个语法错误。同步构造器没有意义,因为只有一个线程要创建对象,当它正在被构造时才会访问构造器。
警告:当构造一个将会在线程之间共享的对象时,要非常小心对象的引用过早地"溢出"。例如,假设你要维护一个叫instances的List去包含class的每个实例。你可能会尝试着加入下面一行
instances.add(this);
到你的构造嚣。但之后其它的线程可以在这个对象构造完成之前就可以instances去访问该对象。
同步方法使一个简单的防止线程干预和内存一致错误的策略成为可能:如果一个对象对于一个以上的线程是可见的,所有针对该对象的变量的读与写都要通过同步方法。(有一个很重要的例外:在被构造之后就不能被修改的final字段,一旦它被创建,就能够被非同步方法安全地读取)。这种策略十分高效,但会出现活跃度问题,我们将在后面的教程中见到。
内部锁与同步
同步是围绕着一个被认为是内部锁或监视锁的内部实体而建立(API规范经常就称这个实体为"监视器")。内部锁在同步的两个方面扮演着角色:强制排他地访问一个对象的状态,为那些必须是可见的状态建立"happen-before"关系。
每个对象都有一个与之关联的内部锁。一般地,一个线程要排他并一致地访问一个对象的字段就必须在访问它之前就获得这个对象的内部锁,在这个线程使用完之后就释放这个内部锁。在获得锁与释放锁之间的这段时间内,这个线程被认为拥有这个内部锁。一但线程拥有了内部锁,其它的线程就不能再获得相同的锁了。当另一个线程试图获得这个锁时,它将会被阻塞。
当线程释放了一个内部锁,在这个动作与后续想获得同一个锁的动作之间的"happens-before"关系就建立起来了。
在同步方法中的锁
当线程调用了同步方法,它就自动地获得这个方法所在对象的内部锁,当这个方法返回时它就会释放这个锁。即使这个返回是由一个未捕获的异常造成的,锁也会被释放。
你可能会对调用一个静态的同步方法时所发生的事情感到惊讶,因为静态方法是与一个类,而不是一个对象,相关联的。在这种情况下,线程要求获得与这个类相关的Class对象的内部锁。因此访问类的静态字段是被一个与作用于类的实例的锁不同的锁控制的。
同步语句
创建同步代码的另一种方式是使用同步语句。与同步方法不同,同步语句必须要指定提供内部锁的对象:
public void addName(String name) {
synchronized(this) {
lastName = name;
nameCount++;
}
nameList.add(name);
}
在这个例子中,addName方法要对lastName和nameCount的修改进行同步,但也要避免同步地调用另一个对象中的方法(从同步代码中调用另一个对象的方法会产生的问题将在Liveness章节中讲述)。不用同步语句,就只能是一个隔离的非同步方法,其目的只是为了调用nameList.add方法。
同步语句对使用细致的同步去提高并发应用也是有用的。例如,假设类MsLunch有两个实例字段,c1和c2,从来都没有一起被使用过。这些字段的更新都必须是同步的,但没有道理在交叉地对c2进行更新时防止对c1的更新--这样做会创建不必要的阻塞而减少并发。我们创建两个对象单独地提供锁,而不是使用同步方法或反而使用与this关联的锁。
public class MsLunch {
private long c1 = 0;
private long c2 = 0;
private Object lock1 = new Object();
private Object lock2 = new Object();
public void inc1() {
synchronized(lock1) {
c1++;
}
}
public void inc2() {
synchronized(lock2) {
c2++;
}
}
}
使用这种方法必须极其的小心。你必须非常地肯定交叉地访问这些受影响的字段是安全的。
可重进入的同步
回忆一下,线程不能获得被其它线程占有的锁。但线程可以获得被它自己占有的锁。允许线程多次获得相同的锁就能够形成可重进入的同步。这就能解释这样一种情况,当同步代码直接或间接地调用了一个已经包含同步代码的方法,但两组代码都使用相同的锁。没有可重进入的同步,同步代码将不得不采取更多额外的预防措施去避免线程被自己阻塞。
原子访问
在编程中,一个原子操作就是所有有效的动作一次性发生。原子操作不能在中间停止:它要么完全发生,要么完全不发生。原子操作不会有任何可见的副作用,直到该行为完成。
我们已经看到了像c++这样的加法表达式不是一个原子操作。非常简单的表达甚至都可以被定义成能被分解为其它操作的复杂操作。但是,有些操作你可以认为它们是原子的:
* 读和写引用变量和大多数基本数据类型变量(除long和double之外的其它基本数据类型)
* 读和写被声明为volatile的变量(包括long和double型的变量)都是原子的。
原子操作不能被交叉地执行,因此使用它们可以不必担心线程干预。然而,这并不能完全清除对原子操作进行同步的需要,因为内存一致性错误的可能性仍然存在。使用volatile变量可以降低内存一致性错误的风险,因为任何针对volatile变量的写操作都与后续的针对该变量的读操作之间建立了"happen-before"关系。这就意味着对一个voloatile变量的改变对于其它线程都是可见的。进一步说,这也意味着当一个线程读一个volatile变量时,它不仅能看到该volatile变量的最新变化,也能看到导致该变化的代码的副作用。
使简洁的原子变量访问比通过同步代码访问这些变量更加高效,但也要求应用程序员更加小心以避免内存一致性错误。额外的努力是否值得,取决于应用的规模与复杂度。
java.util.concurrent包中的一些类提供了一些不依赖于同步的原子方法。我们将在High Level Concurrency Objects一节中讨论它们。
死锁
死锁描述了一种两个或以上的线程永久地相互等待而被阻塞的情形。这儿就有一个例子。
Alphonse和Gaston是朋友,并且都很崇尚礼节。礼节的一条严格规则就是,当你向朋友鞠躬时,你必须保持鞠躬的姿势直到你的朋友能有机会向你还以鞠躬。不幸地是,这条规则没有说明这样一种可能性,即两个朋友可能在同时间相互鞠躬。
public class Deadlock {
static class Friend {
private final String name;
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public synchronized void bow(Friend bower) {
System.out.format("%s: %s has bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
}
public synchronized void bowBack(Friend bower) {
System.out.format("%s: %s has bowed back to me!%n",
this.name, bower.getName());
}
}
public static void main(String[] args) {
final Friend alphonse = new Friend("Alphonse");
final Friend gaston = new Friend("Gaston");
new Thread(new Runnable() {
public void run() { alphonse.bow(gaston); }
}).start();
new Thread(new Runnable() {
public void run() { gaston.bow(alphonse); }
}).start();
}
}
当Deadlock运行后,极有可能当两个线程试图调用bowBack时它们被阻塞了。没有一种阻塞会结束,因为每个线程都在另一方退出bow方法。
饥饿与活性锁
饥饿与活性锁是没有死锁那么普遍的问题,也仍然是每个并发软件的设计者都可能遇到的问题。
饥饿
饥饿所描述的情形是指当一个线程不能正常地访问到共享资源,也就不能得到进步。当共享资源被"贪婪"的线程长时间占有时,这种情况就会发生。例如,假设一个对象提供了一个经常会耗费很长时间才会返回的同步方法。如果一个线程频繁地调用这个方法,其它也需要频繁地同步访问相同对象的线程就会经常被阻塞。
活性锁
一个线程的行为经常是对另一个线程的行为的响应。如果另一个线程的行为也是对另一个线程的行为的响应,这时活性锁可能就产生了。与死锁比较,活性锁线程是不能得到更进一步的进步。但是这些线程并没有被阻塞--它们只是过分疲于应付彼此而不能恢复工作。就能比喻成在走廊中的两个人都试图通过对方:Alphonse向他的左边移动以让Gaston通过,此时Gaston则向他的右边移动以让Alphonse通过。看到他们仍然彼此阻塞着,Alphone就向他的右边移动,此时Gaston向他的左边移动。这样,他们仍然各自阻塞着对方...
受保护的块
线程经常不得不调整它们的行为。最常用的调整方式就是受保护的块。在执行之前,这种块开始时会轮询查检某个条件必须为成立。为了正确地做到这一点有许多步骤需要遵守。
例如,假设guardedJoy方法将不会执行,直到共享变量joy被别的线程设置过。理论上,这样的一个方法可以不停的循环直到条件满足为止。但这个循环不经济,因为在等待的时候它仍然在持续不停的运行。
public void guardedJoy() {
//Simple loop guard. Wastes processor time. Don't do this!
while(!joy) {}
System.out.println("Joy has been achieved!");
}
一种更高效的保护方式就是调用Object.wait方法去暂停当前的线程。调用wait方法不会返回直到另一个线程发出通知,说某个特定事件已经发生过了--尽管这个线程所等待的事件并不是必要的:
public synchronized guardedJoy() {
//This guard only loops once for each special event, which may not
//be the event we're waiting for.
while(!joy) {
try {
wait();
} catch (InterruptedException e) {}
}
System.out.println("Joy and efficiency have been achieved!");
}
注意:总是在循环内部调用wait方法去测试所等待的条件是否成立。不要猜想你所等待着的特殊条件中断了,或者这个条件仍然成立。
就像许多延缓执行的方法一样,wait也会抛出InterruptedException异常。在这个例子中,我们可以忽略这个异常--我们仅关注joy的值。
为什么guardedJoy的这个版本是可同步的?假设我们用调用wait方法的对象是d,当一个线程调用了wait方法,它必须拥有对象d的内部锁--否则一个错误就会发生。在一个同步方法内部调用wait方法是一种获得内部锁的简便途径。
当wait方法被调用了,该线程就释放锁并挂起执行。在以后的某个时间,另一个线程将会获得相同的锁并调用Object.notifyALL方法,通知所有正在等待这个锁的线程某个重要的事情已经发生过了:
public synchronized notifyJoy() {
joy = true;
notifyAll();
}
在第二个线程已经释放锁之后的某个时间,第一个线程重新获得锁并从wait方法的调用中返回以恢复执行。
注意:还有另一个通知方法,notify,该方法只唤醒一个线程。因为notify方法不允许你指定被唤醒的线程,所以它只用于大并发应用程序中--即,这个程序拥有大量的线程,这些线程又都做类似的事情。在这样的应用中,你并不关心是哪个线程被唤醒了。
让我们使用受保护的块去创建生产者-消费者应用。该种应用是在两个线程中共享数据:生产者创建数据,而消费者使用数据。这两个线程使用一个共享对象进行通信。协调是必须的:消费者线程在生产者线程交付这个数据之前不能试图去获取它,在消费者还没有获取老的数据之前生产者不能试图交付新的数据。
在这个例子中,数据是一系列的文本信息,它们通过类型为Drop的对象进行共享:
public class Drop {
//Message sent from producer to consumer.
private String message;
//True if consumer should wait for producer to send message, false
//if producer should wait for consumer to retrieve message.
private boolean empty = true;
public synchronized String take() {
//Wait until message is available.
while (empty) {
try {
wait();
} catch (InterruptedException e) {}
}
//Toggle status.
empty = true;
//Notify producer that status has changed.
notifyAll();
return message;
}
public synchronized void put(String message) {
//Wait until message has been retrieved.
while (!empty) {
try {
wait();
} catch (InterruptedException e) {}
}
//Toggle status.
empty = false;
//Store message.
this.message = message;
//Notify consumer that status has changed.
notifyAll();
}
}
生产者进程,由Producer类定义,传递一系列类似的信息。字符串"DONE"表示所有的信息都已经发出了。为了模拟真实应用的不可能预知性,生产者线程在两次发送信息之间会暂停一个随机的时间间隔。
import java.util.Random;
public class Producer implements Runnable {
private Drop drop;
public Producer(Drop drop) {
this.drop = drop;
}
public void run() {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
Random random = new Random();
for (int i = 0; i < importantInfo.length; i++) {
drop.put(importantInfo[i]);
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {}
}
drop.put("DONE");
}
}
消费者线程,由Consumer类定义,就获得信息并把它们打印出来,直到获得"DONE"对象为止。该线程也会在随机的时间间隔内暂停执行。
import java.util.Random;
public class Consumer implements Runnable {
private Drop drop;
public Consumer(Drop drop) {
this.drop = drop;
}
public void run() {
Random random = new Random();
for (String message = drop.take(); ! message.equals("DONE");
message = drop.take()) {
System.out.format("MESSAGE RECEIVED: %s%n", message);
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {}
}
}
}
最后就是main线程了,定义在了ProducerConsumerExample类中,该类将启动生产者和消费者线程。
public class ProducerConsumerExample {
public static void main(String[] args) {
Drop drop = new Drop();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
注意:Drop类是为了证明受保护的块而写的。为了避免重新发明轮子,在尝试测试你自己的数据共享对象之前可以先使用Java集合框架中的数据结构。
不可变对象
如果一个对象的状态在它被创建之后就不能修改了,这样的对象就被认为是不可变的。最大程度地依赖不可变对象是一个被广泛接受的用来创建简洁而可靠代码的良好策略。
不可变对象在并发应用中特别有用。由于它们的状态不能改变,它们就不会有线程干预的困扰,也不会被观察到不一致的状态。
应用程序员经常不使用不可变对象,因为他们担心创建一个新的对象而不是更新已有对象的状态所付出的代价。创建对象的代价经常被高估了,而且与不可变对象相关的高效率也可抵消一些新建对象的代价。
后面的子章节将使用一个使用可变实例的类,然后再从这个类派生出一个使用不可变实例的类。通过所做的这些,它们给出了一个进行这种转变的通用规则,并证明了不可变对象的一些好处。
一个同步类的例子
类SynchronizedRGB定义的对象用于表示色彩。每个对象用代表主色值的三个整数去表示色彩,并用一个字符串表示这种色彩的名称。
public class SynchronizedRGB {
//Values must be between 0 and 255.
private int red;
private int green;
private int blue;
private String name;
private void check(int red, int green, int blue) {
if (red < 0 || red > 255
|| green < 0 || green > 255
|| blue < 0 || blue > 255) {
throw new IllegalArgumentException();
}
}
public SynchronizedRGB(int red, int green, int blue, String name) {
check(red, green, blue);
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
public void set(int red, int green, int blue, String name) {
check(red, green, blue);
synchronized (this) {
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
}
public synchronized int getRGB() {
return ((red << 16) | (green << 8) | blue);
}
public synchronized String getName() {
return name;
}
public synchronized void invert() {
red = 255 - red;
green = 255 - green;
blue = 255 - blue;
name = "Inverse of " + name;
}
}
SynchronizedRGB必须小心地避免被观察到不一致的状态。例如,假设一个线程执行下面的代码:
SynchronizedRGB color = new SynchronizedRGB(0, 0, 0, "Pitch Black");
...
int myColorInt = color.getRGB(); //Statement 1
String myColorName = color.getName(); //Statement 2
如果另一个线程在Statement 1之后而在Statement 2之前调用了color.set方法,那么myColorInt的值就不匹配myColorName表示的色彩。为了避免这种结果,这两条语句必须绑在一起:
synchronized (color) {
int myColorInt = color.getRGB();
String myColorName = color.getName();
}
这种不一致性只可能发生在不可变对象上--对于不可变版本的SynchronizedRGB就不会有这种问题。
定义不可变对象的策略
下面的规则定义了一种简单的创建不可变对象的策略。不是所有被标为"immutable"的类都符合下面的这些规则。这也不是说这些类的创建者缺乏考虑--他们可能有好的理由去相信他们的类的实例在构造完毕之后就不会再改变了。
1. 不要提供"setter"方法-修改字段的方法或由这些字段引用的对象。
2. 将所有的字段设置为final和private。
3. 不允许子类去重载方法。实现这一点的最简单的路径就是将该类声明为final。更复杂一点儿的方法是声明该类的构造器为private,并通过工厂方法创建实例。
4. 如果实例字段包含对可变对象的引用,就不允许这些对象被改变:
* Don't provide methods that modify the mutable objects.
* 不要提供修改这些对象的方法。
* Don't share references to the mutable objects. Never store references to external, mutable objects passed to the constructor; if necessary, create copies, and store references to the copies. Similarly, create copies of your internal mutable objects when necessary to avoid returning the originals in your methods.
* 不要共享对可变对象的引用。不要通过构造器存储对外部可变对象的引用;如果必须那么做,就创建一个拷贝,将引用存放到拷贝中。类似的,如果有必要避免在你的方法内部返回原始对象,可以为你的内部可变对象创建拷贝。
将该策略应用到SynchronizedRGB中就会产生如下步骤:
1. 该类中有两个setter方法。首先是set方法,无论使用何种方式改变该对象,在该类的不可变版本中都不可能再有它的位置了。其次就是invert方法,可以使用它创建一个新的对象而不是修改已有的对象。
2. 所有的字段都已经是私有的了;再进一步使它们是final的。
3. 将该类本身声明为final。
4. 仅有一个引用其它对象的字段,而那个对象本身也是不可变的。因此,针对包含可变对象的状态的防护手段都是不必要的了。
做完这些之后,我们就有了ImmutableRGB:
final public class ImmutableRGB {
//Values must be between 0 and 255.
final private int red;
final private int green;
final private int blue;
final private String name;
private void check(int red, int green, int blue) {
if (red < 0 || red > 255
|| green < 0 || green > 255
|| blue < 0 || blue > 255) {
throw new IllegalArgumentException();
}
}
public ImmutableRGB(int red, int green, int blue, String name) {
check(red, green, blue);
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
public int getRGB() {
return ((red << 16) | (green << 8) | blue);
}
public String getName() {
return name;
}
public ImmutableRGB invert() {
return new ImmutableRGB(255 - red, 255 - green, 255 - blue,
"Inverse of " + name);
}
}
高层次并发对象
到目前为止,本教程已经关注了在一开始就是Java平台一部分的低层次API。这些API足以应付非常基本的工作,但对于更高级的工作,高层次组件是必需的。对于需要充分发掘当今多多处理器和多核系统的大规模并发应用就更是如此。
在本节,我们将看到一些在Java平台5.0版中引入的高级并发特性。这些特性中的大部分是在java.util.concurrent包中实现的。在Java集合框架中也有一些新的并发数据结构。
* 支持锁机制的锁对象简化了很多并发应用。
* 执行器为启动和管理线程定义了一个高级API。由java.util.concurrent包提供的执行器的实现提供了适应于大规模应用的线程池管理。
* 并发集合使得管理大型数据集合更为简易,并能大幅减少去同步的需求。
* 原子变量拥有最小化对同步的需求并帮助避免内存一致性错误的特性。
锁对象
同步代码依赖于一种简单的可重入锁。这种锁方便使用,但有很多限制。java.util.concurrent.locks包提供了更为复杂的锁机制。我们不会细致地测试这个包,而是关注它最基本的接口,Lock。
锁对象工作起来非常像由同步代码使用的隐含锁。使用隐含锁时,在一个时间点只有一条线程能够拥有锁对象。通过与之相关联的Condition对象,锁对象也支持等待/唤醒机制。
相对于隐含锁,锁对象最大的优势就是它们可以退回(backs out)试图去获得某个锁。如果一个锁不能立刻或在一个时间限制(如果指定了)之前获得的话,tryLock方法就会退回这一企图。如果在获得锁之前,另一个线程发出了中断信号,lockInterruptibly方法也会退回这一请求。
让我们使用锁对象去解决在Liveness这一节中看到的死锁问题。Alphonse和Gaston已经训练了他们自己,能够注意到朋友对鞠躬的反应。我们使用这样一种方法去做改进,即要求Friend对象在应对鞠躬行为之前,必须获得两个参与者的锁。下面的源代码就是改进后的模型,Safelock。为了证明该方式的通用性,我们假设Alphonse和Gaston是如此地痴迷于他们新发现的安全地鞠躬的能力,以至于相互之间都不能停止向对方鞠躬。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;
public class Safelock {
static class Friend {
private final String name;
private final Lock lock = new ReentrantLock();
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public boolean impendingBow(Friend bower) {
Boolean myLock = false;
Boolean yourLock = false;
try {
myLock = lock.tryLock();
yourLock = bower.lock.tryLock();
} finally {
if (! (myLock && yourLock)) {
if (myLock) {
lock.unlock();
}
if (yourLock) {
bower.lock.unlock();
}
}
}
return myLock && yourLock;
}
public void bow(Friend bower) {
if (impendingBow(bower)) {
try {
System.out.format("%s: %s has bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
} finally {
lock.unlock();
bower.lock.unlock();
}
} else {
System.out.format("%s: %s started to bow to me, but" +
" saw that I was already bowing to him.%n",
this.name, bower.getName());
}
}
public void bowBack(Friend bower) {
System.out.format("%s: %s has bowed back to me!%n",
this.name, bower.getName());
}
}
static class BowLoop implements Runnable {
private Friend bower;
private Friend bowee;
public BowLoop(Friend bower, Friend bowee) {
this.bower = bower;
this.bowee = bowee;
}
public void run() {
Random random = new Random();
for (;;) {
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {}
bowee.bow(bower);
}
}
}
public static void main(String[] args) {
final Friend alphonse = new Friend("Alphonse");
final Friend gaston = new Friend("Gaston");
new Thread(new BowLoop(alphonse, gaston)).start();
new Thread(new BowLoop(gaston, alphonse)).start();
}
}
执行器
在所有之前的例子中,在由新线程执行的工作--由它的Runnable对象定义,和该线程本身--由Thread对象定义,之间有着紧密的联系。对于小规模应用,它能工作的很好,但在大规模应用中,就有必要将线程的管理与创建从应用的其它部分分离出来。封闭这部分功能的对象被称作执行器。下面的子章节将细致地描述执行器。
* Executor接口定义了三种执行器类型。
* Thread Pool是执行器最常用的实现。
Executor接口
java.util.concurrent包定义了三种执行器接口:
* Executor,是一种支持启动新任务的简单接口。
* ExecutorService,是Executor接口的子接口,它加入了帮助管理包括单个任务和执行器本身的生命周期的功能。
* ScheduledExecutorService,是ExecutorService的子接口,支持在未来时间和/或周期性执行任务。
一般地,与执行器对象相关的变量都被声明为上述三个接口中的一个,而不是一个执行器类。
Executor接口
Executor接口提供了一个方法,execute,它被设计为是通用的线程创建方式的替代品。如果r是一个Runnable对象,那么e就是你用于替换r的Executor对象
(new Thread(r)).start();
和
e.execute(r);
然而,execute方法的定义缺乏规范。前面的低层次机制创建了一个新的线程并立即启动它。但根据Executor的不同实现,execute可能也会做相同的事情,但更可能是使用一个已有的工人(worker)线程去执行r,或将r置于一个等待工人线程能被使用的队列中。(我们将在Thread Pool一节中描述工人线程。)
java.util.concurrent包中的执行器实现被设计为使更高级的ExecutorService和ScheduledExecutorService能够充分使用它,尽管它们也能够与Executor接口一起工作。
ExecutorService接口
ExecutorService接口补充提供了一个与execute相似但功能更丰富的submit方法。与execute相同,submit方法也接受Runnable对象,但也接受Callable对象,该对象允许任务返回一个值。submit方法返回Future对象,该对象用于获取Callable返回的值并管理Callable和Runnable任务的状态。
ExecutorService也提供了用于提交大量Callable对象集合的方法。最后,ExecutorService还提供了用于管理执行器关闭的一组方法。为了支持立即关闭,任务应该要正确地处理中断。
ScheduledExecutorService接口
ScheduledExecutorService接口为它的父接口补充提供了与时间计划有关的方法,使得能在指定延迟后执行Runnable或Callable任务。
线程池
java.util.concurrent包中的大部分执行器实现都使用了由工人(worker)线程组成的线程池。这种线程独立于Runnable和Callable任务而存在,常被用于执行多个任务。
使用工人线程能够最大限度的减小由于线程创建而产生的开销。线程对象会占用大量的内存,而在大规模应用中,分配和收回大量线程对象会造成大量的内存管理开销。
一种常用的线程池类型是固定数量线程池。这种池总是有特定数量的线程在执行;如果一个线程不知何故在它仍然被使用时终止了,它会立即被一个新的线程替代。任务通过内部队列提交到线程池中。当活跃任务的数量超过线程的数量时,这种内部队列会保存多余的任务。
固定数量线程池的一个重要的优点就是应用会慢慢退化地使用它。为了理解这一点,考虑这样的一个Web服务器的应用,每个HTTP请求被独立的线程处理。如果应用为每个新的HTTP请求创建一个新的线程,并且系统接到的请求数超出它能立即处理的数量,即当所有线程的开销超过系统的承受能力时,该应用对此的反应就会是突然停止 。
一种简单地创建执行器的方法就是使用固定数量线程池,通过调用java.util.concurrent.Executors类的newFixedThreadPool工厂方法可以得到该线程池。Executors类也提供下面的工厂方法:
* newCachedThreadPool方法创建一个有可扩展线程池的执行器。该执行器适用于会启动许多短寿命任务的应用。
* newSingleThreadExecutor方法创建在一个时间点只执行一个任务的执行器。
* 有几个工厂方法是上述执行器的ScheduledExecutorService版本。
如果上述工厂方法提供的执行器没有一个适合于你的需求,创建java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的实例将给你另外的选择。
并发集合
java.util.concurrent包含一组Java集合框架额外的扩展。根据提供的集合接口十分容易把它们归类为:
* BlockingQueue定义了一个先入先出的数据结构,当你试图向一个已满的队列添加或向从一个已空的队列中取出元素时,阻塞你或使你超时。
* ConcurrentMap是java.util.Map的子接口,它定义了一些有用的原子操作。只有某个键存在时,这些操作才删除或替换一个这个键-值对,或者只有当某个键不存在时,才能添加这个键-值对。使这些操作都是原子的,以帮助避免同步。标准而通用的ConcurrentMap实现是ConcurrentHashMap,它是HashMap的同步相似体。
* ConcurrentNavigableMap is a subinterface of ConcurrentMap that supports approximate matches. The standard general-purpose implementation of ConcurrentNavigableMap is ConcurrentSkipListMap, which is a concurrent analog of TreeMap.
* ConcurrentNavigableMap是ConcurrentMap的子接口,它支持近似符合。标准而通用的ConcurrentNavigableMap是ConcurrentSkipListMap,它是TreeMap的同步相似体。
所有这些集合都为了帮助避免内存一致性错误而在向集合中添加对象的操作与其后的访问或删除对象的操作之间定义了"Happens-Before"关系。
原子变量
java.util.concurrent.atomic包定义了在单个变量中支持原子操作的类。所有的这些类都有get和set方法,这些方法就如同读写volatile变量那样工作。即,一个set方法与任何随其后的针对相同变量的get方法之间有"Happen-Before"对象。通过应用于整型原子变量的原子算术方法,原子的compareAndSet方法也战士具有这样的内存一致性特性。
为了看看如何使用这个包,让我们回想之前为了证明干预而使用过的类Counter:
class Counter {
private int c = 0;
public void increment() {
c++;
}
public void decrement() {
c--;
}
public int value() {
return c;
}
}
为了防止线程干预的一种方法就是使它的方法可同步,如SynchronizedCounter里的方法那样:
class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}
对于这个简单的类,同步是一个能够被接受的解决方案。但对于更复杂的类,我们可能想避免不必要的同步的活跃度影响。使用AtomicInteger对象替代int字段允许我们在不求助同步的情况下就能防止线程干预。
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.incrementAndGet();
}
public void decrement() {
c.decrementAndGet();
}
public int value() {
return c.get();
}
}
进一步地阅读
* Concurrent Programming in Java: Design Principles and Pattern (2nd Edition),Doug Lea著。这本综合性著作的作者是一位卓越的专家,同时也是Java平台并发框架的架构师。
* Java Concurrency in Practice,Brian Goetz,Tim Peierls,Joshua Bloch,Joseph Bowbeer,David Holmes和Doug Lea著。一本为贴近初学者而设计的实践性指南。
* Effective Java Programming Language Guide,Joshua Bloch著。虽然这是一本通用的程序设计指南,但其中关于线程的章节包含着并发编程必需的"最佳实践"。
* Concurrency: State Models & Java Programs (2nd Edition),Jeff Magee和eff Kramer著。通过模型化和实用的例子介绍了并发编程。