yuyee

AbstractQueuedSynchronizer看看


API中的解释:为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步而只追踪使用 getState()getState()getState()setState(int)compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。
此类采用模板模式设计,此类为一个抽象类,但是没抽象方法,每个sync子类需要实现5个受保护的方法

这个5个方法在AbstractQueuedSynchronizer 都抛出throw new UnsupportedOperationException();
AbstractQueuedSynchronizer 中有3个属性:主要声明一个状态和一个wait queue,通过

wait queue中Node 为一个双向链表,需要去理解Node中几个静态字段值的意义,下面为他的源码:
static final class Node {
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node until
         *               transferred. (Use of this value here
         *               has nothing to do with the other uses
         *               of the field, but simplifies mechanics.)
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned once during enqueuing, and
         * nulled out (for sake of GC) when no longer needed.  Upon
         * cancellation, we cannot adjust this field, but can notice
         * status and bypass the node if cancelled.  The enq operation
         * does not assign next field of a predecessor until after
         * attachment, so seeing a null next field does not
         * necessarily mean that node is at end of queue. However, if
         * a next field appears to be null, we can scan prev's from
         * the tail to double-check.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if
         * null.  Use when predecessor cannot be null.
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }


    }
获取锁定调用的方法,其实这个方法是阻塞的:
  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
如果获取不成功则调用如下方法:
   final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//当节点是头节点且独占时才返回
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//阻塞并判断是否打断,其实这个判断才是自旋锁真正的猥琐点,
意思是如果你的前继节点不是head,而且当你的前继节点状态是Node.SIGNAL时,你这个线程将被park(),直到另外的线程release时,发现head.next是你这个node时,才unpark,你才能继续循环并获取锁
                    interrupted = true;
            }
shouldParkAfterFailedAcquire这个方法删除所有waitStatus>0也就是CANCELLED状态的Node,并设置前继节点为signal
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int s = pred.waitStatus;
        if (s < 0)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (s > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
   do {
node.prev = pred = pred.prev;
   } while (pred.waitStatus > 0);
   pred.next = node;
}
        else
            /*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             */
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        return false;
    }


使用LockSupport.park(this),禁用当前线程
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//block
        return Thread.interrupted();
    }
释放锁:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//unblock
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        /*
         * Try to clear status in anticipation of signalling.  It is
         * OK if this fails or if status is changed by waiting thread.
         */
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }


        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }
  

看下ReentrantLock锁中sync的实现:
 static abstract class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes this lock instance from a stream.
         * @param s the stream
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
非公平规则下nonfairTryAcquire,获取当前锁的state,通过CAS原子操作设置为1,并将当前线程设置为独占线程,如果当前线程已经拿了锁,则state增加1
公平锁中 有如下判断:
if (isFirst(current) &&//判断头元素
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
在获取锁步骤:
1.调用tryAcquire来获取,如果失败,则进入2
2.调用addWaiter,以独占模式将node放到tail位置
3.调用acquireQueued方法,此方法阻塞,直到node的pre为head,并成功获取锁定,也可能存在阻塞并打断情况
释放锁的步骤:
1.放弃排他锁持有权
2.unpark 节点的下一个blocked节点

公平锁与非公平锁:从代码上看,非公平锁是让当前线程优先独占,而公平锁则是让等待时间最长的线程优先,非公平的可能让其他线程没机会执行,而公平的则可以让等待时间最长的先执行,但是性能上会差点

posted on 2010-11-09 15:30 羔羊 阅读(814) 评论(0)  编辑  收藏 所属分类: concurrent