SPRING INTEGRATION的强项是事件驱动,但捕获之后,要做的事是触发一个类的方法,对于要处理大数据量的文件,就没有办法了,如读取1w条记录,然后插入数据库。而这个正是SPRING BATCH的强项所在,因此有必要将此两个框架整合起来用。
场景:盯着一个文件夹,如果一有文件,此文件可能非常大的,则启动一个BATCH JOB来处理。
文件拉取器,监控文件夹一有新文件,则将此文件包装成MESSAGE,丢到下一个通道中:
<file:inbound-channel-adapter id="filePoller"
channel="filesAreComing"
directory="file:${input.directory}"
filename-pattern="test*" />
filesAreComing通道的ServiceActivator
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", file.getAbsolutePath()).toJobParameters();
return new JobLaunchRequest(job, jobParameters);
}
jobLauncher通道的ServiceActivator
<service-activator input-channel="jobLauncher">
<beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
<beans:constructor-arg ref="jobLauncher" />
</beans:bean>
</service-activator>
"file.input"依赖于执行期的取值
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="#{jobParameters[input.file]}" />
line mapper and other props
</bean>
参考的SPRING XML
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
<channel id="fileChannel"/>
<channel id="jobLaunchRequestChannel"/>
<channel id="jobExecutionChannel"/>
<logging-channel-adapter channel="jobExecutionChannel" />
<file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
channel="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
<poller max-messages-per-poll="1" cron="0/1 * * * * *" />
</file:inbound-channel-adapter>
<service-activator input-channel="fileChannel"
output-channel="jobLaunchRequestChannel"
ref="fileToJobLaunchRequestAdapter"
method="adapt"/>
<service-activator input-channel="jobLaunchRequestChannel" output-channel="jobExecutionChannel">
<beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
<beans:constructor-arg ref="jobLauncher" />
</beans:bean>
</service-activator>
<beans:bean id="fileToJobLaunchRequestAdapter" class="example.FileToJobLaunchRequestAdapter">
<beans:property name="job" ref="helloWorldJob"/>
</beans:bean>
<beans:bean id="fileCreationTimeComparator" class="com.paul.integration.file.filters.comparator.FileCreationTimeComparator">
</beans:bean>
</beans:beans>
SPRING BATCH JOB的配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<batch:job id="helloWorldJob">
<batch:step id="step1" next="xmlFileReadAndWriterStep">
<batch:tasklet ref="helloWorldTasklet"></batch:tasklet>
</batch:step>
<batch:step id="xmlFileReadAndWriterStep">
<batch:tasklet>
<batch:chunk reader="xmlReader" writer="xmlWriter" processor="xmlProcessor"
commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="helloWorldTasklet" class="example.HelloWorldTasklet"></bean>
<!-- XML文件读取 -->
<bean id="xmlReader"
class="org.springframework.batch.item.xml.StaxEventItemReader" scope="step">
<property name="fragmentRootElementName" value="trade" />
<property name="unmarshaller" ref="tradeMarshaller" />
<property name="resource" value="#{jobParameters['input.file.path']}" />
</bean>
<bean id="xmlProcessor" class="com.paul.batch.XMLProcessor" />
<!-- XML文件写入 -->
<bean id="xmlWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"
scope="step">
<property name="rootTagName" value="wanggc" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="resource" value="#{jobParameters['output.file.path']}" />
</bean>
<bean id="tradeMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade" value="com.paul.domain.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="name" value="java.lang.String" />
</util:map>
</property>
</bean>
</beans>
文件处理器
package com.paul.batch;
import org.springframework.batch.item.ItemProcessor;
import com.paul.domain.Trade;
public class XMLProcessor implements ItemProcessor<Trade, Trade> {
/**
* XML文件内容处理。
*
*/
@Override
public Trade process(Trade trade) throws Exception {
return trade;
}
}
DOMAIN TRADE对象
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.paul.domain;
import java.io.Serializable;
import java.math.BigDecimal;
/**
* @author Rob Harrop
* @author Dave Syer
*/
public class Trade implements Serializable {
private String isin = "";
private long quantity = 0;
private BigDecimal price = new BigDecimal(0);
private String customer = "";
private Long id;
private long version = 0;
public Trade() {
}
public Trade(String isin, long quantity, BigDecimal price, String customer){
this.isin = isin;
this.quantity = quantity;
this.price = price;
this.customer = customer;
}
/**
* @param id
*/
public Trade(long id) {
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public void setCustomer(String customer) {
this.customer = customer;
}
public void setIsin(String isin) {
this.isin = isin;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public void setQuantity(long quantity) {
this.quantity = quantity;
}
public String getIsin() {
return isin;
}
public BigDecimal getPrice() {
return price;
}
public long getQuantity() {
return quantity;
}
public String getCustomer() {
return customer;
}
public String toString() {
return "Trade: [isin=" + this.isin + ",quantity=" + this.quantity + ",price="
+ this.price + ",customer=" + this.customer + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((customer == null) ? 0 : customer.hashCode());
result = prime * result + ((isin == null) ? 0 : isin.hashCode());
result = prime * result + ((price == null) ? 0 : price.hashCode());
result = prime * result + (int) (quantity ^ (quantity >>> 32));
result = prime * result + (int) (version ^ (version >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Trade other = (Trade) obj;
if (customer == null) {
if (other.customer != null)
return false;
}
else if (!customer.equals(other.customer))
return false;
if (isin == null) {
if (other.isin != null)
return false;
}
else if (!isin.equals(other.isin))
return false;
if (price == null) {
if (other.price != null)
return false;
}
else if (!price.equals(other.price))
return false;
if (quantity != other.quantity)
return false;
if (version != other.version)
return false;
return true;
}
}
从文件夹取出文件列表后,进行按修改时间排序的排序器
package com.paul.integration.file.filters.comparator;
import java.io.File;
import java.util.Comparator;
public class FileCreationTimeComparator implements Comparator<File>{
@Override
public int compare(File file1, File file2) {
return Long.valueOf(file2.lastModified()).compareTo(
Long.valueOf(file1.lastModified()));
// return file1.getName().compareToIgnoreCase(file2.getName());
}
}
封装了JOB和JOBPARAMETERS的HOLDER类
package example;
import java.io.File;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.util.Assert;
/**
* Adapt a {@link File} to a {@link JobLaunchRequest} with a job parameter
* <code>input.file</code> equal to the path of the file.
*
* @author Dave Syer
*
*/
@MessageEndpoint
public class FileToJobLaunchRequestAdapter implements InitializingBean {
private Job job;
public void setJob(Job job) {
this.job = job;
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(job, "A Job must be provided");
}
@ServiceActivator
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
String fileName = file.getAbsolutePath();
if (!fileName.startsWith("/")) {
fileName = "/" + fileName;
}
fileName = "file://" + fileName;
String outPutFilePath = "file:/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/" +
"spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file/output/out.xml";
JobParameters jobParameters = new JobParametersBuilder().
addString("input.file.path", fileName).
addString("output.file.path", outPutFilePath).
addLong("time.stamp", System.currentTimeMillis()).
toJobParameters();
if (job.getJobParametersIncrementer() != null) {
jobParameters = job.getJobParametersIncrementer().getNext(jobParameters);
}
return new JobLaunchRequest(job, jobParameters);
}
}
TRADE测试数据文件
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
MAVEN中的JAR DEPENDENCY:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
至此全部通道已打通。
参考: