dingfirst

On the Road

  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理 ::
  8 随笔 :: 2 文章 :: 3 评论 :: 0 Trackbacks
1,使用DataSourceTransactionManager(jdbc事务)
   
   使用DataSourceTransactionManager,只要设置一个dataSource就可以,如下面代码所示
    TransactionDefinition definition = new DefaultTransactionDefinition();
    TransactionStatus status
= transactionManager.getTransaction(definition);
    
try{
        JdbcTemlate.update(..);
        JdbcTemlate
    }
catch(..){
        transactionManager.roolback(status);
    }

    transactionManager.commit(status);

   实现中用到了上一篇文章“spring对jdbc的封装”中对TransactionSynchronizationManager、ConnectionHolder和DataSourceUtils的介绍,应用代码中只要在事务操作中connection是通过DataSourceUtils获取,就能保证事务的执行。
   主要类的结构为:
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements InitializingBean   
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager 
   AbstractPlatformTransactionManager提供各种PlatformTransactionManager通用的template method,下面代码中访问级别为protected的方法为DataSourceTransactionManager对jdbc 事务的具体实现,

1.1,getTransaction实现(AbstractPlatformTransactionManager),注意事务的传播类型和TransactionStatus构造函数中实参的定义。

/**
     * This implementation of getTransaction handles propagation behavior.
     * Delegates to doGetTransaction, isExistingTransaction, doBegin.
     * 
@see #doGetTransaction
     * 
@see #isExistingTransaction
     * 
@see #doBegin
     
*/

    
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        Object transaction 
= doGetTransaction();

        
// cache to avoid repeated checks
        boolean debugEnabled = logger.isDebugEnabled();

        
if (debugEnabled) {
            logger.debug(
"Using transaction object [" + transaction + "]");
        }


        
if (definition == null{
            
// use defaults
            definition = new DefaultTransactionDefinition();
        }

                
//如果transaction已经存在(已经调用过TransactionSynchronizationManager.bindResource(),
                
//控制事务传播类型
        if (isExistingTransaction(transaction)) {
                    
/**
                     * Execute non-transactionally, throw an exception if a transaction exists.
                     * Analogous to EJB transaction attribute of the same name.
                     
*/

            
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                
throw new IllegalTransactionStateException(
                        
"Transaction propagation 'never' but existing transaction found");
            }

                        
/**
                         * Execute non-transactionally, suspend the current transaction if one exists.
                         * Analogous to EJB transaction attribute of the same name.
                         
*/

            
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                
if (debugEnabled) {
                    logger.debug(
"Suspending current transaction");
                }

                Object suspendedResources 
= suspend(transaction);
                
boolean newSynchronization = (this.transactionSynchronization == SYNCHRONIZATION_ALWAYS);
                
return newTransactionStatus(
                        
nullfalse, newSynchronization, definition.isReadOnly(), debugEnabled, suspendedResources);
            }

                        
/**
                         * Create a new transaction, suspend the current transaction if one exists.
                         * Analogous to EJB transaction attribute of the same name.
                         
*/

            
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                
if (debugEnabled) {
                    logger.debug(
"Creating new transaction, suspending current one");
                }

                Object suspendedResources 
= suspend(transaction);
                doBegin(transaction, definition);
                
boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
                
return newTransactionStatus(
                        transaction, 
true, newSynchronization, definition.isReadOnly(), debugEnabled, suspendedResources);
            }

                        
/**
                         * Execute within a nested transaction if a current transaction exists,
                         * behave like PROPAGATION_REQUIRED else. There is no analogous feature in EJB.
                         
*/

            
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                
if (!isNestedTransactionAllowed()) {
                    
throw new NestedTransactionNotSupportedException(
                            
"Transaction manager does not allow nested transactions by default - " +
                            
"specify 'nestedTransactionAllowed' property with value 'true'");
                }

                
if (debugEnabled) {
                    logger.debug(
"Creating nested transaction");
                }

                
boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status 
= newTransactionStatus(
                        transaction, 
true, newSynchronization, definition.isReadOnly(), debugEnabled, null);
                
try {
                    
if (useSavepointForNestedTransaction()) {
                        status.createAndHoldSavepoint();
                    }

                    
else {
                        doBegin(transaction, definition);
                    }

                    
return status;
                }

                
catch (NestedTransactionNotSupportedException ex) {
                    
if (status.isNewSynchronization()) {
                        TransactionSynchronizationManager.clearSynchronization();
                    }

                    
throw ex;
                }

            }

                        
//只要支持事务,Participating in existing transaction
            else {
                
if (debugEnabled) {
                    logger.debug(
"Participating in existing transaction");
                }

                
boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
                
return newTransactionStatus(
                        transaction, 
false, newSynchronization, definition.isReadOnly(), debugEnabled, null);
            }

        }


        
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

                
/**
                 * Support a current transaction, throw an exception if none exists.
                 * Analogous to EJB transaction attribute of the same name.
                 
*/

        
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            
throw new IllegalTransactionStateException(
                    
"Transaction propagation 'mandatory' but no existing transaction found");
        }

               
//Creating new transaction,required,nested,requires new
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() 
== TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() 
== TransactionDefinition.PROPAGATION_NESTED) {
            
if (debugEnabled) {
                logger.debug(
"Creating new transaction");
            }

            doBegin(transaction, definition);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            
boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
            
return newTransactionStatus(
                    transaction, 
true, newSynchronization, definition.isReadOnly(), debugEnabled, null);
        }

        
else {
            
// "empty" (-> no) transaction
            boolean newSynchronization = (this.transactionSynchronization == SYNCHRONIZATION_ALWAYS);
            
return newTransactionStatus(
                    
nullfalse, newSynchronization, definition.isReadOnly(), debugEnabled, null);
        }

    }

   获取ConnectionHolder的方法与JdbcTemplate中是一致的,保证同一线程获取的connection是同一个,使事务顺利执行。
protected Object doGetTransaction() {
        DataSourceTransactionObject txObject 
= new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder 
=
            (ConnectionHolder) TransactionSynchronizationManager.getResource(
this.dataSource);
        txObject.setConnectionHolder(conHolder);
        
return txObject;
    }

protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject 
= (DataSourceTransactionObject) transaction;
        
// Consider a pre-bound connection as transaction.
        return (txObject.getConnectionHolder() != null);
    }
   
   以下两个方法用于构建特定的TransactionStatus
/**
     * Create a new TransactionStatus for the given arguments,
     * initializing transaction synchronization if appropriate.
     
*/

    
private DefaultTransactionStatus newTransactionStatus(
            Object transaction, 
boolean newTransaction, boolean newSynchronization,
            
boolean readOnly, boolean debug, Object suspendedResources) {

        
boolean actualNewSynchronization = newSynchronization &&
                
!TransactionSynchronizationManager.isSynchronizationActive();
        
if (actualNewSynchronization) {
            TransactionSynchronizationManager.initSynchronization();
        }

        
return new DefaultTransactionStatus(
                transaction, newTransaction, actualNewSynchronization, readOnly, debug, suspendedResources);
    }
  
   默认TransactionStatus的构造函数
/**
 * Create a new TransactionStatus instance; every argument is stored as-is
 * in the field of the same name.
 *
 * @param transaction underlying transaction object that can hold
 * state for the internal transaction implementation
 * @param newTransaction if the transaction is new,
 * else participating in an existing transaction
 * @param newSynchronization if a new transaction synchronization
 * has been opened for the given transaction
 * @param readOnly whether the transaction is read-only
 * @param debug should debug logging be enabled for the handling of this transaction?
 * Caching it in here can prevent repeated calls to ask the logging system whether
 * debug logging should be enabled.
 * @param suspendedResources a holder for resources that have been suspended
 * for this transaction, if any
 */
