随笔-109  评论-187  文章-25  trackbacks-0

由于一些原因,最近要复习一些东西,所以就把JMS的东西再复习一遍,以下便是例子

jms 中最重要的几个概念destination,ACKNOWLEDGE,subscribe,durable subscribe

destination:topic queque
  queue简单点说就是1:1 一个消息只能由一个consumer去消费,别的consumer来消费的时候已经没了,先到先得
topic简单点说就是1:N 一个消息可以由多个consumer来消费,谁来消费都有
 subscribe,拿topic来说如果当前订阅不是持久订阅,只有再订阅后生产者生产得消息才能被consumer得到,持久订阅只要没有被consumer消费,早晚会消费这个消息
 
 
 
 一下是几个例子
 
 queuesend:queque消息产生
 queuereceive:queque消息得消费
 topicsend :topic消息得产生
 topicreceive1:topic消息的非订阅
 topicrecieve2:topic消息的持久订阅
 
 这个例子实在WEBLOGIC814上测试过的,当然要定义JMSSERVER,FACTORY,DESTINATION。
 
 
 
 QueueSend
 
 import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QueueSend {
 // Defines the JNDI context factory.
 public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

 // Defines the JNDI provider url.
 public final static String PROVIDER_URL = " t3://localhost:7001";

 // Defines the JMS connection factory for the queue.
 public final static String JMS_FACTORY = "SendJMSFactory";

 // Defines the queue.
 public final static String QUEUE = "SendJMSQueue";

 private QueueConnectionFactory qconFactory;

 private QueueConnection qcon;

 private QueueSession qsession;

 private QueueSender qsender;

 private Queue queue;

 private TextMessage msg;

 /**
  * Creates all the necessary objects for sending messages to a JMS queue.
  *
  * @param ctx
  *            JNDI initial context
  * @param queueName
  *            name of queue
  * @exception NamingException
  *                if operation cannot be performed
  * @exception JMSException
  *                if JMS fails to initialize due to internal error
  */
 public void init(Context ctx, String queueName) throws NamingException,
   JMSException {
  qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
  qcon = qconFactory.createQueueConnection();
  qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  queue = (Queue) ctx.lookup(queueName);
  qsender = qsession.createSender(queue);
  msg = qsession.createTextMessage();
  qcon.start();
 }

 /**
  * Sends a message to a JMS queue.
  *
  * @param message
  *            message to be sent
  * @exception JMSException
  *                if JMS fails to send message due to internal error
  */
 public void send(String message) throws JMSException {
  msg.setText(message);
  qsender.send(msg);
 }

 /**
  * Closes JMS objects.
  *
  * @exception JMSException
  *                if JMS fails to close objects due to internal error
  */
 public void close() throws JMSException {
  qsender.close();
  qsession.close();
  qcon.close();
 }

 /**
  * main() method.
  *
  * @param args
  *            WebLogic Server URL
  * @exception Exception
  *                if operation fails
  */
 public static void main(String[] args) throws Exception {
  try {
   InitialContext ic = getInitialContext();
   QueueSend qs = new QueueSend();
   qs.init(ic, QUEUE);
   readAndSend(qs);
   qs.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 private static void readAndSend(QueueSend qs) throws IOException,
   JMSException {
  BufferedReader msgStream = new BufferedReader(new InputStreamReader(
    System.in));
  String line = null;
  boolean quitNow = false;
  do {
   System.out.print("Enter message (\"quit\" to quit): ");
   line = msgStream.readLine();
   if (line != null && line.trim().length() != 0) {
    qs.send(line);
    System.out.println("JMS Message Sent: " + line + "\n");
    quitNow = line.equalsIgnoreCase("quit");
   }
  } while (!quitNow);

 }

 private static InitialContext getInitialContext() throws NamingException {
  Hashtable env = new Hashtable();
  env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
  env.put(Context.PROVIDER_URL, PROVIDER_URL);
  return new InitialContext(env);
 }

}


QueueReceive

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QueueReceive implements MessageListener {
 // Defines the JNDI context factory.
 public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

 // Defines the JNDI provider url.
 public final static String PROVIDER_URL = " t3://localhost:7001";

 // Defines the JMS connection factory for the queue.
 public final static String JMS_FACTORY = "SendJMSFactory";

 // Defines the queue.
 public final static String QUEUE = "SendJMSQueue";

 private QueueConnectionFactory qconFactory;

 private QueueConnection qcon;

 private QueueSession qsession;

 private QueueReceiver qreceiver;

 private Queue queue;

 private boolean quit = false;

 /**
  * Message listener interface.
  *
  * @param msg
  *            message
  */
 public void onMessage(Message msg) {
  try {
   String msgText;
   if (msg instanceof TextMessage) {
    msgText = ((TextMessage) msg).getText();
   } else {
    msgText = msg.toString();
   }

   System.out.println("Message Received: " + msgText);

//   if (msgText.equalsIgnoreCase("123")) {
//    synchronized (this) {
//     quit = true;
//     this.notifyAll(); // Notify main thread to quit
//    }
//   }
  } catch (JMSException jmse) {
   jmse.printStackTrace();
  }
 }

 /**
  * Creates all the necessary objects for receiving messages from a JMS
  * queue.
  *
  * @param ctx
  *            JNDI initial context
  * @param queueName
  *            name of queue
  * @exception NamingException
  *                if operation cannot be performed
  * @exception JMSException
  *                if JMS fails to initialize due to internal error
  */
 public void init(Context ctx, String queueName) throws NamingException,
   JMSException {
  qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
  qcon = qconFactory.createQueueConnection();
  qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  queue = (Queue) ctx.lookup(queueName);
  qreceiver = qsession.createReceiver(queue);
  qreceiver.setMessageListener(this);
  qcon.start();
 }

 /**
  * Closes JMS objects.
  *
  * @exception JMSException
  *                if JMS fails to close objects due to internal error
  */
 public void close() throws JMSException {
  qreceiver.close();
  qsession.close();
  qcon.close();
 }

 /**
  * main() method.
  *
  * @param args
  *            WebLogic Server URL
  * @exception Exception
  *                if execution fails
  */

 public static void main(String[] args) throws Exception {

  InitialContext ic = getInitialContext();
  QueueReceive qr = new QueueReceive();
  qr.init(ic, QUEUE);

  System.out
    .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");

  // Wait until a "quit" message has been received.
  synchronized (qr) {
   System.out.println("111111111111");
   while (!qr.quit) {
    try {
     System.out.println("2222222222");
     qr.wait();
     System.out.println("333333333");
    } catch (InterruptedException ie) {
    }
   }
  }
  qr.close();
 }

 private static InitialContext getInitialContext() throws NamingException {
  Hashtable env = new Hashtable();
  env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
  env.put(Context.PROVIDER_URL, PROVIDER_URL);
  return new InitialContext(env);
 }

}

 

TopicSend

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.TopicPublisher;
public class TopicSend {
 // Defines the JNDI context factory.
 public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

 // Defines the JNDI provider url.
 public final static String PROVIDER_URL = " t3://localhost:7001";

 // Defines the JMS connection factory for the queue.
 public final static String JMS_FACTORY = "SendJMSFactory";

 // Defines the queue.
 public final static String TOPIC = "SendJMSTopic";

 private TopicConnectionFactory tconFactory;

 private TopicConnection tcon;

 private TopicSession tsession;

 private TopicPublisher tsender;

 private Topic topic;

 private TextMessage msg;
 public static InitialContext ic ;

 /**
  * Creates all the necessary objects for sending messages to a JMS queue.
  *
  * @param ctx
  *            JNDI initial context
  * @param queueName
  *            name of queue
  * @exception NamingException
  *                if operation cannot be performed
  * @exception JMSException
  *                if JMS fails to initialize due to internal error
  */
 public void init(Context ctx, String queueName) throws NamingException,
   JMSException {
  tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
  tcon = tconFactory.createTopicConnection();
  tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
  System.out.println(topic);
  topic = (Topic) ctx.lookup(queueName);
  System.out.println(topic);
  tsender = tsession.createPublisher(topic);
  msg = tsession.createTextMessage();
  tcon.start();
 }

 /**
  * Sends a message to a JMS queue.
  *
  * @param message
  *            message to be sent
  * @exception JMSException
  *                if JMS fails to send message due to internal error
  */
 public void send(String message) throws JMSException ,NamingException{
  System.out.println(topic+"-----------");
  msg.setText(message);
  tsender.publish(msg);
 }

 /**
  * Closes JMS objects.
  *
  * @exception JMSException
  *                if JMS fails to close objects due to internal error
  */
 public void close() throws JMSException {
  tsender.close();
  tsession.close();
  tcon.close();
 }

 /**
  * main() method.
  *
  * @param args
  *            WebLogic Server URL
  * @exception Exception
  *                if operation fails
  */
 public static void main(String[] args) throws Exception {
  try {
   ic= getInitialContext();
   TopicSend ts = new TopicSend();
   ts.init(ic, TOPIC);
   readAndSend(ts);
   ts.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 private static void readAndSend(TopicSend ts) throws IOException,
   JMSException,NamingException {
  BufferedReader msgStream = new BufferedReader(new InputStreamReader(
    System.in));
  String line = null;
  boolean quitNow = false;
  do {
   System.out.print("Enter message (\"quit\" to quit): ");
   line = msgStream.readLine();
   if (line != null && line.trim().length() != 0) {
    ts.send(line);
   
    System.out.println("JMS Message Sent: " + line + "\n");
    quitNow = line.equalsIgnoreCase("quit");
   }
  } while (!quitNow);

 }

 private static InitialContext getInitialContext() throws NamingException {
  Hashtable env = new Hashtable();
  env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
  env.put(Context.PROVIDER_URL, PROVIDER_URL);
  return new InitialContext(env);
 }

}

 

TopicReceive1

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class TopicReceive1 implements MessageListener {
 // Defines the JNDI context factory.
 public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

 // Defines the JNDI provider url.
 public final static String PROVIDER_URL = " t3://localhost:7001";

 // Defines the JMS connection factory for the queue.
 public final static String JMS_FACTORY = "SendJMSFactory";

 // Defines the queue.
 public final static String TOPIC = "SendJMSTopic";

 private TopicConnectionFactory tconFactory;

 private TopicConnection tcon;

 private TopicSession tsession;

 private TopicSubscriber tsubscriber;

 private Topic topic;

 private boolean quit = false;

 /**
  * Message listener interface.
  *
  * @param msg
  *            message
  */
 public void onMessage(Message msg) {
  System.out.println("===================");
  try {
   String msgText;
   if (msg instanceof TextMessage) {
    msgText = ((TextMessage) msg).getText();
   } else {
    msgText = msg.toString();
   }

   System.out.println("Message Received: " + msgText);

//   if (msgText.equalsIgnoreCase("123")) {
//    synchronized (this) {
//     quit = true;
//     this.notifyAll(); // Notify main thread to quit
//    }
//   }
  } catch (JMSException jmse) {
   jmse.printStackTrace();
  }
 }

 /**
  * Creates all the necessary objects for receiving messages from a JMS
  * queue.
  *
  * @param ctx
  *            JNDI initial context
  * @param queueName
  *            name of queue
  * @exception NamingException
  *                if operation cannot be performed
  * @exception JMSException
  *                if JMS fails to initialize due to internal error
  */
 public void init(Context ctx, String queueName) throws NamingException,
   JMSException {
  tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
  tcon = tconFactory.createTopicConnection();
  tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
  topic = (Topic) ctx.lookup(queueName);
  tsubscriber = tsession.createSubscriber(topic);
  //System.out.println("12");
  //Message msg = treceiver.receive();
  //msg.acknowledge();
  //tsubscriber = tsession.createSubscriber(topic);Message msg = tsubscriber.receive();msg.acknowledge();
  //System.out.println(msg);
  tsubscriber.setMessageListener(this);
  tcon.start();
 }

 /**
  * Closes JMS objects.
  *
  * @exception JMSException
  *                if JMS fails to close objects due to internal error
  */
 public void close() throws JMSException {
  tsubscriber.close();
  tsession.close();
  tcon.close();
 }

 /**
  * main() method.
  *
  * @param args
  *            WebLogic Server URL
  * @exception Exception
  *                if execution fails
  */

 public static void main(String[] args) throws Exception {

  InitialContext ic = getInitialContext();
  TopicReceive1 tr1 = new TopicReceive1();
  tr1.init(ic, TOPIC);

  System.out
    .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
  
  

  // Wait until a "quit" message has been received.
//  synchronized (tr1) {
//   System.out.println("111111111111");
//   while (!tr1.quit) {
//    try {
//     System.out.println("2222222222");
//     tr1.wait();
//     System.out.println("333333333");
//    } catch (InterruptedException ie) {
//    }
//   }
//  }
  tr1.close();
 }

 private static InitialContext getInitialContext() throws NamingException {
  Hashtable env = new Hashtable();
  env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
  env.put(Context.PROVIDER_URL, PROVIDER_URL);
  return new InitialContext(env);
 }

}


TopicReceive2


import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class TopicReceive2 {
 // Defines the JNDI context factory.
 public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

 // Defines the JNDI provider url.
 public final static String PROVIDER_URL = " t3://localhost:7001";

 // Defines the JMS connection factory for the queue.
 public final static String JMS_FACTORY = "SendJMSFactory";

 // Defines the queue.
 public final static String TOPIC = "SendJMSTopic";

 private TopicConnectionFactory tconFactory;

 private TopicConnection tcon;

 private TopicSession tsession;

 private TopicSubscriber tsubscriber;

 private Topic topic;

 private boolean quit = false;


 /**
  * Creates all the necessary objects for receiving messages from a JMS
  * queue.
  *
  * @param ctx
  *            JNDI initial context
  * @param queueName
  *            name of queue
  * @exception NamingException
  *                if operation cannot be performed
  * @exception JMSException
  *                if JMS fails to initialize due to internal error
  */
 public void init(Context ctx, String queueName) throws NamingException,
   JMSException,InterruptedException {
  tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
  tcon = tconFactory.createTopicConnection();
  tcon.setClientID("IP10.200.7.104");
  tcon.start();
  
  tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
  System.out.println("333333333333");
  topic = (Topic) ctx.lookup(queueName);
  //tsubscriber = tsession.createSubscriber(topic);
  tsubscriber= tsession.createDurableSubscriber(topic,"88888");
   for (int i=0; i<3; i++) {
             //
             TextMessage message = (TextMessage) tsubscriber.receive();
             System.out.println("message["+i+"]: " + message.getText());
         }
          Thread.sleep(10000);
  //System.out.println("12");
  //Message msg = treceiver.receive();
  //msg.acknowledge();
  //tsubscriber = tsession.createSubscriber(topic);Message msg = tsubscriber.receive();msg.acknowledge();
  //System.out.println(msg);

 }

 /**
  * Closes JMS objects.
  *
  * @exception JMSException
  *                if JMS fails to close objects due to internal error
  */
 public void close() throws JMSException {
  tsubscriber.close();
  tsession.close();
  tcon.close();
 }

 /**
  * main() method.
  *
  * @param args
  *            WebLogic Server URL
  * @exception Exception
  *                if execution fails
  */

 public static void main(String[] args) throws Exception {

  InitialContext ic = getInitialContext();
  TopicReceive2 tr2 = new TopicReceive2();
  tr2.init(ic, TOPIC);

  System.out
    .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
  
  

  // Wait until a "quit" message has been received.
//  synchronized (tr1) {
//   System.out.println("111111111111");
//   while (!tr1.quit) {
//    try {
//     System.out.println("2222222222");
//     tr1.wait();
//     System.out.println("333333333");
//    } catch (InterruptedException ie) {
//    }
//   }
//  }
  tr2.close();
 }

 private static InitialContext getInitialContext() throws NamingException {
  Hashtable env = new Hashtable();
  env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
  env.put(Context.PROVIDER_URL, PROVIDER_URL);
  return new InitialContext(env);
 }

}

posted on 2007-08-15 09:20 小小程序程序员混口饭吃 阅读(1163) 评论(0)  编辑  收藏 所属分类: java

只有注册用户登录后才能发表评论。


网站导航: