package com.jwebs.learn.errorhandling;
import java.util.Random;
import javax.jms.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.jms.Jms;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessagingException;
/**
* Show how to handle error in spring integration flow.
* Please note, errorChannel in spring integration only applicable to
* error thrown in asynch component.
*
* @author zakyalvan
*/@SpringBootApplication
@IntegrationComponentScan
public class ErrorHandlingApplication {
public static void main(String[] args)
throws Exception {
ConfigurableApplicationContext applicationContext =
new SpringApplicationBuilder(ErrorHandlingApplication.
class)
.web(
false)
.run(args);
Runtime.getRuntime().addShutdownHook(
new Thread(() -> applicationContext.close()));
System.out.println("Pres enter key to exit
");
System.in.read();
System.exit(0);
}
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public MessageSource<Integer> randomIntegerMessageSource() {
return () -> MessageBuilder.withPayload(
new Random().nextInt()).build();
}
@Bean
public IntegrationFlow withErrorFlow() {
return IntegrationFlows.from(randomIntegerMessageSource(), spec -> spec.poller(Pollers.fixedDelay(1000)))
.handle(Jms.outboundGateway(connectionFactory)
.requestDestination("processor.input")
.replyContainer(spec -> spec.sessionTransacted(
true)))
.get();
}
@Autowired
@Qualifier("errorChannel")
private PublishSubscribeChannel errorChannel;
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(errorChannel)
.handle(message -> System.out.println("@@@@@@@@@@@@@@@@@@@@@" + ((MessagingException) message.getPayload()).getFailedMessage().getPayload()))
.get();
}
}