public DefaultTransactionStatus(
        Object transaction,
        boolean newTransaction,
        boolean newSynchronization,
        boolean readOnly,
        boolean debug,
        Object suspendedResources) {

    // Object references first, then the flags.
    this.transaction = transaction;
    this.suspendedResources = suspendedResources;

    this.newTransaction = newTransaction;
    this.newSynchronization = newSynchronization;
    this.readOnly = readOnly;
    this.debug = debug;
}

1.2,commit实现(AbstractPlatformTransactionManager)

/**
     * This implementation of commit handles participating in existing
     * transactions and programmatic rollback requests.
     * Delegates to isRollbackOnly, doCommit and rollback.
     * 
@see org.springframework.transaction.TransactionStatus#isRollbackOnly
     * 
@see #doCommit
     * 
@see #rollback
     
*/

    
public final void commit(TransactionStatus status) throws TransactionException {
        DefaultTransactionStatus defStatus 
= (DefaultTransactionStatus) status;
        
if (defStatus.isCompleted()) {
            
throw new IllegalTransactionStateException(
                    
"Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        
if (status.isRollbackOnly()) {
            
if (defStatus.isDebug()) {
                logger.debug(
"Transactional code has requested rollback");
            }

            rollback(status);
        }

        
else {
            
try {
                
boolean beforeCompletionInvoked = false;
                
try {
                    triggerBeforeCommit(defStatus);
                    triggerBeforeCompletion(defStatus, 
null);
                    beforeCompletionInvoked 
= true;
                    
if (defStatus.hasSavepoint()) {
                        
if (defStatus.isDebug()) {
                            logger.debug(
"Releasing transaction savepoint");
                        }

                        defStatus.releaseHeldSavepoint();
                    }

                    
else if (status.isNewTransaction()) {
                        logger.debug(
"Initiating transaction commit");
                        doCommit(defStatus);
                    }

                }

                
catch (UnexpectedRollbackException ex) {
                    
// can only be caused by doCommit
                    triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_ROLLED_BACK, ex);
                    
throw ex;
                }

                
catch (TransactionException ex) {
                    
// can only be caused by doCommit
                    if (isRollbackOnCommitFailure()) {
                        doRollbackOnCommitException(defStatus, ex);
                    }

                    
else {
                        triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_UNKNOWN, ex);
                    }

                    
throw ex;
                }

                
catch (RuntimeException ex) {
                    
if (!beforeCompletionInvoked) {
                        triggerBeforeCompletion(defStatus, ex);
                    }

                    doRollbackOnCommitException(defStatus, ex);
                    
throw ex;
                }

                
catch (Error err) {
                    
if (!beforeCompletionInvoked) {
                        triggerBeforeCompletion(defStatus, err);
                    }

                    doRollbackOnCommitException(defStatus, err);
                    
throw err;
                }

                triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_COMMITTED, 
null);
            }

            
finally {
                cleanupAfterCompletion(defStatus);
            }

        }

    }

protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject 
= (DataSourceTransactionObject) status.getTransaction();
        Connection con 
= txObject.getConnectionHolder().getConnection();
        
if (status.isDebug()) {
            logger.debug(
"Committing JDBC transaction on connection [" + con + "]");
        }

        
try {
            con.commit();
        }

        
catch (SQLException ex) {
            
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }

    }

1.3,rollback实现(AbstractPlatformTransactionManager)

/**
     * This implementation of rollback handles participating in existing
     * transactions. Delegates to doRollback and doSetRollbackOnly.
     * 
@see #doRollback
     * 
@see #doSetRollbackOnly
     
*/

    
public final void rollback(TransactionStatus status) throws TransactionException {
        DefaultTransactionStatus defStatus 
= (DefaultTransactionStatus) status;
        
if (defStatus.isCompleted()) {
            
throw new IllegalTransactionStateException(
                    
"Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        
try {
            
try {
                triggerBeforeCompletion(defStatus, 
null);
                
if (defStatus.hasSavepoint()) {
                    
if (defStatus.isDebug()) {
                        logger.debug(
"Rolling back transaction to savepoint");
                    }

                    defStatus.rollbackToHeldSavepoint();
                }

                
else if (status.isNewTransaction()) {
                    logger.debug(
"Initiating transaction rollback");
                    doRollback(defStatus);
                }

                
else if (defStatus.getTransaction() != null{
                    
if (defStatus.isDebug()) {
                        logger.debug(
"Setting existing transaction rollback-only");
                    }

                    doSetRollbackOnly(defStatus);
                }

                
else {
                    logger.warn(
"Should roll back transaction but cannot - no transaction available");
                }

            }

            
catch (RuntimeException ex) {
                triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_UNKNOWN, ex);
                
throw ex;
            }

            
catch (Error err) {
                triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_UNKNOWN, err);
                
throw err;
            }

            triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_ROLLED_BACK, 
null);
        }

        
finally {
            cleanupAfterCompletion(defStatus);
        }

    }

protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject 
= (DataSourceTransactionObject) status.getTransaction();
        Connection con 
= txObject.getConnectionHolder().getConnection();
        
if (status.isDebug()) {
            logger.debug(
"Rolling back JDBC transaction on connection [" + con + "]");
        }

        
try {
            con.rollback();
        }

        
catch (SQLException ex) {
            
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }

    }

2,使用TransactionTemplate

   TransactionTemplate封装了一个回调接口实现类来执行具体的事务代码,确保了正确的事务初始化和事务关闭过程,避免了重复编写事务处理流程的问题。
   如果回调处理代码抛出一个运行期异常,或者在回调方法中调用transactionStatus.setRollbackOnly()将事务设置为只允许回滚,那么TransactionTemplate会执行事务回滚,否则事务自动提交。
   使用方法如下:
TransactionTemplate transactionTemplate=new TransactionTemplate(transactionStatus);
    transactionTemplate.execute(
        
new TransactionCallback(){
            
public Object doInTransaction (TransactionStatus status){
                
//operations
            }
                
        }

    }

封装代码如下所示:
/**
     * Execute the action specified by the given callback object within a transaction.
     * <p>Allows for returning a result object created within the transaction, i.e.
     * a domain object or a collection of domain objects. A RuntimeException thrown
     * by the callback is treated as application exception that enforces a rollback.
     * An exception gets propagated to the caller of the template.
     * 
@param action callback object that specifies the transactional action
     * 
@return a result object returned by the callback, or null
     * 
@throws TransactionException in case of initialization, rollback, or system errors
     
*/

    
public Object execute(TransactionCallback action) throws TransactionException {
        TransactionStatus status 
= this.transactionManager.getTransaction(this);
        Object result 
= null;
        
try {
            result 
= action.doInTransaction(status);
        }

        
catch (RuntimeException ex) {
            
// transactional code threw application exception -> rollback
            rollbackOnException(status, ex);
            
throw ex;
        }

        
catch (Error err) {
            
// transactional code threw error -> rollback
            rollbackOnException(status, err);
            
throw err;
        }

        
this.transactionManager.commit(status);
        
return result;
    }


    
/**
     * Perform a rollback, handling rollback exceptions properly.
     * 
@param status object representing the transaction
     * 
@param ex the thrown application exception or error
     * 
@throws TransactionException in case of a rollback error
     
*/

    
private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
        
if (logger.isDebugEnabled()) {
            logger.debug(
"Initiating transaction rollback on application exception", ex);
        }

        
try {
            
this.transactionManager.rollback(status);
        }

        
catch (RuntimeException ex2) {
            logger.error(
"Application exception overridden by rollback exception", ex);
            
throw ex2;
        }

        
catch (Error err) {
            logger.error(
"Application exception overridden by rollback error", ex);
            
throw err;
        }

    }

另外需要注意的一点是,只要事务抛出异常,则认为是无法恢复的,所以都为非受控异常。
posted on 2006-07-13 14:19 dingfirst 阅读(1817) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: