要列出可用插(步骤1),我们将得到下面的屏幕输出:
上图显示的是所有可用的插件,其中方括号中为空的表示还未安装. 标记为[E]的插件是明确安装的. 标记为[e]的插件是隐式安装的,也就是说,这些插件是作为其它的插件的依赖而进行安装的.
在安装了STOMP插件之后,我们必须重启broker以让修改生效.
一旦插件激活了,你可以使用简单的文本STOMP协议来向队列test发送消息(步骤4). 这样你将看到下面的截图:
在另一个终端,我们可以启动队列test来消费消息(步骤5). 这样你将看到下面的截图:
请注意,从消息到最后的文本没有被类型化,这是实际接收到的消息。
更多在这个食谱中,我们使用了标准的STOMP配置。然而,也可以通过RabbitMQ配置文件的选项来定制其端口,SSL使用.
你可在http://www.rabbitmq.com/stomp.html查找到更多详细信息.
TIP
注意STOMP完全不同于Web-Stomp, Web-Stomp是封装在WebSockets中的STOMP.参考第5章节在web应用程序中使用STOMP来开发web监控程序食谱的内容.这两种协议不具有互操作性。
此外,也可以使用STOMP来发送其它类型的消息。如使用临时队列来发送RPC消息,向交换器发送消息,或从交换器中接收消息.
也可参考
在本食谱中,我们已经展示了如何使用Netcat作为文本客户端来让RabbitMQ与STOMP插件交互. 但,这不是一种典型在客户端使用STOMP的方式.
这里有许多可用的STOMP客户端APIs.
你可在http://stomp.github.io/implementations.html中找到可用客户端包的列表.
管理RabbitMQ集群
当部署了一个大的RabbitMQ集群时,管理插件会对集群操作造成一定的开销.在本食谱中,我们将展示如何减轻这种开销。
准备
为了验证这个食谱,你需要一个具有至少两个节点的RabbitMQ集群。如果他们已经安装了管理插件,你需要删除它。
如何做1. 使用下面的命令在一个节点上安装管理插件:
rabbitmq-plugins enable rabbitmq_management
2.使用以下命令在所有其他节点上安装管理代理插件:
rabbitmq-plugins enable rabbitmq_management_agent
如何工作
执行这些步骤后,您只能从第一个节点监视整个群集。其他节点将通过代理来更新第一个节点的控制台状态,但您无法访问他们的端口15672。
通常,你会在几个节点安装完整的管理插件,如前端或管理节点和管理代理插件的其它节点。
监控Shovel状态
在第7章节,开发高可用应用程序中,我们已经看过了如何使用Shovel插件.在本食谱中,我们将展示如何使用一个适当的插件来监控其正确行为. 这是rabbitmq_management插件的扩展.
准备
要测试这个食谱,我们需要运行两个RabbitMQ brokers. 在本食谱中,我们将其称为rabbit@node01和rabbit@node02.
如何做
我们假设这两个broker已经在他们各自的节点上运行了.我们将使用配置文件来配置节点node01上的broker,你也可以从Chapter07/Recipe05中拷贝配置文件:
1. 在RabbitMQ配置文件中,一般是/etc/rabbit/rabbitmq.config,插入下面的Shovel配置:
[{rabbitmq_shovel,
[ {shovels, [ {my_books_shovel,
[
{sources, [ {broker, "amqp://node02"}]}
, {destinations, [ {broker, "amqp://"}]}
, {queue, <<"myBooksQueueCopy">>}
, {prefetch_count, 10}
, {reconnect_delay, 5}
]}]}].
2. 输入下面的命令来激活插件:
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
3. 为修改的broker重启broker以使配置生效,命令如下:
service rabbitmq-server restart
4. 使用下面的地址来访问RabbitMQ 管理界面:
http://node01:15672/
5. 从管理界面中,分别在节点node01和node02上创建myBooksQueueCopy队列.
6. 在管理界面,导航至Admin | Shovel Status.
如何工作
当我们激活Shovel插件时,可通过rabbitmq_shovel_management 插件来监控其行为. 一旦我们激活了插件,由于重定向队列不存在,你会看到错误信息。显示如下:
只要我们创建了队列(步骤5),页面将在5秒内自动刷新:
在这个页面上, RabbitMQ 允许我们监视所有已配置的Shovels.
开发新插件 – 使用ODBC来操作关系型数据库
在前面的食谱中,我们已经看过了,如何使用一些现有的插件。现在,我们将了解如何来开发自定义新插件.
注意,这不是一种典型实践. 大部分的操作通常可通过 RabbitMQ/AMQP client API来执行.
然而,在某些必要的情况下,也需要执行一些定制化的操作.
这是出于优化目的,或需要严格监控broker自己的行为.
在本食谱中,我们将展示如何让RabbitMQ来消费消息,并使用ODBC驱动将其存入关系型数据库中.准备
在这个食谱中,我们需要RabbitMQ 和PostgreSQL. 然而, 由于ODBC驱动的普及,本食谱可以很容易地适应到几乎所有关系数据库上。
另外,我们还需要安装Mercurial (http://mercurial.selenic.com/),它是RabbitMQ的版本系统,用来检出RabbitMQ的源代码.
虽然这里的环境是Linux,但只需要稍为修改,本食谱也能工作在Windows上.
如何做
现在,我们将安装和配置PostgreSQL.
1. 安装PostgreSQL和它的ODBC驱动. 例如,在基于yum的Linux发行版上(如RedHat, CentOS, Fedor或其它),用roo执行下面的命令.
yum install postgresql-server
yum install postgresql-odbc
postgresql-setup initdb
service postgresql start
2. 从RabbitMQ插件中,使用下面的命令来创建一个新用户(密码为rmq_plugin_password) 和一个新数据库:
su postgres createuser --no-superuser --no-createdb --no-createrole --pwpromptrmq_plugin_user
createdb --owner rmq_plugin_userrmqdb
3. ,通过在文件/var/lib/pgsql/data/pg_hba.conf末尾追加下面的行,以允许新创建的用户访问给定数据库(只允许从本地访问):
# TYPE DATABASE USER ADDRESS METHOD
local rmqdb rmq_plugin_user md5
host rmqdb rmq_plugin_user 127.0.0.1/32 md5
4.以root身份重新加载PostgreSQL配置:
service postgresql reload
5. 此时,你应该可以从本地连接PostgreSQL了,例如,通过执行下面的命令:
psql --username=rmq_plugin_user --dbname=rmqdb
6. 完成ODBC驱动的配置来访问数据库,要做到这一点,必须在文件/etc/odbc.ini中插入下面的行:
[rmqDSN]
Driver = PostgreSQL
Description = PostgreSQL data source for RabbitMQ
Servername = localhost
Port = 5432
Protocol = 8.4
Database = rmqdb
7.现在我们可以使用isql命令来测试ODBC连接,同时我们也可以直接使用Erlang来测试. erl命令如下:
odbc:start()
odbc:connect("DSN=rmqDSN;UID=rmq_plugin_user;PWD=rmq_plugin_password", [])
到目前为止,我们已经准备好了开发插件所需要的东西.现在我们来了解如何启用metronome 插件.它存在于官方RabbitMQ文档(http://www.rabbitmq.com/plugin-development.html:
8. 使用下面的命令来检出RabbitMQ开发源代码树:
cd $HOME
hg clone http://hg.rabbitmq.com/rabbitmq-public-umbrella
cd rabbitmq-public-umbrella
make co
9. 使用下面的命令来编译metronome插件:
cd rabbitmq-metronome
make
10. 在开发的broker中安装插件及其依赖.
cd ../rabbitmq-server
mkdir plugins
cd plugins
ln –s ../../rabbitmq-metronome
ln–s../../rabbitmq-erlang-client
11.为了避免覆盖产品环境安装,即使是我们不用root用户,我们也需要停止产品服务器,设置一些环境变量,并创建一些相应的目录以让RabbitMQ以标准用户启动:
export RABBITMQ_LOG_BASE=$HOME/rmq/log
export RABBITMQ_MNESIA_BASE=$HOME/rmq/mnesia
export RABBITMQ_ENABLED_PLUGINS_FILE=$HOME/rmq/enabled_plugins
mkdir –p $RABBITMQ_LOG_BASE $RABBITMQ_MNESIA_BASE
12. 现在我们可以启用开发插件了.
cd $HOME/rabbitmq-public-umbrella/rabbitmq-server
scripts/rabbitmq-plugins list
scripts/rabbitmq-plugins enable rabbitmq_metronome
13. 最后,启动开发服务器:
scripts/rabbitmq-server
14. 此刻,我们已经准备好了如何开发一个新插件, 它使得通过ODBC的方式,将RabbitMQ与第三方数据库连接到了一起.你可在Chapter09/Recipe04中找到本食谱的源码. 你可从Chapter09/Recipe04/
rabbitmq-odbctap目录拷贝源码进RabbitMQ开发树-rabbitmq-publicumbrella(在步骤8中检出来的). 通过这种方式,你可以获得类似于下面截图的源码树: 15. 要开始开发一个新插件,需要从现有插件rabbitmq-metronome 或 rabbitmq-shovel中拷贝和重命名文件. The makefile will be left unmodified.
16. 修改package.mk文件以反映所需的依赖与测试模块.
17. 编辑rabbitmq_odbctap.app.src. 此文件包含Erlang项目所需的资源,在我们的例子中,一般和惯例都必须包含下面的配置:
{application, rabbitmq_odbctap,
[{description, "Embedded Rabbit ODBC tap"},
{vsn, "0.0.0"},
{modules, []},
{registered, []},
{mod, {rabbit_odbctap, []}},
{env, [{dsn,"DSN"},
{user,"guest"},
{password, "guest"},
{queue, "tapped_queue"}
]},
{applications, [kernel, stdlib, rabbit, amqp_client]}]}.
18. 定制rabbit_odbctap.erl, 模块的入口点,为了让它启动或停止,需要配置插件自身.
19. 定制rabbit_odbctap_sup.erl, 对于Erlang管理源节点,通过定义回调来定义主管行为的回调.
20. 在rabbit_odbctap_worker.erl中实现插件逻辑.这是实际模块的入口点,在我们这里,它会连接RabbitMQ broker,绑定队列,并通过ODBC来消费指定数据库包含的消息.
21.多个模块共用的数据定义(也就是Erlang记录)可放置在include目录中.如,在rabbit_odbctap.hrl中,你可以找到Erlang记录odbctap_config的定义,此定义是被rabbit_odbctap.erl、 rabbit_odbctap_worker.erl所共用的.
22. 准备一个或多个测试模块.在我们的例子中,在rabbit_odbctap_tests.erl中只能找到一个骨架.
23. 编译插件并执行自动化测试.
cd $HOME/rabbitmq-public-umbrella/rabbimq-odbctap
make
make test
24. 在开发服务器中安装插件.
cd ../rabbitmq-server/plugins
ln –s ../../rabbitmq-odbctap .
25. 在设置步骤11中所示的环境后,我们可以启用插件和(重新)启动服务器了.
cd $HOME/rabbitmq-public-umbrella/rabbitmq-server
scripts/rabbitmq-plugins enable rabbitmq_odbctap
scripts/rabbitmq-server
如何工作
为了实战本食谱,我们一开始就配置了一个PostgreSQL数据库,以及它相应的ODBC驱动.这部分内容已经在食谱的前半部分中展示过了.
然后,我们展示了如何启用样例模板插件rabbitmq_metronome(RabbitMQ自身在开发树中提供的).如果一切正常, 步骤13启动的开发broker将以会以我们标准用户运行,并在日志文件$HOME/rmq/log/rabbit@localhost.log中,你可能会看到下面的行:
=INFO REPORT==== 21-Oct-2013::03:28:50 ===
Server startup complete; 2 plugins started.
* amqp_client
* rabbitmq_metronome
这是开发新插件的起点,即使用Erlang ODBC client来监控已配置PostgreSQL数据库中的某些给定队列.这部分内容将在食谱的第三部分展示。
要实现我们的插件,我们选择遵循一种公共的方案,即使用模块来实现Erlang行为.实际上, Erlang 鼓励使用通过supervisor来解藕应用程序(这里是我们新开发的插件)和运行者(即RabbitMQ自身) 的松耦合架构。
关于这方面的内容,可参考http://www.erlang.org/doc/man/supervisor.html and http://www.erlang.org/doc/design_principles/sup_princ.html.
supervisor唯一需要定制的是,可从startup模块中获取配置信息,并将其传递给worker模块. 为了达到目的,startup 模块(rabbit_odbctap.erl)包含了读取配置信息的代码。
read_config() ->
{ok, Dsn} = application:get_env(dsn),
{ok, User} = application:get_env(user),
{ok, Password} = application:get_env(password),
{ok, Queue} = application:get_env(queue),
Config = #odbctap_config{
dsn = Dsn,
user = User,
password = Password,
queue = Queue},
Config.
调用application:get_env/1可让模块自动访问如下设置的定义:
1.在Erlang资源文件rabbitmq_odbctap.app.src中,在编译时,设置了env键:
...
{env, [{dsn,"DSN"},
{user,"guest"},
{password, "guest"},
{queue, "tapped_queue"}
]},
...
在rabbitmq.config文件中是以运行时读取的,其配置遵循典型的RabbitMQ (即Erlang)格式 , 且在下面的代码中指定了rabbitmq_odbctap键:
[{rabbitmq_odbctap,
[{dsn, "rmqDSN"},
{user,"rmq_plugin_user"},
{password, "rmq_plugin_password"},
{queue, "tapped_queue"}]}].
在本书的归档目录中,你能找到完整的文件.
rabbit_odbctap_worker.erl worker 模块实现gen_server行为.
你可在http://www.erlang.org/doc/design_principles/gen_server_concepts.html找到更多详细信息.
在我们的模块中,一旦模块启动了就会调用 init/2,然后它会通过ODBC客户端 Erlang API http://www.erlang.org/
doc/apps/odbc/来连接ODBC连接器,并触发RabbitMQ消费者. 重要的一点是,在这种情况下,内嵌RabbitMQ client是连接到本地broker的,即嵌入到同一个Erlang虚拟机中,并会执行下面的调用:
{ok, Connection} = amqp_connection:start(#amqp_params_direct{})
通过这种方式,插件可使用内部Erlang连接和更为高效的消息协议, 因为完全避免了AMQP通信协议的编组.
TIP
为了能在RabbitMQ日志文件中记录一些信息,你可以使用与rabbit_log:info/2相似的调用,正如init/2中定义的一样.
通过使用gen_server行为, broker 的消息不是从接收块中消费的(http://www.rabbitmq.com/erlang-client-user-guide.html, 参考订阅队列章节), 而是通过handle_info/2回调实施的:
handle_info({#'basic.deliver'{delivery_tag = Tag},
#amqp_msg{payload = Payload}},State = #state{channel=Channel,odbcHandle = Ohandle}) ->
Query = "INSERT INTO rmqmessages VALUES ('now','"++binary_to_list(Payload) ++"')",
{updated, _} = odbc:sql_query(Ohandle, Query),
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
{noreply, State};
这是本食谱的核心.当RabbitMQ自身初始化的时候,且数据库处于打开状态时,每个消费消息都将得到存储。
更多
非常重要的一点是开发插件是最后的选择,最好是通过使用AMQP client library来开发一个外部应用程序.
TIP
一个插件可能连累RabbitMQ的稳定性.
因此,只有当极端的整合或对性能有需要时,才可以评估是否开发一个插件。