paulwong

SPRING BATCH remote chunking模式下可同时处理多文件

SPRING BATCH remote chunking模式下,如果要在同一时间处理多个文件,按DEMO的默认配置是会报错的。这是由于处理多个文件的各个MASTER方用的是同一个QUEUE名,这样SLAVE在处理多个JOB INSTANCE时,返回给MASTER的结果中会带有不同的JOB-INSTANCE-ID,导致报错。

这时需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY组件。

GATEWAY组件工作在REQUEST/RESPONSE模式下,即发一个MESSAGE到某一QUEUE后,要等到CONSUMER把结果返回到REPLY QUEUE,才继续往下执行。

OUTBOUND GATEWAY:从某一CHANNEL获取MESSAGE,发往REQUEST QUEUE,从REPLY QUEUE等到CONSUMER返回结果,将此MESSAGE发往下一CHANNEL。

INBOUND GATEWAY:从某一QUEUE获取MESSAGE,发往某一REQUEST CHANNEL,从REPLY CHANNEL等到返回结果,将此MESSAGE发往下一QUEUE。

详情参见此文:https://blog.csdn.net/alexlau8/article/details/78056064

    <!-- Master jms -->
    <int:channel id="MasterRequestChannel">
        <int:dispatcher task-executor="RequestPublishExecutor"/>
    </int:channel>
    <task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
<!--    <int-jms:outbound-channel-adapter 
        connection-factory="connectionFactory" 
        destination-name="RequestQueue" 
        channel="MasterRequestChannel"/> 
-->

    <int:channel id="MasterReplyChannel"/>
<!--    <int-jms:message-driven-channel-adapter 
        connection-factory="connectionFactory" 
        destination-name="ReplyQueue"
        channel="MasterReplyChannel"/> 
-->

    <int-jms:outbound-gateway
        
connection-factory="connectionFactory"
        correlation-key
="JMSCorrelationID"
        request-channel
="MasterRequestChannel"
        request-destination-name
="RequestQueue"
        receive-timeout
="30000"
        reply-channel
="MasterReplyChannel"
        reply-destination-name
="ReplyQueue"
        async
="true">
        <int-jms:reply-listener />
    </int-jms:outbound-gateway>

    <!-- Slave jms -->
    <int:channel id="SlaveRequestChannel"/>
<!--    <int-jms:message-driven-channel-adapter
        connection-factory="connectionFactory" 
        destination-name="RequestQueue"
        channel="SlaveRequestChannel"/> 
-->

    <int:channel id="SlaveReplyChannel"/>
<!--    <int-jms:outbound-channel-adapter 
        connection-factory="connectionFactory" 
        destination-name="ReplyQueue"
        channel="SlaveReplyChannel"/> 
-->

    <int-jms:inbound-gateway
        
connection-factory="connectionFactory"
        correlation-key
="JMSCorrelationID"
        request-channel
="SlaveRequestChannel"
        request-destination-name
="RequestQueue"
        reply-channel
="SlaveReplyChannel"
        default-reply-queue-name
="ReplyQueue"/>

MASTER配置:
package com.paul.testspringbatch.config.master;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.config.CustomScopeConfigurer;
//import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.SimpleThreadScope;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.JmsOutboundGateway;

import com.paul.testspringbatch.common.constant.IntegrationConstant;

@Configuration
@EnableIntegration
@Profile("batch-master")
public class IntegrationMasterConfiguration {
    
//    @Value("${broker.url}")
//    private String brokerUrl;


//    @Bean
//    public ActiveMQConnectionFactory connectionFactory() {
//        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//        connectionFactory.setBrokerURL(this.brokerUrl);
//        connectionFactory.setTrustAllPackages(true);
//        return connectionFactory;
//    }

    /*
     * Configure outbound flow (requests going to workers)
     
*/
    @Bean
//    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public DirectChannel requests() {
        return new DirectChannel();
    }

//    @Bean
//    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(requests())
//                .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
//                .get();
//    }
    
     @Bean
     public CustomScopeConfigurer customScopeConfigurer() {
         CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
         customScopeConfigurer.addScope("thread", new SimpleThreadScope());
         return customScopeConfigurer;
     }
     
//     @Bean
//     public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
//         return new BeanFactoryPostProcessor() {
//                
//             @Override
//             public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
//                    beanFactory.registerScope("thread", new SimpleThreadScope());
//                }
//              };
//     }
    
    /*
     * Configure inbound flow (replies coming from workers)
     
*/
    @Bean
    @Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
//    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public QueueChannel replies() {
        return new QueueChannel();
    }

//    @Bean
//    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
//                .channel(replies())
//                .get();
//    }

    @Bean
    public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
        JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
        jmsOutboundGateway.setConnectionFactory(connectionFactory);
        jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
        jmsOutboundGateway.setRequiresReply(true);
        jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
        jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
        jmsOutboundGateway.setUseReplyContainer(true);
        jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
        jmsOutboundGateway.setReceiveTimeout(30_000);
        return jmsOutboundGateway;
    }

    @Bean
    public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                        .from(requests())//1. receive message from this channel
                        .handle(jmsOutboundGateway(connectionFactory))
                        .channel(replies())//5. send back the response to this channel
                        .get();
    }

}


SLAVE配置:
package com.paul.testspringbatch.config.slave;

import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

import com.paul.testspringbatch.common.constant.IntegrationConstant;

@Configuration
@EnableIntegration
@Profile("batch-slave")
public class IntegrationSlaveConfiguration {
    

    /*
     * Configure inbound flow (requests coming from the master)
     
*/
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

//    @Bean
//    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
//                .channel(requests())
//                .get();
//    }

    /*
     * Configure outbound flow (replies going to the master)
     
*/
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

//    @Bean
//    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(replies())
//                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
//                .get();
//    }

    @Bean
    public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                    .from(Jms
                            .inboundGateway(connectionFactory)
                            .destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
                            .correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
                            .requestChannel(requests())//3. send the message to this channel
                            .replyChannel(replies())//4. waitting the result from this channel
                            .defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
                            )
                    .get();
    }

}

posted on 2019-07-16 14:38 paulwong 阅读(842) 评论(0)  编辑  收藏 所属分类: SPRING BATCH


只有注册用户登录后才能发表评论。


网站导航: