原文:
http://www.rabbitmq.com/memory.html
RabbitMQ服务器在启动时以及abbitmqctl set_vm_memory_high_watermark fraction 执行时,会检查计算机的RAM总大小. 默认情况下下, 当 RabbitMQ server 的使用量超过RAM的40% ,它就会发出内存警报,并阻塞所有连接. 一旦内存警报清除 (如,服务器将消息转存于磁盘,或者将消息投递给clients),服务又地恢复.
默认内存阀值设置为已安装RAM的40%. 注意这并不会阻止RabbitMQ server使用内存量超过40%, 它只是为了压制发布者. Erlang的垃圾回收器最坏情况下,可使用配置内存的2倍(默认情况下t, RAMr的80%). 因此强制建议开启OS swap或page files .
32位架构倾向于每一个进程有2GB的内存限制. 64位架构的一般实现(i.e. AMD64 和 Intel EM64T) 只允许每个进程为256TB. 64-位 Windows 限制为8TB. 但是,请注意,即使是64位操作系统下,一个32位的过程往往只有一个2GB的最大地址空间。
配置内存阀值
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
默认值0.4 代表的是已安装RAM的 40% , 有时候还更小.如:在 32位平台中,如果你安装有4GB RAM , 4GB 的40% 是 1.6GB, 但是 32-位 Windows 正常情况下限制进程为2GB,因此实际阀值是2GB的40% (即820MB).
另外, 内存阀值也可以设置为绝对值. 下面的例子将阀值设为了1073741824 字节 (1024 MB):
[{rabbit, [{vm_memory_high_watermark, {absolute, 1073741824}}]}].
同例, 也可使用内存单位:
[{rabbit, [{vm_memory_high_watermark, {absolute, "1024MiB"}}]}].
如果绝对上限大于了安装的RAM可用的虚拟地址空间, 阀值上限会略小.
当RabbitMQ服务器启动时,内存限制将追加到RABBITMQ_NODENAME.log 文件中:
=INFO REPORT==== 29-Oct-2009::15:43:27 === Memory limit set to 2048MB.
内存限制也可以使用rabbitmqctl status命令查询.
其阀值也可以在broker运行时,通过rabbitmqctl set_vm_memory_high_watermark fraction 命令或 rabbitmqctl set_vm_memory_high_watermark absolute memory_limit 命令修改. 内存单位也可以在命令中使用. 此命令会在broker重启后生效. 当执行此命令时,内存限制可能会改变热插拔RAM,而不会发生报警,这是因为需要全部数量的系统RAM.
禁止所有发布
其值为0时,会立即触发报警并禁用所有发布 (当需要禁用全局发布时,这可能是有用的); use rabbitmqctl set_vm_memory_high_watermark 0.
限制的地址空间
当在64位操作系统中运行32位 Erlang VM时,(or a 32 bit OS with PAE), 可用地址内存是受限制的. 服务器探测到后会记录像下边的日志消息:
=WARNING REPORT==== 19-Dec-2013::11:27:13 === Only 2048MB of 12037MB memory usable due to limited address space. Crashes due to memory exhaustion are possible - see http://www.rabbitmq.com/memory.html#address-space
内存报警系统是不完美的.虽然停止发布通常会防止任何进一步的内存使用,但可能有其他东西继续增加内存使用。通常情况下,当这种情况发生时,物理内存耗尽,操作系统将开始交换。但是当运行一个有限的地址空间,超过限制的运行会导致虚拟机崩溃。
因此强制建议在在64位操作系统上运行64位的Erlang VM.
配置分页阈值
在broker达到最高水位阻塞发布者之前,它会尝试将队列内容分页输出到磁盘上来释放内存. 持久化和瞬时消息都会分页输出 (已经在磁盘上的持久化消息会被赶出内存).
默认情况下,在达最高水位的50%时,就会发生这种情况. (即,默认最高水位为0.4, 这会在内存使用达到20%时就会发生). 要修改此值,可修改vm_memory_high_watermark_paging_ratio 配置的0.5默认值. 例如:
[{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75}, {vm_memory_high_watermark, 0.4}]}].
上面的配置表示在内存使用达到30%时,就会启动,40%的时候会阻塞发布者.
也可以将vm_memory_high_watermark_paging_ratio 值设为大于1.0的值.在这种情况下,队列不会把它的内容分页到磁盘上.如果这引起了内存报警关闭,那么生产者会如上面预期的一样被阻塞.
未确认的平台
如果RabbitMQ服务器不能识别你的系统,它将在RABBITMQ_NODENAME.log 文件中追加警告.
然后它会假设安装了超过了1GB的RAM:
=WARNING REPORT==== 29-Oct-2009::17:23:44 === Unknown total memory size for your OS {unix,magic_homebrew_os}. Assuming memory size is 1024MB.
在这种情况下,vm_memory_high_watermark 配置值假设为1GB RAM. 在 vm_memory_high_watermark 默认设为 0.4的情况下, RabbitMQ的内存阀值设为了410MB, 因此当RabbitMQ使用了多于410M内存时,它会阻塞生产者.因此当RabbitMQ不能识别你的平台时,如果你实际有8GB RAM,并且你想让RabbitMQ内存使用量超过3GB阻塞生产者,你可以设置vm_memory_high_watermark为3.
posted @
2016-07-30 15:05 胡小军 阅读(7937) |
评论 (0) |
编辑 收藏
鸟欲高飞先振翅,人求上进先读书。本文是原书的第9章 线程的监控及其日常工作中如何分析里的9.3.3节常见的内存溢出的三种情况。
3. 常见的内存溢出的三种情况:
1)JVM Heap(堆)溢出:java.lang.OutOfMemoryError: Java heap space
JVM在启动的时候会自动设置JVM Heap的值, 可以利用JVM提供的-Xmn -Xms -Xmx等选项可进行设置。Heap的大小是Young Generation 和Tenured Generaion 之和。在JVM中如果98%的时间是用于GC,且可用的Heap size 不足2%的时候将抛出此异常信息。
解决方法:手动设置JVM Heap(堆)的大小。
2)PermGen space溢出: java.lang.OutOfMemoryError: PermGen space
PermGen space的全称是Permanent Generation space,是指内存的永久保存区域。为什么会内存溢出,这是由于这块内存主要是被JVM存放Class和Meta信息的,Class在被Load的时候被放入PermGen space区域,它和存放Instance的Heap区域不同,sun的 GC不会在主程序运行期对PermGen space进行清理,所以如果你的APP会载入很多CLASS的话,就很可能出现PermGen space溢出。一般发生在程序的启动阶段。
解决方法: 通过-XX:PermSize和-XX:MaxPermSize设置永久代大小即可。
3)栈溢出: java.lang.StackOverflowError : Thread Stack space
栈溢出了,JVM依然是采用栈式的虚拟机,这个和C和Pascal都是一样的。函数的调用过程都体现在堆栈和退栈上了。调用构造函数的 “层”太多了,以致于把栈区溢出了。 通常来讲,一般栈区远远小于堆区的,因为函数调用过程往往不会多于上千层,而即便每个函数调用需要 1K的空间(这个大约相当于在一个C函数内声明了256个int类型的变量),那么栈区也不过是需要1MB的空间。通常栈的大小是1-2MB的。通俗一点讲就是单线程的程序需要的内存太大了。 通常递归也不要递归的层次过多,很容易溢出。
解决方法:1:修改程序。2:通过 -Xss: 来设置每个线程的Stack大小即可。
4. 所以Server容器启动的时候我们经常关心和设置JVM的几个参数如下(详细的JVM参数请参看附录三):
-Xms:java Heap初始大小, 默认是物理内存的1/64。
-Xmx:ava Heap最大值,不可超过物理内存。
-Xmn:young generation的heap大小,一般设置为Xmx的3、4分之一 。增大年轻代后,将会减小年老代大小,可以根据监控合理设置。
-Xss:每个线程的Stack大小,而最佳值应该是128K,默认值好像是512k。
-XX:PermSize:设定内存的永久保存区初始大小,缺省值为64M。
-XX:MaxPermSize:设定内存的永久保存区最大大小,缺省值为64M。
-XX:SurvivorRatio:Eden区与Survivor区的大小比值,设置为8,则两个Survivor区与一个Eden区的比值为2:8,一个Survivor区占整个年轻代的1/10
-XX:+UseParallelGC:F年轻代使用并发收集,而年老代仍旧使用串行收集.
-XX:+UseParNewGC:设置年轻代为并行收集,JDK5.0以上,JVM会根据系统配置自行设置,所无需再设置此值。
-XX:ParallelGCThreads:并行收集器的线程数,值最好配置与处理器数目相等 同样适用于CMS。
-XX:+UseParallelOldGC:年老代垃圾收集方式为并行收集(Parallel Compacting)。
-XX:MaxGCPauseMillis:每次年轻代垃圾回收的最长时间(最大暂停时间),如果无法满足此时间,JVM会自动调整年轻代大小,以满足此值。
-XX:+ScavengeBeforeFullGC:Full GC前调用YGC,默认是true。
实例如:JAVA_OPTS=”-Xms4g -Xmx4g -Xmn1024m -XX:PermSize=320M -XX:MaxPermSize=320m -XX:SurvivorRatio=6″
posted @
2016-07-26 23:05 胡小军 阅读(323) |
评论 (0) |
编辑 收藏
学习如何在你的应用程序中集成WebSockets.
Published April 2013
对于许多基于客户端-服务器程序来说,老的HTTP 请求-响应模型已经有它的局限性. 信息必须通过多次请求才能将其从服务端传送到客户端.
过去许多的黑客使用某些技术来绕过这个问题,例如:长轮询(long polling)、基于 HTTP 长连接的服务器推技术(Comet).
然而,基于标准的、双向的、客户端和服务器之间全双工的信道需求再不断增加。
在2011年, IETF发布了标准WebSocket协议-RFC 6455. 从那时起,大多数Web浏览器都实现了支持WebSocket协议的客户端APIs.同时,许多Java 包也开始实现了WebSocket协议.
WebSocket协议利用HTTP升级技术来将HTTP连接升级到WebSocket. 一旦升级后,连接就有了在两个方向上相互独立(全双式)发送消息(数据桢)的能力.
不需要headers 或cookies,这大大降低了所需的带宽. 通常,WebSockets来周期性地发送小消息 (例如,几个字节).
额外的headers常常会使开销大于有效负载(payload)。
JSR 356
JSR 356, WebSocket的Java API, 明确规定了API,当Java开发者需要在应用程序中集成WebSocket时,就可以使用此API—服务端和客户端均可. 每个声明兼容JSR 356的WebSocket协议,都必须实现这个API.
因此,开发人员可以自己编写独立于底层WebSocket实现的WebSocket应用。这是一个巨大的好处,因为它可以防止供应商锁定,并允许更多的选择、自由的库、应用程序服务器。
JSR 356是即将到来的java EE 7标准的一部分,因此,所有与Java EE 7兼容的应用服务器都有JSR 365标准WebSocket的实现.一旦建立,WebSocket客户端和服务器节点已经是对称的了。客户端API与服务器端API的区别是很小的,JSR 356定义的Java client API只是Java EE7完整API的子集.
客户段-服务器端程序使用WebSockets,通常会包含一个服务器组件和多个客户端组件, 如图1所示:
图1
在这个例子中,server application 是通过Java编写的,WebSocket 协议细节是由包含在Java EE 7容器中JSR 356 实现来处理的.
JavaFX 客户端可依赖任何与JSR 356兼容的客户端实现来处理WebSocket协议问题.
其它客户端(如,iOS 客户端和HTML5客户端)可使用其它 (非Java)与RFC6455兼容的实现来与server application通信.
编程模型
JSR 356定义的专家小组,希望支持Java EE开发人员常用的模式和技术。因此,JSR 356使用了注释和注入。
一般来说,支持两种编程模型:
- 注解驱动(annotation-driven). 通过使用注解POJOs, 开发者可与WebSocket生命周期事件交互.
- 接口驱动(interface-driven). 开发者可实现
Endpoint接口和与生命周期交互的方法.
生命周期事件
典型的WebSocket 交互生命周期如下:
- 一端 (客户端) 通过发送HTTP握手请求来初始化连接.
- 其它端(服务端) 回复握手响应.
- 建立连接.从现在开始,连接是完全对称的.
- 两端都可发送和接收消息.
- 其中一端关闭连接.
大部分WebSocket生命周期事件都与Java方法对应,不管是 annotation-driven 还是interface-driven.
Annotation-Driven 方式
接受WebSocket请求的端点可以是以 @ServerEndpoint
注解的POJO.
此注解告知容器,此类应该被认为是WebSocket端点.
必须的value
元素指定了WebSocket端点的路径.
考虑下面的代码片断:
@ServerEndpoint("/hello") public class MyEndpoint { }
此代码将会以相对路径hello来发布一个端点.在后续方法调用中,此路径可携带路径参数,如: /hello/{userid}是一个有效路径,在这里
{userid}
的值,可在生命周期方法使用@PathParam
注解获取.
在GlassFish中,如果你的应用程序是用上下文mycontextroot
部署的,且在localhost的8080端口上监听
, WebSocket可通过使用ws://localhost:8080/mycontextroot/hello来访问
.
初始化WebSocket连接的端点可以是以 @ClientEndpoint
注解的POJO.@ClientEndpoint
和 @ServerEndpoint的主要区别是
ClientEndpoint
不接受路径路值元素,因为它监听进来的请求。
@ClientEndpoint public class MyClientEndpoint {}
Java中使用注解驱动POJO方式来初始化WebSocket连接,可通过如下代码来完成:
javax.websocket.WebSocketContainer container = javax.websocket.ContainerProvider.getWebSocketContainer(); container.conntectToServer(MyClientEndpoint.class, new URI("ws://localhost:8080/tictactoeserver/endpoint"));
此后,以 @ServerEndpoint
或@ClientEndpoint
注解的类都称为注解端点.
一旦建立了WebSocket连接 ,就会创建 Session,并且会调用注解端点中以
@OnOpen注解的方法.
此方法包含了几个参数:
javax.websocket.Session
参数, 代表创建的Session
EndpointConfig
实例包含了关于端点配置的信息- 0个或多个以
@PathParam注解的
字符串参数,指的是端点路径的path参数
下面的方法实现了当打开WebSocket时,将会打印session的标识符:
@OnOpen public void myOnOpen (Session session) { System.out.println ("WebSocket opened: "+session.getId()); }
Session实例只要WebSocket未关闭就会一直有效
. Session类中包含了许多有意思的方法,以允许开发者获取更多关于的信息
。
同时,Session
也包含了应用程序特有的数据钩子,即通过getUserProperties()
方法来返回 Map<String, Object>
.
这允许开发者可以使用session-和需要在多个方法调用间共享的应用程序特定信息来填充Session
实例.
i当WebSocket端收到消息时,将会调用以@OnMessage
注解的方法.以@OnMessage
注解的方法可包含下面的参数:
javax.websocket.Session
参数.- 0个或多个以
@PathParam注解的
字符串参数,指的是端点路径的path参数 - 消息本身. 下面有可能消息类型描述.
当其它端发送了文本消息时,下面的代码片断会打印消息内容:
@OnMessage public void myOnMessage (String txt) { System.out.println ("WebSocket received message: "+txt); }
如果以@OnMessage
i注解的方法返回值不是void
, WebSocket实现会将返回值发送给其它端点.下面的代码片断会将收到的文本消息以首字母大写的形式发回给发送者:
@OnMessage public String myOnMessage (String txt) { return txt.toUpperCase(); }
另一种通过WebSocket连接来发送消息的代码如下:
RemoteEndpoint.Basic other = session.getBasicRemote(); other.sendText ("Hello, world");
在这种方式中,我们从Session
对象开始,它可以从生命周期回调方法中获取(例如,以 @OnOpen注解的方法
).session实例上getBasicRemote()
方法返回的是WebSocket其它部分的代表RemoteEndpoint
. RemoteEndpoint
实例可用于发送文本或其它类型的消息,后面有描述.
当关闭WebSocket连接时,将会调用@OnClose
注解的方法。此方法接受下面的参数:
javax.websocket.Session
参数. 注意,一旦WebSocket真正关闭了,此参数就不能被使用了,这通常发生在@OnClose
注解方法返回之后.- A
javax.websocket.CloseReason
参数,用于描述关闭WebSocket的原因,如:正常关闭,协议错误,服务过载等等. - 0个或多个以
@PathParam注解的
字符串参数,指的是端点路径的path参数
下面的代码片段打印了WebSocket关闭的原因:
@OnClose public void myOnClose (CloseReason reason) { System.out.prinlnt ("Closing a WebSocket due to "+reason.getReasonPhrase()); }
完整情况下,这里还有一个生命周期注解:如果收到了错误,将会调用 @OnError
注解的方法。
Interface-Driven 方式
annotation-driven 方式允许我们注解一个Java类,以及使用生命周期注解来注解方法.
使用interface-driven方式,开发者可继承javax.websocket.Endpoint
并覆盖其中的onOpen
, onClose
, 以及onError
方法:
public class myOwnEndpoint extends javax.websocket.Endpoint { public void onOpen(Session session, EndpointConfig config) {...} public void onClose(Session session, CloseReason closeReason) {...} public void onError (Session session, Throwable throwable) {...} }
为了拦截消息,需要在onOpen实现中注册一个javax.websocket.MessageHandler
:
public void onOpen (Session session, EndpointConfig config) { session.addMessageHandler (new MessageHandler() {...}); }
MessageHandler
接口有两个子接口: MessageHandler.Partial和
MessageHandler.Whole
.
MessageHandler.Partial
接口应该用于当开发者想要收到部分消息通知的时候,MessageHandler.Whole的实现应该用于整个消息到达通知
。
下面的代码片断会监听进来的文件消息,并将文本信息转换为大小版本后发回给其它端点:
public void onOpen (Session session, EndpointConfig config) { final RemoteEndpoint.Basic remote = session.getBasicRemote(); session.addMessageHandler (new MessageHandler.Whole<String>() { public void onMessage(String text) { try { remote.sendString(text.toUpperCase()); } catch (IOException ioe) { // handle send failure here } } }); }
消息类型,编码器,解码器
WebSocket的JavaAPI非常强大,因为它允许发送任或接收任何对象作为WebSocket消息.
基本上,有三种不同类型的消息:
- 基于文本的消息
- 二进制消息
- Pong 消息,它是WebSocket连接自身
当使用interface-driven模式,每个session最多只能为这三个不同类型的消息注册一个MessageHandler
.
当使用annotation-driven模式,针对不同类型的消息,只允许出现一个@onMessage
注解方法. 在注解方法中,消息内容中允许的参数依赖于消息类型。
Javadoc for the @OnMessage
annotation 明确指定了消息类型上允许出现的消息参数:
- "如果方法用于处理文本消息:
- 如果方法用于处理二进制消息:
- 如果方法是用于处理pong消息:
任何Java对象使用编码器都可以编码为基于文本或二进制的消息.这种基于文本或二进制的消息将转输到其它端点,在其它端点,它可以解码成Java对象-或者被另外的WebSocket 包解释.
通常情况下,XML或JSON用于来传送WebSocket消息, 编码/解码然后会将Java对象编组成XML或JSON并在另一端解码为Java对象.
encoder是以javax.websocket.Encoder
接口的实现来定义,decoder是以javax.websocket.Decoder
接口的实现来定义的.
有时,端点实例必须知道encoders和decoders是什么.使用annotation-driven方式, 可向@ClientEndpoint
和 @ServerEndpoint
l注解中的encode和decoder元素传递 encoders和decoders的列表。
Listing 1 中的代码展示了如何注册一个 MessageEncoder
类(它定义了MyJavaObject实例到文本消息的转换). MessageDecoder
是以相反的转换来注册的.
@ServerEndpoint(value="/endpoint", encoders = MessageEncoder.class, decoders= MessageDecoder.class) public class MyEndpoint { ... } class MessageEncoder implements Encoder.Text<MyJavaObject> { @override public String encode(MyJavaObject obj) throws EncodingException { ... } } class MessageDecoder implements Decoder.Text<MyJavaObject> { @override public MyJavaObject decode (String src) throws DecodeException { ... } @override public boolean willDecode (String src) { // return true if we want to decode this String into a MyJavaObject instance } }
Listing 1
Encoder
接口有多个子接口:
Encoder.Text
用于将Java对象转成文本消息Encoder.TextStream
用于将Java对象添加到字符流中Encoder.Binary
用于将Java对象转换成二进制消息Encoder.BinaryStream
用于将Java对象添加到二进制流中
类似地,Decoder
接口有四个子接口:
Decoder.Text
用于将文本消息转换成Java对象Decoder.TextStream
用于从字符流中读取Java对象Decoder.Binary
用于将二进制消息转换成Java对象Decoder.BinaryStream
用于从二进制流中读取Java对象
结论
WebSocket Java API为Java开发者提供了标准API来集成IETF WebSocket标准.通过这样做,Web 客户端或本地客户端可使用任何WebSocket实现来轻易地与Java后端通信。
Java Api是高度可配置的,灵活的,它允许java开发者使用他们喜欢的模式。
也可参考
posted @
2016-07-24 01:35 胡小军 阅读(2736) |
评论 (0) |
编辑 收藏
摘要: Servlets定义为JSR 340,可以下载完整规范.servlet是托管于servlet容器中的web组件,并可生成动态内容.web clients可使用请求/响应模式同servlet交互. servlet容器负责处理servlet的生命周期事件,接收请求和发送响应,以及执行其它必要的编码/解码部分.WebServlet它是在POJO上使用@WebServlet注...
阅读全文
posted @
2016-07-24 01:32 胡小军 阅读(851) |
评论 (0) |
编辑 收藏
在本章中,我们将涵盖下面的主题:
- 监控RabbitMQ的行为
- 使用RabbitMQ进行故障诊断
- 跟踪RabbitMQ当前活动
- 调试RabbitMQ的消息
- 当RabbitMQ重启失败时该做什么
- 使用Wireshark来调试
介绍
每当我们开发一个应用程序的时候,一种常见的做法是开发一个诊断基础设施. 这可以基于日志文件,SNMP 转移以及其它手段.
RabbitMQ提供了标准日志文件和内建消息故障诊断解决方案.
在前面三个食谱中,我们将看到如何来使用这三种特性.
某些时候,会存在一些阻止RabbitMQ启动的问题.在这种情况下,需要强制解决服务器上的问题并重启服务器。这一点,我们将在RabbitMQ重启失败时该做什么食谱中讲解。
然而,调试消息也是应用程序开发的一部分.在这种情况下,我们需要知道RabbitMQ和客户之间交换的准确信息.可以使用一个内建代理工具(Java client API的一部分) (查看调试RabbitMQ的消息食谱)或者使用高级网络监视工具来检查网络状况,正如我们将在使用Wireshark来调试食谱中看到的一样。
监控RabbitMQ行为
为了检查RabbitMQ的正常行为,有一个监控工具是很有用的,特别是在处理集群的时候.
有许多不同的工具,商业的或免费的, 这有助于让它们受控于分布式系统如Nagios、Zabbix之下.
在本食谱中,我们将展示如何配置Ganglia RabbitMQ 插件(http://sourceforge.net/apps/trac/ganglia/wiki/ganglia_quick_start).
准备
为了运行此食谱,你需要启用管理插件来配置RabbitMQ。同时,你也需要安装和配置Ganglia. 在本食谱中,我们使用的版本是3.6.0.
如何做
为了使用Ganglia监控图形来查看RabbitMQ统计数据,你需要执行下面的步骤:
1. 在你的Linux发行版上,使用yum或apt-get安装和配置Ganglia.你需要下面的包:
ganglia-gmetad
ganglia-gmond
ganglia-gmond-python
ganglia-web
ganglia
2. 从https://github.com/ganglia/gmond_python_modules/blob/master/rabbit/python_modules/rabbitmq.py拷贝Python Ganglia监控插件到/usr/lib64/ganglia/python_modules.
3. 从https://github.com/ganglia/gmond_python_modules/blob/master/rabbit/conf.d/rabbitmq.pyconf拷贝Python Ganglia 配置文件到/etc/ganglia/conf.d
4. 检查配置文件中的参数正确性.实际上, 你可能需要通过只留下默认的虚拟主机来解决以下条目:
paramvhost {
value = "/"
}
5.如果它已经运行了,使用下面的命令来重启gmond:
service gmond restart
如何工作
通过这个食谱, 你可以从Ganglia环境中来监控RabbitMQ.
TIP
对于基本的故障诊断,你可以通过日志文件来进行,默认情况下,日志文件存储于/var/log/rabbitmq.
一旦它运行起来,你就可以从同一个web界面中,看到系统信息和RabbitMQ节点、队列的相关信息,如下面的截图所示:
更多
Ganglia是集群监测一种广泛的解决方案,但不是唯一的一个。其它的解决方案还包括Nagios (www.nagios.org), Zabbix (www.zabbix.com), 以及Puppet (puppetlabs.com).
通过RabbitMQ排除故障
正如前面食谱提到的, 我们可以通过一种便利的方式,即以日志文件的方式来监控RabbitMQ行为.
也可以使用RabbitMQ自身来访问同种信息, 通过向AMQP client通知broker的活动.
准备
要运行本食谱,我们需要运行RabbitMQ以及Java client library.
如何做
为了消费日志消息,你可在Consumer.java中执行主方法.你可在Chapter12/Recipe02/Java/src/rmqexample中找到源友. 下面,我们将高亮主要步骤:
1. 创建一个临时匿名队列,并将其绑定到AMQP log交换器:
String tmpQueue = channel.queueDeclare().getQueue();
channel.queueBind(tmpQueue,"amq.rabbitmq.log","#");
2. 在消费者回调(ActualConsumer.java)中,可以检索消息和每个消息的路由键,并将它们打印出来:
String routingKey = envelope.getRoutingKey();
String message = new String(body);
System.out.println(routingKey + ": " + message);
3. 此时,你可以在broker上执行任何RabbitMQ操作,并且你会看到日志输出到标准输出上。
如何工作
RabbitMQ log交换器amq.rabbitmq.log是一个topic交换器,RabbitMQ自身用来发布其日志消息.
在我们的示例代码中,我们使用#通配符来消费所有topics的消息.
例如,通过运行另一份代码,我们可运行同一个broker的两个连接,然后中断它,我们将下面的输出:
info: accepting AMQP connection <0.2737.0> (127.0.0.1:54698 ->127.0.0.1:5672)
info: accepting AMQP connection <0.2753.0> (127.0.0.1:54699 ->127.0.0.1:5672)
warning: closing AMQP connection <0.2737.0> (127.0.0.1:54698 ->127.0.0.1:5672):
connection_closed_abruptly
warning: closing AMQP connection <0.2753.0> (127.0.0.1:54699 ->127.0.0.1:5672):
connection_closed_abruptly
值得注意的是,在这里报告的信息,信息和警告不是自己的一部分,但是我们在每个开始打印的路由键消息(前一步骤的步骤2)。
TIP
如果我们只想收到警告和错误消息,我们可以订阅相应的主题.
更多
默认情况下,日志交换器-amq.rabbitmq.log被创建在虚拟主机/中。通过在RabbitMQ配置文件中定义default_vhost从而设置其位置是可能的.
追踪RabbitMQ当前活动
有时,为了分析和调试未知的应用程序行为,我们需要追踪RabbitMQ接收和分发的所有消息.
RabbitMQ提供追踪这些消息的所谓的流水追踪工具。
追踪活动可以在运行时启用和禁用,并且它应该只用于调试,因为它规定了broker活动的开销.
准备
要运行此食谱,我们需要运行此食谱,我们需要运行RabbitMQ和Java client library.
如何做
RabbitMQ使用与日志消息中同样的机制来发送追踪消息;因此,示例代码与前一个食谱中的非常相似.
为了消费追踪消息,你可以执行Consumer.java中的主方法,你可在Chapter12/Recipe02/Java/src/rmqexample目录中找到源码.这里,我们高亮了主要步骤:
1. 创建一个临时队列,并将其绑定到AMQP log交换器上:
String tmpQueue = channel.queueDeclare().getQueue();
channel.queueBind(tmpQueue,"amq.rabbitmq.trace","#");
2.在消费者回调(ActualConsumer.java)中, 可以获取每个消息,并使用下面的代码打印出来:
String routingKey = envelope.getRoutingKey();
String message = new String(body);
Map<String,Object> headers = properties.getHeaders();
LongStringexchange_name = (LongString)
headers.get("exchange_name");
LongString node = (LongString) headers.get("node");
...
3. 可从root用户(Linux)或在RabbitMQ命令控制台(Windows)来激活firehose,其激活命令如下:
rabbimqctl trace_on
4. 此时,你可以向broker启动生产和发送消息,然后你们会在标准输出中看到追踪信息.
5. 可调用下面的命令来禁用firehose:
rabbimqctl trace_off
如何工作
默认情况下,amq.rabbit.trace topic交换器不会接收任何消息,但一旦激活firehose后(前面步骤的步骤3),所有流经broker的消息将被按下面的规则拷贝:
1.进入broker的消息,它们是通过路由键publish.exchange-name发布的, 这里的exchange-name是消息最初发布的交换器名称.
2.离开broker的消息,它们是通过路由键deliver.queuename发布的,这里的queue-name是消息最初被消费的队列名称.
3.消息的body是从原始消息中拷贝过来的.
4.原始消息的元数据会插入到拷贝消息的header属性中. 在步骤2中,我们已经看到了,如何获取最初分发消息的交换器名称,但获取所有原始信息也是可以的,即,找到所有可有字段,并将它们插到消息属性中.firehose的官方文档链接位于:http://www.rabbitmq.com/firehose.html.
调试RabbitMQ消息
有时,通过在标准输出中记录通过broker的消息是有用的.通过RabbitMQ Java客户端提供的简单应用程序来追踪消息,也是可行的.
准备
要运行此食谱,你需要运行的RabbitMQ(运行标准端口 5672),以及RabbitMQ Java client library
如何做
RabbitMQ的Java client library中包含了一个追踪工具,你可以按下面的步骤来进行实际使用.
1. 从http://www.rabbitmq.com/java-client.html页面下载最新版本的RabbitMQ Java client library.
2. 将其解压,并进入其目录.
3. 通过下面的命令来运行Java tracer:
./runjava.sh com.rabbitmq.tools.Tracer
4. 运行用于调试Java client,并将其连接到5673端口.对于本食谱,我们可以使用包含在Java client包中的另一个Java工具,其调用如下:
./runjava.sh com.rabbitmq.examples.PerfTest -h amqp://localhost:5673 -C 1 -D 1
如何工作
追踪工具是一个简单的AMQP代理;默认情况下,它监听5673端口,并会把所有的请求转发到默认监听5672端口的RabbitMQ broker.
所有生产或消费的消息,如同AMQP操作一样,都会记录到标准输出中.
运行前面步骤的步骤4,我们使用了包含在Java client包中另一个用于作RabbitMQ压力测试的工具.
在这里,我们只是限制了生产一个消息(-C 1) 并消费它(-D 1).
TIP
追踪工具只在Java client API中可用.
更多
可以使用下面的代码来为Java追踪程序传递更多的参数:
./runjava.sh com.rabbitmq.tools.Tracer listenPort connectHost connectPort
listenPort指的是追踪器监听的端口(默认为5673), connectHost/connectPort (默认为localhost/5672) 是用于连接并转发收到请求的主机和端口。
使用下面的命令,你可以找到所有PerfTest可用选项:
./runjava.sh com.rabbitmq.examples.PerfTest --help
也可参考
在http://www.rabbitmq.com/java-tools.html中,你可以找到Java追踪工具以及PerfTest的文档.
当RabbitMQ重启失败时,需要做什么
偶尔情况下, RabbitMQ 可能会重启失败。如果broker包含持久化数据时,这是一个严重的问题,否则,有足够的能力重设其持久化状态。
准备
要运行此食谱,你只需要一个测试RabbitMQ broker.
TIP
我们将销毁所有之前定义的数据—以避免使用生产实例.
如何做
要清空RabbitMQ, 可执行下面的简单步骤:
1. 如果RabbitMQ运行的话,停止它.
2. 定位到Mnesia 数据库目录.默认是/var/lib/rabbitmq/mnesia (Linux) or %APPDATA%\RabbitMQ\db (Windows).
3. 递归删除其目录和文件.
4. 重启RabbitMQ.
如何工作
Mnesia 数据库包含了所有运行时的RabbitMQ定义信息: 队列,交换器,用户等等.
通过删除Mnesia数据库 (或者通过重命名,这样可以在需要的时候恢复某些数据), RabbitMQ会重置到出厂默认状态,当RabbitMQ启动时,它会创建一个新的Mnesia数据库,并使用默认值进行初始化.
更多
如果broker无法在第一时间启动,有可能是某个系统目录存在权限问题:即要么是Mnesia数据库目录,要么是日志目录,要么是某些在配置文件中指定的临时的或自定义的目录.
你可以在RabbitMQ故障排除页页找到详尽的列表(http://www.rabbitmq.com/troubleshooting.html).
也可参考
在Mnesia API 文档页面(http://www.erlang.org/doc/man/mnesia.html),,你可以找到更多关于如何破解Mnesia数据库的信息.
使用Wireshark调试
在调试RabbitMQ消息食谱中,我们已经了解过了如何来追踪RabbitMQ的消息.
然而,以下情况并不总是可能的或可取的,即停止正在运行的客户端(或RabbitMQ服务器),修改它的连接端口,指向一个不同的broker;我们只想监控正在实时传递的消息,影响系统的活动应该尽可能的少。
TIP但是,正如在前面食谱(追踪RabbitMQ当前活动)中看到的一样,激活firehose是可行的 .
Wireshark是一个免费的有能力解码AMQP消息的网络分析工具.
此工具既可用在客户段,也可以用在服务端,从而无缝监控AMQP交通状况.
准备
要练习这个食谱,你需要运行的RabbitMQ以及RabbitMQ Java client library.
如何做
在下面的步骤中,我们将看到如何使用Wireshark来追踪AMQP消息:
1. 如果Wireshark在你的系统中尚不可用,那么需要从http://www.wireshark.org/下载和安装Wireshark . 如果可能的话,你也可以从你的发行版中进行安装,如:
yum install wireshark-gnome
2.在Linux系统中,以root用户来启动Wireshark.
3.启动从环回接口中捕获消息.
4.切换到Java client library路径下,在终端中运行下面的命令:
./runjava.sh com.rabbitmq.examples.PerfTest -C 1 -D 1
5.停止Wireshark GUI的采集,并分析抓到的AMQP交通状况.
如何工作
使用Wireshark,可用于检测AMQP交通的退出或进行一个持有RabbitMQ服务器或客户端的服务器.
在我们的例子中,我们捕获了运行在同一台机器上服务端和客户端网络交通状况,因此连接是localhost中发生的.这也是为什么我们从环回接口中捕获交通状况的原因(前面步骤的步骤3).
反之,我们应该从网络接口中进行捕获,这种网络接口通常是eth0或其它相似的网络接口.
TIP
在Linux上,可以直接捕获localhost;但同样的操作不能应用到Windows上.在这种情况下,客户端和服务端必须位于不同的两台机器上, 并且在网络接口上(要么是物理的,要么是虚拟的)必须激活捕获, 这样它们之间才可连接.
所以,为了运行Wireshark的图形用户界面,如果RabbitMQ客户端和服务器运行在同一个节点,你需要选择环回接口,如图下面的截图所示
:
TIP
在Linux上,当你安装Wireshark软件包,你通常只会有命令行界面,tshark。要安装Wireshark的GUI,你必须安装相应的软件包. 例如,在Fedora中,你需要安装wireshark-gnome包.
一旦AMQP交通已穿过环回接口,它已经被Wireshark捕获。
运行在步骤4的实验实际上在两个独立的连接中开启了一个生产者和一个消费者.
为了高亮显示,找到一个描述为Basic.PublishContent-Header的包,点击它,并选择Follow TCP stream.然后,您可以关闭显示客户端和服务器之间有效负载对话的窗口。在主窗口中,您现在可以看到在客户端和服务器之间交换的网络数据包,如下面的截图所示:
用同样的方式,你可以选择RabbitMQ server,如下面的截图所示:
在前面的两个截图中,我们已经强调了消息AMQP的有效载荷,但由于Wireshark中包括了一个非常完整的AMQP解剖器,你会发现在AMQP交通的很多细节。
更多如果RabbitMQ配置为使用SSL,并且你想要分析加密流量,在一定条件下,通过在Wireshark配置中合理配置SSL公共/私有密钥也是可以的。
可在http://wiki.wireshark.org/SSL找到更多信息.
也可参考在http://wiki.wireshark.org/AMQP,你可以找到一些关于Wireshark AMQP 解剖器的一些指南.
posted @
2016-07-20 11:39 胡小军 阅读(3944) |
评论 (1) |
编辑 收藏
摘要: 在本章节中,我们将展现一些RabbitMQ中的可用插件.然后,我们将展示如何使用现实世界中的例子来开发新插件.启用和配置STOMP插件管理RabbitMQ集群监控Shovel状态开发新插件– 使用ODBC连接关系数据库介绍多亏了插件设施,使得RabbitMQ成为了一个可扩展平台.它提供了许多通用插件,其中一些已经在前面的章节中解释过了.例如, Federation和...
阅读全文
posted @
2016-07-20 11:30 胡小军 阅读(2569) |
评论 (1) |
编辑 收藏
摘要: 在这一章中,我们将涵盖:多线程和队列系统调整改善带宽使用不同分发工具介绍这里没有标准的RabbitMQ调优指南,因为不同应用程序会采用不同方式优化.通常情况下,应用程序需要在客户端进行优化:处理器密集型应用程序可以通过为每个处理器内核运行一个线程来进行优化I/O密集型应用程序可以通过在单核上运行多个线程来隐藏隐式延迟在两种情况下,消息传递是完美的结合.为了优化网络传输速率,AMQP标准规定消息按束...
阅读全文
posted @
2016-07-15 14:53 胡小军 阅读(9333) |
评论 (0) |
编辑 收藏
摘要: 在本章中我们将覆盖涉及:镜像队列同步队列优化镜像策略在几个broker之间分发消息创建一个地理位置集群复制过滤和转发消息将高可用技术结合在一起客户端高可用性介绍RabbitMQ通过数据复制来达到高可用,当数据完整性、服务连续性是最重要的时候, 这一点与存储(如:RAID解决方案),数据库,以及所有IT基础设施解决方案是相同的。事实上,这些解决方案不仅可以避免数据丢失,也可以避免计划维护和...
阅读全文
posted @
2016-07-02 19:11 胡小军 阅读(1903) |
评论 (0) |
编辑 收藏
名称
rabbitmq-service.bat — 管理RabbitMQ AMQP service
语法
rabbitmq-service.bat [command]
描述
RabbitMQ是AMQP的实现, 后者是高性能企业消息通信的新兴标准. RabbitMQ server是AMQP 中间件的健壮,可扩展实现.
运行rabbitmq-service,可允许RabbitMQ broker在NT/2000/2003/XP/Vista®环境上以服务来运行,这样就可以通过Windows® services applet来启动和停止服务.
默认情况下,服务会以本地系统帐户中认证上下文来运行。因此,有必要将Erlang cookies 和本地系统帐户进行同步(典型地,C:\WINDOWS\.erlang.cookie和帐户将用来运行 rabbitmqctl).
命令
- help
显示使用信息.
- install
安装service,安装后,它不会启动。如果环境变量修改了的话,随后的调用将更新服务参数.
- remove
删除service.如果删除时,service正在运行,则将会自动停止。 它不会删除任何文件,后续可通过rabbitmq-server 继续操作。
- start
启动service. 在此之前,service必须被正确安装
- stop
停止service.
- disable
禁用service. 这等价于在服务控制面板中,将启动类型设置为禁用.
- enable
启用service. 这等价于在服务控制面板中,将启动类型设置为自动.
环境变量
- RABBITMQ_SERVICENAME
默认为RabbitMQ.
- RABBITMQ_BASE
默认是当前用户的应用程序数据目录. 这是日志和数据目录的位置(C:\Users\Administrator\AppData\Roaming\RabbitMQ).
- RABBITMQ_NODENAME
默认是rabbit. 当你想在一台机器上运行多个节点时,此配置是相当有用的, RABBITMQ_NODENAME在每个erlang-node和机器的组合中应该唯一。
参考clustering on a single machine guide 来更多细节.
- RABBITMQ_NODE_IP_ADDRESS
默认情况下,RabbitMQ会绑定到所有网络接口上,如果只想绑定某个网络接口,可修改此设置。
- RABBITMQ_NODE_PORT
默认为5672.
- ERLANG_SERVICE_MANAGER_PATH
默认为C:\Program Files\erl5.5.5\erts-5.5.5\bin (或64位环境 中为C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin). 这是Erlang service manager的安装位置.
- RABBITMQ_CONSOLE_LOG
将此变量设置为new或reuse,以将服务器控制台的输出重定向到名为SERVICENAME.debug文件中(位于安装服务的用户应用程序数据目录).在Vista下,其位置在C:\Users\AppData\username\SERVICENAME. 在Windows的前期版本中,位置在C:\Documents and Settings\username\Application Data\SERVICENAME.
如果RABBITMQ_CONSOLE_LOG设置为new,那么每次服务启动时都会创建一个新文件。
如果RABBITMQ_CONSOLE_LOG设置为reuse,那么每次服务启动时,文件都会被覆盖.
当RABBITMQ_CONSOLE_LOG 没有设置或设置的值不是new或reuse时,默认的行为是丢弃服务器输出。
posted @
2016-06-24 23:58 胡小军 阅读(1533) |
评论 (0) |
编辑 收藏
要求
要运行ftp4j library,你需要Java 运行时环境v. 1.4+.
安装
将ftp4j JAR文件添加到你应用程序的classpath中, 然后你就可以自动启用ftp4j类的使用了.
Javadocs
可参考ftp4j javadocs.
快速入门
包中的主类是FTPClient (it.sauronsoftware.ftp4j.FTPClient).
创建一个FTPClient 实例:
FTPClient client = new FTPClient();
连接远程FTP服务:
client.connect("ftp.host.com");
如果服务端口不是标准的21端口 (或 FTPS的990端口),需要使用port参数进行指定:
client.connect("ftp.host.com", port);
如:
client.connect("ftp.host.com", 8021);
然后进行登录流程:
client.login("carlo", "mypassword");
如果没有抛出任何异常的话,那么你就通过远程服务器的认证了.否则,如果验证失败,你将会收到it.sauronsoftware.ftp4j.FTPException异常.
匿名认证,如果被连接服务认可的话, 可通过发送用户名"anonymous" 和任意密码来完成(注意,有些服务器需要e-mail地址来代替密码):
client.login("anonymous", "ftp4j");
使用远程FTP服务来做任何事情,然后再断开连接:
client.disconnect(true);
这会向远程器发送FTP QUIT命令, 以进行一个合法断开流程.如果你只是想中断连接而不想向服务器发送任何通知,那么可以使用:
client.disconnect(false);
使用代理进行连接
客户端通过连接器(一个继承自it.sauronsoftware.ftp4j.FTPConnector的对象)来连接服务器, 它将返回一个已经打开的连接(一个实现了it.sauronsoftware.ftp4j.FTPConnection 接口的对象).这也是为什么ftp4j 可以支持大量代理的原因.
在连接远程服务器前,客户端实例可以使用setConnector() 方法来设置连接器:
client.setConnector(anyConnectorYouWant);
如果没有设置连接器的话,会使用默认的连接器DirectConnector (it.sauronsoftware.ftp4j.connectors.DirectConnector), 它实现了对远程服务器的直接连接,且不会使用代理。
如果你只能通过代理来连接远程服务器, ftp4j包可以让你在下面的连接器中进行选择:
- HTTPTunnelConnector (it.sauronsoftware.ftp4j.connectors.HTTPTunnelConnector)
它可以通过HTTP代理来进行连接,并支持CONNECT方法. - FTPProxyConnector (it.sauronsoftware.ftp4j.connectors.FTPProxyConnector)
它可以通过FTP代理进行连接,支持SITE和OPEN命令风格的苛刻远程主机连接.其它类型的FTP代理,需要username@remotehost 认证,且可以不使用连接器,因为它们对于客户端来说是透明的。 - SOCKS4Connector (it.sauronsoftware.ftp4j.connectors.SOCKS4Connector)
它可以通过SOCKS 4/4a代理进行连接. - SOCKS5Connector (it.sauronsoftware.ftp4j.connectors.SOCKS5Connector)
它可以通过SOCKS 5代理进行连接.
因为ftp4j的连接器架构设计为可插拔的,因此你可以继承FTPConnector 抽象类来构建自己的连接器.
FTPS/FTPES 安全连接
ftp4j包支持FTPS (隐式基于 TLS/SSL的FTP) 和FTPES (显示基于TLS/SSL的FTP).
setSecurity() 方法可用来打开这种特性:
client.setSecurity(FTPClient.SECURITY_FTPS); // 启用 FTPS
client.setSecurity(FTPClient.SECURITY_FTPES); // 启用 FTPES
两个方法都需要在连接远程服务器前调用.
如果安全协议设置成了SECURITY_FTPS, 则connect() 方法默认使用的端口为990.
默认情况下,客户端对象商讨SSL连接会使用javax.net.ssl.SSLSocketFactory.getDefault()作为其套接字工厂.可通过调用client.setSSLSocketFactory()方法来改变默认套接字工厂. 另外一种SSLSocketFactory, 可用来信任远程服务器颁发的证书(谨慎使用):
import it.sauronsoftware.ftp4j.FTPClient;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
// ... TrustManager[] trustManager = new TrustManager[] { new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() { return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) { }
public void checkServerTrusted(X509Certificate[] certs, String authType) { } } };
SSLContext sslContext = null;
try { sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, trustManager, new SecureRandom());
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
}
SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
FTPClient client = new FTPClient();
client.setSSLSocketFactory(sslSocketFactory);
client.setSecurity(FTPClient.SECURITY_FTPS);
// or client.setSecurity(FTPClient.SECURITY_FTPES); // ...
浏览远程站点
获取当前目录的的绝对路径(此目录是FTP服务器的home目录):
String dir = client.currentDirectory();
改变目录:
client.changeDirectory(newPath);
你可以使用绝对路径和相对路径:
client.changeDirectory("/an/absolute/one");
client.changeDirectory("relative");
回到父目录:
client.changeDirectoryUp();
重命名文件和目录
要重命名远程文件或目录:
client.rename("oldname", "newname");
移动文件和文件家
rename() 方法也可以用来从当前位置移动文件或目录到其它位置.
在这个例子子,假设在当前工作目录中,你有一个名为"myfile.txt"的文件,然后你想将其移动到子目录"myfolder"中:
client.rename("myfile.txt", "myfolder/myfile.txt");
删除文件
要删除远程文件,需要调用:
client.deleteFile(relativeOrAbsolutePath);
在这个例子中:
client.deleteFile("useless.txt");
创建、删除目录
如果远程服务给你机会的话,你可以在远程站点上创建新目录:
client.createDirectory("newfolder");
你也可以已存在的目录:
client.deleteDirectory(absoluteOrRelativePath);
在这个例子中:
client.deleteDirectory("oldfolder");
请注意,通常情况下,FTP 服务器只允许删除空目录.
列出文件、目录、连接
FTP 协议并不会提供大量支持方法来获取工作目录的完整信息.通常LIST命令会给你想知道的东西,但不辛的是,每个服务器会使用不同样式的响应. 这意味着某些服务器会返回UNIX样式的目录,有些服务器会返回DOS样式的目录,其它的服务器又会使用别的样式.
ftp4j 包可以处理许多的LIST响应格式, 并将它们构建成统一目录内容的结构对象表示.当前ftp4j可以处理:
- UNIX 样式及其变种(如MAC样式)
- DOS 样式
- NetWare 样式
- EPLF
- MLSD
这可以通过使用可插拔的parsers来完成.包it.sauronsoftware.ftp4j.listparsers包含了用于处理上述样式的对象.大多数时间,这些已经够用了。
要列出当前工作目录下的文件或文件夹,可调用:
FTPFile[] list = client.list();
如果你收到了FTPListParseException (it.sauronsoftware.ftp4j.FTPListParseException) 异常,这就意味着服务器对LIST命令返回了不可理解的样式,即它不是上述列出的样式.因此,你可以尝试使用listNames() 方法, 但它并不如list()方法有优势.。为了弥补这种缺陷,你可以构建你自己的LIST响应解析器,以支持你遇到的样式.你可以实现FTPListParser (it.sauronsoftware.ftp4j.FTPListParser) 接口,然后你可以在client的addListParser()方法使用此实现.
FTPFile (it.sauronsoftware.ftp4j.FTPFile) 对象提供了目录内容的表示,包括文件,子目录和连接. 根据服务器的响应,FTPFile对象的某些字段可以是null 的或者是无意义的.请检查javadocs来了解细节.
在list() 方法中你也可以使用文件过滤参数,如:
FTPFile[] list = client.list("*.jpg");
如果连接服务器明确支持MLSD命令, ftp4j会用其来代替基本的LIST命令。MLSD的响应事实更为标准,准确,更易解析.不幸的是,不是所有服务器都支持这个命令,并且有些服务器支持得非常糟糕.基于这些理由,开发者可以控制ftp4j是否应该使用MLSD命令,即通过调用FTPClient对象的setMLSDPolicy()方法. 合法的值:
FTPClient.MLSD_IF_SUPPORTED
client只在服务器明确支持MLSD命令时,才使用MLSD命令. 这是ftp4j默认的行为.
FTPClient.MLSD_ALWAYS
client总是会使用MLSD命令, 即便服务器没有明确表明支持MLSD命令.
FTPClient.MLSD_NEVER
client绝不使用MLSD命令,即便服务器明确表明支持MLSD命令.
例如:
client.setMLSDPolicy(FTPClient.MLSD_NEVER);
获取文件、目录的最后修改时间
通常情况下FTPFile对象会告诉你条目的最后修改时间, 但正如上面描述的,这依赖于服务器发回的响应.如果你需要最后的修改时间,但你又不能通过list()方法得到,那么可以尝试这样做:
java.util.Date md = client.modifiedDate("filename.ext");
下载、上传文件
下载远程文件最简单的方式是调用download(String, File) 方法:
client.download("remoteFile.ext", new java.io.File("localFile.ext"));
要上传:
client.upload(new java.io.File("localFile.ext"));
要在已有文件中上传追加内容:
client.append(new java.io.File("localFile.ext"));
这些是阻塞式调用:它们会在传输完成后(或failed, 或 aborted时)才返回. 此外同步锁是否由客户端来实施的,因为在每个时间段内只允许有一个常规的FTP通信.在每个时间段内,你可以处理多个传输器,即使用多个FTPClient 对象,每个都与服务器建立一个私有连接.
你可以使用FTPDataTransferListener (it.sauronsoftware.ftp4j.FTPDataTransferListener)对象来监控传输.你可以自己实现一个:
import it.sauronsoftware.ftp4j.FTPDataTransferListener;
public class MyTransferListener implements FTPDataTransferListener {
public void started() {
// Transfer started
}
public void transferred(int length) {
// Yet other length bytes has been transferred since the last time this
// method was called
}
public void completed() {
// Transfer completed
}
public void aborted() {
// Transfer aborted
}
public void failed() {
// Transfer failed
}
}
现在像下面这样来下载或上传:
client.download("remoteFile.ext", new java.io.File("localFile.ext"), new MyTransferListener());
client.upload(new java.io.File("localFile.ext"), new MyTransferListener());
client.append(new java.io.File("localFile.ext"), new MyTransferListener());
当client处理下载或上传时,传输器可以被同一个FTPClient对象的不同线程通过调用 abortCurrentDataTransfer() 方法aborted. 此方法还需要一个boolean参数:true表示执行合法的abort过程(即向服务器发送ABOR命令), false表示实然关闭传输器,而不向服务器发送通知:
client.abortCurrentDataTransfer(true); // Sends ABOR
client.abortCurrentDataTransfer(false); // Breaks abruptly
需要注意的是,list()和listNames() 方法暗中包含了数据传输器,因abortCurrentDataTransfer() 方法也可以用来中断其list过程.
当数据传输器在download(), upload(), append(), list() and listNames() 方法中中断时,将会抛出FTPAbortedException (it.sauronsoftware.ftp4j.FTPAbortedException).
下载和上传操作可通过restartAt 参数来重新恢复:
client.download("remoteFile.ext", new java.io.File("localFile.ext"), 1056);
此操作会文件的第1056个字节处继续执行下载操作。第一个传输的字节将是第1057个.
其它 download(), upload() 和append()方法的变种可以让你使用流来替代java.io.File对象.因此你可以在数据库,网络连接或其它流上来传输数据。
Active 、 passive 数据传输模式
客户端和服务器之间的数据传输通道是通过单独的网络连接来建立的. 在传输通道建立期间,服务器可以是active或passive的. 服务器激活数据传输时,工作如下:
- client向服务器发送其IP地址和端口号.
- client向服务器发送数据传输请求,并在之前发送的端口上启动监听.
- 服务器使用客户端提供的地址和端口来连接客房端.
- 数据传输将在新建立的通道中进行.
active模式需要你的client能够收到来自服务器的连接.如果你的client处于防火墙, 代理或这两者混合之后,那么大部分时间都会出现问题,因为它不能收到来外界的连接. 下面是passive数据传输模式:
- client要求服务器准备好一个passive数据传输.
- 服务器使用其IP地址和端口号进行响应.
- client请求传输和连接.
- 数据传输将在新建立的通道中进行.
在passive模式中,客户端连接不要求能收到服务器的连接请求.
在ftp4j中,你可以使用下面的调用来切换active、passive模式:
client.setPassive(false); // Active mode
client.setPassive(true); // Passive mode
ftp4j client passive 标志的默认值为true: 如果你没有调用setPassive(false) ,你的客户端在每次传输前,都会向服务器请求passive模式.
当使用 passive文件传输时,服务器会提供一个 IP地址和一个端口号.作为FTP规范的client,需要使用给定的主机号和端口进行连接.在商业环境中,这种行为可能会经常带来问题,因为NAT配置可能会阻止对IP地址的连接.这就是为什么FTP clients通常会忽略服务器返回的任何IP地址,进而在通信线路中使用同样的主机来连接服务器.ftp4j的行为依赖于服务器因素:
- 每个FTPConnector 都有其默认行为.大部分连接器都会忽略服务器返回的IP地址。目前,默认使用返回地址的连接器是FTPProxyConnector.
- 连接器的行为可通过定义名为ftp4j.passiveDataTransfer.useSuggestedAddress的系统属性来覆盖。如果设置为"true", "yes" 或"1",所有连接器都会使用服务器返回的地址,反之,如果将其设置为"false", "no" or "0", 所有服务器都不会使用返回的地址.
- 最后,连接器的默认行为和全局设置都可以在任何连接器实例中进行覆盖。你可通过获取客房端连接器,并调用其setUseSuggestedAddressForDataConnections() 方法来达到目的.
在active传输模式中,可以设置下面的系统属性:
ftp4j.activeDataTransfer.hostAddress
主机地址.当服务器请求执行与客户端连接时,client会跳转到给定地址的服务器. 此值应该是一个有效的IPv4地址,如:178.12.34.167. 如果没有提供此值,客户端会自动解析系统地址.但如果client运行于LAN中,为了激活数据传输,将会使用带端口转发的路由器来连接外部服务器,那么自动探测到的地址可能不是正确的. 当系统有多个网络接口时,也可能发生这种情况.通常使用系统属性,可以解决这种问题
ftp4j.activeDataTransfer.portRange
连接端口范围. client会在其中挑选一个来进行数据传输.此值 必须是start-stop 形式 ,如6000-7000 表示client只会在给定范围内挑选一个端口来连接服务器.默认情况下没有指定端口范围:这表示client会挑选任何一个可用的端口.
ftp4j.activeDataTransfer.acceptTimeout
以毫秒为单位的连接超时时间. 如果服务器不能在给定超时时间内连接client,传输会因FTPDataTransferException异常而中断.0值表示永不超时。默认值30000 (即30秒).
要设置系统属性,你可以:
用一个或多个 -Dproperty=value参数来启动JVM.如:
java -Dftp4j.activeDataTransfer.hostAddress=178.12.34.167 -Dftp4j.activeDataTransfer.portRange=6000-7000 -Dftp4j.activeDataTransfer.acceptTimeout=5000 MyClass
直接在代码中设置系统属性,如:
System.setProperty("ftp4j.activeDataTransfer.hostAddress", "178.12.34.167");
System.setProperty("ftp4j.activeDataTransfer.portRange", "6000-7000");
System.setProperty("ftp4j.activeDataTransfer.acceptTimeout", "5000");
二进制和文本数据传输类型
数据传输的另一个核心概念是binary 和textual 类型.当传传输的文件是二进制文件时,它将视为二进制流,服务器会按原样存储。而文本数据传输会将传输的文件视为字符流,会进行字符集转换. 假设你的client正运行Windows平台上,而服务器运行UNIX上,它们的默认字符集通常是不同的. client以文本类型来发送文件时,client会假设文件是按机器标准字符集来编码的,因此在发送前,它会解码每个字符并将其编码为 中间字符集. 服务器收到流后,在存储前,会解码中间字符集,并将其编码为机器默认的字符集.字节虽然被改变了,但内容是相同的。
你可以调用下面的方法选择你传输的类型:
client.setType(FTPClient.TYPE_TEXTUAL);
client.setType(FTPClient.TYPE_BINARY);
client.setType(FTPClient.TYPE_AUTO);
默认的TYPE_AUTO常量 ,会让client自动来挑选类型:如果文件的扩展名是client能被识别的文本类型标记,那么它会选择文本传输器来执行. 文件扩展名是通过FTPTextualExtensionRecognizer (it.sauronsoftware.ftp4j.FTPTextualExtensionRecognizer) 实例来识别的. 默认扩展识别器是it.sauronsoftware.ftp4j.recognizers.DefaultTextualExtensionRecognizer, 会将下面的视为文本类型:
abc acgi aip asm asp c c cc cc com conf cpp
csh css cxx def el etx f f f77 f90 f90 flx
for for g h h hh hh hlb htc htm html htmls
htt htx idc jav jav java java js ksh list
log lsp lst lsx m m mar mcf p pas php pl pl
pm py rexx rt rt rtf rtx s scm scm sdml sgm
sgm sgml sgml sh shtml shtml spc ssi talk
tcl tcsh text tsv txt uil uni unis uri uris
uu uue vcs wml wmls wsc xml zsh
你可通过实现FTPTextualExtensionRecognizer 接口来实现你自己的识别器,但你可以更喜欢使用 class ParametricTextualExtensionRecognizer(it.sauronsoftware.ftp4j.recognizers.ParametricTextualExtensionRecognizer)便利类.
无论如何,都不要忘记将你的识别器设置在client中:
client.setTextualExtensionRecognizer(myRecognizer);
数据传输压缩
有些服务器支持数据传输压缩特性-MODE Z. 在传输大文件时,此特性可以节省带宽.一旦client连上服务器并通过认证,就可通过调用下面的方法来检查是否支持压缩:
boolean compressionSupported = client.isCompressionSupported();
如果服务器端支持压缩,就可通过下面的调用来启用压缩:
client.setCompressionEnabled(true);
在此调用之后,后续的数据传输(下载,上传,列举操作)都被将压缩以节省带宽.
数据传输压缩可通过下面的调用来禁用:
client.setCompressionEnabled(false);
也可以检查标记值:
boolean compressionEnabled = client.isCompressionEnabled();
请注意:压缩数据传输只当压缩支持且启用了的情况下才会发生.
默认情况下,压缩是禁用的,即便是服务器支持压缩. 如果有需要,可以显示地打开.
不做任何事(NOOPing the server)
假设你的client什么事情都不做,因为在等待用户输入. 通常情况下, FTP服务器会自动断开非活跃客户端. 为了避免超时,你可以发送NOOP命令.
此命令不会做任何事情,但它会向服务器说明:客户端仍然还活着,请重围超时计数器.调用如下:
client.noop();
当非活跃超时发生时,客户端也可以自动发送NOOPs. 默认情况下,此特性是禁用的。它可以在 setAutoNoopTimeout() 方法中设置超时时间时启用,并提供一个毫秒为单位的值.如:
client.setAutoNoopTimeout(30000);
使用此值,client会在30秒后发送一个NOOP命令.
NOOP超时可通过设置小于等于0的值来禁用:
client.setAutoNoopTimeout(0);
网站特殊的自定义命令
你可以像下面这样来发送站点特殊命令:
FTPReply reply = client.sendSiteCommand("YOUR COMMAND");
你也可以发送自定义命令:
FTPReply reply = client.sendCustomCommand("YOUR COMMAND");
sendSiteCommand() 和 sendCustomCommand() 都会返回一个FTPReply (it.sauronsoftware.ftp4j.FTPReply)对象.使用此对象,你可以检查服务器的响应代码和消息.
FTPCodes (it.sauronsoftware.ftp4j.FTPCodes) 接口报告了一些通用的FTP响应代码,因此你可以使用这些包中的某个来进行匹配.
异常处理
ftp4j 包定义了五种类型的异常:
- FTPException (it.sauronsoftware.ftp4.FTPException)
依赖于方法,这会报告抛出了一个FTP故障.你可以检查报告的错误码,使用FTPCodes 常量来获取故障原因的详细信息. - FTPIllegalReplyException (it.sauronsoftware.ftp4.FTPIllegalReplyException)
这表示远程服务器使用非法方式进行了应答, 这与FTP是不兼容的. 这应该是非常罕见的. - FTPListParseException (it.sauronsoftware.ftp4.FTPListParseException)
这通常发生在list()方法,如果服务器发回的响应不能被客户端包中现有解析器进行的话,就会抛出此种异常 - FTPDataTransferException (it.sauronsoftware.ftp4.FTPDataTransferException)
当数据传输 (download, upload, but also list and listNames) 因网络连接错误失败时,就会抛出此种异常. - FTPAbortedException (it.sauronsoftware.ftp4.FTPAbortedException)
当数据传输 (download, upload, but also list and listNames) 因客户端发出中断请求失败时,抛出的异常.
posted @
2016-06-21 22:34 胡小军 阅读(5945) |
评论 (0) |
编辑 收藏