如果要对JMS BROKER生产和消费MESSAGE,一种方式是用JmsTemplate发送和消费消息,另一种方式是SPRING INTEGRATION。
SPRING INTEGRATION是实现了EIP模式的一种框架,即使用CHANNEL和JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER,完全脱离了JmsTemplate的API。
如果需要实现这种场景:从BROKER取一条消息,处理消息,且处理途中不要再从BROKER再取消息,处理完后再取消息,再处理。
这样要求手动开始和停止JMS LISTENER,即手动开始和停止JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER。
@Bean
@InboundChannelAdapter(value = "loaderResponseChannel")
public MessageSource loaderResponseSource() throws Exception {
return Jms
.inboundAdapter(oracleConnectionFactory())
.configureJmsTemplate(
t -> t.deliveryPersistent(true)
.jmsMessageConverter(jacksonJmsMessageConverter())
).destination(jmsInbound).get();
}
当使用@InboundChannelAdapter时,会自动注册一个SourcePollingChannelAdapter ,但这个名字比较长:configrationName.loaderResponseSource.inboundChannelAdapter。
呼叫这个实例的start()和stop()方法即可。
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("controlBus")
.controlBus()
.get();
}
Message operation = MessageBuilder.withPayload("@configrationName.loaderResponseSource.inboundChannelAdapter.start()").build();
operationChannel.send(operation)
https://stackoverflow.com/questions/45632469/shutdown-spring-integration-with-jms-inboundadapterhttps://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html#control-bushttps://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.javahttps://stackoverflow.com/questions/50428552/how-to-stop-or-suspend-polling-after-batch-job-fail