走自己的路

路漫漫其修远兮,吾将上下而求索

  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理 ::
  50 随笔 :: 4 文章 :: 118 评论 :: 0 Trackbacks

背景

应用项目组每个小时会定时的run一个存储过程进行结算,每次执行的时间也许会超过一个小时,而且需要绝对保证存储过程的串行执行。因为使用内存锁不能绝对保证两个存储过程的串行执行,因为应用服务器down掉重启后可能会出现并发执行的情况,因为先前的存储过程还在db中运行。我们是使用LTS,对quartz进行了封装来做任务调度的。我们决定锁的管理操作由framework来实现。原因是:

l         锁管理器可以做成通用的模块

l         申请锁,释放锁是比较危险的操作,担心业务开发人员由于遗忘导致死锁或者并发问题

l         可以很好的集成到我们现有的framework中,方便地开放给业务开发人员使用

注意:我们极其不推荐使用悲观离线锁,如果冲突出现的概率比较少,可以用其他方法比如乐观离线锁,DB Constraint再通过补偿操作能解决的问题,请不要使用悲观离线锁。

 

原理

PLSQL UL LOCKoracle提供出来给开发人员使用的锁资源,功能和DML锁是类似的,当然我们可以通过DML锁来完成并发控制,select…for update或者自己维护一张锁表,考虑到实现代价,我们打算使用PLSQL UL LOCK。而且Oracle保证了session释放时,UL lock都会被释放。

但是用它时,需要注意到它的DBMS_LOCK.Unique函数它每次都会commit数据。如果是在分布式事务当中,会抛出事务已提交的异常。因为我们使用的是XA resource并且transaction levelglobal的,也就是JTA。为了使得锁的申请和释放不影响分布式业务事务,或者我们是使用非xaresourcelocaltransaction来完成锁操作,或者也可以暂停已有事务,等锁操作完成后resume暂停的分布式事务。考虑到重用已有的xa resource我们打算使用后一种方法,其实这种方法我们也会经常使用,暂停分布式事务做DDL操作,再释放事务。

 

实现方法:

l         封装DBMS_LOCK包中的几个存储过程为我们所用

l         Java端提供一个基于PLSQL UL锁的管理器

l         Java端定义好申请锁,业务操作,释放锁的使用流程,作为一个模板

 

DB存储过程:

DBMS_LOCK做了简单的封装,避免直接调用DBMS_LOCK。这样做的好处是:

l         Oracle解耦,如果其他数据库可以提供类似的功能,我们也可以用同名的存储过程实现

l         方便以后对存储过程重构,升级

l         我们需要对DBMS_LOCK进行简单的封装,因为DBMS_LOCK.Unique获取lockhandle oracle中锁的唯一标识,输入是lockname,逻辑名,输出是锁的唯一标识,对java端应该是透明的,java端应该只关心锁的逻辑名。

create or replace package body frm_lts_processor_lock_pkg is
  
/* table to store lockhandles by name */
   TYPE handle_tbltype IS TABLE OF varchar2(
128)
      INDEX BY varchar2(
128);

   v_lockhandle_tbl handle_tbltype;
  
 procedure frm_lts_lock_acquire(i_lock_name in varchar2, i_expiration_time in Integer default
864000, i_wait_time in Integer default DBMS_LOCK.maxwait, o_result out number) as
      v_result     number;
      v_lockhandle varchar2(
128);
   begin
   if v_lockhandle_tbl.count =
0 then
      sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
      v_lockhandle_tbl(i_lock_name) := v_lockhandle;
   elsif v_lockhandle_tbl.exists(i_lock_name) then
      dbms_output.put_line(
'atttacked');
      v_lockhandle := v_lockhandle_tbl(i_lock_name);
   else
     dbms_output.put_line(
'new acquire');
     
--acquire a unique lock id
     sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
     v_lockhandle_tbl(i_lock_name) := v_lockhandle;
       
   end if;
   
--acquire a lock
    v_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
 
   
--set return values
     o_result := v_result;
 end frm_lts_lock_acquire;
 
 function frm_lts_lock_release(i_lock_name in varchar2) return number as
       v_result number :=
6;
       v_lockhandle varchar2(
128);
 begin
     
--release lock according to lockhandle
      if v_lockhandle_tbl.exists(i_lock_name) then
          v_lockhandle :=  v_lockhandle_tbl(i_lock_name);
          v_result := sys.dbms_lock.release(v_lockhandle);
          v_lockhandle_tbl.delete(i_lock_name);       
      end if;
      return v_result;
 end frm_lts_lock_release;

end frm_lts_processor_lock_pkg;
/

 

 

锁管理器:

其实应用项目组有多个这样的存储过程,而这些存储过程之间的串行执行可以有多个business key来决定的,比如job order numberdelivery order等。所以我们需要给他们提供多锁管理机制。我们会对这多个锁进行排序,以避免死锁,并强烈推荐应用项目设置超时时间。这些business key是由String对象构成的,为了防止大量的业务操作被锁在null或者空string这样没有意义的business key上面,我们对application提供的锁集合还需要进行过滤。

原理还是很简单的,就是在本地事务中调用db端的申请锁,释放锁的存储过程,然后对返回的结果进行一系列处理。

在使用多锁机制的时候要保证,如果只申请到了部分锁,在申请其中另外一个锁时发生了错误或者超时,要能够安全地将已申请的锁释放掉,所以多锁申请需要记录已申请到的锁,并且记录发生的错误,区分timeout和异常。Timeout返回false,如果出现异常记录下来,最后抛出。释放多锁时,不能被中断,记录释放每个锁后的结果,最后判定如果其中一些锁释放时发生了错误,抛出。

 

handleLock定义暂停jta事务,执行锁操作,释放jta事务流程

private Object handleLock(Connection connection,
            LocalTransactionCallback localTransactionCallback)
            
throws LockException {
        TransactionManager tm 
= null;
        Transaction currentTx 
= null;
        Object result 
= null;
        
try {
            Context initialContext 
= new InitialContext();
            UserTransaction userTrx 
= (javax.transaction.UserTransaction) initialContext
                    .lookup(
"java:comp/UserTransaction");
            
if (!(userTrx.getStatus() == Status.STATUS_NO_TRANSACTION)) {
                tm 
= TransactionUtils.getTransactionManager(userTrx);
                
if (tm != null{
                    currentTx 
= tm.suspend();
                }

            }

            result 
= localTransactionCallback
                    .executeInLocalTransaction(connection);

            
if (null != currentTx) {
                tm.resume(currentTx);
            }

        }
 catch (NamingException e) {
        }
 catch (SystemException e) {
        }
 catch (InvalidTransactionException e) {
        }
 catch (IllegalStateException e) {
        }

        
return result;
    }


多锁申请操作是上面流程的一个回调

private class ObtainMutipleLocksLocalTransactionCallback implements
            LocalTransactionCallback 
{
        
private Set<String> lockNames;
        
private int waitTime;

        ObtainMutipleLocksLocalTransactionCallback(Set
<String> lockNames,
                
int waitTime) {
            
this.lockNames = lockNames;
            
this.waitTime = waitTime;
        }

        
public Object executeInLocalTransaction(Connection conn) {
            CallableStatement lockAcquireStmt 
= null;
            Set
<String> obtainedLockNames = new HashSet<String>();
            
boolean timeOut = false;
            String timeOutLockName 
= null;
            Exception mifLockException 
= null;
            
try {
                lockAcquireStmt 
= conn.prepareCall(OBTAIN_LOCK_PROC_CALL);
                
for (String lockName : lockNames) {
                    lockAcquireStmt.setString(
1, lockName);
                    lockAcquireStmt.setInt(
2, LCOK_EXPIRE_TIME);
                    lockAcquireStmt.setInt(
3, waitTime);
                    lockAcquireStmt.registerOutParameter(
4,
                            java.sql.Types.INTEGER);
                    lockAcquireStmt.registerOutParameter(
5,
                            java.sql.Types.VARCHAR);
                    lockAcquireStmt.executeUpdate();
                    
int lockacquireResult = lockAcquireStmt.getInt(4);
                    
if (lockacquireResult == ULLockResultType.SUCCESSFUL) 
                        obtainedLockNames.add(lockName);
                    }
 else if (lockacquireResult == ULLockResultType.TIMEOUT) {
                        timeOut 
= true;
                        timeOutLockName 
= lockName;
                        
break;
                    }
 else if (lockacquireResult != ULLockResultType.ALREADY_OWNED) {
                        String lockResultDesc 
= ULLockResultType
                                .getAcquireTypeDesc(lockacquireResult);
                        LockException lockException 
= new LockException(
                                
"Obtain lock " + lockName
                                        
+ " fails, the reason is "
                                        
+ lockResultDesc + " .");
                        lockException.setLockName(lockName);
                        lockException.setLockHandlingResult(lockResultDesc);
                        
throw lockException;
                    }
 else {
                    }

                }

            }
 catch (Exception ex) {
                mifLockException 
= ex;
            }
 finally {
                
if (null != lockAcquireStmt) {
                    
try {
                        lockAcquireStmt.close();
                    }
 catch (SQLException e) {
                        
// swallow
                    }

                }

            }

            
boolean success = true;
            
if (timeOut || mifLockException != null{
                success 
= false;
            }

            
return new ObtainMultipleLocksResult(success, obtainedLockNames,
                    timeOut, timeOutLockName, mifLockException);
        }

    }


多锁释放操作也是事务暂停流程的一个回调

private class ReleaseMultipleLocksLocalTransactionCallback implements
            LocalTransactionCallback 
{
        
private Set<String> lockNames;

        ReleaseMultipleLocksLocalTransactionCallback(Set
<String> lockNames) {
            
this.lockNames = lockNames;
        }


        
public Object executeInLocalTransaction(Connection conn) {
            CallableStatement lockReleaseStmt 
= null;
            Map
<String, Exception> mifLockErrors = new HashMap<String, Exception>();
            Set
<String> releasedLocks = new HashSet<String>();
            
try {
                
try {
                    lockReleaseStmt 
= conn.prepareCall(RELEASE_LOCK_PROC_CALL);
                }
 catch (Exception ex) {
                    
for (String lockName : lockNames) {
                        mifLockErrors.put(lockName, ex);
                    }

return new ReleaseMutipleLocksResult(false, releasedLocks, mifLockErrors);
                }


                
for (String lockName : lockNames) {
                    
try {
                        lockReleaseStmt.registerOutParameter(
1,
                                java.sql.Types.INTEGER);
                        lockReleaseStmt.setString(
2, lockName);
                        lockReleaseStmt.executeUpdate();
                        
int lockReleaseResult = lockReleaseStmt.getInt(1);
                        
if (lockReleaseResult == ULLockResultType.SUCCESSFUL) {
                            releasedLocks.add(lockName);
                        }
 else {
                            String lockResultDesc 
= ULLockResultType
                                    .getReleaseTypeDesc(lockReleaseResult);
                            LockException lockException 
= new LockException(
                                    
"Release lock " + lockName
                                            
+ " fails, the reason is "
                                            
+ lockResultDesc + " .");
                            lockException.setLockName(lockName);
                            lockException.setLockHandlingResult(lockResultDesc);
                            mifLockErrors.put(lockName, lockException);
                        }

                    }
 catch (Exception ex) {
                        mifLockErrors.put(lockName, ex);
                    }

                }

            }
 finally {
                
if (null != lockReleaseStmt) {
                    
try {
                        lockReleaseStmt.close();
                    }
 catch (SQLException e) {
                    }

                }

            }

            
boolean success = releasedLocks.size() == this.lockNames.size();
            
return new ReleaseMutipleLocksResult(success, releasedLocks,
                    mifLockErrors);
        }

    }


使用模板:注意锁的释放要写在finally语句块里面,保证锁的释放。

定义好模板,防止Application用户直接调用锁管理器或者滥用锁,忘记释放锁。我们决定定义一个模板,做到锁的申请和释放对application用户来说是透明的,把它做成了隐含锁。

public void execute(JobExecutionContext context)
            
throws JobExecutionException {
        Map jobDataMap 
= context
        .getJobDetail().getJobDataMap();
        Collection
<String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
        Integer waitTimeInteger 
= (Integer) jobDataMap
        .get(LOCK_WAIT_TIME_SECONDS_KEY);
        
int waitTime = MAX_WAITTIME;
        
if (waitTimeInteger != null{
            waitTime 
= waitTimeInteger.intValue();
        }

        Set
<String> uniqueLockKeys = new HashSet<String>(lockKeys);

        
// filter empty keys
        Iterator<String> keyIterator = uniqueLockKeys.iterator();
        
while (keyIterator.hasNext()) {
            String key 
= keyIterator.next();
            
if (StringUtils.isEmptyNoOffset(key)) {
                keyIterator.remove();
            }

        }

        
if (CollectionUtils.isNotEmptyCollection(uniqueLockKeys)) {
            Set
<String> obtainedLockNames = null;
            Connection connection 
= null;
            
try {
                connection 
= DataSource.getConnection();
                ObtainMultipleLocksResult result 
= LOCK_MANAGER.obtainLock(
                        connection, uniqueLockKeys, waitTime);
                obtainedLockNames 
= result.getObtainedLockNames();
                
if (!result.isSuccess()) {
                    
if (result.isTimeout()) {
                      
//do log
                        return;
                    }
 else {
                        JobExecutionException jobException 
= new JobExecutionException(
                                
"Obtain locks failed! "
                                        
+ result.getMifLockException()
                                                .getMessage(), result
                                        .getMifLockException());
                        
throw jobException;
                    }

                }

                
this. executeInLock (context);
            }
 catch (Throwable e) {
                
throw new JobExecutionException(
                        
"Get db connection failed!" + e.getMessage(), e);
            }
 finally {
                
if (null != connection) {
                    
this.releaseLocks(connection, obtainedLockNames);
                    
try {
                        connection.close();
                    }
 catch (SQLException e) {
                        
throw new JobExecutionException(
                                
"close  db connection failed!" + e.getMessage(), e);
                    }

                }

            }

        }
 else {
            
this.executeInLock(context);
        }

    }


 

executeInLockapplication的子类继承实现

 

缓存

l         缓存悲观离线锁

l         缓存lockhandle

因为使用的是悲观离线锁,每次申请锁都要跑一趟db,但如果当前线程已经是lock的所有者就不需要白跑一趟了。可以用ThreadLocal把当前线程已经拥有的锁缓存起来,释放锁时对应的需要清除缓存。

在申请锁时,需要获得UL Lock时的lockhandle,释放锁时也需要提供锁的lockhandle,我们需要将它缓存起来,主要是因为DBMS_LOCK.Unique每次都会commit,会影响性能,这样每次释放锁时就可以直接使用lockhandle了。有两种方法对lockhandle进行缓存,缓存在java端作为实例变量,缓存在plsql包的全局变量中。缓存在java端需要注意的是,lock manager不能作为单例或者享元来使用,否则lock handle的缓存在多jvm之间也存在着并发控制和同步的问题。

源代码:

Java:
ULLock-sources.rar

PLSQL:
lockplsql.rar

参考:

http://docstore.mik.ua/orelly/oracle/bipack/ch04_01.htm




posted on 2009-07-03 19:24 叱咤红人 阅读(1501) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: