IBM的WebSphere MQ产品可以用来方便地实现分布式异构系统之间的消息传递。对于大型的分布式系统,使用MQ进行数据通信是可以说是非常有效的,而且适用于异构环境(如NT和多种UNIX之间通信)。本文主要介绍:MQ的核心组件介绍、MQ环境的搭建以及利用JAVA对MQ队列管理器的操作的程序设计,希望能起到抛砖引玉的作用。
第一部分、MQ的核心组件介绍
MQ的核心组件包括:队列管理器(QueueManager)、队列(Queue)、通道(Channel)、消息(Message)和集群(Cluster)。
队列管理器(QueueManager)提供队列服务,管理属于该队列管理器的队列和通道等所有MQ对象。
队列(Queue)是用于存储消息(Message)的数据结构,有四种类型:本地队列(LocalQueue)、远程队列(RemoteQueue)、别名队列(AliasQueue)和模型队列(ModelQueue),最常用到的是本地队列和远程队列。
通道(Channel)是提供了从一个队列管理器到其他队列管理器的数据传输路径。通道类型有若干种,其中常用的是发送方通道(SenderChannel)和接收方通道(ReceiverChannel)。
消息(Message)是应用程序之间传递的一系列字节数据,MQ传递的消息有两部分组成:消息描述符(MessageDescriptor)和应用数据(ApplicationData)。默认最大传递的消息大小是4MB,可以根据需要进行设置,最大可到100MB。
集群(Cluster)是分布式网络上的多个队列管理器的集合。(本文不涉及集群的具体内容)
第二部分、MQ环境的搭建
本文搭建的环境以Windows平台为例,涉及其他平台的请读者查阅相关文档。
具体搭建步骤:
1、根据安装向导安装IBM WebSphere MQ v5.3软件,安装路径为:D:\IBM\WebSphere MQ。
2、安装成功后,请使用命令echo %classpath%检查classpath变量中是否已经把D:\IBM\WebSphere MQ\Java\lib下面的jar文件包含进来,如没有包括请进行手工添加,本文要用到的2个关键的是:com.ibm.mq.jar和connector.jar。使用echo %path%检查path变量中是否已经把D:\IBM\WebSphere MQ\bin包含进来,如没有包括请进行手工添加。
3、创建一个配置文本文件,文件名为config.txt,内容如下(请读者到附件下载):
* 更改QM的字符集编码(CCSID)
ALTER QMGR FORCE CCSID(1381)
* 定义本地队列
DEFINE QLOCAL('LQ_SAMPLE') REPLACE +
USAGE(normal) +
DEFPSIST(YES)
4、创建一个批处理文件,文件名为mqsetup.bat,内容如下(请读者到附件下载):
rem 创建缺省队列管理器,拥有100个句柄,使用线性循环日志,容量为 1024 × 4 K/文件,主文件10个,辅文件20个
echo Creating QM_SAMPLE
crtmqm -t 5000 -h 100 -lc -lf 1024 -lp 10 -ls 20 -q QM_SAMPLE
rem 设置cpu个数为1
setmqcap 1
rem 启动队列管理器
echo Starting Queue Manager
strmqm QM_SAMPLE
rem 从配置文件中读入初始化命令
echo Running config
runmqsc QM_SAMPLE < config.txt
rem 停止队列管理器
amqmdain end QM_SAMPLE
rem 将队列管理器设置为自动启动
amqmdain auto QM_SAMPLE
rem 创建队列侦听器,使用1414端口
amqmdain crtlsr QM_SAMPLE -t TCP -p 1414
rem 修改MQ参数,采用AdoptNewMCA方式
amqmdain reg QM_SAMPLE -c add -s Channels -v AdoptNewMCA=ALL
rem 修改MQ参数,采用KeepAlive方式
amqmdain reg QM_SAMPLE -c add -s TCP -v KeepAlive=Yes
rem 重新启动队列管理器
amqmdain start QM_SAMPLE
5、运行mqsetup.bat,检查运行结果输出是否无误,如有错误,请仔细根据上述步骤进行检查并纠错。
6、在命令窗口中,输入dspmq,看是否显示如下结果:
QMNAME(QM_SAMPLE) STATUS(正在运行)
7、在命令窗口中,输入runmqsc回车,进入mq交互操作环境,输入display queue(LQ_SAMPLE),看是否显示如下结果:
AMQ8409: 显示队列细节。
DESCR(WebSphere MQ Default Local Queue)
PROCESS( ) BOQNAME( )
INITQ( ) TRIGDATA( )
CLUSTER( ) CLUSNL( )
QUEUE(LQ_SAMPLE) CRDATE(2006-10-31)
CRTIME(16.17.01) ALTDATE(2006-10-31)
8、输入end退出mq交互操作环境。
自此,NT平台上的最基本的MQ环境搭建完成了。
第三部分、利用JAVA对MQ队列管理器的操作的程序设计
本文涉及的程序在JDK 1.4.2上测试通过。文件名为MQSample.java,程序内容如下:
import java.io.IOException;
import com.ibm.mq.MQC;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class MQSample{
//定义队列管理器和队列的名称
private static String qmName;
private static String qName;
public static void main(String args[]) {
try{
//第一个参数是队列管理器名,第二个参数是队列名
qmName = args[0].trim();
qName = args[1].trim();
}catch(Exception e){
System.out.println("USAGE: java MQSample 队列管理器名 队列名");
System.exit(0);
}
try {
//定义并初始化队列管理器对象并连接
MQQueueManager qMgr = new MQQueueManager(qmName);
// 设置将要连接的队列属性
// Note. All WebSphere MQ Options are prefixed with MQC in Java.
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
//连接队列
MQQueue localQ = qMgr.accessQueue(qName, openOptions);
//定义一个简单的消息
MQMessage putMessage = new MQMessage();
putMessage.writeUTF("Hello World!");
//设置写入消息的属性(默认属性)
MQPutMessageOptions pmo = new MQPutMessageOptions();
//将消息写入队列
localQ.put(putMessage,pmo);
MQMessage retrievedMessage = new MQMessage();
retrievedMessage.messageId = putMessage.messageId;
//设置取出消息的属性(默认属性)
MQGetMessageOptions gmo = new MQGetMessageOptions();
// 从队列中取出消息
localQ.get(retrievedMessage, gmo);
String msgText = retrievedMessage.readUTF();
System.out.println("The message is: " + msgText);
//关闭队列
localQ.close();
//从队列管理器断开
qMgr.disconnect();
}catch (MQException ex) {
System.out.println("A WebSphere MQ error occurred : Completion code "
+ ex.completionCode + " Reason code " + ex.reasonCode);
}catch (IOException ex) {
System.out.println("An error occurred whilst writing to the message buffer: " + ex);
}catch(Exception ex){
ex.printStackTrace();
}
}
}
以上程序接受2个输入参数,第一个参数是队列管理器名,第二个参数是队列名。首先初始化MQQueueManager对象,以队列管理器的名称作为参数。这里用的是最简单的MQQueueManager,只能连接本机的队列管理器。推荐使用MQQueueManager(String qmName, Hashtable hashtable)构造函数,如下:
private static Hashtable properties = new Hashtable();
properties.put("hostname", "主机名");
properties.put("port", new Integer(1414));
......
MQQueueManager qMgr = new MQQueueManager(qmName,properties);
可以使应用程序访问MQ服务器的特定端口。
然后通过QueueManager的accessQueue()方法连接队列,该方法返回MQQueue对象。然后定义了MQMessage对象,通过writeUTF()将字符串写入消息,进而通过MQQueue的put()方法将消息放入队列中,然后通过get()方法取出消息,最后显示在console上。在程序结尾,执行MQQueue的close()方法关闭队列,执行MQQueueManager的disconnect()断开和队列管理器的连接。
对MQSample.java编译后,输入java MQSample QM_SAMPLE LQ_SAMPLE运行,得出如下结果:
The message is: Hello World!
在作者调试以上程序时,出现了一个错误:java.lang.NoClassDefFoundError: javax/resource/ResourceException
经过在IBM网站上进行搜索,说是classpath中没有指定connector.jar文件,但是我已经在Eclipse开发环境中进行设置。
通过仔细分析,终于发现是由于JDK的缘故(我用的是WebSphere6.0里的JDK),切换到SUN的JDK就正常了。
所以,如果你需要使用IBM的JDK的话,那么要把MQ的这几个包覆盖到IBM JDK的相关路径下。
到此,读者应该了解了MQ有哪些主要的对象,环境怎么搭建(请仔细研究config.txt和mqsetup.bat文件),以及如何对MQ的对象进行操作。
如果读者对以上的内容有任何疑问,可以和我联系,qianh@cntmi.com