SPRING BATCH remote chunking模式下,如果要同一时间处理多个文件,按DEMO的默认配置,是会报错的,这是由于多个文件的处理的MASTER方,是用同一个QUEUE名,这样SLAVE中处理多个JOB INSTANCE时,会返回不同的JOB-INSTANCE-ID,导致报错。
这时需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY组件。
GATEWAY组件是工作在REQUEST/RESPONSE模式下,即发一个MESSAGE到某一QUEUE时,要从REPLY QUEUE等到CONSUMER返回结果时,才往下继续。
OUTBOUND GATEWAY:从某一CHANNEL获取MESSAGE,发往REQUEST QUEUE,从REPLY QUEUE等到CONSUMER返回结果,将此MESSAGE发往下一CHANNEL。
INBOUND GATEWAY:从某一QUEUE获取MESSAGE,发往某一REQUEST CHANNEL,从REPLY CHANNEL等到返回结果,将此MESSAGE发往下一QUEUE。
详情参见此文:
https://blog.csdn.net/alexlau8/article/details/78056064。
<!-- Master jms -->
<int:channel id="MasterRequestChannel">
<int:dispatcher task-executor="RequestPublishExecutor"/>
</int:channel>
<task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
<!-- <int-jms:outbound-channel-adapter
connection-factory="connectionFactory"
destination-name="RequestQueue"
channel="MasterRequestChannel"/> -->
<int:channel id="MasterReplyChannel"/>
<!-- <int-jms:message-driven-channel-adapter
connection-factory="connectionFactory"
destination-name="ReplyQueue"
channel="MasterReplyChannel"/> -->
<int-jms:outbound-gateway
connection-factory="connectionFactory"
correlation-key="JMSCorrelationID"
request-channel="MasterRequestChannel"
request-destination-name="RequestQueue"
receive-timeout="30000"
reply-channel="MasterReplyChannel"
reply-destination-name="ReplyQueue"
async="true">
<int-jms:reply-listener />
</int-jms:outbound-gateway>
<!-- Slave jms -->
<int:channel id="SlaveRequestChannel"/>
<!-- <int-jms:message-driven-channel-adapter
connection-factory="connectionFactory"
destination-name="RequestQueue"
channel="SlaveRequestChannel"/> -->
<int:channel id="SlaveReplyChannel"/>
<!-- <int-jms:outbound-channel-adapter
connection-factory="connectionFactory"
destination-name="ReplyQueue"
channel="SlaveReplyChannel"/> -->
<int-jms:inbound-gateway
connection-factory="connectionFactory"
correlation-key="JMSCorrelationID"
request-channel="SlaveRequestChannel"
request-destination-name="RequestQueue"
reply-channel="SlaveReplyChannel"
default-reply-queue-name="ReplyQueue"/>
MASTER配置
package com.paul.testspringbatch.config.master;
import javax.jms.ConnectionFactory;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
//import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.SimpleThreadScope;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.JmsOutboundGateway;
import com.paul.testspringbatch.common.constant.IntegrationConstant;
@Configuration
@EnableIntegration
@Profile("batch-master")
public class IntegrationMasterConfiguration {
// @Value("${broker.url}")
// private String brokerUrl;
// @Bean
// public ActiveMQConnectionFactory connectionFactory() {
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// connectionFactory.setBrokerURL(this.brokerUrl);
// connectionFactory.setTrustAllPackages(true);
// return connectionFactory;
// }
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public DirectChannel requests() {
return new DirectChannel();
}
// @Bean
// public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(requests())
// .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
// .get();
// }
@Bean
public CustomScopeConfigurer customScopeConfigurer() {
CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
customScopeConfigurer.addScope("thread", new SimpleThreadScope());
return customScopeConfigurer;
}
// @Bean
// public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
// return new BeanFactoryPostProcessor() {
//
// @Override
// public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// beanFactory.registerScope("thread", new SimpleThreadScope());
// }
// };
// }
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
@Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public QueueChannel replies() {
return new QueueChannel();
}
// @Bean
// public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
// .channel(replies())
// .get();
// }
@Bean
public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
jmsOutboundGateway.setConnectionFactory(connectionFactory);
jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
jmsOutboundGateway.setRequiresReply(true);
jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
jmsOutboundGateway.setUseReplyContainer(true);
jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
jmsOutboundGateway.setReceiveTimeout(30_000);
return jmsOutboundGateway;
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(requests())//1. receive message from this channel
.handle(jmsOutboundGateway(connectionFactory))
.channel(replies())//5. send back the response to this channel
.get();
}
}
SLAVE配置:
package com.paul.testspringbatch.config.slave;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import com.paul.testspringbatch.common.constant.IntegrationConstant;
@Configuration
@EnableIntegration
@Profile("batch-slave")
public class IntegrationSlaveConfiguration {
/*
* Configure inbound flow (requests coming from the master)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
// @Bean
// public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
// .channel(requests())
// .get();
// }
/*
* Configure outbound flow (replies going to the master)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
// @Bean
// public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(replies())
// .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
// .get();
// }
@Bean
public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms
.inboundGateway(connectionFactory)
.destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
.correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
.requestChannel(requests())//3. send the message to this channel
.replyChannel(replies())//4. waitting the result from this channel
.defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
)
.get();
}
}