posts - 156,  comments - 601,  trackbacks - 0
前一部分,通过XML的使用方式讲解了The Cafe Sample(小卖部订餐例子)
本笔记接下讲解以Annotation的方式来使用Spring-integration的例子。还是之前的那个示例:

The Cafe Sample(小卖部订餐例子)

    小卖部有一个订饮料服务,客户可以通过订单来订购所需要饮料。小卖部提供两种咖啡饮料
        LATTE(拿铁咖啡)和MOCHA(摩卡咖啡)。每种又都分冷饮和热饮
    整个流程如下:
        1.有一个下订单模块,用户可以按要求下一个或多个订单。
        2.有一个订单处理模块,处理订单中那些是关于订购饮料的。
        3.有一个饮料订购处理模块,处理拆分订购的具体是那些种类的饮料,把具体需要生产的饮料要求发给生产模块
        4.有一个生产模块,进行生产。
        5.等生成完成后,有一个订单确认模块(Waiter),把订单的生成的饮料输出。


Spring Integration以Annotation的方式,让xml配置简化了很多
先来看一下XML方式,进行示例的开发:
配置文件如下:
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans:beans xmlns="http://www.springframework.org/schema/integration"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:beans="http://www.springframework.org/schema/beans"
 5     xmlns:context="http://www.springframework.org/schema/context"
 6     xmlns:stream="http://www.springframework.org/schema/integration/stream"
 7     xsi:schemaLocation="http://www.springframework.org/schema/beans
 8             http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
 9             http://www.springframework.org/schema/context
10             http://www.springframework.org/schema/context/spring-context-2.5.xsd
11             http://www.springframework.org/schema/integration
12             http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
13             http://www.springframework.org/schema/integration/stream
14             http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">
15 
16     <!-- 开启 Annotation支持  -->
17     <annotation-config />
18     <!-- 设置Spring 扫描 Annotation 包路径  -->
19     <context:component-scan base-package="org.springframework.integration.samples.cafe.annotation" />
20     <!-- 配置一个GateWay组件,提供消息的发送和接收。接口Cafe,提供一个void placeOrder(Order order);方法
21         该方法标记了@Gateway(requestChannel="orders"), 实现向orders队列实现数据的发送
22     -->
23     <gateway id="cafe" service-interface="org.springframework.integration.samples.cafe.Cafe" />
24     <!-- 订单Channel -->
25     <channel id="orders" />
26     <!-- 饮料订单Channel,处理饮料的类别 -->
27     <channel id="drinks" />
28      <!-- 冷饮生产Channel 最大待处理的数据量为 10-->
29     <channel id="coldDrinks">
30         <queue capacity="10" />
31     </channel>
32     <!-- 热饮生产Channel 最大待处理的数据量为 10-->
33     <channel id="hotDrinks">
34         <queue capacity="10" />
35     </channel>
36     <!-- 定义最终进行生产的消息队列 -->
37     <channel id="preparedDrinks" />
38     <!-- 定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕-->
39     <stream:stdout-channel-adapter id="deliveries" />
40 
41 </beans:beans>

我们来看一下整体服务是怎么启动的
首先我们来看一下CafeDemo这个类,它触发下定单操作
 1 public class CafeDemo {
 2 
 3     public static void main(String[] args) {
 4         //加载Spring 配置文件 "cafeDemo.xml"
 5         AbstractApplicationContext context = null;
 6         if(args.length > 0) {
 7             context = new FileSystemXmlApplicationContext(args);
 8         }
 9         else {
10             context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11         }
12         //取得 Cafe实列
13         Cafe cafe = (Cafe) context.getBean("cafe");
14         //准备 发送100条消息(订单)
15         for (int i = 1; i <= 100; i++) {
16             Order order = new Order(i);
17             // 一杯热饮  参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
18             order.addItem(DrinkType.LATTE, 2false);
19             // 一杯冷饮  参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
20             order.addItem(DrinkType.MOCHA, 3true);
21             //下发订单,把消息发给 orders 队列
22             cafe.placeOrder(order);
23         }
24     }
25 
26 }

下面是Cafe接口的源代码
public interface Cafe {

    
//定义GateWay, 把消息发送到 orders 队列, Message的payLoad属性,保存 order参数值
    @Gateway(requestChannel="orders")
    
void placeOrder(Order order);

}

OrderSplitter 源代码
 1 //设置成Spring-integration组件
 2 @MessageEndpoint
 3 public class OrderSplitter {
 4 
 5     //实现Splitter模式, 接收 orders队列的消息,调用orderSplitter Bean的split方法,进行消息的分解
 6     //并把分解后的消息,发送到drinks队列
 7     @Splitter(inputChannel="orders", outputChannel="drinks")
 8     public List<OrderItem> split(Order order) {
 9         return order.getItems();
10     }
11 
12 }

OrderSplitter.split把消息拆分后,变成多个消息,发送到drinks队列.由drinkRouter进行消息的接收。
 1 //设置成Spring-integration组件
 2 @MessageEndpoint
 3 public class DrinkRouter {
 4 
 5     //实现Router模式,接收 drinks队列的消息, 并触发 drinkRouter Bean的 resolveOrderItemChannel方法
 6     //由在 resolveOrderItemChannel该方法的返回值(String--队列名称)表示把消息路由到那个队列上(coldDrinks或hotDrinks)
 7     @Router(inputChannel="drinks")
 8     public String resolveOrderItemChannel(OrderItem orderItem) {
 9         return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks";
10     }
11 
12 }

下面看一下,如果是一杯冷饮,则消息发送到 coldDrinks队列
如果是一杯热饮,则消息发送到 hotDrinks队列
接下来看coldDrinks, hotDrink 的队列由谁来监听:
查看源代码后,是由Barista.java来处理
 1 //设置成Spring-integration组件
 2 @Component
 3 public class Barista {
 4 
 5     private long hotDrinkDelay = 5000;
 6 
 7     private long coldDrinkDelay = 1000;
 8 
 9     private AtomicInteger hotDrinkCounter = new AtomicInteger();
10 
11     private AtomicInteger coldDrinkCounter = new AtomicInteger();
12 
13 
14     public void setHotDrinkDelay(long hotDrinkDelay) {
15         this.hotDrinkDelay = hotDrinkDelay;
16     }
17 
18     public void setColdDrinkDelay(long coldDrinkDelay) {
19         this.coldDrinkDelay = coldDrinkDelay;
20     }
21     
22     //配置接收"hotDrinks"队列,处理后,把结果发给队列prepareColdDrink
23     @ServiceActivator(inputChannel="hotDrinks", outputChannel="preparedDrinks")
24     public Drink prepareHotDrink(OrderItem orderItem) {
25         try {
26             Thread.sleep(this.hotDrinkDelay);
27             System.out.println(Thread.currentThread().getName()
28                     + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + " for order #"
29                     + orderItem.getOrder().getNumber() + "" + orderItem);
30             return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
31                     orderItem.getShots());
32         } catch (InterruptedException e) {
33             Thread.currentThread().interrupt();
34             return null;
35         }
36     }
37     
38     //配置接收"coldDrinks"队列,处理后,把结果发给队列prepareColdDrink
39     @ServiceActivator(inputChannel="coldDrinks", outputChannel="preparedDrinks")
40     public Drink prepareColdDrink(OrderItem orderItem) {
41         try {
42             Thread.sleep(this.coldDrinkDelay);
43             System.out.println(Thread.currentThread().getName()
44                     + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #"
45                     + orderItem.getOrder().getNumber() + "" + orderItem);
46             return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
47                     orderItem.getShots());
48         } catch (InterruptedException e) {
49             Thread.currentThread().interrupt();
50             return null;
51         }
52     }
53 
54 }

接下来,已经把订单需要生产的饮料已经完成,现在可以交给服务员(waier)交给客人了。
这里使用的aggregate模式,让服务器等待这个订单的所有饮料生产完后的,交给客户.
 1 //设置成Spring-integration组件
 2 @MessageEndpoint
 3 public class Waiter {
 4 
 5     //配置 aggregator模式。
 6     @Aggregator(inputChannel = "preparedDrinks", outputChannel = "deliveries", timeout = 5 * 60 * 1000)
 7     public Delivery prepareDelivery(List<Drink> drinks) {
 8         return new Delivery(drinks);
 9     }
10 
11 }
12 

最后我们使用一个 stream channel adaptor把订单生产完成的饮料输出。
        <!-- 定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕-->
    
<stream:stdout-channel-adapter id="deliveries"/>

spring-integration官网:http://www.springsource.org/spring-integration



Good Luck!
Yours Matthew!


posted on 2008-12-04 08:33 x.matthew 阅读(3237) 评论(0)  编辑  收藏 所属分类: Spring|Hibernate|Other framework

只有注册用户登录后才能发表评论。


网站导航: