@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "foo", value = "bar"),
@Argument(name = "baz")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
注意队列的x-message-ttl
参数设为了10秒钟,因为参数类型不是String
, 因此我们指定了它的类型,在这里是Integer
.有了这些声明后,如果队列已经存在了,参数必须匹配现有队列上的参数.对于header交换器,我们设置binding arguments 要匹配头中foo为bar,且baz可为任意值的消息. x-match
参数则意味着必须同时满足两个条件.
参数名称,参数值,及类型可以是属性占位符(${...}
) 或SpEL 表达式(#{...}
). name
必须要能解析为String
; type的表达式必须能解析为Class
或类的全限定名. value
必须能由DefaultConversionService
类型进行转换(如上面例子中x-message-ttl
).
如果name 解析为null
或空字符串,那么将忽略 @Argument
.
元注解(Meta-Annotations)
有时,你想将同样的配置用于多个监听器上. 为减少重复配置,你可以使用元注解来创建你自己的监听器注解:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public@interface MyAnonFanoutListener {
}
public class MetaListener {
@MyAnonFanoutListener
public void handle1(String foo) {
...
}
@MyAnonFanoutListener
public void handle2(String foo) {
...
}
}
在这个例子中,每个通过@MyAnonFanoutListener创建的监听器都会绑定一个匿名,自动删除的队列到fanout交换器 metaFanout上
. 元注解机制是简单的,在那些用户定义注解中的属性是不会经过检查的- 因此你不能从元注解中覆盖设置.当需要高级配置时,使用一般的 @Bean
定义.
Enable Listener Endpoint Annotations
为了启用 @RabbitListener
注解,需要在你的某个@Configuration类中添加@EnableRabbit
注解.
@Configuration
@EnableRabbit
publicclass AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
默认情况下,基础设施会查找一个名为rabbitListenerContainerFactory
的bean作为工厂来源来创建消息监听器容器. 在这种情况下,会忽略RabbitMQ 基础设施计划, processOrder
方法可使用核心轮询大小为3个线程最大10个线程的池大小来调用.
可通过使用注解或实现RabbitListenerConfigurer
接口来自定义监听器容器工厂. 默认只需要注册至少一个Endpoints,而不需要一个特定的容器工厂.查看javadoc来了解详情和例子.
如果你更喜欢XML配置,可使用 <rabbit:annotation-driven>
元素.
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers "value="3"/>
<property name="maxConcurrentConsumers"value="10"/>
</bean>
注解方法的消息转换
在调用监听器之前,在管道中有两个转换步骤. 第一个使用 MessageConverter
来将传入的Spring AMQP Message
转换成spring-消息系统的消息. 当目标方法调用时,消息负载将被转换,如果有必要,也会参考消息参数类型来进行.
第一步中的默认 MessageConverter
是一个Spring AMQP SimpleMessageConverter
,它可以处理String
和 java.io.Serializable对象之间的转换
; 其它所有的将保留为byte[]
. 在下面的讨论中,我们称其为消息转换器.
第二个步骤的默认转换器是GenericMessageConverter
,它将委派给转换服务(DefaultFormattingConversionService的实例
). 在下面的讨论中,我们称其为方法参数转换器.
要改变消息转换器,可在连接工厂bean中设置其相关属性:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}
这配置了一个Jackson2 转换器,希望头信息能通过它来指导转换.
你也可以考虑使用ContentTypeDelegatingMessageConverter
,它可以处理不同内容类型的转换.
大多数情况下,没有必要来定制方法参数转换器,除非你想要用自定义的ConversionService
.
在1.6版本之前,用于转换JSON的类型信息必须在消息头中提供或者需要一个自定义的ClassMapper
. 从1.6版本开始,如果没有类型信息头,类型可根据目标方法参数推断.
类型推断只能用于 @RabbitListener
的方法级.
参考 the section called “Jackson2JsonMessageConverter” 来了解更多信息.
如果您希望自定义方法参数转换器,您可以这样做如下:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
...
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
return factory;
}
@Bean
public ConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}
@Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
...
}
编程式 Endpoint 注册
RabbitListenerEndpoint
提供了一个Rabbit endpoint 模型并负责为那个模型配置容器.除了通过RabbitListener注解检测外,这个基础设施允许你通过编程来配置endpoints.
@Configuration
@EnableRabbit
publicclass AppConfig implements RabbitListenerConfigurer {
@Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在上面的例子中,我们使用了SimpleRabbitListenerEndpoint
(它使用MessageListener
来进行处理),但你也可以构建你自己的endpoint变种来描述自定义的调用机制.
应该指出的是,你也可以跳过@RabbitListener
的使用,通过RabbitListenerConfigurer来编程注册你的endpoints.
Annotated Endpoint Method Signature
到目前为止,我们已经在我们的端点上注入了一个简单的字符串,但它实际上可以有一个非常灵活的方法签名。让我们重写它,以一个自定义的头来控制注入顺序:
@Component
publicclass MyService {
@RabbitListener(queues = "myQueue")
publicvoid processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
下面是你可以在监听端点上注入的主要元素:
原生org.springframework.amqp.core.Message
.
用于接收消息的
com.rabbitmq.client.Channel
org.springframework.messaging.Message
代表的是传入的AMQP消息.注意,这个消息持有自定义和标准的头部信息 (AmqpHeaders定义
).
从1.6版本开始, 入站deliveryMode
头可以AmqpHeaders.RECEIVED_DELIVERY_MODE
使用,代替了AmqpHeaders.DELIVERY_MODE
.
@Header
-注解方法参数可 提取一个特定头部值,包括标准的AMQP头.
@Headers
-注解参数为了访问所有头信息,必须能指定为java.util.Map
.
非注解元素(非支持类型(如. Message
和Channel
))可认为是负荷(payload).你可以使用 @Payload来明确标识
. 你也可以添加额外的 @Valid来进行验证
.
注入Spring消息抽象的能力是特别有用的,它可受益于存储在特定传输消息中的信息,而不需要依赖于特定传输API.
@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}
方法参数的处理是由DefaultMessageHandlerMethodFactory
提供的,它可以更进一步地定制以支持其它的方法参数. 转换和验证支持也可以定制.
例如,如果我们想确保我们的Order在处理之前是有效的,我们可以使用@Valid
来注解负荷,并配置必须验证器,就像下面这样:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
@Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
监听多个队列
当使用queues
属性时,你可以指定相关的容器来监听多个队列. 你可以使用 @Header
注解来指定对于那些队列中收到的消息对POJO方法可用:
@Component
public class MyService {
@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
从1.5版本开始,队列名称可以使用属性占位符和SpEL:
@Component
public class MyService {
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
在1.5版本之前,只有单个队列可以这种方法进行指定,每个队列需要一个单独的属性.
回复管理
MessageListenerAdapter
现有的支持已经允许你的方法有一个非void的返回类型.在这种情况下,调用的结果被封装在一个发送消息中,其消息发送地址要么是原始消息的ReplyToAddress头指定的地址要么是监听器上配置的默认地址.默认地址现在可通过@SendTo
注解进行设置.
假设我们的processOrder
方法现在需要返回一个OrderStatus
, 可将其写成下面这样来自动发送一个回复:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你需要以传输独立的方式来设置其它头,你可以返回Message
,就像这样:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
@SendTo
值按照exchange/routingKey模式(其中的一部分可以省略)来作为对exchange
和 routingKey
的回复.有效值为:
foo/bar
- 以交换器和路由键进行回复.
foo/
- 以交换器和默认路由键进行回复.
bar
or /bar
- 以路由键和默认交换器进行回复.
/
or empty - 以默认交换器和默认路由键进行回复.
@SendTo
也可以没有value
属性. 这种情况等价于空的sendTo 模式. @SendTo
只能应用于没有replyToAddress
属性的入站消息中.
从1.5版本开始, @SendTo
值可以通过bean SpEL 表达式初始化,例如…
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return"test.sendTo.reply.spel";
}
表达式必须能评估为String
,它可以是简单的队列名称(将发送到默认交换器中) 或者是上面谈到的exchange/routingKey
形式.
在初始化时,#{...}
表达式只评估一次.
对于动态路由回复,消息发送者应该包含一个reply_to
消息属性或使用运行时SpEL 表达式.
从1.6版本开始, @SendTo
可以是SpEL 表达式,它可在运行时根据请求和回复来评估:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表达式的运行时性质是由 !{...}
定界符表示的. 表达式评估上下文的#root
对象有三个属性:
request
- o.s.amqp.core.Message
请求对象.source
- 转换后的 o.s.messaging.Message<?>
.result
- 方法结果.
上下文有一个map 属性访问器,标准类型转换器以及一个bean解析器,允许引用上下文中的其它beans (如.@someBeanName.determineReplyQ(request, result)
).
总结一下, #{...}
只在初始化的时候评估一次, #root
对象代表的是应用程序上下文; beans可通过其名称来引用. !{...}
会在运行时,对于每个消息,都将使用root对象的属性进行评估,bean可以使用其名称进行引用,前辍为@
.
多方法监听器
从1.5.0版本开始,@RabbitListener
注解现在可以在类级上进行指定.与新的@RabbitHandler
注解一起,基于传入消息的负荷类型,这可以允许在单个监听器上调用不同的方法.这可以用一个例子来描述:
@RabbitListener(id="multi", queues = "someQueue")
publicclass MultiListenerBean {
@RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
...
}
@RabbitHandler
public String baz(Baz baz) {
...
}
@RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
...
}
}
在这种情况下,独立的 @RabbitHandler
方法会被调用,如果转换后负荷是Bar
, Baz
或Qux
. 理解基于负荷类型系统来确定唯一方法是很重要的.类型检查是通过单个无注解参数来执行的,否则就要使用@Payload
进行注解. 注意同样的方法签名可应用于方法级 @RabbitListener
之上.
注意,如果有必要,需要在每个方法上指定@SendTo,
在类级上它是不支持的.
@Repeatable @RabbitListener
从1.6版本开始,@RabbitListener
注解可用 @Repeatable进行标记
. 这就是说,这个注解可多次出现在相同的注解元素上(方法或类).在这种情况下,对于每个注解,都会创建独立的监听容器,它们每个都会调用相同的监听器@Bean
. Repeatable 注解能用于 Java 8+;当在Java 7-使用时,同样的效果可以使用 @RabbitListeners
"container" 注解(包含@RabbitListener注解的数组)来达到.
Proxy @RabbitListener and Generics
如果你的服务是用于代理(如,在 @Transactional的情况中
) ,当接口有泛型参数时,需要要一些考虑.要有一个泛型接口和特定实现,如:
interface TxService<P> {
String handle(P payload, String header);
}
static class TxServiceImpl implements TxService<Foo> {
@Override
@RabbitListener(...)
public String handle(Foo foo, String rk) {
...
}
}
你被迫切换到CGLIB目标类代理,因为接口handle方法的实际实现只是一个桥接方法.在事务管理的情况下, CGLIB是通过注解选项来配置的- @EnableTransactionManagement(proxyTargetClass = true)
. 在这种情况下,所有注解都需要在实现类的目标方法上进行声明:
static class TxServiceImpl implements TxService<Foo> {
@Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
容器管理
由注解创建的容器不会在上下文中进行注册.你可以调用 RabbitListenerEndpointRegistry的getListenerContainers()方法来获取所有容器集合.然后,你可以迭代这个集合,例如,停止/启动所有容器或调用在其注册上调用Lifecycle
方法(调用每个容器中的操作).
你也可以使用id来获取单个容器的引用,即 getListenerContainer(String id)
; 例如registry.getListenerContainer("multi")
.
从1.5.2版本开始,你可以调用getListenerContainerIds()方法来获取所有注册容器的id.
从1.5版本开始,你可在RabbitListener端点上为容器分配一个组(group).
这提供了一种机制来获取子集容器的引用
; 添加一个group
属性会使Collection<MessageListenerContainer>
类型的bean使用组名称注册在上下文中.
一些不同的线程可与异步消费者关联。
当RabbitMQ Client投递消息时,来自于
SimpleMessageListener
配置的TaskExecutor中的线程会调用MessageListener.如果没有配置,将会使用SimpleAsyncTaskExecutor
. 如果使用了池化的executor,须确保池大小可以支撑并发处理.
当使用默认SimpleAsyncTaskExecutor
时,对于调用监听器的线程,监听器容器的beanName
将用作threadNamePrefix
. 这有益于日志分析,在日志appender配置中,一般建议总是包含线程名称.当在SimpleMessageListenerContainer的taskExecutor属性中指定TaskExecutor
时,线程名称是不能修改的.建议你使用相似的技术来命名线程, 帮助在日志消息中的线程识别。
当创建连接时,在
CachingConnectionFactory
配置的Executor将传递给RabbitMQ Client
,并且它的线程将用于投递新消息到监听器容器.在写作的时候,如果没有配置,client会使用池大小为5的内部线程池executor.
RabbitMQ client
使用ThreadFactory
来为低端I/O(socket)操作创建线程.要改变这个工厂,你需要配置底层RabbitMQ ConnectionFactory,
正如the section called “Configuring the Underlying Client Connection Factory”中所描述.
虽然高效,但异步消费者存在一个问题:如何来探测它们什么是空闲的 - 当有一段时间没有收到消息时,用户可能想要采取某些动作.
从1.6版本开始, 当没有消息投递时,可配置监听器容器来发布ListenerContainerIdleEvent
事件. 当容器是空闲的,事件会每隔idleEventInterval
毫秒发布事件.
要配置这个功能,须在容器上设置idleEventInterval
:
xml
<rabbit:listener-container connection-factory="connectionFactory"...idle-event-interval="60000"...
>
<rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
Java
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}
@RabbitListener
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
return factory;
}
在上面这些情况中,当容器空闲时,每隔60秒就会发布事件.
事件消费
通过实现ApplicationListener
可捕获这些事件- 要么是一个一般的监听器,要么是一个窄化的只接受特定事件的监听器. 你也可以使用Spring Framework 4.2中引入的@EventListener.
下面的例子在单个类中组合使用了@RabbitListener
和@EventListener
.重点要理解,应用程序监听器会收到所有容器的事件,因此如果你只对某个容器采取措施,那么你需要检查监听器id.你也可以使用@EventListener
条件来达到此目的.
事件有4个属性:
source
- 监听容器实例id
- 监听器id(或容器bean名称)idleTime
- 当事件发布时,容器已经空闲的时间queueNames
- 容器监听的队列名称
public class Listener {
@RabbitListener(id="foo", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}
@EventListener(condition = "event.listenerId == 'foo'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}
}
重要
事件监听器会查看所有容器的事件,因此,在上面的例子中,我们根据监听器ID缩小了要接收的事件.
警告
如果你想使用idle事件来停止监听器容器,你不应该在调用监听器的线程上来调用container.stop()
方法- 它会导致延迟和不必要的日志消息. 相反,你应该把事件交给一个不同的线程,然后可以停止容器。
3.1.7 消息转换器
介绍
AmqpTemplate
同时也定义了多个发送和接收消息(委派给MessageConverter)的方法.
MessageConverter
本身是很简单的. 在每个方向上它都提供了一个方法:一个用于转换成Message,另一个用于从Message中转换.注意,当转换成Message时,除了object外,你还需要提供消息属性. "object"参数通常对应的是Message body.
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
AmqpTemplate中相关的消息发送方法列举在下边. 这比我们前面提到的要简单,因为它们不需要Message
实例. 相反地, MessageConverter
负责创建每个消息(通过将提供的对象转换成Message
body的字节数组,以及添加提供的MessageProperties)
.
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,这里只有两个方法:一个接受队列名称,另一个依赖于模板设置的队列属性.
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
SimpleMessageConverter
MessageConverter
策略的默认实现被称为SimpleMessageConverter
. 如果你没有明确配置,RabbitTemplate实例会使用此转换器的实例.它能处理基于文本内容,序列化Java对象,以及简单的字节数组.
从 Message中转换
如果传入消息的内容类型以"text" (如. "text/plain")开头,它同时也会检查内容编码属性,以确定将消息body字节数组转换成字符串所要使用的字符集. 如果在输入消息中没有指定内容编码属性, 它默认会使用"UTF-8"字符集.如果你需要覆盖默认设置,你可以配置一个SimpleMessageConverter
实例,设置其"defaultCharset" 属性,再将其注入到RabbitTemplate
实例中.
如果传入消息的内容类型属性值为"application/x-java-serialized-object", SimpleMessageConverter
将尝试将字节数组反序列化为一个Java object. 虽然这对于简单的原型是有用的,但一般不推荐依赖于Java序列化机制,因为它会生产者和消费者之间的紧密耦合。当然,这也排除了在两边使用非Java的可能性.由于AMQP 是线路级协议, 因这样的限制失去了许多优势,这是不幸的. 在后面的两个章节中,我们将探讨通过丰富的域对象的内容来替代java序列化.
对于其它内容类型,SimpleMessageConverter
会以字节数组形式直接返回消息body内容.
参考the section called “Java Deserialization” 来了解更多信息.
转换成消息
当从任意Java对象转换成Message时, SimpleMessageConverter
同样可以处理字节数组,字符串,以及序列化实例.它会将每一种都转换成字节(在字节数组的情况下,不需要任何转换), 并且会相应地设置内容类型属性.如果要转换的对象不匹配这些类型,Message body 将是null.
SerializerMessageConverter
除了它可以使用其它application/x-java-serialized-object转换的Spring框架Serializer
和 Deserializer
实现来配置外,此转换器类似于SimpleMessageConverter
.
参考the section called “Java Deserialization” 来了解更多信息.
Jackson2JsonMessageConverter
转换成消息
正如前面章节提到的,一般来说依赖于Java序列化机制不是推荐的.另一个常见更灵活且可跨语言平台的选择JSON (JavaScript Object Notation).可通过在RabbitTemplate实例上配置转换器来覆盖默认SimpleMessageConverter
.Jackson2JsonMessageConverter
使用的是com.fasterxml.jackson
2.x 包.
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
正如上面展示的, Jackson2JsonMessageConverter
默认使用的是DefaultClassMapper
. 类型信息是添加到MessageProperties中的(也会从中获取)
. 如果入站消息在MessageProperties中没有包含类型信息,但你知道预期类型,你可以使用defaultType
属性来配置静态类型
<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
转换Message
入站消息会根据发送系统头部中添加的类型信息来转换成对象.
在1.6之前的版本中,如果不存在类型信息,转换将失败。从1.6版开始,如果类型信息丢失,转换器将使用Jsckson默认值(通常是一个map)来转换JSON.
此外,从1.6版本开始,当在方法上使用
@RabbitListener
注解时, 推断类型信息会添加到MessageProperties
; 这允许转换器转换成目标方法的参数类型.这只适用于无注解的参数或使用@Payload
注解的单个参数. 在分析过程中忽略类型消息的参数。
重要
默认情况下,推断类型信息会覆盖inbound __TypeId__
和发送系统创建的相关headers. 这允许允许接收系统自动转换成不同的领域对象. 这只适用于具体的参数类型(不是抽象的或不是接口)或者来自java.util
包中的对象.其它情况下,将使用 __TypeId__
和相关的头.也可能有你想覆盖默认行为以及总是使用__TypeId__信息的情况. 例如, 让我们假设你有一个接受Foo参数的@RabbitListener
,但消息中包含了Bar(
它是的Foo
(具体类)的子类). 推断类型是不正确的.要处理这种情况,需要设置Jackson2JsonMessageConverter 的TypePrecedence
属性为TYPE_ID
而替换默认的INFERRED
. 这个属性实际上转换器的DefaultJackson2JavaTypeMapper
,但为了方便在转换器上提供了一个setter方法. 如果你想注入一个自定义类型mapper, 你应该设置属性mapper.
@RabbitListener
public void foo(Foo foo) {...}
@RabbitListener
public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void foo(Foo foo, o.s.amqp.core.Message message) {...}
@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void foo(Foo foo, String bar) {...}
@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<?> message) {...}
上面前4种情况下,转换器会尝试转换成Foo
类型. 第五个例子是无效的,因为我们不能确定使用哪个参数来接收消息负荷. 在第六个例子中, Jackson 会根据泛型WildcardType来应用
.
然而,你也可以创建一个自定义转换器,并使用targetMethod
消息属性来决定将JSON转换成哪种类型.
这种类型接口只能在@RabbitListener
注解声明在方法级上才可实现.在类级@RabbitListener
, 转换类型用来选择调用哪个@RabbitHandler
方法.基于这个原因,基础设施提供了targetObject
消息属性,它可用于自定义转换器来确定类型.
MarshallingMessageConverter
还有一个选择是MarshallingMessageConverter
.它会委派到Spring OXM 包的 Marshaller
和 Unmarshaller
策略接口实现.
你可从here了解更多. 在配置方面,最常见的是只提供构造器参数,因为大部分Marshaller
的实现都将实现Unmarshaller
.
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
ContentTypeDelegatingMessageConverter
这个类是在1.4.2版本中引入的,并可基于MessageProperties的contentType属性允许委派给一个特定的MessageConverter
.默认情况下,如果没有contentType属性或值没有匹配配置转换器时,它会委派给SimpleMessageConverter
.
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
重要
当从不可信任的来源反序列化Java对象时,存在一个可能的漏洞.如果从不可信来源,使用内容类型 application/x-java-serialized-object
来接收消息,你可以考虑配置允许哪些包/类能反序列化.
这既适用于SimpleMessageConverter,也适用于SerializerMessageConverter,当它被配置为使用一个DefaultDeserializer时 -或含蓄地或通过配置方式的。
默认情况下,白名单列表是空的,这意味着所有类都会反序列化.你可以设置模式列表,如 foo.*
, foo.bar.Baz
或 *.MySafeClass
.模式会按照顺序进行检查,直到找到匹配的模式.如果没有找到匹配,将抛出SecurityException
.在这些转换器上,可使用whiteListPatterns
属性来设置.
消息属性转换器
MessagePropertiesConverter
策略接口用于Rabbit Client BasicProperties
与Spring AMQP MessageProperties
之间转换. 默认实现(DefaultMessagePropertiesConverter
)通常可满虽大部分需求,但如果有需要,你可以自己实现. 当大小不超过1024字节时,默认属性转换器将 BasicProperties
中的LongString
转换成String
. 更大的 LongString
将不会进行转换(参考下面的内容.这个限制可通过构造器参数来覆盖.
从1.6版本开始, 现在headers 长超过 long string 限制(默认为1024) 将被DefaultMessagePropertiesConverter
保留作为 LongString
. 你可以通过 the getBytes[]
, toString()
, 或getStream()
方法来访问内容.
此前, DefaultMessagePropertiesConverter
会将这样的头转换成一个 DataInputStream
(实际上它只是引用了LongString的
DataInputStream
). 在输出时,这个头不会进行转换(除字符串外,如在流上调用toString()方法 java.io.DataInputStream@1d057a39)
.
更大输入LongString
头现在可正确地转换,在输出时也一样.
它提供了一个新的构造器来配置转换器,这样可像以前一样来工作:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
另外,从1.6版本开始,在 MessageProperties中添加了一个新属性correlationIdString
.此前,当在RabbitMQ 客户端中转换BasicProperties
时,将会执行不必要的byte[] <-> String
转换,这是因为 MessageProperties.correlationId
是一个byte[]
而 BasicProperties
使用的是String
.
(最终,RabbitMQ客户端使用UTF-8字符串转化为字节并放在协议消息中).
为提供最大向后兼容性,新属性correlationIdPolicy
已经被加入到了DefaultMessagePropertiesConverter
.它接受DefaultMessagePropertiesConverter.CorrelationIdPolicy
枚举参数.
默认情况下,它设置为BYTES
(复制先前的行为).
对于入站消息:
STRING
- 只映射correlationIdString
属性BYTES
- 只映射correlationId
属性BOTH
- 会同时映射两个属性
对于出站消息:
STRING
- 只映射correlationIdString
属性BYTES
- 只映射correlationId
属性BOTH
- 两种属性都会考虑,但会优先考虑String 属性
也从1.6版本开始,入站deliveryMode
属性不再需要映射 MessageProperties.deliveryMode
,相反使用MessageProperties.receivedDeliveryMode
来代替.另外,入站userId
属性也不需要再映射MessageProperties.userId
,相反使用MessageProperties.receivedUserId
来映射.
这种变化是为了避免这些属性的意外传播,如果同样的MessageProperties
对象用于出站消息时.
3.1.8 修改消息- 压缩以及更多
提供了许多的扩展点,通过它们你可以对消息执行预处理,要么在发送RabbitMQ之前,要么在接收到消息之后.
正如你在Section 3.1.7, “Message Converters”看到的,这样的扩展点存在于AmqpTemplate
convertAndReceive
操作中,在那里你可以提供一个MessagePostProcessor
.
例如,你的POJO转换之后, MessagePostProcessor
允许你在Message上设置自定义的头或属性.
从1.4.2版本开始,额外的扩展点已经添加到RabbitTemplate
- setBeforePublishPostProcessors()
和setAfterReceivePostProcessors()
. 第一个开启了一个post processor来在发送消息到RabbitMQ之前立即运行.当使用批量时(参考 the section called “Batching”), 这会在批处理装配之后发送之前调用.
第二个会在收到消息后立即调用.
这些扩展点对于压缩这此功能是有用的,基于这些目的,提供了多个MessagePostProcessor
:
- GZipPostProcessor
- ZipPostProcessor
针对于发送前的消息压缩,以及
- GUnzipPostProcessor
- UnzipPostProcessor
针对于消息解压.
类似地, SimpleMessageListenerContainer
也有一个 setAfterReceivePostProcessors()
方法,
允许在消息收到由容器来执行解压缩.