关于 Java Concurrency
自从Java诞生之时,Java 就支持并行的概念,比如线程和锁机制。这个教程帮助开发多线程Java程序员能够理解核心的Java并行理念以及如何使用他们。 内容涉及到Java语言中的线程, 重练级以及轻量级同步机制 以及JavaSE 5 中的锁,原子量 并行容器,线程调度 以及线程执行者。 开发人员使用这些知识能够开发好并发线程安全的Java 应用程序。
Java 并行的概念(Java Concurrency Concepts)
概念
|
描述
|
Java 内存模型
|
在JavaSE5 JSR133规范中详细定义了Java内存模型 Java Memory Model(JMM),该模型定义了相关的操作 比如读,写操作,以及在监视器上的同步。 这些操作按 Happens-before的顺序。
这个定义保证了一个线程可以看到另一个线程操作的结果,同时保证了同步的程序, 以及如何定义一个不变的属性 等等。
|
监视器
|
在Java中,任何一个对象都有一个监视器,来排斥共享访问临界区域的代码。这些临界区可以是一个方法 或者是一段代码块,这些临界区域作为同步块。线程只有获取该监视器才能执行同步块的代码。当一个线程到达这块代码是,首先等待来确定是否其他线程已经释放这个监控器。监控器除了排斥共享访问,还能通过Wait 和Notify来协调线程之间的交互。
|
原子属性
|
除了Double 和long类型,其他的简单类型都是原子类型。Double和long 类型的修改在JVM分为两个不封。为了保证更新共享的Double和Long类型,你应该将Double和long 的属性作为Volatile 或者将修改代码放入同步块中。
|
竞争情况
|
当许多线程在一系列的访问共享资源操作中,并且结果跟操作顺便有关系的时候,就发生了竞争情况。
|
数据竞争
|
数据竞争涉及到当许多线程访问不是non-final或者non-volatile 并没有合适的同步机制的属性时,JMM不能保证不同步的访问共享的熟悉。数据竞争导致比个预知的行为。
|
自公布
|
还没有通过构造方法实例化对象之前,把这个对象的引用公布时不安全的。
一种是通过注册一个监听器,当初始化的时候回调来发布引用。
另一种是在构造方法里面启动线程。这两种都会导致其他线程引用部分初始化的对象。
|
Final属性
|
Final属性必须显示的赋值,否则就会有编译错误。一旦赋值,不能被修改。将一个对象引用标记为Final只能保证该引用不会被修改,但该对象可以被修改。比如一个Final ArrayIist不能改变为另一个ArrayList 但你可以添加或者修改这个List的对象。在构造方法之后,一个对象的Final 属性是冻结的,保证了对象被安全的发布。其他线程可以在构造方法时看到该变量,甚至在缺乏同步的机制下。
|
不变对象
|
Final 属性从语义上能够保证创建不变对象。而不变对象可以再没有同步机制下多线程共享和读取。为保证该对象是不变的,必须保证如下:
这个对象被安全的发布,this引用不能在构造方法的时候被发布
所有的属性都是Final的
应用的对象必须保证在构造方法之后不能被修改。
这个对象需要声明为Final 保证子类违法这些原则。
|
Protecting shared data
保护共享的数据
线程安全的程序需要开发人员在需要修改共享的数据时使用合适的锁机制。锁机制建立的
适合JMM的顺序,保证对于其他程序的可视性。
当在同步机制外修改共享的data时,JMM不能保证其一致性。 JVM提供了一些方法来保证其可视性。
Synchronized
每一个对象实体都有一个监视器(来之于Object对象),这个监视器能被再某一线程中锁定。Synchronized关键字来指定在方法或者代码块上持有该对象监视器上的锁定。当某一线程同步修改一属性,后续线程将能看到被该线程修改的数据。
Lock
java.util.concurrent.locks 包提供了Lock的接口,ReentrantLock实现了类似Synchronized关键字的功能。同时还提供了额外的功能,比如不是阻塞的tryLock()方法和释放锁。
public class Counter {
private final Lock lock = new ReentrantLock();
private int value = 0;
public int increment() {
lock.lock();
try {
return ++value;
} finally {
lock.unlock();
}
}
}
同时,在多线程高冲突的情况下,ReentrantLock要比Synchronized效率好。
ReadWriteLock
java.util.concurrent.locks 包提供了一个读写锁的接口,这个接口定义了读和写的一对锁,
一般允许并行的读和排他的写。下面的代码展示了上述功能。
public class Statistic {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private int value;
public void increment() {
lock.writeLock().lock();
try {
value++;
} finally {
lock.writeLock().unlock();
}
}
public int current() {
lock.readLock().lock();
try {
return value;
} finally {
lock.readLock().unlock();
}
}
}
Volatile
Volatile 关键字使其属性对于后续的线程的可见性。
public class Processor implements Runnable {
private volatile boolean stop;
public void stopProcessing() {
stop = true;
}
public void run() {
while(! stop) {
// .. do processing
}
}
}
注意:将array标记为Volatile不能保证数组里面元素的Volatile,只能保证数组的引用时
可见的。使用AtomicIntegerArray 来保证整个数组都是可见的。
原子类
Volatile 的缺点是只能保证可见性。不能保证修改结果的可见性。而java.util.concurrent.atomic
包包含了一组支持原子操作的类来弥补Volatile的不足。
public class Counter {
private AtomicInteger value = new AtomicInteger();
public int next() {
return value.incrementAndGet();
}
}
ThreadLocal
ThreadLocal存贮了该线程所需要的数据,不需要锁的机制。一般而言,ThreadLocal 存放当前的事务和其他资源等。如下代码,TransactionManager中,ThreadLocal 类型的currentTransaction 存贮了当前事务。
public class TransactionManager {
private static final ThreadLocal<Transaction> currentTransaction =
new ThreadLocal<Transaction>() {
@Override
protected Transaction initialValue() {
return new NullTransaction();
}
};
public Transaction currentTransaction() {
Transaction current = currentTransaction.get();
if(current.isNull()) {
current = new TransactionImpl();
currentTransaction.put(current);
}
return current;
}
}
并行容器
合理维护共享数据一致性的核心技术是在访问数据时采取同步机制。这种技术使得所有访问共享数据的方式保证了同步的原则。java.util.concurrent提供了可以并行使用的数据结构。通常而言,使用这些数据结构优于通过Synchronized包装的非同步集合。
同步的 lists and sets
类
|
描述
|
CopyOnWriteArraySet
|
CopyOnWriteArraySet
提供Copy-On-Write的语义 即:每当修改某一数据时在整个容器内容拷贝上修改,然后将该备份同步入容器。
|
CopyOnWriteArrayList
|
类似CopyOnWriteArraySet
|
ConcurrentSkipListSet
|
JSE6提供的并行访问可以排序的Set。
|
并行 maps
java.util.concurrent扩展map接口,提供了名叫ConcurrentMap的并行Map。
如下面所有的操作都是原子性的。
方法
|
描述
|
putIfAbsent(K key, V value) : V
|
如果Key没有在该Map中,将Key Value存入。
否则不做任何处理。
如果没有该Key 返回Null
如果有 返回以前的值。
|
remove(Object key, Object value)
: boolean
|
如果Map中包含该key则移出该Value 否则不做任何操作。
|
replace(K key, V value) : V
|
如果Map中有该Key 则用该Value值替换久值。
|
replace(K key, V oldValue, V
newValue) : boolean
|
如果Map中有该Key且值为oldValue时,用newValue替换该久值。
|
下面是具体实现类:
类
|
描述
|
ConcurrentHashMap
|
内部的segment实现了并行的读取。
|
ConcurrentSkipListMap
|
JSE6提供的并行访问可排序的Map。
|
Queues
作为生产者于消费者管道的Queues,生产的条目从一头放入,然后从另一头取出,典型的先进先出的顺序。Queues接口在JSE5加入java.util包里,应用于单线程的环境。最主要用于多生产者消费者的情况下。所有的读写操作都在同一Queue上。在Java.util.concurrent包的blockingQueues接口扩张了Queue并处理了Queue可能已经被生产者添加慢的情况,或者消费者已经读取或者取出完,Queue为空的情况。在这些情况下,BlockingQueue提供了阻塞的机制。可以设定阻塞的时间或者阻塞的条件。
下表反应了Queue于BlockingQueue对处理特殊条件下的不同策略。
类
|
策略
|
插入
|
移除
|
检查
|
Queue
|
扔出异常
|
Add
|
remove
|
element
|
返回特定的值
|
Offer
|
poll
|
peek
|
Blocking Queue
|
永远的阻塞
|
Put
|
take
|
n/a
|
在设定的时间阻塞
|
Offer
|
poll
|
n/a
|
下面是具体的实现类。
PriorityQueue
|
唯一非并行的Queue。用于单线程 处理排序的集合。
|
ConcurrintlinkedQueue
|
没有容量限制的的并行实现,不支持阻塞。
|
ArrayBlockingQueue
|
基于数组 有容量限制的 阻塞Queue。
|
LinkedBlockingQueue
|
最通用的实现阻塞容量限制的Queue。
|
PriorityBlockingQueue
|
相对于先进先出,该Queue的顺序基于Comparator的优先级别,没有容量限制。
|
DelayQueue
|
没有容量限制的Queue,有一个延迟值。
只有延迟时间超过时才能被移除。
|
SynchronousQueue
|
容量为0的队列,只到下一个到来之前,生产者和消费者被阻塞。适合在线程中交换数据。
|
Deques
Deques在JSE6加入,为双头Queue。它不仅支持在对头添加,在队尾移除的功能,还提供在双头添加和移除。类似于BlockingQueue,也有一个BlockingDeques提供阻塞和超时的Deque。
下表为Deque和BlockingDeque对于具体方法的策略。
接口
|
First 或者Last
|
策略
|
插入
|
移除
|
检测
|
Deque
|
Head
|
扔出异常
|
addFirst
|
removeFirst
|
getFirst
|
返回特定值
|
offerFisrt
|
pollFirst
|
peekFirst
|
Tail
|
扔出异常
|
addLast
|
RemoveLast
|
getLast
|
返回特定值
|
offerLast
|
PollLast
|
PeekLast
|
BlockingDeque
|
Head
|
永远阻塞
|
putFirst
|
takeFirst
|
N/A
|
在一定时间段内阻塞
|
offerFirst
|
pollFirst
|
N/A
|
Tail
|
永远阻塞
|
PutLast
|
takeLast
|
N/A
|
在一定时间段内阻塞
|
offerLast
|
pollLast
|
N/A
|
对于Deque特殊的用法就是添加移除和检测发生在队列的末端。这种用法类似于栈(先进后出顺序)。事实上,Deque也提供了类似的方法,push() pop()和peek().这些方法被映射到addFirst()
removeFirst() 和PeekFirst().
下表为JDK提供的实现类。
类
|
描述
|
LinkedList
|
在JSE6中重新设计实现了Deque接口。
实现了非同步了堆栈。
|
ArrayDeque
|
不支持并行 容量不限的Deque。
|
LinkedBlockingDeque
|
唯一支持并行的Deque,
|
线程
在Java中,java.lang.Thread被用来表述一个应用或者JVM 线程。在代码中,常用Thread.currentThread()来获得当前线程。
下表为线程的相关方法
线程方法
|
表述
|
Start
|
启动一个线程 执行Run方法。
|
Join
|
阻塞当前线程直到其他线程退出。
|
interrupt
|
中断其他线程。如果一个线程正在被阻塞,如果试图去Interrupt这个线程,这个线程会泡出InterruptedException,否则,置为interrupt状态。
|
Stop suspend resume destroy
|
这些方法已经不赞成使用。这些危险的操作依赖于线程的状态。可以使用interrut和volatile 标记来实现。
|
Uncaught exception handlers
如果一个线程添加一个UncaughtExceptionHandlers,如果该线程被非安全终止时会收到通知。如下代码:
Thread t = new Thread(runnable);
t.setUncaughtExceptionHandler(new Thread.
UncaughtExceptionHandler() {
void uncaughtException(Thread t, Throwable e) {
// get Logger and log uncaught exception
}
});
t.start();
死锁
当线程相互等待资源,而这些资源又被相互持有是,死锁就发生了。最明显的资源就是对象的监控器。但是可以引起阻塞(比如Wait和notify)的任何资源也可以引起死锁。
最新的JVM能够检测死锁,并在线程的Dump中打印死锁信息。
另外的死锁情况是,饥饿线程和活锁。 饥饿线程是指一些线程长时间持有锁而是某些线程处于饥饿状态而不处理真的业务。活锁是指线程花费大量时间检测资源 避免死锁而不是真正处理业务逻辑。
线程交互
Wait/notify
Wait/nofify 是最合理的方式来处理一个线程子在一定条件下通过一个信号来通知另一线程,特别是代替在循环里通过Sleep来检测条件的方式。比如,一个线程可能等待一个需要处理的队列,当需要处理的内容添加到队里中时,而另外一个线程会通知等待的线程。
规范的用法如下:
public class Latch {
private final Object lock = new Object();
private volatile boolean flag = false;
public void waitTillChange() {
synchronized(lock) {
while(! flag) {
try {
lock.wait();
} catch(InterruptedException e) {
}
}
}
}
public void change() {
synchronized(lock) {
flag = true;
lock.notifyAll();
}
关于上面的代码,需要着重说明的如下:
一定要在同步锁中调用call notify notifyall.
Wait 一定要在循环中检测等待条件。
一定要在调用notify 或者notifyAll之前改变条件,否则即使通知了其他线程也无法退出循环。
Condition
在JSE5中,新添加了java.util.concurrent.locks.Condition类。该类在语义上实现了wait和notify的功能,同时添加了额外的功能,比如每个锁可以有多个条件,中断的等待,访问统计等。Conditon通过锁的实例获得。
public class LatchCondition {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean flag = false;
public void waitTillChange() {
lock.lock();
try {
while(! flag) {
condition.await();
}
} finally {
lock.unlock();
}
}
public void change() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
Coordination 类
在java.util.concurrent包包含几个常用的多线程交互类,这些类基本覆盖了常用的情况。通常使用这些类比使用wait/notice更安全。
CyclicBarrier
CyclicBarrier以特定的值对计数器初始化。参与的线程调用await()方法时,如果没有没有达到计数器的初始化的值时,该线程被阻塞。直到计数器达到特定的值时,所有阻塞的线程被释放。CyclicBarrier可以重复设置,来调整一组线程的启动与停止。
CountDownLatch
CountDownLatch以特定的值初始化,线程调用await()时,如果计数器没有减少到0时,该线程被阻塞。其他线程可以调用countDown()方法减少计数器。当计数器减少为0时,该CountDownLatch不能重新设置计数器而重用。
Semaphore
Semaphore 管理一组许可证,线程通过调用acquire()方法来检测是否有许可,如果没有被阻塞,线程可以调用release()方法来释放许可证。Semaphore等价于互斥排他锁。
Exchanger
Exchanger 等待线程调用exchange()方法来交互数据,类似使用SynchronousQueue,通过它交互数据是双向的。
Task Execution
许多java多线程程序需要一组工作线程从队列中取出任务来执行。 Java.util.concurrent包对这种工作线程提供了可靠的基础。
ExecutorService
Exccutor 和更易扩展的ExecutorService接口定义了相关的方法来执行任务。通过使用这些接口可以得到众多各式各样的实现。
最基本的Executor接口只接受Runnable的任务。
void execute(Runnable command)
ExecutorService继承了Executor并添加了方法支持Runnable和Callable的任务。
Future<?> submit(Runnable task)
• Future<T> submit(Callable<T> task)
• Future<T> submit(Runnable task, T result)
• List<Future<T> invokeAll(Collection<? extends
Callable<T>> tasks)
• List<Future<T> invokeAll(Collection<? extends
Callable<T>> tasks, long timeout, TimeUnit unit)
T invokeAny(Collection<? extends Callable<T>> tasks)
• T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
Callable and Future
一个Callalbe类似于Runnable,但它可以有返回值或者扔出异常。
• V call() throws Exception;
通用的执行框架提交一个Callalbe 并受到一个Future, 一个Future被标记为返回结果值。Future有方法来轮询或者阻塞直到结果已经返回。你可以在执行之前或者执行时取消一个任务。
如果你想让Runalbe来支持Future,你可以使用FutureTask作为桥梁。FutureTask实现了Future和Runnable,所以你可以提交一个Runnalbe的任务,并作为Future来取得结果。
ExecutorService 的实现
对于ExecutorService接口最主要的实现是ThreadPoolExecutor。这个实现类实现了各种可配置的如下功能。
-
线程池 配置核心线程数量,预先启动,最大线程数。
-
线程工程 生成特殊定制的线程,比如线程名等。
-
工作队列 制定队列的实现,这个队列是阻塞的,但可以是有界性或者无界限的。
-
拒绝的任务 可以指定策略来拒绝任务,比如Queue已经没有空余,或者没有有效的工作线程。
-
生命周期的钩子 类似拦截器可以在Task的生命周期添加功能,比如在工作开始于完成之间插入现有的功能。
-
关闭 停止接受提交的任务,直到所有的任务被完成。
ScheduledThreadPoolExecutor扩展了ThreadPoolExecutor,提供了对任务调度的功能而不是先进先出。在这点上Java.util.Timer不能足够的,而ScheduledThreadPoolExecutor经常提供了足够的弹性。
Executors类提供了许多静态方法来创建包装好的ExecutoService和ScheduledExccutorService实例。
方法
|
描述
|
newSingleThreadExecutor
|
返回一个线程的ExecutorService
|
newFixedTreadPoll
|
固定数量的线程池
|
newCachedThreadPoll
|
大小变化的线程池
|
newSingleThreadScheduledExecutor
|
单线程的ScheduledExecutorService
|
newScheduledThreadPool
|
一组线程的ScheduledExecutorService
|
下面是对固定大小线程池的使用,提交长时间运行的任务。
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.
newFixedThreadPool(processors);
Future<Integer> futureResult = executor.submit(
new Callable<Integer>() {
public Integer call() {
// long running computation that returns an integer
}
});
Integer result = futureResult.get(); // block for result
在上述列子中,调用者提交了向执行者提交了长时间运行的任务,并立即返回。在结果还没有返回之间,调用get()方法会被阻塞。
ExecutorService 基本覆盖了所有的情况。
CompletionService
除了通常我们将任务提交到线程池的Queue之外,我们还需要每一个任务生产结果,并为日后处理。
CompletionService接口允许使用者提交Callalbe和Runnable的任务,同时在结果队列中取出或者轮询结果
Future<V> take() – take if available
• Future<V> poll() – block until available
• Future<V> poll(long timeout, TimeUnit unit) – block
until timeout ends
ExecutorCompletionService是CompletionService接口的标准实现。构造方式类似于Executor
提供输入队列和工作线程池。
重要提示:对于线程池大小的设定,一般采用逻辑上处理器数量。在Java中,通过Runtime.getRuntime().avaiableProcessors()来获取有效的处理器数量,这个数量可能在JVM运行时被修改。