whitesock

BlogJava 首页 新随笔 联系 聚合 管理
  10 Posts :: 0 Stories :: 0 Comments :: 0 Trackbacks

Inside AbstractQueuedSynchronizer (1)

Inside AbstractQueuedSynchronizer (2)

Inside AbstractQueuedSynchronizer (3)

Inside AbstractQueuedSynchronizer (4)

3.4 Template Method

AbstractQueuedSynchronizer提供了以下几个protected方法用于子类改写

protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()

这几个方法的默认实现是抛出UnsupportedOperationException,子类可以根据需要进行改写。

AbstractQueuedSynchronizer中最基本的acquire流程的相关代码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如果tryAcquire失败,那么当前线程可能会被enqueue到WaitQueue,然后被阻塞。 shouldParkAfterFailedAcquire方法会确保每个线程在被阻塞之前,其对应WaitQueue中的节点的waitStatus被设置为Node.SIGNAL(-1),以便在release时避免不必要的unpark操作。此外shouldParkAfterFailedAcquire还会清理WaitQueue中已经超时或者取消的Node。需要注意的是,在某个线程最终被阻塞之前,tryAcquire可能会被多次调用。

AbstractQueuedSynchronizer中最基本的release流程的相关代码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

release方法中,总是总head节点开始向后查找sucessor。只有当该sucessor的waitStatus被设置的情况下才会调用unparkSuccessor。unparkSuccessor方法中首先清除之前设置的Node.waitStatus,然后向后查找并且唤醒第一个需要被唤醒的sucessor。需要注意的是,if (s == null || s.waitStatus > 0)这个分支中,查找是从tail节点开始,根据prev引用向前进行。在Inside AbstractQueuedSynchronizer (2) 中提到过,Node.next为null并不一定意味着没有sucessor,虽然WaitQueue是个双向链表,但是根据next引用向后查找sucessor不靠谱,而根据prev引用向前查找predecessor总是靠谱。

3.5 Fairness

到目前为止我们已经知道,WaitQueue是个FIFO的队列,唤醒也总是从head开始。但是AbstractQueuedSynchronizer却并不一定是公平的(实际上,大多数情况下都是在非公平模式下工作)。如果在看一遍acquire方法会发现,tryAcquire的调用顺序先于acquireQueued,也就是说后来的线程可能在等待中的线程之前acquire成功。这种场景被称为barging FIFO strategy,它能提供更高的吞吐量。

大多数AbstractQueuedSynchronizer的子类都同时提供了公平和非公平的实现,例如ReentrantLock提供了NonfairSync和FairSync。例如其FairSync的tryAcquire方法如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryAcquire方法返回true的条件之一是!hasQueuedPredecessors() 。hasQueuedPredecessors的代码如下:

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

综上, FairSync优先确保等待中线程先acquire成功。但是公平性也不是绝对的:在一个多线程并发的环境下,就算锁的获取是公平的,也不保证后续的其它处理过程的先后顺序。

既然默认情况下使用的都是NonfairSync,那么FairSync适合什么样的场景呢?如果被锁所保护的代码段的执行时间比较长,而应用又不能接受线程饥饿(NonfairSync可能会导致虽然某个线程长时间排队,但是仍然无法获得锁的情况)的场景下可以考虑使用FairSync。对于ReentrantLock,在其构造函数中传入true,即可构造一把公平锁。

posted on 2012-01-07 23:37 whitesock 阅读(190) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航:
博客园   IT新闻   Chat2DB   C++博客   博问