1.CountDownLatch demo
package com.landon.mavs.example.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** *//**
*
* CountdownLatch用法
*
* <pre>
* 1.同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
* 2.给定的计数 初始化 CountDownLatch.计数器到达零之前,所以调用await的线程会一直阻塞.之后,会释放所有等待的线程,执行await的后续调用.
* 3.计数无法被重置.如需重置计数,可考虑{@link java.util.concurrent.CyclicBarrier}
* 4.计数 1初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:
* 在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待;
* 用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待
* </pre>
*
* <pre>
* 1.CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch
* 2.await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
* 3.await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间
* 4.countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程
* 5.getCount() 返回当前计数
* 6.toString() 返回标识此锁存器及其状态的字符串
* </pre>
*
* @author landon
*
*/
public class CountdownLatchExample {
public static void main(String[] args) throws Exception {
// 示例1:
// Master启动多个worker线程处理任务.所有的worker线程在执行任务前需等待Master初始化,Master线程初始化完毕,则startSignal.countdown,表示开始
// 工作线程被唤醒;然后Master阻塞,等待所有的worker线程执行完毕任务;每个worker线程执行完毕任务,则countdown一下,直至执行所有的任务完成;Master被唤醒,执行收尾工作.
// 示例2:
// 将一个问题分成 N 个部分,用执行每个部分并让锁存器倒计数的 Runnable 来描述每个部分,然后将所有 Runnable 加入到
// Executor 队列。
// 当所有的子部分完成后,协调线程就能够通过 await
Master master = new Master(3);
master.start();
// 任务分为5部分,交个线程池去执行任务.
CountDownLatch doneSignal = new CountDownLatch(5);
// 启动一个线程池去执行任务.这里是一个单线程(这里不关心有多少个线程去执行任务,这里只关心任务完成后计数递减,使得主线程可以继续执行)
ExecutorService executor = Executors.newSingleThreadExecutor();
// 向线程池提交5个任务
for (int i = 0; i < 5; i++) {
executor.execute(new WorkerTask(doneSignal, i));
}
// 主线程等待任务完成
doneSignal.await();
// 此时toString:[Count = 0]
System.out.println("主线程:问题全部解决.继续:" + doneSignal.toString());
}
private static class Master {
private CountDownLatch startSignal;
private CountDownLatch endSignal;
public Master(int workerNum) {
startSignal = new CountDownLatch(1);
endSignal = new CountDownLatch(workerNum);
// 启动所有worker线程
for (int i = 0; i < workerNum; i++) {
new Thread(new Worker(startSignal, endSignal)).start();
}
}
private void init() {
System.out.println("Master 初始化环境");
}
public void start() {
try {
init();
// 初始化完毕,则唤醒工作线程执行任务.
startSignal.countDown();
// 等待所有worker线程完成任务
endSignal.await();
dispose();
} catch (Exception e) {
}
}
private void dispose() {
System.out.println("Master 执行收尾操作");
}
}
private static class Worker implements Runnable {
private CountDownLatch startSignal;
private CountDownLatch endSignal;
public Worker(CountDownLatch startSignal, CountDownLatch endSignal) {
this.startSignal = startSignal;
this.endSignal = endSignal;
}
@Override
public void run() {
try {
// 等待Master线程初始化完毕
startSignal.await();
System.out.println("worker 执行任务");
// 表示任务完成,计数递减,计数为0时,表示所有的任务完成
endSignal.countDown();
System.out.println("endSignal.counter:" + endSignal.getCount());
} catch (Exception e) {
}
}
}
private static class WorkerTask implements Runnable {
// 所有任务完成信号
private CountDownLatch doneSignal;
// 表示任务序号
private int i;
public WorkerTask(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
@Override
public void run() {
try {
System.out.println("Worker[" + i + "]" + " 任务完成");
doneSignal.countDown();
} catch (Exception e) {
}
}
}
}
2.CylicBarrier demo
package com.landon.mavs.example.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/** *//**
*
* CyclicBarrier用法
*
* <pre>
* 1.同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
* 2.粗浅的理解即有一道屏障,目的是等待一组线程完成操作.某一线程完成操作后,则等待在屏障下(await).直至所有线程均到了屏障下,
* 则可执行指定的屏障操作.待执行完执行的屏障操作后,所有的线程则结束await,即越过屏障,继续执行后续操作.
* 3.该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier
* </pre>
*
* @author landon
*
*/
public class CyclicBarrierExample {
public static void main(String[] args) throws Exception {
Master master = new Master();
master.start();
}
}
// 计算1²到10²和.分发到每个worker线程,最后合并
class Master {
// 用来保存计算结果
private static List<Integer> result = new ArrayList<>();
// public CyclicBarrier(int parties, Runnable barrierAction)
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作
// 该操作由最后一个进入 barrier 的线程执行
private CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() {
@Override
public void run() {
int sum = 0;
for (int tmp : result) {
sum += tmp;
}
// final result:285_Worker-9
// 从输出看Worker-9执行了屏障操作.而Worker-9在线程的索引为0.即await的返回值.
// 执行完该操作后,所有的线程越过屏障,执行后续操作.
System.out.println("final result:" + sum + "_"
+ Thread.currentThread().getName());
}
});
public void start() throws Exception {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Worker(i, barrier), "Worker-" + i);
thread.start();
}
}
public static synchronized void addASum(int sum) {
result.add(sum);
}
}
class Worker implements Runnable {
private int i;
private CyclicBarrier barrier;
public Worker(int i, CyclicBarrier barrier) {
this.i = i;
this.barrier = barrier;
}
@Override
public void run() {
int sum = i * i;
Master.addASum(sum);
try {
// 模拟一下耗时
Thread.sleep(i * 100);
// public int getNumberWaiting()
// 返回当前在屏障处等待的参与者数目
System.out.println(Thread.currentThread().getName()
+ "_curNumberWaitting:" + barrier.getNumberWaiting());
// public int await() throws InterruptedException,
// BrokenBarrierException
// 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
int curIndex = barrier.await();
System.out.println(Thread.currentThread().getName() + " end wait:"
+ curIndex);
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
posted on 2014-03-01 11:53
landon 阅读(1728)
评论(4) 编辑 收藏 所属分类:
Program