whitesock

BlogJava 首页 新随笔 联系 聚合 管理
  10 Posts :: 0 Stories :: 0 Comments :: 0 Trackbacks

Inside AbstractQueuedSynchronizer (1)

Inside AbstractQueuedSynchronizer (2)

Inside AbstractQueuedSynchronizer (3)

Inside AbstractQueuedSynchronizer (4)

3 AbstractQueuedSynchronizer

3.1 Inheritance

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer继承自Object并且实现了Serializable接口,它只有一个成员变量private transient Thread exclusiveOwnerThread以及对应的getter/setter方法。该成员变量用于保存当前拥有排他访问权的线程。需要注意的是,该成员变量没有用volatile关键字修饰。


3.2 State

AbstractQueuedSynchronizer用一个int(private volatile int state)来保存同步状态,以及对应的getter/setter/compareAndSetState方法。Java6新增了一个AbstractQueuedLongSynchronizer,它用一个long来保存同步状态,貌似目前没有被java.util.concurrent中的其它synchronizer所使用。对于ReentrantLock,state为0则意味着锁没有被任何线程持有;否则state保存了持有锁的线程的重入次数。

3.3 WaitQueue

WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的线程。它的实现是"CLH" (Craig, Landin, and Hagersten) lock queue的一个变种。

标准的CLH lock queue通常被用来实现spin lock,它通过TheadLocal变量pred引用队列中的前一个节点(Node本身没有指向前后节点的引用),以下是标准的CLH lock queue的一个参考实现:

public class ClhSpinLock {
    private final ThreadLocal<Node> pred;
    private final ThreadLocal<Node> node;
    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

    public ClhSpinLock() {
        this.node = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return new Node();
            }
        };

        this.pred = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return null;
            }
        };
    }

    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        Node pred = this.tail.getAndSet(node);
        this.pred.set(pred);
        while (pred.locked) {}
    }

    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        this.node.set(this.pred.get());
    }

    private static class Node {
        private volatile boolean locked;
    }
}

其逻辑并不复杂:对于lock操作,只需要通过一个CAS操作即可将当前线程对应的节点加入到队列中,并且同时获得了predecessor节点的引用,然后就是等待predecessor释放锁;对于unlock操作,只需要将当前线程对应节点的locked成员变量设置为false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node对象,这是对GC友好的一个优化。如果不考虑这个优化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解决了cache-coherence traffic的问题:每个线程在busy loop的时候,并没有竞争同一个状态,而是只判断其对应predecessor的锁定状态。如果你担心false sharing问题,那么可以考虑将锁定状态padding到cache line的长度。此外,CLH spin lock通过FIFO的队列保证了锁竞争的公平性。

AbstractQueuedSynchronizer的静态内部类Node维护了一个FIFO的等待队列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能。sucessor的作用是为了支持线程阻塞:在Inside AbstractQueuedSynchronizer (1) 中提到过,AbstractQueuedSynchronizer通过LockSupport实现了线程的block/unblock,因此需要通过successor引用找到后续的线程并将其唤醒(CLH spin lock因为是spin,所以不需要显式地唤醒)。此外,Node中还包括一个volatile int waitStatus成员变量用于控制线程的阻塞/唤醒,以及避免不必要的调用LockSupport的park/unpark方法。需要注意的是,虽然AbstractQueuedSynchronizer在绝大多数情况下是通过LockSupport进行线程的阻塞/唤醒,但是在特定情况下也会使用spin lock,static final long spinForTimeoutThreshold = 1000L这个静态变量设定了使用spin lock的一个阈值。

对WaitQueue进行enqueue操作相关的代码如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter方法中的那个if分支,其实是一种优化,如果失败那么会调用enq方法进行enqueue。需要注意的是,以上代码中对next引用的设定是在enqueue成功之后进行的。这样做虽然没有并发问题,但是在判断一个node是否有sucessor时,不能仅仅通过next == null来判断,因为enqueue和设置next引用这两个步骤不是一个原子操作。

对WaitQueue进行dequeue操作相关的代码如下:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

setHead方法没有用到锁,也没有使用CAS,这样没有并发问题?没有,因为这个方法只会被持有锁的线程所调用,此时只需要将head指向持有锁的线程对应的node即可。


posted on 2012-01-07 17:54 whitesock 阅读(328) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: