首先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提高多少。我想能不能用多线程处理,多个人干活总比一个人干活速度要快。
假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发,这样的话速度至少会提高好多倍。按照这个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用java.util.concurrent.atomic.AtomicLong。程序一运行,传输速度飞快,CPU利用率在70%~90%,现在一秒钟可以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。
在查询的时候我用了如下语句
String queryStr = "SELECT * FROM xx";
ResultSet coreRs = coreStmt.executeQuery(queryStr);
实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet,那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”后它会自动查询下一批数据。你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成这样做,因为JDBC驱动会根据实际情况自动调整Fetch的数量。另外性能也与网络带宽有直接的关系。
相关代码
package com.dlbank.domain;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;
12 /**
13 *<p>title: 数据同步类 </p>
14 *<p>Description: 该类用于将生产核心库数据同步到开发库</p>
15 *@author Tank Zhang
16 */
17 public class CoreDataSyncImpl implements CoreDataSync {
18
19 private List<String> coreTBNames; //要同步的核心库表名
20 private ConnectionFactory connectionFactory;
21 private Logger log = Logger.getLogger(getClass());
22
23 private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数
24
25 private int syncThreadNum; //同步的线程数
26
27 @Override
28 public void syncData(int businessType) throws Exception {
29
30 for (String tmpTBName : coreTBNames) {
31 log.info("开始同步核心库" + tmpTBName + "表数据");
32 // 获得核心库连接
33 Connection coreConnection = connectionFactory.getDMSConnection(4);
34 Statement coreStmt = coreConnection.createStatement();
35 //为每个线程分配结果集
36 ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);
37 coreRs.next();
38 //总共处理的数量
39 long totalNum = coreRs.getLong(1);
40 //每个线程处理的数量
41 long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
42 log.info("共需要同步的数据量:"+totalNum);
43 log.info("同步线程数量:"+syncThreadNum);
44 log.info("每个线程可处理的数量:"+ownerRecordNum);
45 // 开启五个线程向目标库同步数据
46 for(int i=0; i < syncThreadNum; i ++){
47 StringBuilder sqlBuilder = new StringBuilder();
48 //拼装后SQL示例
49 //Select * From dms_core_ds Where id between 1 And 657398
50 //Select * From dms_core_ds Where id between 657399 And 1314796
51 //Select * From dms_core_ds Where id between 1314797 And 1972194
52 //Select * From dms_core_ds Where id between 1972195 And 2629592
53 //Select * From dms_core_ds Where id between 2629593 And 3286990
54 //..
55 sqlBuilder.append("Select * From ").append(tmpTBName)
56 .append(" Where id between " ).append(i * ownerRecordNum +1)
57 .append( " And ")
58 .append((i * ownerRecordNum + ownerRecordNum));
59 Thread workThread = new Thread(
60 new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));
61 workThread.setName("SyncThread-"+i);
62 workThread.start();
63 }
64 while (currentSynCount.get() < totalNum);
65 //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);
66 //Thread.sleep(1000 * 3);
67 log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");
68 }
69 }// end for loop
70
71 public void setCoreTBNames(List<String> coreTBNames) {
72 this.coreTBNames = coreTBNames;
73 }
74
75 public void setConnectionFactory(ConnectionFactory connectionFactory) {
76 this.connectionFactory = connectionFactory;
77 }
78
79 public void setSyncThreadNum(int syncThreadNum) {
80 this.syncThreadNum = syncThreadNum;
81 }
82
83 //数据同步线程
84 final class WorkerHandler implements Runnable {
85 ResultSet coreRs;
86 String queryStr;
87 int businessType;
88 String targetTBName;
89 public WorkerHandler(String queryStr,int businessType,String targetTBName) {
90 this.queryStr = queryStr;
91 this.businessType = businessType;
92 this.targetTBName = targetTBName;
93 }
94 @Override
95 public void run() {
96 try {
97 //开始同步
98 launchSyncData();
99 } catch(Exception e){
100 log.error(e);
101 e.printStackTrace();
102 }
103 }
104 //同步数据方法
105 void launchSyncData() throws Exception{
106 // 获得核心库连接
107 Connection coreConnection = connectionFactory.getDMSConnection(4);
108 Statement coreStmt = coreConnection.createStatement();
109 // 获得目标库连接
110 Connection targetConn = connectionFactory.getDMSConnection(businessType);
111 targetConn.setAutoCommit(false);// 设置手动提交
112 PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");
113 ResultSet coreRs = coreStmt.executeQuery(queryStr);
114 log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);
115 int batchCounter = 0; //累加的批处理数量
116 while (coreRs.next()) {
117 targetPstmt.setString(1, coreRs.getString(2));
118 targetPstmt.setString(2, coreRs.getString(3));
119 targetPstmt.setString(3, coreRs.getString(4));
120 targetPstmt.setString(4, coreRs.getString(5));
121 targetPstmt.setString(5, coreRs.getString(6));
122 targetPstmt.addBatch();
123 batchCounter++;
124 currentSynCount.incrementAndGet();//递增
125 if (batchCounter % 10000 == 0) { //1万条数据一提交
126 targetPstmt.executeBatch();
127 targetPstmt.clearBatch();
128 targetConn.commit();
129 }
130 }
131 //提交剩余的批处理
132 targetPstmt.executeBatch();
133 targetPstmt.clearBatch();
134 targetConn.commit();
135 //释放连接
136 connectionFactory.release(targetConn, targetPstmt,coreRs);
137 }
138 }
139 }