消息是JMS中的一种类型对象,由两部分组成:消息头和消息体(或分三部分:消息头、属性、消息体)。消息头由路由信息以及有关该消息的元数据组成。消息体则携带着应用程序的数据或有效负载。
JMSDeliveryMode:传递模式,有两种模式: PERSISTENT和NON_PERSISTENT,PERSISTENT表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传递的可靠性和吞吐量之间找到平衡点。标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。所以默认的消息传递方式是非持久性的。
JMSExpiration:消息过期时间,等于QueueSender的send方法中的timeToLive值或TopicPublisher的publish方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
JMSPriority:消息优先级,从0-9 十个级别,0-4是普通消息,5-9是加急消息。JMS不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。
JMSRedelivered:如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能该客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。
除了消息头中定义好的标准属性外,JMS提供一种机制增加新属性到消息头中,这种新属性包含以下几种:应用需要用到的属性、消息头中原有的一些可选属性、JMS Provider需要用到的属性。
2)消息体:JMS API定义了5种消息体格式,也叫消息类型,你可以使用不同形式发送接收数据并可以兼容现有的消息格式,下面描述这5种类型:
TextMessage:java.lang.String对象,如xml文件内容
MapMessage:名/值对的集合,名是String对象,值类型可以是Java任何基本类型
BytesMessage:字节流
StreamMessage:Java中的输入输出流
ObjectMessage:Java中的可序列化对象
Message:没有消息体,只有消息头和属性
3)消息可靠性
在上面谈及消息体格式定义中,有个字段JMSDeliveryMode用来表示该消息发送后,JMS提供商应该怎么处理消息。PERSISTENT(持久化)的消息在JMS服务器中持久化。接收端如果采用点对点的queue方式或者Durable Subscription(持久订阅者)方式,那么消息可保证只且只有一次被成功接收。NON_PERSISTENT(非持久化)的消息在JMS服务器关闭或宕机时,消息丢失。根据发送端和接收端采用的方式,列出如下可靠性表格,以作参考。
注意:以下的可靠性不包括JMS服务器由于资源关系,造成的消息不能持久化等因素引起的不可靠(该类不可靠应该是JMS提供商或硬件引起的的资源和处理能力的极限问题,应该由管理人员解决)。也不包括消息由于超时时间造成的销毁丢失。
消息发送端 消息接收端 可靠性及因素
PERSISTENT queue receiver/durable subscriber 消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。
PERSISTENT non-durable subscriber 最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时消息不保留。
NON_PERSISTENT queue receiver/durable subscriber 最多消费一次。这是由于服务器的宕机会造成消息丢失
NON_PERSISTENT non-durable subscriber 最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定
4)消息的优先级
虽然JMS规范并不需要JMS供应商实现消息的优先级路线,但是它需要递送加快的消息优先于普通级别的消息。JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。举例来说:
topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); //Pub-Sub
或 queueSender.send(message,DeliveryMode.PERSISTENT, 8, 10000);//P2P
这个代码片断,有两种消息模型,映射递送方式是持久的,优先级为加快型,生存周期是10000 (以毫秒度量)。如果生存周期设置为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设置生存周期是有用的。
5)消息的通知确认
在客户端接收了消息之后,JMS服务怎样有效确认消息是否已经被客户端接收呢?
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
这段代码创建一个非事务性的session,并采用auto_acknowledge方式通知JMS服务器。如果采用事务性session时,通知会伴随session的commit/rollback同时发送通知。在我们采用非事务session时,有三种通知方式。
通知方式 效果
DUPS_OK_ACKNOWLEDGE session 延迟通知。如果JMS服务器宕机,会造成重复消息的情况。程序必须保证处理重复消息而不引起程序逻辑的混乱。
AUTO_ACKNOWLEDGE 当receive或MessageListener方法成功返回后自动通知。
CLIENT_ACKNOWLEDGE 客户端调用消息的acknowledge方法通知
5、PTP模型
点对点(Point- to-Point)消息传送使用的目标是队列。通过使用队列,消息可以被异步或同步地发送和接收。每一条到达队列的消息将会被投递到单独一个消费者一次,并且只有一次。这就好像两个人之间的邮件发送。消费者可以通过MessageConsumer.receive()方法同步地接收消息或使用 MessageConsumer.setMessageListener()方法注册一个MessageListener实现来异步地接收消息。队列保存所有的消息直到它们被投递出去或过期。
JMS PTP模型中的主要概念和对象:
Queue:由JMS Provider管理,队列由队列名识别,客户端可以通过JNDI接口用队列名得到一个队列对象。
TemporaryQueue:由QueueConnection创建,而且只能由创建它的QueueConnection使用。
QueueConnectionFactory:客户端用QueueConnectionFactory创建QueueConnection对象。
QueueConnection:一个到JMS PTP provider的连接,客户端可以用QueueConnection创建QueueSession来发送和接收消息。
QueueSession:提供一些方法创建QueueReceiver 、QueueSender、QueueBrowser和TemporaryQueue。如果在QueueSession关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当接收者下次连接到相同的队列时,这些消息还会被再次接收。
QueueReceiver:客户端用QueueReceiver接收队列中的消息,如果用户在QueueReceiver中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。
QueueSender:客户端用QueueSender发送消息到队列。
QueueBrowser:客户端可以QueueBrowser浏览队列中的消息,但不会收走消息。
QueueRequestor:JMS提供QueueRequestor类简化消息的收发过程。QueueRequestor的构造函数有两个参数:QueueSession和queue,QueueRequestor通过创建一个临时队列来完成最终的收发消息请求。
可靠性(Reliability):队列可以长久地保存消息直到接收者收到消息。接收者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。
可以看到,一个或多个生产者发送消息,消息m2先抵达了queue,然后m1也发出了,并一同存在于一个先进先出的queue里面。消费者也存在一个或多个,对queue里的消息进行消费。但一个消息只会被的一个消费者消费且仅消费一次。
6、PUB/SUB模型
JMS Pub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。
JMS Pub/Sub 模型中的主要概念和对象:
订阅(subscription):消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider会为这个ID保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。
Topic:主题由JMS Provider管理,主题由主题名识别,客户端可以通过JNDI接口用主题名得到一个主题对象。JMS没有给出主题的组织和层次结构的定义,由JMS Provider自己定义。
TemporaryTopic:临时主题由TopicConnection创建,而且只能由创建它的TopicConnection使用。临时主题不能提供持久订阅功能。
TopicConnectionFactory:客户端用TopicConnectionFactory创建TopicConnection对象。
TopicConnection:TopicConnection是一个到JMS Pub/Sub provider的连接,客户端可以用TopicConnection创建TopicSession来发布和订阅消息。
TopicSession:TopicSession提供一些方法创建TopicPublisher、TopicSubscriber、TemporaryTopic 。它还提供unsubscribe方法取消消息的持久订阅。
TopicPublisher:客户端用TopicPublisher发布消息到主题。
TopicSubscriber:客户端用TopicSubscriber接收发布到主题上的消息。可以在TopicSubscriber中设置消息过滤功能,这样,不符合要求的消息不会被接收。
Durable TopicSubscriber:如果一个客户端需要持久订阅消息,可以使用Durable TopicSubscriber,TopSession提供一个方法createDurableSubscriber创建Durable TopicSubscriber对象。
恢复和重新派送(Recovery and Redelivery):非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重新派送一个未签收的消息。
TopicRequestor:JMS提供TopicRequestor类简化消息的收发过程。TopicRequestor的构造函数有两个参数:TopicSession和topic。TopicRequestor通过创建一个临时主题来完成最终的发布和接收消息请求。
可靠性(Reliability):当所有的消息必须被接收,则用持久订阅模式。当丢失消息能够被容忍,则用非持久订阅模式。
在pub/sub消息模型中,消息被广播给所有订阅者。订阅者可以同步接收消息也可以异步接收消息,消息的同步接收是指客户端主动去接收消息,消息的异步接收是指当消息到达时,主动通知客户端。
使用JMeter测试JMS
JMeter是Apache开发的一款小巧易用的开源性能测试工具,由java语言开发。JMeter不仅免费开源而且功能强大、易于扩展,如果有一定Java开发基础的话还可以在JMeter上做扩展开发新的插件等,几乎能满足各种性能测试需求。JMeter中使用Sampler元件(取样器)来模拟各种的类型的请求数据格式,类似于LR中的协议(比LR中的协议概念更广),如:http、ftp、soap、tcp等等。JMeter中支持的JMS Point-to Point、JMS Publisher和JMS Subscriber分别用于发送JMS的PTP消息和PUB/SUB消息,因此可以选择使用JMeter来测试JMS。
MOM(消息中间件)作为消息数据交换的平台,也是影响应用执行效率的潜在环节。在Java程序中,是通过JMS与MOM进行交互的。作为Java实现的性能测试工具JMeter也能使用JMS对应用的消息交换和相关的数据处理能力进行测试。在整个测试过程中,JMeter测试的重点是消息的产生者和消费者的能力,而不是MOM本身。JMeter虽然能使用JMS对MOM进行测试,但是它本身并没有提供JMS需要使用的包(实现类)。因此在使用JMeter测试JMS时需要使用到具体的MOM的相关jar包。以下结合流行的开源消息中间件ActiveMQ来演示如何使用JMeter来实现对JMS的测试。
1、安装并启动ActiveMQ服务
2、测试前的准备
使用JMeter进行压力测试时,所有的JMeter依赖的包需要复制到%JMETER_HOME%/lib目录下。对于ActiveMQ来说,就是复制%ACTIVEMQ_HOME%/lib目录下jar包,可根据实际情况来考虑是否复制。JMeter在测试时使用了JNDI,为了提供JNDI提供者的信息,需要提供jndi.properties。同时需要将jndi.properties放到JMeter的%JMETER_HOME%/lib和%JMETER_HOME%/bin目录中,还需要将jndi.properties与%JMETER_HOME%/bin目录下的ApacheJMeter.jar打包在一起。对于ActiveMQ,jndi.properties的演示内容如下:
1 #java.naming.factory.initial = org.activemq.jndi.ActiveMQInitialContextFactory 2 java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory 3 java.naming.provider.url = tcp://localhost:61616 4 5 #指定connectionFactory的jndi名字,多个名字之间可以逗号分隔。 6 #以下为例: 7 #对于topic,使用(TopicConnectionFactory)context.lookup("connectionFactry") 8 #对于queue,(QueueConnectionFactory)context.lookup("connectionFactory") 9 connectionFactoryNames = connectionFactory 10 11 #注册queue,格式: 12 #queue.[jndiName] = [physicalName] 13 #使用时:(Queue)context.lookup("jndiName"),此处是MyQueue 14 queue.MyQueue = example.MyQueue 15 16 #注册topic,格式: 17 # topic.[jndiName] = [physicalName] 18 #使用时:(Topic)context.lookup("jndiName"),此处是MyTopic 19 topic.MyTopic = example.MyTopic |
3、测试JMS的PTP模型
对于点对点模型,JMeter只提供了一种Sampler:JMS Point-to-Point。如图所示建立测试计划:
QueueConnection Factory:连接工厂,输入jndi配置文件中配置的connectionFactory
JNDI name Request queue:请求队列名,输入jndi配置文件中配置的MyQueue
JNDI name Receive queue:接收队列名,输入jndi配置文件中配置的MyQueue
Content:消息内容,比如输入:this is a test
Initial Context Factory:输入org.apache.activemq.jndi.ActiveMQInitialContextFactory
Provider URL:提供者URL,即安装的ActiveMQ的服务地址tcp://yourIP:61616
运行调试时通过监视器元件查看是否发送成功,如下说明发送成功: