package com.google.study.MQ;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded, blocking FIFO queue of {@link Message} objects.
 *
 * <p>All access to the backing list is guarded by a single {@link ReentrantLock}.
 * Two conditions derived from that lock are used for coordination:
 * {@code notFull} is awaited by producers while the queue holds
 * {@code MAX_SIZE} elements, and {@code notEmpty} is awaited by consumers
 * while the queue holds no elements.
 */
public class MessageQueue {

    /** Maximum number of messages the queue holds before {@link #produce} blocks. */
    private static final int MAX_SIZE = 5;

    private final List<Message> messageList = new LinkedList<Message>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * Returns the number of messages currently queued.
     *
     * <p>The lock is taken so the read cannot race with a concurrent
     * {@code add}/{@code remove} on the non-thread-safe backing list.
     *
     * @return the current queue size
     */
    public int size() {
        lock.lock();
        try {
            return messageList.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends a message to the tail of the queue, waiting while the queue is full.
     *
     * @param e the message to enqueue
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for space to become available
     */
    public void produce(Message e) throws InterruptedException {
        // Acquire outside the try so that unlock() only runs when the lock is held.
        lock.lock();
        try {
            // Loop guards against spurious wakeups.
            while (messageList.size() >= MAX_SIZE) {
                notFull.await();
            }
            messageList.add(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the message at the head of the queue, waiting while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is removed,
     * the thread's interrupt status is restored, and the method returns so the
     * caller can observe the interruption.
     */
    public void consume() {
        // Acquire outside the try so that unlock() only runs when the lock is held.
        lock.lock();
        try {
            // Loop guards against spurious wakeups.
            while (messageList.isEmpty()) {
                notEmpty.await();
            }
            messageList.remove(0);
            notFull.signal();
        } catch (InterruptedException interrupted) {
            // await() clears the interrupt flag when it throws; put it back.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}
package com.google.study.MQ;
/**
 * Worker that repeatedly removes one message from a shared {@link MessageQueue},
 * prints the queue's size, and then sleeps for 50 ms.
 *
 * <p>The loop ends when the running thread is interrupted.
 */
public class Consume implements Runnable {

    private final MessageQueue queue;

    /**
     * @param queue the queue this worker consumes from
     */
    public Consume(MessageQueue queue) {
        this.queue = queue;
    }

    /**
     * Consumes messages until the current thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            queue.consume();
            System.out.println(queue.size());
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag when it throws; restore it
                // and stop instead of spinning forever on a cancelled thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
package com.google.study.MQ;
/**
 * Worker that repeatedly enqueues a freshly built {@link Message} into a shared
 * {@link MessageQueue} and then sleeps for 100 ms.
 *
 * <p>The loop ends when the running thread is interrupted.
 */
public class Produce implements Runnable {

    private final MessageQueue queue;

    /**
     * @param queue the queue this worker produces into
     */
    public Produce(MessageQueue queue) {
        this.queue = queue;
    }

    /**
     * Produces messages until the current thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                queue.produce(getMessage());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Both produce() and sleep() clear the interrupt flag when they
                // throw; restore it and stop instead of looping on a cancelled thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Builds the message to enqueue: name {@code "1"} and value {@code 1}.
     *
     * @return a new message instance
     */
    private Message getMessage() {
        Message m = new Message();
        m.setName("1");
        m.setValue(1);
        return m;
    }
}
package com.google.study.MQ;
import java.io.Serializable;
public class Message implements Serializable {
private int value;
private String name;
/**
 * Returns this message's {@code value} field.
 *
 * @return the stored value
 */
public int getValue() {
    return this.value;
}
/**
 * Overwrites this message's {@code value} field.
 *
 * @param value the new value to store
 */
public void setValue(final int value) {
    this.value = value;
}
/**
 * Returns this message's {@code name} field.
 *
 * @return the stored name; {@code null} if it was never set
 */
public String getName() {
    return this.name;
}
package com.google.study.MQ;
/**
 * Demo entry point: wires four {@link Produce} workers and two {@link Consume}
 * workers to one shared {@link MessageQueue} and starts them all.
 */
public class Test {

    private static final int PRODUCER_COUNT = 4;
    private static final int CONSUMER_COUNT = 2;

    /**
     * Creates the worker threads (producers first, then consumers) and starts
     * them in that same order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MessageQueue shared = new MessageQueue();
        Thread[] workers = new Thread[PRODUCER_COUNT + CONSUMER_COUNT];

        // Construct every thread before starting any of them.
        for (int i = 0; i < workers.length; i++) {
            Runnable task;
            if (i < PRODUCER_COUNT) {
                task = new Produce(shared);
            } else {
                task = new Consume(shared);
            }
            workers[i] = new Thread(task);
        }

        for (Thread worker : workers) {
            worker.start();
        }
    }
}
/**
 * Overwrites this message's {@code name} field.
 *
 * @param name the new name to store
 */
public void setName(final String name) {
    this.name = name;
}
}