这是1.6版本中加入的新属性.当容器启动时,如果此属性为true (默认为false), 容器会检查上下文中声明的队列是否中间件中存在的队列是否一致.
如果属性不匹配(如. auto-delete
) 或参数 (e.g. x-message-ttl
) 存在, 容器 (和应用程序上下文) 会抛出致命异常而导致启动失败.如果是在恢复期间检测到的问题,容器会停止.
必须在上下文中存在单个RabbitAdmin
(或使用rabbitAdmin
属性在容器上特别配置);否则此属性必须为false
.
如果在初始启动期间,中间件还不可用,容器启动后,当建立连接时会检查条件.
重要
该检查针对的是上下文的所有队列,而不仅仅是特定监听器配置使用的队列.如果你希望只检查容器使用的队列,你需要为这个容器配置单独的RabbitAdmin
, 并使用rabbitAdmin
属性为其提供一个引用.
参考“Conditional Declaration”章节来了解更多信息.
属性
autoDeclare(auto-declare)
描述
从1.4版本开始, SimpleMessageListenerContainer
引入了这个新属性.
当设置为true
时(默认值),容器会使用RabbitAdmin
来重新声明所有 AMQP 对象(Queues, Exchanges, Bindings).
如果在启动期间探测到至少有一个队列缺失了,可能因为它是自动删除队列或过期队列,但不管队列缺失是基于什么原因,重新声明仍会进行处理(译者注:太浪费了).
要禁用这种行为, 可设置其属性为false
. 但需要注意的是,如果所有队列都缺失了(译者注:全部还是部分),容器会启动失败.
在1.6版本之前,如果在上下文中存在多个admin,容器会随机选择一个.反之,如果没有admin,它会从内部创建一个.
无论是哪种情况,这都将导致非预期结果出现. 从1.6版本开始,为了能使autoDeclare
工作,必须要上下文中明确存在一个RabbitAdmin
,或者特定实例的引用必须要在容器中使用rabbitAdmin属性中配置
.属性
declarationRetries(declaration-retries)
描述
从1.4.3, 1.3.9版本开始,SimpleMessageListenerContainer
有了这个新属性. 命名空间属性在1.5.x中可用.
用于设置被动声明失败时,重新尝试的次数.被动声明发生在当消费者启动了或从多个队列中消费时,初始化期间部分队列还不可用的情况下.
当重试次数用完后,如果还是不能被动声明配置队列,那么上面的missingQueuesFatal属性将控制容器行为. 默认: 3次重试 (4 次尝试).
属性
failedDeclarationRetryInterval(failed-declaration-retry-interval)
描述
从1.4.3, 1.3.9版本开始,SimpleMessageListenerContainer
有了这个新属性. 命名空间属性在1.5.x中可用.
重新尝试被动声明的时间间隔. 被动声明发生在当消费者启动了或从多个队列中消费时,初始化期间部分队列还不可用的情况下. 默认: 5000 (5秒).
属性
retryDeclarationInterval(missing-queue-retry-interval)
描述
从1.4.3, 1.3.9版本开始,SimpleMessageListenerContainer
有了这个新属性. 命名空间属性在1.5.x中可用.
如果配置队列的一个子集在消费者初始化过程中可用,则消费者将从这些队列中开始消费。消费者将被动地使用此间隔声明丢失的队列。
当这个间隔过去后,会再次使用declarationRetries 和 failedDeclarationRetryInterval.
如果还有缺失队列,消费者在重新尝试之前会等待此时间间隔.
这个过程会不停地进行到所有队列可用. 默认: 60000 (1分钟).
属性
consumerTagStrategy(consumer-tag-strategy)
描述
从1.4.5版本开始,SimpleMessageListenerContainer
有了这个新属性. 命名空间属性在1.5.x中可用.
之间,只能使用中间件生成的consumer tags;尽管现在这仍是默认的配置,但现在你可以提供一个ConsumerTagStrategy的实现, 这样就可为每个消费者创建独特的tag.
属性
idleEventInterval(idle-event-integer)
描述
从1.6版本开始,SimpleMessageListenerContainer
有了这个新属性.
参考"Detecting Idle Asynchronous Consumers"章节.
3.1.16 监听器并发
默认情况下,监听器容器会启动单个消费者来接收队列中的消息.
当检查前面章节中的表格时,你会发现有许多属性可控制并发.最简单的是concurrentConsumers
, 它会创建固定数量的消费者来并发处理消息.
在1.3.0版本之前,这只能在容器停止时才可设置.
从1.3.0版本开始,你可以动态调整 concurrentConsumers
属性.如果容器运行时修改了,会根据新设置来调需要的消费者(添加或删除).
此外,在容器中添加了一个新属性 maxConcurrentConsumers
来基于工作负载来动态调整并发数.
它可与其它四个属性一起工作: consecutiveActiveTrigger
, startConsumerMinInterval
, consecutiveIdleTrigger
, stopConsumerMinInterval
.
在默认设置的情况下,加大消费者的算法如下:
如果还没有达到maxConcurrentConsumers
,如果现有消费者活动了10个连续周期且离最后消费者启动至少消逝了10秒钟,那么将启动新的消费者. 如果消费者在txSize
* receiveTimeout
毫秒内至少收到一个消息,那么就认为此消费者是活动的.
在默认设置的情况下,减少消费者的算法如下:
如果有超过concurrentConsumers
数量的消费者在运行,且检测到消费者连续超时(空闲)了10个周期,且最后一个消费者至少停止了60秒,那么消费者将停止.
超时依赖于receiveTimeout
和 txSize
属性.当在txSize
* receiveTimeout
毫秒内未收到消息,则认为消费者是空闲的.
因此,当有默认超时(1秒)和 txSize为
4,那么在空闲40秒后,会认为消费者是空闲的并会停止(4超时对应1个空闲检测).
实际上,如果整个容器空闲一段时间,消费者将只会被停止。这是因为broker将分享其在所有活跃的消费者的工作。
3.1.17 专用消费者
也是从1.3版本开始,监听器容器可配置单个专用消费者; 这可以阻其它容器来消费队列直到当前消费者退出.
这样的容器的并发性必须是1。
当使用专用消费者时,其它容器会根据recoveryInterval
属性来消费队列, 如果尝试失败,会记录一个 WARNing 信息.
3.1.18 监听器容器队列
1.3版本在监听器容器中引入许多处理多个队列的改善措施.
容器配置必须监听至少一个队列以上; 以前也是这样的情况,但现在可以在运行时添加和删除队列了。当任何预先获取的消息被处理后,容器将回收(取消和重新创建)。
参考方法addQueues
, addQueueNames
, removeQueues
and removeQueueNames
.当删除队列时,至少要保留一个队列.
现在,只要有可用队列消费者就会启动 -先前如果没有可用队列,容器会停止.现在,唯一的问题是是否有可用队列.如果只是部分队列可用,容器会每60秒尝试被动声明(和消费)缺失队列.
此外,如果消费才从broker中收到了通道(例如,队列被删除)消费者会尝试重新恢复,重新恢复的消费会继续处理来自其它配置队列中的消息. 之前是队列上的取消会取消整个消费者,最终容器会因缺失队列而停止.
如果你想永久删除队列,你应该在删除队列的之前或之后更新容器,以避免消费.
3.1.19 恢复:从错误和代理失败中恢复
介绍
Spring提供了一些关键的 (最流行的)高级特性来处理协议错误或中间件失败时的恢复与自动重连接.
主要的重连接特性可通过CachingConnectionFactory
自身来开启. 它也常有利于使用rabbitadmin自动声明的特点.
除此之外, 如果你关心保证投递,你也许需要在RabbitTemplate中使用channelTransacted
标记以及在SimpleMessageListenerContainer中使用AcknowledgeMode.AUTO
(或者自己来手动应答) .
RabbitAdmin
组件在启动时可声明交换器,队列,绑定.它是通过ConnectionListener懒执行的
,因此如果启动时broker不存在,也没有关系.
Connection
第一次使用时(如.发送消息) ,监听器会被触发,admin功能也会应用.这种在监听器中自动声明的好处是,如果连接出于任何原因断开了,(如. broker死了,网络中断问题.),它们会在下次有需要的时候重新应用.
这种方式的队列声明必须要有固定的名称;要么是明确声明,要么是由框架生成AnonymousQueue
.匿名队列是非持久化的,专用的,且自动删除的.
重要
自动声明只在cachingConnectionFactory
缓存模式是CHANNEL
(默认)才可用. 这种限制的存在是因为专用和自动删除队列是绑定到connection上的.
如果你在同步序列中使用RabbitTemplate时丢失了broker的连接,那么Spring AMQP会抛出一个AmqpException
(通常但并不总是AmqpIOException
).
我们不想隐藏存在问题的事实,因此你可以捕获并对异常进行处理.如果你怀疑连接丢失了,而且这不是你的错,那么最简单的事情就是执行再次尝试操作. 重试操作可以手动进行,也可以使用Spring Retry来处理重试(强制或声明).
Spring Retry 提供了两个AOP拦截器并提供非常灵活的方式来指定retry的参数(尝试的次数,异常类型, 补偿算法等等.). Spring AMQP同时也提供了一些方便的工厂bean来创建Spring Retry拦截器, 你可以使用强类型回调接口来实现恢复逻辑.参考Javadocs和 StatefulRetryOperationsInterceptor
和StatelessRetryOperationsInterceptor
的属性来了解更多详情.
如果没有事务,或者如果一个事务是在重试回调中启动的话,则无状态重试是适当的。注意,相对于有状态重试,无状态重试只是简单配置和分析,如果存在一个正在进行的事务必须回滚或肯定会回滚的话, 这种无状态重试则是不合适的.
在事务中间掉下来的连接与回退有同样的效果, 所以对于事务开始于堆栈上的重连接来说,有状态重试通常是最佳选择(so for reconnection where the transaction is started higher up the stack, stateful retry is usually the best choice).
从1.3版本开始,提供了builder API来帮助在Java中使用这些拦截器(或者在 @Configuration
类中),例如:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只有部分retry特性能通过这种方式,更加高级的特性需要在RetryTemplate
中配置.
参考Spring Retry Javadocs 来了解可用策略,配置的完整信息.
消息监听器和异步情况
如果 MessageListener
因业务异常而失败,异常可由消息监听器容器来处理,然后它会继续回去监听其它信息.如果失败是由于掉下的连接引起的(非业务异常),那么监听此消费者的监听器将退出和重启.
SimpleMessageListenerContainer
可以无逢地进行处理,并且它会在日志中记录监听器即将重启.
事实上,它会循环不断地尝试重新启动消费者,只有当消费者有非常糟糕的行为时,才会放弃。一个副作用是,如果broker在容器启动时关闭,它将会继续尝试直到建立一个连接。
业务异常处理, 相对于协议错误和连接丢失,它可能需要更多考虑和一些自定义配置,特别是处于事务或 容器应答时.
在2.8.x版本之前, RabbitMQ对于死信行为没有定义,因此默认情况下,一个因拒绝或因业务异常导致回退的消息可循环往复地重新分发.
要限制客户端的重新分发的次数,一个选择是在监听器的通知链中添加一个StatefulRetryOperationsInterceptor
. 拦截器有一个实现了自定义死信动作的恢复回调:
什么是适合你的特定的环境。
另一个选择是设置容器的rejectRequeued属性为false. 这会导致丢弃所有失败的消息.当使用RabbitMQ 2.8.x+时,这也有利于传递消息到一个死的信件交换。
或者,你可以抛出一个AmqpRejectAndDontRequeueException
;这会阻止消息重新入列,不管defaultRequeueRejected
属性设置的是什么.
通常情况下,可以组合使用这两种技术 在通知链中使用StatefulRetryOperationsInterceptor
, 在此处是MessageRecover
抛出AmqpRejectAndDontRequeueException
. MessageRecover
会一直调用,直到耗尽了所有重试.
默认MessageRecoverer
只是简单的消费错误消息,并发出WARN消息.在这种情况下,消息是通过应答的,且不会发送到死信交换器中.
从1.3版本开始,提供了一个新的RepublishMessageRecoverer
,它允许在重试次数耗尽后,发布失败消息:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
.build();
}
RepublishMessageRecoverer
会使用消息头的额外信息来发布,这些信息包括异常信息,栈轨迹,原始交换器和路由键.额外的头可通过创建其子类和覆盖additionalHeaders()
方法来添加.
Spring Retry 可以非常灵活地决定哪些异常可调用重试. 默认配置是对所有异常都进行重试.用户异常可以包装在ListenerExecutionFailedException
中,我们需要确保分类检查异常原因. 默认的分类只是看顶部级别的异常。
从 Spring Retry 1.0.3开始, BinaryExceptionClassifier
有一个属性traverseCauses
(默认为false
). 当当为true时,它将遍历异常的原因,直到它找到一个匹配或没有原因。
要使用分类重试,需要使用一个SimpleRetryPolicy
,其构造函数将接受最大尝试次数,Exception的Map,以及一个boolean值(traverseCauses),且还需要将此策略注入给RetryTemplate
.
3.1.20 调试
Spring AMQP 提供广泛的日志记录,尤其是在DEBUG级别.
如果你想在应用程序和broker之间监控AMQP协议,你可以使用像WireShark的工具, 它有一个插件可用于解码协议.
另一个选择是, RabbitMQ java client自身携带了一个非常有用的工具类:Tracer
.当以main方式运行时,默认情况下,它监听于5673 ,并连接本地的5672端口.
只需要简单的运行它,并修改你的连接工厂配置,将其连接到本地的5673端口. 它就会在控制台中显示解码的协议信息.参考Tracer
javadocs 来了解详细信息.
3.2 Logging Subsystem AMQP Appenders
框架为多个流行的日志系统提供了日志appenders:
- log4j (从Spring AMQP1.1版本开始)
- logback (从Spring AMQP1.4版本开始)
- log4j2 (从Spring AMQP1.6版本开始)
appenders使用正常机制为为子系统配置,可用属性参照下面的规定。
3.2.1 共同属性
下面的属性对于所有appenders都可用:
Table 3.4. 共同Appender属性
Property | Default | Description |
---|
exchangeName | logs | 用于发布日志事件的交换器名称. |
exchangeType | topic | 发布日志事件的交换器类型- 只在appender声明了交换器的情况下才需要. 参考declareExchange . |
routingKeyPattern | %c.%p | 日志子系统生成路由键的模式格式. |
applicationId |
| Application ID - 如果模式包含 %X{applicationId},则将其添加到路由键 . |
senderPoolSize | 2 | 用于发布日志事件的线程数目. |
maxSenderRetries | 30 | 当broker不可用时或有某些错误时,重试的次数. 延时重试像: N ^ log(N) , N 表示重试次数. |
addresses |
| 一个逗号分隔的broker地址列表: host:port[,host:port]* -覆盖host 和 port . |
host | localhost | 要连接RabbitMQ的主机. |
port | 5672 | |
virtualHost | / | 要连接的RabbitMQ虚拟主机. |
username | guest | 要连接RabbitMQ的用户. |
password | guest | 要连接RabbitMQ的用户密码. |
contentType | text/plain | 日志消息的content-type属性 .
|
contentEncoding |
| 日志消息的content-encoding属性. |
declareExchange | false | 当appender启动时,是否需要声明配置的交换器.也可参考 durable 和autoDelete . |
durable | true | 当declareExchange 为 true ,durable 标志才会设置此值. |
autoDelete | false | 当 declareExchange 为true , auto delete 标志才会设置此值. |
charset | null | 当将字符串转成byte[]时要使用的编码,默认为null (使用系统默认字符集).如果当前平台上不支持此字符集,将回退到使用系统字符集. |
deliveryMode | PERSISTENT | PERSISTENT 或 NON_PERSISTENT 用于决定RabbitMQ是否应该持久化消息. |
generateId | false | 用于确定messageId 属性是否需要设置成唯一值. |
clientConnectionProperties | null | 一个逗号分隔的key:value 对,它是连接RabbitMQ时设置的自定义客户端属性 |
3.2.2 Log4j Appender
样例log4j.properties片断.
log4j.appender.amqp.addresses=foo:5672,bar:5672
log4j.appender.amqp=org.springframework.amqp.rabbit.log4j.AmqpAppender
log4j.appender.amqp.applicationId=myApplication
log4j.appender.amqp.routingKeyPattern=%X{applicationId}.%c.%p
log4j.appender.amqp.layout=org.apache.log4j.PatternLayout
log4j.appender.amqp.layout.ConversionPattern=%d %p %t [%c] - <%m>%n
log4j.appender.amqp.generateId=true
log4j.appender.amqp.charset=UTF-8
log4j.appender.amqp.durable=false
log4j.appender.amqp.deliveryMode=NON_PERSISTENT
log4j.appender.amqp.declareExchange=true
3.2.3 Log4j2 Appender
样例 log4j2.xml 片断.
<Appenders>
...
<RabbitMQ name="rabbitmq"
addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
charset="UTF-8"
senderPoolSize="3" maxSenderRetries="5">
</RabbitMQ>
</Appenders>
3.2.4 Logback Appender
样例 logback.xml 片断.
<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
</layout>
<addresses>foo:5672,bar:5672</addresses>
<abbreviation>36</abbreviation>
<applicationId>myApplication</applicationId>
<routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>false</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<declareExchange>true</declareExchange>
</appender>
3.2.5 定制Messages
每个appenders都可以子类化,以允许你在发布前修改消息.
Customizing the Log Messages.
public class MyEnhancedAppender extends AmqpAppender {
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
}
3.2.6 定制客户端属性
简化 String 属性
每个appender都支持在RabbitMQ连接中添加客户端属性.
log4j.
log4j.appender.amqp.clientConnectionProperties=foo:bar,baz:qux
logback.
<appender name="AMQP"...>...
<clientConnectionProperties>foo:bar,baz:qux</clientConnectionProperties>
...</appender>
log4j2.
<Appenders>
...
<RabbitMQname="rabbitmq"...clientConnectionProperties="foo:bar,baz:qux"...</RabbitMQ></Appenders>
这些属性是逗号分隔的key:value
队列表; 键和值不能包含逗号或 冒号.
当RabbitMQ Admin UI中查看连接上,你会看到这些属性.
使用 log4j 和 logback appenders, appenders 可以是子类化的, 允许你在连接建立前,修改客户连接属性:
定制客户端连接属性.
public class MyEnhancedAppender extends AmqpAppender {
private String foo;
@Override
protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
clientProperties.put("foo", this.foo);
}
public void setFoo(String foo) {
this.foo = foo;
}
}
对于 log4j2, 添加 log4j.appender.amqp.foo=bar
到log4j.properties 来设置发展.
对于logback, 在logback.xml中添加 <foo>bar</foo>
.
当然,对于像这个例子中简单的String 属性,可以使用先前的技术;
子类允许更丰富的属性(如添加 Map
的numeric 属性).
使用log4j2, 子类是不被支持的,因为 log4j2 使用静态工厂方法.
3.3 样例应用程序
3.3.1 介绍
Spring AMQP Samples 项目包含了两个样例应用程序. 第一个简单的"Hello World" 示例演示了同步和异步消息的处理. 它为理解基础部分提供了一个很好的开端.
第二个基于股票交易的例子演示了真实应用程序中的交互场景.在本章中,我们会每个示例进行快速浏览,使您可以专注于最重要的组成部分.
这两个例子都是基于Maven的,因此你可以直接将它们导入任何支持Maven的IDE中(如. SpringSource Tool Suite).
3.3.2 Hello World
介绍
Hello World示例演示了同步和异步消息处理.你可以导入spring-rabbit-helloworld 示例到IDE中并跟随下面的讨论.
同步例子
在src/main/java 目录中,导航到org.springframework.amqp.helloworld 包中.
打开HelloWorldConfiguration 类,你可以注意到它包含了@Configuration 类级注解和一些@Bean 方法级注解.
这是Spring 的基于Java的配置.你可进一步的了解here.
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =null;
connectionFactory =new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
配置中同样也包含了RabbitAdmin的实例
, 它会默认查找类型为Exchange, Queue, 或 Binding的bean并在broker中进行声明.
事实上,"helloWorldQueue" bean是在HelloWorldConfiguration 中生成的,因为它是 Queue的实例.
@Bean
public Queue helloWorldQueue() {
returnnew Queue(this.helloWorldQueueName);
}
重看"rabbitTemplate"bean配置,你会看到它将helloWorldQueue的名称设成了"queue"属性(用于接收消息) 以及"routingKey" 属性(用于发送消息).
现在,我们已经探索了配置,让我们看看实际上使用这些组件的代码。
首先,从同一个包内打开Producer类。它包含一个用于创建Spring ApplicationContext的main()方法.
publicstaticvoid main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在上面的例子中你可以看到, 取回的AmqpTemplate用来发送消息.因为客户端代码应该尽可能地依赖于接口,因此类型是AmqpTemplate而不是RabbitTemplate.
即使在HelloWorldConfiguration中创建的bean是RabbitTemplate的实例,依赖于接口则意味着这端代码更具有便携性(portable) (配置可以独立于代码进行修改).
因为convertAndSend() 方法是通过模板来调用的,因此模板会将调用委派给它的MessageConverter实例.在这种情况下,它默认使用的是SimpleMessageConverter,但也可以在HelloWorldConfiguration中为"rabbitTemplate"指定其它的实现.
现在打开Consumer类. 它实际上共享了同一个配置基类,这意味着它将共享"rabbitTemplate" bean. 这就是为什么我们要使用"routingKey" (发送) 和"queue" (接收)来配置模板的原因.
正如你在Section 3.1.4, “AmqpTemplate”中看到的,你可以代替在发送方法中传递routingKey参数,代替在接收方法中传递queue 参数. Consumer 代码基本上是Producer的镜子,只不过调用的是receiveAndConvert()而非convertAndSend()方法.
publicstaticvoid main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
你如果运行Producer,然后再运行Consumer, 在控制台输出中,你应该能看到消息"Received: Hello World"
异步示例
我们已经讲解了同步Hello World样例, 是时候移动到一个稍微先进,但更强大的选择上了.稍微修改一下代码,Hello World 样例就可以可以提供异步接收的示例了,又名 Message-driven POJOs. 事实上,有一个子包明确地提供了这种功能: org.springframework.amqp.samples.helloworld.async.
再一次地我们将从发送端开始. 打开ProducerConfiguration类可注意到它创建了一个"connectionFactory"和"rabbitTemplate" bean.
这次,由于配置是专用于消息发送端,因此我们不需要任何队列定义,RabbitTemplate只须设置routingKey属性.
回想一下,消息是发送到交换器上的而不是直接发到队列上的. AMQP默认交换器是无名称的direct类型交换器.
所有队列都是通过使用它们的名称作为路由键绑定到默认交换器上的.这就是为什么在这里我们只提供路由键的原因.
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于这个示例展示的是异步消息处理,生产方设计为连续发送消息(尽管类似于同步版本中的 message-per-execution模型,但不太明显,实际上它是消息驱动消费者)负责连续发送消息的组件是作为ProducerConfiguration类中的内部类来定义的,每3秒执行一次.
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
你不必要完全了解这些细节,因为真正的关注点是接收方(我们马上就会讲解).然而,如果你还熟悉Spring 3.0 任务调度支持,你可从here这里来了解.
简短故事是:在 ProducerConfiguration 中的"postProcessor" bean使用调度器来注册了任务.
现在,让我们转向接收方. 为强调 Message-driven POJO 行为,将从对消息起反应的组件开始.
此类被称为HelloWorldHandler.
publicclass HelloWorldHandler {
publicvoid handleMessage(String text) {
System.out.println("Received: " + text);
}
}
相当明显的, 这是一个POJO. 它没有继承任何基类,它没有实现任何接口,它甚至不包含任何导入. 它将通过Spring AMQP MessageListenerAdapter来适配MessageListener接口.然后适配器可配置在SimpleMessageListenerContainer上.
在这个例子中,容器是在ConsumerConfiguration类中创建的.你可以看到POJO是包装在适配器中的.
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer是一个Spring生命周期组件,默认会自动启动.如果你看了Consumer类的内部,你会看到main()方法中除了一行启动创建ApplicationContext的代码外,其它什么都没有.
Producer的main()方法也只有一行启动,因为以 @Scheduled注解的组件会自动开始执行.你可以任何顺序来启动Producer 和Consumer,你会看每秒就会发送消息和接收到消息.
3.3.3 股票交易(Stock Trading)
Stock Trading 示例演示了比Hello World示例更高级的消息场景.然而,配置却是很相似的 - 只是有一点复杂.
由于我们已经详细讲解了Hello World配置,因此在这里我们将重点关注不一样的东西. 有一个服务器发送市场数据(股票报价)到Topic交换器中.
然后,客户端可订阅市场数据,即通过使用路由模式(如. "app.stock.quotes.nasdaq.*")来绑定队列(e.g. "app.stock.quotes.nasdaq.*").
这个例子的其它主要功能是 有一个请求回复“股票交易”的互动,它是由客户发起并由服务器来处理的. 这涉及到一个私有的“回复(replyTo)”队列,发送客户端的信息在请求消息中。
服务器的核心配置在RabbitServerConfiguration类中(位于 org.springframework.amqp.rabbit.stocks.config.server 包中).
它继承了 AbstractStockAppRabbitConfiguration. 这是服务器和客户端定义常用资源的地方,包括市场数据Topic交换器(其名称为app.stock.marketdata) 以及服务器公开股票交易的队列(其名称为app.stock.request).
在那个公共配置文件中,你会看到在RabbitTemplate上配置了一个JsonMessageConverter.
服务器特有配置由2部分组成.首先,它在RabbitTemplate上配置了市场数据交换器,这样在发送消息时,就不必提供交换器名称.它是通过基础配置类中的抽象回调方法中定义做到这一点的.
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次, 声明了股票请求队列.在这里,它不需要任何明确的绑定,因为它将以它自己的名称作为路由键来绑定到无名称的默认交换器上.正如先前提到的,AMQP规范定义了此种行为.
@Beanpublic Queue stockRequestQueue() {
returnnew Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在你已经看过了服务器的AMQP资源配置,导航到src/test/java目录下的org.springframework.amqp.rabbit.stocks包.在那里你会实际的 提供了main()方法的Server类.
它基于server-bootstrap.xml 创建了一个ApplicationContext.在那里,你会看到发布虚假市场数据的调度任务.
那个配置依赖于Spring 3.0的"task"命名空间支持.bootstrap配置文件也导入了其它一些文件.最令人关注的是位于src/main/resources目录下的server-messaging.xml.在那里,你会看到"messageListenerContainer" bean,它负责处理股票交易请求.
最后在看一下定义在src/main/resources目录下的server-handlers.xml,其中定义了一个 "serverHandler" bean.这个bean是ServerHandler类的实例,它是Message-driven POJO 的好例子,它也有发送回复消息的能力.
注意,它自身并没有与框架或任何AMQP概念耦合.它只是简单地接受TradeRequest并返回一个TradeResponse.
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看了服务端的重要配置和代码,让我们转向客户端.最佳起点是从 org.springframework.amqp.rabbit.stocks.config.client 包下的RabbitClientConfiguration开始.
注意,它声明了两个不带明确参数的队列.
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
那些是私有队列, 唯一名称会自动自成.客户端会用第一个生成的队列来绑定由服务端公开的市场交换器.
记住在AMQP中,消费者与队列交互,而生产者与交换器交互. 队列和交换器之间的绑定指示broker从给定的交换器中投递或路由什么消息给队列.
由于市场交换器是一个Topic交换器,绑定可通过路由正则表达式来表达.
RabbitClientConfiguration声明了一个Binding对象,其对象是通过BindingBuilder的便利API来生成的.
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
注意,实际值已经在属性文件(src/main/resources目录下的"client.properties")中外部化了,因此我们使用Spring的@Value 注解来注入值.这通常是一个好主意,否则值就会硬编码在类中,没有修改就没有重新编译.
在这种情况下,通过修改绑定中的路由正则表达式,可很容易地运行多个版本的Client.让我们立即尝试.
启动运行org.springframework.amqp.rabbit.stocks.Server然后再运行 org.springframework.amqp.rabbit.stocks.Client.你将会看到NASDAQ股票的交易报价,因为关联stocks.quote.pattern 键的值在client.properties中是app.stock.quotes.nasdaq.
现在,保持现有Server 和Client 运行,将其属性值修改为app.stock.quotes.nyse.再启动第二个Client实例.你会看到第一个client仍然接收NASDAQ 报价,而第二个client接收的NYSE报价. 你可以改变模式,获取所有的股票报价或个别股票的报价。
最后一个我们将暴露的特性是从客户端的角度来看待请求-回复交互.记住我们已经看了ServerHandler,它会接受TradeRequest对象并返回TradeResponse对象. 客户端相应的代码是 RabbitStockServiceGateway(位于org.springframework.amqp.rabbit.stocks.gateway 包).为发送消息,它会委派给RabbitTemplate.
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
thrownew AmqpException(e);
}
return message;
}
});
}
注意,在发送消息前,它设置了"replyTo"地址. 这提供了队列,此队列是由上面的"traderJoeQueue" bean 定义生成的. 以下是StockServiceGateway类的@Bean定义.
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果你没有运行服务器和客户端,现在就启动它们. 尝试使用100 TCKR的格式来发送请求.经过一个简短的人工延迟来模拟“处理”请求,你应该看到一个确认消息出现在客户端上。
3.4 测试支持
3.4.1 介绍
为异步程序写集成测试比测试简单程序更复杂. 当引入了@RabbitListener这样的注解时,这尤其更加复杂.
现在的问题是发送消息后,如何来验证, 监听器按预期收到了消息.
框架自身带有许多单元测试和集成测试;有些使用mocks, 另外一些使用真实的RabbitMQ broker来集成测试. 您可以参照测试场景的一些想法进行测试。
Spring AMQP 1.6版本引入了sring-rabbit-test
jar ,它提供一些测试复杂场景的测试. 预计这一项目将随着时间的推移进行扩展,但我们需要社会反馈以帮助测试。请使用JIRA问题或GitHub提供这样的反馈。
3.4.2 Mockito Answer<?> 实现
当前有两个Answer<?>
实现可帮助测试:
第一个, LatchCountDownAndCallRealMethodAnswer
提供了返回null和计数下一个锁存器的Answer<Void>
.
LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());
...
assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
第二个, LambdaAnswer<T>
提供了一种调用真正方法的机制,并提供机会来返回定制结果(基于InvocationOnMock和结果
).
public class Foo {
public String foo(String foo) {
return foo.toUpperCase();
}
}
Foo foo = spy(new Foo());
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
.when(foo).foo(anyString());
assertEquals("FOOFOO", foo.foo("foo"));
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
.when(foo).foo(anyString());
assertEquals("FOOfoo", foo.foo("foo"));
doAnswer(new LambdaAnswer<String>(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(foo).foo(anyString());
assertEquals("foofoo", foo.foo("foo"));
When using Java 7 or earlier:
doAnswer(new LambdaAnswer<String>(true, new ValueToReturn<String>() {
@Overridepublic String apply(InvocationOnMock i, String r) {
return r + r;
}
})).when(foo).foo(anyString());
3.4.3 @RabbitListenerTest and RabbitListenerTestHarness
在你的@Configuration
类中使用 @RabbitListenerTest
(它也会通过@EnableRabbit来启用@RabbitListener
探测).注解会导致框架使用子类RabbitListenerTestHarness来代替标准RabbitListenerAnnotationBeanPostProcessor.
RabbitListenerTestHarness
通过两种方式来增强监听器 - 将其包装进Mockito Spy
, 启用了Mockito
存根和验证操作.也可在监听器添加Advice
来启用对参数,结果或异常的访问.
你可以控制哪一个(或两个)来在@RabbitListenerTest上启用属性. 后者用于访问调用中更为低级数据- 它也支持测试线程阻塞,直到异步监听器被调用.
重要
final
@RabbitListener
不能被发现或通知 ,同时,只有带id属性的监听器才能发现或通知.
让我们看一些例子.
使用spy:
@Configuration
@RabbitListenerTest
public class Config {
@Bean
public Listener listener() {
returnnew Listener();
}
...
}
public class Listener {
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness;
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo" ));
Listener listener = this.harness.getSpy("foo");
assertNotNull(listener);
verify(listener).foo("foo");
}
@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);
LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer).when(listener).foo(anyString(), anyString());
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}
}
| 将harness 注入进测试用于,这样我们可访问spy. |
| 获取spy引用,这样我们可以验证是否按预期在调用. 由于这是一个发送和接收操作,因此不必暂停测试线程,因为RabbitTemplate 在等待回复时已经暂停过了. |
| 在这种情况下,我们只使用了发送操作,因为我们需要一个门闩来等待对容器线程中监听器的异步调用. 我们使用了Answer<?> 一个实现来帮助完成. |
| 配置spy来调用Answer . |
使用捕获建议:
@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {
}
@Service
public class Listener {
private boolean failed;
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
thrownew RuntimeException(foo);
}
failed = false;
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness;
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo" ));
InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS);
assertThat(invocationData.getArguments()[0], equalTo("foo"));
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}
@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage());
}
}
| 将harness注入进测试用例,以便我们能获取spy的访问. |
| 使用 harness.getNextInvocationDataFor() 来获取调用数据 - 在这种情况下,由于它处于request/reply 场景,因为没有必要等待,因为测试线程在RabbitTemplate 中等待结果的时候,已经暂停过了. |
| 我们可以验证参数和结果是否与预期一致 |
| 这次,我们需要时间来等待数据,因为它在容器线程上是异步操作,我们需要暂停测试线程. |
| 当监听器抛出异常时,可用调用数据中的throwable 属性 |
4. Spring 整合- 参考
这部分参考文档提供了在Spring集成项目中提供AMQP支持的快速介绍.
4.1 Spring 整合AMQP支持4.1.1 介绍
Spring Integration 项目包含了构建于Spring AMQP项目之上的AMQP 通道适配器(Channel Adapters)和网关(Gateways). 那些适配器是在Spring集成项目中开发和发布的.在Spring 集成中, "通道适配器" 是单向的,而网关是双向的(请求-响应).
我们提供了入站通道适配器(inbound-channel-adapter),出站通道适配器( outbound-channel-adapter), 入站网关(inbound-gateway),以及出站网关(outbound-gateway).
由于AMQP 适配器只是Spring集成版本的一部分,因为文档也只针对Spring集成发行版本部分可用.
作为一个品酒师,我们只快速了解这里的主要特征。
4.1.2 入站通道适配器
为了从队列中接收AMQP消息,需要配置一个个<inbound-channel-adapter>
<amqp:inbound-channel-adapter channel="fromAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>
4.1.3 出站通道适配器
为了向交换器发送AMQP消息,需要配置一个<outbound-channel-adapter>. 除了交换名称外,还可选择提供路由键。
<amqp:outbound-channel-adapter channel="toAMQP" exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>
4.1.4 入站网关
为了从队列中接收AMQP消息,并回复到它的reply-to地址,需要配置一个<inbound-gateway>.
<amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>
4.1.5 出站网关
为了向交换器发送AMQP消息并接收来自远程客户端的响应,需要配置一个<outbound-gateway>.
除了交换名称外,还可选择提供路由键。
<amqp:outbound-gateway request-channel="toAMQP" reply-channel="fromAMQP" exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>
除了这份参考文档,还有其它资源可帮助你了解AMQP.
5.1 进阶阅读
对于那些不熟悉AMQP的人来说, 规范 实际上具有相当的可读性.
这当然是信息的权威来源,对于熟悉规范的人来说,Spring AMQP代码应该很容易理解。
目前RabbitMQ实现基于2.8.x版本,并正式支持AMQP 0.8和9.1。我们推荐阅读9.1文档。
在RabbitMQ Getting Started 页面上,还有许多精彩的文章,演示, 博客. 因为当前只有Spring AMQP实现, 但我们仍建议将其作为了解所有中间件相关概念的起点.