在本章中我们将覆盖涉及:- 镜像队列
- 同步队列
- 优化镜像策略
- 在几个broker之间分发消息
- 创建一个地理位置集群复制
- 过滤和转发消息
- 将高可用技术结合在一起
- 客户端高可用性
介绍
RabbitMQ通过数据复制来达到高可用,当数据完整性、服务连续性是最重要的时候, 这一点与存储(如:RAID解决方案),数据库,以及所有IT基础设施解决方案是相同的。
事实上,这些解决方案不仅可以避免数据丢失,也可以避免计划维护和系统故障时的停机时间。
我们将看到RabbitMQ中简单而高效的镜像队列解决方案. 通过本食谱,我们将看到多个不同用例,以及接近最小化性能优化(处理高可用时,总会付出点成本。
然后 ,我们将看到如何实施地理位置复制,这种方案适用于
应用程序对QoS要求较高,尤其是整个网站掉线时(如, 由于网络问题,应急电源问题,自然灾害,或人为错误),也要保证可用性.
这种方案对于云计算资源也给出了建议.如,当使用Amazon Web Services (AWS)时,我们强烈建议在不同可用区域上来分配应用程序(
http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
由于不同区域之间的连接有较高延迟, 跨不同可用区域来创建集群是不可取的,但每个区域创建一个集群,然后像创建一个地理位置复制食谱中介绍的来复制数据是推荐的。
无论简单与否,对于镜像队列或地理位置复制集群来说,客户端只有在收到相应的回复通知(使用发布确认ack消息或使用AMQP标准的事务机制)时,它才会假设消息已成功分发,并担保消息已经抵达镜像队列 .
TIP
如果你正在处理一个地理集群复制,客户端发布消息后,在消息在多个节点上复制和在本地建立镜像队列之后,RabbitMQ将发送ack消息发回给客户端. 地理消息的复制是异步的。 发布在本地集群的消息已经被认为是安全的,通常异步复制到远程集群对于灾难恢复是可容忍的.
在客户端高可用食谱中,我们将看到如何正确地实现一个客户端来处理连接高可用集群的情况。
镜像队列
RabbitMQ集群默认无镜像队列.队列存储在客户端连接并创建它们的broker节点中。无论何时,如果这个节点发生了故障,所有存储在此节点中的队列和消息将变得不可用。
如果你将队列和消息定义为持久化的,那么可能在节点恢复时,不会丢失数据,但这光这些是不够的。
事实上,设计一个高可用程序是不能被接受的.在许多情况下,应用程序必须能够承受一个组件的死亡,而且程序不能中断。
ha策略可帮助解决这个问题.在本食谱中,我们将向你展示如何在集群所有节点上反射队列.
准备
你需要有带三个节点的集群
如何做
配置镜像队列有两种方法,即使用rabbitmqctl来配置,或使用web管理插件(或它的API).我们将按下面的步骤来展示这两种方法:
1. 首先,你需要一个RabbitMQ集群,可参考第6章-开发可伸缩应用程序-创建一个简单集群食谱。
2. 在web管理中,导航到Admin | Policies | Add/ update a policy来配置策略.
3. 在Name字段中输入mirror-all,在Pattern字段中输入^mirr\. ,在
Definition中输入ha-mode和 all.
4. 点击Add policy,如下面的截图所示:
5. 或者,也可以通过rabbitmqctl来执行下面的命令:
rabbitmqctl set_policy ha-all "^mirr\." '{"ha-mode":"all"}'
6. 创建一个名为mirr.q_connection_1_1的队列. (重要的是前辍必须是mirr.; 你可以按你喜欢的来称呼队列.)
如何工作
在创建集群之后(步骤1),你需要使用ha策略来定义镜像队列的行为(步骤2).
TIP
RabbitMQ 策略是一种键值对,它可以用来描述federation plugin, mirrored queues, alternate exchanges, dead lettering, per queue TTLs, and maximum queue length的行为.
mirror-all参数是策略名称,^mirr\.字符串是正则表达式. 我们指示RabbitMQ来反射所有名称以mirr.开头的队列.
TIP
策略通过正则表达式来定义,这可以允许你创建灵活的,复杂的反射行为 .
最后一个参数是反射模式。通过ha-mode:all,队列会使用master slave模式来集群所有节点上进行反射(slaves可以是多个). 无论何时,只要匹配给定模式的队列在某个节点上创建了,就会在其它节点上进行复制,同时,只要有客户端向它插入了消息,消息也会在所有slaves进行复制。
TIP
最佳实践是客户端应该总是连接持有master队列的节点,只有在它发生故障时,才连接其它节点。
例如,如果我们有三个节点,名为rabbit@rabbitmqc1, rabbit@rabbitmqc2,和rabbit@rabbitmqc3, 如果我们在
rabbitmqc1节点上创建一个前辍为mirr.的队列(对于这个队列来说,它是master),那么队列将在其它两个节点(
slaves)上进行复制. 要检查队列的状态,可打开web管理控制台,并点击队列的Overview tab ,将会看类似下面的截图:
如果rabbitmqc1节点宕机或不可用,将会为此队提拔一个新的master.例如,如果我们调用rabbitmqctl stop_app来执行关闭操作, rabbit@
rabbitmqc3 节点将会被提拔为mirr.q_connection_1_1队列的新master,如下面的截图所示:
当我们重启rabbit@rabbitmqc1, 队列又会再次反射.多亏了策略,它将会成为slave,并且不再像先前一样提拔为master:
集群再次成为全反射的了.
更多
在本食谱中,我们已经了解了如何来快速设置镜像队列,但对于基于RabbitMQ构建完整的高可用方案还是不够的.事实上,RabbitMQ 反射配置只是让broker保留了消息的拷贝,而不造成消息丢失.但是,适当在客户端处理连接和消息也是很重要的。
在下面的食谱中,我们会了解镜像队列的详细情况
也可参考
正如我们在镜像队列食谱中看到的,当配置了镜像后,消息就会在集群节点间复制。
然而,在任何时候, 一个新节点都可以加入到集群中,并可以开始包含消息的镜像队列.那么集群是如何来表示这种存储消息的呢?
我们假设有一个单独节点且在其个队列上存储了一些消息的典型场景:
现在,如果我们向集群中添加了一个节点并正确地配置了ha策略,第一个节点的队列变成镜像队列,并且后续消息会复制到新添加的节点上,如下所示:
重要的是,当第二个节点加入到集群时,master队列中已有消息默认是不会复制到第二个节点上的。如果此时master挂掉了,消息将会丢失。
然而,在活(live)队列的情况下,只要消费者消耗了单个拷贝消息,配置就会变成全部复制,如下截图所示:
或者,在队列没被耗尽的情况下,重要的是要显式地同步队列到镜像,以提高可靠性。(这就是为什么我们是增加镜像的原因).
这不是默认的行为,因为同步任务会对broker的性能产生重要影响.
当启动一个同步时,队列将被卡住,直到这个过程完成. 在这个食谱中,我们将向您展示如何检查复制状态和队列同步。
准备
你需要一个带镜像策略的RabbitMQ集群(参考镜像队列食谱).
如何做
为了看到未同步队列的行为,我们将使用以下步骤来手动模拟节点失效的场景:
1. 配置镜像队列前辍为mirr.,正如在镜像队列中食谱中看到的一样(我们称为节点rabbit@rabbitmqc1、rabbit@rabbitmqc2).
2. 创建一个名为mirr.q_connection_1_1的队列.
3. 从web控制台来检查队列的状态,你应该可以看到下面的截图:
4. 你也可以使用 rabbitmqctl list_queues name policyslave_pids. 结果应该看起来像下面这样:
mirr.q_connection_1_1 ha-all all
<rabbit@rabbitmqc1.2.2844.1>
[<rabbit@rabbitmqc2.2.3363.1>]
running
5. 使用rabbitmqctl stop_app来关闭
rabbit@rabbitmqc2 节点 (实际上它不影响节点).
6. 使用rabbit@rabbitmqc1 节点来向队列发布非持久化消息node.
7. 使用 rabbitmqctl start_app来重启
rabbit@rabbitmqc2节点上的应用程序. 然后,像下面截图来检查队列:
8. 点击Syncronise按扭来同步队列或使用rabbitmqctl sync_queue mirr.q_connection_1_1来同步:
如何工作
通过步骤1-4创建稳定情况后,我们通过停止rabbit@rabbitmqc2节点模拟了节点故障情形.当我们使用rabbit@rabbitmqc1节点向队列中发布消息时,消息不是镜像的,因为我们只运行了一个节点。
当rabbit@rabbitmqc2节点再次运行时(步骤7),其队列是未同步的,这一点我们已经在介绍章节中看过了.
你可使用web管理插件或rabbitmqctl来同步队列.通过这种方式,你的消息可在集群节点间复制.
更多
我们已经看了如何手动来同步队列,但也可通过自动的方式来操作. 当你配置ha策略时,只需要增加ha-sync-mode=automatic的配置项就可以了,如下面的截图所示:
如果你用两种策略来模拟故障场景,你可能会看到下面的截图:
也可参考
在镜像队列食谱中,我们已经看到如何在集群节点间反射队列.在多于两个节点上复制消息可以改善系统的可利用性,但如果集群因高负载增长,它会对应用程序的性能产生负面影响。在本食谱中,我们将向您展示,为了保证每个队列都有两份拷贝,如何将每个队列分散到两个节点上:
通过这种方式,每个队列都有一个master和一个slave.
准备
你需要一个带镜像策略的RabbitMQ集群(参考镜像队列食谱)
如何做
在本食谱中,我们使用了四台机器;让我们看详细步骤:
1. 创建四个节点,其节点名称分别为rabbit@rabbitmqc1, rabbit@rabbitmqc2, rabbit@rabbitmqc3, 和rabbit@rabbitmqc4的集群. 下面的截图显示了实际集群:
2. 使用下面的代码来创建ha策略:
"mirr-pair" as name,
"^pair\."as pattern
"ha-mode":"exactly"
"ha-params": 2
"ha-sync-mode":"automatic" as parameters
参考下面的截图:
3. 创建一个名称为任意前辍的队列.你可以直接使用web管理控制台:
如何工作
在步骤2中,我们将ha-mode配置为exactly. 此参数需要ha-params参数(它表示用于反射队列的节点个数,在我们的例子中是2个节点).在步骤3中,创建了一个前辍pair的新队列,它将作为选择节点的master, 并在另一个节点上得到一个slave队列。
在真实应用程序中,也就是不连接管理控制台,我们会从应用程序调用channel.queueDeclare()来创建队列. 在这种情况下,连接的节点将成为master队列.
为了能均衡地向集群分配队列,使用负载均衡器来连接所有节点或让客户端以round-robin来连接集群中的所有可用节点是非常重要的,这一点我们已经在客户端高可用性食谱中看过了。
HA/Mirroring 特性增加了负载,可能会影响性能.
更多
exactly 参数会从集群中自动选择一个
节点作为slave. 你也可以使用更多的参数节点来选择镜像队列是否应该替换,正如下面截图中看到的一样:
也可参考
ha-mirror 插件需要依赖一个集群,这一点,我们已经第6章-开发可伸缩应用程序 看过了,同样的,它也不能容忍网络分化问题。
为了能跨WAN复制消息,你可以使用federation插件.此插件不依赖于集群,因此你可在WAN上联合更多的RabbitMQ实例,即使这此实例拥有不同的Erlang版本.
准备
你需要准备两个或两个以上的RabbitMQ节点. 在本例中,我们使用两台Linux机器,其RabbitMQ节点名称分别为rabbit@rabbitmqc1、rabbit@rabbitmqc2.
如何做
必须先启用federation插件;缺省情况下,它是禁用的.对于这两台机器,须执行下面的步骤:
1. 用下面的命令来启用插件:
rabbitmq-plugins enable rabbitmq_federation
2. 用下面的命令来为插件启用web管理控制台:
rabbitmq-plugins enable rabbitmq_federation_management
3. 重启 RabbitMQ,并使用rabbitmqctl status来检查插件,如下面的截图所示:
针对rabbitmqc1机器,执行下面的步骤:
4.也可以通过管理控制台检查状态.在 Admin tab, 在右边,你可以看到Federation Status,以及Federation Upstream,如下面的截图所示:
5. 本国federation upstreams. 打开web管理控制台并导航至Admin | Federation Upstreams | Add a new upstream,然后填写下面的字段:
‰ Name: first_upstream
‰ URI: amqp://rabbitmqc2
6. 针对rabbitmqc1机器,配置federation policy. 打开管理控制台,并导航至Admin | Policies | Add / update a policy, 然后填写下面的字段:
‰ Name: fed_policy
‰ Pattern: ^fed\.
‰ Definition: federation-upstream-set:all
7. 针对rabbitmqc1机器,添加一个前辍为fed.的新交换机,如fed.myfanoutexchange.
8.
针对rabbitmqc1机器, 在web管理控制台中导航至 Admin | Federation Status来检查
upstream状态, 如果一切正常,你应该可以看到下面的截图:
如何工作
RabbitMQ federation 插件需要遵循步骤1、步骤2来启用,当然,你也可以使用命令行工具或web管理控制台来验证其状态。
要让federation工作,你还需要定义一个upstream链接(指的是步骤4).
你可以在下游节点,rabbitmqc1上进行定义,并指定其上游节点,rabbitmqc2。
TIP
federation 插件不需要集群,URI配置可使用IP地址,例如, amqp://192.168.0.23. 在这里,不需要使用短主机名称。
发布到上游节点上的federated交换器上的消息将在相应的交换机上传播。
TIP
federation严格上单向的.向下游节点发布的消息不会被复制到上游。
当在rabbitmqc1节点上使用fed.前辍创建了交换机时,如fed.myfanoutexchange, 交换器也会在rabbitmqc2broker上创建.
消息是以异步的方式来复制的,这不同于镜像队列,因此不能保证高可用性.
更多
除了ha-mirror和federation插件,还有一个shovel插件可用来增强应用程序的可靠性. shovel是一种队列到交换机的机制。通过将集成在broker中的RabbitMQ客户端作为一个插件,可以消耗来自一个或多个队列的消息,并将其重定向到其他的本地或远程代理.
作为一个client,它可用于WAN连接,并能容忍网络分化情景.
在本食谱中,我们将展示如何简单地配置来使用插件.
我们将通过WAN连接的方式,在不同broker的两个队列之间来发送消息,看起来就像下面这样:
准备
你需要两个RabbitMQ实例;其节点名称分别为rabbitmq@rabbitmqc1、rabbitmq@rabbitmqShovel.
如何做
你需要执行下面的步骤来在rabbitmq@rabbitmqShovel节点上安装shovel插件:
1. 使用命令 rabbitmq-plugins enable rabbitmq_shovel来启用插件.
2. 使用 rabbitmq-plugins enable rabbitmq_shovel_management命令来启用插件的web管理控制台.
3. 编辑,或在不存在的情况下,创建rabbitmq.config文件,并像下面一样来增加shovel配置:
[{rabbitmq_shovel,
[{shovels,
[{my_books_shovel,
[
{sources,
[{broker, "amqp://yourrabbitmqip"}]},
{destinations, [ {broker, "amqp://"}]}
, {queue, <<"myBooksQueueCopy">>}
, {prefetch_count, 10}
, {reconnect_delay, 5}
]}
]}
]}
].
你可以使用Chapter07/Recipe05/simple_shovel_rabbitmq.config中的样例配置文件.
4. 重启broker.
5. 创建前图所示的队列.
6. 通过web管理控制台来检查shovel.
如何工作
在这个例子中,插件将从rabbitmq@rabbitmqc1的myBooksQueueCopy队列来消费消息,并将这些消息发布到 rabbitmq@rabbitmqShovel的myBooksQueueCopy队列. 这两个brokers可以是地理位置上完全分离的,因为shovel插件实际上是内嵌RabbitMQ client,它从一个节点消息消费消息,然后又把消息转发到另一个节点。
本例中的 shovel是随同RabbitMQ启动的,它会启动对源队列内容的轮询,甚至是队列本身无定义时,也会持续地轮询。
source 和 destination参数是强制配置的.
TIP
如果没有指定URI(amqp://)的话,插件会使用localhost作为broker IP.
我们创建复制队列的原因是消息是由插件来消费的.因此myBooksQueue 队列是由应用程序消费者来消费的,而myBooksQueueCopy是由shovel来消费的.
更多
也可参考
在本食谱中,我们已经了解简单的消息传递.在下面的食谱中,我们将看到如何动态地来绑定shovel.
过滤和转发消息
在本食谱中,我们将实现一个可选择性地消息转发.
我们将让shovel插件转发消息的子集到不同的目的地. 一种可能的使用场景是,有三个不同的站点,它们有下面的职责:- 一个用于接受book订单
- 一个只需要以london为路由键的订单
- 一个只需要以rome为路由键的订单
其目的是在不干涉源broker的情况下,有选择性地添加或删除shovels:
准备
你需要准备三个brokers;我们将其命名为rbbitmq@rabbitmqc1, rabbitmq@rabbitmqShovelLondon,rabbitmq@rabbitmqShovelRome.
如何做
在本例中,rabbitmq@rabbitmqc1 broker是shovels将要连接的节点,此插件不是必须的,但我们须为其它broker启用shovel:
1. 在rabbitmq@rabbitmqShovelLondon和rabbitmq@rabbitmqShovelRome节点上启用shovel插件,就如在创建地理位置复制食谱中看到的一样
2. 为rabbitmq@rabbitmqShovelLondon节点创建一个shovel脚本:
{sources, [ {broker, "amqp://rabbitmqc1IP"},
{declarations, [ 'queue.declare'
{routing_key, <<"london">>}
....
, {destinations, [ {broker, "amqp://"}]}
, {prefetch_count, 10}
,{publish_fields, [ {exchange, <<"my_exchange">>}, {routing_key,<<"from_london_order">>} ]}
...
你可在 Chapter07/Recipe06/london_shovel_rabbitmq.config中找到完整的配置.
3. 为rabbitmq@rabbitmqShovelRome节点创建一个shovel脚本:
{sources, [ {broker, "amqp://rabbitmqc1IP"},
{declarations, [ 'queue.declare'
...
{routing_key, <<"rome">>}
...
, {destinations, [ {broker, "amqp://"}]}
, {prefetch_count, 10}
,{publish_fields, [ {exchange, <<"my_exchange">>}, {routing_key,<<"from_rome_order">>} ]}
...
你可在Chapter07/Recipe06/rome_shovel_rabbitmq.config中找到完整的配置.
4. 重启brokers.
5. 发送两个消息到myBooksExchange, 一个使用london路由键,另一个使用rome路由键.
如何工作
步骤2和步骤3中的脚本包含了每个broker的交换机和队列的声明.因此只要运行声明,shovel插件将在远程节点上(默认)声明、创建两个队列,如下面的截图所示:
londonorders队列已由rabbitmq@rabbitmqShovelLondon节点声明,romeorders队列已由rabbitmq@rabbitmqShovelRome节点声明.
脚本同时也在相同的节点上声明了myBooksExchange交换机,并使用london和rome进行了相应地绑定,如下面的截图所示:
通过这种方式,当你使用rome路由键发布消息到myBooksExchange交换器时,消息将会路由到本地romeorders队列上,然后会被运行在
rabbitmq@rabbitmqShovelRome节点立即消费。一旦消息到达节点,shovel会将其发布到本地my_exchange topic交换器,最终它会路由到新路由键为from_rome_order的my_queue.
你可以看到在下面的截图:shovel插件是一个强大的工具,它可以应用于不同的上下文中;如,你可以在单个broker上使用它来创建一个异步队列镜像.
也可参考
RabbitMQ 有三种不同的方式来在brokers中分发消息:
- 集群队列镜像
- Federation
- Shovel
在本食谱中,我们将向你展示如何结合集群,高可用队列镜像, 以及shovel来跨WAN来
从ha-cluster到单个RabbitMQ
节点传输消息, 如下面的截图所示:
准备
你需要准备三个RabbitMQ节点.
如何做
在同一个LAN中需要有两个节点,第三个节点应该在此LAN外.我们将节点分别命名为rabbit@rabbitmqc1,rabbit@rabbitmqc2, 和rabbit@rabbitmqShovel:
1. 使用rabbit@rabbitmqc1、rabbit@rabbitmqc2来创建集群,我们将其称为Cluster1.
2.在Cluster1中,在集群中创建一个名为
myBooksExchange的topic交换机.
3.
在Cluster1中, 创建一个名为mirr.orders的ha-queue,并使用"#"路由键将其绑定到myBooksExchange.
4. 在rabbit@rabbitmqShovel上, 启用shovel插件.
5. 在rabbit@rabbitmqShovel上,创建或编辑rabbitmq.config来增加shovel配置:
你可在 Chapter07/Recipe07/rabbitmq.config找到配置文件,并将其直接拷贝到配置文件夹中.
6. 在rabbit@rabbitmqShovel上重启broker.
如何工作
在步骤1和2中,我们创建一个集群和两个镜像队列。shovel配置(步骤5)创建了一个名为mirr.myshovelqueuecluster1的队列.
此队列是两个RabbitMQ节点的镜像.在集群中,你至少应该有两个队列:
shovel配置中的brokers参数包含节点的集群地址.如果一个节点发生了故障,shovel将连接到其它节点,在这里mirr.myshovelqueuecluster1队列已经事先存在了,因为它是镜像的。此配置只会在Cluster1节点发生故障时会丢失数据,其它情况下,是不会丢失数据的。
更多
在使用shovel插件时,它可以创建一些奇怪的拓扑,特别是在一个循环放置把两个或两个以上的shovels. 因此你如果想创建一个双向复制,你应该使用federation插件来防止无止循环.在这种情况下,你可以设置 max-hops参数来达到此目的.
也可参考
客户端高可用性
即使是RabbitMQ broker提供了许多高可用的服务端选项,但如果连接客户端没有采取措施的话,那么高可用性也是无用的:
- Clients必须确保生产的消息已经成功发送给RabbitMQ,并有适当的错误处理机制,例如,如果有需要重新发布消息.
- Clients消费消息时必须确保消息不是复制的; 鉴于转发消息的可能性,消息有可能会重复
- 在消费者端的消息也一样。这个操作通常被称为重复数据删除.
- Clients必须确保它们连接的RabbitMQ节点是健康的. 特别是等待消息的消费者,不能意识到在当服务器卡住了而不能接收消息的情况
- 客户端应该尝试连接到任何可用的集群节点,无论是基于最大可靠性还是统一的资源分配.
这些机制在client libraries中不是内置的,但应该按照指南来实现 (
http://www.rabbitmq.com/reliability.html 和
http://www.rabbitmq.com/ha.html#behaviour) 。
在本食谱中我们将展示这些实现.
一如既往,增加可靠性会造成性能的损失:集群配置,队列镜像在生产消息和消费消息时都需要检查,这样就增加了每个消息的延迟,并降低了最大消息速率。
大多数情况下,为降低在这些条件下的延迟,不需要做什么,但通过增长集群节点数目来以处理更高的信息速率是可能的.在这里,RabbitMQ的伸缩性扮演了这种角色.
我们将介绍如何开发客户端来从RabbitMQ集群的镜像队列上可靠地生产和消费消息。
准备
要运行本食谱,你需要下面的工具:
- 必须有两个或两个以上节点的RabbitMQ集群
- 有镜像队列食谱中展示的ha-configuration;镜像队列须匹配正则表达式^mirr\.
- RabbitMQ Java Client API
如何做
这个例子由两个Java程序组成:ProducerMain.java和ConsumerMain.java,你可在Chapter09/Recipe08找到源码。UML图如下:
上面的截图展示了需要开发可靠客户端的步骤.让我们从生产者和消费者共同的步骤开始:
1. 写一个通用的方法来试图打开连接,直到成功连接(打开ReliableClient.java文件):
protected void waitForConnection() throws InterruptedException {
while (true) {
ConnectionFactory factory = new ConnectionFactory();
ArrayList<Address> addresses = new ArrayList<Address>();
for (int i = 0; i<Constants.hosts.length; ++i) {
addresses.add(new Address(Constants.hosts[i],Constants.port));
}
// randomize the order used to try the servers:
// distribute their usage
Collections.shuffle(addresses);
Address[] addrArr=new Address[Constants.hosts.length];
addresses.toArray(addrArr);
try {
connection = factory.newConnection(addrArr);
channel = connection.createChannel();
channel.exchangeDeclare(Constants.exchange, "direct",false);
channel.queueDeclare(Constants.queue,Constants.durableQueue, Constants.exclusiveQueue,Constants.autodeleteQueue, null);
channel.queueBind(Constants.queue,Constants.exchange,Constants.routingKey);
return;
} catch (Exception e) {
e.printStackTrace();
disconnect();
Thread.sleep(1000);
}
}
}
2. 编写一个disconnect方法(打开ReliableClient.java文件):
protected void disconnect() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
channel = null;
}
if (connection != null && connection.isOpen()) {
connection.close();
connection = null;
}
} catch (IOException e) {
// just ignore
e.printStackTrace();
}
}
然后,让我们看一下可靠的生产者是如何工作的(打开ReliableProducer.java文件):
3. 继承ReliableClient类并像下面一样来覆盖waitForConnection()方法:
public class ReliableProducer extends ReliableClient {
...
@Override
protected void waitForConnection() throws
InterruptedException {
super.waitForConnection();
try {
channel.confirmSelect();
} catch (IOException e) {
e.printStackTrace();
}
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
ReliableProducer.this.removeItemsUpto(deliveryTag);
} else {
ReliableProducer.this.removeItem(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
ReliableProducer.this.requeueItemsUpto(deliveryTag);
} else {
ReliableProducer.this.requeueItem(deliveryTag);
}
}
});
}
4. 编写一个方法来让ReliableProducer类来从其内部dataQueue队列来发布消息:
protected void publishFromLocalQueue() throws InterruptedException {
try {
for (;;) {
synchronized (dataQueue) {
if (dataQueue.isEmpty()) {
dataQueue.wait(1000);
// if the queue stays empty for more than
// one second, disconnect and
// wait offline
if (dataQueue.isEmpty()) {
System.out.println("disconnected for inactivity");
disconnect();
dataQueue.wait();
waitForConnection();
}
}
}
DataItem item = dataQueue.peek();
BasicProperties messageProperties = newBasicProperties.Builder().messageId(Long.toString(item.getId())).deliveryMode(2).build();
long deliveryTag = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.queue,messageProperties, item.getData().getBytes());
// only after successfully publishing,
// move the item to the
// container of pending items.
// They will be removed from it only
// upon the
// reception of the confirms from the broker.
synchronized (pendingItems) {
pendingItems.put(deliveryTag, item);
}
dataQueue.remove();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
} catch (IOException e) {
// do nothing: the connection will be closed
// and then retried
}
}
5. 在ProducerMain.java的main()方法中编写一个循环方法,它将启动一个后台线程,它将用来等待一个连接,在local队列中异步发送消息:
public void startAsynchronousPublisher() {
exService = Executors.newSingleThreadExecutor();
exService.execute(new Runnable() {
@Override
public void run() {
try {
for (;;) {
waitForConnection();
publishFromLocalQueue();
disconnect();
}
} catch (InterruptedException ex) {
// disconnect and exit
disconnect();
}
}
});
}
6. 在ProducerMain.java中编写调用ReliableProducer的方法,它将在local dataQueue中临时存储消息:
public void send(String data) {
synchronized (dataQueue) {
dataQueue.add(data);
dataQueue.notify();
}
}
7. 在这里, dataQueue是线程安全类DataQueue的实例,它包含了一个唯一的索引:
public class DataQueue {
..
public synchronized long add(String data) {
++lastID;
dataQueue.add(new DataItem(data,lastID));
returnlastID;
}
...
}
现在让我们看一下ReliableConsumer类中所需要的步骤:
8. 在这里,我们也覆盖了ReliableClient.WaitForConnection()的方法:
public class ReliableConsumer extends ReliableClient {
...
@Override
protected void waitForConnection() throws InterruptedException {
super.waitForConnection();
try {
channel.basicConsume(Constants.queue, false, new Consumer() {
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("got handleCancel signal");
}
@Override
public void handleCancelOk(String consumerTag) {
System.out.println("got handleCancelOk signal");
}
@Override
public void handleConsumeOk(String consumerTag) {
System.out.println("got handleConsumeOK signal");
}
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException {
long messageId =Long.parseLong(properties.getMessageId());
if (worker != null) {
// if the message is not a re-delivery,
// sure it is not aretransmission
if (!envelope.isRedeliver() ||
toBeWorked(messageId)) {
try {
worker.handle(new String(body));
// the message is ack'ed just after it has
// been
// secured (handled, stored in database...)
setAsWorked(messageId);
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (WorkerException e) {
// the message worker has reported
// an exception,
// so the message
// cannot be considered to be handled
// properly,so requeue it
channel.basicReject
(envelope.getDeliveryTag(), true);
}
}
}
}
@Override
public void handleRecoverOk(String consumerTag) {
System.out.println("got recoverOK signal");
}
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException cause) {
System.out.println("got shutdown signal");
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
9. 在后台线程中编写一个方法来启动ReliableConsumer的异步消费者:
public void StartAsynchronousConsumer() {
exService = Executors.newSingleThreadExecutor();
exService.execute(new Runnable() {
@Override
public void run() {
try {
for (;;) {
waitForConnection();
synchronized (this) {
this.wait(5000);
}
disconnect();
}
} catch (InterruptedException ex) {
disconnect();
}
}
});
}
10. 让ReliableConsumer类允许设置一个下面接口的回调:
public interface MessageWorker {
public void handle(String message) throws
WorkerException;
}
11. 代码将被传递到ConsumerMain.java 中,并在每个接收的消息中调用:
reliableConsumer.setWorker(new MessageWorker() {
@Override
public void handle(String message) throws WorkerException
{
System.out.println("received: " + message);
++count;
}
});
如何工作
为了可靠地连接到集群,同时连接多个节点是很重要的。这一点可以显示地完成,正如本例子中展示的一样—在这里,是通过客户端来轮询不同的节点-或者使用负载均衡器。
TIP
根据情况的不同,通过随机连接集群中的节点是有用的(这种方案称为"active-active"配置:master 和 slave 都是活动的), 或最好是master, 或当master不可用时使用slave(这通常称为active-passive 配置).
然后在生产方,当真正的应用程序发送一个消息时,消息不是实际发送的,而是放在一个临时队列中。直到消息到达那里之前,应用程序必须假定该消息尚未被broker接收。
只要后台循环线程(指的是步骤4)成功发送了消息,它会从队列中拉取消息,并将其存储在pendingItems hash map中.即使到了现在,应用程序也不能确保消息已经成功地存储到了镜像队列中,如果节点停机时,消息仍然会丢失.
TIP
正如我们已经指出的,即使我们使用镜像队列,我们需要确保我们不会在发布消息时或消费者消息时丢失消息。
ReliableProducer 类只有当其收到来自broker的确认时才能保证消息没有丢失, 即它可通过channel.addConfirmListener() (步骤3)来检查.在此时,将会使用delivery tag来从map中删除.
注意,在本例中,在消息确认超时时,没有提供重新传送消息的机制.在真实应用程序中,这需要根据真实情况添加这种功能。
TIP
AMQP 0-9-1 不包含确认机制. 如果需要更严格的处理,你需要使用事务, 这通常在数量级上是低效的,因为它们的行为需要保持同步.
我们已经走了一半的路了;我们可以肯定的是,该消息处在一个hacluster的镜像队列中。真 的!但是,现在我们需要保证,在消费信息时,我们不会失去它。这还不够,我们还需要确保,对于相同的消息,我们不会消费多次。
实际上,消费者的连接方法也启动了一个后台消费者,这是通过调用channel.basicConsume()来实现的,同时也实现了handleDelivery()回调(步骤8).
正如第1章节,使用AMQP中介绍的一样,如果回调中的一切都是正常的,那么消息就收到了.用户回调worker.handle()将被调用,且ack消息会发回到broker,此时,如果消息被正常消费的话,就会被删除.
但如果用户回调抛出WorkerException,客户端拒绝了消息. 再次回到这个点,例子中的连接是打开的,这种情况下的消息会在同一个队列上重新排队(channel.basicReject()的第二个参数设置为true), 但也有可能这个消息会被重定向到dead-letter 队列,或者在真实程序中拒绝和丢失.
然而,也有可能,消息被消费了,回调也被调用了,但ack消息因为网络问题或broker意外关闭而没有到达应用程序. 这也是将tem ID 设置在DataQueue实例的地方(步骤7). 需要在接收端防止消息重复. 事实上, 当它没有被应答时,RabbitMQ会在下次重新连接时,重新传输消息.
TIP
由于当RabbitMQ怀疑处在高风险条件时,可以重新传输消息 ,因为我们需要一种机制来保证避免消息的重复.
在这个食谱中,我们假定有一个单调递增从0开始的值,它将用来标记所需要的消息-再次说明,在真实程序中,可能会有不同的方案.
在这个食谱中,我们在下面的三次时刻中使用了计数器来保证可靠性:
1.deliveryTag参数(指的是步骤4), 它在生产端,用来在生产端检查确认,并在确认后从pendingItems hash map中删除
2.deliveryTag参数(指的是步骤8), 它在消费端,用来向RabbitMQ集群发送ack消息
3. DataQueue item ID (指的是步骤7,步骤8)用于在消费端避免重复消息
posted on 2016-07-02 19:11
胡小军 阅读(1903)
评论(0) 编辑 收藏 所属分类:
RabbitMQ