When integrating ActiveMQ with Spring, you can use the following configuration:
<bean id="connectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${brokerURL}" />
</bean>
<bean id="queueDestination"
      class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="caojh" />
</bean>
<bean id="jmsTemplate"
      class="org.springframework.jms.core.JmsTemplate">
    <!-- Plain (unpooled) factory: this is the setup that triggers the problem described below -->
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
    <property name="messageConverter" ref="voteMsgConverter" />
</bean>
<bean id="voteMsgConverter"
      class="com.netease.queue.domain.VoteMsgConverter" />
Then send a message with the following code:
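public void templateSend(long id, String location) {
    // Look up the JmsTemplate bean defined in the Spring configuration
    JmsTemplate template = (JmsTemplate) context.getBean("jmsTemplate");
    Vote vote = new Vote();
    vote.setId(id);
    vote.setUserid("caojh");
    vote.setLocation(location);
    // Converts the Vote via the configured messageConverter and sends it to the default destination
    template.convertAndSend(vote);
}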
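Sending a single message works fine. However, when a large number of messages are sent in rapid succession, an "address already in use" error is thrown and no new connection can be created. (On some older versions, sending also stops working once the message count reaches 65535, and the only remedy is to restart the queue.) The exception looks like this: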
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS
processing; nested exception is javax.jms.JMSException: Could not connect to broker
URL: tcp://192.168.20.23:61616. Reason: java.net.BindException: Address already in use: connect
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
at net.kentop.astoam.device.MG800DeviceService.excute(MG800DeviceService.java:423)
at net.kentop.astoam.device.MG800DeviceService$HandlerReceiveMessage.handlerUdpData(MG800DeviceService.java:936)
at net.kentop.mon4mg.monitor.UDPReceiverThread.run(UDPReceiverThread.java:51)
Caused by: javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616.
Reason: java.net.BindException: Address already in use: connect
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:35)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:286)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:230)
at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:178)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:456)
4 more
The real reason the send fails is explained here:
http://activemq.apache.org/jmstemplate-gotchas.html
That page contains these two paragraphs:
The thing to remember is JmsTemplate is designed for use in EJBs using the EJB containers JMS pooling abstraction. So every method will typically create a connection, session, producer or consumer, do something, then close them all down again. The idea being that this will use the J2EE containers pooling mechanism to pool the JMS resources under the covers. Without using a pooled JMS provider from the EJB container this is the worst possible way of working with JMS; since typically each create/close of a connection, producer/consumer results in a request-response with the JMS broker.
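You should only use JmsTemplate with a pooled JMS provider. In J2EE 1.4 or later that typically means a JCA based JMS ConnectionFactory. If you are in an EJB then make sure you use your J2EE containers ConnectionFactory, never a plain-old-connection factory. If you are not inside an EJB Then you should use our PooledConnectionFactory, then things will be nicely pooled. If you need to take part in XA transactions then look into our spring based JCA Container.
In short: every time a message is sent, JmsTemplate creates a connection and a session, and tears them all down again once the send/receive completes. Without a pooling mechanism, sending a large number of messages means creating and destroying connections constantly, which is a very poor way to work. Each short-lived TCP connection also ties up a local port for a while after it is closed, which is presumably why a dense burst of sends ends in java.net.BindException: Address already in use.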
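To make the difference concrete, here is a minimal sketch that only counts how many connections get opened under each strategy. It does not use the ActiveMQ or Spring APIs: FakeConnection, FakePool, sendUnpooled and sendPooled are hypothetical stand-ins invented for this illustration, and the real PooledConnectionFactory is more elaborate than this tiny pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolingSketch {

    /** Hypothetical stand-in for a broker connection; counts how many were ever opened. */
    static final class FakeConnection {
        static int opened = 0;

        FakeConnection() {
            opened++; // in real life: a new TCP socket and a broker handshake
        }

        void send(String body) {
            // no-op: we only care about how many connections are opened
        }
    }

    /** Hypothetical minimal pool: hands out an idle connection if there is one. */
    static final class FakePool {
        private final Deque<FakeConnection> idle = new ArrayDeque<>();

        FakeConnection borrow() {
            FakeConnection c = idle.poll();
            return c != null ? c : new FakeConnection();
        }

        void giveBack(FakeConnection c) {
            idle.push(c); // kept for reuse instead of being destroyed
        }
    }

    /** Open, send, discard for every message, as with an unpooled factory. */
    public static int sendUnpooled(int messages) {
        FakeConnection.opened = 0;
        for (int i = 0; i < messages; i++) {
            FakeConnection c = new FakeConnection();
            c.send("vote-" + i);
            // connection is dropped here; the next send opens a fresh one
        }
        return FakeConnection.opened;
    }

    /** Borrow, send, give back: the same connection is reused across sends. */
    public static int sendPooled(int messages) {
        FakeConnection.opened = 0;
        FakePool pool = new FakePool();
        for (int i = 0; i < messages; i++) {
            FakeConnection c = pool.borrow();
            c.send("vote-" + i);
            pool.giveBack(c);
        }
        return FakeConnection.opened;
    }

    public static void main(String[] args) {
        System.out.println("unpooled connections opened: " + sendUnpooled(10000));
        System.out.println("pooled connections opened:   " + sendPooled(10000));
    }
}
```

In this model the unpooled loop opens one connection per message, while the pooled loop opens a single connection and reuses it for every sequential send.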
As for the fix, the second paragraph spells it out: either use the EJB container's ConnectionFactory, or use the PooledConnectionFactory that ActiveMQ provides, which itself implements the ConnectionFactory interface.
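With the following configuration, where the connectionFactory property of jmsTemplate refers to pooledConnectionFactory, the exception above no longer occurs: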
<bean id="pooledConnectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="connectionFactory" />
</bean>
<bean id="connectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${brokerURL}" />
</bean>
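[Aside] Can a single connection be used to send and receive messages concurrently?
See:
http://activemq.apache.org/can-i-send-and-receive-messages-concurrently-on-one-jms-connection.html
The article states it plainly: each producer/consumer should use its own session, and a single connection can create as many sessions as you need.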
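The "one shared connection, one session per worker" pattern can be sketched as follows. This is only a model written with plain JDK classes: FakeConnection, FakeSession and run are hypothetical names made up for the illustration and are not the JMS API. In real code the corresponding objects would be the JMS Connection and Session.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SessionPerThreadSketch {

    /** Hypothetical stand-in for a connection that is safe to share between threads. */
    static final class FakeConnection {
        final AtomicInteger sessionsCreated = new AtomicInteger();
        final Queue<String> delivered = new ConcurrentLinkedQueue<>();

        FakeSession createSession() {
            sessionsCreated.incrementAndGet();
            return new FakeSession(this);
        }
    }

    /** Hypothetical stand-in for a session: meant to be used by one thread only. */
    static final class FakeSession {
        private final FakeConnection connection;

        FakeSession(FakeConnection connection) {
            this.connection = connection;
        }

        void send(String body) {
            connection.delivered.add(body);
        }
    }

    /** Returns {sessions created, messages delivered}. */
    public static int[] run(int workers, int messagesPerWorker) {
        FakeConnection shared = new FakeConnection(); // one connection for everyone
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int id = w;
            executor.submit(() -> {
                // Each worker creates and keeps its own session
                FakeSession session = shared.createSession();
                for (int i = 0; i < messagesPerWorker; i++) {
                    session.send("worker-" + id + "-msg-" + i);
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { shared.sessionsCreated.get(), shared.delivered.size() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 100);
        System.out.println("sessions=" + result[0] + " messages=" + result[1]);
    }
}
```

The point of the design is that only the connection is shared; no session object is ever touched by more than one thread.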