.概述
1.1 JMS与ActiveMQ特性
JMS始终在JavaEE五花八门的协议里,WebService满天飞的时候占一位置,是因为:
- 它可以把不影响用户执行结果又比较耗时的任务(比如发邮件通知管理员)异步的扔给JMS 服务端去做,而尽快的把屏幕返还给用户。
- 服务端能够多线程排队响应高并发的请求,并保证请求不丢失。
- 可以在Java世界里达到最高的解耦。客户端与服务端无需直连,甚至无需知晓对方是谁、在哪里、有多少人,只要对流过的信息作响应就行了,在企业应用环境复杂时作用明显。
ActiveMQ的特性:
- 完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,也是Apache Geronimo默认的JMS provider。
- POJO withdout EJB Container,不需要实现EJB繁琐复杂的Message Bean接口和配置。
- Spring Base,可以使用Spring的各种特性如IOC、AOP 。
- Effective,基于Jencks的JCA Container实现 pool connection,control transactions and manage security。
1.2 SpringSide 的完全POJO的JMS方案
SpringSide 2.0在BookStore示例中,演示了用户下订单时,将发通知信到用户邮箱的动作,通过JMS交给JMS服务端异步完成,避免了邮件服务器的堵塞而影响用户的下订。
全部代码于examples\bookstore\src\java\org\springside\bookstore\components\activemq 目录中。
一个JMS场景通常需要三者参与:
- 一个POJO的的Message Producer,负责使用Spring的JMS Template发送消息。
- 一个Message Converter,负责把Java对象如订单(Order)转化为消息,使得Producer能够直接发送POJO。
- 一个MDP Message Consumer,负责接收并处理消息。
SpringSide 2.0采用了ActiveMQ 4.1-incubator 与Spring 2.0 集成,对比SS1.0M3,有三个值得留意的地方,使得代码中几乎不见一丝JMS的侵入代码:
- 采用Spring2.0的Schema式简化配置。
- 实现Message Converter转化消息与对象,使得Producer能够直接发送POJO而不是JMS Message。
- 使用了Spring2.0的DefaultMessageListenerContainer与MessageListenerAdapter,消息接收者不用实现MessageListener 接口。
- 同时,Spring 2.0 的DefaultMessageListenerContainer 代替了SS1.0M3中的Jenck(JCA Container),充当MDP Container的角色。
2.引入ActiveMQ的XSD
ActiveMQ4.1 响应Spring 2.0号召,支持了引入XML Schema namespace的简单配置语法,简化了配置的语句。
在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ的XML Scheam 配置文件),如下:
<beans
xmlns="http:
xmlns:amq="http:
xmlns:xsi="http:
xsi:schemaLocation="http: http:
由于ActiveMQ4.1 SnapShot的那个XSD有部分错误,因此使用的是自行修改过的XSD。
先在ClassPath根目录放一个修改过的activemq-core-4.1-incubator-SNAPSHOT.xsd。
在ClassPath 下面建立META-INF\spring.schemas 内容如下。这个spring.schemas是spring自定义scheam的配置文件,请注意"http:\://"部分写法
3. 配置方案
3.1 基础零件
1. 配置ActiveMQ Broker
暂时采用在JVM中嵌入这种最简单的模式, 当spring初始化时候,ActiveMQ embedded Broker 就会启动了。
<!-- lets create an embedded ActiveMQ Broker -->
<amq:broker useJmx="false" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp:/>
</amq:transportConnectors>
</amq:broker>
2. 配置(A)ConnectionFactory
由于前面配置的Broker是JVM embedded 所以URL为:vm://localhost
<!-- ActiveMQ connectionFactory to use -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm:/>
3 配置(B)Queue
<!-- ActiveMQ destinations to use -->
<amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/>
4. 配置(C)Converter
配置Conveter,使得Producer能够直接发送Order对象,而不是JMS的Message对象。
<!-- OrderMessage converter -->
<bean id="orderMessageConverter" class="org.springside.bookstore.components.activemq.OrderMessageConverter"/>
3.2 发送端
1 配置JmsTemplate
Spring提供的Template,绑定了(A)ConnectionFactory与(C)Converter。
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="orderMessageConverter"/>
</bean>
2.Producer
消息发送者,使用JmsTemplate发送消息,绑定了JmsTemplate (含A、C)与(B)Queue。
<!-- POJO which send Message uses Spring JmsTemplate,绑定JMSTemplate 与Queue -->
<bean id="orderMessageProducer" class="org.springside.bookstore.components.activemq.OrderMessageProducer">
<property name="template" ref="jmsTemplate"/>
<property name="destination" ref="destination"/>
</bean>
3.3 接收端
1.接收处理者(MDP)
使用Spring的MessageListenerAdapter,指定负责处理消息的POJO及其方法名,绑定(C)Converter。
<!-- Message Driven POJO (MDP),绑定Converter -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="org.springside.bookstore.components.activemq.OrderMessageConsumer">
<property name="mailService" ref="mailService"/>
</bean>
</constructor-arg>
<!-- may be other method -->
<property name="defaultListenerMethod" value="sendEmail"/>
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="orderMessageConverter"/>
</bean>
2. listenerContainer
负责调度MDP, 绑定(A) connectionFactory, (B)Queue和MDP。
<!-- this is the attendant message listener container -->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
互相绑定的关系有点晕,发送端和接收端都以不同形式绑定了(A) connectionFactory, (B)Queue和 (C)Converter。
4. 下篇
1. 说明
请先阅读ActiveMQ4.1 +Spring2.0的POJO JMS方案(上)
本篇将补充说明了:
1) 使用数据库持久化消息,保证服务器重启时消息不会丢失
2) 使用Jencks作正宗的JCA Container。
2.持久化消息
2.1 给Broker加入Persistence 配置
在配置文件applicationContext-activemq-embedded-persitence.xml中的<amq:broker>节点加入
<amq:persistenceAdapter>
<amq:jdbcPersistenceAdapter id="jdbcAdapter" dataSource="#hsql-ds" createTablesOnStartup="true" useDatabaseLock="false"/>
</amq:persistenceAdapter>
请注意MSSQL(2000/2005)和HSQL由于不支持[SELECT * ACTIVEMQ_LOCK FOR UPDATE ]语法,因此不能使用默认的userDatabaseLock="true",只能设置成useDatabaseLock="false"
2.2 配置多种数据源
配置多种数据源,给jdbcPersistenceAdapter使用,SpringSide 中使用的内嵌HSQL
<!-- The HSQL Datasource that will be used by the Broker -->
<bean id="hsql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
<property name="url" value="jdbc:hsqldb:res:hsql/activemq"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
<property name="poolPreparedStatements" value="true"/>
</bean>
2. 3 说明
笔者仅仅使用了jdbcPersistenceAdapter,其实在ActiveMQ的XSD已经描述了多种PersistenceAdapter,可以参考对应的XSD文件.
另外对于数据库的差异主要表现在设置了userDatabaseLock="true"之后,ActiveMQ使用的[SELECT * ACTIVEMQ_LOCK FOR UPDATE] 上面,会导致一些数据库出错(测试中MSSQL2000/2005,HSQL都会导致出错)。另外HSQL的脚本请参见activemq.script。
3. Jenck(JCA Container)
Spring 2.0本身使用DefaultMessageListenerContainer 可以充当MDP中的Container角色,但是鉴于Jencks是JCA标准的,它不仅仅能够提供jms的jca整合,包括其他资源比如jdbc都可以做到jca管理
所以,同时完成了这个ActiveMQ+Spring+Jencks 配置演示,更多的针对生产系统的JCA特性展示,会在稍后的开发计划讨论中确定。
此文档适用于说明使用 Jecncks 和 使用Spring 2.0(DefaultMessageListenerContainer) 充当MDP Container时的区别,同时演示Jecnks 的Spring 2.0 新配置实例。
3.1 引入ActiveMQ ResourceAdapter 和Jencks 的XSD
在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ ResourceAdapter 和Jencks 的XML Scheam 配置文件),如下:
ActiveMQ4.1 响应Spring 2.0号召,支持了引入XML Schema namespace的简单配置语法,简化了配置的语句。
在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ的XML Scheam 配置文件),如下:
<beans
xmlns="http: xmlns:amq="http://activemq.org/config/1.0" xmlns:ampra="http://activemq.org/ra/1.0" xmlns:jencks="http://jencks.org/1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http: http: http: http:
由于ActiveMQ RA和Jencks 那个XSD 仍然有部分错误,因此使用的是自行修改过的XSD。(是xs:any元素引起的错误)
先在ClassPath根目录放一个修改过的activemq-ra-4.1-incubator-SNAPSHOT.xsd和jencks-1.3.xsd。
同样修改 ClassPath 下面META-INF\spring.schemas 增加内容如下。这个spring.schemas是spring自定义scheam的配置文件,请注意"http:\://"部分写法
3.2 配置方案
3.2.1 基础零件
1. 配置ActiveMQ Broker 参见 ActiveMQ+Spring
2. 配置ActiveMQ Resource Adapter
<amqra:managedConnectionFactory id="jmsManagedConnectionFactory" resourceAdapter="#resourceAdapter"/><amqra:resourceAdapter id="resourceAdapter" serverUrl="vm: />
3. 配置Jencks 基础配置
具体的配置可以参见Jencks的XSD
<!-- jencks PoolFactory config-->
<jencks:singlePoolFactory id="poolingSupport" maxSize="16" minSize="5" blockingTimeoutMilliseconds="60" idleTimeoutMinutes="60" matchOne="true" matchAll="true" selectOneAssumeMatch="true" /> <!-- jencks XATransactionFactory -->
<jencks:xATransactionFactory id="transactionSupport" useTransactionCaching="true" useThreadCaching="true" />
<!-- jencks ConnectionManagerFactory -->
<jencks:connectionManagerFactory id="connectionManager" containerManagedSecurity="false" poolingSupport="#poolingSupport" transactionSupport="#transactionSupport" /> <!-- jencks TransactionContextManagerFactory -->
<jencks:transactionContextManagerFactory id="transactionContextManagerFactory"/>
4. 配置给JmsTemplate使用的connectionFactory (主要是生成者/发送者 使用)
这里注意下,在配置jmsTemplate的使用的targetConnectionFactory就是使用jencks配置的connectionManager
<!-- spring config jms with jca-->
<bean id="jmsManagerConnectionFactory" class="org.springframework.jca.support.LocalConnectionFactoryBean">
<property name="managedConnectionFactory">
<ref local="jmsManagedConnectionFactory" />
</property>
<property name="connectionManager">
<ref local="connectionManager" />
</property>
</bean>
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsManagerConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="orderMessageConverter" />
</bean>
5. 配置Spring 2.0的MessageListenerAdapter,保证不需要用户实现MessageListener
见ActiveMQ+Spring
6.配置Jecnks 充当MDP的Container
就是把上面的MessageListenerAdapter配置到Jencks里面,完成整个MDP的配置
<!-- Jencks Container-->
<jencks:jcaContainer> <jencks:bootstrapContext>
<jencks:bootstrapContextFactory threadPoolSize="25" />
</jencks:bootstrapContext>
<jencks:connectors>
<!-- use jencks container (use spring MessageListenerAdapter)-->
<jencks:connector ref="messageListener">
<jencks:activationSpec>
<amqra:activationSpec destination="org.apache.activemq.spring.Test.spring.embedded" destinationType="javax.jms.Queue" />
</jencks:activationSpec>
</jencks:connector> </jencks:connectors> <jencks:resourceAdapter>
<amqra:resourceAdapter serverUrl="vm: />
</jencks:resourceAdapter>
</jencks:jcaContainer>