posts - 42,comments - 83,trackbacks - 0
        今天有人提出了一个诡异的要求,要求在全局事务中执行多线程操作。他们全局事务中涉及两个数据库中的多个表,如果单线程那么走完,相应时间上不满足要求,说白了就是比较慢,于是提出了这样的要求。从JTA的规范来看,transaction(TX)和thread是密切相关的,TX一般是不能在应用线程间传递的, 即我主线程起一个全局事务,然后我把这个事务传递给其他我新起的线程,单纯的变量传递没问题,但这个事务是不能被transaction manager(TM)识别的,TM对TX的管理有他自己的方式。从weblogic的实现来看,TX被放在当前线程的threadlocal中,普通应用线程不存在这样的结构,所以简单的变量传递,对于TM而言是没有意义的。那么到底有没有方法实现上面的需求的,我做了些测试,使用weblogic内部的一些API可以实现这个需求。下面我们就来看看实现中的几个要点: :)

        1:上面说了,简单的变量传递对于weblogic的TM是没有意义的。TM判断事务上下文(transaction context)的时候,会从当前线程的threadlocal检查,如果没有,则说明当前线程没有和任何TX关联。那么我们如何将我们手里的TX放入当前线程的threadlocal呢? weblogic的ExecuteThread是我们需要的那种线程,但它是final的,我们不能继承它,只能继承它的父类了,也就是weblogic.kernel.AuditableThread。

        2:我们有继承了AuditableThread,那么我们怎么把TX放入它的threadlocal中呢?这个可以通过weblogic的TM实现中的一些API来实现,具体到这个类就是weblogic.transaction.internal.TransactionManagerImpl。比如interResume(tx),internalSuspend()。由于这个API不是package protect的,我们自己的类必须也位于weblogic.transaction.internal这个包中。interResume(tx),用于将当前线程和指定的TX做关联,而internalSuspend()恰恰相反,它用于解除这种关联。

        3:因为涉及到多线程,主线程需要决定何时提交或回滚事务,这个我们要自己要实现一个线程结果检查的方法(checkCompletion())。

         下面就是我自己实现的测试代码,在Weblogic81测试没有问题。
  1 package weblogic.transaction.internal;
  2 
  3 import weblogic.transaction.TxHelper;
  4 import weblogic.transaction.internal.TransactionManagerImpl;
  5 import javax.transaction.Transaction;
  6 import java.util.ArrayList;
  7 
  8 public class DriverTest {
  9 
 10     private static String INITIAL_CONTEXT_FACTORY = "weblogic.jndi.WLInitialContextFactory"
 11     private static String PROVIDER_URL = "t3://localhost:8001";  
 12     private static String SQL_INSERT = "insert into test values(?)";
 13     private static String ANO_SQL_INSERT = "insert into test1 values(?)";
 14     
 15     public static void main(String args[])
 16     {
 17         DriverTest test = new DriverTest();
 18         test.multiThreadXATest();
 19     }
 20     
 21     private Connection getConnection(String url, String dsName) throws NamingException, SQLException
 22     {
 23         InitialContext ctx = initializeEnv(url);
 24         DataSource ds = (DataSource)ctx.lookup(dsName);
 25         ctx.close();
 26         return ds.getConnection();
 27     }
 28     
 29     private UserTransaction getUserTransaction() throws NamingException, SQLException
 30     {
 31         InitialContext ctx = initializeEnv(null);
 32         return (UserTransaction)ctx.lookup("javax/transaction/UserTransaction");
 33     }
 34     
 35     private InitialContext initializeEnv(String url) throws NamingException
 36     {
 37         Properties prop = new Properties();
 38         if(url == null)
 39             prop.put(Context.PROVIDER_URL, PROVIDER_URL);
 40         else
 41             prop.put(Context.PROVIDER_URL, url);
 42         prop.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
 43         return new InitialContext(prop);
 44     }
 45     
 46     private void executeInsertInPSMT(Connection conn, String sql)
 47     {
 48         PreparedStatement pstmt = null;
 49         try{
 50             pstmt = conn.prepareStatement(sql);
 51             pstmt.setString(1"data_to_insert");
 52             pstmt.executeUpdate();
 53             pstmt.close();
 54         }catch(SQLException e){
 55                 e.printStackTrace();
 56         }    
 57     }
 58     
 59     public void multiThreadXATest()
 60     {
 61         ArrayList result = new ArrayList();
 62         try{
 63             UserTransaction userTx = getUserTransaction();
 64             userTx.setTransactionTimeout(1000);
 65             userTx.begin();
 66             Transaction tx = TxHelper.getTransaction();
 67             Connection conn = getConnection("t3://localhost:8011""TestXADS");
 68             if(conn != null) conn.close();
                  SQLThread thread1 
= new SQLThread(tx,result,"t3://localhost:8011","TestXADS", SQL_INSERT);
 69             SQLThread thread2 = new SQLThread(tx,result,"t3://localhost:8021","TestXADS_1", ANO_SQL_INSERT);
 70             thread1.start();
 71             thread2.start();
 72             while(result.size() != 2){
 73                 Thread.currentThread().sleep(1);
 74             }
 75             if(checkCompletion(result)){
 76                 userTx.commit();
 77             }
 78             else{
 79                 userTx.rollback();
 80             }
 81         }catch(Exception e){
 82             e.printStackTrace();
 83         }
 84     }
 85     
 86     private boolean checkCompletion(ArrayList result){
 87         boolean toReturn = true;
 88         for(int loop=0; loop<result.size(); loop++){
 89             if((!((String)result.get(loop)).equals("OK"))){
 90                 toReturn = false;
 91                 break;
 92             }
 93         }
 94         return toReturn;
 95     }
 96     
 97     class SQLThread extends weblogic.kernel.AuditableThread {
 98         
 99         private Transaction tx = null;
100         private ArrayList result = null;
101         private String dsName = null;
102         private String url = null;
103         private String sql = null;
104         
105         public SQLThread(Transaction tx,ArrayList result,String ds, String url, String sql){
106             this.tx = tx;
107             this.result = result;
108             this.dsName = ds;
109             this.url = url;
110             this.sql = sql;
111         }
112         
113         public void run(){
114             Connection conn = null;
115             try{
116                 TransactionManagerImpl tm = (TransactionManagerImpl)TransactionManagerImpl.getTransactionManager();
117                 tm.internalResume((TransactionImpl)tx);
118                 DriverTest test = new DriverTest();
119                 conn = test.getConnection(url, dsName);
120                 test.executeInsertInPSMT(conn, sql);
121                 conn.close();
122                 tm.internalSuspend();
123                 result.add("OK");
124             }catch(Exception e){
125                 result.add("NA");
126                 e.printStackTrace();
127             }finally{
128                 try{
129                     if(conn != null)
130                         conn.close();
131                 }catch(Exception e){
132                     e.printStackTrace();
133                 }
134             }
135         }
136     }
137 }
138 
139 

        下面是关于上面这段测试代码的一些解释和代码中的限制:
         1:为什么会在66行出现Connection conn = getConnection("t3://localhost:8011""TestXADS");这个看似无用的语句?Weblogic的TM实现中只有有XAResource参与到这个global transaction的server实例才有资格充当这个global transaction的coordinator,其他的server实例只能充当sub-coordinator。而且总是第一个参与全局事务的XAResource的实例充当coordinator,因为coordinator的委任决定于TX开始后,第一次RMI request发送给哪个server。Connection conn = getConnection("t3://localhost:8001""TestXADS")用于指定这个global transaction的coordinator为8011这个server。如果没有这个语句,thread1,thread2启动后,它们开始XA操作时,每个XAResouce都会把自己当作这个TX的coordinator(Thread1委任8011,Thread2委任8021),这样就会出现如下的异常,
        
javax.transaction.TransactionRolledbackException: Current server is the coordinator and transaction is not found.  It was probably rolled back and forgotten already.
 at weblogic.rjvm.BasicOutboundRequest.sendReceive(BasicOutboundRequest.java:108)
 at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:290)
 at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:247)
 at weblogic.jdbc.common.internal.RmiDataSource_814_WLStub.getConnection(Unknown Source)
 at weblogic.transaction.internal.DriverTest1.getConnection(DriverTest1.java:39)
 at weblogic.transaction.internal.DriverTest1.access$0(DriverTest1.java:34)
 at weblogic.transaction.internal.DriverTest1$SQLThread.run(DriverTest1.java:135)

        2:某个全局事务中启动的线程,不能同时操作同一个XAResource,比如Thread1操作datasource1和datasource2,thread2操作datasource2和datasource3。Weblogic中,我们做XA操作的时候,需要同后端的XA Resource Manager交互,交互中我们会多次调用xaStart(xid, flag),xaEnd(xid, flag)这里的flag可以使NOFLAGS、TMSUCESS、TMRESUME、TMSUSPEND等。如果我们在同一个全局事务的多个线程中同时操作某个RESOURCE,那么就可能我们不同线程先后给这个RESOUCE的RM发送相同的FLAG,比如xaStart(xid, TMSUSPEND),即两个线程同时发送TMSUSPEND,这样会引发XA_ERR,如下:

java.sql.SQLException: Unexpected exception while enlisting XAConnection java.sql.SQLException: XA error: XAER_RMERR : A resource manager error has occured in the transaction branch start() failed on resource 'TestXAPool_1': XAER_RMERR : A resource manager error has occured in the transaction branch
oracle.jdbc.xa.OracleXAException
 at oracle.jdbc.xa.OracleXAResource.checkError(OracleXAResource.java:1017)
 at oracle.jdbc.xa.client.OracleXAResource.start(OracleXAResource.java:227)
 at weblogic.jdbc.wrapper.VendorXAResource.start(VendorXAResource.java:50)
 at weblogic.jdbc.jta.DataSource.start(DataSource.java:629)
 at weblogic.transaction.internal.XAServerResourceInfo.start(XAServerResourceInfo.java:1142)
 at weblogic.transaction.internal.XAServerResourceInfo.xaStart(XAServerResourceInfo.java:1073)
 at weblogic.transaction.internal.XAServerResourceInfo.enlist(XAServerResourceInfo.java:241)
 at weblogic.transaction.internal.ServerTransactionImpl.enlistResource(ServerTransactionImpl.java:463)
 at weblogic.jdbc.jta.DataSource.enlist(DataSource.java:1392)
 at weblogic.jdbc.jta.DataSource.refreshXAConnAndEnlist(DataSource.java:1334)
 at weblogic.jdbc.jta.DataSource.getConnection(DataSource.java:396)
 at weblogic.jdbc.jta.DataSource.connect(DataSource.java:354)
 at weblogic.jdbc.common.internal.RmiDataSource.getConnection(RmiDataSource.java:305)
 at weblogic.jdbc.common.internal.RmiDataSource_WLSkel.invoke(Unknown Source)
 ......

        虽然测试中没有什么问题,但我不建议谁这么去做,毕竟我们需要遵循规范。写这么个例子,只是让大家对weblogic的transaction加深些理解,而不是真的要在生产系统中这样去做。
posted on 2009-07-31 15:18 走走停停又三年 阅读(2477) 评论(0)  编辑  收藏 所属分类: Weblogic

只有注册用户登录后才能发表评论。


网站导航: