新的起点 新的开始

快乐生活 !

(转)Java 5.0多线程编程(4)

CountDownLatch:

   CountDownLatch 是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。比如说一个 Server 启动时需要初始化 4 个部件, Server 可以同时启动 4 个线程去初始化这 4 个部件,然后调用 CountDownLatch(4).await() 阻断进入等待,每个线程完成任务后会调用一次 CountDownLatch.countDown() 来倒计数 , 4 个线程都结束时 CountDownLatch 的计数就会降低为 0 ,此时 Server 就会被唤醒继续下一步操作。 CountDownLatch 的方法主要有:

  • await() :使调用此方法的线程阻断进入等待
  • countDown(): 倒计数,将计数值减 1
  • getCount(): 得到当前的计数值

   CountDownLatch 的例子:一个 server 调了三个 ComponentThread 分别去启动三个组件,然后 server 等到组件都启动了再继续。

public class Server {

      public static void main(String[] args) throws InterruptedException{

            System.out.println("Server is starting.");

            // 初始化一个初始值为 3 CountDownLatch

            CountDownLatch latch = new CountDownLatch(3);

            // 3 个线程分别去启动 3 个组件

            ExecutorService service = Executors.newCachedThreadPool();

            service.submit(new ComponentThread(latch, 1));

            service.submit(new ComponentThread(latch, 2));

            service.submit(new ComponentThread(latch, 3));

            service.shutdown();

            // 进入等待状态

            latch.await();

            // 当所需的三个组件都完成时, Server 就可继续了

            System.out.println("Server is up!");

      }

}

 

public class ComponentThread implements Runnable{

      CountDownLatch latch;

      int ID;

      /** Creates a new instance of ComponentThread */

      public ComponentThread(CountDownLatch latch, int ID) {

            this.latch = latch;

            this.ID = ID;

      }

      public void run() {

            System.out.println("Component "+ID + " initialized!");

            // 将计数减一

            latch.countDown();

      }    

}

运行结果:

Server is starting.

Component 1 initialized!

Component 3 initialized!

Component 2 initialized!

Server is up!

CyclicBarrier:

   CyclicBarrier 类似于 CountDownLatch 也是个计数器,不同的是 CyclicBarrier 数的是调用了 CyclicBarrier.await() 进入等待的线程数,当线程数达到了 CyclicBarrier 初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 CyclicBarrier 就象它名字的意思一样,可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。 CyclicBarrier 初始时还可带一个 Runnable 的参数,此 Runnable 任务在 CyclicBarrier 的数目达到后,所有其它线程被唤醒前被执行。

CyclicBarrier 提供以下几个方法:

  • await() :进入等待
  • getParties() :返回此 barrier 需要的线程数
  • reset() :将此 barrier 重置

   以下是使用 CyclicBarrier 的一个例子:两个线程分别在一个数组里放一个数,当这两个线程都结束后,主线程算出数组里的数的和(这个例子比较无聊,我没有想到更合适的例子)

public class MainThread {

public static void main(String[] args)

      throws InterruptedException, BrokenBarrierException, TimeoutException{

            final int[] array = new int[2];

            CyclicBarrier barrier = new CyclicBarrier(2,

                  new Runnable() {// 在所有线程都到达 Barrier 时执行

                  public void run() {

                        System.out.println("Total is:"+(array[0]+array[1]));

                  }

            });           

            // 启动线程

            new Thread(new ComponentThread(barrier, array, 0)).start();

            new Thread(new ComponentThread(barrier, array, 1)).start();   

      }     

}

 

public class ComponentThread implements Runnable{

      CyclicBarrier barrier;

      int ID;

      int[] array;

      public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

            this.barrier = barrier;

            this.ID = ID;

            this.array = array;

      }

      public void run() {

            try {

                  array[ID] = new Random().nextInt();

                  System.out.println(ID+ " generates:"+array[ID]);

                  // 该线程完成了任务等在 Barrier

                  barrier.await();

            } catch (BrokenBarrierException ex) {

                  ex.printStackTrace();

            } catch (InterruptedException ex) {

                  ex.printStackTrace();

            }

      }

}

Exchanger:

   顾名思义 Exchanger 让两个线程可以互换信息。用一个例子来解释比较容易。例子中服务生线程往空的杯子里倒水,顾客线程从装满水的杯子里喝水,然后通过 Exchanger 双方互换杯子,服务生接着往空杯子里倒水,顾客接着喝水,然后交换,如此周而复始。

class FillAndEmpty {

      // 初始化一个 Exchanger ,并规定可交换的信息类型是 DataCup

      Exchanger exchanger = new Exchanger();

      Cup initialEmptyCup = ...; // 初始化一个空的杯子

      Cup initialFullCup = ...; // 初始化一个装满水的杯子

      // 服务生线程

      class Waiter implements Runnable {

            public void run() {

                  Cup currentCup = initialEmptyCup;

                  try {

                        // 往空的杯子里加水

                        currentCup.addWater();

                        // 杯子满后和顾客的空杯子交换

                        currentCup = exchanger.exchange(currentCup);

                  } catch (InterruptedException ex) { ... handle ... }

             }

      }

      // 顾客线程

      class Customer implements Runnable {

            public void run() {

                  DataCup currentCup = initialFullCup;

                  try {

                        // 把杯子里的水喝掉

                        currentCup.drinkFromCup();

                        // 将空杯子和服务生的满杯子交换

                        currentCup = exchanger.exchange(currentCup);

                  } catch (InterruptedException ex) { ... handle ...}

            }

      }

     

      void start() {

            new Thread(new Waiter()).start();

            new Thread(new Customer()).start();

      }

}

6: BlockingQueue接口

   BlockingQueue 是一种特殊的 Queue ,若 BlockingQueue 是空的,从 BlockingQueue 取东西的操作将会被阻断进入等待状态直到 BlocingkQueue 进了新货才会被唤醒。同样,如果 BlockingQueue 是满的任何试图往里存东西的操作也会被阻断进入等待状态,直到 BlockingQueue 里有新的空间才会被唤醒继续操作。 BlockingQueue 提供的方法主要有:

  • add(anObject): anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容纳返回 true ,否则抛出 IllegalStateException 异常。
  • offer(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容纳返回 true ,否则返回 false
  • put(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 没有空间,调用此方法的线程被阻断直到 BlockingQueue 里有新的空间再继续。
  • poll(time) :取出 BlockingQueue 里排在首位的对象,若不能立即取出可等 time 参数规定的时间。取不到时返回 null
  • take() :取出 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的对象被加入为止。

根据不同的需要 BlockingQueue 4 种具体实现:

  • ArrayBlockingQueue :规定大小的 BlockingQueue ,其构造函数必须带一个 int 参数来指明其大小。其所含的对象是以 FIFO (先入先出)顺序排序的。
  • LinkedBlockingQueue :大小不定的 BlockingQueue ,若其构造函数带一个规定大小的参数,生成的 BlockingQueue 有大小限制,若不带大小参数,所生成的 BlockingQueue 的大小由 Integer.MAX_VALUE 来决定。其所含的对象是以 FIFO (先入先出)顺序排序的。 LinkedBlockingQueue ArrayBlockingQueue 比较起来,它们背后所用的数据结构不一样,导致 LinkedBlockingQueue 的数据吞吐量要大于 ArrayBlockingQueue ,但在线程数量很大时其性能的可预见性低于 ArrayBlockingQueue
  • PriorityBlockingQueue :类似于 LinkedBlockingQueue ,但其所含对象的排序不是 FIFO ,而是依据对象的自然排序顺序或者是构造函数所带的 Comparator 决定的顺序。
  • SynchronousQueue :特殊的 BlockingQueue ,对其的操作必须是放和取交替完成的。

下面是用 BlockingQueue 来实现 Producer Consumer 的例子:

public class BlockingQueueTest {

      static BlockingQueue basket;

      public BlockingQueueTest() {

            // 定义了一个大小为 2 BlockingQueue ,也可根据需要用其他的具体类

            basket = new ArrayBlockingQueue(2);

      }

      class Producor implements Runnable {

            public void run() {

                  while(true){

                        try {

                              // 放入一个对象,若 basket 满了,等到 basket 有位置

                              basket.put("An apple");

                        } catch (InterruptedException ex) {

                              ex.printStackTrace();

                        }

                  }

            }

      }

      class Consumer implements Runnable {

            public void run() {

                  while(true){

                        try {

                              // 取出一个对象,若 basket 为空,等到 basket 有东西为止

                              String result = basket.take();

                        } catch (InterruptedException ex) {

                              ex.printStackTrace();

                        }

                  }

            }           

      }

      public void execute(){

            for(int i=0; i<10; i++){

                  new Thread(new Producor()).start();

                  new Thread(new Consumer()).start();

            }           

      }

      public static void main(String[] args){

            BlockingQueueTest test = new BlockingQueueTest();

            test.execute();

      }     

}

7Atomics 原子级变量

   原子量级的变量,主要的类有 AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference …… 。这些原子量级的变量主要提供两个方法:

  • compareAndSet(expectedValue, newValue): 比较当前的值是否等于 expectedValue , 若等于把当前值改成 newValue ,并返回 true 。若不等,返回 false
  • getAndSet(newValue): 把当前值改为 newValue ,并返回改变前的值。

   这些原子级变量利用了现代处理器( CPU )的硬件支持可把两步操作合为一步的功能,避免了不必要的锁定,提高了程序的运行效率。

8Concurrent Collections 共点聚集

   在 Java 的聚集框架里可以调用 Collections.synchronizeCollection(aCollection) 将普通聚集改变成同步聚集,使之可用于多线程的环境下。 但同步聚集在一个时刻只允许一个线程访问它,其它想同时访问它的线程会被阻断,导致程序运行效率不高。 Java 5.0 里提供了几个共点聚集类,它们把以前需要几步才能完成的操作合成一个原子量级的操作,这样就可让多个线程同时对聚集进行操作,避免了锁定,从而提高了程序的运行效率。 Java 5.0 目前提供的共点聚集类有: ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList CopyOnWriteArraySet.


posted on 2007-03-26 14:32 advincenting 阅读(534) 评论(0)  编辑  收藏


只有注册用户登录后才能发表评论。


网站导航:
 

公告

Locations of visitors to this page

导航

<2007年3月>
25262728123
45678910
11121314151617
18192021222324
25262728293031
1234567

统计

常用链接

留言簿(13)

随笔分类(71)

随笔档案(179)

文章档案(13)

新闻分类

IT人的英语学习网站

JAVA站点

优秀个人博客链接

官网学习站点

生活工作站点

最新随笔

搜索

积分与排名

最新评论

阅读排行榜

评论排行榜