[本文是我对Java Concurrency In Practice C14.1的归纳和总结. 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正. ]
java类库中包含许多状态依赖的类: 其中的某些方法只有满足特定的前置条件才能继续, 比如BlockingQueue的take方法, 只有队列不为空时take方法才能返回.
状态依赖的操作一般如下:
void blockingAction() {
申请锁
while(前置条件不满足) {
释放锁
重新获取锁
}
执行操作
释放锁
}
BaseBoundedBuffer是一个普通抽象类, 它对put和take方法的实现是有缺陷的: 没有在put方法执行前判断缓冲区是否已满, 也没有在take方法执行之前判断缓冲区是否为空. 其代码如下:
public abstract class BaseBoundedBuffer<V> {
private final V[] buf;
private int tail;
private int head;
private int count;
@SuppressWarnings("unchecked")
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length) {
tail = 0;
}
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length) {
head = 0;
}
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
本文将使用多种方式为doPut和doTake方法增加前置判断.
通知调用方前置条件判断失败
GrumpyBoundedBuffer是BaseBoundedBuffer的子类, 并向外提供put和take方法. 调用put方法时, 如果缓冲区已满, 将抛出BufferFullException异常. 调用take方法时, 如果缓冲区为空, 将抛出BufferEmptyException异常:
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws BufferFullException {
if (isFull())
throw new BufferFullException();
doPut(v);
}
public synchronized V take() throws BufferEmptyException {
if (isEmpty())
throw new BufferEmptyException();
return doTake();
}
}
GrumpyBoundedBuffer实现起来很简单, 但是这样的类很难使用: 调用方需要捕获并处理异常. 例如调用take方法时需要这样:
public void invocaton() {
while (true) {
try {
V item = buffer.take();
// use item
break;
} catch (BufferEmptyException e) {
// 当抛出BufferEmptyException异常时, 说明buffer为空. 调用方睡眠一段时间后再进行尝试
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
这样的调用方式是令人难以忍受的, 而且sleep的时间SLEEP_GRANULARITY不好确定: 如果设定的太短, 将白白消耗CPU资源. 如果设定的太长, 则程序的响应性不好, 也有可能错过前置条件满足的时刻.
内部处理重试逻辑
既然由调用方处理异常并重试是不可取的, 那么SleepyBoundedBuffer类改为在内部处理重试逻辑:
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
private static final long SLEEP_GRANULARITY = 10;
public SleepyBoundedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
// 释放锁后sleep一段时间再进行重试
Thread.sleep(SLEEP_GRANULARITY);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty()) {
return doTake();
}
}
// 释放锁后sleep一段时间再进行重试
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
SleepyBoundedBuffer相比于GrumpyBoundedBuffer具有很大的进步: 不需要在调用方进行重试. SleepyBoundedBuffer易于使用, 但是sleep的时间仍然不好确定, 需要在响应性和CPU消耗间权衡.
前置条件满足时唤醒线程
BoundedBuffer试着解决SleepyBoundedBuffer中的问题: 当前置条件不满足时将线程挂起, 并等待前置条件满足时由其他线程唤醒, 这样就不需要权衡sleep的时间了:
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
public BoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws InterruptedException {
// 当缓冲区已满时将线程挂起, 等待其他线程唤醒
// 不给唤醒之后再次判断缓冲区是否已满
while (isFull()) {
wait();
}
doPut(v);
// 操作完成后唤醒其他线程
notifyAll();
}
public synchronized V take() throws InterruptedException {
// 当缓冲区为空时将线程挂起, 等待其他线程唤醒
// 被唤醒之后再次判断缓冲区是否为空
while (isEmpty()) {
wait();
}
V v = doTake();
// 操作完成后唤醒其他线程
notifyAll();
return v;
}
}
BoundedBuffer已经比较完美了, 相比于SleepyBoundedBuffer, 其具有更好的响应性, 更高的CPU效率以及更少的上下文切换.