随笔 - 41  文章 - 7  trackbacks - 0
<2016年6月>
2930311234
567891011
12131415161718
19202122232425
262728293012
3456789

常用链接

留言簿

随笔分类

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

     摘要: serverStatus.pdf原文:https://docs.mongodb.com/v3.0/reference/command/serverStatus/定义serverStatusserverStatus命令用于返回数据库进程状态的概述文档. 大部分监控程序都会定期运行此命令来收集实例相关的统计信息:{ serverStatus: 1 } 其值(即上面的1)不影响命令的操作。2.4版本中修...  阅读全文
posted @ 2017-06-26 21:08 胡小军 阅读(2515) | 评论 (0)编辑 收藏

SQL标准定义了4类隔离级别,包括了一些具体规则,用来限定事务内外的哪些改变是可见的,哪些是不可见的。低级别的隔离级一般支持更高的并发处理,并拥有更低的系统开销。
Read Uncommitted(读取未提交内容)

       在该隔离级别,所有事务都可以看到其他未提交事务的执行结果。本隔离级别很少用于实际应用,因为它的性能也不比其他级别好多少。读取未提交的数据,也被称之为脏读(Dirty Read)。
Read Committed(读取提交内容)

       这是大多数数据库系统的默认隔离级别(但不是MySQL默认的)。它满足了隔离的简单定义:一个事务只能看见已经提交事务所做的改变。这种隔离级别 也支持所谓的不可重复读(Nonrepeatable Read),因为同一事务的其他实例在该实例处理其间可能会有新的commit,所以同一select可能返回不同结果。
Repeatable Read(可重读)

       这是MySQL的默认事务隔离级别,它确保同一事务的多个实例在并发读取数据时,会看到同样的数据行。不过理论上,这会导致另一个棘手的问题:幻读 (Phantom Read)。简单的说,幻读指当用户读取某一范围的数据行时,另一个事务又在该范围内插入了新行,当用户再读取该范围的数据行时,会发现有新的“幻影” 行。InnoDB和Falcon存储引擎通过多版本并发控制(MVCC,Multiversion Concurrency Control)机制解决了该问题。

Serializable(可串行化) 
       这是最高的隔离级别,它通过强制事务排序,使之不可能相互冲突,从而解决幻读问题。简言之,它是在每个读的数据行上加上共享锁。在这个级别,可能导致大量的超时现象和锁竞争。

         这四种隔离级别采取不同的锁类型来实现,若读取的是同一个数据的话,就容易发生问题。例如:

         脏读(Drity Read):某个事务已更新一份数据,另一个事务在此时读取了同一份数据,由于某些原因,前一个RollBack了操作,则后一个事务所读取的数据就会是不正确的。

         不可重复读(Non-repeatable read):在一个事务的两次查询之中数据不一致,这可能是两次查询过程中间插入了一个事务更新的原有的数据。

         幻读(Phantom Read):在一个事务的两次查询中数据笔数不一致,例如有一个事务查询了几列(Row)数据,而另一个事务却在此时插入了新的几列数据,先前的事务在接下来的查询中,就会发现有几列数据是它先前所没有的。

         在MySQL中,实现了这四种隔离级别,分别有可能产生问题如下所示:


下面,将利用MySQL的客户端程序,分别测试几种隔离级别。测试数据库为test,表为tx;表结构:

id                              int

num

                              int

两个命令行客户端分别为A,B;不断改变A的隔离级别,在B端修改数据。

(一)、将A的隔离级别设置为read uncommitted(未提交读)

 在B未更新数据之前:

客户端A:

B更新数据:

客户端B:

客户端A:

        经过上面的实验可以得出结论,事务B更新了一条记录,但是没有提交,此时事务A可以查询出未提交记录。造成脏读现象。未提交读是最低的隔离级别。

(二)、将客户端A的事务隔离级别设置为read committed(已提交读)

 在B未更新数据之前:

客户端A:

B更新数据:

客户端B:

客户端A:

       经过上面的实验可以得出结论,已提交读隔离级别解决了脏读的问题,但是出现了不可重复读的问题,即事务A在两次查询的数据不一致,因为在两次查询之间事务B更新了一条数据。已提交读只允许读取已提交的记录,但不要求可重复读。

(三)、将A的隔离级别设置为repeatable read(可重复读)

 在B未更新数据之前:

客户端A:

B更新数据:

客户端B:

客户端A:

B插入数据:

客户端B:

客户端A:

       由以上的实验可以得出结论,可重复读隔离级别只允许读取已提交记录,而且在一个事务两次读取一个记录期间,其他事务部的更新该记录。但该事务不要求与其他事务可串行化。例如,当一个事务可以找到由一个已提交事务更新的记录,但是可能产生幻读问题(注意是可能,因为数据库对隔离级别的实现有所差别)。像以上的实验,就没有出现数据幻读的问题。

(四)、将A的隔离级别设置为 可串行化 (Serializable)

A端打开事务,B端插入一条记录

事务A端:

事务B端:

因为此时事务A的隔离级别设置为serializable,开始事务后,并没有提交,所以事务B只能等待。

事务A提交事务:

事务A端

事务B端

      
 serializable完全锁定字段,若一个事务来查询同一份数据就必须等待,直到前一个事务完成并解除锁定为止 。是完整的隔离级别,会锁定对应的数据表格,因而会有效率的问题。

 转自:http://xm-king.iteye.com/blog/770721
posted @ 2016-09-24 00:06 胡小军 阅读(234) | 评论 (0)编辑 收藏

一、rsync的概述

rsync是类unix系统下的数据镜像备份工具,从软件的命名上就可以看出来了——remote sync。rsync是Linux系统下的文件同步和数据传输工具,它采用“rsync”算法,可以将一个客户机和远程文件服务器之间的文件同步,也可以 在本地系统中将数据从一个分区备份到另一个分区上。如果rsync在备份过程中出现了数据传输中断,恢复后可以继续传输不一致的部分。rsync可以执行 完整备份或增量备份。它的主要特点有:

1.可以镜像保存整个目录树和文件系统;

2.可以很容易做到保持原来文件的权限、时间、软硬链接;无须特殊权限即可安装;

3.可以增量同步数据,文件传输效率高,因而同步时间短;

4.可以使用rcp、ssh等方式来传输文件,当然也可以通过直接的socket连接;

5.支持匿名传输,以方便进行网站镜象等;

6.加密传输数据,保证了数据的安全性;

 

二、镜像目录与内容

rsync -av duying  /tmp/test

 

查看/tmp/test目录,我们可以看到此命令是把duying这个文件夹目录连同内容全部考到当前目录下了

 

rsync  -av duying/ /tmp/test         注意:比上一条命令多了符号“/” 

 

再次查看/tmp/test目录,我们发现没有duying这个目录,只是看到了目录中的内容

 

三、增量备份本地文件

rsync -avzrtopgL  --progress /src /dst


-v是“--verbose”,即详细模式输出; -z表示“--compress”,即传输时对数据进行压缩处理;

-r表示“--recursive”,即对子目录以递归的模式处理;-t是“--time”,即保持文件时间信息;

-o表示“owner”,用来保持文件属主信息;-p是“perms”,用来保持文件权限;

-g是“group”,用来保持文件的属组信息;

--progress用于显示数据镜像同步的过程;

 

四、镜像同步备份文件

rsync -avzrtopg --progress --delete /src  /dst


--delete选项指定以rsync服务器端为基础进行数据镜像同步,也就是要保持rsync服务器端目录与客户端目录的完全一致;

--exclude选项用于排除不需要传输的文件类型;

 

五、设置定时备份策略

crontab -e

30 3 * * * rsync -avzrtopg  --progress  --delete  --exclude "*access*"

--exclude "*debug*"  /src /dst

 

如果文件比较大,可使用nohup将进程放到后台执行。

nohup rsync -avzrtopgL  --progress /data/opt /data2/  >/var/log/$(date +%Y%m%d).mail.log & 

 

六、rsync的优点与不足

与传统的cp、tar备份方式对比,rsync具有安全性高、备份迅速、支持增量备份等优点,通过rsync可以解决对实时性要求不高的数据备份需求,例如,定期地备份文件服务器数据到远端服务器,对本地磁盘定期进行数据镜像等。

但是随着系统规模的不断扩大,rsync的缺点逐渐被暴露了出来。首先,rsync做数据同步时,需要扫描所有文件后进行对比,然后进行差量传输。如果文 件很大,扫面文件是非常耗时的,而且发生变化的文件往往是很少一部分,因此rsync是非常低效的方式。其次,rsync不能实时监测、同步数据,虽然它 可以通过Linux守护进程的方式触发同步,但是两次触发动作一定会有时间差,可能导致服务器端和客户端数据出现不一致。


转自:http://blog.sina.com.cn/s/blog_6954b9a901011esn.html

posted @ 2016-09-23 22:01 胡小军 阅读(237) | 评论 (0)编辑 收藏

     Linux下如何查看版本信息, 包括位数、版本信息以及CPU内核信息、CPU具体型号等等,整个CPU信息一目了然。

 
  1、# uname -a   (Linux查看版本当前操作系统内核信息)
 
  Linux localhost.localdomain 2.4.20-8 #1 Thu Mar 13 17:54:28 EST 2003 i686 athlon i386 GNU/Linux
 
  2、# cat /proc/version (Linux查看当前操作系统版本信息)
 
      Linux version 2.4.20-8 (bhcompile@porky.devel.redhat.com)
      (gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5)) #1 Thu Mar 13 17:54:28 EST 2003
 
  3、# cat /etc/issue  或cat /etc/redhat-release(Linux查看版本当前操作系统发行版信息)
 
  Red Hat Linux release 9 (Shrike)
  4、# cat /proc/cpuinfo (Linux查看cpu相关信息,包括型号、主频、内核信息等)
 
  processor        : 0
     vendor_id         : AuthenticAMD
  cpu family        : 15
  model             : 1
  model name      : AMD A4-3300M APU with Radeon(tm) HD Graphics
  stepping         : 0
  cpu MHz          : 1896.236
  cache size       : 1024 KB
  fdiv_bug         : no
  hlt_bug          : no
  f00f_bug        : no
  coma_bug      : no
  fpu                : yes
  fpu_exception   : yes
  cpuid level      : 6
  wp                : yes
  flags             : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr
                           sse sse2 syscall mmxext lm 3dnowext 3dnow
  bogomips      : 3774.87
 
  5、# getconf LONG_BIT  (Linux查看版本说明当前CPU运行在32bit模式下, 但不代表CPU不支持64bit)
 
  32
 
  6、# lsb_release -a

      以上文章转载自:http://www.cnblogs.com/lanxuezaipiao/archive/2012/10/22/2732857.html
posted @ 2016-09-23 21:58 胡小军 阅读(230) | 评论 (0)编辑 收藏
     摘要: 原文:http://shiro.apache.org/reference.htmlApache Shiro介绍Apache Shiro是什么?Apache Shiro 是一个可干净处理认证,授权,企业会话管理以及加密的强大且灵活的开源安全框架.Apache Shiro的首要目标是易于使用和理解. 安全可以是非常复杂的,有时甚至是痛苦的,但它不是. 框架应该隐藏复杂的地方,暴露干净而方便的API,以...  阅读全文
posted @ 2016-08-18 17:32 胡小军 阅读(2485) | 评论 (0)编辑 收藏
  1. 在项目上右键进入Properties,选择Deployment Assembly,再点击Add...,如下图所示:

    2.然后在弹出的窗口中,选择
    Java Build Path Entries,点击Next,如下图所示:



    3.选择你要你引入的UserLibrary,点击Finish即可

    注意:如果在Java Web Project引入了其它Java Project,默认引用的Java Project的编译后字节码是不会部署到WEB-INF/class下的,此时需要使用上面的Project进行导出.
posted @ 2016-08-17 12:53 胡小军 阅读(2322) | 评论 (0)编辑 收藏

原文:http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html

介绍

除了帮助页面,所有URIs只会服务application/json类型的资源,并且需要HTTP基础认证(使用标准RabbitMQ用户数据库). 默认用户是guest/guest.

大多数URIs需要虚拟主机名称作为其路径的一部分, 因为名称是虚拟主机的唯一标识符对象. 默认虚拟主机称为"/", 它需要编码为"%2f".

PUT一个资源会对其进行创建. 你上传的JSON对象必须有某个键keys (下面文档有描述),其它的键会被忽略. 缺失键会引发错误.

在AMQP中,由于绑定没有名称或IDs,因此我们基于其所有属性人工合成了一个. 

由于一般情况下很难预测这个名字, 你可以通过POST一个工厂URI来创建绑定.查看下面的例子.

注意事项

这些注意事项适用于当前管理AP的开发版本。在未来,他们将是固定的。
  • arguments 字段会被忽略.你不创建一个队列,交换器或使用参数进行绑定. 带有参数的队列,交换器或绑定也不会显示这些参数.
  • 权限偶尔才需要强制执行.如果一个用户能用HTTP API进行认证,那么它们可以做任何事情.
  • 从GET请求中返回的对象中包含许多与监控相关的信息. 它们是无证实的,并且将来可能要发生变化.

示例

下面有几个快速例子,它们使用了Unix命令行工具curl:

  • 获取虚拟主机列表:
    $ curl -i -u guest:guest http://localhost:55672/api/vhosts 
    HTTP/1.1 200 OK
    Server: MochiWeb/1.1 WebMachine/1.7 (participate in the frantic)
    Date: Tue, 31 Aug 2010 15:46:59 GMT
    Content-Type: application/json
    Content-Length: 5
    ["/"]
  • 创建一个新虚拟主机:
    $ curl -i -u guest:guest -H "content-type:application/json" \   -XPUT http://localhost:55672/api/vhosts/foo 
    HTTP/1.1 204 No Content
    Server: MochiWeb/1.1 WebMachine/1.7 (participate in the frantic)
    Date: Fri, 27 Aug 2010 16:56:00 GMT
    Content-Type: application/json
    Content-Length: 0

    注意: 你必须将mime类型指定为application/json.

    Note: 在上传的JSON对象中,对象名称是不需要的,因为它已经包含在了URI中. 由于一个虚拟主机除了名称外没有其它属性,这意味着你完全不需要指定一个body.

  • 在默认虚拟主机中创建一个新的交换器:
    $ curl -i -u guest:guest -H "content-type:application/json" \   -XPUT -d'{"type":"direct","auto_delete":false,"durable":true,"arguments":[]}' \   http://localhost:55672/api/exchanges/%2f/my-new-exchange 
    HTTP/1.1 204 No Content
    Server: MochiWeb/1.1 WebMachine/1.7 (participate in the frantic)
    Date: Fri, 27 Aug 2010 17:04:29 GMT
    Content-Type: application/json
    Content-Length: 0

    注意: 在PUT或DELETE的响应中, 除非失败了,否则我们绝不会返回一个body.

  • 再删除它:
    $ curl -i -u guest:guest -H "content-type:application/json" \   -XDELETE http://localhost:55672/api/exchanges/%2f/my-new-exchange 
    HTTP/1.1 204 No Content
    Server: MochiWeb/1.1 WebMachine/1.7 (participate in the frantic)
    Date: Fri, 27 Aug 2010 17:05:30 GMT
    Content-Type: application/json
    Content-Length: 0

参考


GETPUTDELETEPOSTPathDescription
X


/api/overview
描述整个系统的各种随机信息。
X


/api/connections所有打开连接的列表.
X
X
/api/connections/name一个单独的连接. DELETE它会导致连接关闭.
X


/api/channels所有打开通道的列表.
X


/api/channels/channel单个通道的详情.
X


/api/exchanges所有交换器的列表.
X


/api/exchanges/vhost指定虚拟主机中所有交换器列表.
XXX
/api/exchanges/vhost/name一个单独的交换器.要PUT一个交换器,你需要一些像下面这样的body:
{"type":"direct","auto_delete":false,"durable":true,"arguments":[]}
X


/api/exchanges/vhost/name/bindings指定交换器中的绑定列表.
X


/api/queues所有队列的列表.
X


/api/queues/vhost指定虚拟主机中所有队列列表.
XXX
/api/queues/vhost/name一个单独队列.要PUT一个队列, 你需要一些像下面这样的body:
{"auto_delete":false,"durable":true,"arguments":[]}
X


/api/queues/vhost/queue/bindings指定队列中的所有绑定列表.
X


/api/bindings所有绑定列表.
X


/api/bindings/vhost指定虚拟主机上的所有绑定列表.
X

X/api/bindings/vhost/queue/exchange队列和交换器之间的所有绑定列表. 记住,队列和交换器可以绑定多次!要创建一个新绑定, POST 这个URI.你需要一些像下面这样的body:
{"routing_key":"my_routing_key","arguments":[]}
响应会包含一个Location header,它会告诉你新绑定的URI.
XXX
/api/bindings/vhost/queue/exchange/props队列和交换器之间的单个绑定. URI的props部分是一个名称,用于由路由键和属性组成的绑定.你可以通过PUT这个URI来创建一个绑定,它比上面POST URI更方便.
X


/api/vhosts所有虚拟主机列表.
XXX
/api/vhosts/name单个虚拟主机.由于虚拟主机只有一个名称,因此在PUT时不需要body.
X


/api/users所有用户列表.
XXX
/api/users/name单个用户. 要PUT一个用户, 你需要一些像下面这样的body:
{"password":"secret"}
X


/api/users/user/permissions指定用户的所有权限列表.
X


/api/permissions所有用户的所有权限列表.
XXX
/api/permissions/vhost/user一个虚拟主机中某个用户的个人权限. 要PUT一个权限,你需要一些像下面这样的body:
{"scope":"client","configure":".*","write":".*","read":".*"}
posted @ 2016-08-13 21:50 胡小军 阅读(7278) | 评论 (0)编辑 收藏
     摘要: 3.1.15 消息监听器容器配置有相当多的配置SimpleMessageListenerContainer 相关事务和服务质量的选项,它们之间可以互相交互.当使用命名空间来配置<rabbit:listener-container/>时,下表显示了容器属性名称和它们等价的属性名称(在括号中).未被命名空间暴露的属性,以`N/A`表示.Table 3.3. 消...  阅读全文
posted @ 2016-08-13 16:24 胡小军 阅读(6532) | 评论 (0)编辑 收藏
     摘要: 3.1.10 配置broker介绍AMQP 规范描述了协议是如何用于broker中队列,交换器以及绑定上的.这些操作是从0.8规范中移植的,更高的存在于org.springframework.amqp.core包中的AmqpAdmin 接口中. 那个接口的RabbitMQ 实现是RabbitAdmin,它位于org.springframework.amqp.rabbit.core 包.A...  阅读全文
posted @ 2016-08-13 16:07 胡小军 阅读(4936) | 评论 (0)编辑 收藏
     摘要: 3.1.9 Request/Reply 消息介绍AmqpTemplate 也提供了各种各样的sendAndReceive 方法,它们接受同样的参数选项(exchange, routingKey, and Message)来执行单向发送操作. 这些方法对于request/reply 场景也是有用的,因为它们在发送前处理了必要的"reply-to"属性配置,并能通过它在专...  阅读全文
posted @ 2016-08-13 15:59 胡小军 阅读(6612) | 评论 (0)编辑 收藏
     摘要: Consumer Tags从1.4.5版本开始,你可以提供一种策略来生成consumer tags.默认情况下,consumer tag是由broker来生成的.public interface ConsumerTagStrategy { String createConsumerTag(String queue); }该队列是可用的,所以它可以(可选)在tag中使用。参考Sectio...  阅读全文
posted @ 2016-08-13 12:48 胡小军 阅读(13023) | 评论 (0)编辑 收藏
     摘要: Queue Affinity 和 LocalizedQueueConnectionFactory当在集群中使用HA队列时,为了获取最佳性能,可以希望连接到主队列所在的物理broker. 虽然CachingConnectionFactory 可以配置为使用多个broker 地址; 这会失败的,client会尝试按顺序来连接. LocalizedQueueConnectionFac...  阅读全文
posted @ 2016-08-13 12:38 胡小军 阅读(6229) | 评论 (0)编辑 收藏
     摘要: 3. 参考这部分参考文档详细描述了组成Sring AMQP的各种组件. main chapter 涵盖了开发AMQP应用程序的核心类. 这部分也包含了有关示例程序的章节.3.1 使用 Spring AMQP在本章中,我们将探索接口和类,它们是使用Spring AMQP来开发应用程序的必要组件 .3.1.1 AMQP 抽象介绍Spring ...  阅读全文
posted @ 2016-08-13 12:21 胡小军 阅读(6578) | 评论 (0)编辑 收藏
     摘要: 原文:http://docs.spring.io/spring-amqp/docs/1.6.0.RELEASE/reference/html/1. 前言Spring AMQP项目将其核心Spring概念应用于基于AMQP消息解决方案的开发中.我们提供了一个发送和接收消息的高级抽象模板.同时,我们也提供了消息驱动POJO的支持.这些包有助于AMQP资源的管理,从而提升依赖注入和声明式配置的使用. 在...  阅读全文
posted @ 2016-08-13 12:03 胡小军 阅读(5890) | 评论 (0)编辑 收藏
     摘要: 1 概述1.1 本文档的目标此文档定义了一个网络协议-高级消息队列协议(AMQP), 它使一致的客户端程序可以与一致的消息中间件服务器进行通信.我们面对的是这个领域有经验的技术读者,同时还提供了足够的规范和指南.技术工程师可以根据这些文档,在任何硬件平台上使用各种编程语言来构建遵从该协议的解决方案。1.2 摘要1.2.1 为什么使用AMQP?AMQP在一致性客户端和消息中间件(也称为"broker...  阅读全文
posted @ 2016-08-12 18:30 胡小军 阅读(10777) | 评论 (0)编辑 收藏
     摘要: 原文:http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#beans-java7.12.1 基本概念: @Bean 和 @Configuration在Spring新的Java配置支持中,其核心构件是@Configuration注解类和@Bean注解方法.@Bean 注解用来表示方...  阅读全文
posted @ 2016-08-05 17:04 胡小军 阅读(2301) | 评论 (0)编辑 收藏
     摘要: RabbitMQ内置支持TLS。自RabbitMQ 3.4.0起, 为防止 POODLE attack 攻击,已经自动禁用了SSLv3.使用TLS时,推荐安装的Erlang/OTP版本为17.5或以上版本. R16版本在某些证书中可以工作,但存在major limitations.必须安装Erlang加密程序,并且保证它能工作.对于那些从源码进行Erlang编译的Windows...  阅读全文
posted @ 2016-08-02 22:25 胡小军 阅读(11847) | 评论 (0)编辑 收藏
     摘要: 原文:http://www.rabbitmq.com/configure.htmlRabbitMQ 提供了三种方式来定制服务器:环境变量定义端口,文件位置和名称(接受shell输入,或者在环境配置文件(rabbitmq-env.conf)中设置)配置文件为服务器组件设置权限,限制和集群,也可以定义插件设置.运行时参数和策略可在运行时进行修改集群设置大部分设置都使用前面的两种方法,但本指南会全部讲解...  阅读全文
posted @ 2016-08-02 09:38 胡小军 阅读(49246) | 评论 (0)编辑 收藏

名称

rabbitmqctl — 用于管理中间件的命令行工具

语法

rabbitmqctl [-n node] [-t timeout] [-q] {command} [command options...]

描述

RabbitMQ是AMQP的实现, 后者是高性能企业消息通信的新兴标准. RabbitMQ server是AMQP 中间件健壮的,可扩展的实现.

rabbitmqctl 用来管理RabbitMQ中间件的命令行工具.它通过连接中间件节点来执行所有操作。

如果中间件没有运行,将会显示诊断信息, 不能到达,或因不匹配Erlang cookie而拒绝连接.

选项

[-n node]

默认节点是"rabbit@server",此处的server是本地主机. 在一个名为"server.example.com"的主机上, RabbitMQ Erlang node 的节点名称通常是rabbit@server (除非RABBITMQ_NODENAME在启动时设置了非默认值). hostname -s 的输出通常是"@" 标志后的东西.查看rabbitmq-server(1)来了解配置RabbitMQ broker的细节.

[-q]

使用-q标志来启用宁静(quiet)模式,这会一致消息输出.

[-t timeout]

操作超时时间(秒为单位). 只适用于"list" 命令. 默认是无穷大.

命令

应用程序和集群管理

stop [pid_file]

用于停止运行RabbitMQ的Erlang node.如果指定了pid_file,还将等待指定的过程结束。例如:

rabbitmqctl stop

此命令会终止RabbitMQ node的运行.

stop_app

停止RabbitMQ application,但Erlang node会继续运行.此命令主要用于优先执行其它管理操作(这些管理操作需要先停止RabbitMQ application),如reset.例如:

rabbitmqctl stop_app

start_app

启动RabbitMQ application.

此命令典型用于在执行了其它管理操作之后,重新启动停止的RabbitMQ application。如reset.例如:

rabbitmqctl start_app

此命令来指导RabbitMQ node来启动RabbitMQ application.

wait {pid_file}

等待RabbitMQ application启动.此命令用来等待RabbitMQ application来启动node。它会等待创建pid文件,然后等待pid文件中的特定pid过程启动,最后等待RabbitMQ  application 来启动node. 

pid file是通过rabbitmq-server 脚本来创建的.默认情况下,它存放于Mnesia目录中. 修改RABBITMQ_PID_FILE 环境变量可以改变此位置。如:

rabbitmqctl wait /var/run/rabbitmq/pid

此命令会在RabbitMQ node启动后返回.

reset

将RabbitMQ node还原到最初状态.包括从所在群集中删除此node,从管理数据库中删除所有配置数据,如已配置的用户和虚拟主机,以及删除所有持久化消息.

执行reset和force_reset之前,必须停止RabbitMQ application ,如使用stop_app.

示例:

rabbitmqctl reset

此命令会重设RabbitMQ node.

force_reset

强制RabbitMQ node还原到最初状态.

不同于reset , force_reset 命令会无条件地重设node,不论当前管理数据库的状态和集群配置是什么. 它只能在数据库或集群配置已损坏的情况下才可使用。

执行reset和force_reset之前,必须停止RabbitMQ application ,如使用stop_app.

示例:

rabbitmqctl force_reset

此命令会重设RabbitMQnode.

rotate_logs {suffix}

指示RabbitMQ node循环日志文件.

RabbitMQ 中间件会将原来日志文件中的内容追加到原始名称和后辍的日志文件中,然后再将原始日志文件内容复制到新创建的日志上。实际上,当前日志内容会移到以此后辍结尾的文件上。当目标文件不存在时,将会进行创建。如果不指定后辍,则不会发生循环,日志文件只是重新打开。示例:

rabbitmqctl rotate_logs .1

此命令指示RabbitMQ node将日志文件的内容追加到新日志文件(文件名由原日志文件名和.1后辍构成)中。如. rabbit@mymachine.log.1 和 rabbit@mymachine-sasl.log.1. 最后, 日志会在原始位置恢复到新文件中.

集群管理

join_cluster {clusternode} [--ram]

clusternode

加入集群的节点.

[--ram]

如果进行了设置,节点将以RAM节点身份加入集群.

指导节点成为集群中的一员. 在加入集群之前,节点会重置,因此在使用此命令时,必须小心. 这个命令要成功,RabbitMQ应用程序必须先停止,如stop_app.

集群节点可以是两种类型: 磁盘节点(Disc Node) 或 内存节点(RAM Node).磁盘节点会在RAM和磁盘中复制数据, 通过冗余可以防止节点失效事件,并可从断电这种全局事件中进行恢复. RAM节点只在RAM中复制数据(除了队列的内容外,还依赖于队列是否是持久化的或者内容对于内存来说是否过大) ,并主要用于可伸缩性. RAM节点只有当管理资源(如,增加/删除队列,交换机,或绑定)的时候才具有更高的性能.一个集群必须至少有一个磁盘节点,通常来说还不止一个.

默认情况下,节点是磁盘节点.如果你想要创建内存节点,需要提供--ram 标志.

在执行cluster命令之后, 无论何时,当前节点上启动的RabbitMQ 应用程序在节点宕机的情况下,会尝试连接集群中的其它节点。

要脱离集群, 必须重设(reset)节点. 你也可以通过forget_cluster_node 命令来远程删除节点.

更多详情,参考集群指南.

例如:

rabbitmqctl join_cluster hare@elena --ram

此命令用于指示RabbitMQ node以ram节点的形式将 hare@elena 加入集群.

cluster_status

按节点类型来分组展示集群中的所有节点,包括当前运行的节点.

例如:

rabbitmqctl cluster_status

此命令会显示集群中的所有节点.

change_cluster_node_type {disc | ram}

修改集群节点的类型. 要成功执行此操作,必须首先停止节点,要将节点转换为RAM节点,则此节点不能是集群中的唯一disc节点。

例如:

rabbitmqctl change_cluster_node_type disc

此命令会将一个RAM节点转换为disc节点.

forget_cluster_node [--offline]

[--offline]

允许节点从脱机节点中删除. 这只在所有节点都脱机且最后一个掉线节点不能再上线的情况下有用,从而防止整个集群从启动。它不能使用在其它情况下,因为这会导致不一致

远程删除一个集群节点.要删除的节点必须是脱机的, 而在删除节点期间节点必须是在线的,除非使用了--offline 标志.

当使用--offline 标志时,rabbitmqctl不会尝试正常连接节点;相反,它会临时改变节点以作修改.如果节点不能正常启动的话,这是非常有用的.在这种情况下,节点将变成集群元数据的规范源(例如,队列的存在),即使它不是以前的。因此,如果有可能,你应该在最新的节点上使用这个命令来关闭。

例如:

rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer

此命令会从节点hare@mcnulty中删除rabbit@stringer节点.

rename_cluster_node {oldnode1} {newnode1} [oldnode2] [newnode2 ...]

支持在本地数据库中重命名集群节点.

此子命令会促使rabbitmqctl临时改变节点以作出修改. 因此本地集群必须是停止的,其它节点可以是在线或离线的.

这个子命令接偶数个参数,成对表示节点的旧名称和新名称.你必须指定节点的旧名称和新名称,因为其它停止的节点也可能在同一时间重命名.

同时停止所有节点来重命名也是可以的(在这种情况下,每个节点都必须给出旧名称和新名称)或一次停止一个节点来重命名(在这种情况下,每个节点只需要被告知其名句是如何变化的).

例如:

rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia

此命令来将节点名称rabbit@misshelpful 重命名为rabbit@cordelia.

update_cluster_nodes {clusternode}

clusternode

用于咨询具有最新消息的节点.

指示已集群的节点醒来时联系clusternode.这不同于join_cluster ,因为它不会加入任何集群 - 它会检查节点已经以clusternode的形式存在于集群中了.

需要这个命令的动机是当节点离线时,集群可以变化.考虑这样的情况,节点A和节点B都在集群里边,这里节点A掉线了,C又和B集群了,然后B又离开了集群.当A醒来的时候,它会尝试联系B,但这会失败,因为B已经不在集群中了.update_cluster_nodes -n A C 可解决这种场景.

force_boot

确保节点将在下一次启动,即使它不是最后一个关闭的。通常情况下,当你关闭整个RabbitMQ 集群时,你重启的第一个节点应该是最后一个下线的节点,因为它可以看到其它节点所看不到的事情. 但有时这是不可能的:例如,如果整个集群是失去了电力而所有节点都在想它不是最后一个关闭的.

在这种节点掉线情况下,你可以调用rabbitmqctl force_boot .这就告诉节点下一次无条件的启动节点.在此节点关闭后,集群的任何变化,它都会丢失.

如果最后一个掉线的节点永久丢失了,那么你需要优先使用rabbitmqctl forget_cluster_node --offline因为它可以确保在丢失的节点上掌握的镜像队列得到提升。

例如:

rabbitmqctl force_boot

这可以强制节点下次启动时不用等待其它节点.

sync_queue [-p vhost] {queue}

queue
同步队列的名称

指示未同步slaves上的镜像队列自行同步.同步发生时,队列会阻塞(所有出入队列的发布者和消费者都会阻塞).此命令成功执行后,队列必须是镜像的.

注意,未同步队列中的消息被耗尽后,最终也会变成同步. 此命令主要用于未耗尽的队列。

cancel_sync_queue [-p vhost] {queue}

queue

取消同步的队列名称.

指示同步镜像队列停止同步.

purge_queue [-p vhost] {queue}

queue

要清除队列的名称.

清除队列(删除其中的所有消息).

set_cluster_name {name}

设置集群名称. 集群名称在client连接时,会通报给client,也可用于federation和shovel插件记录消息的来源地. 群集名称默认是来自在群集中的第一个节点的主机名,但可以改变。

例如:

rabbitmqctl set_cluster_name london

设置集群名称为"london".

用户管理

注意rabbitmqctl 管理RabbitMQ 内部用户数据库. 任何来自其它认证后端的用户对于rabbitmqctl来说是不可见的.

add_user {username} {password}

username

要创建的用户名称.

password

设置创建用户登录broker的密码.       

rabbitmqctl add_user tonyg changeit

此命令用于指示RabbitMQ broker 创建一个拥有非管理权限的用户,其名称为tonyg, 初始密码为changeit.


delete_user {username}

username

要删除的用户名称.

例如:

rabbitmqctl delete_user tonyg

此命令用于指示RabbitMQ broker删除名为tonyg的用户

change_password {username} {newpassword}

username

要修改密码的用户名称.

newpassword

用户的新密码.

例如:

rabbitmqctl change_password tonyg newpass

此命令用于指定RabbitMQ broker将tonyg 用户的密码修改为newpass.

clear_password {username}

username

要清除密码的用户名称.

例如:

rabbitmqctl clear_password tonyg

此命令会指示RabbitMQ broker清除名为tonyg的用户密码.现在,此用户不能使用密码登录(但可以通过SASL EXTERNAL登录,如果配置了的话).

authenticate_user {username} {password}

username

用户的名称.

password

用户的密码.

例如:

rabbitmqctl authenticate_user tonyg verifyit

此命令会指示RabbitMQ broker以名称为tonyg, 密码为verifyit来进行验证.

set_user_tags {username} {tag ...}

username

要设置tag的用户名称.

tag

用于设置0个,1个或多个tags.任何现有的tags都将被删除.

例如:

rabbitmqctl set_user_tags tonyg administrator

此命令指示RabbitMQ broker用于确保tonyg 是administrator.当通过AMQP来登录时,这没有什么效果,但用户通过其它的途经来登录时,它可用来管理用户,虚拟主机和权限(如使用管理插件).

rabbitmqctl set_user_tags tonyg

此命令会指示RabbitMQ broker删除tonyg上的任何现有的tag.

list_users

列出用户. 每个结果行都包含用户名,其后紧跟用户的tags.

例如:

rabbitmqctl list_users

此命令指示RabbitMQ broker列出所有用户.

访问控制

注意rabbitmqctl 会管理RabbitMQ的内部用户数据库. 无权限的用户将不能使用rabbitmqctl.

add_vhost {vhost}

vhost

要创建虚拟主机名称.

创建一个虚拟主机.

例如:

rabbitmqctl add_vhost test

此命令指示RabbitMQ broker来创建一个新的名为test的虚拟主机.

delete_vhost {vhost}

vhost

要删除的虚拟主机的名称.

删除一个虚拟主机.

删除一个虚拟主机,同时也会删除所有交换机,队列,绑定,用户权限,参数和策略.

例如:

rabbitmqctl delete_vhost test

此命令指示RabbitMQ broker删除名为test的虚拟主机.

list_vhosts [vhostinfoitem ...]

列出所有虚拟主机.

vhostinfoitem 参数用于标识哪些虚拟主机应该包含在结果集中.结果集中的列顺序会匹配参数的顺序.vhostinfoitem 可接受下面的值:

name

虚拟主机的名称.

tracing

是否对虚拟主机启用追踪.

如果没有指定vhostinfoitem 参数,那么会显示虚拟主机名称.

例如:

rabbitmqctl list_vhosts name tracing

此命令用于指示RabbitMQ broker显示所有虚拟主机.

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost

授予用户可访问的虚拟机名称,默认是/.

user

可访问指定虚拟主机的用户名称.

conf

一个用于匹配用户在哪些资源名称上拥有配置权限的正则表达式

write

一个用于匹配用户在哪些资源名称上拥有写权限的正则表达式.

read

一个用于匹配用户在哪些资源名称上拥有读权限的正则表达式.

设置用户权限.

例如:

rabbitmqctl set_permissions -p /myvhost tonyg "^tonyg-.*" ".*" ".*"

此命令表示RabbitMQ broker授予tonyg 用户可访问 /myvhost虚拟主机,并在资源名称以"tonyg-"开头的所有资源上都具有配置权限,并在所有资源上都拥有读写权限。

clear_permissions [-p vhost] {username}

vhost

用于设置禁止用户访问的虚拟主机名称,默认为/.

username

禁止访问特定虚拟主机的用户名称.

设置用户权限.

例如:

rabbitmqctl clear_permissions -p /myvhost tonyg

此命令用于指示RabbitMQ broker禁止tonyg 用户访问/myvhost虚拟主机.

list_permissions [-p vhost]

vhost

用于指定虚拟主机名称,将会列出所有可访问此虚拟主机的所有用户名称和权限.默认为/.

显示虚拟机上权限.

例如:

rabbitmqctl list_permissions -p /myvhost

此命令指示RabbitMQ broker列出所有已授权访问/myvhost 虚拟主机的用户,同时也会列出这些用户能在虚拟主机资源可操作的权限.注意,空字符串表示没有任何授予的权限。

list_user_permissions {username}

username

要显示权限的用户名称.

列出用户权限.

例如:

rabbitmqctl list_user_permissions tonyg

此命令指示RabbitMQ broker列出tonyg可授权访问的所有虚拟主机名称,以及在这些虚拟主机上的操作.

参数管理

RabbitMQ的某些特性(如联合插件)是动态控制的. 每个参数都是与特定虚拟主机相关的组件名称, name和value构成的. 组件名称和name都是字符串,值是Erlang term. 参数可被设置,清除和显示.通常你可以参考文档来了解如何设置参数.

set_parameter [-p vhost] {component_name} {name} {value}

设置一个参数.

component_name

要设置的组件名称.

name

要设置的参数名称.

value

要设置的参数值,作不JSON项。在多数shells中,你更喜欢将其引起来.

例如:

rabbitmqctl set_parameter federation local_username '"guest"'

此命令用于在默认虚拟主机上设置federation 组件的local_username 参数值"guest".

clear_parameter [-p vhost] {component_name} {key}

清除参数.

component_name

要清除参数的组件名称.

name

要清除的参数名称.

例如:

rabbitmqctl clear_parameter federation local_username

此命令用于清除默认虚拟主机上的federation 组件的local_username 参数值.

list_parameters [-p vhost]

列出虚拟主机上的所有参数.

示例:

rabbitmqctl list_parameters

此命令用于列出默认虚拟主机上的所有参数.

策略管理

策略用于在集群范围的基础上用于控制和修改队列和交换机的行为. 策略应用于虚拟主机,由name, pattern, definition或可选的priority组成. 策略可被设置,清除和列举.

set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}

设置策略.

name

策略名称.

pattern

正则表达式, 匹配要应用的资源

definition

策略的定义,JSON形式.在大多数shells中,你很可能需要引用这个

priority

策略的整数优先级. 数字越高则优先级越高.默认是0.

apply-to

策略适用的对象类型,其值可为 "queues", "exchanges" 或 "all".默认是"all".

例如:

rabbitmqctl set_policy federate-me "^amq." '{"federation-upstream-set":"all"}'

此命令在默认虚拟主机上设置策略为federate-me,这样内建的交换器将进行联合.

clear_policy [-p vhost] {name}

清除策略.

name

要清除的策略名称.

例如:

rabbitmqctl clear_policy federate-me

此命令来清除默认虚拟主机上的federate-me 策略.

list_policies [-p vhost]

显示虚拟主机上的所有策略.

例如:

rabbitmqctl list_policies

此命令会显示默认虚拟主机上的所有策略.

服务器状态

服务器状态查询查询服务器返回一个结果以制表符分隔的列表. 某些查询(list_queueslist_exchangeslist_bindings, 和 list_consumers) 接受一个可选的vhost 参数. 如果这个参数出现了,那么它必须指定在查询的后面.

list_queues, list_exchanges and list_bindings 命令接受一个可选的虚拟主机参数以显示其结果.默认值为"/".

list_queues [-p vhost] [queueinfoitem ...]

返回队列的详细信息. 如果无-p标志,将显示/虚拟主机上的队列详情."-p" 标志可用来覆盖此默认值.

queueinfoitem 参数用于指示哪些队列信息项会包含在结果集中.结果集的列顺序将匹配参数的顺序.queueinfoitem 可以是下面列表中的任何值:

name

非ASCII字符的队列名称.

durable

服务器重启后,队列是否能幸存.

auto_delete

不再使用时,是否需要自动删除队列.

arguments

队列参数.

policy

应用到队列上的策略名称.

pid

关联队列的Erlang进程ID.

owner_pid

表示队列专用所有者的代表连接的Erlang进程ID.如果队列是非专用的,此值将为空.

exclusive

True:如果队列是专用的(即有owner_pid), 反之false

exclusive_consumer_pid

表示此channel的专用消费者订阅到此队列的Erlang进程Id. 如果没有专用消费者,则为空.

exclusive_consumer_tag

专用消费者订阅到此队列的Consumer tag.如果没有专用消费者,则为空.

messages_ready

准备分发给客户端的消息数目.

messages_unacknowledged

分发到客户端但尚未应答的消息数目.

messages

准备分发和未应答消息的总和(队列深度).

messages_ready_ram

驻留在ram中messages_ready的消息数目.

messages_unacknowledged_ram

驻留在ram中messages_unacknowledged的消息数目.

messages_ram

驻留在ram中的消息总数.

messages_persistent

队列中持久化消息的数目(对于瞬时队列总是0).

message_bytes

队列中所有消息体的大小总和.这不包括消息属性(包括headers) 或任何开销(overhead)。

message_bytes_ready

类似于message_bytes ,但只统计准备投递给clients的那些消息.

message_bytes_unacknowledged

类似于message_bytes ,但只统计那些已经投递给clients但还未应答的消息

message_bytes_ram

类似于message_bytes ,但只统计那些在RAM中的消息

message_bytes_persistent

类似于message_bytes ,但只统计那些持久化的消息

head_message_timestamp

如果存在,只显示队列中第1个消息的timestamp属性. 消息的时间戳只出现在分页情况下.

disk_reads

从队列启动开如,已从磁盘上读取该队列的消息总次数.

disk_writes

从队列启动开始,已向磁盘队列写消息总次数.

consumers

消费者数目.

consumer_utilisation

时间分数(0.0与1.0之间),队列可立即向消费者投递消息. 它可以小于1.0,如果消费者受限于网络堵塞或预提取数量.

memory

与队列相关的Erlang进程消耗的内存字节数,包括栈,堆以及内部结构.

slave_pids

如果队列是镜像的,这里给出的是当前slaves的IDs.

synchronised_slave_pids

如果队列是镜像的,当前slaves的IDs是master同步的- 即它们可在无消息丢失的情况下,接管master.

state

队列状态.正常情况下是'running', 但如果队列正在同步也可能是"{syncing, MsgCount}". 处于集群下的节点如果掉线了,队列状态交显示'down' (大多数queueinfoitems 将不可用).

如果没有指定queueinfoitems,那么将显示队列名称和队列深度.

例如:

rabbitmqctl list_queues -p /myvhost messages consumers

此命令显示了/myvhost虚拟主机中每个队列的深度和消费者数目.

list_exchanges [-p vhost] [exchangeinfoitem ...]

返回交换器细节.如果没有指定"-p"选项,将返回 / 虚拟主机的细节.  "-p" 选项可用来覆盖默认虚拟主机.

exchangeinfoitem 参数用来表示哪些交换器信息要包含在结果中. 结果集中列的顺序将与参数顺序保持一致. exchangeinfoitem 可接受下面的列表中任何值:

name

交换器名称.

type

交换器类型(如[directtopicheadersfanout]).

durable

当服务器重启时,交换器是否能复活.

auto_delete

当不再使用时,交换器是否需要自动删除.

internal

交换器是否是内部的,即不能由client直接发布.

arguments

交换器参数


policy

应用到交换器上的策略名称.

如果没有指定exchangeinfoitems,那么将会显示交换器类型和类型

例如:

rabbitmqctl list_exchanges -p /myvhost name type

此命令会显示/myvhost中每个交换器的名称和类型.

list_bindings [-p vhost] [bindinginfoitem ...]

返回绑定细节.默认情况下返回的是 / 虚拟主机上的绑定详情.可使用"-p" 标记来覆盖默认虚拟主机.

bindinginfoitem 参数用来表示结果中包含哪些绑定信息. 结果集中列的顺序将匹配参数的顺序.bindinginfoitem可接受下面列表的任意值:

source_name

绑定中消息来源的名称. C中非ASCII转义字符.

source_kind

绑定中消息来源的类别.当前总是exchange. C中非ASCII转义字符.

destination_name

绑定中消息目的地名称.C中非ASCII转义字符.

destination_kind

绑定中消息目的地的种类. C中非ASCII转义字符.

routing_key

绑定的路由键,C中非ASCII转义字符.

arguments

绑定参数.

如果没有指定bindinginfoitems,将会显示所有上述条目.

例如:

rabbitmqctl list_bindings -p /myvhost exchange_name queue_name

此命令来显示/myvhost虚拟主机上绑定的交换器名称和队列名称.

list_connections [connectioninfoitem ...]

返回TCP/IP连接统计.

connectioninfoitem 参数用来表示在结果中包含哪些连接信息. 结果集中列的顺序将匹配参数的顺序.               connectioninfoitem可接受下面列表的任意值:

pid

与连接相关的Erlang进程ID.

name

连接的可读名称.

port

服务器端口.

host

返回反向DNS获取的服务器主机名称,或 IP地址(反向DNS解析失败) 或者未启用.

peer_port

Peer 端口.

peer_host
   返回反向DNS获取的Peer主机名称,或 IP地址(反向DNS解析失败) 或者未启用.
ssl

用Boolean来表示连接是否是SSL的.

ssl_protocol

SSL 协议(如. tlsv1)

ssl_key_exchange

SSL key exchange 算法 (如 rsa)

ssl_cipher

SSL cipher 算法 (如aes_256_cbc)

ssl_hash

SSL hash 函数 (如 sha)

peer_cert_subject

peer的 SSL 安全证书的主体, RFC4514形式.

peer_cert_issuer

peer的 SSL安全证书的发行者, RFC4514 形式.

peer_cert_validity

peer的SSL安全证书的有效期.

state

连接状态(可为[startingtuningopeningrunningflowblockingblockedclosingclosed]其中一个).

channels
使用连接的channel数。
protocol

使用的AMQP协议版本(当前是{0,9,1} 或{0,8,0}). 注意,如果client请求的是AMQP 0-9 连接, 我们会视为AMQP 0-9-1.

auth_mechanism

使用的SASL认证机制,如PLAIN.

user

与连接相关的用户名

vhost

虚拟主机名称,C中非ASCII转义字符.

timeout

连接超时/协商的心跳间隔,秒为单位.

frame_max

最大 frame 大小(字节).

channel_max
        此连接上channel的最大数目.
client_properties

连接建立期间由client发送的信息属性.

recv_oct

Octets已收到.

recv_cnt

Packets 已收到.

send_oct

Octets 发送.

send_cnt

Packets 发送.

send_pend

发送队列大小.

connected_at

连接建立的日期和时间,当作timestamp.

如果没有connectioninfoitems, 那么会显示user, peer host, peer port,流量控制和内存块状态的时间

例如:

rabbitmqctl list_connections send_pend port

此命令会显示发送队列的大小以及第个连接的服务器端口.

list_channels [channelinfoitem ...]

返回所有当前channel上的信息,逻辑容器执行大部分 AMQP命令.这将包含最初AMQP连接的部分,以及不同插件和其它扩展创建的channels.

channelinfoitem 参数用来表示在结果集中包含哪些channel信息.结果集中列的顺序将匹配参数的顺序. channelinfoitem 可接受下面列表中的任何一个参数:

pid

与连接相关的Erlang进程ID.

connection

channel所属的连接Erlang进程ID.

name

channel的可读名称.

number

channel的数目,在一个连接中,它有唯一的标识符.

user

与channel相关的用户名称.

vhost

channel操作的虚拟主机.

transactional

True:如果channel处于事务模式,其它情况为false.

confirm

True:如果channel是确认模式,其它情况为false.

consumer_count

在channel中接收消息的逻辑AMQP消费者数目.

messages_unacknowledged

在channel中消息已投递但还未应答的消息数目.

messages_uncommitted

在channel中已收到消息但还没有提交事务的消息个数.

acks_uncommitted
确认收到一个还未提交的事务数。
messages_unconfirmed
尚未确认已发布消息的数目。在通道不在确认模式下时,这将是0。
prefetch_count

新消费者QoS预提取限制, 0表示无上限.

global_prefetch_count

整个channel QoS预提取限制, 0表示无上限.

如果没有指定channelinfoitems,那么将显示pid, user, consumer_count,messages_unacknowledged.


例如:

rabbitmqctl list_channels connection messages_unacknowledged

此命令会显示每个channel中连接进程和未应答消息的数目.

list_consumers [-p vhost]

列举消费者, 即订阅队列的消息流. 每行将打印出由制表符分隔的已订阅队列的名称,创建并管理订阅的channel进程的标识,channel中订阅的consumer tag唯一标识符, boolean值表示投递到此消费者的消息是否需要应答,整数值表示表示预提取限制(为0表示无限制), 以及关于此消费者的任何其它参数.

status

显示 broker 状态信息,如当前Erlang节点上运行的应用程序, RabbitMQ 和 Erlang 的版本信息, OS 名称, 内存和文件描述符统计信息. (查看cluster_status 命令来找出那些节点是集群化的以及正在运行的.)

例如:

rabbitmqctl status

此命令显示了RabbitMQ broker的相关信息.

environment

显示每个运行程序环境中每个变量的名称和值.

report

为所有服务器状态生成一个服务器状态报告,输出应该重定向到一个文件.

例如:

rabbitmqctl report > server_report.txt

此命令创建了一个服务器报告,可将它附着在支持请求的电子邮件中.

eval {expr}

执行任意Erlang表达式.

例如:

rabbitmqctl eval 'node().'

此命令用于返回rabbitmqctl连接的节点名称

杂项

close_connection {connectionpid} {explanation}

connectionpid

要关闭的与连接相关的Erlang进程ID.

explanation

解释字符串.

指示broker关闭与Erlang进程id相关的连接(可通过list_connections 命令查看), 通过为连接客户端传递解释字符串(作为AMQP连接关闭协议的一部分).

例如:

rabbitmqctl close_connection "<rabbit@tanto.4262.0>" "go away"

此命令指示RabbitMQ broker关闭与Erlang 进程id<rabbit@tanto.4262.0>相关联的连接, 同时向客户端传递go away字符串.

trace_on [-p vhost]

vhost

要开启追踪的虚拟主机的名称.

开启追踪.注意,追踪状态不是持久化的; 如果服务器重启,追踪状态将会丢失.

trace_off [-p vhost]

vhost

要停止追踪的虚拟主机名称.

停止追踪.

set_vm_memory_high_watermark {fraction}

fraction
当一个浮点数大于或等于0时,会触发流量控制新内存阈值部分。

set_vm_memory_high_watermark absolute {memory_limit}

memory_limit

流程控制触发的新内存限制, 以字节来表示大于或等于0的整数或以字符串和内存单位来表示(如 512M或1G). 可用的单位是: k, kiB: kibibytes (2^10 bytes) M, MiB: mebibytes (2^20) G, GiB: gibibytes (2^30) kB: kilobytes (10^3) MB: megabytes (10^6) GB: gigabytes (10^9)

set_disk_free_limit {disk_limit}

disk_limit

以整数或字符串单位的可用磁盘下限限制(查看vm_memory_high_watermark), 如 512M or 1G. 一旦可用磁盘空间达到这个限制,就会设置磁盘报警.

set_disk_free_limit mem_relative {fraction}

fraction

相对于整个可用内存的限制,其值为非负浮点数. 当值小于1.0时是很危险的,应该谨慎使用.

posted @ 2016-07-30 16:52 胡小军 阅读(12779) | 评论 (0)编辑 收藏
原文:http://www.rabbitmq.com/production-checklist.html
介绍
像RabbitMQ这样的数据服务经常有许多的可调参数.某些配置对于开发环境来说是意义的,但却不适合产品环境. 单个配置不能满足每种使用情况. 因此,在进入产品环境时,评估配置是很重要的. 这就是本指南提供帮助的目的.
虚拟主机,用户,权限
Virtual Hosts
在单租户环境中,例如,当RabbitMQ在产品环境中只致力于为某单个系统服务时,使用默认虚拟主机 (/)是非常好的.
在多租户环境中,为每个租户/环境使用单独的虚拟主机,如:project1_developmentproject1_productionproject2_developmentproject2_production, 等等.
用户
在产品环境中,删除默认用户(guest). 默认情况下,默认用户只能通过本地来连接, 因为它有众所周的凭证.为了不启用远程连接,可考虑使用带有administrative权限和生成密码的独立用户来代替
强烈建议在每个程序中使用单独的用户,例如,如果你有一个mobile app, 一个Web app, 和一个数据聚合系统, 你最好有3个独立的用户. 这会使许多事情变得更简单:
  • 使 client 连接与程序相关联
  • 使用细粒度的权限
  • 凭据翻滚(如. 周期性地或遭到破坏的情况下)
如果有许多相同应用程序的实例,有一个更好安全性权衡(每一个实例的凭据)和方便的配置(共享一些或所有实例之间的一组凭据)。物联网的应用涉及很多客户执行相同或相似的功能,有固定的IP地址,它可以使用X509证书或源IP地址范围验证。
资源限制
当消费者跟不上的时候,RabbitMQ 使用Resource-driven alarms (资源驱动报警)来压制(throttle)发布者. 在进入生产之前评估资源限制配置是重要的。
内存
默认情况下, RabbitMQ会使用可用RAM的40%. 这专门针对于那些运行RabbitMQ的节点,通常情况下,提高此限制是合理的. 然而,应注意的是,操作系统和文件系统的缓存也需要内存来运行。如果不这样做,会由于操作系统交换导致严重的吞吐量下降,甚至导致操作系统会终止RabbitMQ过程的运行。
下面是一些基本的指导方针,用于确定推荐的RAM limit:
  • 至少有128 MB
  • 当RAM达到4GB时,可配置为75%的RAM限制
  • 当RAM达到4GB-8GB时,可配置为80%的RAM限制
  • 当RAM达到8GB-16GB时,可配置为85%的RAM限制
  • 当RAM大于16GB时,可配置为90%的RAM限制
高于0.9的值是很危险的,不推荐配置
可用磁盘空间
必要的可用磁盘空间可防止disk space alarms.(磁盘空间报警) .默认情况下,RabbitMQ始终需要 50 MiB的可用磁盘空间.在大多数Linux发行者,根据开发者的经验,可将放置到小分区的/var 目录下. 然而,对于产品环境来说,这不是一个推荐值, 因为它们可能明显的更高的RAM 限制. 下面是一些基本的指导方针,如何确定有多少空闲磁盘空间是推荐的:
  • 至少有2 GB
  • 当限制为1到8GB的RAM时,可配置为RAM限制的50%
  • 当限制为8到32GB的RAM时,可配置为RAM限制的40%
  • 当限制超过32GB的RAM时,可配置为RAM限制的30%
rabbit.disk_free_limit 配置可通过 {mem_relative, N}来完成,使其相对于RAM限制的百分比来计算. 例如, 使用{mem_relative, 0.5} 设为50%, {mem_relative, 0.25}设为25%等等.
打开文件句柄限制
操作系统限制了并发打开的文件句柄的最大数量,其中包括网络套接字。确保您的限制设置得足够高,以允许预期数量的并发连接和队列。
对于有效RabbitMQ用户,确保你的环境允许至少50K的打开文件描述符,包括开发环境。
作为经验法则,并发连接数的95%乘以2再加上队列的总数可以计算出打开文件句柄限制( multiple the 95th percentile number of concurrent connections by 2 and add total number of queues to calculate recommended open file handle limit). 值高于500K也是恰当地,它不会消耗太多的硬件资源,因此建议在生产环境中设置. 查看Networking guide 来了解更多信息.
安全注意事项
用户和权限
查看vhosts, users, 和 证书章节.
Erlang Cookie
在Linux 和BSD 系统中, 有必要限制只有运行RabbitMQ和rabbitmqctl工具的用户才能访问Erlang cookie.
TLS
如果有可能,我们建议使用LS connections, 至少要加密通信. Peer验证(身份验证)也被推荐 . 开发和QA 环境可使用self-signed TLS certificates. 当RabbitMQ和所有程序运行在可信网络或隔离(使用像VMware NSX技术)环境中时,自签名证书可以应用于产品环境.
虽然RabbitMQ试图提供一个默认的安全TLS 配置 (如.SSLv3是禁用的), 我们推荐评估TLS 版本和密码套件. 请参考TLS guide 了解更多信息.
网络配置
产品环境需要调整网络配置.请参考 Networking Guide 来了解细节.
自动连接恢复
某些client libraries, 例如 Java.NET, 和 Ruby, 在网络失败后,支持自动连接恢复.如果client提供了这种功能,建议使用它来代替你自己的恢复机制.
集群化考虑
集群大小
当确定集群大小时,需要重点考虑下面的几个因素:
  • 希望的吞吐量
  • 希望的复制( mirrors的数目)
  • 数据局部性
因为客户端可以连接到任何节点,RabbitMQ可能需要进行集群间消息路由和内部操作。尝试使消费者和生产者连接到同一个节点,如果可能的话:这将减少节点间的流量。 使消费者连接到持有队列的master上(可使用HTTP API进行推断),也是有帮助的.当考虑到数据局部性时,总的集群吞吐量可以达到不平凡的量
对于大多数环境中,镜像超过一半的群集节点是足够的。建议使用一个奇数的节点(3,5,等等)的集群。
分区处理策略
在用于产品环境之前,挑选partition handling strategy 是很重要的. 有疑问时,使用theautoheal策略。
posted @ 2016-07-30 16:47 胡小军 阅读(1644) | 评论 (0)编辑 收藏
原文:http://www.rabbitmq.com/memory.html
RabbitMQ服务器在启动时以及abbitmqctl set_vm_memory_high_watermark fraction 执行时,会检查计算机的RAM总大小. 默认情况下下, 当 RabbitMQ server 的使用量超过RAM的40% ,它就会发出内存警报,并阻塞所有连接. 一旦内存警报清除 (如,服务器将消息转存于磁盘,或者将消息投递给clients),服务又地恢复.
默认内存阀值设置为已安装RAM的40%. 注意这并不会阻止RabbitMQ server使用内存量超过40%, 它只是为了压制发布者. Erlang的垃圾回收器最坏情况下,可使用配置内存的2倍(默认情况下t, RAMr的80%). 因此强制建议开启OS swap或page files .
32位架构倾向于每一个进程有2GB的内存限制. 64位架构的一般实现(i.e. AMD64 和 Intel EM64T) 只允许每个进程为256TB. 64-位 Windows 限制为8TB. 但是,请注意,即使是64位操作系统下,一个32位的过程往往只有一个2GB的最大地址空间。
配置内存阀值
内存阀值可通过编辑configuration file来配置.下面的例子将阀值设为默认值0.4:
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
默认值0.4 代表的是已安装RAM的 40% , 有时候还更小.如:在 32位平台中,如果你安装有4GB RAM , 4GB 的40% 是 1.6GB, 但是 32-位 Windows 正常情况下限制进程为2GB,因此实际阀值是2GB的40% (即820MB).
另外, 内存阀值也可以设置为绝对值. 下面的例子将阀值设为了1073741824 字节 (1024 MB):
[{rabbit, [{vm_memory_high_watermark, {absolute, 1073741824}}]}].
同例, 也可使用内存单位:
[{rabbit, [{vm_memory_high_watermark, {absolute, "1024MiB"}}]}].
如果绝对上限大于了安装的RAM可用的虚拟地址空间, 阀值上限会略小.
当RabbitMQ服务器启动时,内存限制将追加到RABBITMQ_NODENAME.log 文件中:
=INFO REPORT==== 29-Oct-2009::15:43:27 === Memory limit set to 2048MB.
内存限制也可以使用rabbitmqctl status命令查询
其阀值也可以在broker运行时,通过rabbitmqctl set_vm_memory_high_watermark fraction 命令或 rabbitmqctl set_vm_memory_high_watermark absolute memory_limit 命令修改. 内存单位也可以在命令中使用. 此命令会在broker重启后生效. 当执行此命令时,内存限制可能会改变热插拔RAM,而不会发生报警,这是因为需要全部数量的系统RAM.
禁止所有发布
其值为0时,会立即触发报警并禁用所有发布 (当需要禁用全局发布时,这可能是有用的); use rabbitmqctl set_vm_memory_high_watermark 0.
限制的地址空间
当在64位操作系统中运行32位 Erlang VM时,(or a 32 bit OS with PAE), 可用地址内存是受限制的. 服务器探测到后会记录像下边的日志消息:
=WARNING REPORT==== 19-Dec-2013::11:27:13 === Only 2048MB of 12037MB memory usable due to limited address space. Crashes due to memory exhaustion are possible - see http://www.rabbitmq.com/memory.html#address-space
内存报警系统是不完美的.虽然停止发布通常会防止任何进一步的内存使用,但可能有其他东西继续增加内存使用。通常情况下,当这种情况发生时,物理内存耗尽,操作系统将开始交换。但是当运行一个有限的地址空间,超过限制的运行会导致虚拟机崩溃。
因此强制建议在在64位操作系统上运行64位的Erlang VM.
配置分页阈值
在broker达到最高水位阻塞发布者之前,它会尝试将队列内容分页输出到磁盘上来释放内存. 持久化和瞬时消息都会分页输出 (已经在磁盘上的持久化消息会被赶出内存).
默认情况下,在达最高水位的50%时,就会发生这种情况. (即,默认最高水位为0.4, 这会在内存使用达到20%时就会发生). 要修改此值,可修改vm_memory_high_watermark_paging_ratio 配置的0.5默认值. 例如:
[{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75}, {vm_memory_high_watermark, 0.4}]}].
上面的配置表示在内存使用达到30%时,就会启动,40%的时候会阻塞发布者.
也可以将vm_memory_high_watermark_paging_ratio 值设为大于1.0的值.在这种情况下,队列不会把它的内容分页到磁盘上.如果这引起了内存报警关闭,那么生产者会如上面预期的一样被阻塞.
未确认的平台
如果RabbitMQ服务器不能识别你的系统,它将在RABBITMQ_NODENAME.log 文件中追加警告.
然后它会假设安装了超过了1GB的RAM:
=WARNING REPORT==== 29-Oct-2009::17:23:44 === Unknown total memory size for your OS {unix,magic_homebrew_os}. Assuming memory size is 1024MB.
在这种情况下,vm_memory_high_watermark 配置值假设为1GB RAM. 在 vm_memory_high_watermark 默认设为 0.4的情况下, RabbitMQ的内存阀值设为了410MB, 因此当RabbitMQ使用了多于410M内存时,它会阻塞生产者.因此当RabbitMQ不能识别你的平台时,如果你实际有8GB RAM,并且你想让RabbitMQ内存使用量超过3GB阻塞生产者,你可以设置vm_memory_high_watermark为3.
推荐RAM 水位设置,可参考Production Checklist.

posted @ 2016-07-30 15:05 胡小军 阅读(7937) | 评论 (0)编辑 收藏
鸟欲高飞先振翅,人求上进先读书。本文是原书的第9章 线程的监控及其日常工作中如何分析里的9.3.3节常见的内存溢出的三种情况。
3. 常见的内存溢出的三种情况:
1)JVM Heap(堆)溢出:java.lang.OutOfMemoryError: Java heap space
JVM在启动的时候会自动设置JVM Heap的值, 可以利用JVM提供的-Xmn -Xms -Xmx等选项可进行设置。Heap的大小是Young Generation 和Tenured Generaion 之和。在JVM中如果98%的时间是用于GC,且可用的Heap size 不足2%的时候将抛出此异常信息。
解决方法:手动设置JVM Heap(堆)的大小。
2)PermGen space溢出: java.lang.OutOfMemoryError: PermGen space
PermGen space的全称是Permanent Generation space,是指内存的永久保存区域。为什么会内存溢出,这是由于这块内存主要是被JVM存放Class和Meta信息的,Class在被Load的时候被放入PermGen space区域,它和存放Instance的Heap区域不同,sun的 GC不会在主程序运行期对PermGen space进行清理,所以如果你的APP会载入很多CLASS的话,就很可能出现PermGen space溢出。一般发生在程序的启动阶段。
解决方法: 通过-XX:PermSize和-XX:MaxPermSize设置永久代大小即可。
3)栈溢出: java.lang.StackOverflowError : Thread Stack space
栈溢出了,JVM依然是采用栈式的虚拟机,这个和C和Pascal都是一样的。函数的调用过程都体现在堆栈和退栈上了。调用构造函数的 “层”太多了,以致于把栈区溢出了。 通常来讲,一般栈区远远小于堆区的,因为函数调用过程往往不会多于上千层,而即便每个函数调用需要 1K的空间(这个大约相当于在一个C函数内声明了256个int类型的变量),那么栈区也不过是需要1MB的空间。通常栈的大小是1-2MB的。通俗一点讲就是单线程的程序需要的内存太大了。 通常递归也不要递归的层次过多,很容易溢出。
解决方法:1:修改程序。2:通过 -Xss: 来设置每个线程的Stack大小即可。
4. 所以Server容器启动的时候我们经常关心和设置JVM的几个参数如下(详细的JVM参数请参看附录三):
-Xms:java Heap初始大小, 默认是物理内存的1/64。
-Xmx:ava Heap最大值,不可超过物理内存。
-Xmn:young generation的heap大小,一般设置为Xmx的3、4分之一 。增大年轻代后,将会减小年老代大小,可以根据监控合理设置。
-Xss:每个线程的Stack大小,而最佳值应该是128K,默认值好像是512k。
-XX:PermSize:设定内存的永久保存区初始大小,缺省值为64M。
-XX:MaxPermSize:设定内存的永久保存区最大大小,缺省值为64M。
-XX:SurvivorRatio:Eden区与Survivor区的大小比值,设置为8,则两个Survivor区与一个Eden区的比值为2:8,一个Survivor区占整个年轻代的1/10
-XX:+UseParallelGC:F年轻代使用并发收集,而年老代仍旧使用串行收集.
-XX:+UseParNewGC:设置年轻代为并行收集,JDK5.0以上,JVM会根据系统配置自行设置,所无需再设置此值。
-XX:ParallelGCThreads:并行收集器的线程数,值最好配置与处理器数目相等 同样适用于CMS。
-XX:+UseParallelOldGC:年老代垃圾收集方式为并行收集(Parallel Compacting)。
-XX:MaxGCPauseMillis:每次年轻代垃圾回收的最长时间(最大暂停时间),如果无法满足此时间,JVM会自动调整年轻代大小,以满足此值。
-XX:+ScavengeBeforeFullGC:Full GC前调用YGC,默认是true。
实例如:JAVA_OPTS=”-Xms4g -Xmx4g -Xmn1024m -XX:PermSize=320M -XX:MaxPermSize=320m -XX:SurvivorRatio=6″
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《 Java并发编程从入门到精通》 常见的内存溢出的三种情况
posted @ 2016-07-26 23:05 胡小军 阅读(323) | 评论 (0)编辑 收藏

原文:http://www.oracle.com/technetwork/articles/java/jsr356-1937161.html

学习如何在你的应用程序中集成WebSockets.

Published April 2013

对于许多基于客户端-服务器程序来说,老的HTTP 请求-响应模型已经有它的局限性. 信息必须通过多次请求才能将其从服务端传送到客户端.

过去许多的黑客使用某些技术来绕过这个问题,例如:长轮询(long polling)、基于 HTTP 长连接的服务器推技术(Comet)

然而,基于标准的、双向的、客户端和服务器之间全双工的信道需求再不断增加。

在2011年, IETF发布了标准WebSocket协议-RFC 6455. 从那时起,大多数Web浏览器都实现了支持WebSocket协议的客户端APIs.同时,许多Java 包也开始实现了WebSocket协议.

WebSocket协议利用HTTP升级技术来将HTTP连接升级到WebSocket. 一旦升级后,连接就有了在两个方向上相互独立(全双式)发送消息(数据桢)的能力. 

不需要headers 或cookies,这大大降低了所需的带宽通常,WebSockets来周期性地发送小消息 (例如,几个字节). 

额外的headers常常会使开销大于有效负载(payload)。

JSR 356

JSR 356, WebSocket的Java API, 明确规定了API,当Java开发者需要在应用程序中集成WebSocket时,就可以使用此API—服务端和客户端均可. 每个声明兼容JSR 356的WebSocket协议,都必须实现这个API. 

因此,开发人员可以自己编写独立于底层WebSocket实现的WebSocket应用。这是一个巨大的好处,因为它可以防止供应商锁定,并允许更多的选择、自由的库、应用程序服务器。

JSR 356是即将到来的java EE 7标准的一部分,因此,所有与Java EE 7兼容的应用服务器都有JSR 365标准WebSocket的实现.一旦建立,WebSocket客户端和服务器节点已经是对称的了。客户端API与服务器端API的区别是很小的,JSR 356定义的Java client API只是Java EE7完整API的子集.

客户段-服务器端程序使用WebSockets,通常会包含一个服务器组件和多个客户端组件, 如图1所示:

Figure 1

图1

在这个例子中,server application 是通过Java编写的,WebSocket 协议细节是由包含在Java EE 7容器中JSR 356 实现来处理的.

JavaFX 客户端可依赖任何与JSR 356兼容的客户端实现来处理WebSocket协议问题. 

其它客户端(如,iOS 客户端和HTML5客户端)可使用其它 (非Java)与RFC6455兼容的实现来与server application通信.

编程模型

JSR 356定义的专家小组,希望支持Java EE开发人员常用的模式和技术。因此,JSR 356使用了注释和注入。

一般来说,支持两种编程模型:

  • 注解驱动(annotation-driven). 通过使用注解POJOs, 开发者可与WebSocket生命周期事件交互.
  • 接口驱动(interface-driven). 开发者可实现Endpoint接口和与生命周期交互的方法.

生命周期事件

典型的WebSocket 交互生命周期如下:

  • 一端 (客户端) 通过发送HTTP握手请求来初始化连接.
  • 其它端(服务端) 回复握手响应.
  • 建立连接.从现在开始,连接是完全对称的.
  • 两端都可发送和接收消息.
  • 其中一端关闭连接.

大部分WebSocket生命周期事件都与Java方法对应,不管是 annotation-driven 还是interface-driven.

Annotation-Driven 方式

接受WebSocket请求的端点可以是以 @ServerEndpoint 注解的POJO. 

此注解告知容器,此类应该被认为是WebSocket端点. 

必须的value 元素指定了WebSocket端点的路径.

考虑下面的代码片断:

@ServerEndpoint("/hello")  public class MyEndpoint { } 

此代码将会以相对路径hello来发布一个端点.在后续方法调用中,此路径可携带路径参数,如: /hello/{userid}是一个有效路径,在这里{userid} 的值,可在生命周期方法使用@PathParam 注解获取.

在GlassFish中,如果你的应用程序是用上下文mycontextroot 部署的,且在localhost的8080端口上监听, WebSocket可通过使用ws://localhost:8080/mycontextroot/hello来访问.

初始化WebSocket连接的端点可以是以 @ClientEndpoint 注解的POJO.@ClientEndpoint 和 @ServerEndpoint的主要区别是ClientEndpoint 不接受路径路值元素,因为它监听进来的请求。

@ClientEndpoint  public class MyClientEndpoint {} 

Java中使用注解驱动POJO方式来初始化WebSocket连接,可通过如下代码来完成:

javax.websocket.WebSocketContainer container = javax.websocket.ContainerProvider.getWebSocketContainer();  container.conntectToServer(MyClientEndpoint.class, new URI("ws://localhost:8080/tictactoeserver/endpoint")); 

此后,以 @ServerEndpoint 或@ClientEndpoint 注解的类都称为注解端点.

一旦建立了WebSocket连接 ,就会创建 Session,并且会调用注解端点中以@OnOpen注解的方法. 

此方法包含了几个参数:

  • javax.websocket.Session 参数, 代表创建的Session
  • EndpointConfig 实例包含了关于端点配置的信息
  • 0个或多个以 @PathParam注解的字符串参数,指的是端点路径的path参数

下面的方法实现了当打开WebSocket时,将会打印session的标识符:

@OnOpen public void myOnOpen (Session session) {    System.out.println ("WebSocket opened: "+session.getId()); } 

Session实例只要WebSocket未关闭就会一直有效Session类中包含了许多有意思的方法,以允许开发者获取更多关于的信息

同时,Session 也包含了应用程序特有的数据钩子,即通过getUserProperties() 方法来返回 Map<String, Object>

这允许开发者可以使用session-和需要在多个方法调用间共享的应用程序特定信息来填充Session实例.

i当WebSocket端收到消息时,将会调用以@OnMessage注解的方法.以@OnMessage 注解的方法可包含下面的参数:

  • javax.websocket.Session 参数.
  • 0个或多个以 @PathParam注解的字符串参数,指的是端点路径的path参数
  • 消息本身. 下面有可能消息类型描述.

当其它端发送了文本消息时,下面的代码片断会打印消息内容:

@OnMessage public void myOnMessage (String txt) {    System.out.println ("WebSocket received message: "+txt); }  

如果以@OnMessage i注解的方法返回值不是void, WebSocket实现会将返回值发送给其它端点.下面的代码片断会将收到的文本消息以首字母大写的形式发回给发送者:

@OnMessage public String myOnMessage (String txt) {    return txt.toUpperCase(); }  

另一种通过WebSocket连接来发送消息的代码如下:

RemoteEndpoint.Basic other = session.getBasicRemote(); other.sendText ("Hello, world"); 

在这种方式中,我们从Session 对象开始,它可以从生命周期回调方法中获取(例如,以 @OnOpen注解的方法).session实例上getBasicRemote() 方法返回的是WebSocket其它部分的代表RemoteEndpointRemoteEndpoint 实例可用于发送文本或其它类型的消息,后面有描述.

当关闭WebSocket连接时,将会调用@OnClose 注解的方法。此方法接受下面的参数:

  • javax.websocket.Session 参数. 注意,一旦WebSocket真正关闭了,此参数就不能被使用了,这通常发生在@OnClose 注解方法返回之后.
  • javax.websocket.CloseReason 参数,用于描述关闭WebSocket的原因,如:正常关闭,协议错误,服务过载等等.
  • 0个或多个以 @PathParam注解的字符串参数,指的是端点路径的path参数

下面的代码片段打印了WebSocket关闭的原因:

@OnClose public void myOnClose (CloseReason reason) {    System.out.prinlnt ("Closing a WebSocket due to "+reason.getReasonPhrase()); } 

完整情况下,这里还有一个生命周期注解:如果收到了错误,将会调用 @OnError 注解的方法。

Interface-Driven 方式

annotation-driven 方式允许我们注解一个Java类,以及使用生命周期注解来注解方法. 

使用interface-driven方式,开发者可继承javax.websocket.Endpoint 并覆盖其中的onOpenonClose, 以及onError 方法:

public class myOwnEndpoint extends javax.websocket.Endpoint {    public void onOpen(Session session, EndpointConfig config) {...}    public void onClose(Session session, CloseReason closeReason) {...}    public void onError (Session session, Throwable throwable) {...} } 

为了拦截消息,需要在onOpen实现中注册一个javax.websocket.MessageHandler:

public void onOpen (Session session, EndpointConfig config) {    session.addMessageHandler (new MessageHandler() {...}); } 

MessageHandler 接口有两个子接口: MessageHandler.Partial和 MessageHandler.Whole

MessageHandler.Partial 接口应该用于当开发者想要收到部分消息通知的时候,MessageHandler.Whole的实现应该用于整个消息到达通知

下面的代码片断会监听进来的文件消息,并将文本信息转换为大小版本后发回给其它端点:

public void onOpen (Session session, EndpointConfig config) {    final RemoteEndpoint.Basic remote = session.getBasicRemote();    session.addMessageHandler (new MessageHandler.Whole<String>() {       public void onMessage(String text) {                  try {                      remote.sendString(text.toUpperCase());                  } catch (IOException ioe) {                      // handle send failure here                  }              }     }); } 

消息类型,编码器,解码器

WebSocket的JavaAPI非常强大,因为它允许发送任或接收任何对象作为WebSocket消息.

基本上,有三种不同类型的消息:

  • 基于文本的消息
  • 二进制消息
  • Pong 消息,它是WebSocket连接自身

当使用interface-driven模式,每个session最多只能为这三个不同类型的消息注册一个MessageHandler.

当使用annotation-driven模式,针对不同类型的消息,只允许出现一个@onMessage 注解方法. 在注解方法中,消息内容中允许的参数依赖于消息类型。

Javadoc for the @OnMessage annotation 明确指定了消息类型上允许出现的消息参数:

  • "如果方法用于处理文本消息: 

    • String 用于接收整个消息
    • Java 原型或等价的类用于接收整个消息并将其转换为此类型
    • String 和 boolean 对用于部分接收消息
    • Reader 用于以阻塞流的方式接收整个消息
    • 端点的任何对象参数存在文本解码器 (Decoder.Text 或 Decoder.TextStream).
  • 如果方法用于处理二进制消息: 

  • 如果方法是用于处理pong消息: 

任何Java对象使用编码器都可以编码为基于文本或二进制的消息.这种基于文本或二进制的消息将转输到其它端点,在其它端点,它可以解码成Java对象-或者被另外的WebSocket 包解释. 

通常情况下,XML或JSON用于来传送WebSocket消息, 编码/解码然后会将Java对象编组成XML或JSON并在另一端解码为Java对象.

encoder是以javax.websocket.Encoder 接口的实现来定义,decoder是以javax.websocket.Decoder 接口的实现来定义的. 

有时,端点实例必须知道encoders和decoders是什么.使用annotation-driven方式, 可向@ClientEndpoint 和 @ServerEndpoint l注解中的encode和decoder元素传递 encoders和decoders的列表。

Listing 1 中的代码展示了如何注册一个 MessageEncoder 类(它定义了MyJavaObject实例到文本消息的转换). MessageDecoder 是以相反的转换来注册的.

@ServerEndpoint(value="/endpoint", encoders = MessageEncoder.class, decoders= MessageDecoder.class) public class MyEndpoint { ... }  class MessageEncoder implements Encoder.Text<MyJavaObject> {    @override    public String encode(MyJavaObject obj) throws EncodingException {       ...    } }  class MessageDecoder implements Decoder.Text<MyJavaObject> {    @override     public MyJavaObject decode (String src) throws DecodeException {       ...    }     @override     public boolean willDecode (String src) {       // return true if we want to decode this String into a MyJavaObject instance    } } 

Listing 1

Encoder 接口有多个子接口:

  • Encoder.Text 用于将Java对象转成文本消息
  • Encoder.TextStream 用于将Java对象添加到字符流中
  • Encoder.Binary 用于将Java对象转换成二进制消息
  • Encoder.BinaryStream 用于将Java对象添加到二进制流中

类似地,Decoder 接口有四个子接口:

  • Decoder.Text 用于将文本消息转换成Java对象
  • Decoder.TextStream 用于从字符流中读取Java对象
  • Decoder.Binary 用于将二进制消息转换成Java对象
  • Decoder.BinaryStream 用于从二进制流中读取Java对象

结论

WebSocket Java API为Java开发者提供了标准API来集成IETF WebSocket标准.通过这样做,Web 客户端或本地客户端可使用任何WebSocket实现来轻易地与Java后端通信。

Java Api是高度可配置的,灵活的,它允许java开发者使用他们喜欢的模式。

也可参考

posted @ 2016-07-24 01:35 胡小军 阅读(2736) | 评论 (0)编辑 收藏
     摘要: Servlets定义为JSR 340,可以下载完整规范.servlet是托管于servlet容器中的web组件,并可生成动态内容.web clients可使用请求/响应模式同servlet交互. servlet容器负责处理servlet的生命周期事件,接收请求和发送响应,以及执行其它必要的编码/解码部分.WebServlet它是在POJO上使用@WebServlet注...  阅读全文
posted @ 2016-07-24 01:32 胡小军 阅读(851) | 评论 (0)编辑 收藏
在本章中,我们将涵盖下面的主题:
  1. 监控RabbitMQ的行为
  2. 使用RabbitMQ进行故障诊断
  3. 跟踪RabbitMQ当前活动
  4. 调试RabbitMQ的消息
  5. 当RabbitMQ重启失败时该做什么
  6. 使用Wireshark来调试
介绍
每当我们开发一个应用程序的时候,一种常见的做法是开发一个诊断基础设施. 这可以基于日志文件,SNMP 转移以及其它手段.
RabbitMQ提供了标准日志文件和内建消息故障诊断解决方案.
在前面三个食谱中,我们将看到如何来使用这三种特性.
某些时候,会存在一些阻止RabbitMQ启动的问题.在这种情况下,需要强制解决服务器上的问题并重启服务器。这一点,我们将在RabbitMQ重启失败时该做什么食谱中讲解。

然而,调试消息也是应用程序开发的一部分.在这种情况下,我们需要知道RabbitMQ和客户之间交换的准确信息.可以使用一个内建代理工具(Java client API的一部分) (查看调试RabbitMQ的消息食谱)或者使用高级网络监视工具来检查网络状况,正如我们将在使用Wireshark来调试食谱中看到的一样。
监控RabbitMQ行为
为了检查RabbitMQ的正常行为,有一个监控工具是很有用的,特别是在处理集群的时候.
有许多不同的工具,商业的或免费的, 
这有助于让它们受控于分布式系统如Nagios、Zabbix之下.
在本食谱中,我们将展示如何配置Ganglia  RabbitMQ 插件(http://sourceforge.net/apps/trac/ganglia/wiki/ganglia_quick_start).
准备
为了运行此食谱,你需要启用管理插件来配置RabbitMQ。同时,你也需要安装和配置Ganglia. 在本食谱中,我们使用的版本是3.6.0.
如何做
为了使用Ganglia监控图形来查看RabbitMQ统计数据,你需要执行下面的步骤:
1. 在你的Linux发行版上,使用yum或apt-get安装和配置Ganglia.你需要下面的包:
‰ ganglia-gmetad
‰ ganglia-gmond
‰ ganglia-gmond-python
‰ ganglia-web
‰ ganglia
2. 从https://github.com/ganglia/gmond_python_modules/blob/master/rabbit/python_modules/rabbitmq.py拷贝Python Ganglia监控插件到/usr/lib64/ganglia/python_modules.
3. 从https://github.com/ganglia/gmond_python_modules/blob/master/rabbit/conf.d/rabbitmq.pyconf拷贝Python Ganglia 配置文件到/etc/ganglia/conf.d 

4. 检查配置文件中的参数正确性.实际上, 你可能需要通过只留下默认的虚拟主机来解决以下条目:
paramvhost {
value = "/"
}
5.如果它已经运行了,使用下面的命令来重启gmond:
service gmond restart

如何工作
通过这个食谱, 你可以从Ganglia环境中来监控RabbitMQ.
TIP
对于基本的故障诊断,你可以通过日志文件来进行,默认情况下,日志文件存储于/var/log/rabbitmq.

一旦它运行起来,你就可以从同一个web界面中,看到系统信息和RabbitMQ节点、队列的相关信息,如下面的截图所示:


更多
Ganglia是集群监测一种广泛的解决方案,但不是唯一的一个。其它的解决方案还包括Nagios (www.nagios.org), Zabbix (www.zabbix.com), 以及Puppet (puppetlabs.com).

通过RabbitMQ排除故障
正如前面食谱提到的, 我们可以通过一种便利的方式,即以日志文件的方式来监控RabbitMQ行为.
也可以使用RabbitMQ自身来访问同种信息, 通过向AMQP client通知broker的活动.


准备
要运行本食谱,我们需要运行RabbitMQ以及Java client library.
如何做
为了消费日志消息,你可在Consumer.java中执行主方法.你可在Chapter12/Recipe02/Java/src/rmqexample中找到源友. 下面,我们将高亮主要步骤:
1. 创建一个临时匿名队列,并将其绑定到AMQP log交换器:
String tmpQueue = channel.queueDeclare().getQueue();
channel.queueBind(tmpQueue,"amq.rabbitmq.log","#");
2. 在消费者回调(ActualConsumer.java)中,可以检索消息和每个消息的路由键,并将它们打印出来:
String routingKey = envelope.getRoutingKey();
String message = new String(body);
System.out.println(routingKey + ": " + message);
3. 此时,你可以在broker上执行任何RabbitMQ操作,并且你会看到日志输出到标准输出上。


如何工作
RabbitMQ log交换器amq.rabbitmq.log是一个topic交换器,RabbitMQ自身用来发布其日志消息.
在我们的示例代码中,我们使用#通配符来消费所有topics的消息.
例如,通过运行另一份代码,我们可运行同一个broker的两个连接,然后中断它,我们将下面的输出:
info: accepting AMQP connection <0.2737.0> (127.0.0.1:54698 ->127.0.0.1:5672)

info: accepting AMQP connection <0.2753.0> (127.0.0.1:54699 ->127.0.0.1:5672)
warning: closing AMQP connection <0.2737.0> (127.0.0.1:54698 ->127.0.0.1:5672):
connection_closed_abruptly
warning: closing AMQP connection <0.2753.0> (127.0.0.1:54699 ->127.0.0.1:5672):
connection_closed_abruptly
值得注意的是,在这里报告的信息,信息和警告不是自己的一部分,但是我们在每个开始打印的路由键消息(前一步骤的步骤2)。

TIP
如果我们只想收到警告和错误消息,我们可以订阅相应的主题.

更多
默认情况下,日志交换器-amq.rabbitmq.log被创建在虚拟主机/中。通过在RabbitMQ配置文件中定义default_vhost从而设置其位置是可能的.

追踪RabbitMQ当前活动
有时,为了分析和调试未知的应用程序行为,我们需要追踪RabbitMQ接收和分发的所有消息.
RabbitMQ提供追踪这些消息的所谓的流水追踪工具。
追踪活动可以在运行时启用和禁用,并且它应该只用于调试,因为它规定了broker活动的开销.

准备
要运行此食谱,我们需要运行此食谱,我们需要运行RabbitMQ和Java client library.
如何做
RabbitMQ使用与日志消息中同样的机制来发送追踪消息;因此,示例代码与前一个食谱中的非常相似.

为了消费追踪消息,你可以执行Consumer.java中的主方法,你可在Chapter12/Recipe02/Java/src/rmqexample目录中找到源码.这里,我们高亮了主要步骤:
1. 创建一个临时队列,并将其绑定到AMQP log交换器上:
String tmpQueue = channel.queueDeclare().getQueue();
channel.queueBind(tmpQueue,"amq.rabbitmq.trace","#");
2.在消费者回调(ActualConsumer.java)中, 可以获取每个消息,并使用下面的代码打印出来:
String routingKey = envelope.getRoutingKey();
String message = new String(body);
Map<String,Object> headers = properties.getHeaders();
LongStringexchange_name = (LongString)
headers.get("exchange_name");
LongString node = (LongString) headers.get("node");
...
3. 可从root用户(Linux)或在RabbitMQ命令控制台(Windows)来激活firehose,其激活命令如下:
rabbimqctl trace_on
4. 此时,你可以向broker启动生产和发送消息,然后你们会在标准输出中看到追踪信息.
5. 可调用下面的命令来禁用firehose:
rabbimqctl trace_off


如何工作 
默认情况下,amq.rabbit.trace topic交换器不会接收任何消息,但一旦激活firehose后(前面步骤的步骤3),所有流经broker的消息将被按下面的规则拷贝:
1.进入broker的消息,它们是通过路由键publish.exchange-name发布的, 这里的exchange-name是消息最初发布的交换器名称.
2.离开broker的消息,它们是通过路由键deliver.queuename发布的,这里的queue-name是消息最初被消费的队列名称.
3.消息的body是从原始消息中拷贝过来的.

4.原始消息的元数据会插入到拷贝消息的header属性中. 在步骤2中,我们已经看到了,如何获取最初分发消息的交换器名称,但获取所有原始信息也是可以的,即,找到所有可有字段,并将它们插到消息属性中.firehose的官方文档链接位于:http://www.rabbitmq.com/firehose.html.

调试RabbitMQ消息
有时,通过在标准输出中记录通过broker的消息是有用的.通过RabbitMQ Java客户端提供的简单应用程序来追踪消息,也是可行的.
准备
要运行此食谱,你需要运行的RabbitMQ(运行标准端口 5672),以及RabbitMQ Java client library
如何做
RabbitMQ的Java client library中包含了一个追踪工具,你可以按下面的步骤来进行实际使用.
1. 从http://www.rabbitmq.com/java-client.html页面下载最新版本的RabbitMQ Java client library.
2. 将其解压,并进入其目录
.
3. 通过下面的命令来运行Java tracer:
./runjava.sh com.rabbitmq.tools.Tracer
4. 运行用于调试Java client,并将其连接到5673端口.对于本食谱,我们可以使用包含在Java client包中的另一个Java工具,其调用如下:
./runjava.sh com.rabbitmq.examples.PerfTest -h amqp://localhost:5673 -C 1 -D 1

如何工作
追踪工具是一个简单的AMQP代理;默认情况下,它监听5673端口,并会把所有的请求转发到默认监听5672端口的RabbitMQ broker.
所有生产或消费的消息,如同AMQP操作一样,都会记录到标准输出中.
运行前面步骤的步骤4,我们使用了包含在Java client包中另一个用于作RabbitMQ压力测试的工具.
在这里,我们只是限制了生产一个消息(-C 1) 并消费它(-D 1).
TIP
追踪工具只在Java client API中可用.

更多
可以使用下面的代码来为Java追踪程序传递更多的参数:
./runjava.sh com.rabbitmq.tools.Tracer listenPort connectHost connectPort
listenPort指的是追踪器监听的端口(默认为5673), connectHost/connectPort (默认为localhost/5672) 是用于连接并转发收到请求的主机和端口。
使用下面的命令,你可以找到所有PerfTest可用选项:
./runjava.sh com.rabbitmq.examples.PerfTest --help


也可参考
在http://www.rabbitmq.com/java-tools.html中,你可以找到Java追踪工具以及PerfTest的文档.

当RabbitMQ重启失败时,需要做什么
偶尔情况下, RabbitMQ 可能会重启失败。如果broker包含持久化数据时,这是一个严重的问题,否则,有足够的能力重设其持久化状态。
准备
要运行此食谱,你只需要一个测试RabbitMQ broker.
TIP
我们将销毁所有之前定义的数据—以避免使用生产实例.

如何做
要清空RabbitMQ, 可执行下面的简单步骤:
1. 如果RabbitMQ运行的话,停止它.
2. 定位到Mnesia 数据库目录.默认是/var/lib/rabbitmq/mnesia (Linux) or %APPDATA%\RabbitMQ\db (Windows).
3. 递归删除其目录和文件.
4. 重启RabbitMQ.

如何工作
Mnesia 数据库包含了所有运行时的RabbitMQ定义信息: 队列,交换器,用户等等.
通过删除Mnesia数据库 (或者通过重命名,这样可以在需要的时候恢复某些数据), RabbitMQ会重置到出厂默认状态,当RabbitMQ启动时,它会创建一个新的Mnesia数据库,并使用默认值进行初始化.


更多
如果broker无法在第一时间启动,有可能是某个系统目录存在权限问题:即要么是Mnesia数据库目录,要么是日志目录,要么是某些在配置文件中指定的临时的或自定义的目录.
你可以在RabbitMQ故障排除页页找到详尽的列表(http://www.rabbitmq.com/troubleshooting.html).

也可参考
在Mnesia API 文档页面(http://www.erlang.org/doc/man/mnesia.html),,你可以找到更多关于如何破解Mnesia数据库的信息.

使用Wireshark调试
在调试RabbitMQ消息食谱中,我们已经了解过了如何来追踪RabbitMQ的消息.

然而,以下情况并不总是可能的或可取的,即停止正在运行的客户端(或RabbitMQ服务器),修改它的连接端口,指向一个不同的broker;我们只想监控正在实时传递的消息,影响系统的活动应该尽可能的少。

TIP
但是,正如在前面食谱(追踪RabbitMQ当前活动)中看到的一样,激活firehose是可行的 .


Wireshark是一个免费的有能力解码AMQP消息的网络分析工具.
此工具既可用在客户段,也可以用在服务端,从而
无缝监控AMQP交通状况.

准备
要练习这个食谱,你需要运行的RabbitMQ以及RabbitMQ Java client library.

如何做
在下面的步骤中,我们将看到如何使用Wireshark来追踪AMQP消息:
1. 如果Wireshark在你的系统中尚不可用,那么需要从http://www.wireshark.org/下载和安装Wireshark . 如果可能的话,你也可以从你的发行版中进行安装,如:
yum install wireshark-gnome
2.在Linux系统中,以root用户来启动Wireshark.
3.启动从环回接口中捕获消息.
4.切换到Java client library路径下,在终端中运行下面的命令:
./runjava.sh com.rabbitmq.examples.PerfTest -C 1 -D 1
5.
停止Wireshark GUI的采集,并分析抓到的AMQP交通状况.
如何工作
使用Wireshark,可用于检测AMQP交通的退出或进行一个持有RabbitMQ服务器或客户端的服务器.
在我们的例子中,我们捕获了运行在同一台机器上服务端和客户端网络交通状况,因此连接是localhost中发生的.这也是为什么我们从环回接口中捕获交通状况的原因(前面步骤的步骤3).
反之,我们应该从网络接口中进行捕获,这种网络接口通常是eth0或其它相似的网络接口.

TIP
在Linux上,可以直接捕获localhost;但同样的操作不能应用到Windows上.在这种情况下,客户端和服务端必须位于不同的两台机器上, 并且在网络接口上(要么是物理的,要么是虚拟的)必须激活捕获, 这样它们之间才可连接.


所以,为了运行Wireshark的图形用户界面,如果RabbitMQ客户端和服务器运行在同一个节点,你需要选择环回接口,如图下面的截图所示:



TIP
在Linux上,当你安装Wireshark软件包,你通常只会有命令行界面,tshark。要安装Wireshark的GUI,你必须安装相应的软件包. 例如,在Fedora中,你需要安装wireshark-gnome包.

一旦AMQP交通已穿过环回接口,它已经被Wireshark捕获。
运行在步骤4的实验实际上在两个独立的连接中开启了一个生产者和一个消费者.
为了高亮显示,找到一个描述为Basic.PublishContent-Header的包,点击它,并选择Follow TCP stream.然后,您可以关闭显示客户端和服务器之间有效负载对话的窗口。在主窗口中,您现在可以看到在客户端和服务器之间交换的网络数据包,如下面的截图所示:



用同样的方式,你可以选择RabbitMQ server,如下面的截图所示:

在前面的两个截图中,我们已经强调了消息AMQP的有效载荷,但由于Wireshark中包括了一个非常完整的AMQP解剖器,你会发现在AMQP交通的很多细节。
更多
如果RabbitMQ配置为使用SSL,并且你想要分析加密流量,在一定条件下,通过在Wireshark配置中合理配置SSL公共/私有密钥也是可以的
可在http://wiki.wireshark.org/SSL找到更多信息.
也可参考
http://wiki.wireshark.org/AMQP,你可以找到一些关于Wireshark AMQP 解剖器的一些指南.




posted @ 2016-07-20 11:39 胡小军 阅读(3944) | 评论 (1)编辑 收藏
     摘要: 在本章节中,我们将展现一些RabbitMQ中的可用插件.然后,我们将展示如何使用现实世界中的例子来开发新插件.启用和配置STOMP插件管理RabbitMQ集群监控Shovel状态开发新插件– 使用ODBC连接关系数据库介绍多亏了插件设施,使得RabbitMQ成为了一个可扩展平台.它提供了许多通用插件,其中一些已经在前面的章节中解释过了.例如, Federation和...  阅读全文
posted @ 2016-07-20 11:30 胡小军 阅读(2569) | 评论 (1)编辑 收藏
     摘要: 在这一章中,我们将涵盖:多线程和队列系统调整改善带宽使用不同分发工具介绍这里没有标准的RabbitMQ调优指南,因为不同应用程序会采用不同方式优化.通常情况下,应用程序需要在客户端进行优化:处理器密集型应用程序可以通过为每个处理器内核运行一个线程来进行优化I/O密集型应用程序可以通过在单核上运行多个线程来隐藏隐式延迟在两种情况下,消息传递是完美的结合.为了优化网络传输速率,AMQP标准规定消息按束...  阅读全文
posted @ 2016-07-15 14:53 胡小军 阅读(9332) | 评论 (0)编辑 收藏
     摘要: 在本章中我们将覆盖涉及:镜像队列同步队列优化镜像策略在几个broker之间分发消息创建一个地理位置集群复制过滤和转发消息将高可用技术结合在一起客户端高可用性介绍RabbitMQ通过数据复制来达到高可用,当数据完整性、服务连续性是最重要的时候, 这一点与存储(如:RAID解决方案),数据库,以及所有IT基础设施解决方案是相同的。事实上,这些解决方案不仅可以避免数据丢失,也可以避免计划维护和...  阅读全文
posted @ 2016-07-02 19:11 胡小军 阅读(1903) | 评论 (0)编辑 收藏

名称

rabbitmq-service.bat — 管理RabbitMQ AMQP service

语法

rabbitmq-service.bat [command]

描述

RabbitMQ是AMQP的实现, 后者是高性能企业消息通信的新兴标准. RabbitMQ server是AMQP 中间件的健壮,可扩展实现.

运行rabbitmq-service,可允许RabbitMQ broker在NT/2000/2003/XP/Vista®环境上以服务来运行,这样就可以通过Windows® services applet来启动和停止服务.

默认情况下,服务会以本地系统帐户中认证上下文来运行。因此,有必要将Erlang cookies 和本地系统帐户进行同步(典型地,C:\WINDOWS\.erlang.cookie和帐户将用来运行 rabbitmqctl).

命令

help

显示使用信息.

install

安装service,安装后,它不会启动。如果环境变量修改了的话,随后的调用将更新服务参数.

remove

删除service.如果删除时,service正在运行,则将会自动停止。 它不会删除任何文件,后续可通过rabbitmq-server 继续操作。

start

启动service. 在此之前,service必须被正确安装

stop

停止service. 

disable

禁用service. 这等价于在服务控制面板中,将启动类型设置为禁用.

enable

启用service. 这等价于在服务控制面板中,将启动类型设置为自动.

环境变量

RABBITMQ_SERVICENAME

默认为RabbitMQ.

RABBITMQ_BASE

默认是当前用户的应用程序数据目录. 这是日志和数据目录的位置(C:\Users\Administrator\AppData\Roaming\RabbitMQ).

RABBITMQ_NODENAME

默认是rabbit. 当你想在一台机器上运行多个节点时,此配置是相当有用的, RABBITMQ_NODENAME在每个erlang-node和机器的组合中应该唯一。

参考clustering on a single machine guide 来更多细节.

RABBITMQ_NODE_IP_ADDRESS

默认情况下,RabbitMQ会绑定到所有网络接口上,如果只想绑定某个网络接口,可修改此设置。

RABBITMQ_NODE_PORT

默认为5672.

ERLANG_SERVICE_MANAGER_PATH

默认为C:\Program Files\erl5.5.5\erts-5.5.5\bin (或64位环境 中为C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin). 这是Erlang service manager的安装位置.

RABBITMQ_CONSOLE_LOG

将此变量设置为new或reuse,以将服务器控制台的输出重定向到名为SERVICENAME.debug文件中(位于安装服务的用户应用程序数据目录).在Vista下,其位置在C:\Users\AppData\username\SERVICENAME. 在Windows的前期版本中,位置在C:\Documents and Settings\username\Application Data\SERVICENAME. 

如果RABBITMQ_CONSOLE_LOG设置为new,那么每次服务启动时都会创建一个新文件。

如果RABBITMQ_CONSOLE_LOG设置为reuse,那么每次服务启动时,文件都会被覆盖.

当RABBITMQ_CONSOLE_LOG 没有设置或设置的值不是new或reuse时,默认的行为是丢弃服务器输出。


posted @ 2016-06-24 23:58 胡小军 阅读(1533) | 评论 (0)编辑 收藏

要求

要运行ftp4j library,你需要Java 运行时环境v. 1.4+.

安装

将ftp4j JAR文件添加到你应用程序的classpath中, 然后你就可以自动启用ftp4j类的使用了.

Javadocs

可参考ftp4j javadocs.

快速入门

包中的主类是FTPClient (it.sauronsoftware.ftp4j.FTPClient).

创建一个FTPClient 实例:

FTPClient client = new FTPClient();

连接远程FTP服务:

client.connect("ftp.host.com");

如果服务端口不是标准的21端口 (或 FTPS的990端口),需要使用port参数进行指定:

client.connect("ftp.host.com", port);

如:

client.connect("ftp.host.com", 8021);

然后进行登录流程:

client.login("carlo", "mypassword");

如果没有抛出任何异常的话,那么你就通过远程服务器的认证了.否则,如果验证失败,你将会收到it.sauronsoftware.ftp4j.FTPException异常.

匿名认证,如果被连接服务认可的话, 可通过发送用户名"anonymous" 和任意密码来完成(注意,有些服务器需要e-mail地址来代替密码):

client.login("anonymous", "ftp4j");

使用远程FTP服务来做任何事情,然后再断开连接:

client.disconnect(true);

这会向远程器发送FTP QUIT命令, 以进行一个合法断开流程.如果你只是想中断连接而不想向服务器发送任何通知,那么可以使用:

client.disconnect(false);

使用代理进行连接

客户端通过连接器(一个继承自it.sauronsoftware.ftp4j.FTPConnector的对象)来连接服务器, 它将返回一个已经打开的连接(一个实现了it.sauronsoftware.ftp4j.FTPConnection 接口的对象).这也是为什么ftp4j 可以支持大量代理的原因.

在连接远程服务器前,客户端实例可以使用setConnector() 方法来设置连接器:

client.setConnector(anyConnectorYouWant);

如果没有设置连接器的话,会使用默认的连接器DirectConnector (it.sauronsoftware.ftp4j.connectors.DirectConnector), 它实现了对远程服务器的直接连接,且不会使用代理。

如果你只能通过代理来连接远程服务器, ftp4j包可以让你在下面的连接器中进行选择:

  • HTTPTunnelConnector (it.sauronsoftware.ftp4j.connectors.HTTPTunnelConnector)
    它可以通过HTTP代理来进行连接,并支持CONNECT方法.
  • FTPProxyConnector (it.sauronsoftware.ftp4j.connectors.FTPProxyConnector)
    它可以通过FTP代理进行连接,支持SITE和OPEN命令风格的苛刻远程主机连接.其它类型的FTP代理,需要username@remotehost 认证,且可以不使用连接器,因为它们对于客户端来说是透明的。
  • SOCKS4Connector (it.sauronsoftware.ftp4j.connectors.SOCKS4Connector)
    它可以通过SOCKS 4/4a代理进行连接.
  • SOCKS5Connector (it.sauronsoftware.ftp4j.connectors.SOCKS5Connector)
    它可以通过SOCKS 5代理进行连接.

因为ftp4j的连接器架构设计为可插拔的,因此你可以继承FTPConnector 抽象类来构建自己的连接器.

FTPS/FTPES 安全连接

ftp4j包支持FTPS (隐式基于 TLS/SSL的FTP) 和FTPES (显示基于TLS/SSL的FTP).

setSecurity() 方法可用来打开这种特性:

client.setSecurity(FTPClient.SECURITY_FTPS); // 启用 FTPS
client.setSecurity(FTPClient.SECURITY_FTPES); // 启用 FTPES

两个方法都需要在连接远程服务器前调用.

如果安全协议设置成了SECURITY_FTPS, 则connect() 方法默认使用的端口为990.

默认情况下,客户端对象商讨SSL连接会使用javax.net.ssl.SSLSocketFactory.getDefault()作为其套接字工厂.可通过调用client.setSSLSocketFactory()方法来改变默认套接字工厂. 另外一种SSLSocketFactory, 可用来信任远程服务器颁发的证书(谨慎使用):

import it.sauronsoftware.ftp4j.FTPClient;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
// ... TrustManager[] trustManager = new TrustManager[] { new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() { return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) { }
public void checkServerTrusted(X509Certificate[] certs, String authType) { } } };
SSLContext sslContext = null;
try { sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, trustManager, new SecureRandom());
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
}
SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
FTPClient client = new FTPClient();
client.setSSLSocketFactory(sslSocketFactory);
client.setSecurity(FTPClient.SECURITY_FTPS);
// or client.setSecurity(FTPClient.SECURITY_FTPES); // ...

浏览远程站点

获取当前目录的的绝对路径(此目录是FTP服务器的home目录):

String dir = client.currentDirectory();

改变目录:

client.changeDirectory(newPath);

你可以使用绝对路径和相对路径:

client.changeDirectory("/an/absolute/one"); 
client.changeDirectory("relative");

回到父目录:

client.changeDirectoryUp();

重命名文件和目录

要重命名远程文件或目录:

client.rename("oldname", "newname");

移动文件和文件家

rename() 方法也可以用来从当前位置移动文件或目录到其它位置.

在这个例子子,假设在当前工作目录中,你有一个名为"myfile.txt"的文件,然后你想将其移动到子目录"myfolder"中:

client.rename("myfile.txt", "myfolder/myfile.txt");

删除文件

要删除远程文件,需要调用:

client.deleteFile(relativeOrAbsolutePath);

在这个例子中:

client.deleteFile("useless.txt");

创建、删除目录

如果远程服务给你机会的话,你可以在远程站点上创建新目录:

client.createDirectory("newfolder");

你也可以已存在的目录:

client.deleteDirectory(absoluteOrRelativePath);

在这个例子中:

client.deleteDirectory("oldfolder");

请注意,通常情况下,FTP 服务器只允许删除空目录.

列出文件、目录、连接

FTP 协议并不会提供大量支持方法来获取工作目录的完整信息.通常LIST命令会给你想知道的东西,但不辛的是,每个服务器会使用不同样式的响应. 这意味着某些服务器会返回UNIX样式的目录,有些服务器会返回DOS样式的目录,其它的服务器又会使用别的样式.

ftp4j 包可以处理许多的LIST响应格式, 并将它们构建成统一目录内容的结构对象表示.当前ftp4j可以处理:

  • UNIX 样式及其变种(如MAC样式)
  • DOS 样式
  • NetWare 样式
  • EPLF
  • MLSD

这可以通过使用可插拔的parsers来完成.包it.sauronsoftware.ftp4j.listparsers包含了用于处理上述样式的对象.大多数时间,这些已经够用了。

要列出当前工作目录下的文件或文件夹,可调用:

FTPFile[] list = client.list();

如果你收到了FTPListParseException (it.sauronsoftware.ftp4j.FTPListParseException) 异常,这就意味着服务器对LIST命令返回了不可理解的样式,即它不是上述列出的样式.因此,你可以尝试使用listNames() 方法, 但它并不如list()方法有优势.。为了弥补这种缺陷,你可以构建你自己的LIST响应解析器,以支持你遇到的样式.你可以实现FTPListParser (it.sauronsoftware.ftp4j.FTPListParser) 接口,然后你可以在client的addListParser()方法使用此实现.

FTPFile (it.sauronsoftware.ftp4j.FTPFile) 对象提供了目录内容的表示,包括文件,子目录和连接. 根据服务器的响应,FTPFile对象的某些字段可以是null 的或者是无意义的.请检查javadocs来了解细节.

list() 方法中你也可以使用文件过滤参数,如:

FTPFile[] list = client.list("*.jpg");

如果连接服务器明确支持MLSD命令, ftp4j会用其来代替基本的LIST命令。MLSD的响应事实更为标准,准确,更易解析.不幸的是,不是所有服务器都支持这个命令,并且有些服务器支持得非常糟糕.基于这些理由,开发者可以控制ftp4j是否应该使用MLSD命令,即通过调用FTPClient对象的setMLSDPolicy()方法. 合法的值:

  • FTPClient.MLSD_IF_SUPPORTED
    client只在服务器明确支持MLSD命令时,才使用MLSD命令. 这是ftp4j默认的行为.

  • FTPClient.MLSD_ALWAYS
    client总是会使用MLSD命令, 即便服务器没有明确表明支持MLSD命令.

  • FTPClient.MLSD_NEVER
    client绝不使用MLSD命令,即便服务器明确表明支持MLSD命令.

例如:

client.setMLSDPolicy(FTPClient.MLSD_NEVER);

获取文件、目录的最后修改时间

通常情况下FTPFile对象会告诉你条目的最后修改时间, 但正如上面描述的,这依赖于服务器发回的响应.如果你需要最后的修改时间,但你又不能通过list()方法得到,那么可以尝试这样做:

java.util.Date md = client.modifiedDate("filename.ext");

下载、上传文件

下载远程文件最简单的方式是调用download(String, File) 方法:

client.download("remoteFile.ext", new java.io.File("localFile.ext"));

要上传:

client.upload(new java.io.File("localFile.ext"));

要在已有文件中上传追加内容:

client.append(new java.io.File("localFile.ext"));

这些是阻塞式调用:它们会在传输完成后(或failed, 或 aborted时)才返回. 此外同步锁是否由客户端来实施的,因为在每个时间段内只允许有一个常规的FTP通信.在每个时间段内,你可以处理多个传输器,即使用多个FTPClient 对象,每个都与服务器建立一个私有连接.

你可以使用FTPDataTransferListener (it.sauronsoftware.ftp4j.FTPDataTransferListener)对象来监控传输.你可以自己实现一个:

import it.sauronsoftware.ftp4j.FTPDataTransferListener;  
public class MyTransferListener implements FTPDataTransferListener {
public void started() {
// Transfer started
}
public void transferred(int length) {
// Yet other length bytes has been transferred since the last time this
// method was called
}
public void completed() {
// Transfer completed
}
public void aborted() {
// Transfer aborted
}
public void failed() {
// Transfer failed
}
}

现在像下面这样来下载或上传:

client.download("remoteFile.ext", new java.io.File("localFile.ext"), new MyTransferListener());
client.upload(new java.io.File("localFile.ext"), new MyTransferListener());
client.append(new java.io.File("localFile.ext"), new MyTransferListener());

当client处理下载或上传时,传输器可以被同一个FTPClient对象的不同线程通过调用 abortCurrentDataTransfer() 方法aborted. 此方法还需要一个boolean参数:true表示执行合法的abort过程(即向服务器发送ABOR命令), false表示实然关闭传输器,而不向服务器发送通知:

client.abortCurrentDataTransfer(true); // Sends ABOR
client.abortCurrentDataTransfer(false); // Breaks abruptly

需要注意的是,list()和listNames() 方法暗中包含了数据传输器,因abortCurrentDataTransfer() 方法也可以用来中断其list过程.

当数据传输器在download()upload()append()list() and listNames() 方法中中断时,将会抛出FTPAbortedException (it.sauronsoftware.ftp4j.FTPAbortedException).

下载和上传操作可通过restartAt 参数来重新恢复:

client.download("remoteFile.ext", new java.io.File("localFile.ext"), 1056);

此操作会文件的第1056个字节处继续执行下载操作。第一个传输的字节将是第1057个.

其它 download()upload() 和append()方法的变种可以让你使用流来替代java.io.File对象.因此你可以在数据库,网络连接或其它流上来传输数据。

Active 、 passive 数据传输模式

客户端和服务器之间的数据传输通道是通过单独的网络连接来建立的. 在传输通道建立期间,服务器可以是active或passive的. 服务器激活数据传输时,工作如下:

  1. client向服务器发送其IP地址和端口号.
  2. client向服务器发送数据传输请求,并在之前发送的端口上启动监听.
  3. 服务器使用客户端提供的地址和端口来连接客房端.
  4. 数据传输将在新建立的通道中进行.

active模式需要你的client能够收到来自服务器的连接.如果你的client处于防火墙, 代理或这两者混合之后,那么大部分时间都会出现问题,因为它不能收到来外界的连接. 下面是passive数据传输模式

  1. client要求服务器准备好一个passive数据传输.
  2. 服务器使用其IP地址和端口号进行响应.
  3. client请求传输和连接.
  4. 数据传输将在新建立的通道中进行.

在passive模式中,客户端连接不要求能收到服务器的连接请求.

在ftp4j中,你可以使用下面的调用来切换active、passive模式:

client.setPassive(false); // Active mode
client.setPassive(true); // Passive mode

ftp4j client passive 标志的默认值为true: 如果你没有调用setPassive(false) ,你的客户端在每次传输前,都会向服务器请求passive模式.

当使用 passive文件传输时,服务器会提供一个 IP地址和一个端口号.作为FTP规范的client,需要使用给定的主机号和端口进行连接.在商业环境中,这种行为可能会经常带来问题,因为NAT配置可能会阻止对IP地址的连接.这就是为什么FTP clients通常会忽略服务器返回的任何IP地址,进而在通信线路中使用同样的主机来连接服务器.ftp4j的行为依赖于服务器因素:

  • 每个FTPConnector 都有其默认行为.大部分连接器都会忽略服务器返回的IP地址。目前,默认使用返回地址的连接器是FTPProxyConnector.
  • 连接器的行为可通过定义名为ftp4j.passiveDataTransfer.useSuggestedAddress的系统属性来覆盖。如果设置为"true", "yes" 或"1",所有连接器都会使用服务器返回的地址,反之,如果将其设置为"false", "no" or "0", 所有服务器都不会使用返回的地址.
  • 最后,连接器的默认行为和全局设置都可以在任何连接器实例中进行覆盖。你可通过获取客房端连接器,并调用其setUseSuggestedAddressForDataConnections() 方法来达到目的.

在active传输模式中,可以设置下面的系统属性:

  • ftp4j.activeDataTransfer.hostAddress
    主机地址.当服务器请求执行与客户端连接时,client会跳转到给定地址的服务器. 此值应该是一个有效的IPv4地址,如:178.12.34.167. 如果没有提供此值,客户端会自动解析系统地址.但如果client运行于LAN中,为了激活数据传输,将会使用带端口转发的路由器来连接外部服务器,那么自动探测到的地址可能不是正确的. 当系统有多个网络接口时,也可能发生这种情况.通常使用系统属性,可以解决这种问题

  • ftp4j.activeDataTransfer.portRange
    连接端口范围. client会在其中挑选一个来进行数据传输.此值 必须是start-stop 形式 ,如6000-7000 表示client只会在给定范围内挑选一个端口来连接服务器.默认情况下没有指定端口范围:这表示client会挑选任何一个可用的端口.

  • ftp4j.activeDataTransfer.acceptTimeout
    以毫秒为单位的连接超时时间. 如果服务器不能在给定超时时间内连接client,传输会因FTPDataTransferException异常而中断.0值表示永不超时。默认值30000 (即30秒).

要设置系统属性,你可以:

  • 用一个或多个 -Dproperty=value参数来启动JVM.如:

    java -Dftp4j.activeDataTransfer.hostAddress=178.12.34.167      -Dftp4j.activeDataTransfer.portRange=6000-7000      -Dftp4j.activeDataTransfer.acceptTimeout=5000 MyClass
  • 直接在代码中设置系统属性,如:

    System.setProperty("ftp4j.activeDataTransfer.hostAddress", "178.12.34.167"); 
    System.setProperty("ftp4j.activeDataTransfer.portRange", "6000-7000");
    System.setProperty("ftp4j.activeDataTransfer.acceptTimeout", "5000");

二进制和文本数据传输类型

数据传输的另一个核心概念是binary 和textual 类型.当传传输的文件是二进制文件时,它将视为二进制流,服务器会按原样存储。而文本数据传输会将传输的文件视为字符流,会进行字符集转换. 假设你的client正运行Windows平台上,而服务器运行UNIX上,它们的默认字符集通常是不同的. client以文本类型来发送文件时,client会假设文件是按机器标准字符集来编码的,因此在发送前,它会解码每个字符并将其编码为 中间字符集. 服务器收到流后,在存储前,会解码中间字符集,并将其编码为机器默认的字符集.字节虽然被改变了,但内容是相同的。

你可以调用下面的方法选择你传输的类型:

client.setType(FTPClient.TYPE_TEXTUAL);
client.setType(FTPClient.TYPE_BINARY);
client.setType(FTPClient.TYPE_AUTO);

默认的TYPE_AUTO常量 ,会让client自动来挑选类型:如果文件的扩展名是client能被识别的文本类型标记,那么它会选择文本传输器来执行. 文件扩展名是通过FTPTextualExtensionRecognizer (it.sauronsoftware.ftp4j.FTPTextualExtensionRecognizer) 实例来识别的. 默认扩展识别器是it.sauronsoftware.ftp4j.recognizers.DefaultTextualExtensionRecognizer, 会将下面的视为文本类型:

abc acgi aip asm asp c c cc cc com conf cpp
csh css cxx def el etx f f f77 f90 f90 flx
for for g h h hh hh hlb htc htm html htmls
htt htx idc jav jav java java js ksh list
log lsp lst lsx m m mar mcf p pas php pl pl
pm py rexx rt rt rtf rtx s scm scm sdml sgm
sgm sgml sgml sh shtml shtml spc ssi talk
tcl tcsh text tsv txt uil uni unis uri uris
uu uue vcs wml wmls wsc xml zsh

你可通过实现FTPTextualExtensionRecognizer 接口来实现你自己的识别器,但你可以更喜欢使用 class ParametricTextualExtensionRecognizer(it.sauronsoftware.ftp4j.recognizers.ParametricTextualExtensionRecognizer)便利类.
无论如何,都不要忘记将你的识别器设置在client中:

client.setTextualExtensionRecognizer(myRecognizer);

数据传输压缩

有些服务器支持数据传输压缩特性-MODE Z. 在传输大文件时,此特性可以节省带宽.一旦client连上服务器并通过认证,就可通过调用下面的方法来检查是否支持压缩:

boolean compressionSupported = client.isCompressionSupported();

如果服务器端支持压缩,就可通过下面的调用来启用压缩:

client.setCompressionEnabled(true);

在此调用之后,后续的数据传输(下载,上传,列举操作)都被将压缩以节省带宽.

数据传输压缩可通过下面的调用来禁用:

client.setCompressionEnabled(false);

也可以检查标记值:

boolean compressionEnabled = client.isCompressionEnabled();

请注意:压缩数据传输只当压缩支持且启用了的情况下才会发生.

默认情况下,压缩是禁用的,即便是服务器支持压缩. 如果有需要,可以显示地打开.

不做任何事(NOOPing the server)

假设你的client什么事情都不做,因为在等待用户输入. 通常情况下, FTP服务器会自动断开非活跃客户端. 为了避免超时,你可以发送NOOP命令.
此命令不会做任何事情,但它会向服务器说明:客户端仍然还活着,请重围超时计数器.调用如下:

client.noop();

当非活跃超时发生时,客户端也可以自动发送NOOPs. 默认情况下,此特性是禁用的。它可以在 setAutoNoopTimeout() 方法中设置超时时间时启用,并提供一个毫秒为单位的值.如:

client.setAutoNoopTimeout(30000);

使用此值,client会在30秒后发送一个NOOP命令.

NOOP超时可通过设置小于等于0的值来禁用:

client.setAutoNoopTimeout(0);

网站特殊的自定义命令

你可以像下面这样来发送站点特殊命令:

FTPReply reply = client.sendSiteCommand("YOUR COMMAND");

你也可以发送自定义命令:

FTPReply reply = client.sendCustomCommand("YOUR COMMAND");

sendSiteCommand() 和 sendCustomCommand() 都会返回一个FTPReply (it.sauronsoftware.ftp4j.FTPReply)对象.使用此对象,你可以检查服务器的响应代码和消息. 
FTPCodes (it.sauronsoftware.ftp4j.FTPCodes) 接口报告了一些通用的FTP响应代码,因此你可以使用这些包中的某个来进行匹配.

异常处理

ftp4j 包定义了五种类型的异常:

  • FTPException (it.sauronsoftware.ftp4.FTPException
    依赖于方法,这会报告抛出了一个FTP故障.你可以检查报告的错误码,使用FTPCodes 常量来获取故障原因的详细信息.
  • FTPIllegalReplyException (it.sauronsoftware.ftp4.FTPIllegalReplyException)
    这表示远程服务器使用非法方式进行了应答, 这与FTP是不兼容的. 这应该是非常罕见的.
  • FTPListParseException (it.sauronsoftware.ftp4.FTPListParseException)
    这通常发生在list()方法,如果服务器发回的响应不能被客户端包中现有解析器进行的话,就会抛出此种异常
  • FTPDataTransferException (it.sauronsoftware.ftp4.FTPDataTransferException)
    当数据传输 (downloadupload, but also list and listNames) 因网络连接错误失败时,就会抛出此种异常.
  • FTPAbortedException (it.sauronsoftware.ftp4.FTPAbortedException)
    当数据传输 (download, upload, but also list and listNames) 因客户端发出中断请求失败时,抛出的异常.
posted @ 2016-06-21 22:34 胡小军 阅读(5945) | 评论 (0)编辑 收藏
     摘要: 在本章节中我们将覆盖:创建一个本地服务器集群创建一个简单集群自动添加一个RabbitMQ 集群引入消息负载均衡器创建集群客户端介绍RabbitMQ提供了各种各样的特性以及集群功能.通过使用集群,一组适当配置的主机的行为与单个broker实例一样,但集群带有下面的目的:高可用性: 如果一个节点宕机了,分布式broker仍然能接受和处理消息.这方面内容会在Chapter 7,Developi...  阅读全文
posted @ 2016-06-15 20:54 胡小军 阅读(1937) | 评论 (2)编辑 收藏
     摘要: 在本章中我们将覆盖:使用Spring来开发web监控程序使用Spring来开发异步web搜索使用STOMP来开发web监控程序介绍RabbitMQ可以像客户端一样使用在服务端。当前,RabbitMQ覆盖了大部分使用的语言和技术来构建web程序,如PHP,Node.js, Python, Ruby, 以及其它.你可以在http://www.rabbitmq.com/devtools.html找到全部...  阅读全文
posted @ 2016-06-14 22:07 胡小军 阅读(2268) | 评论 (0)编辑 收藏
     摘要: 在本章中我们将覆盖:使用.NET client通过MQTT绑定iPhone应用与RabbitMQ在Andriod上使用消息来更新Google Maps通过Andriod后端来发布消息使用Qpid来交换RabbitMQ消息使用Mosquitto来交换RabbitMQ消息使用.NET clients来绑定WCF程序介绍在前面的章节中,我们已经介绍了基本概念。现在,我们要使用这些概念来创建真实的应用程序...  阅读全文
posted @ 2016-06-13 20:25 胡小军 阅读(1757) | 评论 (2)编辑 收藏

RabbitMQ的目标是尽可能广泛地支持大部分平台.RabbitMQ 可运行在任何支持Erlang的平台上, 包括内嵌系统,多核集群,云服务器.

下面的平台支持Erlang,因此也可以运行RabbitMQ:

  • Linux
  • Windows, NT through 10
  • Windows Server 2003/2008/2012
  • Mac OS X
  • Solaris
  • FreeBSD
  • TRU64
  • VxWorks

RabbitMQ的开源版本大部分都部署在下面的平台上:

  • Ubuntu and Debian-based Linux distributions
  • Fedora, CentOS and RPM-based Linux distributions
  • openSUSE and derived distributions (including SLES and SLERT)
  • Mac OS X
  • Windows XP and later

Windows

RabbitMQ可运行Windows XP及其后续版本中(Server 2003, Vista, Windows 7, Windows 8, Windows 10, Server 2008 and Server 2012). 尽管没有测试,但应该可以运行在Windows NT ,Windows 2000 上.

64位的Windows Erlang VM从R15版本开始可用.建议使用最新的64位Erlang版本来运行。参考Erlang version compatibility page.

通用UNIX

虽没有官方支持,Erlang 和 RabbitMQ 能运行在含有POSIX layer including Solaris, FreeBSD, NetBSD, OpenBSD的操作系统上.

虚拟平台

RabbitMQ 可运行物理或虚拟硬件上. 这可以允许不支持的平台通过仿真来运行RabbitMQ.
参考EC2 guide 来了解RabbitMQ如何运行在Amazon EC2上的更多信息.

posted @ 2016-06-06 00:09 胡小军 阅读(1234) | 评论 (0)编辑 收藏

名称

rabbitmq-server — 启动RabbitMQ AMQP server

语法

rabbitmq-server [-detached]

描述

RabbitMQ是AMQP的实现, 后者是高性能企业消息通信的新兴标准. RabbitMQ server是AMQP 中间件的健壮,可扩展实现.

前端运行rabbitmq-server,它会显示横幅消息,会报告启动时的过程信息,最后会显示"broker running",以表明RabbitMQ中间件已经成功启动。

要关闭server,只需要终止过程或使用rabbitmqctl(1)(即:rabbitmqctl stop).

环境变量

RABBITMQ_MNESIA_BASE

默认是 /var/lib/rabbitmq/mnesia. 用于设置Mnesia 数据库文件存放的目录.

RABBITMQ_LOG_BASE

日志目录 ,server生成的/var/log/rabbitmq. Log 日志文志会放置在文件会放置在此目录.(如:window10下默认安装时,日志目录为:C:\Users\Administrator\AppData\Roaming\RabbitMQ\log

RABBITMQ_NODENAME

默认是rabbit. 当你想在一台机器上运行多个节点时,此配置是相当有用的, RABBITMQ_NODENAME在每个erlang-node和机器的组合中应该唯一。

参考clustering on a single machine guide 来更多细节.

RABBITMQ_NODE_IP_ADDRESS

默认情况下,RabbitMQ会绑定到所有网络接口上,如果只想绑定某个网络接口,可修改此设置。

RABBITMQ_NODE_PORT

默认是5672.

选项

-detached

以后端的方式来启动进程 ,注意,这会导致pid无法写入到pid文件中.例如:

rabbitmq-server -detached

以后端方式来启动RabbitMQ AMQP server.

也可参考

rabbitmq-env.conf(5) rabbitmqctl(1)

posted @ 2016-06-06 00:06 胡小军 阅读(1174) | 评论 (0)编辑 收藏
     摘要: 本章我们将覆盖:使用虚拟主机配置用户使用SSL实现客户端证书从浏览器中管理RabbitMQ配置RabbitMQ参数开Python程序来监控RabbitMQ自己开发web程序来监控RabbitMQ介绍一旦安装后,RabbitMQ不需要任何配置就可以工作. 然而,RabbitMQ有许多的配置选项,这些配置选项使得它更为灵活,能工作于多种不同环境中.在本章中,我们将看到如何改变配置来满足应用程序的需求。...  阅读全文
posted @ 2016-06-05 20:10 胡小军 阅读(1598) | 评论 (0)编辑 收藏

一.安装Erlang

1、下载推荐的安装包

2、安装

安装依赖包

yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

#rpm -ivh esl-erlang_18.3-1~centos~7_amd64.rpm

二.安装RabbitMQ
下载RabbitMQ
# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
# rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm

安装rabbitmq-server的过程中遇到了一个问题:

Error: Package: rabbitmq-server-3.6.1-1.noarch (/rabbitmq-server-3.6.1-1.noarch) 
Requires: erlang >= R16B-3 
You could try using --skip-broken to work around the problem 
You could try running: rpm -Va --nofiles --nodigest

这是由于erlang的版本问题,其实是没有影响的,你可以使用下面的命令进行安装:

#rpm -ivh --nodeps rabbitmq-server-3.6.1-1.noarch.rpm


启动

#service rabbitmq-server start --后台方式运行

#service rabbitmq-server stop  --停止运行

#service rabbitmq-server status --查看状态

#rabbitmq-server start 

可以看到使用的日志文件

日志目录

/var/log/rabbitmq

#cat /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log 可以看到下面的日志记录

...................................................................................................................................................................................................................................................

=INFO REPORT==== 28-Apr-2016::04:20:10 ===
node           : rabbit@iZ94nxslz66Z
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
cookie hash    : fisYwC976M1LblhTfYslpg==
log            : /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log
sasl log       : /var/log/rabbitmq/rabbit@iZ94nxslz66Z-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Memory limit set to 397MB of 992MB total.
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Disk free limit set to 50MB
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Limiting to approx 65435 file handles (58889 sockets)
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
FHC read buffering:  OFF
FHC write buffering: ON
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Database directory at /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z is empty. Initialising from scratch...
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Priority queues enabled, real BQ is rabbit_variable_queue
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Adding vhost '/'
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Creating user 'guest'
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Setting user tags for user 'guest' to [administrator]
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Setting permissions for 'guest' in '/' to '.*', '.*', '.*'
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
msg_store_transient: using rabbit_msg_store_ets_index to provide index
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
msg_store_persistent: using rabbit_msg_store_ets_index to provide index
=WARNING REPORT==== 28-Apr-2016::04:20:11 ===
msg_store_persistent: rebuilding indices from scratch
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
started TCP Listener on [::]:5672
=INFO REPORT==== 28-Apr-2016::04:20:11 ===
Server startup complete; 0 plugins started.
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Management plugin: using rates mode 'basic'
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Management plugin started. Port: 15672
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Statistics database started.
=INFO REPORT==== 28-Apr-2016::04:21:52 ===
Plugins changed; enabled [mochiweb,webmachine,rabbitmq_web_dispatch,
                          amqp_client,rabbitmq_management_agent,
                          rabbitmq_management], disabled []
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
Stopping RabbitMQ
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
stopped TCP Listener on [::]:5672
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
Stopped RabbitMQ application
=INFO REPORT==== 28-Apr-2016::04:23:01 ===
Halting Erlang VM
=INFO REPORT==== 28-Apr-2016::04:23:29 ===
Starting RabbitMQ 3.6.1 on Erlang 18.3
Copyright (C) 2007-2016 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/
...................................................................................................................................................................................................................................................


卸载

#rpm -qa|grep rabbitmq
rabbitmq-server-3.6.1-1.noarch
#rpm -e --nodeps rabbitmq-server-3.6.1-1.noarch
#rpm -qa|grep erlang
esl-erlang-18.3-1.x86_64
#rpm -e --nodeps esl-erlang-18.3-1.x86_64

管理

Rabbitmq服务器的主要通过rabbitmqctl和rabbimq-plugins两个工具来管理,以下是一些常用功能。

1). 服务器启动与关闭

      启动: rabbitmq-server –detached

      关闭:rabbitmqctl stop

      若单机有多个实例,则在rabbitmqctlh后加–n 指定名称

2). 插件管理

      开启某个插件:rabbitmq-pluginsenable xxx

      关闭某个插件:rabbitmq-pluginsdisablexxx

      注意:重启服务器后生效。

3).virtual_host管理

      新建virtual_host: rabbitmqctladd_vhost  xxx

      撤销virtual_host:rabbitmqctl  delete_vhost xxx

4). 用户管理

      新建用户:rabbitmqctl add_user xxxpwd

      删除用户:   rabbitmqctl delete_user xxx

      改密码: rabbimqctlchange_password {username} {newpassword}

      设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}

              Tag可以为 administrator,monitoring, management

5). 权限管理

      权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}

               Vhostpath

               Vhost路径

               user

      用户名

              Conf

      一个正则表达式match哪些配置资源能够被该用户访问。

              Write

      一个正则表达式match哪些配置资源能够被该用户读。

               Read

      一个正则表达式match哪些配置资源能够被该用户访问。

6). 获取服务器状态信息

       服务器状态:rabbitmqctl status

       队列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]

                Queueinfoitem可以为:name,durable,auto_delete,arguments,messages_ready,

                messages_unacknowledged,messages,consumers,memory

       Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]

                 Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.

       Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]       

                 Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments

       Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]

       Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。

       Channel信息:rabbitmqctl  list_channels[channelinfoitem ...]

      Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked

 

 

常用命令:

查看所有队列信息

# rabbitmqctl list_queues

关闭应用

# rabbitmqctl stop_app

启动应用,和上述关闭命令配合使用,达到清空队列的目的

# rabbitmqctl start_app

清除所有队列

# rabbitmqctl reset

更多用法及参数,可以执行如下命令查看

# rabbitmqctl

 

 

rabbitmq常用命令

rabbitmq-server start  或者   service rabbitmq-server start     #启动rabbitmq

rabbitmqctl list_exchanges 

rabbitmqctl list_bindings

rabbitmqctl list_queues #分别查看当前系统种存在的Exchange和Exchange上绑定的Queue信息。

rabbitmqctl status  #查看运行信息

rabbitmqctl stop     #停止运行rabbitmq

rabbitmq-plugins enable rabbitmq_management  

#启动rabbitmq的图形管理界面,这个操作必须重启rabbitmq, 然后在web中 http://127.0.0.1:15672 用户名和密码都是guest guest。如果局域网无法访问设置防火墙过滤规则或关闭防火墙。


posted @ 2016-06-05 20:08 胡小军 阅读(3172) | 评论 (0)编辑 收藏

概述

RabbitMQ broker是一个或多个Erlang节点的逻辑分组,多个运行的RabbitMQ应用程序可共享用户,虚拟主机,队列,交换机,绑定以及运行时参数。有时我们将多个节点的集合称为集群。

什么是复制?

RabbitMQ broker操作所需的所有数据/状态都可以在多个节点间复制. 例外是消息队列,默认情况下它驻留在一个节点, 尽管它们对所有节点来说,是可见的,可达的.要在集群中跨节点复制队列,可参考high availability 文档(注意,你仍然先需要一个工作集群).

主机名解析需求

RabbitMQ节点彼此之间使用域名,要么是简短的,要么是全限定的(FQDNs). 因此,集群中所有成员的主机名都必须是可解析的,也可用于机器上的命令行工具,如rabbitmqctl.

主机名解析可使用任何一种标准的操作系统提供方法:

  • DNS 记录
  • 本地主机文件(e.g. /etc/hosts)
在更加严格的环境中,DNS记录或主机文件修改是受限的,不可能的或不受欢迎的, Erlang VM可通过使用替代主机名解析方法来配置, 如一个替代的DNS服务器,一个本地文件,一个非标准的主机文件位置或一个混合方法. 这些方法可以与标准操作主机名解析方法一起协同工作。

要使用FQDNs, 参考RABBITMQ_USE_LONGNAME in the Configuration guide.

集群构成

集群可以通过多种方式来构建:

一个集群的构成可以动态修改. 所有RabbitMQ brokers开始都是以单个节点来运行的. 这些节点可以加入到集群中, 随后也可以脱离集群再次成为单一节点。

故障处理

RabbitMQ brokers 可以容忍个别节点故障. 节点可以随意地启动和关闭,只要在已知关闭的时间内能够联系到集群节点.

RabbitMQ 集群有多种模式来处理网络分化, 主要是一致性方向. 集群是在LAN中使用的,不推荐在WAN中运行集群. Shovel 或 Federation 插件对于跨WAN连接brokers ,有更好的解决方案. 注意 Shovel 和 Federation 不等同于集群.

磁盘和内存节点

节点可以是磁盘节点,也可以是内存节点。多数情况下,你希望所有的节点都是磁盘节点,但RAM节点是一种特殊情况,它可以提高集群中队列和,交换机,绑定的性能. 当有疑问时,最好只使用磁盘节点。

集群文字记录(Transcript)

下面是通过三台机器-rabbit1rabbit2rabbit3来设置和操作RabbitMQ集群的文字记录.

我们假设用户已经登录到这三台机器上,并且都已经在机器上安装了RabbitMQ,以及rabbitmq-server 和rabbitmqctl 脚本都已经在用户的PATH环境变量中.

This transcript can be modified to run on a single host, as explained more details below.

节点(以及CLI工具)之间如何来认证: Erlang Cookie

RabbitMQ 节点和CLI 工具(如rabbitmqctl) 使用cookie来确定每个节点之间是否可以通信. 两个节点之间要能通信,它们必须要有相同的共享密钥Erlang cookie. cookie只是具有字母数字特征的字符串。只要你喜欢,它可长可短. 每个集群节点必须有相同的cookie.

当RabbitMQ 服务器启动时,Erlang VM 会自动地创建一个随机的cookie文件. 最简单的处理方式是允许一个节点来创建文件,然后再将这个文件拷贝到集群的其它节点中。

在 Unix 系统中, cookie的通常位于/var/lib/rabbitmq/.erlang.cookie 或$HOME/.erlang.cookie.

在Windows中, 其位置在C:\Users\Current User\.erlang.cookie(%HOMEDRIVE% + %HOMEPATH%\.erlang.cookie) 或C:\Documents and Settings\Current User\.erlang.cookie, 对于RabbitMQ Windows service其位置在C:\Windows\.erlang.cookie。如果使用了Windows service ,  cookie可被放于这两个位置中.

作为替代方案,你可以在 rabbitmq-server 和 rabbitmqctl 脚本中调用erl时,插入"-setcookie cookie"选项.

当cookie未配置时 (例如,不相同), RabbitMQ 会记录这样的错误"Connection attempt from disallowed node" and "Could not auto-cluster".

启动独立节点

集群可通过重新配置,而将现有RabbitMQ 节点加入到集群配置中. 因此第一步是以正常的方式在所有节点上启动RabbitMQ:

rabbit1$ rabbitmq-server -detached 
rabbit2$ rabbitmq-server -detached
rabbit3$ rabbitmq-server -detached

这会创建三个独立的RabbitMQ brokers, 每个节点一个,可通过cluster_status命令来验证:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

rabbitmq-server shell脚本来启动RabbitMQ broker的节点名称是rabbit@shorthostname,在这里,短节点名称是小写的(如上面的rabbit@rabbit1). 如果在windows上,你使用rabbitmq-server.bat批处理文件来启动,短节点名称是大写的(如:rabbit@RABBIT1). 当你输入节点名称时,不论是大写还是小写的,这些字符串都必须精确匹配。

创建集群

为了把这三个节点构建到一个集群中,我们可以告诉其中的两个节点, 假设为rabbit@rabbit2 和 rabbit@rabbit3, 将加入到第三个节点的集群中,这第三个节点假设为rabbit@rabbit1.

首先我们将rabbit@rabbit2加入到rabbit@rabbit1的集群中. 要做到这一点,我们必须在rabbit@rabbit2 上停止RabbitMQ应用程序,并将其加入到rabbit@rabbit1 集群中, 然后再重启RabbitMQ 应用程序. 

注意:加入集群会隐式地重置节点, 因此这会删除此节点上先前存在的所有资源和数据.(如何备份数据)

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

在每个节点上通过运行cluster_status 命令,我们可以看到两个节点已经加入了集群:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

现在我们将rabbit@rabbit3节点加入到同一个集群中. 操作步骤同上面的一致,除了这次我们选择rabbit2来加入集群,但这并不重要:

rabbit3$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

在任何一个节点上通过运行cluster_status命令,我们可以看到三个节点已经加入了集群:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

通过上面的步骤,当集群运行的时候,我们可以在任何时候将新的节点加入到集群中.

重启集群节点

注意,加入到集群中的节点可在任何时候停止, 对于崩溃来说也没有问题. 在这两种情况下,集群剩余的节点将不受影响地继续操作,当它们重启的时候,这些崩溃的节点会再次自动追赶上其它的集群节点。

我们关闭了节点rabbit@rabbit1和rabbit@rabbit3,并在每步观察集群的状态:

rabbit1$ rabbitmqctl stop 
Stopping and halting node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]}] ...done.
rabbit3$ rabbitmqctl stop
Stopping and halting node rabbit@rabbit3 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2]}] ...done.

译者注:关闭了rabbit1节点后,运行的节点已经没有rabbit1节点了

现在我们再次启动节点,并检查集群状态:

rabbit1$ rabbitmq-server -detached 
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmq-server -detached
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

这里有一些重要的警告:

  • 当整个集群崩溃的时候, 最后一个崩溃的节点必须第一个上线.如果不是这样,节点将会等待最后一个磁盘节点30秒以确认其重新上线,否则就会失败. 如果最后一个下线的节点,不能再重新上线,那么它可能会使用forget_cluster_node命令来从集群中删除 - 查阅 rabbitmqctl页面来了解更多信息.
  • 如果所有集群节点都在同一个时间内停止且不受控制(如断电)。在这种情况下,你可以在某个节点上使用force_boot命令使其再次成为可启动的-查阅 rabbitmqctl页面来了解更多信息.

脱离集群

当节点不再是集群的一部分时,可以明确地将其从集群中删除. 首先我们将节点rabbit@rabbit3从集群中删除, 以使其回归独立操作.要做到这一点,需要在rabbit@rabbit3节点上停止RabbitMQ 应用程序,重设节点,并重启RabbitMQ应用程序.

rabbit3$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl reset
Resetting node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

在节点上运行cluster_status 命令来确认rabbit@rabbit3节点现在已不再是集群的一部分,并且会独自操作:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

我们也可以远程地删除节点,这是相当有用的,举例来说,当处理无反应的节点时.举例来说,我们可以从 rabbit@rabbit2中删除rabbit@rabbi1.

rabbit1$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl forget_cluster_node rabbit@rabbit1
Removing node rabbit@rabbit1 from cluster ... ...done.

注意,rabbit1仍然认为它与rabbit2处在一个集群中,但尝试启动时会出现一个错误.这时,我们需要对其进行重置以使其能再次启动.

rabbit1$ rabbitmqctl start_app 
Starting node rabbit@rabbit1 ... Error: inconsistent_cluster: Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees
rabbit1$ rabbitmqctl reset
Resetting node rabbit@rabbit1 ...done.
rabbit1$ rabbitmqctl start_app Starting node rabbit@mcnulty ... ...done.

现在, cluster_status 命令会显示三个节点都是独立节点,并且操作是独立的:

rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

注意:rabbit@rabbit2节点仍然残留有集群的状态(译者注:怎么看出来的呢?), 但是 rabbit@rabbit1 和rabbit@rabbit3 节点是新鲜的RabbitMQ brokers.如果我们想重新初始化rabbit@rabbit2节点,我们可以按其它节点的步骤来操作:

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl reset
Resetting node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

升级集群

当从主版本或小版本进行升级时 (如:从3.0.x 到3.1.x,或从2.x.x 到3.x.x),或者是升级Erlang时, 整个集群在升级时必须记下来(taken down) (因为集群不会像这样来运行多个混合的版本). 当从补丁版本升级到另一个时(如:从3.0.x 到3.0.y)时,这种情况是不会出现的;这些版本在集群中是可以混合使用的(例外是3.0.0不能与 3.0.x 系列后的版本混合).

在主版本与小版本之间升级时,RabbitMQ有必要的话会自动更新其持久化数据. 在集群中,此任务是由第一个磁盘节点来启动的("upgrader"节点). 因此在升级RabbitMQ集群时,你不需要尝试先启动RAM节点,任何启动的RAM节点都会发生错误,并且不能启动.

虽然不是严格必须的,但使用磁盘节点来作为升级节点通常是好的主意,最后停止那个节点。

自动升级只适用于2.1.1及其之后的版本,如果你有更早的集群 ,你必须重新构建升级.

单台机器上的集群

在某些情况下,在一台机器上运行RabbitMQ节点的集群是有用的(试验性质). 

要在一台机器上运行多个RabbitMQ节点,必须确保节点含有不同的节点名称,数据存储路径,日志文件位置,绑定到不同的端口,并包含那些插件使用的端口等等 .参考配置指南中的RABBITMQ_NODENAMERABBITMQ_NODE_PORT, 和 RABBITMQ_DIST_PORT文档 ,以及 File and Directory Locations guide指南中的 RABBITMQ_MNESIA_DIRRABBITMQ_CONFIG_FILE, and RABBITMQ_LOG_BASE。

你可以在同一个主机上通过重复调用rabbitmq-server(rabbitmq-server.bat on Windows)来手动地启动多个节点 . 例如:

$ RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=hare rabbitmq-server -detached
$ rabbitmqctl -n hare stop_app
$ rabbitmqctl -n hare join_cluster rabbit@`hostname -s`
$ rabbitmqctl -n hare start_app

这会设置两个节点的集群,这两个节点都是磁盘节点. 注意,如果你想打开非AMQP的其它端口,你需要通过命令行进行配置

$ RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=hare rabbitmq-server -detached

主机名称变更

RabbitMQ节点使用主机名来相互通信.因此,所有节点名称都集群中的节点应该都能被解析.对于像 rabbitmqctl这样的工具来说,也是如此.

除此之外,默认情况下,RabbitMQ使用当前系统的主机名称来命名数据库目录.如果主机名变了,将会创建一个空的数据库.为避免数据丢失,应该总是设置一个固定的,可解析的主机名称。无论何时,只要主机名变化了,你就必须要重启RabbitMQ:

$ /etc/init.d/rabbitmq-server restart

类似的效果可通过使用 rabbit@localhost作为broker节点名称来达到。这个解决方案的影响是集群将不会工作,因为选中的主机名不能被远程主机所解析。当从远程主机调用时,类似地rabbitmqctl命令也会失败. 免遭此缺点的复杂方案是使用DNS,如:如果运行EC2,则使用 Amazon Route 53 。如果你想使用节点名称的全限定主机名(RabbitMQ 默认使用短名称),那么可使用DNS解析, 可设置环境变量 RABBITMQ_USE_LONGNAME=true.


防火墙节点

当在一个数据中心或可靠网络时,带防火墙的集群节点是存在的,但这些节点通常被防火墙隔离。再一次声明,当各节点之间的网络连接不稳定时,集群不建议在WAN在使用

在多数配置中,你需要打开4369和25672端口以使用集群正常工作.

Erlang 使用Port Mapper Daemon (epmd) 来解析集群中的节点名称. 默认epmd端口是4369,但它可以通过ERL_EPMD_PORT环境变量进行修改.所有的节点都必须使用同一个端口。详细信息可参考Erlang epmd manpage.

一旦分布式Erlang节点通过empd解析后,其它节点将会尝试直接通信。默认地通信端口比RABBITMQ_NODE_PORT (即,默认是25672)高了20000. 这可以通过RABBITMQ_DIST_PORT 环境变量修改

跨集群Erlang版本

集群中所有节点必须运行相同版本的Erlang.

从客户端连接集群

客户端可以正常连接到集群中的任意节点,如果那个节点发生故障了 ,只要有剩余集群节点幸存,当客户端发现在关闭的连接时,它就能够重新连接到剩余幸存的集群节点上。一般来说,将节点主机名称或IP地址放到客户端程序是极其不明智的,这会导致缺乏灵活性,并需要客户端程序重新编辑,编译,重新配置以适应集群配置变化或者集群节点变化。相反,我们建议采用更抽象的方法: 如有简短TTL配置的动态DNS服务或普通的TCP负载均衡器. 一般来说,这方面的管理集群内连接节点是超出了RabbitMQ本身的范围,我们建议使用其他技术专门设计来解决这些问题。

内存节点集群

内存节点只在内存中保存其元数据。它不会像磁盘节点将元数据写入到磁盘中,但它们拥有更好的性能。 然而,也应该注意到,由于持久化队列数据总是存储在磁盘上的,其性能提升只会影响资源管理(如: 添加/删除队列,交换机,或虚拟主机), 但不会影响发布或消费的速度.

内存节点是一个高级使用例子;当设置你的第一个集群时,你应该不使用它们。你应该用足够的磁盘节点来处理冗余需求,然后如果有必要,再用内存节点进行扩展.

集群中只含有内存节点是相当脆弱的,如果集群停止了,你将不能再次启动,并且会导致数据丢失。RabbitMQ在许多情况下,会阻止创建只包含内存节点的集群,但不能完全阻止。

(译者注:在集群构建中,最好有两个或以上的磁盘节点,然后再考虑使用内存节点进行扩展)

创建内存节点

当节点加入集群时,我们可将其声明为内存节点. 我们可以通过使用像先前rabbitmqctl join_cluster命令再加--ram标志来达到目的:

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster --ram rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done.


rabbit1$ rabbitmqctl cluster_status 
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

改变节点类型

我们可以将节点的类型从磁盘修改为内存,反之亦然. 假设我们想反转rabbit@rabbit2 和 rabbit@rabbit1的节点类型,即先将内存节点转换为磁盘节点,随后再将其从磁盘节点转换为内存节点.要做到这点,我们可以使用change_cluster_node_type命令. 首先节点必须先停止.

rabbit2$ rabbitmqctl stop_app 
Stopping node rabbit@rabbit2 ...done. rabbit2$
rabbitmqctl change_cluster_node_type disc
Turning rabbit@rabbit2 into a disc node ... ...done. Starting node rabbit@rabbit2 ...done.
rabbit1$
rabbitmqctl stop_app
Stopping node rabbit@rabbit1 ...done.
rabbit1$
rabbitmqctl change_cluster_node_type ram
Turning rabbit@rabbit1 into a ram node ...
rabbit1$
rabbitmqctl start_app
Starting node rabbit@rabbit1 ...done.
posted @ 2016-06-05 19:53 胡小军 阅读(3934) | 评论 (0)编辑 收藏
本章我们将覆盖:
  1. 如何使用消息过期
  2. 如何使指定队列上的消息过期
  3. 如何让队列过期
  4. 管理驳回的(rejected)或过期的消息
  5. 理解其它备用交换器扩展
  6. 理解有效user-ID扩展
  7. 通知队列消息者失败
  8. 理解交换器到交换器扩展
  9. 在消息中嵌入消息目的地
介绍
在本章中,我们将展示关于RabbitMQ扩展上的一些食谱.这些扩展不是AMQP 0-9-1标准的一部分,使用它们会破坏其它AMQPbroker的兼容性。
另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出现了轻微的变化,这是一个简单通往那里的路径.最后, 它们通常是优化问题的有效解决方案。

本章中的例子将更为真实,例如,配置参数,如列表和交换器, 以及路由键名称将定义在Constants接口中。事实上,一个真正的应用程序会遵循这样的准则从配置文件中读取配置文件,以在不同应用程序中共享。
然而,在下面的例子中,为了更简短和较好的可读性,我们并没有指定Constants的命名空间。

如何让消息过期
在本食谱中,我们将展示如何让消息过期.食谱的资源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
  1. Producer.java
  2. Consumer.java
  3. GetOne.java
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
本示例的核心是Producer.java文件.为了产生在给定生存时间(TTL)后过期的消息,我们需要执行下面的步骤:
1. 创建或声明一个用来发送消息的交换器, 并将其绑定到队列上,就像第1章使用AMQP看到的一样:
channel.exchangeDeclare(exchange, "direct", false);
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, routingKey);
2. 像下面这样初始化可选消息属性TTL:
BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
3. 使用下面的代码来发布消息:
channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
如何工作
在这个例子中,生产者创建了一个交换器,一个命名队列,并将它们进行了绑定,当队列上没有附着任何消费者,过期消息就显得非常有意义了。
设置过期时间TTL (以毫秒设置),会促使RabbitMQ在消息过期时,如果消息没有被客户端及时消费,立即删除消息.
在我们的例子中,我们假设应用程序发布了JVM资源统计信息到给定队列,如果存在消费者,那么会像平时一样,获取到实时数据,反之,如果不存在这样的消费者,那么消息会给定生存时间后立即过期。通过这种方式,可以避免我们收集大量的数据。一旦消息者绑定到了队列中,它会得到先前的消息(未过期)。进一步的试验,你可以用GetOne.java文件来替换Consumer.java文件运行.
在调用 channel.basicGet() 时,会使你一次只能消费一个消息。
TIP
可使用channel.basicGet()方法来检查未消费消息的队列.也可以通过为第二参数传递false来调用,即autoAck标志.

在这里我们可以通过调用rabbitmqctl list_queues来监控RabbitMQ队列的状态。  

也可参考
默认情况下,过期消息会丢失,但它们可以路由到其它地方。可参考管理拒绝消息或过期消息食谱来了解更多信息.

如何让指定队列上的消息过期
在本食谱中,我们将展示指定消息TTL的第二种方式.这次,我们不再通过消息属性来指定,而是通过缓存消息的队列来进行指定。在这种情况下,生产者只是简单地发布消息到交换器中,因此,在交换器上绑定标准队列和过期消息队列是可行的。
要在这方面进行备注,须存在一个创建自定义的队列的消费者。生产者是相当标准的.
像前面的食谱一样,你可以在Chapter02/Recipe02/Java/src/rmqexample找到这三个源码。
 
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
现在我们将展示创建特定消息TTL队列的必要步骤。在我们的例子中,需要在Consumer.java文件中执行下面的步骤:
1. 按下面来声明交换器:
channel.exchangeDeclare(exchange, "direct", false);
2. 创建或声明队列,像下在这样为x-message-ttl可选参数指定10,000毫秒的超时时间:
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queue, false, false, false, arguments);
3. 绑定队列到交换器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
在这个例子中,为了最终分析,我们再次假设生产者发送了JVM统计数据给RabbitMQ。最终因为Producer.java文件将其发到一个交换机,如果无消费者连接的话,消息最终会丢失。
想要监控或分析这些统计数据的消费有下面三种选择:
  1. 绑定到一个临时队列,即调用无参的channel.queueDeclare()方法
  2. 绑定到一个非自动删除的命名队列
  3. 绑定到一个非自动删除的命名队列,并且指定x-message-ttl ,如步骤2中展示的一样.
在第一种情况中,消费者将获取实时统计数据,但当它掉线期间,它将不能在数据上执行分析。
在第二种情况中,为了能让它掉线期间,能获取到发送的消息,可以使用一个命名队列(最终是持久化的).但在掉线较长时间后,再重启时,它将有巨大的backlog来进行恢复,因此在队列中可能存在大部分旧消息的垃圾。
在第三种情况中,旧消息垃圾会通过RabbitMQ自己来执行,以使我们从消费者和broker中获益。
更多
当设置per-queue TTL, 就像本食谱中看到的一样,只要未到超时时间,消息就不会被丢弃,此时消费者还可以尝试消费它们。
当使用queue TTL时, 这里有一个细微的变化,但使用per-message TTL时,在broker队列中可能会存在过期消息.
在这种情况下,这些过期消息仍然会占据资源(内存),同时broker统计数据中仍然会计数,直到它们不会到队列头部时。
也中参考
在这种情况下,过期消息也会恢复。参考管理驳回或过期消息食谱.
如何让队列过期
在第三种情况中,TTL不关联任何消息,只关联对列。这种情况对于服务器重启和更新,是一个完美的选择。一旦TTL超时,在最后一个消费者停止消费后,RabbitMQ会丢弃队列.
前面TTL相关食谱,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java ,  Consumer.java ,and  GetOne.java 相关文件。
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
在前面的例子中,扩展只需要关注Consumer.java :
1. 使用下面的代码来创建或声明交换器:
channel.exchangeDeclare(exchange, "direct", false);
2. 创建或声明队列,并为x-expires可选参数指定30,000毫秒的超时时间:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("x-expires", 30000);
channel.queueDeclare(queue, false, false, false,arguments);
3. 将队列绑定到交换器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
当我们运行Consumer.java或 GetOne.java 文件的时候, 超时队列已经创建好了,在消费者附着到队列上或调用channel.basicGet()时,它将持续存在.
只有当我们停止这两个操作超过30秒时,队列才会被删除,并且队列包含的消息也会清除。
TIP
无论生产者是否向其发送了消息,队列事实上都是独立删除的。

在这个试验课程中,我们可通过 rabbitmqctl list_queues 命令来监控RabbitMQ 队列状态.
因此,我们可以想像一种场景,有一个统计分析程序需要重启来更新其代码。由于命名队列有较长的超时时间,因此重启时,不会丢失任何消息。如果我们停止,队列会在超过TTL后被删除,无价值的消息将不再存储。
管理驳回或过期消息
在这个例子中,我们将展示如何使用死信交换器来管理过期或驳回的消息. 死信交换器是一种正常的交换器,死消息会在这里重定向,如果没有指定,死消息会被broker丢弃。
你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源码文件:
  1. Producer.java
  2. Consumer.java
要尝试过期消息,你可以使用第一个代码来发送带TTL的消息,就如如何使指定队列上消息过期食谱中描述的一样.
一旦启动了,消费者不允许消息过期,但可以可以驳回消息,最终导致成为死消息。
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
下面的步骤展示了使用死信交换器来管理过期或驳回消息:
1. 创建一个工作交换品节和死信交换器:
channel.exchangeDeclare(Constants.exchange, "direct", false);
channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
2. 创建使用使用死信交换器和 x-message-ttle参数的队列:
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange",exchange_dead_letter);
channel.queueDeclare(queue, false, false, false,arguments);
3. 然后像下面这样绑定队列:
channel.queueBind(queue, exchange, "");
4. 最后使用channel.basicPublish()来向交换器发送消息 .
5. 要尝试驳回消息,我们需要配置一个消费者,就像前面例子中看到的一样,并使用下面的代码来驳回消息:
basicReject(envelope.getDeliveryTag(), false);
如何工作
我们先从第一个场景开始(单独使用producer): the expired messages. 在步骤中,我们创建两个交换器,工作交换器和死信交换器。在步骤2中,我们使用下面两个可选参数来创建队列:
  1. 使用arguments.put("x-message-ttl", 10000)来设置消息TTL ,正如如何使指定队列上消息过期食谱中描述的一样.
  2. 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)来设置死信交换器名称;
正如你所看到的,我们只是在配置中添加了可选的队列参数。因此,当生产者发送消息到交换器时,它会队列参数来路由。消息会在10秒后过期,之后它会重定向到exchange_dead_letter 
TIP
死信交换器是一个标准的交换器,因此你可以基于任何目的来使用.
对于第二种场景,食谱的消费者会驳回消息.当消费者得到消息后, 它会使用basicReject()方法来发回一个否定应答(nack),当broker收到nack时,它会将消息重定向到exchange_dead_letter. 通过在死信交换器上绑定队列,你可以管理这些消息。
当消息重定向到死信队列时,broker会修改header消息,并在x-dead键中增加下面的值:
  1. reason : 表示队列是否过期的或驳回的(requeue =false )
  2. queue : 表示队列源,例如stat_queue_02/05
  3. time : 表示消息变为死信的日期和时间
  4. exchange : 表示交换器,如monitor_exchange_02/05
  5. routing-keys : 表示发送消息时原先使用的路由键
要在实践中查看这些值,你可使用GetOneDeadLetterQ 类.这可以创建queue_dead_letter队列并会绑定到exchange_dead_letter 
更多
你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")来指定死信路由键 ,它将会代替原来的路由键.这也就意味着你可以用不同的路由键来将不同消息路由到同一个队列中。相当棒。
理解交替交换器扩展
目前,在第1章使用 AMQP中我们已经展示了如何来处理未路由消息(消息发布到了交换器,但未能达到队列). AMQP让生产者通过此条件进行应答,并最终决定是否有需要再次将消息分发到不同的目的地。通过这种扩展,我们可在broker中指定一个交替交换器来路由消息,而并不会对生产者造成更多的干预,本食谱的代码在Chapter02/Recipe05/Java/src/rmqexample .

准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
在本食谱中,我们会在Producer.java中声明交替交换器.
1. 将交换器的名字(无目的地路由消息)-alternateExchange ,放到可选参数map的"alternate-exchange"中,如下所示:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange", alternateExchange);
2. 通过传递arguments map来声明交换器来发送消息:
channel.exchangeDeclare(exchange, "direct", false, false,arguments);
3. 声明alternateExchange自身(已经在步骤1中指定了),如下所示:
channel.exchangeDeclare(alternateExchange, "direct",false);
4. 声明标准化持久化队列,并使用路由键alertRK将其绑定到alternateExchange交换器中:
channel.queueDeclare(missingAlertQueue, true, false, false,null);
channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
如何工作
在这个例子中,我们再次使用了生成统计数据的producer,正如先前的例子一样.但这次,我们添加了路由键来让producer指定一个重要的级别,名为infoRK或alertRK (在例子中是随机分配的).如果你运行一个producer以及至少一个consumer,将不会丢失任何消息,并且一切都会正常工作.
TIP
Consumers在交换器和队列的声明中,必须传递相同的可选参数,否则会抛出异常。
但如果没有消费者监听的话,而我们不想丢失报警的话,这就是为什么必须选择让producer创建alternateExchange (步骤3)并将其绑定到持久队列-missingAlertQueue的原因 (步骤4).
在单独运行producer的时候,你将看到报警存储在这里.alternate交换器让我们在不丢失消息的情况下可以路由消息.你可通过调用rabbitmqctllist_queues或运行CheckAlerts.java来检查状态 .
最后的代码让我们可以查看队列的内容和第一个消息,但不会进行消费。完成这种行为是简单的,它足可以避免这种事实:RabbitMQ client发送了ack,消息未消费,而只是进行监控。
现在,如果我们再次运行Consumer.java文件,它会从missingAlertQueue队列中获取并消费消息.这不是自动的,我们可以选择性地从此队列中获取消息。
通过创建第二个消费者实例( missingAlertConsumer ) 并使用相同的代码来从两个不同队列消费消息就可以完成这种效果。如果在处理实时消息时,想要得到不同的行为,那么我们可以创建一个不同的消费者。

更多
在这个例子中,步骤3和步骤4是可选的。 当定义交换器时,可为交替交换器指定名称,对于其是否存在或是否绑定到任何队列上,并不作强制要求 。如果交替交换器不存在,生产者可通过在丢失消息上设置mandatory标志来得到应答,就如在第1章中处理未路由消息食谱中看到的一样。
甚至有可能出现另一种交换器-它自己的备用交换器,备用交换器可以是链式的,并且无目的地消息在按序地重试,直到找到一个目的地。
如果在交换器链的末尾仍然没有找到目的地,消息将会丢失,生产者可通过调设置mandatory 标志和指定一个合适的ReturnListener参数得到通知。
理解经过验证的user-ID扩展
依据AMQP, 当消费者得到消息时,它是不知道发送者信息的。一般说来,消费者不应该关心是谁生产的消息,对于生产者-消费者解藕来说是相当有利的。然而,有时出于认证需要,为了达到此目的,RabbitMQ 提供了有效的user-ID扩展。
在本例中,我们使用有效user-IDs模拟了订单。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源码.
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。
如何做
完成下面的步骤,以使用经过验证的user IDs来模拟订单:
1. 像下面一样声明或使用持久化队列:
channel.queueDeclare(queue, true, false, false, null);
2.发送消息时,使用BasicProperties对象,在消息头中指定一个user ID:
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest");
channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
3. 消费者获取到订单后,可像下面这样打印订单数据和消息头:
System.out.println("The message has been placed by "+properties.getUserId());
如何工作
当设置了user-ID时,RabbitMQ 会检查是否是同一个用户打开的连接。在这个例子中,用户是guest ,即RabbitMQ默认用户.
通过调用properties.getUserId() 方法,消费者可以访问发送者user ID。如果你想在步骤2中设置非当前用户的userId,channel.basicPublish()会抛出异常.
TIP
如果不使用user-ID属性,用户将是非验证的,properties.getUserId()方法会返回null.
也可参考
要更好的理解这个例子,你应该知道用户和虚拟机管理,这部分内容将在下个章节中讲解。在下个章节中,我们将了解如何通过在应用程序中使用SSL来提高程序的安全性。只使用user-ID属性,我们可保证用户已认证,但所有信息都是未加密的,因此很容易暴露。
队列失败时通知消费者

根据AMQP标准,消费者不会得到队列删除的通知。一个正在删除队列上等待消息的消费者不会收到任何错误信息,并会无限期地等待。然而,RabbitMQ client提供了一种扩展来让消息收到一个cancel参数-即消费者cancel通知。我们马上就会看到这个例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代码.

准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样。

如何做
为了能让扩展工作,你只需要执行下面的步骤:
1.在自定义的消费者中覆盖handleCancel()方法,可继承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
public void handleCancel(String consumerTag) throws IOException {
...
}
如何工作
在我们的例子中,我们选择实现一个消费者,这个消费者只在生产者是持久化的,且队列是由生产者创建的情况下才能工作。
因此,如果队列是非持久化的,Consumer.java文件会立即错误退出. 此行为可以通过调用channel.queueDeclarePassive()来完成 .
Producer.java类在其启动时会创建队列,并在其关闭时调用channel.queueDelete()方法删除队列,如果当队列关闭时,而消费者正在消费队列,那么RabbitMQ client会调用步骤1中覆盖的handleCancel()方法来立即通知消费者。
相对于显示调用channel.basicCancel() 消费者使用handleCancel()方法可以任意理由来退出。只有在这种情况下,RabbitMQ client library会调用Consumer接口的方法:  handleCancelOK() 
更多
消费者cancel通知是client library的扩展,而不是AMQP client libraries的常规方法.一个实例它们的library必须将其声明为可选属性(参考 http://www.rabbitmq.com/consumer-cancel. html#capabilities ).
RabbitMQ client library 支持并声明了这种特性。
也可参考
在集群中,如果一个节点失效了,也会发生同样的事情:client在队列删除后仍然得不到通知,除非它定义了覆盖了自己的handleCancel()方法。关于这点的更多信息,可参考Chapter 6,开发可伸缩性应用程序。
理解交换器到交换器扩展
默认情况下,AMQP支持交换器到队列,但不支持交换器到交换器绑定。在本例中,我们将展示如何使用RabbitMQ 交换机到交换机扩展.
在本例中,我们将合并来自两个不同交换器的消息到第三个交换器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源码.
准备
为了使用本食谱,我们需要设置Java开发环境,如第1章节(使用AMQP)介绍章节中说明的一样,并像广播消息食谱中来运行生产者以及使用topic交换器来处理消息路由。
如何做
完成下面的步骤来使用RabbitMQ 交换器到交换器扩展:
1. 使用下面的代码来声明我们需要追踪消息的交换器:
channel.exchangeDeclare(exchange, "topic", false);
2. 使用exchangeBind()来绑定其它例子中的交换器 :
channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
3. 启动追踪消费者:
TraceConsumer consumer = new TraceConsumer(channel);
String consumerTag = channel.basicConsume(myqueue, false,consumer);
如何工作
在步骤1中,我们创建了一个新的交换器,在步骤2中我们绑定到了下面的交换器:
  1. ref_exchange_c1_6 (广播消息) 与exchange绑定.
  2. ref_exchange_c1_8 (使用topic来处理消息路由)与exchange绑定 .
在步骤3中, 消费者可以绑定一个队列到exchange上以任意地获取所有消息.
交换器到交换器扩展的工作方式与交换器到队列绑定过程类似,你也可以指定一个路由键来过滤消息.在步骤2中,我们可以使用#(匹配所有消息)来作为路由键。通过改变路由键你可以使用制作一个filter!
在消息中内嵌消息目的地
在本例子中,我们会展示如何发送单个发布带路由键的的消息.标准AMQP不提供此特性,但幸运的是,RabbitMQ使用消息属性header提供了此特性. 这种扩展称为sender-selected分发.
此扩展的行为类似于电子邮件逻辑.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).这也是为什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
  1. Producer.java
  2. Consumer.java
  3. StatsConsumer.java
  4. CCStatsConsumer.java
  5. BCCStatsConsumer.java
准备
To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
如何做
完成下面的步骤来使用单个发布带路由键的的消息:
1. 使用下面的代码来创建或声明交换器:
channel.exchangeDeclare(exchange, "direct", false);
2. 在消息的header属性中指定CC , BCC路由键:
List<String> ccList = new ArrayList<String>();
ccList.add(backup_alert_routing_key);
headerMap.put("CC", ccList);
List<String> ccList = new ArrayList<String>();
bccList.add(send_alert_routing_key);
headerMap.put("BCC", bccList);
BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
3. 使用下面的三个路由键来绑定三个队列three queues to the exchange using the following three routing keys:
channel.queueBind(myqueue,exchange, alert_routing_key);
channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
4. 使用三个消费者来消费消息
如何工作
当生产者使用CC和BCC消息属性来发送消息时,broker会在所有路由键的队列上拷贝消息 。在本例中,stat类会直接使用路由键alert_routing_key来向交换器发送消息,同时它也会将消息拷贝到使用CC和BCC参数信息来将消息拷贝到myqueueCC_BK,myqueueBCC_SA队列中。
当像e-mails一样发生时,在分发消息到队列前,BCC信息会被broker从消息头中删除,你可查看所有我们示例消费者的输出来观察这种行为。
更多
正常情况下,AMQP不会改变消息头,但BCC扩展是例外。这种扩展可减少发往broker的消息数目。没有此扩展,生产者只能使用不同的路由键来发送多个消息的拷贝。
posted @ 2016-06-05 19:51 胡小军 阅读(1325) | 评论 (0)编辑 收藏

概述

RabbitMQ Java client 将com.rabbitmq.client作为其顶层包. 关键类和接口有:

  • Channel
  • Connection
  • ConnectionFactory
  • Consumer
协议操作可通过Channel接口来进行.Connection用于开启channels,注册connection生命周期事件处理, 并在不需要时关闭connections.
Connections是通过ConnectionFactory来初始化的,在ConnectionFactory中,你可以配置不同的connection设置,如:虚拟主机和用户名等等.

Connections 和 Channels

核心API类是Connection和Channel, 它们代表对应AMQP 0-9-1 connection 和 channel. 在使用前,可像下面这样来导入:

import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;

连接到broker

下面的代码会使用给定的参数连接到AMQP broker:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

也可以使用URIs 来设置连接参数:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();


Connection 接口可用来打开一个channel:

Channel channel = conn.createChannel(); 

channel现在可用来发送和接收消息,正如后续章节中描述的一样.

要断开连接,只需要简单地关闭channel和connection:

channel.close(); conn.close();

关闭channel被认为是最佳实践,但在这里不是严格必须的 - 当底层连接关闭的时候,channel也会自动关闭.

使用 Exchanges 和 Queues

采用交换器和队列工作的客户端应用程序,是AMQP高级别构建模块。在使用前,必须先声明.声明每种类型的对象都需要确保名称存在,如果有必要须进行创建.

继续上面的例子,下面的代码声明了一个交换器和一个队列,然后再将它们进行绑定.

channel.exchangeDeclare(exchangeName, "direct", true); 
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这实际上会声明下面的对象,它们两者都可以可选参数来定制. 在这里,它们两个都没有特定参数。

  1. 一个类型为direct,且持久化,非自动删除的交换器
  2. 采用随机生成名称,且非持久化,私有的,自动删除队列

上面的函数然后使用给定的路由键来绑定队列和交换器.

注意,当只有一个客户端时,这是一种典型声明队列的方式:它不需要一个已知的名称,其它的客户端也不会使用它(exclusive),并会被自动清除(autodelete).
如果多个客户端想共享带有名称的队列,下面的代码应该更适合:

channel.exchangeDeclare(exchangeName, "direct", true); 
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这实际上会声明:

  1. 一个类型为direct,且持久化,非自动删除的交换器
  2. 一个已知名称,且持久化的,非私有,非自动删除队列

注意,Channel API 的方法都是重载的。这些 exchangeDeclarequeueDeclare 和queueBind 都使用的是预设行为.
这里也有更多参数的长形式,它们允许你按需覆盖默认行为,允许你完全控制。


发由消息

要向交换器中发布消息,可按下面这样来使用Channel.basicPublish方法:

byte[] messageBodyBytes = "Hello, world!".getBytes(); 
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了更好的控制,你可以使用重载方法来指定mandatory标志,或使用预先设置的消息属性来发送消息:

channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

这会使用分发模式2(持久化)来发送消息, 优先级为1,且content-type 为"text/plain".你可以使用Builder类来构建你自己的消息属性对象:

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

下面的例子使用自定义的headers来发布消息:

Map<String, Object> headers = new HashMap<String, Object>(); 
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

下面的例子使用expiration来发布消息:

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

BasicProperties is an inner class of the autogenerated holder class AMQP.

Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

Channels 和并发考虑(线程安全性)

Channel 实例不能在多个线程间共享。应用程序必须在每个线程中使用不同的channel实例,而不能将同个channel实例在多个线程间共享。 有些channl上的操作是线程安全的,有些则不是,这会导致传输时出现错误的帧交叉。
在多个线程共享channels也会干扰Publisher Confirms.

通过订阅来来接收消息

import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

接收消息最高效的方式是用Consumer接口来订阅。当消息到达时,它们会自动地进行分发,而不需要显示地请求

当在调用Consumers的相关方法时, 个别订阅总是通过它们的consumer tags来确定的, consumer tags可通过客户端或服务端来生成,参考 the AMQP specification document.
同一个channel上的消费者必须有不同的consumer tags.

实现Consumer的最简单方式是继承便利类DefaultConsumer.子类可通过在设置订阅时,将其传递给basicConsume调用:

boolean autoAck = false; 
channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
@Override
publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});

在这里,由于我们指定了autoAck = false,因此消费者有必要应答分发的消息,最便利的方式是在handleDelivery 方法中处理.

更复杂的消费者可能需要覆盖更多的方法,实践中,handleShutdownSignal会在channels和connections关闭时调用,handleConsumeOk 会在其它消费者之前

调用
,传递consumer tag(不明白,要研究)。

 

消费者可实现handleCancelOk 和 handleCancel方法来接收显示和隐式取消操作通知。

你可以使用Channel.basicCancel来显示地取消某个特定的消费者:

channel.basicCancel(consumerTag);

passing the consumer tag.

消费者回调是在单独线程上处理的,这意味着消费者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上调用阻塞方法。

每个Channel都有其自己的dispatch线程.对于一个消费者一个channel的大部分情况来说,这意味着消费者不会阻挡其它的消费者。如果在一个channel上多个消费者,则必须意识到长时间运行的消费者可能阻挡此channel上其它消费者回调调度.

获取单个消息

要显示地获取一个消息,可使用Channel.basicGet.返回值是一个GetResponse实例, 在它之中,header信息(属性) 和消息body都可以提取:

boolean autoAck = false; 
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag(); ...

因为autoAck = false,你必须调用Channel.basicAck来应答你已经成功地接收了消息:

channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

处理未路由消息

如果发布消息时,设置了"mandatory"标志,但如果消息不能路由的话,broker会将其返回到发送客户端 (通过 AMQP.Basic.Return 命令).

要收到这种返回的通知, clients可实现ReturnListener接口,并调用Channel.setReturnListener.如果channel没有配置return listener,那么返回的消息会默默地丢弃。

channel.setReturnListener(new ReturnListener() {     
    publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
...
    }
});

 return listener将被调用,例如,如果client使用"mandatory"标志向未绑定队列的direct类型交换器发送了消息.

关闭协议

AMQP client 关闭概述

AMQP 0-9-1 connection和channel 使用相同的方法来管理网络故障,内部故障,以及显示本地关闭.

AMQP 0-9-1 connection  和 channel 有如下的生命周期状态:

  • open: 准备要使用的对象
  • closing: 对象已显示收到收到本地关闭通知, 并向任何支持的底层对象发出关闭请求,并等待其关闭程序完成
  • closed: 对象已收到所有底层对象的完成关闭通知,最终将执行关闭操作

这些对象总是以closed状态结束的,不管基于什么原因引发的关闭,比如:应用程序请求,内部client library故障, 远程网络请求或网络故障.

AMQP connection 和channel 对象会持有下面与关闭相关的方法:

  • addShutdownListener(ShutdownListener 监听器)和removeShutdownListener(ShutdownListener 监听器),用来管理监听器,当对象转为closed状态时,将会触发这些监听器.注意,在已经关闭的对象上添加一个ShutdownListener将会立即触发监听器
  • getCloseReason(), 允许同其交互以了解对象关闭的理由
  • isOpen(), 用于测试对象是否处于open状态
  • close(int closeCode, String closeMessage), 用于显示通知对象关闭

可以像这样来简单使用监听器:

import com.rabbitmq.client.ShutdownSignalException; 
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) { ... } }
);

关闭环境信息

可通过显示调用getCloseReason()方法或通过使用ShutdownListener类中的业务方法的cause参数来ShutdownSignalException中获取关闭原因的有用信息.

ShutdownSignalException 类提供方法来分析关闭的原因.通过调用isHardError()方法,我们可以知道是connection错误还是channel错误.getReason()会返回相关cause的相关信息,这些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引发的异常,如网络通信故障,在这种情况下,可通过getCause()方法来获取信息).

public void shutdownCompleted(ShutdownSignalException cause) {   if (cause.isHardError())   {     
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication()) {
Method reason = cause.getReason(); ... } ... }
else { Channel ch = (Channel)cause.getReference(); ... } }

原子使用isOpen()方法

channel和connection对象的isOpen()方法不建议在在生产代码中使用,因为此方法的返回值依赖于shutdown cause的存在性. 下面的代码演示了竟争条件的可能性:

public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

相反,我们应该忽略这种检查,并简单地尝试这种操作.如果代码执行期间,connection的channel关闭了,那么将抛出ShutdownSignalException,这就表明对象处于一种无效状态了.当broker意外关闭连接时,我们也应该捕获由SocketException引发的IOException,或者当broker清理关闭时,捕获ShutdownSignalException.

public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

高级连接选项

Consumer线程池

Consumer 线程默认是通过一个新的ExecutorService线程池来自动分配的(参考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的线程池.下面的示例展示了一个比正常分配稍大的线程池:

ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 
Executors 和 ExecutorService 都是java.util.concurrent包中的类.

当连接关闭时,默认的ExecutorService将会被shutdown(), 但用户自定义的ExecutorService (如上面所示)将不会被shutdown(). 提供自定义ExecutorService的Clients必须确保最终它能被关闭(通过调用它的shutdown() 方法), 否则池中的线程可能会阻止JVM终止.

同一个executor service,可在多个连接之间共享,或者连续地在重新连接上重用,但在shutdown()后,则不能再使用.

使用这种特性时,唯一需要考虑的是:在消费者回调的处理过程中,是否有证据证明有严重的瓶颈. 如果没有消费者执行回调,或很少,默认的配置是绰绰有余. 开销最初是很小的,分配的全部线程资源也是有界限的,即使偶尔可能出现一阵消费活动.

使用Host列表

可以传递一个Address数组给newConnection()Address只是 com.rabbitmq.client 包中包含host和port组件的简单便利类. 例如:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 
将会尝试连接hostname1:portnumber1, 如果不能连接,则会连接hostname2:portnumber2,然后会返回数组中第一个成功连接(不会抛出IOException)上broker的连接.这完全等价于在factory上重复调用factory.newConnection()方法来设置host和port, 直到有一个成功返回.

如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么线程池将与第一个成功连接相关联.

心跳超时

参考Heartbeats guide 来了解更多关于心跳及其在Java client中如何配置的更多信息.

自定义线程工厂

像Google App Engine (GAE)这样的环境会限制线程直接实例化. 在这样的环境中使用RabbitMQ Java client, 需要配置一个定制的ThreadFactory,即使用合适的方法来实例化线程,如: GAE's ThreadManager. 下面是Google App Engine的相关代码.

import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

网络故障时自动恢复

Connection恢复

clients和RabbitMQ节点之间的连接可发生故障. RabbitMQ Java client 支持连接和拓扑(queues, exchanges, bindings, 和consumers)的自动恢复. 大多数应用程序的连接自动恢复过程会遵循下面的步骤:

  1. 重新连接
  2. 恢复连接监听器
  3. 重新打开通道
  4. 恢复通道监听器
  5. 恢复通道basic.qos 设置,发布者确认和事务设置
拓扑恢复包含下面的操作,每个通道都会执行下面的步骤:
  1. 重新声明交换器(except for predefined ones)
  2. 重新声明队列
  3. 恢复所有绑定
  4. 恢复所有消费者
要启用自动连接恢复,须使用factory.setAutomaticRecoveryEnabled(true):
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();
如果恢复因异常失败(如. RabbitMQ节点仍然不可达),它会在固定时间间隔后进行重试(默认是5秒). 时间间隔可以进行配置:
ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
当提供了address列表时,将会在所有address上逐个进行尝试:
ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

恢复监听器

可在可恢复连接和通道上注册一个或多个恢复监听器. 当启用了连接恢复时,ConnectionFactory#newConnection 和 Connection#createChannel 的连接已实现了com.rabbitmq.client.Recoverable,并提供了两个方法:

  • addRecoveryListener
  • removeRecoveryListener
注意,在使用这些方法时,你需要将connections和channels强制转换为Recoverable.

发布影响

当连接失败时,使用Channel.basicPublish方法发送的消息将会丢失. client不会保证在连接恢复后,消息会得到分发.要确保发布的消息到达了RabbitMQ,应用程序必须使用Publisher Confirms 


拓扑恢复

拓扑恢复涉及交换器,队列,绑定以及消费者恢复.默认是启用的,但也可以禁用:

ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

手动应答和自动恢复

当使用手动应答时,在消息分发与应答之间可能存在网络连接中断. 在连接恢复后,RabbitMQ会在所有通道上重设delivery标记. 也就是说,使用旧delivery标记的basic.ackbasic.nack, 以及basic.reject将会引发channel exception. 为了避免发生这种情况, RabbitMQ Java client可以跟踪,更新,以使它们在恢复期间单调地增长. Channel.basicAckChannel.basicNack, 以及Channel.basicReject 然后可以转换这些 delivery tags,并且不再发送过期的delivery tags. 使用手动应答和自动恢复的应用程序必须负责处理重新分发.

未处理异常

关于connection, channel, recovery, 和consumer lifecycle 的异常将会委派给exception handler,Exception handler是实现了ExceptionHandler接口的任何对象. 默认情况下,将会使用DefaultExceptionHandler实例,它会将异常细节输出到标准输出上.

可使用ConnectionFactory#setExceptionHandler来覆盖原始handler,它将被用于由此factory创建的所有连接:

ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         
Exception handlers 应该用于异常日志.

Google App Engine上的RabbitMQ Java Client

在Google App Engine (GAE) 上使用RabbitMQ Java client,必须使用一个自定义的线程工厂来初始化线程,如使用GAE's ThreadManager. 此外,还需要设置一个较小的心跳间隔(4-5 seconds) 来避免InputStream 上读超时:

ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

警告和限制

为了能使拓扑恢复, RabbitMQ Java client 维持了声明队列,交换器,绑定的缓存. 缓存是基于每个连接的.某些RabbitMQ的特性使得客户端不能观察到拓扑的变化,如,当队列因TTL被删除时. RabbitMQ Java client 会尝试在下面的情况中使用缓存实体失效:

  • 当队列被删除时.
  • 当交换器被删除时.
  • 当绑定被删除时.
  • 当消费者在自动删除队列上退出时.
  • 当队列或交换器不再绑定到自动删除的交换器上时.
然而, 除了单个连接外,client不能跟踪这些拓扑变化. 依赖于自动删除队列或交换器的应用程序,正如TTL队列一样 (注意:不是消息TTL!), 如果使用了自动连接恢复,在知道不再使用或要删除时,必须明确地删除这些缓存实体,以净化 client-side 拓扑cache. 这些可通过Channel#queueDeleteChannel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind来进行.

RPC (Request/Reply) 模式

为了便于编程, Java client API提供了一个使用临时回复队列的RpcClient类来提供简单的RPC-style communication.

此类不会在RPC参数和返回值上强加任何特定格式. 它只是简单地提供一种机制来向带特定路由键的交换器发送消息,并在回复队列上等待响应.

import com.rabbitmq.client.RpcClient;  
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(其实现细节为:请求消息使用basic.correlation_id唯一值字段来发送消息,并使用basic.reply_to来设置响应队列的名称.)

一旦你创建此类的实例,你可以使用下面的任意一个方法来发送RPC请求:

byte[] primitiveCall(byte[] message); 
String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

primitiveCall 方法会将原始byte数组转换为请求和响应的消息体. stringCall只是一个primitiveCall的简单包装,将消息体视为带有默认字符集编码的String实例.

mapCall 变种稍为有些复杂: 它会将原始java值包含在java.util.Map中,并将其编码为AMQP 0-9-1二进制表示形式,并以同样的方式来解码response. (注意:在这里,对一些值对象类型有所限制,具体可参考javadoc.)

所有的编组/解组便利方法都使用primitiveCall来作为传输机制,其它方法只是在它上面的做了一个封装.

posted @ 2016-06-04 00:37 胡小军 阅读(15614) | 评论 (1)编辑 收藏
     摘要: 在本章中,将包含下面的内容:连接中间件生产消息消费消息使用JSON序列化消息使用RPC消息  广播消息使用direct交换器来处理消息路由使用topic交换器来处理消息路由保证消息处理分发消息到多个消费者使用消息属性事务消息处理未路由消息要运行本章内的示例,你需要首先:安装Java JDK 1.6+安装Java RabbitMQ client library正确地配置CLASS...  阅读全文
posted @ 2016-06-03 23:22 胡小军 阅读(2588) | 评论 (0)编辑 收藏