随笔 - 41  文章 - 7  trackbacks - 0
<2016年8月>
31123456
78910111213
14151617181920
21222324252627
28293031123
45678910

常用链接

留言簿

随笔分类

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

3.1.9 Request/Reply 消息

介绍

AmqpTemplate 也提供了各种各样的sendAndReceive 方法,它们接受同样的参数选项(exchange, routingKey, and Message)来执行单向发送操作.
这些方法对于request/reply 场景也是有用的,因为它们在发送前处理了必要的"reply-to"属性配置,并能通过它在专用队列(基于回复功能临时创建的队列)上监听回复消息.

类似的request/reply方法也是可用的, MessageConverter 可应用于请求和回复上.这些方法被称为convertSendAndReceive.参考 AmqpTemplate 的JavaDoc来了解详情.

从1.5.0版本开始,每个sendAndReceive 方法变种都有一个接受CorrelationData的重载版本. 连同正确配置的连接工厂,这使得发布者可以确认发送方的操作.
参考 the section called “Publisher Confirms and Returns” 来了解详情.

Reply 超时

默认情况下,这些 send和receive方法会在5秒后超时并返回null. 这可以通过设置replyTimeout属性来修改.
从1.5版本开始,如果你设置了 mandatory 属性为true (或特定消息上的 mandatory-expression 评估为true),如果消息不能投递到队列中,将抛出AmqpMessageReturnedException.
这个 exception 有 returnedMessagereplyCodereplyText 属性, 如同用于发送的exchangeroutingKey


这个功能使用了发布者返回特性,可通过在CachingConnectionFactory设置publisherReturns 为true来启用(参考the section called “Publisher Confirms and Returns”).
此外,你不必在RabbitTemplate上注册你自己的ReturnCallback.

RabbitMQ Direct reply-to

重要
从3.4.0版本开始,RabbitMQ 服务器现在支持 Direct reply-to,基于主要原因,它消除了固定回复队列(为了避免为每个请求创建临时队列).
Spring AMQP 1.4.1 版本开始,Direct reply-to 就已经做为了默认使用(如果服务器支持的话),而不再创建临时队列.
当没有提供replyQueue(或设置名称为amq.rabbitmq.reply-to), RabbitTemplate 会自动探测是否支持Direct reply-to, 要么使用它或使用临时回复队列来回退. 当使用Direct reply-to, reply-listener不是必需的,不应该被配置。

Reply 监听器仍然运行命名队列(不是amq.rabbitmq.reply-to),允许控制并发回复.

从.16版本开始,出于某些原因,你想为每个回复使用临时的,专用的,自动删除的队列,你可以设置useTemporaryReplyQueues 属性为true. 如果你设置了replyAddress,此属性会被忽略.

决定是否使用 direct reply-to,可以通过继承RabbitTemplate 并覆盖useDirectReplyTo()来修改. 

此方法只在发出第一个请求时,调用一次.

应答队列的消息相关性

当使用固定回复队列时(不是amq.rabbitmq.reply-to), 必须要提供 correlation data,这样回复才能关联请求.参考RabbitMQ Remote Procedure Call (RPC).
默认情况下,标准correlationId 属性会用来持有correlation data. 然而,如果你想使用自定义属性来持有correlation data, 你可在 <rabbit-template/>中设置 correlation-key 属性.
显示设置属性为correlationId 将与缺省属性相同. 当然,客户端和服务器对于correlation data必须要有相同的头.

Spring AMQP 1.1版本为这个data使用自定义属性spring_reply_correlation.如果你想在当前版本中恢复这种行为,也许是为了保持1.1中的另一个应用程序的兼容性,你必须设置属性以spring_reply_correlation。

回复监听器容器

当使用3.4.0之前的Rabbit版本,每个回复都会使用一个新临时队列. 然而,可在模板上配置单个回复队列, 这将更加高效,同时也允许你在队列上设置参数.然而,在这种情况下,你必须提供<reply-listener/>子元素. 

这个元素为回复队列提供了监听器容器, 以模板为监听器.
所有 Section 3.1.15, “Message Listener Container Configuration” 中的属性都可以配置在<listener-container/> 元素中,除了connection-factory 和 message-converter(它们是模块配置中继承下来的).

重要

如果运行了多个应用程序实例或者使用了多个RabbitTemplate,那么你必须为每个都使用唯一的回复队列- RabbitMQ 没有在队列中选择消息的能力,如果它们都使用相同队列,每个实例都将竞争的答复,而不一定是收到他们自己的。

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
reply-queue="replies" reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>

由于容器和模板可共享一个连接工厂,它们不会共享一个通道,因此请求和回复不是在同一个事务中执行的(如果是事务的).

重要

在1.5.0版本之前, reply-address 属性不可用,回复总是通过默认交换器和reply-queue作路由键来进行的. 现在这依然是默认的,但现在你可以指定新的reply-address 属性. 
reply-address 可以包含<exchange>/<routingKey> 形式的地址,回复将会路由到设定的exchange和路由到routing key绑定的队列上. 
reply-address 优先于 reply-queue<reply-listener> 必须配置为一个单独的<listener-container> 组件, 当只使用reply-address 时,无论是reply-address 还是 reply-queue (在<listener-container>中的queue属性) 必须指的是同一个队列.

在这个配置中,SimpleListenerContainer 用于接收回复; 而RabbitTemplate 将成为MessageListener. 当使用<rabbit:template/> 命名空间元素定义模板时, 正如上面所展示的, 分析器会定义容器,并将模板作为监听器进行包装.

重要

当模板不使用固定 replyQueue (或使用Direct reply-to - 参考 the section called “RabbitMQ Direct reply-to”) ,则不需要监听器容器. 当在RabbitMQ3.4.0+使用时,Direct reply-to 是更好的机制.


如果你将 RabbitTemplate 定义为 <bean/>, 或使用 @Configuration 类将其定义为@Bean,或者通过编程来创建模板,你需要自己定义和包装回复监听器容器.
如果这样做失败了,模板将不会收到回复,并最终会超时并返回null作为对sendAndReceive 方法调用者的回复.

从1.5版本开始, RabbitTemplate 会探测是否配置了MessageListener 来接收回复.如果没有,它会尝试发送并使用回复地地址来接收消息,如果失败了,则会抛出 IllegalStateException (因为不会收到回复).

此外,如果使用了简单的replyAddress (队列名称),回复监听器容器会验证与监听的队列是否是一样的名称.但如果这个地址是交换器和路由键,这种检查不会被执行,会输出调试日志信息.

重要

当在编写回复监听器和模板时,重要的一点是要保证模板的replyQueue 与容器的queues (或queueNames) 属性指的是相同的队列. 模板会将回复队列插入到出站消息的replyTo属性中.

下面的例子展示了如何来包装这些beans.

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {         
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyQueue(replyQueue());
rabbitTemplate.setReplyTimeout(60000);
return rabbitTemplate; }
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {         
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {         
return new Queue("my.reply.queue");
}

完整的RabbitTemplate 包装固定回复队列,与远程监听器容器的请求回复处理展示在 this test case.

重要

当回复超时时(replyTimeout), sendAndReceive() 方法会返回null.


在1.3.6版本之前, 消息超时回复只是简单地记录下来.现在,如果收到了迟到回复,将会拒绝(模板会抛出AmqpRejectAndDontRequeueException).
如果回复队列配置了将拒绝消息到死信交换器中, 可获取回复来作后面的分析. 只须将队列以队列名称作为路由键绑定到死信交换器中.

参考RabbitMQ Dead Letter Documentation 来了解更多关于死信的配置信息.

你也可以看示例中关于FixedReplyQueueDeadLetterTests 测试用例.

AsyncRabbitTemplate

1.6版本引入了 AsyncRabbitTemplate

它有与 AmqpTemplate 上类似的sendAndReceive (和 convertSendAndReceive) 方法,但不是阻塞的,它们会返回一个 ListenableFuture.

sendAndReceive 方法返回一个RabbitMessageFutureconvertSendAndReceive 方法会返回一个RabbitConverterFuture.

你可以同步稍后在future上调用get()方法来获取结果,也可以注册一个回调异步来获取结果.

@Autowired
private AsyncRabbitTemplate template;  
...
public void doSomeWorkAndGetResultLater() {
...
ListenableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get();
}
catch (ExecutionException e) {
...
} ...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.addCallback(new ListenableFutureCallback<String>() {
@Override
publicvoid onSuccess(String result) { 
...
}
@Override
publicvoid onFailure(Throwable ex) { 
...
}
});
...
}

如果设置了mandatory ,且消息不能投递,future 会抛出一个ExecutionException ,并带有AmqpMessageReturnedException 原因,它封装了返回的消息和以及关于返回的信息.

如果设置了enableConfirms ,future会包含一个属性confirm ,它是 ListenableFuture<Boolean> , true 表示成功的发布.

如果confirm future是false,RabbitFuture 会有一个属性nackCause - 如果可用的话,则代表的是失败的原因.

重要

发布者确认已被废弃了(如果在回复之后收到),-因为回复已经暗示了成功发布.

在模板上设置receiveTimeout 属性来表示回复超时时间(它默认为 30 秒).如果发生了超时,future会以AmqpReplyTimeoutException结束.

模板可实现SmartLifecycle; 这样可阻止模板在等待回复时Future 退出.

Spring 远程调用 AMQP

Spring Framework 有一个普遍的远程处理能力, 允许 Remote Procedure Calls (RPC) 使用多种传输协议. Spring-AMQP 通过在客户端使用AmqpProxyFactoryBean ,在服务端使用AmqpInvokerServiceExporter也可以提供类似的机制.
它提供了基于AMQP的RPC. 在客户端,
RabbitTemplate 可以按照上面一样来使用,在服务器端, invoker (配置为MessageListener) 会收到消息, 调用配置的服务,使用入站消息的replyTo信息来返回回复.

客户端工厂可注入任何bean (使用它的serviceInterface);客户端然后可以调用代理上的方法,导致在AMQP上远程执行.

重要

使用默认 MessageConverter 器,方法参数和返回值必须是Serializable的实例.

在服务器端,AmqpInvokerServiceExporter 包含AmqpTemplate 和 MessageConverter属性. 

目前,未使用模板的MessageConverter.如果你需要提供定制的消息转换器,那么你需要使用messageConverter 属性进行提供.在客户端,可在AmqpTemplate 中添加定制消息转换器,它是使用其amqpTemplate 属性提供给 AmqpProxyFactoryBean 的.

样例 client 和server 配置如下所示.

<bean id="client" class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<propertyname="amqpTemplate "ref="template" />
<propertyname="serviceInterface"value="foo.ServiceInterface" />
</bean>
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"routing-key="remoting.binding" exchange="remoting.exchange" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="listener" class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface" />
<property name="service" ref="service" />
<property name="amqpTemplate" ref="template" />
</bean><bean id="service" class="foo.ServiceImpl" />
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
重要
AmqpInvokerServiceExporter 只能处理适当格式的消息,如果从AmqpProxyFactoryBean中发出的消息. 如果它接收到一个不能解释的消息,那么将发送一个序列化的RuntimeException 作为回复.
如果这些消息无re
plyToAddress 属性,消息会被拒绝且在没有配置死信交换器时会永久丢失.
默认情况下,如果请求消息不能投递,调用线程最终会超时,并会抛出RemoteProxyFailureException. 超时时间是5秒,可在RabbitTemplate通过设置replyTimeout 属性来修改.
从1.5版本开始,如果设置 mandatory 属性为true, 并在连接工厂中启用了返回(参考 the section called “Publisher Confirms and Returns”), 调用线程会抛出一个AmqpMessageReturnedException.
参考 the section called “Reply Timeout” 来了解更多信息.



posted on 2016-08-13 15:59 胡小军 阅读(6611) 评论(0)  编辑  收藏 所属分类: RabbitMQ

只有注册用户登录后才能发表评论。


网站导航: