5分钟开启Spring AMQP的旅行.
由于默认的Spring Framework 版本依赖是4.2.x, Spring AMQP 一般来说可兼容早期版本的Spring Framework.但基于注解的监听器和RabbitMessagingTemplate
需要Spring Framework 4.1+.
非常非常快速
使用纯java来发送和接收消息:
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
注意,在原生的Java Rabbit client中也有一个ConnectionFactory
. 在上面的代码中,我们使用的是Spring 抽象. 我们依赖的是broker中的默认交换器 (因为在发送操作中没有指定交换器),以及默认绑定(通过其名称作为路由键将所有队列绑定到默认交换器中).那些行为是由AMQP规范来定义的.
使用 XML配置
同上面的例子一样,只不过将外部资源配置到了XML:
ApplicationContext context =new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate"connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
<rabbit:admin/>
声明会默认自动查找类型为Queue
, Exchange
和Binding的bean,
并宣称他们代表的broker的用户, 因此在简单的Java driver中没有必要明确的使用那个bean.
有大量的选项来配置XML schema 中的组件属性- 你可以使用xml编辑的自动完成功能来探索它们并查看它们的相关文档.
使用 Java 配置
同样的例子可使用java来外部配置:
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
returnnew CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
returnnew RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
returnnew RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
returnnew Queue("myqueue");
}
}
2.2 新特性
2.2.1从1.5到1.6的变化
测试支持
Builder
命名空间变化
连接工厂
Queue 定义
监听器容器变化
空闲消息监听器检测
不匹配的队列检测
默认情况下,当监听器容器启动时,如果探测到了队列的mismatched属性或参数,容器会记录异常但会继续监听.现在容器有了一个mismatchedQueuesFatal
属性,当启动时存在问题的话,这可以阻止容器(以及上下文)启动.如果随后检测到问题,它也会停止容器,如从连接故障中恢复. 参考Section 3.1.15, “Message Listener Container Configuration” 来了解更多信息.
监听器容器日志
现在监听器容器提供了它的beanName
给内部 SimpleAsyncTaskExecutor
作为threadNamePrefix
.对于日志分析非常有用.
默认错误处理器
默认错误处理器(ConditionalRejectingErrorHandler
) 现在认为无可挽救的@RabbitListener
异常是致命的. 参考Section 3.1.13, “Exception Handling” 来了解更多信息.
AutoDeclare 与 RabbitAdmins
AmqpTemplate: receive 与 timeout
AsyncRabbitTemplate
引入了新的AsyncRabbitTemplate
. 此模块提供了许多发送和接收的方法,其返回值为ListenableFuture
,此后可通过它来同步或异步地获取结果. 参考 the section called “AsyncRabbitTemplate” 来了解更多信息
RabbitTemplate 变化
消息属性
CorrelationId
长字符串头
以前,DefaultMessagePropertiesConverter
会将头转换成(其头长度不超过长字符串限制(默认为1024) )成一个DataInputStream
(实际上它只是引用LongString的
DataInputStream
).
在输出时,这个头不会进行转换(除了字符串, 例如. 在java.io.DataInputStream@1d057a39
上调用toString()方法)
.
在这个版本中, Long LongString
默认作为 LongString
s ;你可以通过其getBytes[]
, toString()
, 或 getStream()
方法来访问它的内容. 更大的输入LongString
现在也会在输出上正确转换了.
参考 the section called “Message Properties Converters” 来了解更多信息.
Inbound Delivery Mode
deliveryMode
属性不再映射MessageProperties.deliveryMode
;这是为了避免意外传播,如果同一个MessageProperties
对象用来发送出站消息.
相反, 入站deliveryMode
头映射为MessageProperties.receivedDeliveryMode
.
参考 the section called “Message Properties Converters” 来了解更多信息.
当使用注解endpoints时,头将在名为AmqpHeaders.RECEIVED_DELIVERY_MODE
的头中提供
.
参考the section called “Annotated Endpoint Method Signature” 来了解更多信息.
Inbound User ID
RabbitAdmin 变化
Declaration Failures
之间,ignoreDeclarationFailures
标志只在channel上发生IOException
才会生效(如未匹配参数). 现在它对于任何异常都可以生效(如:TimeoutException
).
此外,无论何时声明失败,都会发布DeclarationExceptionEvent
.RabbitAdmin
最后声明事件将作为属性lastDeclarationExceptionEvent也是可用的.
参考 Section 3.1.10, “Configuring the broker” 来了解更多信息.
@RabbitListener Changes
Multiple Containers per Bean
当使用Java 8+,可以添加多个@RabbitListener
注解到 @Bean
类或它们的方法上.当使用Java 7-,你可以使用 @RabbitListeners
容器注解来提供同样的功能.
参考the section called “@Repeatable @RabbitListener” 来了解更多信息.
@SendTo SpEL Expressions
@SendTo
for routing replies with no replyTo
property can now be SpEL expressions evaluated against the request/reply.
参考 the section called “Reply Management”来了解更多信息.
@QueueBinding Improvements
延迟消息交换器(Delayed Message Exchange)
交换器内部标志(Exchange internal flag)
CachingConnectionFactory 变化
CachingConnectionFactory Cache Statistics
访问底层RabbitMQ连接工厂
Channel Cache
默认的channel 缓存大小已从1增加到了25. 参考Section 3.1.2, “Connection and Resource Management”来了解更多信息.
此外, SimpleMessageListenerContainer
不再设置缓存大小至少要与concurrentConsumers的数目一样大
- 这是多余的,因为容器消费者channels是不会缓存的.
RabbitConnectionFactoryBean
factory bean现在暴露了一个属性来将client连接属性添加到由工厂产生的连接中.
Java 反序列化(Deserialization)
JSON MessageConverter
Logging Appenders
Log4j2
加入了log4j2 appender,此appenders现在可配置addresses
属性来连接broker集群.
Client 连接属性
2.2.2 早期版本