【来源】
http://www.ibm.com/developerworks/cn/data/library/techarticles/dm-0908luyx/
本文介绍了如何使用多线程来构建轻量级 Batch 框架,将大量的数据迁移到 IBM DB2 Content Manager 8.3 中。通过本文的学习,读者可以了解如何通过使用多线程调用 IBM DB2 Content Manager API 构建的框架来启动,暂停,恢复,停止,放缓等操作。
在用 API 导入大量数据的过程中,如果没有框架很难有效的对整个过程控制,仅仅通过日志来分析解决问题总是很浪费时间,并且效率不太理想。
本文的内容放在了如何使用多线程和配置文件来构建 Batch 框架来处理大数量导入的问题。
随着 IBM DB2 Content Manager(简称 IBM CM)产品的不断成熟,越来越多的内容管理系统需要迁移到 IBM CM 中来,这些需要迁移的数据通常首先把结构化的内容导到文本文件中,与之相对应的图像和 pdf 文件通常放在对应的文件夹中,图像和 pdf 对应的文件夹路径也通常存放在文本文件中,然后迁移程序遍历文本文件,把对应的 Item 迁移到 IBM CM 中。这些需要迁移的数据通常都有几百 G,如何有效的控制迁移过程是一个很大的挑战,因此我们必须构建一个轻量级的 batch 处理框架来控制整个数据的迁移周期,记录处理过程中的错误,保证数据的一致性。
同时,在用 API 导入数据的过程中,被导入数据总是千边万化,无效的映射导入数据和 DB2 Content Manager 的项,导致工作变得复杂,同时使的设计和代码冗余,并且使重用,维护和扩展履步为艰难。
为了克服所提到的挑战,这个 batch 框架必须要有以下功能:
- 用户出于不影响生产环境性能的考虑,可以暂时停止数据的迁移,或者减缓迁移处理的频率,即框架必须具有 suspend 和 slowdown 功能。
- 用户可以让暂停处理的系统继续处理,即框架必须具有 resume 功能。
- 用户可以让系统停止处理,修改某些配置,然后继续处理,即框架必须有 re-start 功能。
- 处理过程中发生的错误,警告系统必须记录下来,用户可以根据这些记录来修正数据。
- 通过配置文件建立规则来解决数据千边万化的问题。
构建框架
构建交互性
要使框架有交互性,我们必须有三个个线程:客户端线程,服务端线程,工作线程。客户端线程负责发出工作指令,服务端线程接受这些指令并调用工作线程来做实际的工作。对于客户端和服务器交互,在没有 web 服务器支持的情况下,我们可以采用一种古老但是很有效的做法:socket 编程。 Java socket 对象的 accept 方法会一直阻塞直到客户端有程序输入,当客户端有新的命令输入的时候,服务器端从 socket 中读出命令,然后执行命令。下面是示例程序,Client.java 代表客户端程序,Server.java 代表服务器端程序,Worker.java 代表工作程序 ,Config.java 代表系统中一些参数配置。
清单 1. 客户端程序
package com.ibm.batch.sample;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.Socket;
import org.apache.log4j.Logger;
public class Client {
private Config config = null;
public void setConfig(Config config) {
this.config = config;
}
private Logger logger = Logger.getLogger(Client.class);
public void sendCommand(String command) {
Socket socket = null;
OutputStream out = null;
BufferedWriter writer = null;
try {
// establish the connection with server.
socket = new Socket(config.getHost(), config.getSocketPort());
out = socket.getOutputStream();
writer = new BufferedWriter(new OutputStreamWriter(out));
// send the command to server
writer.write(command);
writer.flush();
} catch (IOException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
}
|
清单 2. 服务器端程序
package com.ibm.batch.sample;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import com.ibm.batch.sample.util.ResourceUtils;
public class Server {
private Config config = null;
private boolean processing = true;
private Worker worker = null;
public void setConfig(Config config) {
this.config = config;
}
public static void main(String[] args) {
Server server = new Server();
// create the work thread
Worker worker = server.createWorker(args);
worker.start();
server.receiveAndExecuteCommand();
}
private Worker createWorker(String[] args) {
Worker worker = new Worker();
this.worker = worker;
return worker;
}
/**
* receive the command from client and execute the command. the method is
* keeping running until client send the 'stop' command.
*
* @throws Exception
*/
public void receiveAndExecuteCommand() {
ServerSocket serverSocket = buildSocketConnection();
// loop until client send 'stop' command
while (processing) {
Socket socket = null;
try {
socket = serverSocket.accept();
String commandLine = readCommandFromSocket(socket);
executeCommand(commandLine);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
ResourceUtils.closeSocket(socket);
}
}
}
private void executeCommand(String commandLine) {
// TODO Auto-generated method stub
}
/**
* read the command from the socket
*
* @param socket
* @return
*/
private String readCommandFromSocket(Socket socket) {
InputStream in = null;
BufferedReader bufferedReader = null;
String commandLine = null;
try {
in = socket.getInputStream();
bufferedReader = new BufferedReader(new InputStreamReader(in));
commandLine = bufferedReader.readLine();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
ResourceUtils.closeInputStream(in);
ResourceUtils.closeReader(bufferedReader);
}
return commandLine;
}
/**
* build the socket.
*
* @return
*/
private ServerSocket buildSocketConnection() {
// prepare the socket for client to connect.
ServerSocket serverSocket;
try {
serverSocket = new ServerSocket(config.getSocketPort());
} catch (java.net.BindException e1) {
throw new RuntimeException("Socket port already in use.", e1);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
return serverSocket;
}
}
|
清单 3. 工作程序
package com.ibm.batch.sample;
import org.apache.log4j.Logger;
public class Worker extends Thread {
Logger logger = Logger.getLogger(Worker.class);
/**
* the main method for create item function.
*/
public void run() {
createItem();
}
/**
* do the real job
*/
private void createItem() {
}
}
|
添加 suspend 和 slowdown 处理命令
大数量的数据迁移一般是在周末或者晚上进行,但是如果客户的历史数据太大,在周末或者晚上数据可能处理不完,为了不影响生产环境的性能,我们必须能够在客户的工作时间暂缓处理或者降低处理的频率,把 cpu 等资源让给客户程序,也就是说处理线程 worker 的工作可以 suspend 或者 slowdow 。为了让 worker 线程知道需要 suspend 当前处理,我们可以在 worker 内部设置一个布尔变量 isSuspend,当程序在循环创建 CM item 的时候,我们每次都判断一下这个布尔变量 isSuspend,当其为 ture 的时候,程序就调用线程的 wait 方法中断当前线程的处理,wait 方法还可以接受一个以微秒为单位的时间参数,当时间到达 wait 指定的时间的时候,程序继续创建 CM Item 。为了多线程之间的变量可见性,我们必须把 worker 的 isSuspend 变量和 suspendTime 设置为 volatile 。同理我们设置一个布尔变量 isSlowdown 以及 slowdowTime 。示例程序如下:
清单 4. 工作程序
package com.ibm.batch.sample;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import org.apache.log4j.Logger;
import com.ibm.batch.sample.util.ResourceUtils;
public class Worker extends Thread {
Logger logger = Logger.getLogger(Worker.class);
private volatile boolean isSlowdown = false;
private volatile Double slowdownTime;
private volatile boolean isSuspend;
private volatile Double suspendTime;
public void setSlowdown(boolean isSlowdown) {
this.isSlowdown = isSlowdown;
}
public void setSlowdownTime(Double slowdownTime) {
this.slowdownTime = slowdownTime;
}
public void setSuspend(boolean isSuspend) {
this.isSuspend = isSuspend;
}
public void setSuspendTime(Double suspendTime) {
this.suspendTime = suspendTime;
}
public boolean isSlowdown() {
return isSlowdown;
}
public Double getSlowdownTime() {
return slowdownTime;
}
public boolean isSuspend() {
return isSuspend;
}
public Double getSuspendTime() {
return suspendTime;
}
protected Object semaphore = new Object();
private Config config;
public void setConfig(Config config) {
this.config = config;
}
|
清单 5. 主方法
/**
* the main method for create item function.
*/
public void run() {
BufferedReader reader = null;
try {
reader = getFileReader();
String oneLine = null;
while ((oneLine = reader.readLine()) != null) {
if (isSlowdown()) {
sleep4GivenTime();
}
if (isSuspend()) {
suspend4SomeTime();
}
createItem(oneLine);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
ResourceUtils.closeReader(reader);
}
}
/**
* current thread sleep for some time,the unit is minute.
*/
protected void sleep4GivenTime() {
try {
Thread.sleep((long) (slowdownTime.doubleValue() * 1000));
} catch (InterruptedException e) {
// do nothing
}
}
|
清单 6.Suspend 方法
/**
* suspend working for given time.
*/
protected void suspend4SomeTime() {
synchronized (semaphore) {
try {
Double suspendTime = getSuspendTime();
if (suspendTime != null) {
double suspendTimeDouble = suspendTime.doubleValue() * 60 * 1000;
semaphore.wait((long) suspendTimeDouble);
} else {
semaphore.wait();
}
} catch (InterruptedException e) {
// tell user that the processing started
logger.info("suspend is over,system is continue processing .");
}
}
}
/**
* do the real job
*
* @throws Exception
*/
private void createItem(String oneLine) throws Exception {
}
private BufferedReader getFileReader() throws FileNotFoundException {
String fileName = config.getFileName();
File processingFile = new File(fileName);
BufferedReader reader = new BufferedReader(new FileReader(
processingFile));
return reader;
}
}
|
添加 resume 功能
在程序暂停处理以后,我们可以提前终止 suspend,让框架继续处理,也就是框架必须有 resume 功能。我们调用 Worker.java 对象上的 notify 方法来实现这个功能,示例如下:
清单 7.Resume
public class Worker extends Thread {
/**
* resume the working.
*/
public void continueWorking() {
cleanSuspend();
synchronized (semaphore) {
semaphore.notify();
}
}
}
|
添加 stop 和 re-start 功能
有时候用户因为一些原因(例如修改配置文件)想停止程序的执行,所以框架必须有 stop 的功能,但是 stop 的时候我们必须注意记录程序处理到的行数,这样客户再开始执行的时候能够从上次执行的断点继续执行,也就是框架具备了 re-start 功能,这是 batch 程序必须具备的一种很重要的功能,re-start 功能有多种实现方法,我们这里采取一种简单的方法,在 stop 的时候,把当前处理的记录到一个文本文件中去,下次启动的时候从上次最后处理的对象开始进行处理。所以我们在 Worker.java 中增加一个 keepProcessing 布尔变量,在循环创建 CM Item 的时候 , 我们每次都判断一下这个值是否为 true,如果为 false 的话,我们就停止循环处理,在 Worker.java 中还要增加一个 moveReaderToLastProcess 方法,把 reader 重新定向到上次处理点。
清单 8. 停止和重启
public class Worker extends Thread {
private volatile boolean keepProcessing;
public boolean isKeepProcessing() {
return keepProcessing;
}
public void setKeepProcessing(boolean keepProcessing) {
this.keepProcessing = keepProcessing;
}
/**
* the main method for create item function.
*/
public void run() {
BufferedReader reader = null;
try {
long lastProcessedRow = config.getLastProcessedRow();
reader = moveReaderToLastProcess(lastProcessedRow);
String oneLine = null;
connectToCM();
while (((oneLine = reader.readLine()) != null)
&& isKeepProcessing()) {
if (isSlowdown()) {
sleep4GivenTime();
}
if (isSuspend()) {
suspend4SomeTime();
}
createItem(oneLine);
lastProcessedRow++;
}
logCurrentProcessingLine(lastProcessedRow);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
ResourceUtils.closeReader(reader);
}
}
private void logCurrentProcessingLine(long lastProcessedRow) {
config.setLastProcessedRow(lastProcessedRow);
}
/**
* move current reader position to last process postion
* @return
* @throws IOException
*/
private BufferedReader moveReaderToLastProcess(long lastProcessedRow)
throws IOException {
// get the file reader
BufferedReader reader = getFileReader();
// move the reader to the start row -1.
int count = 0;
while (count < lastProcessedRow-1) {
reader.readLine();
count++;
}
return reader;
}
}
|
添加错误处理功能
刚才我们调用的 createItem 方法是直接抛出异常的,但是这样的处理实际上是错误的,因为在 batch 处理过程中,我们不希望在处理某一个 item 出错导致剩余的 item 不再处理,所以我们在 catch 里面对异常进行分类处理,我们 catch 住非检查异常(runtime exception),通常非检查异常是不可以恢复的,所以我们直接抛出,让程序结束处理。对于其余的异常,我们只是在日志中记录下来,并不抛出。在全部处理结束以后,用户可以检查日志来进行相应的处理。示例代码如下:
清单 9. 错误处理
public class Worker extends Thread {
/**
* do the real job
*
* @throws Exception
*/
private void createItem(String oneLine) throws Exception {
try {
//create the item from one line
}catch (RuntimeException e) {
throw e;
}catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
|
添加创建 CM item 功能
下面的内容放在了如何使用配置文件来处理导入的问题。
通过调用和运行 API 来处理数据的导入,我们首先定义一个基本信息的配置文件,用来制定连接的信息,其他配置文件的目录,工作的目录等有关导入需要的参数。然后定义导入数据和 DB2 Content Manager 的项的映射配置文件。配置文件定义结束后,我们就可以调用API来启动相应的导入流程,在程序运行过程中,可以动态的更改配置,从而有效的处理导入的任务。
在开发过程中,您可以灵活地定义各种配置文件以便实现多种导入规则,同时在程序运行中进行数据校验,以防止冗余和非法数据被错误导入。
下面的一些配置和代码示例,以此介绍了如何定义配置文件,然后管理 API 来完成导入的任务。
定义基本信息配置文件:在该文件中,须先设定 IBM DB2 Content Manager 的一些连接参数, 如:
contentManagerDatabase=iCMnlsdb // 定义调用的数据库名字
contentManagerUsername=iCMadmin // 定义用户名
contentManagerPassword= password // 定义连接密码
contentManagerSchema=ICMADMIN // 定义具体的 schema |
您可以在代码中用以上参数来实现对 IBM DB2 Content Manager 的连接,代码示例:
DKDatastoreICM dsICM = new DKDatastoreICM();
// 创建连接 dsICM.connect("iCMnlsdb", "iCMadmin", "password", "SCHEMA=ICMADMIN"); |
还需指定哪个文件夹存放映射文件,以及需导入的数据文件,如:
mappingFilePath=config/rapid/mapping // 映射文件路径
dataFileFolder=config/rapid/data // 数据文件路径 |
也可定义一些参数来增强该导入的流程控制,如:
runPhase=2
// 指定是第二阶段导入,在导入时需更新已有的数据 |
定义映射文件:该配置文件主要用于将用户想要导入的数据映射到 IBM DB2 Content Manager 的 Item Type 中,您可自由定制该文件,使用户遵循您定义的规范顺利完成数据迁移。如:
C001.del=c01
C002.del=c01 |
该定义中 C001.del 和 C002.del 是需要导入的数据文件,c01 是对应的 Item Type 名字。这种定义方法可实现将多个数据文件导入同一个 Item Type 中。
具体的对应关系如下:
position=1|name=COMPANYNAME
position=2|name=COMPANYID
position=3|name=INPUTVALUE
position=-1|name=SPECIALVALUE|value=C1 |
这个映射关系反映了数据文件中列数和 Item Type 中 attribute 的关系,如第一列在 Item Type 中代表了名字为 COMPANYNAME 的 attribute 。您也可定义一些特殊规则,如将 position 设为负数,以便反映该列是一个特殊的 attribute, 它的值是固定的。 比如将 position 设为 -1 时,名为 SPECIALVALUE 的 attribute 的值总是为 C1 。
若您想实现将一个数据文件导入多个 Item Type 中,可在数据文件中加入一个特殊列,在映射文件中指定该列的列数,以及当该列的值和多种 Item Type 的映射关系。如:
这样,C003.del 就不是单一的对应一个 Item Type,而是先去取第三列 INPUTVALUE 的值,再去对应表中查找到关联的 Item Type 。该对应表可设成:
若第三列 INPUTDOCID 的值为 Value1 时,其对应的 Item Type 为 c01,同样的当值为 Value2 时,会将该行数据导入到 c02 的 Item Type 中。
调用 API 完成操作的代码示例:在编写代码过程中,需要调用 DB2 Content Manager 的 API 来完成 Item Type 以及它包含的 attribute 的创建。上文已给出了通过参数来连接 Content Manager 的方法,下面的示例代码用得到的 DKDatastoreICM 来实现具体的操作:
清单 10. API 调用
// Create an item / DDO / Root Component
DKDDO ddo = dsICM.createDDO("S_withChild", itemPropertyOrSemanticType);
//createDDO(<Overall Item Type>, <Item Property / Semantic Type>);
// Adding Multivalue Attributes to DDOs, multiple type can be used,
//here just give some example
ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_varchar"),
"this is a string value");
//string
ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_date"),
java.sql.Date.valueOf("2001-08-12"));
//date
ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_double"),
new Double("123"));
//double
|
结束语
通过本文的介绍,相信您对多线程构建的 Batch 框架实现大量数据迁移的过程,和通过配置文件的管理的 API 实现数据导入的过程也有了一定的了解和学习。您可灵活地实现一对一,一对多,多对多等各种映射关系,您也可以利用多线程来实现其他的功能的开发,编写出更加富有创造性的软件。
参考资料
学习
获得产品和技术
- 使用可直接从 developerWorks 下载的 IBM 试用软件 构建您的下一个 Linux 开发项目。