从J2SE 5.0开始有了java.util.concurrent套件,其中的类可以使实现多线程相关功能更为方便。本节将简介concurrent套件中的几个简单常用的类。
15.3.1 BlockingQueue
队列(Queue)是一个先进先出(First In First Out, FIFO)的数据结构。在J2SE 5.0中增加了java.util.concurrent.BlockingQueue。在多线程情况下,如果BlockingQueue的内容为空,而有个线程试图从Queue中取出元素,则该线程会被Block,直到Queue有元素时才解除Block;反过来,如果 BlockingQueue满了,而有个线程试图再把数据填入Queue中,则该线程会被Block,直到Queue中有元素被取走后解除Block。
BlockingQueue的几个主要操作如表15-1所示。
表15-1 BlockingQueue的几个操作
java.util.concurrent中提供几种不同的BlockingQueue。ArrayBlockingQueue要指定容量大小来构建。LinkedBlockingQueue默认没有容量上限,但也可以指定容量上限。PriorityBlockingQueue严格来说不是Queue,因为它是根据优先权(Priority)来移除元素。
我们以在wait()、notify()介绍时的生产者、消费者程序为例,使用BlockQueue来加以改写,优点是不用亲自处理wait()、notify()的细节。首先生产者改写如范例15.21所示:
范例15.21 ProducerQueue.java
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class ProducerQueue implements Runnable {
private BlockingQueue<Integer> queue;
public ProducerQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
for(int product = 1; product <= 10; product++) {
try {
// wait for a random time
Thread.sleep((int) Math.random() * 3000);
queue.put(product);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
可以看到,直接使用BlockingQueue,会自动处理同步化、wait()和notify()的执行。消费者类改写如范例15.22所示:
范例15.22 ConsumerQueue.java
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class ConsumerQueue implements Runnable {
private BlockingQueue<Integer> queue;
public ConsumerQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
for(int i = 1; i <= 10; i++) {
try {
// wait for a random time
Thread.sleep((int) (Math.random() * 3000));
queue.take();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
可以使用范例15.23进行简单的测试:
范例15.23 BlockingQueueDemo.java
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class ConsumerQueue implements Runnable {
private BlockingQueue<Integer> queue;
public ConsumerQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
for(int i = 1; i <= 10; i++) {
try {
// 等待一个随机时间
Thread.sleep((int) (Math.random() * 3000));
queue.take();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
由于BlockingQueue不需要您来控制,所以没有特意显示信息以表示生产者、消费者放入产品至Queue的信息,不过仍可以在ProducerQueue与ConsumerQueue中放入相关信息显示,以确认程序确实在运转。