paulwong

SPRING INTEGRATION + SPRING BATCH

SPRING INTEGRATION的强项是事件驱动,但捕获之后,要做的事是触发一个类的方法,对于要处理大数据量的文件,就没有办法了,如读取1w条记录,然后插入数据库。而这个正是SPRING BATCH的强项所在,因此有必要将此两个框架整合起来用。

场景:盯着一个文件夹,如果一有文件,此文件可能非常大的,则启动一个BATCH JOB来处理。


文件拉取器,监控文件夹一有新文件,则将此文件包装成MESSAGE,丢到下一个通道中:
<file:inbound-channel-adapter id="filePoller"
                              channel
="filesAreComing" 
                              directory
="file:${input.directory}"
                              filename-pattern
="test*" />


filesAreComing通道的ServiceActivator
public JobLaunchRequest adapt(File file) throws NoSuchJobException {

    JobParameters jobParameters = new JobParametersBuilder().addString(
            "input.file", file.getAbsolutePath()).toJobParameters();

    return new JobLaunchRequest(job, jobParameters);
}


jobLauncher通道的ServiceActivator
<service-activator input-channel="jobLauncher">
    <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
        <beans:constructor-arg ref="jobLauncher" />
    </beans:bean>
</service-activator>


"file.input"依赖于执行期的取值
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="#{jobParameters[input.file]}" />
     line mapper and other props
</bean>


参考的SPRING XML
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans
="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:file
="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation
="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"
>

    <channel id="fileChannel"/>
    <channel id="jobLaunchRequestChannel"/>
    <channel id="jobExecutionChannel"/>
    <logging-channel-adapter channel="jobExecutionChannel" />

    <file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
        channel
="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
        <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
    </file:inbound-channel-adapter>
    
    <service-activator input-channel="fileChannel"
                       output-channel
="jobLaunchRequestChannel"
                       ref
="fileToJobLaunchRequestAdapter"
                       method
="adapt"/>
                       
    <service-activator input-channel="jobLaunchRequestChannel" output-channel="jobExecutionChannel">
        <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
            <beans:constructor-arg ref="jobLauncher" />
        </beans:bean>
    </service-activator>

    <beans:bean id="fileToJobLaunchRequestAdapter" class="example.FileToJobLaunchRequestAdapter">
        <beans:property name="job" ref="helloWorldJob"/>
    </beans:bean>
    
    
    <beans:bean id="fileCreationTimeComparator" class="com.paul.integration.file.filters.comparator.FileCreationTimeComparator">
    </beans:bean>

</beans:beans>

SPRING BATCH JOB的配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch
="http://www.springframework.org/schema/batch"
    xmlns:util
="http://www.springframework.org/schema/util"
    xsi:schemaLocation
="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd"
>

    

    <batch:job id="helloWorldJob">
        <batch:step id="step1" next="xmlFileReadAndWriterStep">
            <batch:tasklet ref="helloWorldTasklet"></batch:tasklet>
        </batch:step>
        <batch:step id="xmlFileReadAndWriterStep">
             <batch:tasklet>
                 <batch:chunk reader="xmlReader" writer="xmlWriter" processor="xmlProcessor"
                     commit-interval
="10">
                 </batch:chunk>
             </batch:tasklet>
         </batch:step>
    </batch:job>
    
    <bean id="helloWorldTasklet" class="example.HelloWorldTasklet"></bean>
    
    <!-- XML文件读取 -->
     <bean id="xmlReader"
         class
="org.springframework.batch.item.xml.StaxEventItemReader" scope="step">
         <property name="fragmentRootElementName" value="trade" />
         <property name="unmarshaller" ref="tradeMarshaller" />
         <property name="resource" value="#{jobParameters['input.file.path']}" />
     </bean>
 
     <bean id="xmlProcessor" class="com.paul.batch.XMLProcessor" />
 
     <!-- XML文件写入 -->
    <bean id="xmlWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"
        scope
="step">
        <property name="rootTagName" value="wanggc" />
        <property name="marshaller" ref="tradeMarshaller" />
        <property name="resource" value="#{jobParameters['output.file.path']}" />
    </bean>

    <bean id="tradeMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">
        <property name="aliases">
            <util:map id="aliases">
                <entry key="trade" value="com.paul.domain.Trade" />
                <entry key="price" value="java.math.BigDecimal" />
                <entry key="name" value="java.lang.String" />
            </util:map>
        </property>
    </bean>
    
</beans>


文件处理器
package com.paul.batch;

import org.springframework.batch.item.ItemProcessor;

import com.paul.domain.Trade;

public class XMLProcessor implements ItemProcessor<Trade, Trade> {

    /**
     * XML文件内容处理。
     * 
     
*/
    @Override
    public Trade process(Trade trade) throws Exception {
        return trade;
    }
}


