SPRING CLOUD STREAM内置了一个RoutingFunction,能将MESSAGE路由到应用的其他FUNCTION中。
要使用RoutingFunction,可以把消息发送到它所绑定的外部DESTINATION,也可以用“|”连接符把它串接在其他FUNCTION之后(例如下文的 loanCheckerProcessor|functionRouter)。
application.yaml
# This setting can increase or decrease the rate of message production (1000 = 1s)
# spring.cloud.stream.poller.fixed-delay=1000
# DefaultPollerProperties
# This setting can control which function method in our code will be triggered if there are multiple
# spring.cloud.function.definition=supplyLoan
# Give the autogenerated binding a friendlier name
spring:
  application:
    name: loan-check-rabbit
  banner:
    location: classpath:/banner-rabbit.txt
  cloud:
    #BindingServiceProperties
    stream:
      #StreamFunctionProperties
      function:
        # Kept on a single quoted line on purpose: a trailing "\" does not continue a
        # plain YAML scalar, it would be folded into the value as a literal "\ " and
        # corrupt the function name that follows it.
        # NOTE(review): "loadCheckerFunction" is not declared in LoanCheckConfiguration;
        # confirm it exists elsewhere or drop it from the list.
        definition: "loadCheckerFunction;loanCheckerDecieder;loanCheckerConsumer;loanDeclinedConsumer;loanApprovedConsumer;loanCheckerProcessor|functionRouter"
        routing:
          enabled: true
      #BindingProperties
      bindings:
        # Input of the composed function: loanCheckerProcessor piped into the router.
        loanCheckerProcessor|functionRouter-in-0:
          destination: queue.pretty.log.messages
          binder: local_rabbit
        loanApprovedConsumer-in-0:
          destination: load.approved
          binder: local_rabbit
        loanDeclinedConsumer-in-0:
          destination: load.declined
          binder: local_rabbit
        loanCheckerDecieder-in-0:
          destination: queue.pretty.log.messages.222
          binder: local_rabbit
        loanCheckerDecieder-out-0:
          destination: queue.pretty.approved.messages
          binder: local_rabbit
        loanCheckerConsumer-in-0:
          destination: queue.pretty.approved.messages
          binder: local_rabbit
      #BinderProperties
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 10.80.27.69
                port: 5672
                # NOTE(review): default guest/guest credentials are checked in here;
                # confirm this file is only used for local development.
                username: guest
                password: guest
                virtual-host: my-virtual-host
logging:
  level:
    root: info
    org.springframework:
      cloud.function: debug
      #retry: debug
LoanCheckConfiguration.java
package com.paul.testspringcloudstream.loancheck.config;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import com.paul.testspringcloudstream.common.model.Loan;
import com.paul.testspringcloudstream.common.model.Status;
import com.paul.testspringcloudstream.loancheck.router.LoanCheckerRouter;
import com.paul.testspringcloudstream.loancheck.service.LoanProcessor;
import com.paul.testspringcloudstream.loancheck.service.LoanService;
/**
 * Functional bean definitions for the loan-check application.
 *
 * <p>The bean method names declared here are referenced by name from the application
 * configuration (function definition and binding names) and by
 * {@link LoanCheckerRouter}, so they must not be renamed without updating those places.
 */
@Configuration
public class LoanCheckConfiguration {

    private static final Logger log = LoggerFactory.getLogger(LoanCheckConfiguration.class);

    /** Loans with an amount strictly greater than this value are declined. */
    private static final long MAX_AMOUNT = 10000L;

    /** Log layout: source name, status, uuid, amount, applicant name. */
    private static final String LOG_PATTERN = "{} - {} {} for ${} for {}";

    /** Binder name passed to {@link StreamBridge} when sending decided loans. */
    private static final String BINDER_NAME = "local_rabbit";

    /**
     * Diagnostic hook invoked by Spring through {@code @Autowired} method injection;
     * logs the runtime class of the injected {@code loanCheckerConsumer} bean.
     *
     * @param loanCheckerConsumer the consumer bean resolved for this parameter
     */
    @Autowired
    public void test(Consumer<Loan> loanCheckerConsumer) {
        log.info("{}", loanCheckerConsumer.getClass());
    }

    /**
     * @return a consumer that logs each received loan under the name {@code loanCheckerConsumer}
     */
    @Bean
    public Consumer<Loan> loanCheckerConsumer() {
        return loan -> logLoan("loanCheckerConsumer", loan);
    }

    /**
     * @return a consumer that logs each received loan under the name {@code loanDeclinedConsumer}
     */
    @Bean
    public Consumer<Loan> loanDeclinedConsumer() {
        return loan -> logLoan("loanDeclinedConsumer", loan);
    }

    /**
     * @return a consumer that logs each received loan under the name {@code loanApprovedConsumer}
     */
    @Bean
    public Consumer<Loan> loanApprovedConsumer() {
        return loan -> logLoan("loanApprovedConsumer", loan);
    }

    /**
     * @return the routing callback that selects the target function for each message
     */
    @Bean
    public MessageRoutingCallback loanCheckerRouter() {
        return new LoanCheckerRouter();
    }

    /**
     * Wraps {@link LoanService#check} as a function bean.
     *
     * @param loanService service that performs the check
     * @return a function returning whatever {@code loanService.check} returns for the loan
     */
    @Bean
    public Function<Loan, Loan> loanCheckerProcessor(
            LoanService loanService
    ) {
        return loanService::check;
    }

    /**
     * Alternative to {@link #loanCheckerProcessor(LoanService)} that does the routing itself:
     * the checked loan is wrapped in a message whose
     * {@code spring.cloud.stream.sendto.destination} header is set to
     * {@link LoanProcessor#DECLINED_OUT} when the status is {@code DECLINED}, and to
     * {@link LoanProcessor#APPROVED_OUT} otherwise.
     *
     * @param loanService service that performs the check
     * @return a function producing the checked loan together with its destination header
     */
    @Bean
    public Function<Loan, Message<Loan>> loanCheckerProcessorBak(
            LoanService loanService
    ) {
        return loan -> {
            Loan result = loanService.check(loan);
            String sendTo = Status.DECLINED.name().equals(result.getStatus())
                    ? LoanProcessor.DECLINED_OUT
                    : LoanProcessor.APPROVED_OUT;
            return MessageBuilder.withPayload(result)
                    .setHeader("spring.cloud.stream.sendto.destination", sendTo)
                    .build();
        };
    }

    /**
     * Decides each incoming loan by amount, stamps the resulting status on the loan and
     * forwards it through {@link StreamBridge}. The loan is logged before and after the
     * decision.
     *
     * @param streamBridge bridge used to send the decided loan
     * @return a consumer applying the decision to every loan it receives
     */
    @Bean
    public Consumer<Loan> loanCheckerDecieder(StreamBridge streamBridge) {
        return loan -> {
            logLoan("loanCheckerDecieder", loan);

            boolean declined = loan.getAmount() > MAX_AMOUNT;
            loan.setStatus((declined ? Status.DECLINED : Status.APPROVED).name());
            String bindingName = declined ? LoanProcessor.DECLINED_OUT : LoanProcessor.APPROVED_OUT;

            // send(...) reports failure through its boolean result; surface it instead of
            // dropping the loan silently.
            if (!streamBridge.send(bindingName, BINDER_NAME, loan)) {
                log.warn("StreamBridge could not send loan {} to {}", loan.getUuid(), bindingName);
            }

            logLoan("loanCheckerDecieder", loan);
        };
    }

    /** Logs one loan with {@link #LOG_PATTERN}, tagged with the name of the calling bean. */
    private static void logLoan(String source, Loan loan) {
        log.info(LOG_PATTERN, source, loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    }
}
LoanCheckerRouter.java,将路由条件统一在此处
package com.paul.testspringcloudstream.loancheck.router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.messaging.Message;
import com.paul.testspringcloudstream.common.model.Loan;
import com.paul.testspringcloudstream.common.model.Status;
/**
 * Routing callback that chooses the consumer function for a loan message from the
 * loan's status: declined loans go to {@code loanDeclinedConsumer}, every other status
 * goes to {@code loanApprovedConsumer}.
 */
public class LoanCheckerRouter implements MessageRoutingCallback {

    private static final Logger log = LoggerFactory.getLogger(LoanCheckerRouter.class);

    /** Function name returned for loans whose status is {@code DECLINED}. */
    private static final String DECLINED_TARGET = "loanDeclinedConsumer";

    /** Function name returned for all other loans. */
    private static final String APPROVED_TARGET = "loanApprovedConsumer";

    /**
     * Resolves the name of the function that should handle the message.
     *
     * @param message message whose payload is cast to {@link Loan}
     * @return {@code "loanDeclinedConsumer"} for a declined loan, otherwise
     *         {@code "loanApprovedConsumer"}
     * @throws ClassCastException if the payload is not a {@link Loan}
     */
    @Override
    public String functionDefinition(Message<?> message) {
        // NOTE(review): an earlier variant decoded the payload from byte[]; this version
        // assumes the payload has already been converted to Loan - confirm for every
        // input this router is bound to.
        final Loan loan = (Loan) message.getPayload();
        log.info("Loan status: {}", loan.getStatus());

        if (Status.DECLINED.name().equals(loan.getStatus())) {
            return DECLINED_TARGET;
        }
        return APPROVED_TARGET;
    }
}