posts - 9, comments - 4, trackbacks - 0, articles - 21

生产者/消费者模型

Posted on 2007-10-27 16:23 一步一步努力向上爬 阅读(3537) 评论(0)  编辑  收藏 所属分类: 设计模式
生产者/消费者模型是最基本的并发协作模型,是所有并发协作的基础。可以这么说,其他的并发协作都是供求关系模型的变种。生产者,消费者之间的供求关系可以简单的 使用管道来构造。让我们看两者之间的行为模式: *生产/消费模型:消费者如果无消费对象,就会阻塞直到有消费对象到达;一个消费对象仅供一个消费者消费。 *BlockingQueue: 如果队列为空,则读取操作将会阻塞直至队列有新的内容到达;队列中对象一旦被读取,将从队列中移走。 由此可见,阻塞队列天然符合生产/消费模型的供求行为模式。在前面展示condition的用法的时候,曾经 用过生产者/消费者模型来举例。那个例子如果改用BlockingQueue来写的话就十分简单
    ...
BlockingQueue<String> q =new ArrayBlockingQueue<String> (10);
...
public void supply () {
q.put("product by "+Thread.currentThread().getId()+":"+(++productNo));
}
...
public void cunsume () {
String product =q.take();
System.out.println("consume product:"+product);
}
从BlockingQueue也可以看出,它和UNIX系统下面的Pipe十分相似。所不同的不过是两点,首先,pipe是进程间的,命名管道甚至可以在非亲缘进程间使用,而BlockingQueue 目前只是线程间的通信手段。当然,由于java本身强大的动态类装载功能,这个缺陷对java程序之间的沟通限制并不大。其次,pipe是基于字节流的,而BlockingQueue是 基于对象的,这使得BlockingQueue更加易用,不过却让BlockingQueue绑定了Java语言,使进一步成为轻量级本地进程通信工具的难度增大。

从前面对生产/消费模型的行为方式可以看出,生产/消费模型着重于规范消费者的行为模式,当消费速度超过生产速度的时候,消费者就会被阻塞。而对于生产者的行为则没有 规定。当生产速度超过消费速度,生产者的行为模式可以分为以下几种: #当积压的产品达到一定数量时,生产者被阻塞 #无论有多少积压产品,生产者都不会被阻塞 #不能有任何积压产品,生产者在当前产品未被消费之前,会被阻塞 对于产品来说,也有不同的行为模式 #产品只有在被生产出来一段时间之后才能被消费(先花点时间晾晾干?) #不同类别的产品被消费的优先级不同(有钻石的话,黄金就先放一边吧:))

根据生产者行为模式的不同Concurrent包提供了不同的BlockingQueue的实现 ||Queue种类||行为描述 |ArrayBlockingQueue|有限制的blocking queue,积压的产品不得超过制订数量 |DelayQueue|产品只有生产出一段时间之后,才能被消费,无限制的积压产品 |LinkedBlockingQueue|同时支持有限制的blocking queue,也能支持无限制的积压产品(数量不能超过Integer.MAX_VALUE) |PriorityBlockingQueue|不同产品的被消费优先级不同,无限制的积压产品 |SynchronousQueue|不允许积压产品

这些不同的行为模式中,较为常见的除了ArrayBlockingQueue和LinkedBlockingQueue之外,PriorityBlockingQueue也非常重要。举例来说,如果我们利用BlockingQueue 来实现一个邮件系统(著名的qmail就是利用pipe技术构建的核心架构)。我们知道邮件有不同的级别,如果当前队列里有加急邮件需要处理的话,系统将优先处理加急邮件。 我们将以邮件传递为例子,说明PriorityBlockingQueue的使用方法。(注:这里的这个邮件模型只是一个非常简陋的模型,用来说明PriorityBlockingQueue的使用方法而已, 和实际应用有很大的差距)

