CountDownLatch:
CountDownLatch
是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。比如说一个
Server
启动时需要初始化
4
个部件,
Server
可以同时启动
4
个线程去初始化这
4
个部件,然后调用
CountDownLatch(4).await()
阻断进入等待,每个线程完成任务后会调用一次
CountDownLatch.countDown()
来倒计数
,
当
4
个线程都结束时
CountDownLatch
的计数就会降低为
0
,此时
Server
就会被唤醒继续下一步操作。
CountDownLatch
的方法主要有:
-
await()
:使调用此方法的线程阻断进入等待
-
countDown():
倒计数,将计数值减
1
-
getCount():
得到当前的计数值
CountDownLatch
的例子:一个
server
调了三个
ComponentThread
分别去启动三个组件,然后
server
等到组件都启动了再继续。
public class Server {
public static void main(String[] args) throws InterruptedException{
System.out.println("Server is starting.");
//
初始化一个初始值为
3
的
CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
//
起
3
个线程分别去启动
3
个组件
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new ComponentThread(latch, 1));
service.submit(new ComponentThread(latch, 2));
service.submit(new ComponentThread(latch, 3));
service.shutdown();
//
进入等待状态
latch.await();
//
当所需的三个组件都完成时,
Server
就可继续了
System.out.println("Server is up!");
}
}
public class ComponentThread implements Runnable{
CountDownLatch latch;
int ID;
/** Creates a new instance of ComponentThread */
public ComponentThread(CountDownLatch latch, int ID) {
this.latch = latch;
this.ID = ID;
}
public void run() {
System.out.println("Component "+ID + " initialized!");
//
将计数减一
latch.countDown();
}
}
|
运行结果:
Server is starting.
Component 1 initialized!
Component 3 initialized!
Component 2 initialized!
Server is up!
|
CyclicBarrier:
CyclicBarrier
类似于
CountDownLatch
也是个计数器,不同的是
CyclicBarrier
数的是调用了
CyclicBarrier.await()
进入等待的线程数,当线程数达到了
CyclicBarrier
初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。
CyclicBarrier
就象它名字的意思一样,可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。
CyclicBarrier
初始时还可带一个
Runnable
的参数,此
Runnable
任务在
CyclicBarrier
的数目达到后,所有其它线程被唤醒前被执行。
CyclicBarrier
提供以下几个方法:
-
await()
:进入等待
-
getParties()
:返回此
barrier
需要的线程数
-
reset()
:将此
barrier
重置
以下是使用
CyclicBarrier
的一个例子:两个线程分别在一个数组里放一个数,当这两个线程都结束后,主线程算出数组里的数的和(这个例子比较无聊,我没有想到更合适的例子)
public class MainThread {
public static void main(String[] args)
throws InterruptedException, BrokenBarrierException, TimeoutException{
final int[] array = new int[2];
CyclicBarrier barrier = new CyclicBarrier(2,
new Runnable() {//
在所有线程都到达
Barrier
时执行
public void run() {
System.out.println("Total is:"+(array[0]+array[1]));
}
});
//
启动线程
new Thread(new ComponentThread(barrier, array, 0)).start();
new Thread(new ComponentThread(barrier, array, 1)).start();
}
}
public class ComponentThread implements Runnable{
CyclicBarrier barrier;
int ID;
int[] array;
public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
this.barrier = barrier;
this.ID = ID;
this.array = array;
}
public void run() {
try {
array[ID] = new Random().nextInt();
System.out.println(ID+ " generates:"+array[ID]);
//
该线程完成了任务等在
Barrier
处
barrier.await();
} catch (BrokenBarrierException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
|
Exchanger:
顾名思义
Exchanger
让两个线程可以互换信息。用一个例子来解释比较容易。例子中服务生线程往空的杯子里倒水,顾客线程从装满水的杯子里喝水,然后通过
Exchanger
双方互换杯子,服务生接着往空杯子里倒水,顾客接着喝水,然后交换,如此周而复始。
class FillAndEmpty {
//
初始化一个
Exchanger
,并规定可交换的信息类型是
DataCup
Exchanger exchanger = new Exchanger();
Cup initialEmptyCup = ...; //
初始化一个空的杯子
Cup initialFullCup = ...; //
初始化一个装满水的杯子
//
服务生线程
class Waiter implements Runnable {
public void run() {
Cup currentCup = initialEmptyCup;
try {
//
往空的杯子里加水
currentCup.addWater();
//
杯子满后和顾客的空杯子交换
currentCup = exchanger.exchange(currentCup);
} catch (InterruptedException ex) { ... handle ... }
}
}
//
顾客线程
class Customer implements Runnable {
public void run() {
DataCup currentCup = initialFullCup;
try {
//
把杯子里的水喝掉
currentCup.drinkFromCup();
//
将空杯子和服务生的满杯子交换
currentCup = exchanger.exchange(currentCup);
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new Waiter()).start();
new Thread(new Customer()).start();
}
}
|
6: BlockingQueue接口
BlockingQueue
是一种特殊的
Queue
,若
BlockingQueue
是空的,从
BlockingQueue
取东西的操作将会被阻断进入等待状态直到
BlocingkQueue
进了新货才会被唤醒。同样,如果
BlockingQueue
是满的任何试图往里存东西的操作也会被阻断进入等待状态,直到
BlockingQueue
里有新的空间才会被唤醒继续操作。
BlockingQueue
提供的方法主要有:
-
add(anObject):
把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
可以容纳返回
true
,否则抛出
IllegalStateException
异常。
-
offer(anObject)
:把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
可以容纳返回
true
,否则返回
false
。
-
put(anObject)
:把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
没有空间,调用此方法的线程被阻断直到
BlockingQueue
里有新的空间再继续。
-
poll(time)
:取出
BlockingQueue
里排在首位的对象,若不能立即取出可等
time
参数规定的时间。取不到时返回
null
。
-
take()
:取出
BlockingQueue
里排在首位的对象,若
BlockingQueue
为空,阻断进入等待状态直到
BlockingQueue
有新的对象被加入为止。
根据不同的需要
BlockingQueue
有
4
种具体实现:
-
ArrayBlockingQueue
:规定大小的
BlockingQueue
,其构造函数必须带一个
int
参数来指明其大小。其所含的对象是以
FIFO
(先入先出)顺序排序的。
-
LinkedBlockingQueue
:大小不定的
BlockingQueue
,若其构造函数带一个规定大小的参数,生成的
BlockingQueue
有大小限制,若不带大小参数,所生成的
BlockingQueue
的大小由
Integer.MAX_VALUE
来决定。其所含的对象是以
FIFO
(先入先出)顺序排序的。
LinkedBlockingQueue
和
ArrayBlockingQueue
比较起来,它们背后所用的数据结构不一样,导致
LinkedBlockingQueue
的数据吞吐量要大于
ArrayBlockingQueue
,但在线程数量很大时其性能的可预见性低于
ArrayBlockingQueue
。
-
PriorityBlockingQueue
:类似于
LinkedBlockingQueue
,但其所含对象的排序不是
FIFO
,而是依据对象的自然排序顺序或者是构造函数所带的
Comparator
决定的顺序。
-
SynchronousQueue
:特殊的
BlockingQueue
,对其的操作必须是放和取交替完成的。
下面是用
BlockingQueue
来实现
Producer
和
Consumer
的例子:
public class BlockingQueueTest {
static BlockingQueue basket;
public BlockingQueueTest() {
//
定义了一个大小为
2
的
BlockingQueue
,也可根据需要用其他的具体类
basket = new ArrayBlockingQueue(2);
}
class Producor implements Runnable {
public void run() {
while(true){
try {
//
放入一个对象,若
basket
满了,等到
basket
有位置
basket.put("An apple");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
public void run() {
while(true){
try {
//
取出一个对象,若
basket
为空,等到
basket
有东西为止
String result = basket.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
public void execute(){
for(int i=0; i<10; i++){
new Thread(new Producor()).start();
new Thread(new Consumer()).start();
}
}
public static void main(String[] args){
BlockingQueueTest test = new BlockingQueueTest();
test.execute();
}
}
|
7:Atomics 原子级变量
原子量级的变量,主要的类有
AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……
。这些原子量级的变量主要提供两个方法:
-
compareAndSet(expectedValue, newValue):
比较当前的值是否等于
expectedValue
,
若等于把当前值改成
newValue
,并返回
true
。若不等,返回
false
。
-
getAndSet(newValue):
把当前值改为
newValue
,并返回改变前的值。
这些原子级变量利用了现代处理器(
CPU
)的硬件支持可把两步操作合为一步的功能,避免了不必要的锁定,提高了程序的运行效率。
8:Concurrent Collections 共点聚集
在
Java
的聚集框架里可以调用
Collections.synchronizeCollection(aCollection)
将普通聚集改变成同步聚集,使之可用于多线程的环境下。
但同步聚集在一个时刻只允许一个线程访问它,其它想同时访问它的线程会被阻断,导致程序运行效率不高。
Java 5.0
里提供了几个共点聚集类,它们把以前需要几步才能完成的操作合成一个原子量级的操作,这样就可让多个线程同时对聚集进行操作,避免了锁定,从而提高了程序的运行效率。
Java 5.0
目前提供的共点聚集类有:
ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList
和
CopyOnWriteArraySet.