场景,餐厅:
- 食客下单,有饮品、食物、甜点
- 侍应接单,传送给厨房
- 厨房分三个子流程处理,即饮品、食物、甜点子流程
- 等待三个子流程处理完,合并成一份交付
- 如果厨房发现某食物欠缺,会通知侍应,展开错误处理,即通知食客更改食物,再交给厨房
- 侍应将交付品传送给食客
有一个主流程、三个子流程和一个聚合流程,聚合流程会聚合三个子流程的产物,通知主流程,再往下走。
并且主流程会感知子流程的错误,并会交给相应错误处理流程处理,且将结果再交给聚合流程。
对应SPRING INTEGRATION 的SCATTERGATHER模式:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
https://github.com/adnanmamajiwala/spring-integration-sample/tree/master/dsl-scatter-gather/src/main/java/com/si/dsl/scattergatherhttps://docs.spring.io/spring-integration/docs/5.1.x/reference/html/#scatter-gather