yuyee

生产者消费者

package com.google.study.MQ;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue {

private List<Message> messageList = new LinkedList<Message>();
private final ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private Integer maxNum = 5;
private Integer minNum = 0;

public int size() {
return messageList.size();
}

public void produce(Message e) throws InterruptedException {
try {
lock.lock();
while (messageList.size() == maxNum) {
notFull.await();
}
messageList.add(e);
notEmpty.signal();
} finally {
lock.unlock();
}
}

public void consume() {

try {
lock.lock();
while (messageList.size() == minNum) {
notEmpty.await();
}
messageList.get(0);
messageList.remove(0);
notFull.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}
package com.google.study.MQ;

public class Consume implements Runnable {
private MessageQueue queue;

public Consume(MessageQueue queue) {
this.queue = queue;
}

@Override
public void run() {
while (true) {
queue.consume();
System.out.println(queue.size());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

package com.google.study.MQ;

public class Produce implements Runnable {
private MessageQueue queue;

public Produce(MessageQueue queue) {
this.queue = queue;
}

@Override
public void run() {

while (true) {
try {
queue.produce(getMessage());
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

private Message getMessage() {

Message m = new Message();
m.setName("1");
m.setValue(1);
return m;
}

}
package com.google.study.MQ;

import java.io.Serializable;

public class Message implements Serializable {
private int value;
private String name;

public int getValue() {
return value;
}

public void setValue(int value) {
this.value = value;
}

public String getName() {
return name;
}
package com.google.study.MQ;

public class Test {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue();
Thread p1 = new Thread(new Produce(queue));
Thread p2 = new Thread(new Produce(queue));
Thread p3 = new Thread(new Produce(queue));
Thread p4 = new Thread(new Produce(queue));

Thread c1 = new Thread(new Consume(queue));
Thread c2 = new Thread(new Consume(queue));

p1.start();
p2.start();
p3.start();
p4.start();
c1.start();
c2.start();
}
}

public void setName(String name) {
this.name = name;
}

}

posted on 2010-11-01 00:47 羔羊 阅读(148) 评论(0)  编辑  收藏 所属分类: concurrent