随笔 - 41  文章 - 7  trackbacks - 0
<2016年6月>
2930311234
567891011
12131415161718
19202122232425
262728293012
3456789

常用链接

留言簿

随笔分类

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

在本章中我们将覆盖:
  1. 使用Spring来开发web监控程序
  2. 使用Spring来开发异步web搜索
  3. 使用STOMP来开发web监控程序
介绍
RabbitMQ可以像客户端一样使用在服务端。当前,RabbitMQ覆盖了大部分使用的语言和技术来构建web程序,如PHP,Node.js, Python, Ruby, 以及其它.你可以在http://www.rabbitmq.com/devtools.html找到全部的列表.

在本章中,我们将展示三种使用场景,如下所示,在这里RabbitMQ将被用于:
  1. 用于WebSockets通知的源(使用Spring来开发web监控程序食谱)
  2. 用于管理搜索结果的后端服务(使用Spring来开发异步web搜索食谱)
  3. 在web页面上直接用来处理消息(使用STOMP食谱来开发web监控程序食谱)
在本章节中,我们选择Spring Framework 来构建大多数例子。我们将展示一些有趣的特性,集成和RabbitMQ监控.
使用Spring来开发web监控程序
在本食谱中,我们将展示如何一个 web程序来监控服务器CPU负载和内存使用情况.本例中的服务器状态是由a .NET客户端来发送,但你可以使用其它语言或操作系统来扩展这些例子。它可以使用标准的JSON协议来发送消息。 例如,一个简单的JSON编码消息看起来像下面的文本:
{"UPDATETIME":"23/06/2013 22:55:32","SERVERID":"1","CPU":10,"MEM":40}
现在我们将介绍Spring Insight.使用Insight,你可以监控web 程序的性能和正确行为.
查看http://gopivotal.com/products/pivotal-tc-server来了解更多信息。
在客户端,我们可以使用JQuery,Twitter的Bootstrap,以及Google Chart.
在这个例子的末尾,你可以在监控控制台看到每个服务器的工作情况以及动态更新信息,如下面的截图所示:

你可在Chapter05/Recipe01/web and the client at Chapter05/Recipe01/csharp中找到web程序的源码.
准备
要让例子能工作,你需要1.7+.
为了构建web程序,我们假设你使用Spring Tool Suite (也称为STS) ,可从http://spring.io/tools进行下载.
对于客户端,你需要.NET 3.5+ 框架.

如何做
要快速开始,你可以使用MVC Spring模板来创建一个完整简单的Spring project,就像下面这样:
1. 打开 STS,然后导航至File New Other Spring Template Project |Spring MVC Project.
2. 修改POM.xml,并添加Tomcat的WebSocket libraries:
tomcat-coyote
tomcat-servlet-api
tomcat-catalina
3. 当然,也需要将RabbitMQ client library添加到POM中:
amqp-client
4. 创建一个RabbitMQ实例来存储RabbitMQ参数
5.为了使用WebSocket,需要创建一个继承自Tomcat WebSocketServlet类的RmqWebSocket类.在RmqWebSocket.java文件中,你也可以找到RabbitMQ消费者代码.
6. 创建monitor_exchange_5_1 交换器,并在上面绑定一个消费者.
7. 实现ActualConsumer类中的handleDelivery方法,并将消息重定向到已连接的clients, 并按下面来迭代clients列表:
for(ClientMessage item: clients){
CharBuffer buffer = CharBuffer.wrap(message);
try {
item.myoutbound.writeTextMessage(buffer);

8. 在client端, 将使用JavaScript来连接WebSocket实例:
new WebSocket('ws://' + window.location.host + window.location.pathname+ "websocket");
9. 实现ws.onmessage()事件,并解析
JSON消息来更新charts:
var obj = jQuery.parseJSON(message.data);
xcpu =obj.CPU ;
xmem = obj.MEM;
xupdate = obj.UPDATETIME;
var data = google.visualization.arrayToDataTable([['Label', 'Value'], ['Memory',xmem],['CPU', xcpu]]);
10. 作为一个生产者,我们创建一个.NET程序.一般说来,生产者需要将JSON消息发布到monitor_exchange_5_1交换器中。交换的JSON消息看起来像下面的代码:
{"UPDATETIME":"23/06/2013 22:55:32","SERVERID":"1","CPU":10,"MEM":40}
到这里,程序就准备好了,但如果你想要配置Spring Insight的话,你还需要执行下面的步骤:
11. 转到server section,然后点击new server wizard, 选择 vFabric server,如下所示:

12. 然后,移动并配置实例名称和insight标志,如下所示:
13. 添加步骤1-10中创建的web程序.
14. 启动vFabric server,并转向http://localhost:8080/insight/address.
15. 为了能快速安装RabbitMQ Insight plugin,你可以从http://maven.springframework.org/release/com/springsource/insight/plugins/insight-plugin-rabbitmq-client/下载.
16. 将insight-plugin-rabbitmq-client-XXX.jar文件到你的$STSHOMEINSTALLATION/vfabric-tc-server-developer-2.8.2/mytcinstance/insight/collection-plugins文件夹中.
17. 最后,拷贝
RabbitMQ Java client 到$STSHOMEINSTALLATION/vfabric-tcserver-developer-2.8.2/mytcinstance/insight/lib中.
18. 在你的浏览器指向http://localhost:8080/example来实时地获取服务器负载信息,正如食谱开头截屏中报告的一样
19. 在你的浏览器中指向http://localhost:8080/insight或在 Insight 主页点击Click if insight is enabled! 按扭.

如何工作
在步骤1-3中,我们已经配置了环境;默认的向导模板会创建一个Maven project.
Spring 3.2.x 不支持WebSocket(Version 4.x支持),因此我们需要在POM.xml文件中添加websocket Tomcat依赖(步骤2).
RabbitMQ java client也存在于Maven repository, 在步骤3中,我们添加了此依赖.
RabbitMQinstance bean 包含RabbitMQ 连接参数.
bean的生命周期在root-context.xml文件中进行定义,带有init和destroy方法和自定义参数. (可查看http://static.springsource.org/spring/
docs/3.2.x/spring-framework-reference/html/beans.html#beans-factorylifecycle来了解更多信息)
当client向monitor_exchange_5_1交换器发送消息时(步骤10)时,web程序会收到消息并将其重定向到WebSocket client的浏览器(步骤7).
当JSON消息到达浏览器的web页面时,JQuery将对其进行解析,并最终用来更新Google Chart的
饼图.
TIP
你可以将程序部署到Tomcat.如果你使用STS标准服务器部署,应用程序URL就像下面这样:
http://localhost:8080/example

步骤11-14 支持Spring Insight配置,因此你可以监控你的应用程序;使用RabbitMQ Insight plugin,你也可以监控RabbitMQ, 就像下面截图展示的一样:
更多
Spring Insight 可以帮你监控程序的行为.感谢这个插件,你也可以监控RabbitMQ. 这可能是较为复杂的主题,我们猜你从下面的连接读取到的:
http://gopivotal.com/products/pivotal-tc-server
http://blogs.vmware.com/management/2013/01/new-video-deep-dive-intospring-insights-plugins-for-spring-integration-and-rabbitmq.html
无论如何,我们仍然会在后面的食谱中使用
.

使用Spring来开发异步web搜索
当开发一个小型站点时,通常应用服务器会直接在关系数据库上直接执行查询,在这种情况下,它可能是非常快速的,且易于构建。当站点增长时,这种模式会出现伸缩性问题,特别是当服务器执行了多个数据查询操作,再将最终结果呈现给终端用户之前。另一个问题是系统升级,每个小升级都需要应用服务器停止和重新启动。
在这个例子中,我们将展示如何使用消息队列来伸缩web程序。我们将使用RabbitMQ来创建具有不同责任的单独模块。
传统web应用程序模式(只存在两个模块:一个web server和一个数据库server,它们是直接相连的)如下:
  1. 通信是异步的
  2. 每个模块都不知道其它模块的数目和网络位置
  3. 添加、删除和更新的模块可以无缝地完成对网站的响应能力
  4. 不同scope的新模块可以容易地添加
  5. 架构是可伸缩性的
相同的缺点如下:
  1. 程序过于复杂
  2. 本地化的,当与简单快速方案比起来,程序较慢
在本例中,我们将实现一个异步的,可伸缩性的,平衡的搜索引擎来使用关键字来查询一个条目(a book).
下面的图解释了我们即将构建的架构:

正如你看到的,我们将应用程序的职责做如下划分:
  1. Tomcat只是接收HTTP请求和发布消息到RabbitMQ broker的代理
  2. Java 模块以负载均衡的方式获取消息,按需地执行数据库查询,并返回结果
  3. Tomcat获取到结果后转发给浏览器
TIP
为了简化例子,为对每个java模块模拟分布式数据库,我们创建一个简单的book列表,在这里,每个模块都有整个数据的拷贝.在真实的场景中,你可能会有一个集群数据库,或在分布式数据库的前端有一个集群缓存.
与前面例子不同的是,在这里我们没有使用WebSocket,而是使用Spring来返回结果DeferredResult (http://static.springsource.org/spring/
docs/3.2.0.BUILD-SNAPSHOT/api/org/springframework/web/context/request/async/DeferredResult.html).
你可在Chapter05/Recipe02/web 找到源码,客户端代码在Chpter05/Recipe01/backend.
准备
要使例子工作,你需要Java 1.7+ 和Apache Maven.
为了构建web程序,我们假设你使用Spring Tool Suite (STS),可从http://spring.io/tools下载.
如何做
我们会跳过前面食谱中已经介绍过的的创建MVC模板应用程序和RabbitMQ配置, 并会直接跳到相关的主题.
下面的步骤将会展示如何来实现web模块:
1. 在RabbitMQInstance bean的Init()方法用于环境初始化-创建了search_exchange_05/02交换器和queue_consumer_search_05/02 队列.
2. 配置消息超时时间:
Map<String, Object>args = new HashMap<String, Object>();
args.put("x-message-ttl", 4000);
channel.queueDeclare(Constants.queue, false, false,false, args);
3. 创建应用程序UUID (http://en.wikipedia.org/wiki/Universally_unique_identifier) ,并使用它来作为路由键,以让RabbitMQ client来订阅search_exchange_05/02交换器:
channel.queueBind(myQueue,Constants.exchange,UUIDAPPLICATION);
4. 在HomeController类中添加下面的方法:
publicDeferredResult<String>searchbook(@RequestParam String bookid) {..}
5. 在web.xml中使用<async-supported>true</asyncsupported>来启用异步servlet.
6. 在ConcurrentHashMap<String,DeferredResult<String>>中注册HTTP调用.
7. 使用bookkey参数来将消息发布到queue_consumer_search_05/02 队列.
8. 当消息到达SearchResultConsumer, 将会产生一个新结果.
9. 在request map中找到 guidRequest,并使用setResult(message)来将结果设置为DeferredResult.
让我们按下面的步骤,实现Java后端模块来
继续完成食谱:
10. 使用channel.basicQos(1)来订阅queue_consumer_search_05/02队列.
11. 处理消息和执行搜索:
Book res= myDB.get(idx);
String jsonResult="";
..
jsonResult= jsonWriter.write(res);
12. 将结果发布到search_exchange_05/02交换器:
Builder bob = new Builder();
Map<String, Object> header = new HashMap<String,Object> ();
header.put("guidrequest", guidRequest);
bob.headers(header);
channel.basicPublish(Constants.exchange, RoutingKey,bob.build(), jsonResponse.getBytes());

如何工作
在创建环境之后(步骤1),web程序就准备就绪了. 我们使用queue_consumer_search_05/02来放置查询结果,并使用search_exchange_05/02 交换器来获取查询结果.
在步骤2中,我们创建一个应用程序UUID,并将其作为search_exchange_05/02交换器的路由键,原因是你可能运行了多个能快速进行后端查询的多个Tomcat 实例和应用程序 。唯一的UUIDs可用来区分每个模块。
在步骤3中,我们使用DeferredResult实现了/searchbook URL 的GET handlerDeferredResult 类使用servlet3.0中的异步支持特性.(参考https://blogs.oracle.com/enterprisetechtips/entry/asynchronous_support_in_servlet_3 来了解更多信息)
为了启用servlet3.0的异步支持,我们修改了web.xml file(步骤4).
使用DeferredResult类,你可以通过其它不同的线程来设置结果。在这种情况下,传入的结果将会触发一个回调
.
让我们看一下每个HTTP请求都发生了什么.
当浏览器执行请求时,如执行 /searchbook?bookkey=5, searchbook handler 会接受请求,并为其分配一个新的UUID.然后,它会创建DeferredResult<String>,并在请求参数发送到queue_consumer_search_05/02队列后,将其放入到requests map (步骤6) .消息包含了request UUID和application UUID:
Builder bob = new Builder();
Map<String, Object> header = new HashMap<String,Object> ();
header.put("guidrequest", guidRequest);
bob.headers(header);
bob.correlationId(UUIDAPPLICATION);
注意,java后端会获取消息(步骤11), 它会执行数据库查询(在我们的例子中,使用localDbEmulation进行的模拟),并将查询结果发布search_exchange_05/02交换器 (步骤12).
为了能够正确地重定向消息, Java后端模块将发回了request UUID,并使用correlationId header字段作为路由键.通过这种方式,我们可以确保响应消息能够路由到发送请求的模块

TIP
在步骤9中,我们使用channel.basicQos(1)方法来创建了一个负载均衡请求(已在Distributing messages to many consumers recipe in Chapter 1Working with AMQP看到过了).
如果你添加了多个Java后端模块,它们会被自动地均衡负载。

因此,当消息到达web程序时,它包含下面的东西:
  1. request UUID
  2. JSON格式结果
SearchResultConsumer 将处理消息,并在request map中查询request,同时它也会将结果设置到DeferredResult:
DeferredResult<String> deferredResult =requests.get(guidRequest.toString());
..
deferredResult.setResult(jsonResultSet);
结果最终会展现在浏览器中,request 也会从map中删除.
requests.remove(guidRequest.toString());

TIP
web 和后端程序可通过Maven来编译.使用这种方式,部署WAR和JAR文件将更加容易

在本场景中,为操作设置超时时间是很重要的,这样可以防止不断膨胀的请求,DeferredResult类的带超时时间参数(单位毫秒)的构造器可以帮助我们:
new DeferredResult<String>(3000)
当请求超时时,必须从request map中删除request,可以像下面这样定义适当的回调来完成:
deferredResult.onTimeout(new Runnable() {
public void run() {
rmq.requests.remove(guidRequest);
}

为了防止队列中消息膨胀,我们使用了x-message-ttl来配置消息的过期时间.如果队列中的消息生存时间超过了4秒,该消息将会被删除(参考Chapter 2Going beyond the AMQP Standard).为了使例子简单,我们只决定删除长请求和膨胀消息—有效但过于激烈.在真实的程序中,超时应该以报警的方式进行处理.参考Chapter 2Going beyond the AMQP Standard来了解队列超时;对于DeferredResult, 需要注意超时处理器中的错误:
deferredResult.onTimeout(new Runnable() {
public void run() {
rmq.requests.remove(guidRequest);
}

TIP
超时监控是非常重要的,它可以作为扩展web应用程序的一个信号,也可以当作是出现问题的信号。

为了完成这个例子,我们使用了Apache JMeter (http://jmeter.apache.org/)来对website作压力测试,以了解其能承受的负载。
Chapter05/Recipe02/jm/ 文件夹, 你可以找到用来测试本食谱的JMeter文件,正如下面截屏看到的一样:
你可以通过查看日志文件或STS控制台来检查程序行为是否正确.如果所有都是按预期执行的话,你会看到下面的日志信息:
INFO :com.test.bean.RabbitMQInstance - you have 0 pending requests 
再次说明,你可以使用Maven来编译例子,然后部署web WAR file,并使用 java -cp rmq-0.0.1-SNAPSHOT.jar:./rabbitmq-client.jar com.test.
rmq.Main来执行后端jar文件. 这样一来,你可以在浏览器打开 http://localhost:8080/rmq-1.0.0-BUILDSNAPSHOT/ ,并像下面一样来执行搜索:
更多
在本食谱中,我们看到了队列系统是企业系统的根基. 在此架构中,可以容易地添加多个Tomcat instances或更多的Java后端模块来扩展程序。
新应用程序的集成是非常容易,因为可以将新程序非常容易地绑定到交换和队列上.
同时这种架构对于管理软件系统升级来说,也可以避免程序的停机时间.

也可参考
更多信息,可以阅读下面的连接:
https://blogs.oracle.com/enterprisetechtips/entry/asynchronous_support_in_servlet_3

使用STOMP来开发web监控程序
在本食谱中,我们将展示如何使用Web-Stomp插件在web程序中来直接连接RabbitMQ. 我们将构建一个与Developing web monitoring applications with Spring 食谱一样的例子.
STOMP 是简单(或流)文本面向消息协议(http://stomp.github.io/),它是另一种消息传输协议,相对于AMQP来说,功能不强但非常轻量的客户端。

Web-stomp RabbitMQ plugin 只提供了SockJS 服务器上的STOMP交互协议. On a different node, the STOMP plugin that we will see in Chapter 9Extending
RabbitMQ Functionality provides just plain STOMP protocol interoperability.
你可在 Chapter05/Recipe03/HTML中找源码.
准备
你只需要一个文本编辑器.
如何做
要烹饪此食谱, 我们将使用Web-Stomp (https://www.rabbitmq.com/web-stomp.html)插件, 它是模拟WebSockets的STOMP桥梁,并且我们会使用JavaScript STOMP client来进行访问.
1. 启用插件.
rabbitmq-plugins enable rabbitmq_web_stomp
2. 重启RabbitMQ.
3. 通过http://127.0.0.1:15674/stomp来测试插件,正常的话,你会看到Welcome to SockJS!信息.
4. 创建一个简单的HTML页面,并增加下面的
library:
<script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
5.  https://raw.github.com/jmesnil/stompwebsocket/master/dist/stomp.js下载Stomp.js,并将其拷贝到HTML文件附近js/stomp.js.
6. 在HTML页面引入Stomp.js:
<script src="js/stomp.js"></script>
7. 初始始连接参数:
var ws = new SockJS('http://' + window.location.hostname +':15674/stomp');
var client = Stomp.over(ws);
8. 将client连接到RabbitMQ broker:
client.connect('guest', 'guest', on_connect, on_error, '/');
9. 让client订阅交换器:
var on_connect = function() {
client.subscribe("/exchange/monitor_exchange_05_01/stats",function(d) {

10. 解析消息并更新统计信息:
var obj = jQuery.parseJSON(d.body);
xcpu = obj.CPU;
xmem = obj.MEM;
xupdate = obj.UPDATETIME;

如何工作
一旦我们启用并测试了插件(步骤1-3), 你就可以使用JavaScript页面来连接RabbitMQ.第一个需要的包(步骤4)是Sockjs JavaScript library (你可在http://sockjs.org上找到更多的信息),第二个需要的包是 Stomp.jsStomp.js, 它是WebSocket上的STOMP JavaScript library (也可以参考http://jmesnil.net/stompwebsocket/doc/). 在初始化参数之后(步骤7), client 连接上RabbitMQ后,就可以on_connected事件上进行订阅. 
client使用subscribe 方法来订阅 (/exchange/monitor_exchange_05_01/stats), 三个反斜杠隔开的子串分别表示:
  1. 第一个子串是交换器的类型或者是queue; 在我们的例子中,是交换器
  2. 第二个子串是名称;在我们的例子中是monitor_exchange_05_01
  3. 第三个子串是路由键;在我们的例子中是stats
正如第一个食谱Developing web monitoring applications with Spring,你可以运行.NET client来向monitor_exchange_05_01交换器发送状态信息.
这个例子中与第一食谱中的效果是一样的,你可以将它们放到一起来执行.即使我们使用了不同技术,消息会同时到达这两个web程序。

TIP
与第一个食谱不同的是,要部署HTML页面,你只需要一个web服务器,如Apache.

本食谱中使用的是HTML5,因此须确保你的浏览器支持HTML5; 你也可以手机浏览器,就像下面截图所示:
posted on 2016-06-14 22:07 胡小军 阅读(2268) 评论(0)  编辑  收藏 所属分类: RabbitMQ

只有注册用户登录后才能发表评论。


网站导航: