随笔 - 41  文章 - 7  trackbacks - 0
<2024年11月>
272829303112
3456789
10111213141516
17181920212223
24252627282930
1234567

常用链接

留言簿

随笔分类

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

     摘要: 在本章节中我们将覆盖:创建一个本地服务器集群创建一个简单集群自动添加一个RabbitMQ 集群引入消息负载均衡器创建集群客户端介绍RabbitMQ提供了各种各样的特性以及集群功能.通过使用集群,一组适当配置的主机的行为与单个broker实例一样,但集群带有下面的目的:高可用性: 如果一个节点宕机了,分布式broker仍然能接受和处理消息.这方面内容会在Chapter 7,Developi...  阅读全文
posted @ 2016-06-15 20:54 胡小军 阅读(1937) | 评论 (2)编辑 收藏
     摘要: 在本章中我们将覆盖:使用Spring来开发web监控程序使用Spring来开发异步web搜索使用STOMP来开发web监控程序介绍RabbitMQ可以像客户端一样使用在服务端。当前,RabbitMQ覆盖了大部分使用的语言和技术来构建web程序,如PHP,Node.js, Python, Ruby, 以及其它.你可以在http://www.rabbitmq.com/devtools.html找到全部...  阅读全文
posted @ 2016-06-14 22:07 胡小军 阅读(2268) | 评论 (0)编辑 收藏
     摘要: 在本章中我们将覆盖:使用.NET client通过MQTT绑定iPhone应用与RabbitMQ在Andriod上使用消息来更新Google Maps通过Andriod后端来发布消息使用Qpid来交换RabbitMQ消息使用Mosquitto来交换RabbitMQ消息使用.NET clients来绑定WCF程序介绍在前面的章节中,我们已经介绍了基本概念。现在,我们要使用这些概念来创建真实的应用程序...  阅读全文
posted @ 2016-06-13 20:25 胡小军 阅读(1757) | 评论 (2)编辑 收藏

RabbitMQ的目标是尽可能广泛地支持大部分平台.RabbitMQ 可运行在任何支持Erlang的平台上, 包括内嵌系统,多核集群,云服务器.

下面的平台支持Erlang,因此也可以运行RabbitMQ:

  • Linux
  • Windows, NT through 10
  • Windows Server 2003/2008/2012
  • Mac OS X
  • Solaris
  • FreeBSD
  • TRU64
  • VxWorks

RabbitMQ的开源版本大部分都部署在下面的平台上:

  • Ubuntu and Debian-based Linux distributions
  • Fedora, CentOS and RPM-based Linux distributions
  • openSUSE and derived distributions (including SLES and SLERT)
  • Mac OS X
  • Windows XP and later

Windows

RabbitMQ可运行Windows XP及其后续版本中(Server 2003, Vista, Windows 7, Windows 8, Windows 10, Server 2008 and Server 2012). 尽管没有测试,但应该可以运行在Windows NT ,Windows 2000 上.

64位的Windows Erlang VM从R15版本开始可用.建议使用最新的64位Erlang版本来运行。参考Erlang version compatibility page.

通用UNIX

虽没有官方支持,Erlang 和 RabbitMQ 能运行在含有POSIX layer including Solaris, FreeBSD, NetBSD, OpenBSD的操作系统上.

虚拟平台

RabbitMQ 可运行物理或虚拟硬件上. 这可以允许不支持的平台通过仿真来运行RabbitMQ.
参考EC2 guide 来了解RabbitMQ如何运行在Amazon EC2上的更多信息.

posted @ 2016-06-06 00:09 胡小军 阅读(1234) | 评论 (0)编辑 收藏

名称

rabbitmq-server — 启动RabbitMQ AMQP server

语法

rabbitmq-server [-detached]

描述

RabbitMQ是AMQP的实现, 后者是高性能企业消息通信的新兴标准. RabbitMQ server是AMQP 中间件的健壮,可扩展实现.

前端运行rabbitmq-server,它会显示横幅消息,会报告启动时的过程信息,最后会显示"broker running",以表明RabbitMQ中间件已经成功启动。

要关闭server,只需要终止过程或使用rabbitmqctl(1)(即:rabbitmqctl stop).

环境变量

RABBITMQ_MNESIA_BASE

默认是 /var/lib/rabbitmq/mnesia. 用于设置Mnesia 数据库文件存放的目录.

RABBITMQ_LOG_BASE

日志目录 ,server生成的/var/log/rabbitmq. Log 日志文志会放置在文件会放置在此目录.(如:window10下默认安装时,日志目录为:C:\Users\Administrator\AppData\Roaming\RabbitMQ\log

RABBITMQ_NODENAME

默认是rabbit. 当你想在一台机器上运行多个节点时,此配置是相当有用的, RABBITMQ_NODENAME在每个erlang-node和机器的组合中应该唯一。

参考clustering on a single machine guide 来更多细节.

RABBITMQ_NODE_IP_ADDRESS

默认情况下,RabbitMQ会绑定到所有网络接口上,如果只想绑定某个网络接口,可修改此设置。

RABBITMQ_NODE_PORT

默认是5672.

选项

-detached

以后端的方式来启动进程 ,注意,这会导致pid无法写入到pid文件中.例如:

rabbitmq-server -detached

以后端方式来启动RabbitMQ AMQP server.

也可参考

rabbitmq-env.conf(5) rabbitmqctl(1)

posted @ 2016-06-06 00:06 胡小军 阅读(1174) | 评论 (0)编辑 收藏
     摘要: 本章我们将覆盖:使用虚拟主机配置用户使用SSL实现客户端证书从浏览器中管理RabbitMQ配置RabbitMQ参数开Python程序来监控RabbitMQ自己开发web程序来监控RabbitMQ介绍一旦安装后,RabbitMQ不需要任何配置就可以工作. 然而,RabbitMQ有许多的配置选项,这些配置选项使得它更为灵活,能工作于多种不同环境中.在本章中,我们将看到如何改变配置来满足应用程序的需求。...  阅读全文
posted @ 2016-06-05 20:10 胡小军 阅读(1598) | 评论 (0)编辑 收藏

一.安装Erlang

1、下载推荐的安装包

2、安装

安装依赖包

yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

#rpm -ivh esl-erlang_18.3-1~centos~7_amd64.rpm

二.安装RabbitMQ
下载RabbitMQ
# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
# rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm

安装rabbitmq-server的过程中遇到了一个问题:

Error: Package: rabbitmq-server-3.6.1-1.noarch (/rabbitmq-server-3.6.1-1.noarch) 
Requires: erlang >= R16B-3 
You could try using --skip-broken to work around the problem 
You could try running: rpm -Va --nofiles --nodigest

这是由于erlang的版本问题,其实是没有影响的,你可以使用下面的命令进行安装:

#rpm -ivh --nodeps rabbitmq-server-3.6.1-1.noarch.rpm


启动

#service rabbitmq-server start --后台方式运行

#service rabbitmq-server stop  --停止运行

#service rabbitmq-server status --查看状态

#rabbitmq-server start 

可以看到使用的日志文件

日志目录

/var/log/rabbitmq

#cat /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log 可以看到下面的日志记录

...................................................................................................................................................................................................................................................

=INFO REPORT==== 28-Apr-2016::04:20:10 ===
node           : rabbit@iZ94nxslz66Z
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
cookie hash    : fisYwC976M1LblhTfYslpg==
log            : /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log
sasl log       : /var/log/rabbitmq/rabbit@iZ94nxslz66Z-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Memory limit set to 397MB of 992MB total.
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Disk free limit set to 50MB
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Limiting to approx 65435 file handles (58889 sockets)
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
FHC read buffering:  OFF
FHC write buffering: ON
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Database directory at /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z is empty. Initialising from scratch...
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Priority queues enabled, real BQ is rabbit_variable_queue
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Adding vhost '/'
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Creating user 'guest'
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Setting user tags for user 'guest' to [administrator]
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Setting permissions for 'guest' in '/' to '.*', '.*', '.*'
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
msg_store_transient: using rabbit_msg_store_ets_index to provide index
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
msg_store_persistent: using rabbit_msg_store_ets_index to provide index
=WARNING REPORT==== 28-Apr-2016::04:20:11 ===
msg_store_persistent: rebuilding indices from scratch
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
started TCP Listener on [::]:5672
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Server startup complete; 0 plugins started.
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Management plugin: using rates mode 'basic'
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Management plugin started. Port: 15672
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Statistics database started.
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Plugins changed; enabled [mochiweb,webmachine,rabbitmq_web_dispatch,
                          amqp_client,rabbitmq_management_agent,
                          rabbitmq_management], disabled []
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
Stopping RabbitMQ
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
stopped TCP Listener on [::]:5672
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
Stopped RabbitMQ application
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
Halting Erlang VM
=INFO REPORT==== 28-Apr-2016::04:23:29 ===
Starting RabbitMQ 3.6.1 on Erlang 18.3
Copyright (C) 2007-2016 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/
...................................................................................................................................................................................................................................................


卸载

#rpm -qa|grep rabbitmq
rabbitmq-server-3.6.1-1.noarch
#rpm -e --nodeps rabbitmq-server-3.6.1-1.noarch
#rpm -qa|grep erlang
esl-erlang-18.3-1.x86_64
#rpm -e --nodeps esl-erlang-18.3-1.x86_64

管理

Rabbitmq服务器的主要通过rabbitmqctl和rabbimq-plugins两个工具来管理,以下是一些常用功能。

1). 服务器启动与关闭

      启动: rabbitmq-server –detached

      关闭:rabbitmqctl stop

      若单机有多个实例,则在rabbitmqctlh后加–n 指定名称

2). 插件管理

      开启某个插件:rabbitmq-pluginsenable xxx

      关闭某个插件:rabbitmq-pluginsdisablexxx

      注意:重启服务器后生效。

3).virtual_host管理

      新建virtual_host: rabbitmqctladd_vhost  xxx

      撤销virtual_host:rabbitmqctl  delete_vhost xxx

4). 用户管理

      新建用户:rabbitmqctl add_user xxxpwd

      删除用户:   rabbitmqctl delete_user xxx

      改密码: rabbimqctlchange_password {username} {newpassword}

      设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}

              Tag可以为 administrator,monitoring, management

5). 权限管理

      权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}

               Vhostpath

               Vhost路径

               user

      用户名

              Conf

      一个正则表达式match哪些配置资源能够被该用户访问。

              Write

      一个正则表达式match哪些配置资源能够被该用户读。

               Read

      一个正则表达式match哪些配置资源能够被该用户访问。

6). 获取服务器状态信息

       服务器状态:rabbitmqctl status

       队列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]

                Queueinfoitem可以为:name,durable,auto_delete,arguments,messages_ready,

                messages_unacknowledged,messages,consumers,memory

       Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]

                 Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.

       Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]       

                 Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments

       Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]

       Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。

       Channel信息:rabbitmqctl  list_channels[channelinfoitem ...]

      Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked

 

 

常用命令:

查看所有队列信息

# rabbitmqctl list_queues

关闭应用

# rabbitmqctl stop_app

启动应用,和上述关闭命令配合使用,达到清空队列的目的

# rabbitmqctl start_app

清除所有队列

# rabbitmqctl reset

更多用法及参数,可以执行如下命令查看

# rabbitmqctl

 

 

rabbitmq常用命令

rabbitmq-server start  或者   service rabbitmq-server start     #启动rabbitmq

rabbitmqctl list_exchanges 

rabbitmqctl list_bindings

rabbitmqctl list_queues #分别查看当前系统种存在的Exchange和Exchange上绑定的Queue信息。

rabbitmqctl status  #查看运行信息

rabbitmqctl stop     #停止运行rabbitmq

rabbitmq-plugins enable rabbitmq_management  

#启动rabbitmq的图形管理界面,这个操作必须重启rabbitmq, 然后在web中 http://127.0.0.1:15672 用户名和密码都是guest guest。如果局域网无法访问设置防火墙过滤规则或关闭防火墙。


posted @ 2016-06-05 20:08 胡小军 阅读(3172) | 评论 (0)编辑 收藏

概述

RabbitMQ broker是一个或多个Erlang节点的逻辑分组,多个运行的RabbitMQ应用程序可共享用户,虚拟主机,队列,交换机,绑定以及运行时参数。有时我们将多个节点的集合称为集群。

什么是复制?

RabbitMQ broker操作所需的所有数据/状态都可以在多个节点间复制. 例外是消息队列,默认情况下它驻留在一个节点, 尽管它们对所有节点来说,是可见的,可达的.要在集群中跨节点复制队列,可参考high availability 文档(注意,你仍然先需要一个工作集群).

主机名解析需求

RabbitMQ节点彼此之间使用域名,要么是简短的,要么是全限定的(FQDNs). 因此,集群中所有成员的主机名都必须是可解析的,也可用于机器上的命令行工具,如rabbitmqctl.

主机名解析可使用任何一种标准的操作系统提供方法:

  • DNS 记录
  • 本地主机文件(e.g. /etc/hosts)
在更加严格的环境中,DNS记录或主机文件修改是受限的,不可能的或不受欢迎的, Erlang VM可通过使用替代主机名解析方法来配置, 如一个替代的DNS服务器,一个本地文件,一个非标准的主机文件位置或一个混合方法. 这些方法可以与标准操作主机名解析方法一起协同工作。

要使用FQDNs, 参考RABBITMQ_USE_LONGNAME in the Configuration guide.

集群构成

集群可以通过多种方式来构建:

一个集群的构成可以动态修改. 所有RabbitMQ brokers开始都是以单个节点来运行的. 这些节点可以加入到集群中, 随后也可以脱离集群再次成为单一节点。

故障处理

RabbitMQ brokers 可以容忍个别节点故障. 节点可以随意地启动和关闭,只要在已知关闭的时间内能够联系到集群节点.

RabbitMQ 集群有多种模式来处理网络分化, 主要是一致性方向. 集群是在LAN中使用的,不推荐在WAN中运行集群. Shovel 或 Federation 插件对于跨WAN连接brokers ,有更好的解决方案. 注意 Shovel 和 Federation 不等同于集群.

磁盘和内存节点

节点可以是磁盘节点,也可以是内存节点。多数情况下,你希望所有的节点都是磁盘节点,但RAM节点是一种特殊情况,它可以提高集群中队列和,交换机,绑定的性能. 当有疑问时,最好只使用磁盘节点。

集群文字记录(Transcript)

下面是通过三台机器-rabbit1rabbit2rabbit3来设置和操作RabbitMQ集群的文字记录.

我们假设用户已经登录到这三台机器上,并且都已经在机器上安装了RabbitMQ,以及rabbitmq-server 和rabbitmqctl 脚本都已经在用户的PATH环境变量中.

This transcript can be modified to run on a single host, as explained more details below.

节点(以及CLI工具)之间如何来认证: Erlang Cookie

RabbitMQ 节点和CLI 工具(如rabbitmqctl) 使用cookie来确定每个节点之间是否可以通信. 两个节点之间要能通信,它们必须要有相同的共享密钥Erlang cookie. cookie只是具有字母数字特征的字符串。只要你喜欢,它可长可短. 每个集群节点必须有相同的cookie.

当RabbitMQ 服务器启动时,Erlang VM 会自动地创建一个随机的cookie文件. 最简单的处理方式是允许一个节点来创建文件,然后再将这个文件拷贝到集群的其它节点中。

在 Unix 系统中, cookie的通常位于/var/lib/rabbitmq/.erlang.cookie 或$HOME/.erlang.cookie.

在Windows中, 其位置在C:\Users\Current User\.erlang.cookie(%HOMEDRIVE% + %HOMEPATH%\.erlang.cookie) 或C:\Documents and Settings\Current User\.erlang.cookie, 对于RabbitMQ Windows service其位置在C:\Windows\.erlang.cookie。如果使用了Windows service ,  cookie可被放于这两个位置中.

作为替代方案,你可以在 rabbitmq-server 和 rabbitmqctl 脚本中调用erl时,插入"-setcookie cookie"选项.

当cookie未配置时 (例如,不相同), RabbitMQ 会记录这样的错误"Connection attempt from disallowed node" and "Could not auto-cluster".

启动独立节点

集群可通过重新配置,而将现有RabbitMQ 节点加入到集群配置中. 因此第一步是以正常的方式在所有节点上启动RabbitMQ:

rabbit1$ rabbitmq-server -detached 
rabbit2$ rabbitmq-server -detached
rabbit3$ rabbitmq-server -detached

这会创建三个独立的RabbitMQ brokers, 每个节点一个,可通过cluster_status命令来验证:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

rabbitmq-server shell脚本来启动RabbitMQ broker的节点名称是rabbit@shorthostname,在这里,短节点名称是小写的(如上面的rabbit@rabbit1). 如果在windows上,你使用rabbitmq-server.bat批处理文件来启动,短节点名称是大写的(如:rabbit@RABBIT1). 当你输入节点名称时,不论是大写还是小写的,这些字符串都必须精确匹配。

创建集群

为了把这三个节点构建到一个集群中,我们可以告诉其中的两个节点, 假设为rabbit@rabbit2 和 rabbit@rabbit3, 将加入到第三个节点的集群中,这第三个节点假设为rabbit@rabbit1.

首先我们将rabbit@rabbit2加入到rabbit@rabbit1的集群中. 要做到这一点,我们必须在rabbit@rabbit2 上停止RabbitMQ应用程序,并将其加入到rabbit@rabbit1 集群中, 然后再重启RabbitMQ 应用程序. 

注意:加入集群会隐式地重置节点, 因此这会删除此节点上先前存在的所有资源和数据.(如何备份数据)

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

在每个节点上通过运行cluster_status 命令,我们可以看到两个节点已经加入了集群:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

现在我们将rabbit@rabbit3节点加入到同一个集群中. 操作步骤同上面的一致,除了这次我们选择rabbit2来加入集群,但这并不重要:

rabbit3$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

在任何一个节点上通过运行cluster_status命令,我们可以看到三个节点已经加入了集群:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

通过上面的步骤,当集群运行的时候,我们可以在任何时候将新的节点加入到集群中.

重启集群节点

注意,加入到集群中的节点可在任何时候停止, 对于崩溃来说也没有问题. 在这两种情况下,集群剩余的节点将不受影响地继续操作,当它们重启的时候,这些崩溃的节点会再次自动追赶上其它的集群节点。

我们关闭了节点rabbit@rabbit1和rabbit@rabbit3,并在每步观察集群的状态:

rabbit1$ rabbitmqctl stop 
Stopping and halting node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]}] ...done.
rabbit3$ rabbitmqctl stop
Stopping and halting node rabbit@rabbit3 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2]}] ...done.

译者注:关闭了rabbit1节点后,运行的节点已经没有rabbit1节点了

现在我们再次启动节点,并检查集群状态:

rabbit1$ rabbitmq-server -detached 
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmq-server -detached
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

这里有一些重要的警告:

  • 当整个集群崩溃的时候, 最后一个崩溃的节点必须第一个上线.如果不是这样,节点将会等待最后一个磁盘节点30秒以确认其重新上线,否则就会失败. 如果最后一个下线的节点,不能再重新上线,那么它可能会使用forget_cluster_node命令来从集群中删除 - 查阅 rabbitmqctl页面来了解更多信息.
  • 如果所有集群节点都在同一个时间内停止且不受控制(如断电)。在这种情况下,你可以在某个节点上使用force_boot命令使其再次成为可启动的-查阅 rabbitmqctl页面来了解更多信息.

脱离集群

当节点不再是集群的一部分时,可以明确地将其从集群中删除. 首先我们将节点rabbit@rabbit3从集群中删除, 以使其回归独立操作.要做到这一点,需要在rabbit@rabbit3节点上停止RabbitMQ 应用程序,重设节点,并重启RabbitMQ应用程序.

rabbit3$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl reset
Resetting node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

在节点上运行cluster_status 命令来确认rabbit@rabbit3节点现在已不再是集群的一部分,并且会独自操作:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

我们也可以远程地删除节点,这是相当有用的,举例来说,当处理无反应的节点时.举例来说,我们可以从 rabbit@rabbit2中删除rabbit@rabbi1.

rabbit1$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl forget_cluster_node rabbit@rabbit1
Removing node rabbit@rabbit1 from cluster ... ...done.

注意,rabbit1仍然认为它与rabbit2处在一个集群中,但尝试启动时会出现一个错误.这时,我们需要对其进行重置以使其能再次启动.

rabbit1$ rabbitmqctl start_app 
Starting node rabbit@rabbit1 ... Error: inconsistent_cluster: Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees
rabbit1$ rabbitmqctl reset
Resetting node rabbit@rabbit1 ...done.
rabbit1$ rabbitmqctl start_app Starting node rabbit@mcnulty ... ...done.

现在, cluster_status 命令会显示三个节点都是独立节点,并且操作是独立的:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

注意:rabbit@rabbit2节点仍然残留有集群的状态(译者注:怎么看出来的呢?), 但是 rabbit@rabbit1 和rabbit@rabbit3 节点是新鲜的RabbitMQ brokers.如果我们想重新初始化rabbit@rabbit2节点,我们可以按其它节点的步骤来操作:

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl reset
Resetting node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

升级集群

当从主版本或小版本进行升级时 (如:从3.0.x 到3.1.x,或从2.x.x 到3.x.x),或者是升级Erlang时, 整个集群在升级时必须记下来(taken down) (因为集群不会像这样来运行多个混合的版本). 当从补丁版本升级到另一个时(如:从3.0.x 到3.0.y)时,这种情况是不会出现的;这些版本在集群中是可以混合使用的(例外是3.0.0不能与 3.0.x 系列后的版本混合).

在主版本与小版本之间升级时,RabbitMQ有必要的话会自动更新其持久化数据. 在集群中,此任务是由第一个磁盘节点来启动的("upgrader"节点). 因此在升级RabbitMQ集群时,你不需要尝试先启动RAM节点,任何启动的RAM节点都会发生错误,并且不能启动.

虽然不是严格必须的,但使用磁盘节点来作为升级节点通常是好的主意,最后停止那个节点。

自动升级只适用于2.1.1及其之后的版本,如果你有更早的集群 ,你必须重新构建升级.

单台机器上的集群

在某些情况下,在一台机器上运行RabbitMQ节点的集群是有用的(试验性质). 

要在一台机器上运行多个RabbitMQ节点,必须确保节点含有不同的节点名称,数据存储路径,日志文件位置,绑定到不同的端口,并包含那些插件使用的端口等等 .参考配置指南中的RABBITMQ_NODENAMERABBITMQ_NODE_PORT, 和 RABBITMQ_DIST_PORT文档 ,以及 File and Directory Locations guide指南中的 RABBITMQ_MNESIA_DIRRABBITMQ_CONFIG_FILE, and RABBITMQ_LOG_BASE。

你可以在同一个主机上通过重复调用rabbitmq-server(rabbitmq-server.bat on Windows)来手动地启动多个节点 . 例如:

$ RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=hare rabbitmq-server -detached
$ rabbitmqctl -n hare stop_app
$ rabbitmqctl -n hare join_cluster rabbit@`hostname -s`
$ rabbitmqctl -n hare start_app

这会设置两个节点的集群,这两个节点都是磁盘节点. 注意,如果你想打开非AMQP的其它端口,你需要通过命令行进行配置

$ RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=hare rabbitmq-server -detached

主机名称变更

RabbitMQ节点使用主机名来相互通信.因此,所有节点名称都集群中的节点应该都能被解析.对于像 rabbitmqctl这样的工具来说,也是如此.

除此之外,默认情况下,RabbitMQ使用当前系统的主机名称来命名数据库目录.如果主机名变了,将会创建一个空的数据库.为避免数据丢失,应该总是设置一个固定的,可解析的主机名称。无论何时,只要主机名变化了,你就必须要重启RabbitMQ:

$ /etc/init.d/rabbitmq-server restart

类似的效果可通过使用 rabbit@localhost作为broker节点名称来达到。这个解决方案的影响是集群将不会工作,因为选中的主机名不能被远程主机所解析。当从远程主机调用时,类似地rabbitmqctl命令也会失败. 免遭此缺点的复杂方案是使用DNS,如:如果运行EC2,则使用 Amazon Route 53 。如果你想使用节点名称的全限定主机名(RabbitMQ 默认使用短名称),那么可使用DNS解析, 可设置环境变量 RABBITMQ_USE_LONGNAME=true.


防火墙节点

当在一个数据中心或可靠网络时,带防火墙的集群节点是存在的,但这些节点通常被防火墙隔离。再一次声明,当各节点之间的网络连接不稳定时,集群不建议在WAN在使用

在多数配置中,你需要打开4369和25672端口以使用集群正常工作.

Erlang 使用Port Mapper Daemon (epmd) 来解析集群中的节点名称. 默认epmd端口是4369,但它可以通过ERL_EPMD_PORT环境变量进行修改.所有的节点都必须使用同一个端口。详细信息可参考Erlang epmd manpage.

一旦分布式Erlang节点通过empd解析后,其它节点将会尝试直接通信。默认地通信端口比RABBITMQ_NODE_PORT (即,默认是25672)高了20000. 这可以通过RABBITMQ_DIST_PORT 环境变量修改

跨集群Erlang版本

集群中所有节点必须运行相同版本的Erlang.

从客户端连接集群

客户端可以正常连接到集群中的任意节点,如果那个节点发生故障了 ,只要有剩余集群节点幸存,当客户端发现在关闭的连接时,它就能够重新连接到剩余幸存的集群节点上。一般来说,将节点主机名称或IP地址放到客户端程序是极其不明智的,这会导致缺乏灵活性,并需要客户端程序重新编辑,编译,重新配置以适应集群配置变化或者集群节点变化。相反,我们建议采用更抽象的方法: 如有简短TTL配置的动态DNS服务或普通的TCP负载均衡器. 一般来说,这方面的管理集群内连接节点是超出了RabbitMQ本身的范围,我们建议使用其他技术专门设计来解决这些问题。

内存节点集群

内存节点只在内存中保存其元数据。它不会像磁盘节点将元数据写入到磁盘中,但它们拥有更好的性能。 然而,也应该注意到,由于持久化队列数据总是存储在磁盘上的,其性能提升只会影响资源管理(如: 添加/删除队列,交换机,或虚拟主机), 但不会影响发布或消费的速度.

内存节点是一个高级使用例子;当设置你的第一个集群时,你应该不使用它们。你应该用足够的磁盘节点来处理冗余需求,然后如果有必要,再用内存节点进行扩展.

集群中只含有内存节点是相当脆弱的,如果集群停止了,你将不能再次启动,并且会导致数据丢失。RabbitMQ在许多情况下,会阻止创建只包含内存节点的集群,但不能完全阻止。

(译者注:在集群构建中,最好有两个或以上的磁盘节点,然后再考虑使用内存节点进行扩展)

创建内存节点

当节点加入集群时,我们可将其声明为内存节点. 我们可以通过使用像先前rabbitmqctl join_cluster命令再加--ram标志来达到目的:

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster --ram rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done.


rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

改变节点类型

我们可以将节点的类型从磁盘修改为内存,反之亦然. 假设我们想反转rabbit@rabbit2 和 rabbit@rabbit1的节点类型,即先将内存节点转换为磁盘节点,随后再将其从磁盘节点转换为内存节点.要做到这点,我们可以使用change_cluster_node_type命令. 首先节点必须先停止.

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done. rabbit2$
rabbitmqctl change_cluster_node_type disc
Turning rabbit@rabbit2 into a disc node ... ...done. Starting node rabbit@rabbit2 ...done.
rabbit1$
rabbitmqctl stop_app
Stopping node rabbit@rabbit1 ...done.
rabbit1$
rabbitmqctl change_cluster_node_type ram
Turning rabbit@rabbit1 into a ram node ...
rabbit1$
rabbitmqctl start_app
Starting node rabbit@rabbit1 ...done.
posted @ 2016-06-05 19:53 胡小军 阅读(3935) | 评论 (0)编辑 收藏
本章我们将覆盖:
  1. 如何使用消息过期
  2. 如何使指定队列上的消息过期
  3. 如何让队列过期
  4. 管理驳回的(rejected)或过期的消息
  5. 理解其它备用交换器扩展
  6. 理解有效user-ID扩展
  7. 通知队列消息者失败
  8. 理解交换器到交换器扩展
  9. 在消息中嵌入消息目的地
介绍
在本章中,我们将展示关于RabbitMQ扩展上的一些食谱.这些扩展不是AMQP 0-9-1标准的一部分,使用它们会破坏其它AMQPbroker的兼容性。
另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出现了轻微的变化,这是一个简单通往那里的路径.最后, 它们通常是优化问题的有效解决方案。

本章中的例子将更为真实,例如,配置参数,如列表和交换器, 以及路由键名称将定义在Constants接口中。事实上,一个真正的应用程序会遵循这样的准则从配置文件中读取配置文件,以在不同应用程序中共享。
然而,在下面的例子中,为了更简短和较好的可读性,我们并没有指定Constants的命名空间。

如何让消息过期
在本食谱中,我们将展示如何让消息过期.食谱的资源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
  1. Producer.java
  2. Consumer.java
  3. GetOne.java
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
本示例的核心是Producer.java文件.为了产生在给定生存时间(TTL)后过期的消息,我们需要执行下面的步骤:
1. 创建或声明一个用来发送消息的交换器, 并将其绑定到队列上,就像第1章使用AMQP看到的一样:
channel.exchangeDeclare(exchange, "direct", false);
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, routingKey);
2. 像下面这样初始化可选消息属性TTL:
BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
3. 使用下面的代码来发布消息:
channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
如何工作
在这个例子中,生产者创建了一个交换器,一个命名队列,并将它们进行了绑定,当队列上没有附着任何消费者,过期消息就显得非常有意义了。
设置过期时间TTL (以毫秒设置),会促使RabbitMQ在消息过期时,如果消息没有被客户端及时消费,立即删除消息.
在我们的例子中,我们假设应用程序发布了JVM资源统计信息到给定队列,如果存在消费者,那么会像平时一样,获取到实时数据,反之,如果不存在这样的消费者,那么消息会给定生存时间后立即过期。通过这种方式,可以避免我们收集大量的数据。一旦消息者绑定到了队列中,它会得到先前的消息(未过期)。进一步的试验,你可以用GetOne.java文件来替换Consumer.java文件运行.
在调用 channel.basicGet() 时,会使你一次只能消费一个消息。
TIP
可使用channel.basicGet()方法来检查未消费消息的队列.也可以通过为第二参数传递false来调用,即autoAck标志.

在这里我们可以通过调用rabbitmqctl list_queues来监控RabbitMQ队列的状态。  

也可参考
默认情况下,过期消息会丢失,但它们可以路由到其它地方。可参考管理拒绝消息或过期消息食谱来了解更多信息.

如何让指定队列上的消息过期
在本食谱中,我们将展示指定消息TTL的第二种方式.这次,我们不再通过消息属性来指定,而是通过缓存消息的队列来进行指定。在这种情况下,生产者只是简单地发布消息到交换器中,因此,在交换器上绑定标准队列和过期消息队列是可行的。
要在这方面进行备注,须存在一个创建自定义的队列的消费者。生产者是相当标准的.
像前面的食谱一样,你可以在Chapter02/Recipe02/Java/src/rmqexample找到这三个源码。
 
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
现在我们将展示创建特定消息TTL队列的必要步骤。在我们的例子中,需要在Consumer.java文件中执行下面的步骤:
1. 按下面来声明交换器:
channel.exchangeDeclare(exchange, "direct", false);
2. 创建或声明队列,像下在这样为x-message-ttl可选参数指定10,000毫秒的超时时间:
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queue, false, false, false, arguments);
3. 绑定队列到交换器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
在这个例子中,为了最终分析,我们再次假设生产者发送了JVM统计数据给RabbitMQ。最终因为Producer.java文件将其发到一个交换机,如果无消费者连接的话,消息最终会丢失。
想要监控或分析这些统计数据的消费有下面三种选择:
  1. 绑定到一个临时队列,即调用无参的channel.queueDeclare()方法
  2. 绑定到一个非自动删除的命名队列
  3. 绑定到一个非自动删除的命名队列,并且指定x-message-ttl ,如步骤2中展示的一样.
在第一种情况中,消费者将获取实时统计数据,但当它掉线期间,它将不能在数据上执行分析。
在第二种情况中,为了能让它掉线期间,能获取到发送的消息,可以使用一个命名队列(最终是持久化的).但在掉线较长时间后,再重启时,它将有巨大的backlog来进行恢复,因此在队列中可能存在大部分旧消息的垃圾。
在第三种情况中,旧消息垃圾会通过RabbitMQ自己来执行,以使我们从消费者和broker中获益。
更多
当设置per-queue TTL, 就像本食谱中看到的一样,只要未到超时时间,消息就不会被丢弃,此时消费者还可以尝试消费它们。
当使用queue TTL时, 这里有一个细微的变化,但使用per-message TTL时,在broker队列中可能会存在过期消息.
在这种情况下,这些过期消息仍然会占据资源(内存),同时broker统计数据中仍然会计数,直到它们不会到队列头部时。
也中参考
在这种情况下,过期消息也会恢复。参考管理驳回或过期消息食谱.
如何让队列过期
在第三种情况中,TTL不关联任何消息,只关联对列。这种情况对于服务器重启和更新,是一个完美的选择。一旦TTL超时,在最后一个消费者停止消费后,RabbitMQ会丢弃队列.
前面TTL相关食谱,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java ,  Consumer.java ,and  GetOne.java 相关文件。
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
在前面的例子中,扩展只需要关注Consumer.java :
1. 使用下面的代码来创建或声明交换器:
channel.exchangeDeclare(exchange, "direct", false);
2. 创建或声明队列,并为x-expires可选参数指定30,000毫秒的超时时间:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("x-expires", 30000);
channel.queueDeclare(queue, false, false, false,arguments);
3. 将队列绑定到交换器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
当我们运行Consumer.java或 GetOne.java 文件的时候, 超时队列已经创建好了,在消费者附着到队列上或调用channel.basicGet()时,它将持续存在.
只有当我们停止这两个操作超过30秒时,队列才会被删除,并且队列包含的消息也会清除。
TIP
无论生产者是否向其发送了消息,队列事实上都是独立删除的。

在这个试验课程中,我们可通过 rabbitmqctl list_queues 命令来监控RabbitMQ 队列状态.
因此,我们可以想像一种场景,有一个统计分析程序需要重启来更新其代码。由于命名队列有较长的超时时间,因此重启时,不会丢失任何消息。如果我们停止,队列会在超过TTL后被删除,无价值的消息将不再存储。
管理驳回或过期消息
在这个例子中,我们将展示如何使用死信交换器来管理过期或驳回的消息. 死信交换器是一种正常的交换器,死消息会在这里重定向,如果没有指定,死消息会被broker丢弃。
你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源码文件:
  1. Producer.java
  2. Consumer.java
要尝试过期消息,你可以使用第一个代码来发送带TTL的消息,就如如何使指定队列上消息过期食谱中描述的一样.
一旦启动了,消费者不允许消息过期,但可以可以驳回消息,最终导致成为死消息。
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
下面的步骤展示了使用死信交换器来管理过期或驳回消息:
1. 创建一个工作交换品节和死信交换器:
channel.exchangeDeclare(Constants.exchange, "direct", false);
channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
2. 创建使用使用死信交换器和 x-message-ttle参数的队列:
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange",exchange_dead_letter);
channel.queueDeclare(queue, false, false, false,arguments);
3. 然后像下面这样绑定队列:
channel.queueBind(queue, exchange, "");
4. 最后使用channel.basicPublish()来向交换器发送消息 .
5. 要尝试驳回消息,我们需要配置一个消费者,就像前面例子中看到的一样,并使用下面的代码来驳回消息:
basicReject(envelope.getDeliveryTag(), false);
如何工作
我们先从第一个场景开始(单独使用producer): the expired messages. 在步骤中,我们创建两个交换器,工作交换器和死信交换器。在步骤2中,我们使用下面两个可选参数来创建队列:
  1. 使用arguments.put("x-message-ttl", 10000)来设置消息TTL ,正如如何使指定队列上消息过期食谱中描述的一样.
  2. 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)来设置死信交换器名称;
正如你所看到的,我们只是在配置中添加了可选的队列参数。因此,当生产者发送消息到交换器时,它会队列参数来路由。消息会在10秒后过期,之后它会重定向到exchange_dead_letter 
TIP
死信交换器是一个标准的交换器,因此你可以基于任何目的来使用.
对于第二种场景,食谱的消费者会驳回消息.当消费者得到消息后, 它会使用basicReject()方法来发回一个否定应答(nack),当broker收到nack时,它会将消息重定向到exchange_dead_letter. 通过在死信交换器上绑定队列,你可以管理这些消息。
当消息重定向到死信队列时,broker会修改header消息,并在x-dead键中增加下面的值:
  1. reason : 表示队列是否过期的或驳回的(requeue =false )
  2. queue : 表示队列源,例如stat_queue_02/05
  3. time : 表示消息变为死信的日期和时间
  4. exchange : 表示交换器,如monitor_exchange_02/05
  5. routing-keys : 表示发送消息时原先使用的路由键
要在实践中查看这些值,你可使用GetOneDeadLetterQ 类.这可以创建queue_dead_letter队列并会绑定到exchange_dead_letter 
更多
你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")来指定死信路由键 ,它将会代替原来的路由键.这也就意味着你可以用不同的路由键来将不同消息路由到同一个队列中。相当棒。
理解交替交换器扩展
目前,在第1章使用 AMQP中我们已经展示了如何来处理未路由消息(消息发布到了交换器,但未能达到队列). AMQP让生产者通过此条件进行应答,并最终决定是否有需要再次将消息分发到不同的目的地。通过这种扩展,我们可在broker中指定一个交替交换器来路由消息,而并不会对生产者造成更多的干预,本食谱的代码在Chapter02/Recipe05/Java/src/rmqexample .

准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
在本食谱中,我们会在Producer.java中声明交替交换器.
1. 将交换器的名字(无目的地路由消息)-alternateExchange ,放到可选参数map的"alternate-exchange"中,如下所示:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange", alternateExchange);
2. 通过传递arguments map来声明交换器来发送消息:
channel.exchangeDeclare(exchange, "direct", false, false,arguments);
3. 声明alternateExchange自身(已经在步骤1中指定了),如下所示:
channel.exchangeDeclare(alternateExchange, "direct",false);
4. 声明标准化持久化队列,并使用路由键alertRK将其绑定到alternateExchange交换器中:
channel.queueDeclare(missingAlertQueue, true, false, false,null);
channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
如何工作
在这个例子中,我们再次使用了生成统计数据的producer,正如先前的例子一样.但这次,我们添加了路由键来让producer指定一个重要的级别,名为infoRK或alertRK (在例子中是随机分配的).如果你运行一个producer以及至少一个consumer,将不会丢失任何消息,并且一切都会正常工作.
TIP
Consumers在交换器和队列的声明中,必须传递相同的可选参数,否则会抛出异常。
但如果没有消费者监听的话,而我们不想丢失报警的话,这就是为什么必须选择让producer创建alternateExchange (步骤3)并将其绑定到持久队列-missingAlertQueue的原因 (步骤4).
在单独运行producer的时候,你将看到报警存储在这里.alternate交换器让我们在不丢失消息的情况下可以路由消息.你可通过调用rabbitmqctllist_queues或运行CheckAlerts.java来检查状态 .
最后的代码让我们可以查看队列的内容和第一个消息,但不会进行消费。完成这种行为是简单的,它足可以避免这种事实:RabbitMQ client发送了ack,消息未消费,而只是进行监控。
现在,如果我们再次运行Consumer.java文件,它会从missingAlertQueue队列中获取并消费消息.这不是自动的,我们可以选择性地从此队列中获取消息。
通过创建第二个消费者实例( missingAlertConsumer ) 并使用相同的代码来从两个不同队列消费消息就可以完成这种效果。如果在处理实时消息时,想要得到不同的行为,那么我们可以创建一个不同的消费者。

更多
在这个例子中,步骤3和步骤4是可选的。 当定义交换器时,可为交替交换器指定名称,对于其是否存在或是否绑定到任何队列上,并不作强制要求 。如果交替交换器不存在,生产者可通过在丢失消息上设置mandatory标志来得到应答,就如在第1章中处理未路由消息食谱中看到的一样。
甚至有可能出现另一种交换器-它自己的备用交换器,备用交换器可以是链式的,并且无目的地消息在按序地重试,直到找到一个目的地。
如果在交换器链的末尾仍然没有找到目的地,消息将会丢失,生产者可通过调设置mandatory 标志和指定一个合适的ReturnListener参数得到通知。
理解经过验证的user-ID扩展
依据AMQP, 当消费者得到消息时,它是不知道发送者信息的。一般说来,消费者不应该关心是谁生产的消息,对于生产者-消费者解藕来说是相当有利的。然而,有时出于认证需要,为了达到此目的,RabbitMQ 提供了有效的user-ID扩展。
在本例中,我们使用有效user-IDs模拟了订单。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源码.
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
完成下面的步骤,以使用经过验证的user IDs来模拟订单:
1. 像下面一样声明或使用持久化队列:
channel.queueDeclare(queue, true, false, false, null);
2.发送消息时,使用BasicProperties对象,在消息头中指定一个user ID:
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest");
channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
3. 消费者获取到订单后,可像下面这样打印订单数据和消息头:
System.out.println("The message has been placed by "+properties.getUserId());
如何工作
当设置了user-ID时,RabbitMQ 会检查是否是同一个用户打开的连接。在这个例子中,用户是guest ,即RabbitMQ默认用户.
通过调用properties.getUserId() 方法,消费者可以访问发送者user ID。如果你想在步骤2中设置非当前用户的userId,channel.basicPublish()会抛出异常.
TIP
如果不使用user-ID属性,用户将是非验证的,properties.getUserId()方法会返回null.
也可参考
要更好的理解这个例子,你应该知道用户和虚拟机管理,这部分内容将在下个章节中讲解。在下个章节中,我们将了解如何通过在应用程序中使用SSL来提高程序的安全性。只使用user-ID属性,我们可保证用户已认证,但所有信息都是未加密的,因此很容易暴露。
队列失败时通知消费者

根据AMQP标准,消费者不会得到队列删除的通知。一个正在删除队列上等待消息的消费者不会收到任何错误信息,并会无限期地等待。然而,RabbitMQ client提供了一种扩展来让消息收到一个cancel参数-即消费者cancel通知。我们马上就会看到这个例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代码.

准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。

如何做
为了能让扩展工作,你只需要执行下面的步骤:
1.在自定义的消费者中覆盖handleCancel()方法,可继承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
public void handleCancel(String consumerTag) throws IOException {
...
}
如何工作
在我们的例子中,我们选择实现一个消费者,这个消费者只在生产者是持久化的,且队列是由生产者创建的情况下才能工作。
因此,如果队列是非持久化的,Consumer.java文件会立即错误退出. 此行为可以通过调用channel.queueDeclarePassive()来完成 .
Producer.java类在其启动时会创建队列,并在其关闭时调用channel.queueDelete()方法删除队列,如果当队列关闭时,而消费者正在消费队列,那么RabbitMQ client会调用步骤1中覆盖的handleCancel()方法来立即通知消费者。
相对于显示调用channel.basicCancel() 消费者使用handleCancel()方法可以任意理由来退出。只有在这种情况下,RabbitMQ client library会调用Consumer接口的方法:  handleCancelOK() 
更多
消费者cancel通知是client library的扩展,而不是AMQP client libraries的常规方法.一个实例它们的library必须将其声明为可选属性(参考 http://www.rabbitmq.com/consumer-cancel. html#capabilities ).
RabbitMQ client library 支持并声明了这种特性。
也可参考
在集群中,如果一个节点失效了,也会发生同样的事情:client在队列删除后仍然得不到通知,除非它定义了覆盖了自己的handleCancel()方法。关于这点的更多信息,可参考Chapter 6,开发可伸缩性应用程序。
理解交换器到交换器扩展
默认情况下,AMQP支持交换器到队列,但不支持交换器到交换器绑定。在本例中,我们将展示如何使用RabbitMQ 交换机到交换机扩展.
在本例中,我们将合并来自两个不同交换器的消息到第三个交换器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源码.
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样,并像广播消息食谱中来运行生产者以及使用topic交换器来处理消息路由。
如何做
完成下面的步骤来使用RabbitMQ 交换器到交换器扩展:
1. 使用下面的代码来声明我们需要追踪消息的交换器:
channel.exchangeDeclare(exchange, "topic", false);
2. 使用exchangeBind()来绑定其它例子中的交换器 :
channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
3. 启动追踪消费者:
TraceConsumer consumer = new TraceConsumer(channel);
String consumerTag = channel.basicConsume(myqueue, false,consumer);
如何工作
在步骤1中,我们创建了一个新的交换器,在步骤2中我们绑定到了下面的交换器:
  1. ref_exchange_c1_6 (广播消息) 与exchange绑定.
  2. ref_exchange_c1_8 (使用topic来处理消息路由)与exchange绑定 .
在步骤3中, 消费者可以绑定一个队列到exchange上以任意地获取所有消息.
交换器到交换器扩展的工作方式与交换器到队列绑定过程类似,你也可以指定一个路由键来过滤消息.在步骤2中,我们可以使用#(匹配所有消息)来作为路由键。通过改变路由键你可以使用制作一个filter!
在消息中内嵌消息目的地
在本例子中,我们会展示如何发送单个发布带路由键的的消息.标准AMQP不提供此特性,但幸运的是,RabbitMQ使用消息属性header提供了此特性. 这种扩展称为sender-selected分发.
此扩展的行为类似于电子邮件逻辑.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).这也是为什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
  1. Producer.java
  2. Consumer.java
  3. StatsConsumer.java
  4. CCStatsConsumer.java
  5. BCCStatsConsumer.java
准备
To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
如何做
完成下面的步骤来使用单个发布带路由键的的消息:
1. 使用下面的代码来创建或声明交换器:
channel.exchangeDeclare(exchange, "direct", false);
2. 在消息的header属性中指定CC , BCC路由键:
List<String> ccList = new ArrayList<String>();
ccList.add(backup_alert_routing_key);
headerMap.put("CC", ccList);
List<String> ccList = new ArrayList<String>();
bccList.add(send_alert_routing_key);
headerMap.put("BCC", bccList);
BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
3. 使用下面的三个路由键来绑定三个队列three queues to the exchange using the following three routing keys:
channel.queueBind(myqueue,exchange, alert_routing_key);
channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
4. 使用三个消费者来消费消息
如何工作
当生产者使用CC和BCC消息属性来发送消息时,broker会在所有路由键的队列上拷贝消息 。在本例中,stat类会直接使用路由键alert_routing_key来向交换器发送消息,同时它也会将消息拷贝到使用CC和BCC参数信息来将消息拷贝到myqueueCC_BK,myqueueBCC_SA队列中。
当像e-mails一样发生时,在分发消息到队列前,BCC信息会被broker从消息头中删除,你可查看所有我们示例消费者的输出来观察这种行为。
更多
正常情况下,AMQP不会改变消息头,但BCC扩展是例外。这种扩展可减少发往broker的消息数目。没有此扩展,生产者只能使用不同的路由键来发送多个消息的拷贝。
posted @ 2016-06-05 19:51 胡小军 阅读(1325) | 评论 (0)编辑 收藏

概述

RabbitMQ Java client 将com.rabbitmq.client作为其顶层包. 关键类和接口有:

  • Channel
  • Connection
  • ConnectionFactory
  • Consumer
协议操作可通过Channel接口来进行.Connection用于开启channels,注册connection生命周期事件处理, 并在不需要时关闭connections.
Connections是通过ConnectionFactory来初始化的,在ConnectionFactory中,你可以配置不同的connection设置,如:虚拟主机和用户名等等.

Connections 和 Channels

核心API类是Connection和Channel, 它们代表对应AMQP 0-9-1 connection 和 channel. 在使用前,可像下面这样来导入:

import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;

连接到broker

下面的代码会使用给定的参数连接到AMQP broker:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

也可以使用URIs 来设置连接参数:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();


Connection 接口可用来打开一个channel:

Channel channel = conn.createChannel(); 

channel现在可用来发送和接收消息,正如后续章节中描述的一样.

要断开连接,只需要简单地关闭channel和connection:

channel.close(); conn.close();

关闭channel被认为是最佳实践,但在这里不是严格必须的 - 当底层连接关闭的时候,channel也会自动关闭.

使用 Exchanges 和 Queues

采用交换器和队列工作的客户端应用程序,是AMQP高级别构建模块。在使用前,必须先声明.声明每种类型的对象都需要确保名称存在,如果有必要须进行创建.

继续上面的例子,下面的代码声明了一个交换器和一个队列,然后再将它们进行绑定.

channel.exchangeDeclare(exchangeName, "direct", true); 
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这实际上会声明下面的对象,它们两者都可以可选参数来定制. 在这里,它们两个都没有特定参数。

  1. 一个类型为direct,且持久化,非自动删除的交换器
  2. 采用随机生成名称,且非持久化,私有的,自动删除队列

上面的函数然后使用给定的路由键来绑定队列和交换器.

注意,当只有一个客户端时,这是一种典型声明队列的方式:它不需要一个已知的名称,其它的客户端也不会使用它(exclusive),并会被自动清除(autodelete).
如果多个客户端想共享带有名称的队列,下面的代码应该更适合:

channel.exchangeDeclare(exchangeName, "direct", true); 
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这实际上会声明:

  1. 一个类型为direct,且持久化,非自动删除的交换器
  2. 一个已知名称,且持久化的,非私有,非自动删除队列

注意,Channel API 的方法都是重载的。这些 exchangeDeclarequeueDeclare 和queueBind 都使用的是预设行为.
这里也有更多参数的长形式,它们允许你按需覆盖默认行为,允许你完全控制。


发由消息

要向交换器中发布消息,可按下面这样来使用Channel.basicPublish方法:

byte[] messageBodyBytes = "Hello, world!".getBytes(); 
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了更好的控制,你可以使用重载方法来指定mandatory标志,或使用预先设置的消息属性来发送消息:

channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

这会使用分发模式2(持久化)来发送消息, 优先级为1,且content-type 为"text/plain".你可以使用Builder类来构建你自己的消息属性对象:

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

下面的例子使用自定义的headers来发布消息:

Map<String, Object> headers = new HashMap<String, Object>(); 
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

下面的例子使用expiration来发布消息:

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

BasicProperties is an inner class of the autogenerated holder class AMQP.

Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

Channels 和并发考虑(线程安全性)

Channel 实例不能在多个线程间共享。应用程序必须在每个线程中使用不同的channel实例,而不能将同个channel实例在多个线程间共享。 有些channl上的操作是线程安全的,有些则不是,这会导致传输时出现错误的帧交叉。
在多个线程共享channels也会干扰Publisher Confirms.

通过订阅来来接收消息

import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

接收消息最高效的方式是用Consumer接口来订阅。当消息到达时,它们会自动地进行分发,而不需要显示地请求

当在调用Consumers的相关方法时, 个别订阅总是通过它们的consumer tags来确定的, consumer tags可通过客户端或服务端来生成,参考 the AMQP specification document.
同一个channel上的消费者必须有不同的consumer tags.

实现Consumer的最简单方式是继承便利类DefaultConsumer.子类可通过在设置订阅时,将其传递给basicConsume调用:

boolean autoAck = false; 
channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
@Override
publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});

在这里,由于我们指定了autoAck = false,因此消费者有必要应答分发的消息,最便利的方式是在handleDelivery 方法中处理.

更复杂的消费者可能需要覆盖更多的方法,实践中,handleShutdownSignal会在channels和connections关闭时调用,handleConsumeOk 会在其它消费者之前

调用
,传递consumer tag(不明白,要研究)。

 

消费者可实现handleCancelOk 和 handleCancel方法来接收显示和隐式取消操作通知。

你可以使用Channel.basicCancel来显示地取消某个特定的消费者:

channel.basicCancel(consumerTag);

passing the consumer tag.

消费者回调是在单独线程上处理的,这意味着消费者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上调用阻塞方法。

每个Channel都有其自己的dispatch线程.对于一个消费者一个channel的大部分情况来说,这意味着消费者不会阻挡其它的消费者。如果在一个channel上多个消费者,则必须意识到长时间运行的消费者可能阻挡此channel上其它消费者回调调度.

获取单个消息

要显示地获取一个消息,可使用Channel.basicGet.返回值是一个GetResponse实例, 在它之中,header信息(属性) 和消息body都可以提取:

boolean autoAck = false; 
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag(); ...

因为autoAck = false,你必须调用Channel.basicAck来应答你已经成功地接收了消息:

channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

处理未路由消息

如果发布消息时,设置了"mandatory"标志,但如果消息不能路由的话,broker会将其返回到发送客户端 (通过 AMQP.Basic.Return 命令).

要收到这种返回的通知, clients可实现ReturnListener接口,并调用Channel.setReturnListener.如果channel没有配置return listener,那么返回的消息会默默地丢弃。

channel.setReturnListener(new ReturnListener() {     
    publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
...
    }
});

 return listener将被调用,例如,如果client使用"mandatory"标志向未绑定队列的direct类型交换器发送了消息.

关闭协议

AMQP client 关闭概述

AMQP 0-9-1 connection和channel 使用相同的方法来管理网络故障,内部故障,以及显示本地关闭.

AMQP 0-9-1 connection  和 channel 有如下的生命周期状态:

  • open: 准备要使用的对象
  • closing: 对象已显示收到收到本地关闭通知, 并向任何支持的底层对象发出关闭请求,并等待其关闭程序完成
  • closed: 对象已收到所有底层对象的完成关闭通知,最终将执行关闭操作

这些对象总是以closed状态结束的,不管基于什么原因引发的关闭,比如:应用程序请求,内部client library故障, 远程网络请求或网络故障.

AMQP connection 和channel 对象会持有下面与关闭相关的方法:

  • addShutdownListener(ShutdownListener 监听器)和removeShutdownListener(ShutdownListener 监听器),用来管理监听器,当对象转为closed状态时,将会触发这些监听器.注意,在已经关闭的对象上添加一个ShutdownListener将会立即触发监听器
  • getCloseReason(), 允许同其交互以了解对象关闭的理由
  • isOpen(), 用于测试对象是否处于open状态
  • close(int closeCode, String closeMessage), 用于显示通知对象关闭

可以像这样来简单使用监听器:

import com.rabbitmq.client.ShutdownSignalException; 
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) { ... } }
);

关闭环境信息

可通过显示调用getCloseReason()方法或通过使用ShutdownListener类中的业务方法的cause参数来ShutdownSignalException中获取关闭原因的有用信息.

ShutdownSignalException 类提供方法来分析关闭的原因.通过调用isHardError()方法,我们可以知道是connection错误还是channel错误.getReason()会返回相关cause的相关信息,这些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引发的异常,如网络通信故障,在这种情况下,可通过getCause()方法来获取信息).

public void shutdownCompleted(ShutdownSignalException cause) {   if (cause.isHardError())   {     
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication()) {
Method reason = cause.getReason(); ... } ... }
else { Channel ch = (Channel)cause.getReference(); ... } }

原子使用isOpen()方法

channel和connection对象的isOpen()方法不建议在在生产代码中使用,因为此方法的返回值依赖于shutdown cause的存在性. 下面的代码演示了竟争条件的可能性:

public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

相反,我们应该忽略这种检查,并简单地尝试这种操作.如果代码执行期间,connection的channel关闭了,那么将抛出ShutdownSignalException,这就表明对象处于一种无效状态了.当broker意外关闭连接时,我们也应该捕获由SocketException引发的IOException,或者当broker清理关闭时,捕获ShutdownSignalException.

public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

高级连接选项

Consumer线程池

Consumer 线程默认是通过一个新的ExecutorService线程池来自动分配的(参考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的线程池.下面的示例展示了一个比正常分配稍大的线程池:

ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 
Executors 和 ExecutorService 都是java.util.concurrent包中的类.

当连接关闭时,默认的ExecutorService将会被shutdown(), 但用户自定义的ExecutorService (如上面所示)将不会被shutdown(). 提供自定义ExecutorService的Clients必须确保最终它能被关闭(通过调用它的shutdown() 方法), 否则池中的线程可能会阻止JVM终止.

同一个executor service,可在多个连接之间共享,或者连续地在重新连接上重用,但在shutdown()后,则不能再使用.

使用这种特性时,唯一需要考虑的是:在消费者回调的处理过程中,是否有证据证明有严重的瓶颈. 如果没有消费者执行回调,或很少,默认的配置是绰绰有余. 开销最初是很小的,分配的全部线程资源也是有界限的,即使偶尔可能出现一阵消费活动.

使用Host列表

可以传递一个Address数组给newConnection()Address只是 com.rabbitmq.client 包中包含host和port组件的简单便利类. 例如:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 
将会尝试连接hostname1:portnumber1, 如果不能连接,则会连接hostname2:portnumber2,然后会返回数组中第一个成功连接(不会抛出IOException)上broker的连接.这完全等价于在factory上重复调用factory.newConnection()方法来设置host和port, 直到有一个成功返回.

如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么线程池将与第一个成功连接相关联.

心跳超时

参考Heartbeats guide 来了解更多关于心跳及其在Java client中如何配置的更多信息.

自定义线程工厂

像Google App Engine (GAE)这样的环境会限制线程直接实例化. 在这样的环境中使用RabbitMQ Java client, 需要配置一个定制的ThreadFactory,即使用合适的方法来实例化线程,如: GAE's ThreadManager. 下面是Google App Engine的相关代码.

import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

网络故障时自动恢复

Connection恢复

clients和RabbitMQ节点之间的连接可发生故障. RabbitMQ Java client 支持连接和拓扑(queues, exchanges, bindings, 和consumers)的自动恢复. 大多数应用程序的连接自动恢复过程会遵循下面的步骤:

  1. 重新连接
  2. 恢复连接监听器
  3. 重新打开通道
  4. 恢复通道监听器
  5. 恢复通道basic.qos 设置,发布者确认和事务设置
拓扑恢复包含下面的操作,每个通道都会执行下面的步骤:
  1. 重新声明交换器(except for predefined ones)
  2. 重新声明队列
  3. 恢复所有绑定
  4. 恢复所有消费者
要启用自动连接恢复,须使用factory.setAutomaticRecoveryEnabled(true):
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();
如果恢复因异常失败(如. RabbitMQ节点仍然不可达),它会在固定时间间隔后进行重试(默认是5秒). 时间间隔可以进行配置:
ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
当提供了address列表时,将会在所有address上逐个进行尝试:
ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

恢复监听器

可在可恢复连接和通道上注册一个或多个恢复监听器. 当启用了连接恢复时,ConnectionFactory#newConnection 和 Connection#createChannel 的连接已实现了com.rabbitmq.client.Recoverable,并提供了两个方法:

  • addRecoveryListener
  • removeRecoveryListener
注意,在使用这些方法时,你需要将connections和channels强制转换为Recoverable.

发布影响

当连接失败时,使用Channel.basicPublish方法发送的消息将会丢失. client不会保证在连接恢复后,消息会得到分发.要确保发布的消息到达了RabbitMQ,应用程序必须使用Publisher Confirms 


拓扑恢复

拓扑恢复涉及交换器,队列,绑定以及消费者恢复.默认是启用的,但也可以禁用:

ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

手动应答和自动恢复

当使用手动应答时,在消息分发与应答之间可能存在网络连接中断. 在连接恢复后,RabbitMQ会在所有通道上重设delivery标记. 也就是说,使用旧delivery标记的basic.ackbasic.nack, 以及basic.reject将会引发channel exception. 为了避免发生这种情况, RabbitMQ Java client可以跟踪,更新,以使它们在恢复期间单调地增长. Channel.basicAckChannel.basicNack, 以及Channel.basicReject 然后可以转换这些 delivery tags,并且不再发送过期的delivery tags. 使用手动应答和自动恢复的应用程序必须负责处理重新分发.

未处理异常

关于connection, channel, recovery, 和consumer lifecycle 的异常将会委派给exception handler,Exception handler是实现了ExceptionHandler接口的任何对象. 默认情况下,将会使用DefaultExceptionHandler实例,它会将异常细节输出到标准输出上.

可使用ConnectionFactory#setExceptionHandler来覆盖原始handler,它将被用于由此factory创建的所有连接:

ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         
Exception handlers 应该用于异常日志.

Google App Engine上的RabbitMQ Java Client

在Google App Engine (GAE) 上使用RabbitMQ Java client,必须使用一个自定义的线程工厂来初始化线程,如使用GAE's ThreadManager. 此外,还需要设置一个较小的心跳间隔(4-5 seconds) 来避免InputStream 上读超时:

ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

警告和限制

为了能使拓扑恢复, RabbitMQ Java client 维持了声明队列,交换器,绑定的缓存. 缓存是基于每个连接的.某些RabbitMQ的特性使得客户端不能观察到拓扑的变化,如,当队列因TTL被删除时. RabbitMQ Java client 会尝试在下面的情况中使用缓存实体失效:

  • 当队列被删除时.
  • 当交换器被删除时.
  • 当绑定被删除时.
  • 当消费者在自动删除队列上退出时.
  • 当队列或交换器不再绑定到自动删除的交换器上时.
然而, 除了单个连接外,client不能跟踪这些拓扑变化. 依赖于自动删除队列或交换器的应用程序,正如TTL队列一样 (注意:不是消息TTL!), 如果使用了自动连接恢复,在知道不再使用或要删除时,必须明确地删除这些缓存实体,以净化 client-side 拓扑cache. 这些可通过Channel#queueDeleteChannel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind来进行.

RPC (Request/Reply) 模式

为了便于编程, Java client API提供了一个使用临时回复队列的RpcClient类来提供简单的RPC-style communication.

此类不会在RPC参数和返回值上强加任何特定格式. 它只是简单地提供一种机制来向带特定路由键的交换器发送消息,并在回复队列上等待响应.

import com.rabbitmq.client.RpcClient;  
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(其实现细节为:请求消息使用basic.correlation_id唯一值字段来发送消息,并使用basic.reply_to来设置响应队列的名称.)

一旦你创建此类的实例,你可以使用下面的任意一个方法来发送RPC请求:

byte[] primitiveCall(byte[] message); 
String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

primitiveCall 方法会将原始byte数组转换为请求和响应的消息体. stringCall只是一个primitiveCall的简单包装,将消息体视为带有默认字符集编码的String实例.

mapCall 变种稍为有些复杂: 它会将原始java值包含在java.util.Map中,并将其编码为AMQP 0-9-1二进制表示形式,并以同样的方式来解码response. (注意:在这里,对一些值对象类型有所限制,具体可参考javadoc.)

所有的编组/解组便利方法都使用primitiveCall来作为传输机制,其它方法只是在它上面的做了一个封装.

posted @ 2016-06-04 00:37 胡小军 阅读(15615) | 评论 (1)编辑 收藏
仅列出标题
共5页: 上一页 1 2 3 4 5 下一页