1.SynchronousQueue示例代码,详解见代码注释package com.landon.mavs.example.concurrent;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** *//**
*
* {@link java.util.concurrent.SynchronousQueue}示例
*
* <pre>
* 一种无缓冲的等待队列,类似于无中介的直接交易
* </pre>
*
* <pre>
* 1.public class SynchronousQueue<E> extends AbstractQueue<E>
* implements BlockingQueue<E>, java.io.Serializable
* 2.其是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作,反之亦然.
* 3.同步队列没有任何内部容量,甚至连一个队列的容量都没有.(isEmpty永远返回true)
* 4.不能在同步队列上peek(获取但并不移除此队列的头),因为仅在试图移除元素时该元素才存在.
* 5.除非另一个线程试图移除某个元素,否则也不能使用任何方法插入元素.
* 6.无法迭代队列,因为没有元素可用于迭代.
* 7.该队列的头是尝试添加到队列的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且pool方法将返回null.
* 6.同步队列类似于传递性的设计,即一个线程中运行的对象要将某些信息/事件/任务传递给另一个线程中运行的对象,->同步
* 7.SynchronousQueue构造方法中可指定fairness的策略.如果fair,则等待线程以FIFO的顺序竞争访问;否则顺序是未指定的.
* </pre>
*
* <pre>
* 总结:1.进行相应插入和移除的操作时必须要有相应的移除和插入操作(__>同步).否则失败.
* 2.公平策略:内部实现为TransferQueue
* FIFO:先进先出_用队列(Queue)来阻塞多余的生产者和消费者
* 3.非公平策略:内部实现为TransferStack
* LIFO:后进先出_用栈(Stack)来阻塞多余的生产者和消费者
* </pre>
*
* @author landon
*
*/
public class SynchronousQueueExample {
private static final Logger LOGGER = LoggerFactory
.getLogger(SynchronousQueueExample.class);
public static void main(String[] args) {
SynchronousQueue<SQElement> sq = new SynchronousQueue<SQElement>();
// 启动take线程
SQTakeThread sqtt = new SQTakeThread(sq, "sqtt");
sqtt.start();
// 启动put线程,此时有sqtt线程正在执行take.所以put成功,take成功.
SQPutThread sqpt = new SQPutThread(sq, 1, "sqpt");
sqpt.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 启动put线程
SQPutThread sqpt2 = new SQPutThread(sq, 2, "sqpt2");
sqpt2.start();
// 启动take线程,此时有sqpt2线程正在进行put.所以take成功.put成功.
SQTakeThread sqtt2 = new SQTakeThread(sq, "sqpt2");
sqtt2.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 启动take线程
SQTakeThread sqtt3 = new SQTakeThread(sq, "sqtt3");
sqtt3.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 主线程启动offer.则发现sqtt3线程正在take.所以offer成功,take成功.
boolean offerResult = sq.offer(new SQElement(3));
LOGGER.debug("offerResult:{}", offerResult);
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 启动offer_timeout线程
SQOfferTimeoutThread sqott = new SQOfferTimeoutThread(sq, 4);
sqott.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 启动poll线程,启动发现sqott线程正在offer.所以poll成功,offer_timeout成功
SQPollThread sqPollThread = new SQPollThread(sq);
sqPollThread.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 启动poll_timeout线程
SQPollTimeoutThread sqptot = new SQPollTimeoutThread(sq);
sqptot.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 启动offer线程,此时发现有poll_timeout正在poll.所以offer成功,poll成功.
SQOfferThread sqot = new SQOfferThread(sq, 5);
sqot.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 这个必然返回false.因为此时执行插入的时候并没有一个移除的操作
boolean mainOfferResult = sq.offer(new SQElement(6));
LOGGER.debug("mainOfferResult:{}", mainOfferResult);
try {
// 主线程同样会超时等待2秒.结果也返回false.原因同上.
boolean mainOfferResult2 = sq.offer(new SQElement(7), 2,
TimeUnit.SECONDS);
LOGGER.debug("mainOfferResult2:{}", mainOfferResult2);
} catch (InterruptedException e) {
}
// 输出必然返回null.因为只有在试图移除元素时,该元素才存在.该方法始终返回null.
SQElement peekElement = sq.peek();
LOGGER.debug("peekElement:{}", peekElement);
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 测试公平访问策略.
SynchronousQueue<SQElement> sq2 = new SynchronousQueue<SQElement>(true);
// 从输出看:consumer是按照FIFO的顺序,即consumer1->consumer2->consumer3分别拿到数据->公平策略
// consumer1
SQTakeThread sq2TakeThread1 = new SQTakeThread(sq2, "sq2TakeThread1");
sq2TakeThread1.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// consumer2
SQTakeThread sq2TakeThread2 = new SQTakeThread(sq2, "sq2TakeThread2");
sq2TakeThread2.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// producer1
SQPutThread sq2PutThread1 = new SQPutThread(sq2, 11, "sq2PutThread1");
sq2PutThread1.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// consumer3
SQTakeThread sq2TakeThread3 = new SQTakeThread(sq2, "sq2TakeThread3");
sq2TakeThread3.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// producer2
SQPutThread sq2PutThread2 = new SQPutThread(sq2, 12, "sq2PutThread2");
sq2PutThread2.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// producer3
SQPutThread sq2PutThread3 = new SQPutThread(sq2, 13, "sq2PutThread3");
sq2PutThread3.start();
// 主线程暂停1秒
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 测试非公平访问策略.这也是同步队列的默认策略
SynchronousQueue<SQElement> sq3 = new SynchronousQueue<SQElement>(false);
// 从输出看:
// 1.当启动producer1的时候,因为consumer2是LI的->所以其率先take
// 2.当启动producer2的时候,因为consumer3是LI的->所以其率先take
// 3.最后take的是最早启动的consumer1
// 所以整体是不公平的,即LIFO
// consumer1
SQTakeThread sq3TakeThread1 = new SQTakeThread(sq3, "sq3TakeThread1");
sq3TakeThread1.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// consumer2
SQTakeThread sq3TakeThread2 = new SQTakeThread(sq3, "sq3TakeThread2");
sq3TakeThread2.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// producer1
SQPutThread sq3PutThread1 = new SQPutThread(sq3, 21, "sq3PutThread1");
sq3PutThread1.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// consumer3
SQTakeThread sq3TakeThread3 = new SQTakeThread(sq3, "sq3TakeThread3");
sq3TakeThread3.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// producer2
SQPutThread sq3PutThread2 = new SQPutThread(sq3, 22, "sq3PutThread2");
sq3PutThread2.start();
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// producer3
SQPutThread sq3PutThread3 = new SQPutThread(sq3, 23, "sq3PutThread3");
sq3PutThread3.start();
}
/** *//**
*
* SynchronousQueue element
*
* @author landon
*
*/
private static class SQElement {
private int id;
public SQElement(int id) {
this.id = id;
}
@Override
public String toString() {
return "SQElement [id=" + id + "]";
}
}
// producer
private static class SQPutThread extends Thread {
private SynchronousQueue<SQElement> sq;
private int curElementNum;
public SQPutThread(SynchronousQueue<SQElement> sq, int elementNum,
String name) {
super(name);
this.sq = sq;
curElementNum = elementNum;
}
@Override
public void run() {
SQElement element = new SQElement(curElementNum);
LOGGER.debug(getName() + ".sq.put.{} begin.", element);
try {
sq.put(element);
} catch (InterruptedException e) {
LOGGER.warn(getName() + ".sq.put.err.", e);
}
LOGGER.debug(getName() + ".sq.put end:{}.", curElementNum);
}
}
// consumer
private static class SQTakeThread extends Thread {
private SynchronousQueue<SQElement> sq;
public SQTakeThread(SynchronousQueue<SQElement> sq, String name) {
super(name);
this.sq = sq;
}
@Override
public void run() {
LOGGER.debug(getName() + ".sq.take begin");
SQElement element = null;
try {
element = sq.take();
} catch (InterruptedException e) {
LOGGER.warn(getName() + ".sq.take.err", e);
}
LOGGER.debug(getName() + ".sq.take:{}", element);
}
}
private static class SQPollThread extends Thread {
private SynchronousQueue<SQElement> sq;
public SQPollThread(SynchronousQueue<SQElement> sq) {
this.sq = sq;
}
@Override
public void run() {
LOGGER.debug("SQPollThread.sq.poll begin");
SQElement element = sq.poll();
LOGGER.debug("SQPollThread.sq.poll :{}", element);
}
}
private static class SQOfferTimeoutThread extends Thread {
private SynchronousQueue<SQElement> sq;
private int elementNum;
public SQOfferTimeoutThread(SynchronousQueue<SQElement> sq,
int elementNum) {
this.sq = sq;
this.elementNum = elementNum;
}
@Override
public void run() {
LOGGER.debug("SQPollThread.sq.offer.timeout(3) begin");
SQElement element = new SQElement(elementNum);
try {
boolean offerTimeoutResult = sq.offer(element, 3,
TimeUnit.SECONDS);
LOGGER.debug("SQPollThread.sq.offer.timeout(3).result:{}",
offerTimeoutResult);
} catch (InterruptedException e) {
}
}
}
private static class SQOfferThread extends Thread {
private SynchronousQueue<SQElement> sq;
private int elementNum;
public SQOfferThread(SynchronousQueue<SQElement> sq, int elementNum) {
this.sq = sq;
this.elementNum = elementNum;
}
@Override
public void run() {
LOGGER.debug("SQPollThread.sq.offer begin");
SQElement element = new SQElement(elementNum);
boolean offerResult = sq.offer(element);
LOGGER.debug("SQPollThread.sq.offer.result:{}", offerResult);
}
}
private static class SQPollTimeoutThread extends Thread {
private SynchronousQueue<SQElement> sq;
public SQPollTimeoutThread(SynchronousQueue<SQElement> sq) {
this.sq = sq;
}
@Override
public void run() {
LOGGER.debug("SQPollThread.sq.poll.timeout(3) begin");
SQElement element = null;
try {
element = sq.poll(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
LOGGER.debug("SQPollThread.sq.poll.timeout(3):{}", element);
}
}
}
2.LinkedBlockingQueue示例代码,详解见代码注释package com.landon.mavs.example.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** *//**
*
* {@link java.util.concurrent.LinkedBlockingQueue}示例
*
* <pre>
* 1.public class LinkedBlockingQueue<E> extends AbstractQueue<E>
* implements BlockingQueue<E>, java.io.Serializable
* 2.无界阻塞队列并可以指定capacity防止队列过度扩展
* 3.不能插入null,否则报空指针异常
* 4.其内部锁实现:takeLock/putLock,即采用了锁分离技术->因为二者分别操作队列的首和尾,理论上,二者并不冲突
* 5.内部阻塞实现:Condition notEmpty/Condition notFull,即take的时候如果队列为空则等待/put的时候如果队列已满则等待
* </pre>
*
* @author landon
*
*/
public class LinkedBlockingQueueExample {
private static final Logger LOGGER = LoggerFactory
.getLogger(LinkedBlockingQueueExample.class);
public static void main(String[] args) {
// public LinkedBlockingQueue(int capacity) 指定capacity
LinkedBlockingQueue<LBQElement> lbq = new LinkedBlockingQueue<LBQElement>(
1);
// AbstractQueue#public boolean add(E e)
// 源码实现:
// 1. if (offer(e)){return true}
// else{throw new IllegalStateException("Queue full")},
// 即该方法首先调用offer,如果offer失败即违反容量限制则抛出一个异常.
try {
boolean addResult = lbq.add(new LBQElement(1));
LOGGER.debug("sq.add(new LBQElement(1)).result:{}", addResult);
} catch (IllegalStateException e) {
LOGGER.warn("sq.add(new SQElement(1)).err.", e);
}
// public boolean offer(E e) 将指定元素插入到此队列的尾部,成功时返回 true,如果此队列已满,则返回 false
// 当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常
// 从输出结果上看,因为队列容量是1,所以offer失败.
boolean offerResult = lbq.offer(new LBQElement(2));
LOGGER.debug("sq.offer(new LBQElement(2)).result:{}", offerResult);
try {
// public boolean offer(E e, long timeout, TimeUnit unit) throws
// InterruptedException
// 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用;如果超时则返回false
boolean offerTimeoutResult = lbq.offer(new LBQElement(3), 3,
TimeUnit.SECONDS);
// 从输出看,超时3秒后返回false
LOGGER.warn(
" sq.offer(new LBQElement(3), 3,TimeUnit.SECONDS).result:{}",
offerTimeoutResult);
} catch (InterruptedException e) {
LOGGER.warn("sq.offer(new LBQElement(3), 3,TimeUnit.SECONDS).err.",
e);
}
// 启动LBQ_Put_Thread,调用阻塞队列的put方法,则LBQ_Put_Thread则阻塞等待可用空间
LBQPutThread lbqpt = new LBQPutThread(lbq);
lbqpt.start();
// AbstractQueue#public E element()
// 源码实现:
// 调用peek方法返回E.如果E不为null,则直接返回E.否则抛出NoSuchElementException
// 即获取但是不移除此队列的头->队列为空时抛出NoSuchElementException
LBQElement headElement = lbq.element();
LOGGER.debug("headElement:{}", headElement);
// public E peek()
// 获取但不移除此队列的头;如果此队列为空,则返回 null
LBQElement headPeekElement = lbq.peek();
LOGGER.debug("headPeekElement:{}", headPeekElement);
// public E poll()
// 获取并移除此队列的头,如果此队列为空,则返回 null
LBQElement headPollElement = lbq.poll();
LOGGER.debug("headPollElement:{}", headPollElement);
// 主线程sleep 1秒,尝试等待LBQPutThread操作完成
// 从输出看:因为从队列进行了移除.所以lbqpt线程的put的等待操作被唤醒,插入了元素4.
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 再次poll,获取并移除了元素4
LBQElement headPollElement2 = lbq.poll();
LOGGER.debug("headPollElement2:{}", headPollElement2);
try {
// 再次element,因为此时队列为空.所以element抛出异常
lbq.element();
} catch (NoSuchElementException e) {
LOGGER.warn("lbq.element().err.", e);
}
// 再次peek,队列为空则返回null
LBQElement headPeekElement2 = lbq.peek();
LOGGER.debug("headPeekElement2:{}", headPeekElement2);
try {
// public E poll(long timeout, TimeUnit unit) throws
// InterruptedException
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
// 超时则返回null
LBQElement headPollTimeoutElement = lbq.poll(3, TimeUnit.SECONDS);
LOGGER.debug("lbq.poll(3, TimeUnit.SECONDS) result:{}",
headPollTimeoutElement);
} catch (InterruptedException e) {
LOGGER.warn("lbq.poll(3, TimeUnit.SECONDS).err", e);
}
try {
// AbstractQueue#public E remove()
// 源码实现:
// 先调用poll方法返回E,如果E不为null则直接返回否则抛出异常:NoSuchElementException
lbq.remove();
} catch (NoSuchElementException e) {
LOGGER.warn("lbq.remove().err.", e);
}
LBQElement removeElement = new LBQElement(5);
lbq.add(removeElement);
// public boolean remove(Object o)
// 遍历队列进行移除指定的元素
boolean removeResult = lbq.remove(removeElement);
// 返回true
LOGGER.debug("removeResult:{}", removeResult);
// 再次执行remove,返回false
boolean removeResult2 = lbq.remove(removeElement);
LOGGER.debug("removeResult2:{}", removeResult2);
// 启动take线程,该线程会一直阻塞,直到有可用元素
LBQTakeThread lbqtt = new LBQTakeThread(lbq);
lbqtt.start();
// 唤醒take线程
lbq.offer(new LBQElement(6));
// 主线程暂停1秒,等待take线程执行完毕
try {
Thread.sleep(1 * 1000L);
} catch (InterruptedException e) {
}
// 未指定容量的lbq2-无界阻塞队列
LinkedBlockingQueue<LBQElement> lbq2 = new LinkedBlockingQueue<LBQElement>();
List<LBQElement> drainList = new ArrayList<LBQElement>();
// public int drainTo(Collection<? super E> c)
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中.
// 1.正在进行此操作时修改指定的 collection,则此操作行为是不确定的
// 2.在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,元素会同时在两个 collection
// 中出现,或者在其中一个 collection 中出现,也可能在两个 collection 中都不出现
// 从LinkedBlockingQueue的源码实现中发现:
// 1.其是要首先拿到takeLock.
// 2.if (c == this)
// IllegalArgumentException,即不能指定drainTo的collection为当前队列自身
lbq2.offer(new LBQElement(7));
lbq2.offer(new LBQElement(8));
lbq2.drainTo(drainList);
LOGGER.debug("lbq2.drainList:{}", drainList);
}
/** *//**
*
* LinkedBlocking element
*
* @author landon
*
*/
private static class LBQElement {
private int id;
public LBQElement(int id) {
this.id = id;
}
@Override
public String toString() {
return "LBQElement [id=" + id + "]";
}
}
private static class LBQPutThread extends Thread {
private LinkedBlockingQueue<LBQElement> lbq;
public LBQPutThread(LinkedBlockingQueue<LBQElement> lbq) {
this.lbq = lbq;
}
@Override
public void run() {
try {
// BlockingQueue#void put(E e) throws InterruptedException
// 将指定元素插入队列中,如有必要则等待可用空间
LOGGER.debug("LBQPutThread.lbq.put(new LBQElement(4)) begin.");
lbq.put(new LBQElement(4));
LOGGER.debug("LBQPutThread.lbq.put(new LBQElement(4)) ok(end)");
} catch (InterruptedException e) {
LOGGER.warn("sq.put(new SQElement(2)).err.", e);
}
}
}
private static class LBQTakeThread extends Thread {
private LinkedBlockingQueue<LBQElement> lbq;
public LBQTakeThread(LinkedBlockingQueue<LBQElement> lbq) {
this.lbq = lbq;
}
@Override
public void run() {
try {
LOGGER.debug("LBQTakeThread.lbq.take begin.");
// public E take() throws InterruptedException
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)
LBQElement takeElement = lbq.take();
LOGGER.debug("LBQTakeThread.lbq.take end.result:{}",
takeElement);
} catch (InterruptedException e) {
}
}
}
}
posted on 2013-12-31 11:44
landon 阅读(2419)
评论(0) 编辑 收藏 所属分类:
Program