1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.MessageConsumer; 5 import javax.jms.Session; 6 import javax.jms.TextMessage; 7 8 import org.apache.activemq.ActiveMQConnection; 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 public class QueueReceive{ 12 public static void main(String[] args) { 13 14 // ConnectionFactory :连接工厂,JMS 用它创建连接 15 ConnectionFactory connectionFactory; 16 // Connection :JMS 客户端到JMS Provider 的连接 17 Connection connection = null; 18 // Session: 一个发送或接收消息的线程 19 Session session; 20 // Destination :消息的目的地;消息发送给谁. 21 Destination destination; 22 // 消费者,消息接收者 23 MessageConsumer consumer; 24 25 connectionFactory = new ActiveMQConnectionFactory( 26 ActiveMQConnection.DEFAULT_USER, 27 ActiveMQConnection.DEFAULT_PASSWORD, 28 "tcp://localhost:61616"); 29 try { 30 // 构造从工厂得到连接对象 31 connection = connectionFactory.createConnection(); 32 // 启动 33 connection.start(); 34 // 获取操作连接 35 session = connection.createSession(Boolean.FALSE, 36 Session.AUTO_ACKNOWLEDGE); 37 // 获取session注意参数是一个服务器的queue,须在在ActiveMq的console配置 38 destination = session.createQueue("queue1"); 39 consumer = session.createConsumer(destination); 40 while (true) { 41 TextMessage message = (TextMessage) consumer.receive(1000); 42 if (null != message) { 43 System.out.println("收到消息" + message.getText()); 44 45 } else { 46 break; 47 } 48 49 } 50 51 } catch (Exception e) { 52 e.printStackTrace(); 53 } finally { 54 try { 55 if (null != connection) 56 connection.close(); 57 } catch (Throwable ignore) { 58 } 59 } 60 61 } 62
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.DeliveryMode; 4 import javax.jms.Destination; 5 import javax.jms.MessageProducer; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.apache.activemq.ActiveMQConnection; 10 import org.apache.activemq.ActiveMQConnectionFactory; 11 12 public class QueueSend { 13 private static final int SEND_NUMBER = 5; 14 15 public static void main(String[] args) { 16 // ConnectionFactory :连接工厂,JMS 用它创建连接 17 ConnectionFactory connectionFactory; 18 // Connection :JMS 客户端到JMS Provider 的连接 19 Connection connection = null; 20 // Session: 一个发送或接收消息的线程 21 Session session; 22 // Destination :消息的目的地;消息发送给谁. 23 Destination destination; 24 // MessageProducer:消息发送者 25 MessageProducer producer; 26 // TextMessage message; 27 // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar 28 29 connectionFactory = new ActiveMQConnectionFactory( 30 ActiveMQConnection.DEFAULT_USER, 31 ActiveMQConnection.DEFAULT_PASSWORD, 32 "tcp://localhost:61616"); 33 34 try { 35 // 构造从工厂得到连接对象 36 connection = connectionFactory.createConnection(); 37 // 启动 38 connection.start(); 39 // 获取操作连接 40 session = connection.createSession(Boolean.TRUE, 41 Session.AUTO_ACKNOWLEDGE); 42 // queue1需要在admin界面创建 43 destination = session.createQueue("queue1"); 44 // 得到消息生成者 45 46 producer = session.createProducer(destination); 47 // 设置不持久化,此处学习,实际根据项目决定 48 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 49 // 构造消息,此处写死,项目就是参数,或者方法获取 50 sendMessage(session, producer); 51 session.commit(); 52 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } finally { 56 try { 57 if (null != connection) 58 connection.close(); 59 } catch (Throwable ignore) { 60 } 61 } 62 63 } 64 65 public static void sendMessage(Session session, MessageProducer producer) 66 throws Exception { 67 for (int i = 1; i <=SEND_NUMBER; i++) { 68 TextMessage message = session 69 .createTextMessage("ActiveMq 发送的消息" + i); 70 // 发送消息到目的地方 71 System.out.println("发送消息:" + i+"成功"); 72 producer.send(message); 73 } 74 } 75 } } 聚沙成洲,奋发超越。
|