在 Spring Integration 中，如果 exception 发生在各个不同的 thread 里，如何将 exception 发送到指定的 channel，之后再绕回到 aggregatorChannel 中？
/**
 * Builds the flow that consumes from {@code "input.channel"} and fans every
 * message out to two provisioning sub-flows through a publish-subscribe
 * channel backed by a cached thread pool.
 *
 * <p>Each sub-flow sets the error-channel header to {@code "errorChannel"}
 * (with the overwrite flag set), invokes the {@code "provision"} method of its
 * provisioner, and forwards the result to {@code "aggregatorChannel"}.
 *
 * <p>NOTE(review): the executor is created inline and is not kept or shut
 * down anywhere in this method; consider a container-managed executor bean.
 *
 * @return the assembled provisioning flow
 */
@Bean
public IntegrationFlow provisionUserFlow() {
    // Sub-flow for the first provisioner.
    IntegrationFlow provisionWithA = flow -> flow
            .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
            .handle(provisionerA, "provision")
            .channel("aggregatorChannel");

    // Sub-flow for the second provisioner; same shape as the first.
    IntegrationFlow provisionWithB = flow -> flow
            .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
            .handle(provisionerB, "provision")
            .channel("aggregatorChannel");

    return IntegrationFlows.from("input.channel")
            .publishSubscribeChannel(
                    Executors.newCachedThreadPool(),
                    // applySequence(true) is requested so the published copies carry
                    // sequence details (presumably what the aggregator correlates on).
                    pubSub -> pubSub.applySequence(true)
                            .subscribe(provisionWithA)
                            .subscribe(provisionWithB))
            .get();
}
/**
 * Builds the flow that consumes from {@code "aggregatorChannel"} and
 * aggregates the incoming messages by delegating to the
 * {@code "aggregatingMethod"} method of {@code collect}.
 *
 * <p>NOTE(review): the flow starts from the channel named
 * {@code "aggregatorChannel"} and then passes through the
 * {@code aggregatorChannel} field; if both refer to the same channel this
 * extra hop looks redundant — confirm.
 *
 * @return the assembled aggregation flow
 */
@Bean
public IntegrationFlow aggregateFlow() {
    return IntegrationFlows
            .from("aggregatorChannel")
            .channel(aggregatorChannel)
            .aggregate(aggregator -> aggregator.processor(collect, "aggregatingMethod"))
            .get();
}
/**
 * Converts an error arriving on {@code "errorChannel"} into a regular message
 * that is sent on to {@code "aggregatorChannel"}.
 *
 * <p>The reply payload is the failure's message text. When the failure is a
 * {@link MessagingException} that carries the failed message, that message's
 * headers are copied onto the reply (without overwriting headers already
 * present), so the reply keeps whatever sequence/correlation headers the
 * failed message had.
 *
 * <p>The handler is deliberately defensive: an exception thrown here would
 * mean no reply reaches {@code "aggregatorChannel"} for the failed branch.
 *
 * @param errorMessage the error message whose payload is the failure
 * @return a message describing the failure, carrying the failed message's
 *         headers when they are available
 */
@Transformer(inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) {
    Throwable failure = errorMessage.getPayload();

    // withPayload rejects null, and a Throwable's message may be null:
    // fall back to toString(), which always includes the exception class name.
    String description = failure.getMessage() != null ? failure.getMessage() : failure.toString();
    MessageBuilder<String> reply = MessageBuilder.withPayload(description);

    // Only a MessagingException exposes the failed message, and even then it may be absent.
    if (failure instanceof MessagingException) {
        Message<?> failedMessage = ((MessagingException) failure).getFailedMessage();
        if (failedMessage != null) {
            reply.copyHeadersIfAbsent(failedMessage.getHeaders());
        }
    }
    return reply.build();
}
https://stackoverflow.com/q/46495127/11790720