随笔-7  评论-23  文章-0  trackbacks-0
JAVA LOCK总体来说关键要素主要包括3:
1.unsafe.compareAndSwapXXX(Object o,long offset,int expected,int x)
2.unsafe.park() unsafe.unpark()
3.单向链表结构或者说存储线程的数据结构

1主要为了保证锁的原子性,相当于一个锁是否正在被使用的标记,并且比较和设置这个标记的操作是原子的(硬件提供的swap和test_and_set指令,单CPU下同一指令的多个指令周期不可中断,SMP中通过锁总线支持上诉两个指令的原子性),这基本等于软件级别所能达到的最高级别隔离。

2主要将未得到锁的线程禁用(park)和唤醒(unpark),也是直接native实现(这几个native方法的实现代码在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是关键代码park的最终实现是和操作系统相关的,比如windows下实现是在os_windows.cpp中,有兴趣的同学可以下载jdk源码查看)。唤醒一个被park()线程主要手段包括以下几种
1.       其他线程调用以被park()线程为参数的unpark(Thread thread).
2.       其他线程中断被park()线程,waiters.peek().interrupt();waiters为存储线程对象的队列.
3.       不知原因的返回。

park()方法返回并不会报告到底是上诉哪种返回,所以返回好最好检查下线程状态,如

LockSupport.park();  //禁用当前线程
If(Thread.interrupted){
   
//doSomething
}

AbstractQueuedSynchronizer(AQS)对于这点实现得相当巧妙,如下所示
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    
final Node node = addWaiter(Node.SHARED);
    
try {
         
for (;;) {
             
final Node p = node.predecessor();
             
if (p == head) {
                 
int r = tryAcquireShared(arg);
                 
if (r >= 0{
                     setHeadAndPropagate(node, r);
                     p.next 
= null// help GC
                     return;
                 }

             }

             
//parkAndCheckInterrupt()会返回park住的线程在被unpark后的线程状态,如果线程中断,跳出循环。
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 
break;
         }

     }
 catch (RuntimeException ex) {
          cancelAcquire(node);
          
throw ex;
     }

    
     
// 只有线程被interrupt后才会走到这里
     cancelAcquire(node);
     
throw new InterruptedException();
}


//在park()住的线程被unpark()后,第一时间返回当前线程是否被打断
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(
this);
    
return Thread.interrupted();
}
3点对于一个Synchronizer的实现非常重要,存储等待线程,并且unlock时唤醒等待线程,这中间有很多工作需要做,唤醒策略,等待线程意外终结处理,公平非公平,可重入不可重入等。


以上简单说明了下
JAVA LOCKS关键要素,现在我们来看下java.util.concurrent.locks大致结构

上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上,为何图中没有用UML线表示呢,这是每个Lock实现类都持有自己内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)。为何要实现不同的Sync呢?这和每种Lock用途相关。另外还有AQSState机制。

基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。

ReentrantLock需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。以下为ReetranLock的FairSync的tryAcquire实现代码解析。

//公平获取锁
protected final boolean tryAcquire(int acquires) {
    
final Thread current = Thread.currentThread();
    
int c = getState();
    
//如果当前重进入数为0,说明有机会取得锁
    if (c == 0{
        
//如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁
        if (isFirst(current) &&
            compareAndSetState(
0, acquires)) {
            setExclusiveOwnerThread(current);
            
return true;
        }

    }

    
//如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁
    else if (current == getExclusiveOwnerThread()) {
        
int nextc = c + acquires;
        
if (nextc < 0)
            
throw new Error("Maximum lock count exceeded");
        setState(nextc);
        
return true;
     }

     
//以上条件都不满足,那么线程进入等待队列。
     return false;
}

Semaphore则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为Semaphore的FairSync实现

protected int tryAcquireShared(int acquires) {
    Thread current 
= Thread.currentThread();
    
for (;;) {
         Thread first 
= getFirstQueuedThread();
         
//如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待
         if (first != null && first != current)
              
return -1;
         
//如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值
         int available = getState();
         
int remaining = available - acquires;
         
//如果remining<0,那么反馈给AQS当前线程需要等待,如果remaining>0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,可以继续执行。
         if (remaining < 0 ||compareAndSetState(available, remaining))
             
return remaining;
    }

}

CountDownLatch闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countDown()几次,因为每次CountDownsync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁需要被countDown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。

//await时候执行,只查看当前需要countDown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countDown次数足够,并且unpark所有等待线程
public int tryAcquireShared(int acquires) {
     
return getState() == 0? 1 : -1;
}


//countDown时候执行,如果当前countDown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要countDown的数量并且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS判定是否释放所有await线程。
public boolean tryReleaseShared(int releases) {
    
for (;;) {
         
int c = getState();
         
if (c == 0)
             
return false;
         
int nextc = c-1;
         
if (compareAndSetState(c, nextc))
             
return nextc == 0;
   }

}

FutureTask需要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQSacquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续还是park,FutureTasktryAcquireShared方法所做的唯一事情就是检查状态,如果是RUNNING状态那么让当前线程park。而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。

//get时待用,只检查当前任务是否完成或者被Cancel,如果未完成并且没有被cancel,那么告诉AQS当前线程需要进入等待队列并且park住
protected int tryAcquireShared(int ignore) {
     
return innerIsDone()? 1 : -1;
}


//判定任务是否完成或者被Cancel
boolean innerIsDone() {
    
return ranOrCancelled(getState()) &&    runner == null;
}


//get时调用,对于CANCEL与其他异常进行抛错
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    
if (!tryAcquireSharedNanos(0,nanosTimeout))
        
throw new TimeoutException();
    
if (getState() == CANCELLED)
        
throw new CancellationException();
    
if (exception != null)
        
throw new ExecutionException(exception);
    
return result;
}


