9. 同步(Concurrent)
1. Executor接口
Executor接口提供了一个类似于线程池的管理工具。用于只需要往Executor中提交Runnable对象,剩下的启动线程等工作,都会有对应的实现类来完成。ScheduledExecutorService比ExecutorService增加了,时间上的控制,即用户可以在提交的时候额外的定义该任务的启动时机,以及随后的执行间隔和延迟等。
例子:
任务:
public class ETask implements Runnable{
private int id = 0;
public ETask(int id){
this.id = id;
}
public void run(){
try{
System.out.println(id+" Start");
Thread.sleep(1000);
System.out.println(id+" Do");
Thread.sleep(1000);
System.out.println(id+" Exit");
}catch(Exception e){
e.printStackTrace();
}
}
}
测试类:
public class ETest{
public static void main(String[] args){
ExecutorService executor = Executors.newFixedThreadPool(2);
for(int i=0;i<5;i++){
Runnable r = new ETask(i);
executor.execute(r);
}
executor.shutdown();
}
}
输出:
0 Start
1 Start
0 Do
1 Do
0 Exit
2 Start
1 Exit
3 Start
2 Do
3 Do
2 Exit
3 Exit
4 Start
4 Do
4 Exit
2. Future和Callable
Callable是一个类似于Runnable的接口,他与Runnable的区别是,她在执行完毕之后能够返回结果。Future用于获取线程的执行结果,或者取消已向Executor的任务。当我们通过Future提供的get()方法获取任务的执行结果时,如果任务没有完成,则调用get()方法的线程将会被阻塞,知道任务完成为止。一般我们都会使用Future的实现类FutureTask。
例子:
Callable对象:
public class ETask implements Callable{
private String id = null;
public ETask(String id){
this.id = id;
}
public String call(){
try{
System.out.println(id+" Start");
Thread.sleep(1000);
System.out.println(id+" Do");
Thread.sleep(1000);
System.out.println(id+" Exit");
}catch(Exception e){
e.printStackTrace();
}
return id;
}
}
测试类:
public class ETest{
public static void main(String[] args){
ExecutorService executor = Executors.newFixedThreadPool(2);
for(int i=0;i<5;i++){
try{
Callable c = new ETask(String.valueOf(i));
FutureTask ft = new FutureTask(c);
executor.execute(ft);
System.out.println("Finish:" + ft.get());
}catch(Exception e){
e.printStackTrace();
}
}
executor.shutdown();
}
}
输出:
0 Start
0 Do
0 Exit
Finish:0
1 Start
1 Do
1 Exit
Finish:1
2 Start
…
3. CompletionService和ExecutorCompletionService
CompletionService类似于一个Executor和Queue的混合。我们可以通过submit()向CompletionService提交任务,然后通过poll()来获取第一个完成的任务,也可以通过take()来阻塞等待下一个完成的任务。ExecutorCompletionService是CompletionService的实现类,他需要提供一个Executor作为构造函数的参数。
例子:
Executor executor = …;
CompletionService cs = new ExecutorCompletionService(executor);
Future fs = cs.submit(…);
Future ft = cs.take();
4. Semaphore
信号量是用于同步和互斥的低级原语。信号量提供的acquire()和release()操作,与操作系统上的p,v操作同。
例子:
缓冲区:
public class Buffer{
private Semaphore s = null;
private Semaphore p = null;
Vector<Integer> v = new Vector<Integer>();
public Buffer(int capacity){
s = new Semaphore(capacity);
p = new Semaphore(0);
}
public void put(int i){
try{
s.acquire();
v.add(new Integer(i));
p.release();
}catch(Exception e){
e.printStackTrace();
}
}
public int get(){
int i = 0;
try{
p.acquire();
i = ((Integer)v.remove(0)).intValue();
s.release();
}catch(Exception e){
e.printStackTrace();
}
return i;
}
}
生产者:
public class Producer extends Thread{
private Buffer b;
private int count;
private int step;
private int id;
public Producer(Buffer b,int step,int id){
this.b = b;
this.step = step;
this.id = id;
count = 0;
}
public void run(){
try{
while(true){
System.out.println("In put");
b.put(count);
System.out.println("Producer "+id+":"+count);
count++;
Thread.sleep(step);
System.out.println("Out put");
}
}catch(Exception e){
e.printStackTrace();
}
}
}
消费者:
public class Consumer extends Thread{
private Buffer b;
private int step;
private int id;
public Consumer(Buffer b,int step,int id){
this.b = b;
this.step = step;
this.id = id;
}
public void run(){
try{
while(true){
System.out.println("In get");
System.out.println("\t\tConsume "+id+":"+b.get());
System.out.println("Out get");
Thread.sleep(step);
}
}catch(Exception e){
e.printStackTrace();
}
}
}
测试程序:
public class CPTest{
public static void main(String[] args){
Buffer b = new Buffer(3);
Consumer c1 = new Consumer(b,1000,1);
Consumer c2 = new Consumer(b,1000,2);
Producer p1 = new Producer(b,100,1);
Producer p2 = new Producer(b,100,2);
c1.start();
c2.start();
p1.start();
p2.start();
}
}
5. CyclicBarrier
CyclicBarrier可以让一组线程在某一个时间点上进行等待,当所有进程都到达该等待点后,再继续往下执行。CyclicBarrier使用完以后,通过调用reset()方法,可以重用该CyclicBarrier。线程通过调用await()来减少计数。
CyclicBarrier
例子:
任务:
public class Task extends Thread{
private String id;
private CyclicBarrier c;
private int time;
public Task(CyclicBarrier c,String id,int time){
this.c = c;
this.id = id;
this.time = time;
}
public void run(){
try{
System.out.println(id+" Start");
Thread.sleep(time);
System.out.println(id+" Finish");
c.await();
System.out.println(id+" Exit");
}catch(Exception e){
e.printStackTrace();
}
}
}
测试类:
public class Test{
public static void main(String[] args){
CyclicBarrier c = new CyclicBarrier(3,new Runnable(){
public void run(){
System.out.println("All Work Done");
}
});
Task t1 = new Task(c,"1",1000);
Task t2 = new Task(c,"2",3000);
Task t3 = new Task(c,"3",5000);
t1.start();
t2.start();
t3.start();
}
}
输出结果:
1 Start
2 Start
3 Start
1 Finish
2 Finish
3 Finish
All Work Done
3 Exit
1 Exit
2 Exit
6. CountdownLatch
CountdownLatch具有与CyclicBarrier相似的功能,也能让一组线程在某个点上进行同步。但是与CyclicBarrier不同的是:1.CountdownLatch不能重用,2.线程在CountdownLatch上调用await()操作一定会被阻塞,直到计数值为0时才会被唤醒,而且计数值只能通过conutDown()方法进行减少。
特别的,当CountdownLatch的值为1时,该Latch被称为“启动大门”,所有任务线程都在该Latch上await(),直到某个非任务线程调用countDown()触发,所有任务线程开始同时工作。
7. Exchanger
Exchanger是一个类似于计数值为2的CyclicBarrier。她允许两个线程在某个点上进行数据交换。
例子:
public class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
public class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.full())
currentBuffer = exchanger.exchange(currentBuffer);
}
}catch(InterruptedException ex) { ... handle ... }
}
}
public class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.empty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
public void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
Exchange
8. Lock,Condition
锁是最基本的同步原语。通过在锁上面调用lock()和unlock()操作,可以达到与synchronized关键字相似的效果,但是有一点要注意的是,锁必须显式释放,如果由于抛出异常,而没有释放锁,将导致死锁出现。Condition提供的await(),signal(),signal()操作,与原来的wai(),notify(),notifyAll()操作具有相似的含义。Lock的两个主要子类是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允许多人读,而一人写。
例子:
使用Lock和Condition的生产者,消费者问题
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
9. 小结:新的concurrent包提供了一个从低到高的同步操作。
posted on 2007-01-22 17:33
Lib 阅读(1597)
评论(0) 编辑 收藏 所属分类:
Java