首先,我们需要了解邮件传递过程的基本模型。在这个简单的邮件传送模型中涉及到下列概念 *MDA: Mail Deliver Agent, 负责接受指定用户的邮件。 *MTA: Mail Transfer Agent, 负责接受远程传送过来的邮件,并将其传送给收件人的MDA 它们和邮件用户之间的关系如下图

其中MTA使用Queue传送邮件给MDA。因此,不同的用户会使用不同的Mail Queue。下面是MailQueue的代码
public class MailQueue<E> extends PriorityBlockingQueue<E>{
public E take () throws InterruptedException {
E ren =super.take();
Utils._log("take:"+ren);
return ren;
}

public void put (E o) {
super.put(o);
Utils._log("put:"+o);
}
}
为了能够根据收件人的Mail Address找到相应的Mail Queue, 使用一个MailQueueFactory来产生MailQueue
public class MailQueueFactory {
//A ConcurrentHashMap is used here instead of Hashtable
static ConcurrentHashMap<MailAccount,MailQueue<Mail>> mailQueues =
new ConcurrentHashMap<MailAccount,MailQueue<Mail>>();
public static BlockingQueue<Mail> getMailQueue (MailDeliverer e) {
return getMailQueue(e.getMailAccount());
}

public static BlockingQueue<Mail> getReceiveMailQueue (Mail m) {
return getMailQueue (m.getReceiver());
}

public static BlockingQueue<Mail> getMailQueue (MailAccount e) {
mailQueues.putIfAbsent (e,new MailQueue<Mail>());
MailQueue<Mail> mailQ =mailQueues.get(e);

return mailQ;
}
}
需要注意的是,我们在MailQueueFactory里面使用了ConcurrentHashMap,而不是传统的Hashtable, 虽然Hashtable是thread-safe,但是缺乏putIfAbsent这样的 原子函数,如果不小心设计的话,会造成对同一个MailQueue重复初始化,从而导致死锁问题。 下面看Mail的定义
public class Mail implements Comparable{
public final static int emergencyMail =0;
public final static int normalMail =1;

static AtomicInteger serialCounter =new AtomicInteger(0);

private int mailLevel;
private int serialNumber =serialCounter.addAndGet(1);
private MailAccount receiver =null;
private MailAccount sender =null;
private Date sendTime =new Date();

public Mail (String from, String to, int level) {
...
}

//Get functions
...

public int compareTo(Object o) {
if (o instanceof Mail) {
return compareTo ((Mail)o);
}
return 0;
}

public int compareTo (Mail o) {
if (o.mailLevel==this.mailLevel) { //Same level, compare the serial no
if (o.serialNumber==this.serialNumber)
return 0;
if (o.serialNumber>this.serialNumber)
return -1;
return 1;
}
if (this.mailLevel==emergencyMail) return -1;
return 1;
}
//Other functions
...
}
这里值得注意的是AtomicInteger的使用,它被用来做内部serialNumber的产生。另外就是compareTo函数的使用,PriorityBlockingQueue使用Comparable接口来判定元素的优先级别。这里所定义的优先级如下: *如果邮件类别相同,则序列号小的邮件有较大的优先级 *如果邮件类别不同,则emergencyMail有较大的优先级 最后是Deliver Agent 和 Transfer Agent的代码
public class MailDeliverer {
MailAccount mailAccount =null;

public MailDeliverer (MailAccount account) {
this.mailAccount =account;
}

public MailAccount getMailAccount() {
return mailAccount;
}

public Mail retrieveMail () {
Mail mail =null;
while (mail==null) {
try {
mail =MailQueueFactory.getMailQueue(this).take();
}catch (Exception e) {
Utils._log("Encounter Exception",e);
}
}
return mail;
}
}

public class MailTransfer {
private static MailTransfer instance =new MailTransfer ();
private MailTransfer () { }

public static MailTransfer getInstance () {
return instance;
}

public void processMail (Mail m) {
BlockingQueue mailQ =MailQueueFactory.getReceiveMailQueue(m);
try {
mailQ.put(m);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


只有注册用户登录后才能发表评论。


网站导航: