The Cafe Sample(小卖部订餐例子)
小卖部有一个订饮料服务,客户可以通过订单来订购所需要饮料。小卖部提供两种咖啡饮料
LATTE(拿铁咖啡)和MOCHA(摩卡咖啡)。每种又都分冷饮和热饮
整个流程如下:
1.有一个下订单模块,用户可以按要求下一个或多个订单。
2.有一个订单处理模块,处理订单中那些是关于订购饮料的。
3.有一个饮料订购处理模块,处理拆分订购的具体是那些种类的饮料,把具体需要生产的饮料要求发给生产模块
4.有一个生产模块
这个例子利用Spring Integration实现了灵活的,可配置化的模式集成了上述这些服务模块。
先来看一下配置文件
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- 启动Message bus 消息服务总线 支持四个属性
auto-startup[boolean是否自动启动 default=true]如果设置false,则需要手动调用applicationContext.start()方法
auto-create-channels[boolean是否自动注册MessageChannel default=false],如果使用的MessagChannle不存在
error-channel 设置错误时信息发送的MessageChannle,如果不设置,则使用DefaultErrorChannel
dispatcher-pool-size 使用的启动线程数,默认为10-->
<message-bus/>
<!-- 启动支持元数据标记 -->
<annotation-driven/>
<!-- 设置 @Component标识的元数据扫描包(package) -->
<context:component-scan base-package="org.springframework.integration.samples.cafe"/>
<!-- 下面启动了四个 MessageChannel服务 处理接收发送端发过来的消息和把消息流转到消息的消费端 -->
<!-- 属性说明: capacity 消息最大容量默认为100 publish-subscribe是否是发布订阅模式,默认为否
id bean的id名称 datatype ? -->
<channel id="orders"/> <!-- 订单Channel -->
<channel id="drinks"/> <!-- 饮料订单Channel,处理饮料的类别 -->
<channel id="coldDrinks"/> <!-- 冷饮生产Channel -->
<channel id="hotDrinks"/> <!-- 热饮生产Channel -->
<!-- 消息处理终端 接收 channel coldDrinks的消息后,执行barista.prepareColdDrink方法 生产冷饮 -->
<!-- 属性说明: input-channel 接收消息的Channel必须 default-output-channel设置默认回复消息Channel
handler-ref 引用bean的id名称 handler-method Handler处理方法名(参数类型必须与发送消息的payLoad使用的一致)
error-handler设置错误时信息发送的MessageChannle reply-handler 消息回复的Channel -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息处理终端 接收 channel hotDrinks的消息后,执行barista.prepareHotDrink方法 生产热饮 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
<!-- 定义一个启动下定单操作的bean,它通过 channel orders下定单 -->
<beans:bean id="cafe" class="org.springframework.integration.samples.cafe.Cafe">
<beans:property name="orderChannel" ref="orders"/>
</beans:bean>
</beans:beans>
下面我们来看一下源代码目录:
我们来看一下整体服务是怎么启动的
首先我们来看一下CafeDemo这个类,它触发下定单操作、
1 public class CafeDemo {
2
3 public static void main(String[] args) {
4 //加载Spring 配置文件
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //启动 Spring容器(启动所有实现 org.springframework.context.Lifecycle接口的实现类的start方法)
13 context.start();
14 //从Spring容器 取得cafe实例
15 Cafe cafe = (Cafe) context.getBean("cafe");
16 DrinkOrder order = new DrinkOrder();
17 //一杯热饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
18 Drink hotDoubleLatte = new Drink(DrinkType.LATTE, 2, false);
19 Drink icedTripleMocha = new Drink(DrinkType.MOCHA, 3, true);
20 order.addDrink(hotDoubleLatte);
21 order.addDrink(icedTripleMocha);
22 //下100个订单
23 for (int i = 0; i < 100; i++) {
24 //调用cafe的placeOrder下订单
25 cafe.placeOrder(order);
26 }
27 }
28 }
下面是Cafe的源代码
1 public class Cafe {
2
3 private MessageChannel orderChannel;
4
5
6 public void setOrderChannel(MessageChannel orderChannel) {
7 this.orderChannel = orderChannel;
8 }
9
10 //其实下订单操作,调用的是orderChannel(orders channel)的send方法,把消息发出去
11 public void placeOrder(DrinkOrder order) {
12 this.orderChannel.send(new GenericMessage<DrinkOrder>(order));
13 //GenericMessage有三个构建方法,参考如下
14 //new GenericMessage<T>(Object id, T payload);
15 //new GenericMessage<T>(T payload);
16 //new GenericMessage<T>(T payload, MessageHeader headerToCopy)
17 }
18 }
下面我们来看一下哪个类标记有@MessageEndpoint(input="orders") 表示它会消费orders Channel的消息
我们发现OrderSplitter类标记这个元数据,下面是源代码,我们来分析
1 //标记 MessageEndpoint 元数据, input表示 设置后所有 orders Channel消息都会被OrderSplitter收到
2 @MessageEndpoint(input="orders")
3 public class OrderSplitter {
4
5 //@Splitter表示,接收消息后,调用这个类的该方法. 其的参数类型必须与message的 payload属性一致。
6 //即在new GenericMessage<T>的泛型中指定
7 //元数据设置的 channel属性表示,方法执行完成后,会把方法返回的结果保存到message的payload属性后,发送到指定的channel中去
8 //这里指定发送到 drinks channel
9 @Splitter(channel="drinks")
10 public List<Drink> split(DrinkOrder order) {
11 return order.getDrinks(); //方法中,是把订单中的饮料订单取出来
12 }
13 }
接下来,与找OrderSplitter方法相同,我们要找哪个类标记有@MessageEndpoint(input="drinks") 表示它会消费drinks Channel的消息
找到DrinkRouter这个类
1 @MessageEndpoint(input="drinks")
2 public class DrinkRouter {
3
4 //@Router表示,接收消息后,调用这个类的该方法. 其的参数类型必须与message的 payload属性一致。
5 //方法执行完毕后,其返回值为 在容器中定义的channel名称。channel名称必须存在
6 @Router
7 public String resolveDrinkChannel(Drink drink) {
8 return (drink.isIced()) ? "coldDrinks" : "hotDrinks"; //方法中,是根据处理饮料是否是冷饮,送不同的channel处理
9 }
10 }
备注:@Router可以把消息路由到多个channel,实现方式如下
@Router
public MessageChannel route(Message message) {}
@Router
public List<MessageChannel> route(Message message) {}
@Router
public String route(Foo payload) {}
@Router
public List<String> route(Foo payload) {}
接下来,我们就要找 MessageEndpoint 标记为处理 "coldDrinks" 和 "hotDrinks" 的类,我们发现
这个两个类并不是通过元数据@MessageEndpoint来实现的,而是通过容器配置
(下面会演示如何用元数据配置,但元数据配置有局限性。这两种配置方式看大家喜好,系统中都是可以使用)
下面是容器配置信息:
<!-- 消息处理终端 接收 channel coldDrinks的消息后,执行barista.prepareColdDrink方法 生产冷饮 -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息处理终端 接收 channel hotDrinks的消息后,执行barista.prepareHotDrink方法 生产热饮 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
我们来看一下源代码:
1 @Component //这个必须要有,表示是一个消息处理组件
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 public void prepareColdDrink(Drink drink) {
31 try {
32 Thread.sleep(this.coldDrinkDelay);
33 } catch (InterruptedException e) {
34 Thread.currentThread().interrupt();
35 }
36 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
37 }
38
39 }
如果要用元数据标识实现上述方法:
要用元数据配置,它不像容器配置,可以在一个类中,支持多个不同的Handler方法。以处理prepareColdDrink方法为例
1 @MessageEndpoint(input="coldDrinks") //加了该元数据,它会自动扫描,并作为@Componet标记处理
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 @Handler//回调处理的方法
31 public void prepareColdDrink(Drink drink) {
32 try {
33 Thread.sleep(this.coldDrinkDelay);
34 } catch (InterruptedException e) {
35 Thread.currentThread().interrupt();
36 }
37 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
38 }
39 }
这样整个流程就执行完了,最终我们的饮料产品就按照订单生产出来了。累了吧,喝咖啡提神着呢!!!
初充:
下面是针对 Spring Integration adapter扩展的学习笔记
JMS Adapters
jms adapters 目前有两种实现
JmsPollingSourceAdapter 和 JmsMessageDrivenSourceAdapter. 前者是使用Srping的JmsTemplate模板类通过轮循的方式接收消息
后者是使用则通过代理Spring的DefaultMessageListenerContainer实例,实现消息驱动的方式。
xml配置如下:
JmsPollingSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsPollingSourceAdapter">
<constructor-arg ref="jmsTemplate"/>
<property name="channel" ref="exampleChannel"/>
<property name="period" value="5000"/> <!-- 轮循时间间隔 -->
<property name="messageMapper" ref=""/> <!-- message转换 -->
</bean>
<!-- 备注:消息的转换方式如下:
收到JMS Message消息后,SourceAdapter会调用Spring的MessageConverter实现类,把javax.jms.Message对象
转换成普通Java对象,再调用Spring Integration的MessageMapper把该对象转成 org.springframework.integration.message.Message对象 -->
JmsMessageDrivenSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsMessageDrivenSourceAdapter">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="exampleQueue"/>
<property name="channel" ref="exampleChannel"/>
<property name="messageConverter" ref=""/> <!-- jms消息对象转换 -->
<property name="messageMapper" ref="" /> <!-- 普通java对象转换成 Spring Integration Message -->
<property name="sessionAcknowledgeMode" value="1" />
<!-- sesssion回复模式 AUTO_ACKNOWLEDGE=1 CLIENT_ACKNOWLEDGE=2 DUPS_OK_ACKNOWLEDGE=3 SESSION_TRASACTED=0-->
</bean>
另外还有一个比较有用的类JmsTargetAdapter 它实现了MessageHandler接口。它提把Spring Integration Message对象转换成
JMS消息并发送到指定的消息队列。与JMS服务连接的实现可以通过设定 jmsTemplate属性引用或是connectionFactory和destination
或destinationName属性。
<bean class="org.springframework.integration.adapter.jms.JmsTargetAdapter">
<constructor-arg ref="connectionFactory"/>
<constructor-arg value="example.queue"/>
<!--或是以下配置
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="exampleQueue"/>
或是
<constructor-arg ref="jmsTemplate"/> -->
</bean>
Good Luck!
Yours Matthew!