//任务的执行线程执行完毕调用(set(V v))
void innerSet(V v) {
     
for (;;) {
        
int s = getState();
        
//如果线程任务已经执行完毕,那么直接返回(多线程执行任务?)
        if (s == RAN)
            
return;
        
//如果被CANCEL了,那么释放等待线程,并且会抛错
        if (s == CANCELLED) {
            releaseShared(
0);
            
return;
        }

        
//如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由FutrueTask的子类实现)
        if (compareAndSetState(s, RAN)) {
            result 
= v;
            releaseShared(
0);
            done();
            
return;
        }

     }

}

以上4AQS的使用是比较典型,然而有个问题就是这些状态存在哪里呢?并且是可以计数的。从以上4个example,我们可以很快得到答案,AQS提供给了子类一个int state属性。并且暴露给子类getState()setState()两个方法(protected)。这样就为上述状态解决了存储问题,RetrantLock可以将这个state用于存储当前线程的重进入次数,Semaphore可以用这个state存储许可数,CountDownLatch则可以存储需要被countDown的次数,而Future则可以存储当前任务的执行状态(RUNING,RAN,CANCELL)。其他的Synchronizer存储他们的一些状态。

AQS留给实现者的方法主要有5个方法,其中tryAcquire,tryReleaseisHeldExclusively三个方法为需要独占形式获取的synchronizer实现的,比如线程独占ReetranLockSync,而tryAcquireSharedtryReleasedShared为需要共享形式获取的synchronizer实现。

ReentrantLock内部Sync类实现的是tryAcquire,tryRelease, isHeldExclusively三个方法(因为获取锁的公平性问题,tryAcquire由继承该Sync类的内部类FairSyncNonfairSync实现)Semaphore内部类Sync则实现了tryAcquireSharedtryReleasedShared(CountDownLatch相似,因为公平性问题,tryAcquireShared由其内部类FairSyncNonfairSync实现)CountDownLatch内部类Sync实现了tryAcquireSharedtryReleasedSharedFutureTask内部类Sync也实现了tryAcquireSharedtryReleasedShared

其实使用过一些JAVA synchronizer的之后,然后结合代码,能够很快理解其到底是如何做到各自的特性的,在把握了基本特性,即获取原子状态和释放原子状态,其实我们自己也可以构造synchronizer。如下是一个LOCK API的一个例子,实现了一个先入先出的互斥锁。

public class FIFOMutex {
    
private AtomicBoolean locked=new AtomicBoolean(false);
    
private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();
    
    
public void lock(){
        
boolean wasInterrupted=false;
        Thread current
=Thread.currentThread();
        waiters.add(current);
        
        
//如果waiters的第一个等待者不为当前线程,或者当前locked的状态为被占用(true)
        
//那么park住当前线程
        while(waiters.peek()!=current||!locked.compareAndSet(falsetrue)){
            LockSupport.park();
            
            
//当线程被unpark时,第一时间检查当前线程是否被interrupted
            if(Thread.interrupted()){
                wasInterrupted
=true;
            }

        }

        
        
//得到锁后,从等待队列移除当前线程,如果,并且如果当前线程已经被interrupted,
        
//那么再interrupt一下以便供外部响应。
        waiters.remove();
        
if(wasInterrupted){
            current.interrupt();
        }

    }

    
    
//unlock逻辑相对简单,设定当前锁为空闲状态,并且将等待队列中
    
//的第一个等待线程唤醒
    public void unlock(){
        locked.set(
false);
        LockSupport.unpark(waiters.peek());
    }

}

总结,JAVA lock机制对于整个java concurrent包的成员意义重大,了解这个机制对于使用java并发类有着很多的帮助,文章中可能存在着各种错误,请各位多多谅解并且能够提出来,谢谢。

文章参考:JDK 1.6 source
                    java 并发编程实践
                    JDK 1.6 API 文档

 

posted on 2010-09-30 12:05 BucketLI 阅读(13141) 评论(2)  编辑  收藏

评论:
# re: JAVA LOCK代码浅析 2012-10-30 10:32 | mengmeng.zhangmm
非常清晰的讲解了Concurrent的QAS机制和部分并发工具类实现。
谢谢楼主的博文。  回复  更多评论
  
# re: JAVA LOCK代码浅析 2014-10-28 10:43 | zuidaima
java 线程demo教程源代码下载:http://zuidaima.com/share/k%E7%BA%BF%E7%A8%8B-p1-s1.htm  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: