Background
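The application team runs a settlement stored procedure on a schedule every hour. A single run may take longer than an hour, and runs must be strictly serialized. An in-memory lock cannot guarantee this: if the application server goes down and restarts, a new run can start while the previous stored procedure is still executing in the database. We schedule jobs with LTS, which wraps Quartz, and we decided that lock management should be implemented in the framework, for these reasons: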
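- The lock manager can be built as a general-purpose, reusable module
- Acquiring and releasing locks is risky; we worry that business developers might forget a step and cause deadlocks or concurrency problems
- It integrates cleanly into our existing framework and is easy to expose to business developers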
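Note: we strongly discourage pessimistic offline locks. If conflicts are rare and the problem can be solved by other means, such as an optimistic offline lock, or a DB constraint combined with a compensating operation, do not use a pessimistic offline lock.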
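For comparison, here is a minimal sketch of the optimistic offline lock mentioned above. It assumes each record carries a version number. The class and method names are hypothetical, and the "table" is an in-memory map. In a database the same check is normally the WHERE clause of the UPDATE, shown in the class comment.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical in-memory sketch of an optimistic offline lock. In SQL the same check is
 * usually a single statement, e.g.
 *   UPDATE t SET val = ?, version = version + 1 WHERE id = ? AND version = ?
 * where an update count of 0 signals a conflict.
 */
class OptimisticLockSketch {
    /** Immutable snapshot of a record: its value and the version it was read at. */
    static final class Row {
        final String value;
        final long version;

        Row(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final ConcurrentMap<String, Row> table = new ConcurrentHashMap<>();

    void insert(String id, String value) {
        table.put(id, new Row(value, 0L));
    }

    Row read(String id) {
        return table.get(id);
    }

    /** Writes only if the record is still at expectedVersion; false means a conflict. */
    boolean update(String id, long expectedVersion, String newValue) {
        Row current = table.get(id);
        if (current == null || current.version != expectedVersion) {
            return false; // conflict: the caller re-reads and retries, or compensates
        }
        // replace(key, old, new) is atomic: of two writers holding the same snapshot,
        // only one can swap it out.
        return table.replace(id, current, new Row(newValue, expectedVersion + 1));
    }
}
```

No lock is held between the read and the write. A conflict is only detected at write time, which is why this fits the "conflicts are rare" case.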
How it works
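The PL/SQL UL (user) lock is a lock resource that Oracle exposes to developers through the DBMS_LOCK package, and it behaves much like a DML lock. We could also implement concurrency control with DML locks, using select…for update or a self-maintained lock table. Given the implementation cost, however, we chose the PL/SQL UL lock. Oracle also guarantees that all UL locks held by a session are released when the session ends.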
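One caveat: DBMS_LOCK.ALLOCATE_UNIQUE commits every time it is called. Inside a distributed transaction this raises an exception saying the transaction has already been committed. That matters for us because we use XA resources with global transactions, i.e. JTA. To keep lock acquisition and release from interfering with the distributed business transaction, we have two options. We can perform the lock operations on a non-XA resource in a local transaction, or we can suspend the current distributed transaction, perform the lock operation, and then resume it. To reuse the existing XA resource we chose the second approach. We use the same technique routinely for DDL: suspend the distributed transaction, run the DDL, then resume the transaction.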
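The suspend/resume approach can be sketched without a database. TxManager is a hypothetical stand-in that mirrors only the suspend() and resume() methods of javax.transaction.TransactionManager. The point of the sketch is that resume sits in a finally block, so the business transaction is re-associated with the thread even when the lock call fails.

```java
import java.util.concurrent.Callable;

/** Hypothetical, DB-free sketch of "suspend the global transaction, run local work, resume". */
class SuspendResumeSketch {
    /** Mirrors only suspend()/resume() of javax.transaction.TransactionManager. */
    interface TxManager<T> {
        T suspend() throws Exception;      // returns null when no transaction is associated
        void resume(T tx) throws Exception;
    }

    static <T, R> R runOutsideGlobalTx(TxManager<T> tm, Callable<R> localWork) throws Exception {
        T suspended = tm.suspend();
        try {
            // The lock calls (including the commit inside ALLOCATE_UNIQUE) run here,
            // outside the global transaction.
            return localWork.call();
        } finally {
            if (suspended != null) {
                tm.resume(suspended); // always re-associate the business transaction
            }
        }
    }
}
```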
Implementation:
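- Wrap a few procedures of the DBMS_LOCK package for our own use
- Provide a lock manager on the Java side, based on the PL/SQL UL lock
- Define on the Java side a template for the flow "acquire locks, run the business operation, release locks"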
DB stored procedures:
We wrap DBMS_LOCK in a thin package instead of calling it directly. The benefits are:
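- It decouples us from Oracle: if another database offers similar functionality, we can implement stored procedures with the same names there
- It makes later refactoring and upgrades of the stored procedures easier
- Some wrapping is required anyway. DBMS_LOCK.ALLOCATE_UNIQUE takes a lock name (a logical name) and returns a lockhandle, the unique identifier of the lock inside Oracle. The lockhandle should be transparent to the Java side, which should only deal with logical lock names.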
create or replace package body frm_lts_processor_lock_pkg is
/* table to store lockhandles by name */
TYPE handle_tbltype IS TABLE OF varchar2(128)
INDEX BY varchar2(128);
v_lockhandle_tbl handle_tbltype;
procedure frm_lts_lock_acquire(i_lock_name in varchar2, i_expiration_time in Integer default 864000, i_wait_time in Integer default DBMS_LOCK.maxwait, o_result out number) as
v_result number;
v_lockhandle varchar2(128);
begin
if v_lockhandle_tbl.exists(i_lock_name) then
--reuse the lockhandle already cached for this name in the current session
v_lockhandle := v_lockhandle_tbl(i_lock_name);
else
--allocate a unique lockhandle for the logical name (note: allocate_unique commits)
sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
v_lockhandle_tbl(i_lock_name) := v_lockhandle;
end if;
--request an exclusive lock; release_on_commit = false keeps it held across commits
v_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
--set return values
o_result := v_result;
end frm_lts_lock_acquire;
function frm_lts_lock_release(i_lock_name in varchar2) return number as
v_result number := 6; --package-specific code: no lockhandle cached for this name
v_lockhandle varchar2(128);
begin
--release lock according to lockhandle
if v_lockhandle_tbl.exists(i_lock_name) then
v_lockhandle := v_lockhandle_tbl(i_lock_name);
v_result := sys.dbms_lock.release(v_lockhandle);
v_lockhandle_tbl.delete(i_lock_name);
end if;
return v_result;
end frm_lts_lock_release;
end frm_lts_processor_lock_pkg;
/
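The Java code in the lock manager section refers to ULLockResultType, which the article does not show. One possible reconstruction assumes the class simply names the documented return codes of DBMS_LOCK.REQUEST and DBMS_LOCK.RELEASE:

- REQUEST: 0 success, 1 timeout, 2 deadlock, 3 parameter error, 4 already own lock, 5 illegal lockhandle
- RELEASE: 0 success, 3 parameter error, 4 do not own lock, 5 illegal lockhandle

Code 6 is not an Oracle code. It is the default that frm_lts_lock_release returns when no lockhandle is cached. The description strings are illustrative.

```java
/**
 * Hypothetical reconstruction of ULLockResultType. Codes 0-5 are the documented
 * DBMS_LOCK.REQUEST / DBMS_LOCK.RELEASE return values; 6 is specific to frm_lts_lock_release.
 */
final class ULLockResultType {
    static final int SUCCESSFUL = 0;
    static final int TIMEOUT = 1;             // REQUEST only
    static final int DEADLOCK = 2;            // REQUEST only
    static final int PARAMETER_ERROR = 3;
    static final int ALREADY_OWNED = 4;       // REQUEST: this session already holds the lock
    static final int NOT_OWNED = 4;           // RELEASE: this session does not hold the lock
    static final int ILLEGAL_LOCK_HANDLE = 5;
    static final int NO_CACHED_HANDLE = 6;    // package-specific, not an Oracle code

    private ULLockResultType() {
    }

    static String getAcquireTypeDesc(int code) {
        switch (code) {
            case SUCCESSFUL: return "success";
            case TIMEOUT: return "timeout";
            case DEADLOCK: return "deadlock";
            case PARAMETER_ERROR: return "parameter error";
            case ALREADY_OWNED: return "already own lock";
            case ILLEGAL_LOCK_HANDLE: return "illegal lock handle";
            default: return "unknown result " + code;
        }
    }

    static String getReleaseTypeDesc(int code) {
        switch (code) {
            case SUCCESSFUL: return "success";
            case PARAMETER_ERROR: return "parameter error";
            case NOT_OWNED: return "do not own lock";
            case ILLEGAL_LOCK_HANDLE: return "illegal lock handle";
            case NO_CACHED_HANDLE: return "no lockhandle cached for this name in the session";
            default: return "unknown result " + code;
        }
    }
}
```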
Lock manager:
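In practice the application team has several such stored procedures. Whether they must run serially can depend on several business keys, such as job order number or delivery order, so we need to support multiple locks. We sort the locks to avoid deadlock, and we strongly recommend that application teams set a timeout. The business keys are Strings. To keep large numbers of business operations from being serialized on meaningless keys such as null or the empty string, we also filter the lock set supplied by the application.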
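Here is one way to implement the filtering and ordering described above. It is a sketch: the class name is hypothetical, and treating whitespace-only keys as empty is an assumption. The template further down uses StringUtils.isEmptyNoOffset, whose exact rule is not shown.

A TreeSet gives every job the same acquisition order. Without a shared order, one job requesting {A, B} and another requesting {B, A} could each end up holding one lock while waiting for the other. A HashSet, by contrast, does not guarantee any particular iteration order.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

class LockKeyNormalizer {
    /**
     * Drops null and blank business keys, removes duplicates and returns the rest in
     * natural (lexicographic) order, so all callers acquire locks in the same sequence.
     */
    static SortedSet<String> normalize(Collection<String> rawKeys) {
        SortedSet<String> keys = new TreeSet<>();
        if (rawKeys == null) {
            return keys;
        }
        for (String key : rawKeys) {
            // assumption: whitespace-only keys are as meaningless as empty ones
            if (key != null && !key.trim().isEmpty()) {
                keys.add(key);
            }
        }
        return keys;
    }
}
```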
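The mechanism itself is simple: call the DB-side stored procedures that acquire and release locks in a local transaction, then process the results they return.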
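With multiple locks, suppose only some locks have been obtained and acquiring the next one fails with an error or a timeout. The locks already obtained must then be released safely. Multi-lock acquisition therefore records which locks it has obtained and which error occurred, and it distinguishes a timeout from an exception: a timeout returns false, while an exception is recorded and thrown at the end. Releasing multiple locks must not be interrupted. The result of each release is recorded, and if any release failed, an exception is thrown at the end.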
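The "acquire all, or release what you got" rule can be sketched with a hypothetical single-lock backend standing in for the PL/SQL calls. As in the text, a timeout is reported as false without an error entry, while exceptions are collected. Release keeps going after a failure and records each error. Releasing the most recently obtained lock first is a choice of this sketch; the article does not specify an order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.SortedSet;

class MultiLockSketch {
    /** Hypothetical single-lock backend; in the article this is the stored procedure call. */
    interface LockBackend {
        /** Returns true if acquired, false on timeout; throws on any other failure. */
        boolean acquire(String name, int waitSeconds) throws Exception;
        void release(String name) throws Exception;
    }

    /**
     * Acquires the locks in order. On a timeout or an error, releases everything already
     * obtained (newest first) and returns false; exceptions go into errors, timeouts do not.
     */
    static boolean acquireAll(LockBackend backend, SortedSet<String> names, int waitSeconds,
                              Map<String, Exception> errors) {
        Deque<String> obtained = new ArrayDeque<>();
        for (String name : names) {
            boolean ok;
            try {
                ok = backend.acquire(name, waitSeconds);
            } catch (Exception e) {
                errors.put(name, e);
                ok = false;
            }
            if (!ok) {
                releaseAll(backend, obtained, errors);
                return false;
            }
            obtained.push(name); // push adds at the head, so iteration is newest first
        }
        return true;
    }

    /** Releases every lock even if some releases fail; each failure is recorded. */
    static void releaseAll(LockBackend backend, Iterable<String> names,
                           Map<String, Exception> errors) {
        for (String name : names) {
            try {
                backend.release(name);
            } catch (Exception e) {
                errors.put(name, e);
            }
        }
    }
}
```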
handleLock defines the flow: suspend the JTA transaction, perform the lock operation, then resume the JTA transaction.
private Object handleLock(Connection connection,
LocalTransactionCallback localTransactionCallback)
throws LockException {
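TransactionManager tm = null;
Transaction currentTx = null;
try {
Context initialContext = new InitialContext();
UserTransaction userTrx = (javax.transaction.UserTransaction) initialContext
.lookup("java:comp/UserTransaction");
// suspend the global (JTA) transaction, if any, so that the commit issued by
// DBMS_LOCK.ALLOCATE_UNIQUE does not hit it
if (userTrx.getStatus() != Status.STATUS_NO_TRANSACTION) {
tm = TransactionUtils.getTransactionManager(userTrx);
if (tm != null) {
currentTx = tm.suspend();
}
}
return localTransactionCallback
.executeInLocalTransaction(connection);
} catch (NamingException e) {
throw new LockException("Looking up UserTransaction failed: " + e.getMessage());
} catch (SystemException e) {
throw new LockException("Suspending the JTA transaction failed: " + e.getMessage());
} finally {
// resume the suspended business transaction even if the lock operation failed
if (null != currentTx) {
try {
tm.resume(currentTx);
} catch (Exception e) {
throw new LockException("Resuming the JTA transaction failed: " + e.getMessage());
}
}
}
}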
Acquiring multiple locks is implemented as a callback of the flow above:
private class ObtainMutipleLocksLocalTransactionCallback implements
LocalTransactionCallback {
private Set<String> lockNames;
private int waitTime;
ObtainMutipleLocksLocalTransactionCallback(Set<String> lockNames,
int waitTime) {
this.lockNames = lockNames;
this.waitTime = waitTime;
}
public Object executeInLocalTransaction(Connection conn) {
CallableStatement lockAcquireStmt = null;
Set<String> obtainedLockNames = new HashSet<String>();
boolean timeOut = false;
String timeOutLockName = null;
Exception mifLockException = null;
try {
lockAcquireStmt = conn.prepareCall(OBTAIN_LOCK_PROC_CALL);
for (String lockName : lockNames) {
lockAcquireStmt.setString(1, lockName);
lockAcquireStmt.setInt(2, LCOK_EXPIRE_TIME);
lockAcquireStmt.setInt(3, waitTime);
lockAcquireStmt.registerOutParameter(4,
java.sql.Types.INTEGER);
lockAcquireStmt.registerOutParameter(5,
java.sql.Types.VARCHAR);
lockAcquireStmt.executeUpdate();
int lockacquireResult = lockAcquireStmt.getInt(4);
if (lockacquireResult == ULLockResultType.SUCCESSFUL) {
obtainedLockNames.add(lockName);
} else if (lockacquireResult == ULLockResultType.TIMEOUT) {
timeOut = true;
timeOutLockName = lockName;
break;
} else if (lockacquireResult != ULLockResultType.ALREADY_OWNED) {
String lockResultDesc = ULLockResultType
.getAcquireTypeDesc(lockacquireResult);
LockException lockException = new LockException(
"Obtain lock " + lockName
+ " fails, the reason is "
+ lockResultDesc + " .");
lockException.setLockName(lockName);
lockException.setLockHandlingResult(lockResultDesc);
throw lockException;
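} else {
// ALREADY_OWNED: this session already holds the lock; it is not added to
// obtainedLockNames, so the matching release call will not free it
}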
}
} catch (Exception ex) {
mifLockException = ex;
} finally {
if (null != lockAcquireStmt) {
try {
lockAcquireStmt.close();
} catch (SQLException e) {
// swallow
}
}
}
boolean success = true;
if (timeOut || mifLockException != null) {
success = false;
}
return new ObtainMultipleLocksResult(success, obtainedLockNames,
timeOut, timeOutLockName, mifLockException);
}
}
Releasing multiple locks is another callback of the transaction-suspension flow:
private class ReleaseMultipleLocksLocalTransactionCallback implements
LocalTransactionCallback {
private Set<String> lockNames;
ReleaseMultipleLocksLocalTransactionCallback(Set<String> lockNames) {
this.lockNames = lockNames;
}
public Object executeInLocalTransaction(Connection conn) {
CallableStatement lockReleaseStmt = null;
Map<String, Exception> mifLockErrors = new HashMap<String, Exception>();
Set<String> releasedLocks = new HashSet<String>();
try {
try {
lockReleaseStmt = conn.prepareCall(RELEASE_LOCK_PROC_CALL);
} catch (Exception ex) {
for (String lockName : lockNames) {
mifLockErrors.put(lockName, ex);
}
return new ReleaseMutipleLocksResult(false, releasedLocks, mifLockErrors);
}
for (String lockName : lockNames) {
try {
lockReleaseStmt.registerOutParameter(1,
java.sql.Types.INTEGER);
lockReleaseStmt.setString(2, lockName);
lockReleaseStmt.executeUpdate();
int lockReleaseResult = lockReleaseStmt.getInt(1);
if (lockReleaseResult == ULLockResultType.SUCCESSFUL) {
releasedLocks.add(lockName);
} else {
String lockResultDesc = ULLockResultType
.getReleaseTypeDesc(lockReleaseResult);
LockException lockException = new LockException(
"Release lock " + lockName
+ " fails, the reason is "
+ lockResultDesc + " .");
lockException.setLockName(lockName);
lockException.setLockHandlingResult(lockResultDesc);
mifLockErrors.put(lockName, lockException);
}
} catch (Exception ex) {
mifLockErrors.put(lockName, ex);
}
}
} finally {
if (null != lockReleaseStmt) {
try {
lockReleaseStmt.close();
} catch (SQLException e) {
}
}
}
boolean success = releasedLocks.size() == this.lockNames.size();
return new ReleaseMutipleLocksResult(success, releasedLocks,
mifLockErrors);
}
}
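Usage template: the lock release must be placed in a finally block so that the locks are always released.
We define a template so that application code cannot call the lock manager directly, abuse locks, or forget to release them. With the template, acquiring and releasing locks is transparent to application developers: the locks become implicit.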
public void execute(JobExecutionContext context)
throws JobExecutionException {
Map jobDataMap = context
.getJobDetail().getJobDataMap();
Collection<String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
Integer waitTimeInteger = (Integer) jobDataMap
.get(LOCK_WAIT_TIME_SECONDS_KEY);
int waitTime = MAX_WAITTIME;
if (waitTimeInteger != null) {
waitTime = waitTimeInteger.intValue();
}
Set<String> uniqueLockKeys = (lockKeys == null) ? new HashSet<String>() : new HashSet<String>(lockKeys);
// filter empty keys
Iterator<String> keyIterator = uniqueLockKeys.iterator();
while (keyIterator.hasNext()) {
String key = keyIterator.next();
if (StringUtils.isEmptyNoOffset(key)) {
keyIterator.remove();
}
}
if (CollectionUtils.isNotEmptyCollection(uniqueLockKeys)) {
Set<String> obtainedLockNames = null;
Connection connection = null;
try {
connection = DataSource.getConnection();
ObtainMultipleLocksResult result = LOCK_MANAGER.obtainLock(
connection, uniqueLockKeys, waitTime);
obtainedLockNames = result.getObtainedLockNames();
if (!result.isSuccess()) {
if (result.isTimeout()) {
//do log
return;
} else {
JobExecutionException jobException = new JobExecutionException(
"Obtain locks failed! "
+ result.getMifLockException()
.getMessage(), result
.getMifLockException());
throw jobException;
}
}
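this.executeInLock(context);
} catch (JobExecutionException e) {
// lock acquisition failed or the job itself failed: rethrow as-is
throw e;
} catch (Throwable e) {
throw new JobExecutionException(
"Executing job with locks failed! " + e.getMessage(), e);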
} finally {
if (null != connection) {
this.releaseLocks(connection, obtainedLockNames);
try {
connection.close();
} catch (SQLException e) {
throw new JobExecutionException(
"close db connection failed!" + e.getMessage(), e);
}
}
}
} else {
this.executeInLock(context);
}
}
executeInLock is implemented by the application's subclasses.
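Below is a stripped-down, DB-free sketch of the same template. It assumes a hypothetical LockManager whose obtain returns null on timeout, after releasing any partially obtained locks. The sketch shows the two properties the template is meant to guarantee:

- application code only implements executeInLock
- release happens in finally whenever locks were obtained

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

abstract class LockedJobTemplate {
    /** Hypothetical lock manager: returns the obtained locks, or null on timeout
     *  (in which case it has already released any partially obtained locks). */
    interface LockManager {
        Set<String> obtain(Set<String> names, int waitSeconds);
        void release(Set<String> names);
    }

    private final LockManager lockManager;

    protected LockedJobTemplate(LockManager lockManager) {
        this.lockManager = lockManager;
    }

    /** The only method an application subclass implements. */
    protected abstract void executeInLock();

    /** Runs executeInLock under the locks; returns false if the run was skipped on timeout. */
    public final boolean execute(Collection<String> lockKeys, int waitSeconds) {
        Set<String> keys = new TreeSet<>();
        if (lockKeys != null) {
            for (String key : lockKeys) {
                if (key != null && !key.trim().isEmpty()) {
                    keys.add(key); // drop meaningless keys, dedupe and sort
                }
            }
        }
        if (keys.isEmpty()) {
            executeInLock(); // nothing to lock on
            return true;
        }
        Set<String> obtained = lockManager.obtain(keys, waitSeconds);
        if (obtained == null) {
            return false; // timed out: skip this run (the article only logs it)
        }
        try {
            executeInLock();
            return true;
        } finally {
            lockManager.release(obtained); // always release, even if the job throws
        }
    }
}
```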
Caching
- Caching the pessimistic offline lock
- Caching the lockhandle
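Because this is a pessimistic offline lock, every acquisition requires a round trip to the DB. If the current thread already owns the lock, that trip is wasted. We can cache the locks the current thread owns in a ThreadLocal, and clear the corresponding entries when the locks are released.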
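Here is a sketch of the ThreadLocal cache. It assumes a thread keeps using the same DB session while it holds its locks. UL locks belong to the Oracle session, not to the Java thread, so the cache is only valid under that assumption. requestInDb and releaseInDb are hypothetical stand-ins for the stored procedure calls. The cache does not count re-entries, which matches DBMS_LOCK itself: a repeated REQUEST simply returns "already own lock", and a single release drops the lock.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

class OwnedLockCache {
    /** Names of the locks the current thread believes it owns. */
    private static final ThreadLocal<Set<String>> OWNED = ThreadLocal.withInitial(HashSet::new);

    /** Skips the DB round trip when this thread already owns the lock. */
    static boolean acquire(String name, Predicate<String> requestInDb) {
        Set<String> owned = OWNED.get();
        if (owned.contains(name)) {
            return true;
        }
        if (requestInDb.test(name)) {
            owned.add(name);
            return true;
        }
        return false;
    }

    /** Releases in the DB and always clears the cache entry, even if the DB call throws. */
    static void release(String name, Consumer<String> releaseInDb) {
        try {
            releaseInDb.accept(name);
        } finally {
            Set<String> owned = OWNED.get();
            owned.remove(name);
            if (owned.isEmpty()) {
                OWNED.remove(); // avoid leaking state into pooled threads
            }
        }
    }

    static boolean owns(String name) {
        return OWNED.get().contains(name);
    }
}
```

Clearing the entry even when the DB release fails means the next acquire goes back to the DB. In that case DBMS_LOCK answers "already own lock" if the lock is in fact still held.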
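Acquiring a UL lock requires its lockhandle, and releasing the lock requires the same lockhandle, so we cache it. The main reason is that DBMS_LOCK.ALLOCATE_UNIQUE commits every time it is called, which hurts performance; with the handle cached, a release can use it directly. There are two places to cache lockhandles: on the Java side as an instance variable, or in a global variable of the PL/SQL package. If they are cached on the Java side, the lock manager must not be used as a singleton or a flyweight. Otherwise the lockhandle cache itself raises concurrency-control and synchronization problems, including across multiple JVMs.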
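Here is a sketch of the Java-side option. It assumes one cache per lock manager instance, never shared, in line with the warning against singletons. allocateUnique is a hypothetical stand-in for a call to DBMS_LOCK.ALLOCATE_UNIQUE. computeIfAbsent ensures the call runs at most once per name while the entry is cached. The class is deliberately not thread-safe. forget mirrors the PL/SQL package, which deletes the cached handle after releasing the lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class LockHandleCache {
    private final Map<String, String> handles = new HashMap<>(); // lock name -> lockhandle
    private final Function<String, String> allocateUnique;       // hypothetical DB call

    LockHandleCache(Function<String, String> allocateUnique) {
        this.allocateUnique = allocateUnique;
    }

    /** Returns the cached handle, calling allocateUnique (which commits) only on a miss. */
    String handleFor(String lockName) {
        return handles.computeIfAbsent(lockName, allocateUnique);
    }

    /** Drops the cached handle after release, as frm_lts_lock_release does. */
    void forget(String lockName) {
        handles.remove(lockName);
    }
}
```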
Source code:
Java:
ULLock-sources.rar
PLSQL:
lockplsql.rar
References:
http://docstore.mik.ua/orelly/oracle/bipack/ch04_01.htm