JAVA—咖啡馆

——欢迎访问rogerfan的博客,常来《JAVA——咖啡馆》坐坐,喝杯浓香的咖啡,彼此探讨一下JAVA技术,交流工作经验,分享JAVA带来的快乐!本网站部分转载文章,如果有版权问题请与我联系。

BlogJava 首页 新随笔 联系 聚合 管理
  447 Posts :: 145 Stories :: 368 Comments :: 0 Trackbacks
最近在做一个银行的生产数据脱敏系统,今天写代码时遇到了一个“瓶颈”,脱敏系统需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台 Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路 如图:



    首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。
    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

在查询的时候我用了如下语句
  1. String queryStr = "SELECT * FROM xx";  
  2. ResultSet coreRs = PreparedStatement.executeQuery(queryStr);  

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。
相关代码
  1 package com.dlbank.domain;  
  2   
  3 import java.sql.Connection;  
  4 import java.sql.PreparedStatement;  
  5 import java.sql.ResultSet;  
  6 import java.sql.Statement;  
  7 import java.util.List;  
  8 import java.util.concurrent.atomic.AtomicLong;  
  9   
 10 import org.apache.log4j.Logger;  
 11   
 12 /** 
 13  *<p>title: 数据同步类 </p>   
 14  *<p>Description: 该类用于将生产核心库数据同步到开发库</p>   
 15  *@author Tank Zhang  
 16  */  
 17 public class CoreDataSyncImpl implements CoreDataSync {  
 18       
 19     private List<String> coreTBNames; //要同步的核心库表名  
 20     private ConnectionFactory connectionFactory;  
 21     private Logger log = Logger.getLogger(getClass());  
 22       
 23     private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数  
 24       
 25     private int syncThreadNum;  //同步的线程数  
 26   
 27     @Override  
 28     public void syncData(int businessType) throws Exception {  
 29           
 30         for (String tmpTBName : coreTBNames) {  
 31             log.info("开始同步核心库" + tmpTBName + "表数据");  
 32             // 获得核心库连接  
 33             Connection coreConnection = connectionFactory.getDMSConnection(4);  
 34             Statement coreStmt = coreConnection.createStatement();  
 35             //为每个线程分配结果集  
 36             ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
 37             coreRs.next();  
 38             //总共处理的数量  
 39             long totalNum = coreRs.getLong(1);  
 40             //每个线程处理的数量  
 41             long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
 42             log.info("共需要同步的数据量:"+totalNum);  
 43             log.info("同步线程数量:"+syncThreadNum);  
 44             log.info("每个线程可处理的数量:"+ownerRecordNum);  
 45             // 开启五个线程向目标库同步数据  
 46             for(int i=0; i < syncThreadNum; i ++){  
 47                 StringBuilder sqlBuilder = new StringBuilder();  
 48                 //拼装后SQL示例  
 49                 //Select * From dms_core_ds Where id between 1 And 657398  
 50                 //Select * From dms_core_ds Where id between 657399 And 1314796  
 51                 //Select * From dms_core_ds Where id between 1314797 And 1972194  
 52                 //Select * From dms_core_ds Where id between 1972195 And 2629592  
 53                 //Select * From dms_core_ds Where id between 2629593 And 3286990  
 54                 //..  
 55                 sqlBuilder.append("Select * From ").append(tmpTBName)  
 56                         .append(" Where id between " ).append(i * ownerRecordNum +1)  
 57                         .append( " And ")  
 58                         .append((i * ownerRecordNum + ownerRecordNum));  
 59                 Thread workThread = new Thread(  
 60                         new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
 61                 workThread.setName("SyncThread-"+i);  
 62                 workThread.start();  
 63             }  
 64             while (currentSynCount.get() < totalNum);  
 65             //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);  
 66             //Thread.sleep(1000 * 3);  
 67             log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");  
 68         }  
 69     }// end for loop  
 70       
 71     public void setCoreTBNames(List<String> coreTBNames) {  
 72         this.coreTBNames = coreTBNames;  
 73     }  
 74   
 75     public void setConnectionFactory(ConnectionFactory connectionFactory) {  
 76         this.connectionFactory = connectionFactory;  
 77     }  
 78       
 79     public void setSyncThreadNum(int syncThreadNum) {  
 80         this.syncThreadNum = syncThreadNum;  
 81     }  
 82       
 83     //数据同步线程  
 84     final class WorkerHandler implements Runnable {  
 85         ResultSet coreRs;  
 86         String queryStr;  
 87         int businessType;  
 88         String targetTBName;  
 89         public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
 90             this.queryStr = queryStr;  
 91             this.businessType = businessType;  
 92             this.targetTBName = targetTBName;  
 93         }  
 94         @Override  
 95         public void run() {  
 96             try {  
 97                 //开始同步  
 98                 launchSyncData();  
 99             } catch(Exception e){  
100                 log.error(e);  
101                 e.printStackTrace();  
102             }  
103         }  
104         //同步数据方法  
105         void launchSyncData() throws Exception{  
106             // 获得核心库连接  
107             Connection coreConnection = connectionFactory.getDMSConnection(4);  
108             Statement coreStmt = coreConnection.createStatement();  
109             // 获得目标库连接  
110             Connection targetConn = connectionFactory.getDMSConnection(businessType);  
111             targetConn.setAutoCommit(false);// 设置手动提交  
112             PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
113             ResultSet coreRs = coreStmt.executeQuery(queryStr);  
114             log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
115             int batchCounter = 0//累加的批处理数量  
116             while (coreRs.next()) {  
117                 targetPstmt.setString(1, coreRs.getString(2));  
118                 targetPstmt.setString(2, coreRs.getString(3));  
119                 targetPstmt.setString(3, coreRs.getString(4));  
120                 targetPstmt.setString(4, coreRs.getString(5));  
121                 targetPstmt.setString(5, coreRs.getString(6));  
122                 targetPstmt.addBatch();  
123                 batchCounter++;  
124                 currentSynCount.incrementAndGet();//递增  
125                 if (batchCounter % 10000 == 0) { //1万条数据一提交  
126                     targetPstmt.executeBatch();  
127                     targetPstmt.clearBatch();  
128                     targetConn.commit();  
129                 }  
130             }  
131             //提交剩余的批处理  
132             targetPstmt.executeBatch();  
133             targetPstmt.clearBatch();  
134             targetConn.commit();  
135             //释放连接   
136             connectionFactory.release(targetConn, targetPstmt,coreRs);  
137         }  
138     }  
139 }  

posted on 2010-11-26 14:11 rogerfan 阅读(969) 评论(0)  编辑  收藏 所属分类: 【Java知识】

只有注册用户登录后才能发表评论。


网站导航: