3 AbstractQueuedSynchronizer
3.1 Inheritance
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer继承自Object并且实现了Serializable接口,它只有一个成员变量private transient Thread exclusiveOwnerThread以及对应的getter/setter方法。该成员变量用于保存当前拥有排他访问权的线程。需要注意的是,该成员变量没有用volatile关键字修饰。
3.2 State
AbstractQueuedSynchronizer用一个int(private volatile int state)来保存同步状态,以及对应的getter/setter/compareAndSetState方法。Java6新增了一个AbstractQueuedLongSynchronizer,它用一个long来保存同步状态,貌似目前没有被java.util.concurrent中的其它synchronizer所使用。对于ReentrantLock,state为0则意味着锁没有被任何线程持有;否则state保存了持有锁的线程的重入次数。
3.3 WaitQueue
WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的线程。它的实现是"CLH" (Craig, Landin, and Hagersten) lock queue的一个变种。
标准的CLH lock queue通常被用来实现spin lock,它通过TheadLocal变量pred引用队列中的前一个节点(Node本身没有指向前后节点的引用),以下是标准的CLH lock queue的一个参考实现:
public class ClhSpinLock {
private final ThreadLocal<Node> pred;
private final ThreadLocal<Node> node;
private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
public ClhSpinLock() {
this.node = new ThreadLocal<Node>() {
protected Node initialValue() {
return new Node();
}
};
this.pred = new ThreadLocal<Node>() {
protected Node initialValue() {
return null;
}
};
}
public void lock() {
final Node node = this.node.get();
node.locked = true;
Node pred = this.tail.getAndSet(node);
this.pred.set(pred);
while (pred.locked) {}
}
public void unlock() {
final Node node = this.node.get();
node.locked = false;
this.node.set(this.pred.get());
}
private static class Node {
private volatile boolean locked;
}
}
其逻辑并不复杂:对于lock操作,只需要通过一个CAS操作即可将当前线程对应的节点加入到队列中,并且同时获得了predecessor节点的引用,然后就是等待predecessor释放锁;对于unlock操作,只需要将当前线程对应节点的locked成员变量设置为false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node对象,这是对GC友好的一个优化。如果不考虑这个优化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解决了cache-coherence traffic的问题:每个线程在busy loop的时候,并没有竞争同一个状态,而是只判断其对应predecessor的锁定状态。如果你担心false sharing问题,那么可以考虑将锁定状态padding到cache line的长度。此外,CLH spin lock通过FIFO的队列保证了锁竞争的公平性。
AbstractQueuedSynchronizer的静态内部类Node维护了一个FIFO的等待队列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能。sucessor的作用是为了支持线程阻塞:在Inside AbstractQueuedSynchronizer (1)
中提到过,AbstractQueuedSynchronizer通过LockSupport实现了线程的block/unblock,因此需要通过successor引用找到后续的线程并将其唤醒(CLH spin lock因为是spin,所以不需要显式地唤醒)。此外,Node中还包括一个volatile int waitStatus成员变量用于控制线程的阻塞/唤醒,以及避免不必要的调用LockSupport的park/unpark方法。需要注意的是,虽然AbstractQueuedSynchronizer在绝大多数情况下是通过LockSupport进行线程的阻塞/唤醒,但是在特定情况下也会使用spin lock,static final long spinForTimeoutThreshold = 1000L这个静态变量设定了使用spin lock的一个阈值。
对WaitQueue进行enqueue操作相关的代码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter方法中的那个if分支,其实是一种优化,如果失败那么会调用enq方法进行enqueue。需要注意的是,以上代码中对next引用的设定是在enqueue成功之后进行的。这样做虽然没有并发问题,但是在判断一个node是否有sucessor时,不能仅仅通过next == null来判断,因为enqueue和设置next引用这两个步骤不是一个原子操作。
对WaitQueue进行dequeue操作相关的代码如下:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
setHead方法没有用到锁,也没有使用CAS,这样没有并发问题?没有,因为这个方法只会被持有锁的线程所调用,此时只需要将head指向持有锁的线程对应的node即可。