Note: this is not an introductory article. If you are new to the topic, please read these first:
1. http://wiki.springside.org.cn/display/springside/ActiveMQ
2. http://wiki.springside.org.cn/display/springside/ActiveMQ-part2
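Everything below extends springside-2.0-RC1 only.
Because every POJO class needs its own MessageConverter implementation, both the XML configuration and the usage become rather tedious.
I. Extending MessageConverter
springside-2.0-RC1 ships with the following configuration: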
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
        <!-- lets wrap in a pool to avoid creating a connection per send -->
        <bean class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
        </bean>
    </property>
    <!-- custom MessageConverter -->
    <property name="messageConverter" ref="orderMessageConverter"/>
</bean>
<!-- OrderMessage converter -->
<bean id="orderMessageConverter" class="org.springside.bookstore.components.activemq.OrderMessageConverter"/>
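orderMessageConverter is a MessageConverter implementation class.
In a real project, however, once two or more POJOs each need their own MessageConverter, jmsTemplate ends up in an awkward position. A JmsTemplate holds a single messageConverter, so injecting one converter at startup covers only one POJO type, and swapping converters at runtime is not practical either. The setup stops being friendly.
My extension: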
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
        <!-- lets wrap in a pool to avoid creating a connection per send -->
        <bean class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
        </bean>
    </property>
    <property name="messageConverter" ref="messageConverter"/>
</bean>
<!-- Holder Message converter -->
<bean id="messageConverter" class="com.service.CoverterHolder"/>
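A general-purpose CoverterHolder dispatches to the actual MessageConverter implementations, so a single bean can serve several POJOs. It falls back to a default messageConverter, and POJO-specific converters can be injected through the setter setConverters(Map<String, MessageConverter> converters). That gives a real project the support it needs.
The code of com.service.CoverterHolder: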
package com.service;

import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.support.converter.MessageConverter;

public class CoverterHolder implements MessageConverter {

    private MessageConverter defaultMessageConverter;
    // Keyed by fully qualified class name of the POJO
    private Map<String, MessageConverter> converters = new HashMap<String, MessageConverter>();

    public Message toMessage(Object obj, Session session) throws JMSException {
        String clz = obj.getClass().getName();
        if (converters.containsKey(clz)) {
            Message message = converters.get(clz).toMessage(obj, session);
            // Tag the message so the receiving side can pick the same converter
            message.setObjectProperty("meta-class", clz);
            return message;
        } else {
            Message message = defaultMessageConverter.toMessage(obj, session);
            message.setObjectProperty("default-meta-class", clz);
            return message;
        }
    }

    public Object fromMessage(Message msg) throws JMSException {
        if (msg.getObjectProperty("meta-class") != null) {
            String clz = msg.getObjectProperty("meta-class").toString();
            if (converters.containsKey(clz)) {
                return converters.get(clz).fromMessage(msg);
            }
            // Fail loudly rather than returning null when no converter matches
            throw new JMSException("No MessageConverter registered for class [" + clz + "]");
        } else if (msg.getObjectProperty("default-meta-class") != null) {
            return defaultMessageConverter.fromMessage(msg);
        } else {
            throw new JMSException("Msg:[" + msg + "] carries neither meta-class nor default-meta-class");
        }
    }

    public void setConverters(Map<String, MessageConverter> converters) {
        this.converters = converters;
    }

    public void setDefaultMessageConverter(MessageConverter defaultMessageConverter) {
        this.defaultMessageConverter = defaultMessageConverter;
    }
}
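To make the dispatch idea easier to try without a broker, here is a minimal sketch of the same lookup-then-fallback logic using only the JDK. SimpleConverter, ConverterDispatchSketch, the tagging helper and the "handledBy" key are all hypothetical names invented for this illustration, and the "message" is just a plain Map rather than a JMS Message, so treat it as a model of the pattern and not as the holder itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MessageConverter; the "message" is a plain Map.
interface SimpleConverter {
    Map<String, Object> toMessage(Object obj);
    Object fromMessage(Map<String, Object> msg);
}

class ConverterDispatchSketch {
    private final Map<String, SimpleConverter> converters = new HashMap<>();
    private final SimpleConverter fallback;

    ConverterDispatchSketch(SimpleConverter fallback) {
        this.fallback = fallback;
    }

    void register(Class<?> type, SimpleConverter converter) {
        converters.put(type.getName(), converter);
    }

    // Pick the converter registered for the object's class, else the fallback,
    // and tag the message with the class name under the matching key.
    Map<String, Object> toMessage(Object obj) {
        String clz = obj.getClass().getName();
        SimpleConverter specific = converters.get(clz);
        Map<String, Object> msg = (specific != null ? specific : fallback).toMessage(obj);
        msg.put(specific != null ? "meta-class" : "default-meta-class", clz);
        return msg;
    }

    // Read the tag back and route to the same converter that produced the message.
    Object fromMessage(Map<String, Object> msg) {
        Object clz = msg.get("meta-class");
        if (clz != null) {
            SimpleConverter specific = converters.get(clz.toString());
            if (specific == null) {
                throw new IllegalStateException("No converter registered for " + clz);
            }
            return specific.fromMessage(msg);
        }
        if (msg.containsKey("default-meta-class")) {
            return fallback.fromMessage(msg);
        }
        throw new IllegalStateException("Message carries no class tag");
    }

    // Trivial converter that stores the object under "body" and records who handled it.
    static SimpleConverter tagging(final String tag) {
        return new SimpleConverter() {
            public Map<String, Object> toMessage(Object obj) {
                Map<String, Object> m = new HashMap<>();
                m.put("body", obj);
                m.put("handledBy", tag);
                return m;
            }

            public Object fromMessage(Map<String, Object> msg) {
                return msg.get("body");
            }
        };
    }

    public static void main(String[] args) {
        ConverterDispatchSketch holder = new ConverterDispatchSketch(tagging("default"));
        holder.register(Integer.class, tagging("special"));
        System.out.println(holder.toMessage(42).get("handledBy"));      // prints "special"
        System.out.println(holder.toMessage("hello").get("handledBy")); // prints "default"
    }
}
```

The lookup is by exact class name, so a subclass of a registered POJO would go to the fallback unless it is registered too.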
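II. A default implementation that covers most MessageConverter needs
DefaultMessageConverter is the default MessageConverter implementation. It spares you from writing the same tedious MessageConverter again and again: good news for lazy people, haha.
Unless you have special requirements, DefaultMessageConverter should cover more than 90% of cases.
The code of DefaultMessageConverter: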
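package com.service;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
// Package name as of ActiveMQ 4.x and later
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.jms.support.converter.MessageConverter;

public class DefaultMessageConverter implements MessageConverter {

    public Message toMessage(Object obj, Session session) throws JMSException {
        // Only works against an ActiveMQ session, because of this cast
        ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
        HashMap<String, byte[]> map = new HashMap<String, byte[]>();
        try {
            // The POJO must implement Serializable
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.close(); // flush buffered data before reading the bytes
            map.put("POJO", bos.toByteArray());
            objMsg.setObjectProperty("Map", map);
        } catch (IOException e) {
            throw wrap("Could not serialize [" + obj + "]", e);
        }
        return objMsg;
    }

    @SuppressWarnings("unchecked")
    public Object fromMessage(Message msg) throws JMSException {
        if (msg instanceof ObjectMessage) {
            HashMap<String, byte[]> map = (HashMap<String, byte[]>) msg.getObjectProperty("Map");
            try {
                // The POJO must implement Serializable
                ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO"));
                ObjectInputStream ois = new ObjectInputStream(bis);
                return ois.readObject();
            } catch (IOException e) {
                throw wrap("Could not deserialize Msg:[" + msg + "]", e);
            } catch (ClassNotFoundException e) {
                throw wrap("Could not deserialize Msg:[" + msg + "]", e);
            }
        } else {
            throw new JMSException("Msg:[" + msg + "] is not an ObjectMessage");
        }
    }

    // Report conversion failures to the caller instead of swallowing them
    private static JMSException wrap(String text, Exception cause) {
        JMSException ex = new JMSException(text);
        ex.setLinkedException(cause);
        return ex;
    }
}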
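The heart of the default converter is plain Java serialization of the POJO to a byte array and back. The sketch below isolates that round trip with JDK classes only, so it can be run without JMS or ActiveMQ. SerializationRoundTrip, its Report class and the roundTrip helper are hypothetical names made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

    // Hypothetical POJO; it must implement Serializable to be written out.
    static class Report implements Serializable {
        private static final long serialVersionUID = 1L;
        final String title;
        final int count;

        Report(String title, int count) {
            this.title = title;
            this.count = count;
        }
    }

    // Serialize the object into a byte array, as the converter does before sending.
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Rebuild the object from the bytes, as the converter does on receipt.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    // Convenience wrapper that turns checked failures into an unchecked one.
    static Object roundTrip(Serializable obj) {
        try {
            return fromBytes(toBytes(obj));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Report copy = (Report) roundTrip(new Report("daily", 3));
        System.out.println(copy.title + " " + copy.count); // prints "daily 3"
    }
}
```

A POJO that does not implement Serializable makes writeObject throw NotSerializableException, which is why the requirement appears in the converter's comments.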
Configuration in Spring:
<!-- Holder Message converter -->
<bean id="messageConverter" class="com.service.CoverterHolder">
    <property name="defaultMessageConverter">
        <bean class="com.service.DefaultMessageConverter"/>
    </property>
</bean>
This injects the default MessageConverter implementation into messageConverter.
III. Choosing your own implementation for special MessageConverters
For example, ReportPerdayMessageConverter is a converter dedicated to one particular POJO.
The code of ReportPerdayMessageConverter:
public class ReportPerdayMessageConverter implements MessageConverter {

    public Message toMessage(Object obj, Session session) throws JMSException {
        //…
    }

    public Object fromMessage(Message msg) throws JMSException {
        //…
    }
}
Register the dedicated converter for the com.domain.ReportPerday model:
<!-- Holder Message converter -->
<bean id="messageConverter" class="com.service.CoverterHolder">
    <!-- plug in your own converter implementations -->
    <property name="converters">
        <map>
            <entry key="com.domain.ReportPerday">
                <bean id="reportPerdayMessageConverter" class="com.service.ReportPerdayMessageConverter"/>
            </entry>
        </map>
    </property>
    <property name="defaultMessageConverter">
        <bean class="com.service.DefaultMessageConverter"/>
    </property>
</bean>
IV. Extending the Message Driven POJO (MDP) adapter
This handles dispatching to multiple MDP consumers.
The configuration in ss (SpringSide):
<!-- Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="org.springside.bookstore.components.activemq.OrderMessageConsumer">
            <property name="mailService" ref="mailService"/>
        </bean>
    </constructor-arg>
    <!-- may be other method -->
    <property name="defaultListenerMethod" value="sendEmail"/>
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="orderMessageConverter"/>
</bean>
The <constructor-arg> clearly does not scale to multiple consumers, since it accepts only one delegate object.
My extension:
<!-- Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <!-- may be other method -->
    <constructor-arg>
        <bean class="com.service.MessageConsumerAdapter">
            <property name="reportPerdayMessageConsumer" ref="reportPerdayMessageConsumer"/>
        </bean>
    </constructor-arg>
    <!-- may be other method -->
    <property name="defaultListenerMethod" value="receive"/>
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="messageConverter"/>
</bean>
The code of MessageConsumerAdapter:
public class MessageConsumerAdapter {

    private ReportPerdayMessageConsumer reportPerdayMessageConsumer;

    // Generic entry point for payloads passed as a plain Object
    public void receive(Object obj) {
        if (obj instanceof ReportPerday) {
            System.out.println((ReportPerday) obj);
        } else if (obj instanceof ReportPerday2) {
            System.out.println((ReportPerday2) obj);
        }
    }

    // ReportPerday payloads are handed to the injected consumer
    public void receive(ReportPerday reportPerday) throws Exception {
        reportPerdayMessageConsumer.sendEmail(reportPerday);
    }

    // ReportPerday2 payloads go to a different service
    public void receive(ReportPerday2 reportPerday2) throws Exception {
        // do other service consumer
        System.out.println("ReportPerday2 Bean do other service consumer ");
    }

    public void setReportPerdayMessageConsumer(
            ReportPerdayMessageConsumer reportPerdayMessageConsumer) {
        this.reportPerdayMessageConsumer = reportPerdayMessageConsumer;
    }
}
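MessageConsumerAdapter can be injected with several consumers or business methods to dispatch to. The consumer is chosen according to the type of the POJO that arrives.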
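Choosing a receive overload by the payload's type has to happen at runtime, because the Java compiler resolves overloads from the static type of the argument. As far as I know, MessageListenerAdapter looks up the listener method reflectively from the converted payload. The sketch below is not Spring's code, only a JDK-only illustration of that kind of runtime selection. OverloadDispatchSketch, ReportA, ReportB, Consumer and dispatch are hypothetical names.

```java
import java.lang.reflect.Method;

class OverloadDispatchSketch {

    // Hypothetical payload types standing in for the POJOs in the article.
    static class ReportA {}
    static class ReportB {}

    static class Consumer {
        public String receive(ReportA a) { return "A"; }
        public String receive(ReportB b) { return "B"; }
        public String receive(Object o) { return "fallback"; }
    }

    // Invoke the one-argument method with the given name whose parameter type
    // is the most specific match for the payload's runtime class.
    static Object dispatch(Object target, String methodName, Object payload) {
        Method chosen = null;
        for (Method m : target.getClass().getMethods()) {
            if (!m.getName().equals(methodName) || m.getParameterCount() != 1) {
                continue;
            }
            Class<?> param = m.getParameterTypes()[0];
            if (!param.isInstance(payload)) {
                continue;
            }
            // Keep the candidate whose parameter type is narrower than the current pick.
            if (chosen == null || chosen.getParameterTypes()[0].isAssignableFrom(param)) {
                chosen = m;
            }
        }
        if (chosen == null) {
            throw new IllegalArgumentException("No method " + methodName + " accepts " + payload);
        }
        try {
            chosen.setAccessible(true);
            return chosen.invoke(target, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Dispatch failed", e);
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        Object payload = new ReportA();
        // Static overload resolution only sees Object here.
        System.out.println(consumer.receive(payload));              // prints "fallback"
        // Reflective dispatch looks at the runtime class instead.
        System.out.println(dispatch(consumer, "receive", payload)); // prints "A"
    }
}
```

This is also why an adapter class with one overload per POJO type can serve several consumers from a single listener bean.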
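Afterword: http://wiki.springside.org.cn already gives a fairly detailed introduction to, and demonstration of, using ActiveMQ. Once again, if you lack the basics, please read the ss documentation.
This article only aims to build a more practical JMS infrastructure. If anything is unclear, let's discuss and learn together :)
The code for this article is available here: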
http://www.blogjava.net/Files/hellboys/activemq-example-nojar.zip