DOMAIN TRADE对象
/*
 * Copyright 2006-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      
http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 
*/

package com.paul.domain;

import java.io.Serializable;
import java.math.BigDecimal;


/**
 * 
@author Rob Harrop
 * 
@author Dave Syer
 
*/
public class Trade implements Serializable {
    private String isin = "";
    private long quantity = 0;
    private BigDecimal price = new BigDecimal(0);
    private String customer = "";
    private Long id;
    private long version = 0;

    public Trade() {
    }
    
    public Trade(String isin, long quantity, BigDecimal price, String customer){
        this.isin = isin;
        this.quantity = quantity;
        this.price = price;
        this.customer = customer;
    }

    /**
     * 
@param id
     
*/
    public Trade(long id) {
        this.id = id;
    }
    
    public long getId() {
        return id;
    }
    
    public void setId(long id) {
        this.id = id;
    }

    public long getVersion() {
        return version;
    }

    public void setVersion(long version) {
        this.version = version;
    }

    public void setCustomer(String customer) {
        this.customer = customer;
    }

    public void setIsin(String isin) {
        this.isin = isin;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public void setQuantity(long quantity) {
        this.quantity = quantity;
    }

    public String getIsin() {
        return isin;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public long getQuantity() {
        return quantity;
    }

    public String getCustomer() {
        return customer;
    }

    public String toString() {
        return "Trade: [isin=" + this.isin + ",quantity=" + this.quantity + ",price="
            + this.price + ",customer=" + this.customer + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((customer == null) ? 0 : customer.hashCode());
        result = prime * result + ((isin == null) ? 0 : isin.hashCode());
        result = prime * result + ((price == null) ? 0 : price.hashCode());
        result = prime * result + (int) (quantity ^ (quantity >>> 32));
        result = prime * result + (int) (version ^ (version >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Trade other = (Trade) obj;
        if (customer == null) {
            if (other.customer != null)
                return false;
        }
        else if (!customer.equals(other.customer))
            return false;
        if (isin == null) {
            if (other.isin != null)
                return false;
        }
        else if (!isin.equals(other.isin))
            return false;
        if (price == null) {
            if (other.price != null)
                return false;
        }
        else if (!price.equals(other.price))
            return false;
        if (quantity != other.quantity)
            return false;
        if (version != other.version)
            return false;
        return true;
    }
    
 }


从文件夹取出文件列表后,进行按修改时间排序的排序器
package com.paul.integration.file.filters.comparator;

import java.io.File;
import java.util.Comparator;

public class FileCreationTimeComparator implements Comparator<File>{

    @Override
    public int compare(File file1, File file2) {
        return Long.valueOf(file2.lastModified()).compareTo(
                Long.valueOf(file1.lastModified()));
//        return file1.getName().compareToIgnoreCase(file2.getName());
    }
    
}


封装了JOB和JOBPARAMETERS的HOLDER类

package example;

import java.io.File;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.util.Assert;

/**
 * Adapt a {
@link File} to a {@link JobLaunchRequest} with a job parameter
 * <code>input.file</code> equal to the path of the file.
 * 
 * 
@author Dave Syer
 * 
 
*/
@MessageEndpoint
public class FileToJobLaunchRequestAdapter implements InitializingBean {

    private Job job;

    public void setJob(Job job) {
        this.job = job;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(job, "A Job must be provided");
    }

    @ServiceActivator
    public JobLaunchRequest adapt(File file) throws NoSuchJobException {

        String fileName = file.getAbsolutePath();

        if (!fileName.startsWith("/")) {
            fileName = "/" + fileName;
        }

        fileName = "file://" + fileName;
        
        String outPutFilePath = "file:/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/" +
                "spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file/output/out.xml";

        JobParameters jobParameters = new JobParametersBuilder().
                addString("input.file.path", fileName).
                addString("output.file.path", outPutFilePath).
                addLong("time.stamp", System.currentTimeMillis()).
                toJobParameters();

        if (job.getJobParametersIncrementer() != null) {
            jobParameters = job.getJobParametersIncrementer().getNext(jobParameters);
        }

        return new JobLaunchRequest(job, jobParameters);

    }

}


TRADE测试数据文件
<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

MAVEN中的JAR DEPENDENCY:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

至此全部通道已打通。


参考:
http://stackoverflow.com/questions/7099543/launching-spring-batch-job

http://stackoverflow.com/questions/11758672/spring-batch-flatfileitemreader-read-multiple-files

https://github.com/SpringSource/spring-batch-admin/blob/master/spring-batch-admin-manager/src/main/java/org/springframework/batch/admin/integration/FileToJobLaunchRequestAdapter.java

http://blog.springsource.org/2010/02/15/practical-use-of-spring-batch-and-spring-integration/

http://www.enterpriseintegrationpatterns.com/ramblings/18_starbucks.html

http://static.springsource.org/spring-integration/docs/2.0.3.RELEASE/reference/html/jdbc.html

posted on 2012-10-16 00:11 paulwong 阅读(5460) 评论(7)  编辑  收藏 所属分类: SPRING INTERGRATIONSRPING BATCH

Feedback

# re: SPRING INTEGRATION + SPRING BATCH 2012-11-16 16:17 RoJeff

你好:现在有个问题请求帮助。
需求:数据源来自多个目录下的文件,而且文件的格式不一样。该怎么解决?跪求结果,谢谢了

现在读取多个目录下的文件的问题已经解决了,我是通过配置多个输入channel,一个输出 channel。就是对多个目录进行监控。我是没有办法,才这样。因为我需要监控多个目录。

还个难题就是多个文件进来了,格式不一样,怎么样去调用不同的分割器,分割文件。或者说调用不到的读取器

以下是我的applicationContext-integration.xml配置

<channel id="videoPlayerFileChannel" />
<channel id="videoPlayerJobLaunchRequestChannel" />
<channel id="videoPlayerJobExecutionChannel" />
<logging-channel-adapter channel="videoPlayerJobExecutionChannel" />

<file:inbound-channel-adapter id="videoPlayerInboundChannelAdapter"
directory="${config.1000000022.SourcePath}" auto-create-directory="true"
auto-startup="true" channel="videoPlayerFileChannel" filename-pattern="${config.1000000022.SourceType}"
comparator="fileComparator">
<poller max-messages-per-poll="1" cron="0/1 * * * * *" />
</file:inbound-channel-adapter>

<service-activator input-channel="videoPlayerFileChannel"
output-channel="videoPlayerJobLaunchRequestChannel" ref="launcher"
method="adapt" />

<service-activator input-channel="videoPlayerJobLaunchRequestChannel"
output-channel="videoPlayerJobExecutionChannel">
<beans:bean
class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
<beans:constructor-arg ref="jobLauncher" />
</beans:bean>
</service-activator>

<beans:bean id="launcher" class="com.zjcy.cbs.pretreatment.batch.Launcher">
<beans:property name="job" ref="pretreatJob" />
</beans:bean>


<beans:bean id="fileComparator"
class="com.zjcy.cbs.pretreatment.batch.FileComparator">
</beans:bean>


<channel id="fileChannelMessage" />
<channel id="messageJobLaunchRequestChannel" />
<channel id="messageJobExecutionChannel" />
<logging-channel-adapter channel="messageJobExecutionChannel" />


<file:inbound-channel-adapter id="fileChannelMessage1"
directory="${config.2000000001.SourcePath}" auto-create-directory="true" auto-startup="true"
channel="fileChannelMessage" filename-pattern="*${config.2000000001.SourceType}" comparator="fileComparator">
<poller max-messages-per-poll="1" cron="0/1 * * * * *" />
</file:inbound-channel-adapter>

<service-activator input-channel="fileChannelMessage"
output-channel="videoPlayerJobLaunchRequestChannel" ref="launcher"
method="adapt" />  回复  更多评论   

# re: SPRING INTEGRATION + SPRING BATCH 2012-11-17 22:39 paulwong

@RoJeff
可以在videoPlayerFileChannel中取得文件后缀名,加一个选择判断器之类的东西,按后缀名分派到不同的CHANNEL中。  回复  更多评论   

# re: SPRING INTEGRATION + SPRING BATCH[未登录] 2012-11-18 09:44 RoJeff

@paulwong
非常谢谢paulwong的帮助。
  回复  更多评论   

# re: SPRING INTEGRATION + SPRING BATCH 2012-11-18 09:44 RoJeff

@paulwong
非常谢谢paulwong的帮助。  回复  更多评论   

# re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 17:25 RoJeff

@paulwong
请教问题:
需求:file:inbound-channel-adapter 能配置多个目录吗,或者说可以包括子目录吗?如果可以,该怎么配置?

<file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
channel="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
<poller max-messages-per-poll="1" cron="0/1 * * * * *" />
</file:inbound-channel-adapter>  回复  更多评论   

# re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 22:17 paulwong

@RoJeff
<bean name="scanner" class="org.springframework.integration.file.RecursiveLeafOnlyDirectoryScanner"/>

<file:inbound-channel-adapter
id="inputChannel" scanner="scanner" directory="/some/folder">  回复  更多评论   

# re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 22:47 RoJeff

@paulwong
大牛就是大牛啊,这个问题得到解决,非常感觉。  回复  更多评论   



只有注册用户登录后才能发表评论。


网站导航: