#
@Configuration
@DependsOn(value="cachingConnectionFactory")
public class JmsTemplateConfiguration {
@Value("${wechat.sendmessage.queue}")
private String queueName;
@Value("${wechat.sendmessage.topic}")
private String topicName;
@Value("${spring.jms.pub-sub-domain}")
private boolean isPubSubDomain;
/**
* 定义点对点队列
* @return
*/
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}
/**
* 定义一个主题
* @return
*/
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<MessageConverter> messageConverter;
private final CachingConnectionFactory cachingConnectionFactory;
@Autowired
public JmsTemplateConfiguration(ObjectProvider<DestinationResolver> destinationResolver,
ObjectProvider<MessageConverter> messageConverter,
CachingConnectionFactory cachingConnectionFactory) {
this.destinationResolver = destinationResolver;
this.messageConverter = messageConverter;
this.cachingConnectionFactory = cachingConnectionFactory;
}
/**
* 配置队列生产者的JmsTemplate
* @return JmsTemplate
*/
@Bean(name="jmsQueueTemplate")
public JmsTemplate jmsQueueTemplate() {
//设置创建连接的工厂
//JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
//优化连接工厂,这里应用缓存池 连接工厂就即可
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
//设置默认消费topic
//jmsTemplate.setDefaultDestination(topic());
//设置P2P队列消息类型
jmsTemplate.setPubSubDomain(isPubSubDomain);
DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
if (destinationResolver != null) {
jmsTemplate.setDestinationResolver(destinationResolver);
}
MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
if (messageConverter != null) {
jmsTemplate.setMessageConverter(messageConverter);
}
//deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
jmsTemplate.setExplicitQosEnabled(true);
//DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
//定义持久化后节点挂掉以后,重启可以继续消费.
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//默认不开启事务
System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted());
//如果不启用事务,则会导致XA事务失效;
//作为生产者如果需要支持事务,则需要配置SessionTransacted为true
//jmsTemplate.setSessionTransacted(true);
//消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式
//Session.AUTO_ACKNOWLEDGE 消息自动签收
//Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
//Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTemplate;
}
/**
* 配置发布订阅生产者的JmsTemplate
* @return JmsTemplate
*/
@Bean(name="jmsTopicTemplate")
public JmsTemplate jmsTopicTemplate() {
//设置创建连接的工厂
//JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
//优化连接工厂,这里应用缓存池 连接工厂就即可
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
//设置默认消费topic
//jmsTemplate.setDefaultDestination(topic());
//设置发布订阅消息类型
jmsTemplate.setPubSubDomain(isPubSubDomain);
//deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
jmsTemplate.setExplicitQosEnabled(true);
//DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//默认不开启事务
System.out.println("是否开启事务"+jmsTemplate.isSessionTransacted());
//如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
//jmsTemplate.setSessionTransacted(true);
//不带事务的session的签收方式,取决于session的配置。
//默认消息确认方式为1,即AUTO_ACKNOWLEDGE
System.out.println("是否消息确认方式"+jmsTemplate.getSessionAcknowledgeMode());
//消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式
//Session.AUTO_ACKNOWLEDGE 消息自动签收
//Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
//Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTemplate;
}
}
Why Enterprise Integration Patterns?
Enterprise integration is too complex to be solved with a simple 'cookbook' approach. Instead, patterns can provide guidance by documenting the kind of experience that usually lives only in architects' heads: they are accepted solutions to recurring problems within a given context. Patterns are abstract enough to apply to most integration technologies, but specific enough to provide hands-on guidance to designers and architects. Patterns also provide a vocabulary for developers to efficiently describe their solution.
Patterns are not 'invented'; they are harvested from repeated use in practice. If you have built integration solutions, it is likely that you have used some of these patterns, maybe in slight variations and maybe calling them by a different name. The purpose of this site is not to "invent" new approaches, but to present a coherent collection of relevant and proven patterns, which in total form an integration pattern language.
Despite the 700+ pages, our book covers only a fraction of patterns (and the problems to be solved) in the integration space. The current patterns focus on Messaging, which forms the basis of most other integration patterns. We have started to harvest more patterns but are realizing (once again) how much work documenting these patterns really is. So please stay tuned.
Messaging Patterns
We have documented 65 messaging patterns, organized as follows:
https://www.enterpriseintegrationpatterns.com/patterns/messaging/index.html
A detailed step-by-step tutorial on how to connect to a JMS broker using a Spring Integration Gateway and Spring Boot.
A detailed step-by-step tutorial on how to connect to Apache ActiveMQ Artemis using Spring JMS and Spring Boot.
A detailed step-by-step tutorial on how to publish/subscribe to a JMS topic using Spring JMS and Spring Boot.
A detailed step-by-step tutorial on how to connect to an ActiveMQ JMS broker using Spring Integration and Spring Boot.
A detailed step-by-step tutorial on how a Spring JMS listener works in combination with Spring Boot.
A detailed step-by-step tutorial on how to use JmsTemplate in combination with Spring JMS and Spring Boot.
A detailed step-by-step tutorial on how to implement a message selector using Spring JMS and Spring Boot.
A detailed step-by-step tutorial on how to implement a message converter using Spring JMS and Spring Boot.
A detailed step-by-step tutorial on how to use a Spring Boot admin UI to manage Spring Batch jobs.
A detailed step-by-step tutorial on how to implement a Spring Batch Tasklet using Spring Boot.
A detailed step-by-step tutorial on how to implement a Hello World Spring Batch job using Spring Boot.
This time I decided to play a little bit with Spring Integration Java DSL. Which has been merged directly into Spring Integration Core 5.0, which is smart and obvious move because:
- Everyone starting the new Spring projects based on Java Config uses that
- SI Java DSL enables you to use new powerfull Java 8 features like Lambdas
- You can build your flow using the Builder pattern based on IntegrationFlowBuilder
Let's take a look on the samples howto use that based on ActiveMQ JMS.
https://bitbucket.org/tomask79/spring-integration-java-dsl/src/master/
SPRING BATCH remote chunking模式下,如果要同一时间处理多个文件,按DEMO的默认配置,是会报错的,这是由于多个文件的处理的MASTER方,是用同一个QUEUE名,这样SLAVE中处理多个JOB INSTANCE时,会返回不同的JOB-INSTANCE-ID,导致报错。
这时需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY组件。
GATEWAY组件是工作在REQUEST/RESPONSE模式下,即发一个MESSAGE到某一QUEUE时,要从REPLY QUEUE等到CONSUMER返回结果时,才往下继续。
OUTBOUND GATEWAY:从某一CHANNEL获取MESSAGE,发往REQUEST QUEUE,从REPLY QUEUE等到CONSUMER返回结果,将此MESSAGE发往下一CHANNEL。
INBOUND GATEWAY:从某一QUEUE获取MESSAGE,发往某一REQUEST CHANNEL,从REPLY CHANNEL等到返回结果,将此MESSAGE发往下一QUEUE。
详情参见此文:
https://blog.csdn.net/alexlau8/article/details/78056064。
<!-- Master jms -->
<int:channel id="MasterRequestChannel">
<int:dispatcher task-executor="RequestPublishExecutor"/>
</int:channel>
<task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
<!-- <int-jms:outbound-channel-adapter
connection-factory="connectionFactory"
destination-name="RequestQueue"
channel="MasterRequestChannel"/> -->
<int:channel id="MasterReplyChannel"/>
<!-- <int-jms:message-driven-channel-adapter
connection-factory="connectionFactory"
destination-name="ReplyQueue"
channel="MasterReplyChannel"/> -->
<int-jms:outbound-gateway
connection-factory="connectionFactory"
correlation-key="JMSCorrelationID"
request-channel="MasterRequestChannel"
request-destination-name="RequestQueue"
receive-timeout="30000"
reply-channel="MasterReplyChannel"
reply-destination-name="ReplyQueue"
async="true">
<int-jms:reply-listener />
</int-jms:outbound-gateway>
<!-- Slave jms -->
<int:channel id="SlaveRequestChannel"/>
<!-- <int-jms:message-driven-channel-adapter
connection-factory="connectionFactory"
destination-name="RequestQueue"
channel="SlaveRequestChannel"/> -->
<int:channel id="SlaveReplyChannel"/>
<!-- <int-jms:outbound-channel-adapter
connection-factory="connectionFactory"
destination-name="ReplyQueue"
channel="SlaveReplyChannel"/> -->
<int-jms:inbound-gateway
connection-factory="connectionFactory"
correlation-key="JMSCorrelationID"
request-channel="SlaveRequestChannel"
request-destination-name="RequestQueue"
reply-channel="SlaveReplyChannel"
default-reply-queue-name="ReplyQueue"/>
MASTER配置
package com.paul.testspringbatch.config.master;
import javax.jms.ConnectionFactory;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
//import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.SimpleThreadScope;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.JmsOutboundGateway;
import com.paul.testspringbatch.common.constant.IntegrationConstant;
@Configuration
@EnableIntegration
@Profile("batch-master")
public class IntegrationMasterConfiguration {
// @Value("${broker.url}")
// private String brokerUrl;
// @Bean
// public ActiveMQConnectionFactory connectionFactory() {
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// connectionFactory.setBrokerURL(this.brokerUrl);
// connectionFactory.setTrustAllPackages(true);
// return connectionFactory;
// }
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public DirectChannel requests() {
return new DirectChannel();
}
// @Bean
// public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(requests())
// .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
// .get();
// }
@Bean
public CustomScopeConfigurer customScopeConfigurer() {
CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
customScopeConfigurer.addScope("thread", new SimpleThreadScope());
return customScopeConfigurer;
}
// @Bean
// public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
// return new BeanFactoryPostProcessor() {
//
// @Override
// public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// beanFactory.registerScope("thread", new SimpleThreadScope());
// }
// };
// }
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
@Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public QueueChannel replies() {
return new QueueChannel();
}
// @Bean
// public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
// .channel(replies())
// .get();
// }
@Bean
public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
jmsOutboundGateway.setConnectionFactory(connectionFactory);
jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
jmsOutboundGateway.setRequiresReply(true);
jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
jmsOutboundGateway.setUseReplyContainer(true);
jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
jmsOutboundGateway.setReceiveTimeout(30_000);
return jmsOutboundGateway;
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(requests())//1. receive message from this channel
.handle(jmsOutboundGateway(connectionFactory))
.channel(replies())//5. send back the response to this channel
.get();
}
}
SLAVE配置:
package com.paul.testspringbatch.config.slave;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import com.paul.testspringbatch.common.constant.IntegrationConstant;
@Configuration
@EnableIntegration
@Profile("batch-slave")
public class IntegrationSlaveConfiguration {
/*
* Configure inbound flow (requests coming from the master)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
// @Bean
// public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
// .channel(requests())
// .get();
// }
/*
* Configure outbound flow (replies going to the master)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
// @Bean
// public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
// return IntegrationFlows
// .from(replies())
// .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
// .get();
// }
@Bean
public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms
.inboundGateway(connectionFactory)
.destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
.correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
.requestChannel(requests())//3. send the message to this channel
.replyChannel(replies())//4. waitting the result from this channel
.defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
)
.get();
}
}
在SPRING BATCH中,通常ROUTER是针对STEP的,但是如果在一个STEP中有多个WRITER,每个WRITER是写不同文件的,因此需要一个STEP内的ROUTER,以便能ROUTE到不同的WRITER中。
https://gist.github.com/benas/bfe2be7386b99ce496425fac9ff35fb8
在SPRING BATCH REMOTE CHUNKING的模式下:
SPRING BATCH 读文件时,是按一行一行来读取数据,再按CHUNKSIZE提交到REMOTE操作,有时要整合当前行和下几行,再决定CHUNKSIZE,以便相关的数据能在远程同一个PROCESSOR中按顺序进行处理,因为相关的数据被拆成几个CHUNK来处理的话,就有可能不按顺序来处理。这样就需要动态调整CHUNKSIZE。
参照如下:
https://stackoverflow.com/questions/37390602/spring-batch-custom-completion-policy-for-dynamic-chunk-size
并结合SingleItemPeekableItemReader(装饰者,允许查看下一条数据,真正的操作委托给代理)。
Request-Response is a message-exchange-pattern. In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).
In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.
For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time.
For more information, check here.
Now, let’s jump into the code. In Spring, there are 2 ways to implement this (at least I know of).
- Using JMSTemplate
- Using Spring Integration
For demo purpose, I used ActiveMQ. However, you can implement this in other messaging systems like IBM MQ, Rabbit MQ, Tibco EMS, etc. In this demo, I send an ObjectMessage of type Order and reply with a Shipment object.
Using JMSTemplate
First, we include the required dependencies. Replace the activemq
dependency with your messaging system’s jars if not using ActiveMQ
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.
spring:
activemq:
broker-url: tcp://localhost:61616
non-blocking-redelivery: true
packages:
trust-all: true
- Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
Now spring will do it’s magic and inject all the required Beans as usual :) However, in our code, we need to EnableJms
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
@EnableJms
@Configuration
public class ActiveMQConfig {
public static final String ORDER_QUEUE = "order-queue";
public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";
}
First, we will configure the Producer
@Slf4j
@Service
public class Producer {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
JmsTemplate jmsTemplate;
private Session session;
@PostConstruct
public void init(){
jmsTemplate.setReceiveTimeout(1000L);
jmsMessagingTemplate.setJmsTemplate(jmsTemplate);
session = jmsMessagingTemplate.getConnectionFactory().createConnection()
.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public Shipment sendWithReply(Order order) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage(order);
objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
objectMessage.setJMSReplyTo(new ActiveMQQueue(ORDER_REPLY_2_QUEUE));
objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
objectMessage.setJMSExpiration(1000L);
objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
return jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue(ORDER_QUEUE),
objectMessage, Shipment.class); //this operation seems to be blocking + sync
}
}
- Note in the above code that, JmsMessagingTemplate is used instead of JmsTemplatebecause, we are interested in the method convertSendAndReceive. As seen in the method signature, it waits to receive the Shipment object from the consumer.
Next, we can see the Receiver
@Component
public class Receiver implements SessionAwareMessageListener<Message> {
@Override
@JmsListener(destination = ORDER_QUEUE)
public void onMessage(Message message, Session session) throws JMSException {
Order order = (Order) ((ActiveMQObjectMessage) message).getObject();
Shipment shipment = new Shipment(order.getId(), UUID.randomUUID().toString());
// done handling the request, now create a response message
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(shipment);
// Message sent back to the replyTo address of the income message.
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}
}
- Using the javax.jms.Session the javax.jms.MessageProducer is created and used to send the reply message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.
Using Spring Integration
First, we include the required dependencies in addition to the above dependencies
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
</dependency>
Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.
spring:
activemq:
broker-url: tcp://localhost:61616
non-blocking-redelivery: true
packages:
trust-all: true
- Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
Next we create the required Beans for the Spring Integration.
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class ActiveMQConfig {
public static final String ORDER_QUEUE = "order-queue";
public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public MessageChannel requests() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "requests")
public JmsOutboundGateway jmsGateway(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsOutboundGateway gateway = new JmsOutboundGateway();
gateway.setConnectionFactory(activeMQConnectionFactory);
gateway.setRequestDestinationName(ORDER_QUEUE);
gateway.setReplyDestinationName(ORDER_REPLY_2_QUEUE);
gateway.setCorrelationKey("JMSCorrelationID");
gateway.setSendTimeout(100L);
gateway.setReceiveTimeout(100L);
return gateway;
}
@Autowired
Receiver receiver;
@Bean
public DefaultMessageListenerContainer responder(ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(activeMQConnectionFactory);
container.setDestinationName(ORDER_QUEUE);
MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public Shipment handleMessage(Order order) {
return receiver.receiveMessage(order);
}
});
container.setMessageListener(adapter);
return container;
}
}
Next, we will configure the MessagingGateway
@MessagingGateway(defaultRequestChannel = "requests")
public interface ClientGateway {
Shipment sendAndReceive(Order order);
}
We then Autowire this gateway in our Component class when we want to send and receive the message. A sample is shown below.
@Slf4j
@Component
public class Receiver {
public Shipment receiveMessage(@Payload Order order) {
Shipment shipment = new Shipment(order.getId(), UUID.randomUUID().toString());
return shipment;
}
}
- Next we configure the Componen to process the Order message. After successful execution, this component will send the Shipment message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.
For those, who just want to clone the code, head out to aniruthmp/jms