JDK1.5引入了Doug Lea大神的concurrent框架,其中AbstractQueuedSynchronizer是concurrent框架的基本,从大神的paper中可以看到
1.传统的synchronized不能进行中段,这个不合适
2.如果将concurrent重心放在少数竞争下优化锁,而在其他情况下放任缓慢执行的策略是不正确的
3.需要可预测的维护效率,即使在同步竞争激烈的情况下,理想中无论多少线程试图通过一个同步点的开销应该是恒定的
4.设计的目标是总时间的减少,因为有可能在此之间一个线程可以通过同步点,然后他没有立即执行
5.在高吞吐量的基本上,更重要的是线程的公平调度
AQS设计思路:
原子的管理同步状态;阻塞和解除阻塞线程;保持线程的阻塞队列。
实现1.在CLH queue的基础上进行改造
2.单个int state 计数synchronization状态
队列的每个节点是内部类Node:
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
对于waitStatus>0的node在等待的遍历当中是会被抛弃掉的,而nextWaiter在共享锁和Lock的Condition中会用到
void acquire(int arg)方法
该方法是其他基于AQS实现的具体类获得锁或者信号等的基础实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(int arg) 留给具体子类实现,如果返回false,说明没有得到锁,则调用acquireQueued(final Node node, int arg)方法,在调用此方法前需要将当前线程的node加入队列:
private Node addWaiter(Node mode) {
//关键点1.
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先尝试快速加入到队尾,如果加入队尾失败,说明在此期间,其余的线程入队了,那么真正的执行入队操作:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这是个循环,只到node加入到队列中才返回,当是对队列的第一次插入node时,必须初始化队列,创建一个傀儡header,完成入队操作后执行acquireQueued()方法:
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//关键点2.
if (p == head && tryAcquire(arg)) {
//关键点4.
setHead(node);
p.next = null; // help GC
return interrupted;
}
//关键点3
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
前面的方法可以看出当前thread的对应的node就是参数传入的node,获得node的前驱,如果当前前驱是傀儡header而且子类的
tryAcquire()方法返回为真,那么当前的node变成header
,而且返回node对应的thread的中断状态,如果前面流程没有执行,执行shouldParkAfterFailedAcquire():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (s > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else
/*
* Indicate that we need a signal, but don't park yet. Caller
* will need to retry to make sure it cannot acquire before
* parking.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
得到node的前驱prev的waitStatus,当waitStatus>0,那么prev处于退出状态,从队列中去除prev,如果waitStatus<0,说明线程需要park住,如果没有为0,就是新建的,需要设置成
-1,在下一次循环中,如果还不能获得锁那么就需要park住线程,
parkAndCheckInterrupt()就是执行park()线程,然后返回线程中断状态.
acquireQueued()方法是个死循环,只到header之后的一个线程获得锁之后,才会重新设置header,被设置为新header的是header的next node,就这样返回到acquire()中,如果返回的中断状态为true,当前线程中断.
总结下:
关键点1.将线程入队列,不管是否是第一个线程。在和其他线程竞争前,尝试入队一次
关键点2.header只是一个傀儡node,他代表上次获得的线程,在这里只有header的后面一个thread尝试成功了,才会跳出循环,然后其他的线程只能在循环中
关键点3.检查加入的node对应thread是否需要park(),当node的waitStatus>0就是取消的node,不需要park(),其他的需要park().park住的线程就一直阻塞到unpark
关键点4.这里setHead()方法把当前取得锁的node设置为header,使得队列往前走了一步。
boolean release(int arg) 方法
这个方法是unlock()等方法的基础方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()留给子类实现,在队列header处的node不为空而且状态不为初始状态的话(初始为0),需要为这个header所持有的thread unpark
private void unparkSuccessor(Node node) {
/*
* Try to clear status in anticipation of signalling. It is
* OK if this fails or if status is changed by waiting thread.
*/
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor. 这里貌似是cancelAcquire()方法引起从后向前遍历
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在acquire()方法中知道header是个傀儡header,所以这个方法中得到header的next,如果next不为空或者next为取消的node,则从tail开始向前遍历,找到最前面
waitStatus<=0的node,然后unpark node的thread。
void cancelAcquire(Node node)
该方法是一切取消获得的基础实现,代码有些地方需要注意:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness
//前面的操作使得前置改变了 但是prex的后置还是没有变
Node predNext = pred.next;
// Can use unconditional write instead of CAS here
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves
if (node == tail && compareAndSetTail(node, pred)) {
//tail就set null 了
compareAndSetNext(pred, predNext, null);
} else {
// If "active" predecessor found
//
if (pred != head
&& (pred.waitStatus == Node.SIGNAL
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
&& pred.thread != null) {
// If successor is active, set predecessor's next link
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果前面没有阻塞的node,到本node下面的话,需要unpark了
unparkSuccessor(node);
}
node.next = node; // help GC 这个会影响next节点的秩序 还好每次pred.waitStatus>0的检测使得受影响的时间窗口比较小
}
} 共享锁从acquireShared(int arg)开始:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
//队列向后移动一位
setHead(node);
//propagate>0说明,共享数值是大于前面要求的数值的
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
//如果只剩下一个node节点 那么直接unpark是可以的,因为就这个node,propagate数也大于0
//如果是共享的也可以unpark,unpark后还在doAcquireShared循环的,如果发现acquires数值过大,这个线程还是会park住的
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
共享的和独占的在实现上面是类似得,共享实现上,对于获得能成功,只要是子类实现上面能获得成功,如信号量的实现(state的可用量是大于1的),就不用进入队列阻塞。
Condition
condition是服务于单个Lock的,condition.await()等等待方法在Lock上面形成了一个condition的等待队列,condition.singal()方法在Lock上面处理condition的等待队列,
一个一个化解,然后将队列的node加入到AQS的阻塞队列中等待对应的线程被unpark
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//关键点1
int savedState = fullyRelease(node);//关键点2
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//关键点3
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//关键点4
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
其中的关键点:
关键点1.加入到condition的对应的lock私有的队列中,和AQS的阻塞队列形式相似
关键点2.释放这个condition对应的lock的锁,因为在使用的过程当中,考虑到如果这个wait()方法阻塞住,而lock如果没有释放锁,那么对于其他的线程的node来说肯定是阻塞住的,
因为condition对应的lock获得了锁,肯定在AQS的header处,其他线程肯定是得不到锁阻塞在那里,这样两边都阻塞的话就死锁了,所以这里需要释放对应lock的锁的
关键点3.判断condition是否已经转化成为AQS阻塞队列的一个节点,如果没有park线程阻塞在这里
关键点4.到这一步的话就需要signal()或者signalAll()的方法的执行,说明这个线程已经被unpark,然后运行直到acquireQueued尝试再次获得锁,因为condition对应的lock的锁在关键
点2是被释放了的
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//进入到AQS的阻塞队列中
int c = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
在整个AQS存在两种链表。 一个链表就是整个Sync Node链表,横向链表。另一种链表就是Condition的wait Node链表,相对于Sync node,它属于node节点的一个纵向链表。当纵向列表被single通知后,会进入对应的Sync Node进行排队处理。
最后面上个图理解下condition:
图片转载自:
http://www.goldendoc.org/2011/06/juc_condition/