posts - 156,  comments - 601,  trackbacks - 0

CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。假如在涉及一组固定大小的线程的程序应用中,要求这些线程必须不时地互相等待(保证所有线程都执行完毕才返回),那么选择 CyclicBarrier 就会让这个实现变得非常容易。CyclicBarrier 在释放等待线程后可以重用,所以又称它为循环 barrier

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令在每个屏障点完成前只运行一次。

 

下面是一个JDK官方自带示例(只演示使用方式,目前不能直接运行),可以让大家更好的了解一下CyclicBarrier的使用。

示例说明:有一个两维数组,保存的N行的数据内容。现在需要有N个线程,每个线程处理一行结果,当所有N的结果都处理完成后,返回。示例代码如下:

 

 

public class Solver {

    final int N;

    final float[][] data;

    final CyclicBarrier barrier;

 

    class Worker implements Runnable {

        int myRow;

 

        Worker(int row) {

            myRow = row;

        }

 

        public void run() {

            while (!done()) {

                processRow(myRow);

 

                try {

                    barrier.await();

                } catch (InterruptedException ex) {

                    return;

                } catch (BrokenBarrierException ex) {

                    return;

                }

            }

        }

    }

 

    public Solver(float[][] matrix) {

      data = matrix;

              N = matrix.length;

      //创建N个大小的Barrier,调用 barrier.await方法来等待线程结束

      barrier = new CyclicBarrier(N,

                                  new Runnable() {

                                    public void run() {

                                      mergeRows(...);

                                    }

                                  });

      for (int i = 0; i < N; ++i)

        new Thread(new Worker(i)).start();

 

      waitUntilDone();

    }

}

在这个例子中,每个 worker 线程处理一行数据,在处理完所有的行之前,该线程将一直在屏障处等待。

处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。

如果合并者确定已经找到了一个解决方案,那么 done() 将返回 true,所有的 worker 线程都将终止。 

如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。

为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的索引。然后,您可以选择哪个线程应该执行屏障操作,例如: 
  if (barrier.await() == 0) {
     // log the completion of this iteration
   }
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。 

内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。

 

主要API介绍

await

public int await() throws InterruptedException, BrokenBarrierException

将一直等待。 

如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态: 

最后一个线程到达;

或者 其他某个线程中断当前线程;

或者 其他某个线程中断另一个等待线程;

或者 其他某个线程在等待 barrier 时超时;

或者 其他某个线程在此 barrier 上调用 reset() 

 

如果当前线程: 
在进入此方法时已经设置了该线程的中断状态;或者 
在等待时被中断 
则抛出 InterruptedException,并且清除当前线程的已中断状态。 
如果在线程处于等待状态时 barrier reset(),或者在调用 await barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。 

如果任何线程在等待时被 中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。 

如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。
如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。 

返回:
到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程 
抛出: 
InterruptedException -
如果当前线程在等待时被中断 
BrokenBarrierException -
如果另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用 await barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。

 

getNumberWaiting

  public int getNumberWaiting()

返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。 
返回 当前阻塞在 await() 中的参与者数目。

 

完整示例

示例说明:假如一个飞机乘客的登机过程,要求乘客的身份证验证通过,乘客的登机牌已经更换以及乘客的行礼已经确认后才可以登录。

完整代码如下:

    public static void main(String[] args) {

       

        final int TOTAL_STEPS = 3;//步骤个数

       

        final String[] steps = new String[] {"换登机牌确认", "身份证确认", "行礼确认"};

       

        final CyclicBarrier cb = new CyclicBarrier(TOTAL_STEPS);

        ExecutorService es = Executors.newFixedThreadPool(TOTAL_STEPS); // 在线程池中放入三个线程

        for (int i = 0; i < TOTAL_STEPS; i++) { // 开启三个任务

            final String name = steps[i];

            es.execute(new Runnable() {

 

                public void run() {

                    for (int i = 0; i < 5; i++) {

                        try {

                            String passenger = "乘客"+i;

                            int wait = new Random().nextInt(5000);

                            Thread.sleep(wait);

                            System.out.print(passenger + " " + name + "【确认通过 + 耗时:" + wait);

                            // 如果有2个线程已经在等待,那么最后一个线程到达后就可以一起开始后面操作

                            if (cb.getNumberWaiting() + 1 == 3) {

                                System.out.println(" 全部通过,确认下一个乘客");

                            } else {

                                System.out.println(" 还有"

                                        + (TOTAL_STEPS - cb.getNumberWaiting() - 1) + "个任务等待");

                            }

                            cb.await();

                        } catch (Exception e) {

                            e.printStackTrace();

                        }

                    }

                }

            });

        }

        es.shutdown();

    }

}

执行结果如下:

乘客0 行礼确认【确认通过 + 耗时:1616 还有2个任务等待

乘客0 身份证确认【确认通过 + 耗时:1792 还有1个任务等待

乘客0 换登机牌确认【确认通过 + 耗时:3891 全部通过,确认下一个乘客

乘客1 身份证确认【确认通过 + 耗时:282 还有2个任务等待

乘客1 换登机牌确认【确认通过 + 耗时:4354 还有1个任务等待

乘客1 行礼确认【确认通过 + 耗时:4996 全部通过,确认下一个乘客

乘客2 身份证确认【确认通过 + 耗时:2977 还有2个任务等待

乘客2 行礼确认【确认通过 + 耗时:3848 还有1个任务等待

乘客2 换登机牌确认【确认通过 + 耗时:4069 全部通过,确认下一个乘客

乘客3 换登机牌确认【确认通过 + 耗时:905 还有2个任务等待

乘客3 身份证确认【确认通过 + 耗时:1916 还有1个任务等待

乘客3 行礼确认【确认通过 + 耗时:4710 全部通过,确认下一个乘客

乘客4 身份证确认【确认通过 + 耗时:1371 还有2个任务等待

乘客4 行礼确认【确认通过 + 耗时:1768 还有1个任务等待

乘客4 换登机牌确认【确认通过 + 耗时:2498 全部通过,确认下一个乘客

 

Good Luck!

Yours Matthew!


 

posted on 2012-06-28 13:34 x.matthew 阅读(2732) 评论(1)  编辑  收藏 所属分类: Best Practise(JDK API)

只有注册用户登录后才能发表评论。


网站导航: