1.CountDownLatch demo
package com.landon.mavs.example.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/** *//**
*
* CountdownLatch用法
*
* <pre>
* 1.同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
* 2.给定的计数 初始化 CountDownLatch.计数器到达零之前,所以调用await的线程会一直阻塞.之后,会释放所有等待的线程,执行await的后续调用.
* 3.计数无法被重置.如需重置计数,可考虑{@link java.util.concurrent.CyclicBarrier}
* 4.计数 1初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:
* 在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待;
* 用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待
* </pre>
*
* <pre>
* 1.CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch
* 2.await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
* 3.await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间
* 4.countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程
* 5.getCount() 返回当前计数
* 6.toString() 返回标识此锁存器及其状态的字符串
* </pre>
*
* @author landon
*
*/

public class CountdownLatchExample
{

public static void main(String[] args) throws Exception
{
// 示例1:
// Master启动多个worker线程处理任务.所有的worker线程在执行任务前需等待Master初始化,Master线程初始化完毕,则startSignal.countdown,表示开始
// 工作线程被唤醒;然后Master阻塞,等待所有的worker线程执行完毕任务;每个worker线程执行完毕任务,则countdown一下,直至执行所有的任务完成;Master被唤醒,执行收尾工作.

// 示例2:
// 将一个问题分成 N 个部分,用执行每个部分并让锁存器倒计数的 Runnable 来描述每个部分,然后将所有 Runnable 加入到
// Executor 队列。
// 当所有的子部分完成后,协调线程就能够通过 await

Master master = new Master(3);
master.start();

// 任务分为5部分,交个线程池去执行任务.
CountDownLatch doneSignal = new CountDownLatch(5);
// 启动一个线程池去执行任务.这里是一个单线程(这里不关心有多少个线程去执行任务,这里只关心任务完成后计数递减,使得主线程可以继续执行)
ExecutorService executor = Executors.newSingleThreadExecutor();

// 向线程池提交5个任务

for (int i = 0; i < 5; i++)
{
executor.execute(new WorkerTask(doneSignal, i));
}

// 主线程等待任务完成
doneSignal.await();

// 此时toString:[Count = 0]
System.out.println("主线程:问题全部解决.继续:" + doneSignal.toString());
}


private static class Master
{
private CountDownLatch startSignal;
private CountDownLatch endSignal;


public Master(int workerNum)
{
startSignal = new CountDownLatch(1);
endSignal = new CountDownLatch(workerNum);

// 启动所有worker线程

for (int i = 0; i < workerNum; i++)
{
new Thread(new Worker(startSignal, endSignal)).start();
}
}


private void init()
{
System.out.println("Master 初始化环境");
}


public void start()
{

try
{
init();

// 初始化完毕,则唤醒工作线程执行任务.
startSignal.countDown();

// 等待所有worker线程完成任务
endSignal.await();

dispose();


} catch (Exception e)
{
}
}

private void dispose()
{
System.out.println("Master 执行收尾操作");
}
}


private static class Worker implements Runnable
{
private CountDownLatch startSignal;
private CountDownLatch endSignal;


public Worker(CountDownLatch startSignal, CountDownLatch endSignal)
{
this.startSignal = startSignal;
this.endSignal = endSignal;
}

@Override

public void run()
{

try
{
// 等待Master线程初始化完毕
startSignal.await();

System.out.println("worker 执行任务");

// 表示任务完成,计数递减,计数为0时,表示所有的任务完成
endSignal.countDown();

System.out.println("endSignal.counter:" + endSignal.getCount());

} catch (Exception e)
{
}
}
}


private static class WorkerTask implements Runnable
{
// 所有任务完成信号
private CountDownLatch doneSignal;
// 表示任务序号
private int i;


public WorkerTask(CountDownLatch doneSignal, int i)
{
this.doneSignal = doneSignal;
this.i = i;
}

@Override

public void run()
{

try
{
System.out.println("Worker[" + i + "]" + " 任务完成");
doneSignal.countDown();

} catch (Exception e)
{
}
}
}
}

2.CylicBarrier demo
package com.landon.mavs.example.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;


/** *//**
*
* CyclicBarrier用法
*
* <pre>
* 1.同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
* 2.粗浅的理解即有一道屏障,目的是等待一组线程完成操作.某一线程完成操作后,则等待在屏障下(await).直至所有线程均到了屏障下,
* 则可执行指定的屏障操作.待执行完执行的屏障操作后,所有的线程则结束await,即越过屏障,继续执行后续操作.
* 3.该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier
* </pre>
*
* @author landon
*
*/

public class CyclicBarrierExample
{

public static void main(String[] args) throws Exception
{
Master master = new Master();
master.start();
}
}

// 计算1²到10²和.分发到每个worker线程,最后合并

class Master
{
// 用来保存计算结果
private static List<Integer> result = new ArrayList<>();

// public CyclicBarrier(int parties, Runnable barrierAction)
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作
// 该操作由最后一个进入 barrier 的线程执行

private CyclicBarrier barrier = new CyclicBarrier(10, new Runnable()
{

@Override

public void run()
{
int sum = 0;

for (int tmp : result)
{
sum += tmp;
}

// final result:285_Worker-9
// 从输出看Worker-9执行了屏障操作.而Worker-9在线程的索引为0.即await的返回值.
// 执行完该操作后,所有的线程越过屏障,执行后续操作.
System.out.println("final result:" + sum + "_"
+ Thread.currentThread().getName());
}
});


public void start() throws Exception
{

for (int i = 0; i < 10; i++)
{
Thread thread = new Thread(new Worker(i, barrier), "Worker-" + i);
thread.start();
}
}

public static synchronized void addASum(int sum)
{
result.add(sum);
}
}


class Worker implements Runnable
{
private int i;
private CyclicBarrier barrier;


public Worker(int i, CyclicBarrier barrier)
{
this.i = i;
this.barrier = barrier;
}

@Override

public void run()
{
int sum = i * i;
Master.addASum(sum);


try
{

// 模拟一下耗时
Thread.sleep(i * 100);

// public int getNumberWaiting()
// 返回当前在屏障处等待的参与者数目
System.out.println(Thread.currentThread().getName()
+ "_curNumberWaitting:" + barrier.getNumberWaiting());

// public int await() throws InterruptedException,
// BrokenBarrierException
// 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
int curIndex = barrier.await();

System.out.println(Thread.currentThread().getName() + " end wait:"
+ curIndex);

} catch (InterruptedException e)
{

} catch (BrokenBarrierException e)
{
}
}
}
posted on 2014-03-01 11:53
landon 阅读(1734)
评论(4) 编辑 收藏 所属分类:
Program