背景:项目中一个场景需要用java端的处理代码获取php端放到rabbitmq内的消息,然后做相应业务的处理。
前提:rabbitmq服务器已经搭建好,php端的消息发布正常运行。
首先:下载rabbitmq-client对应的java版jar包(spring好像有相应的支持)
开始代码coding的工作,上代码
package com.eelly.imagesearch.common;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
public class RabbitMqControll {
/**
* 读取RabbitMq中的存储信息
*
* @param queue_name 队列名
* @param exchange_name 交换机名
* @param route_key 绑定用到的route_key
* @param durable 是否持久化
*/
public void readRabbitMqInfo (String queue_name,
String exchange_name, String route_key, boolean durable)
{
ConnectionFactory factory = new ConnectionFactory();
// 设置服务器ip
factory.setHost("172.18.107.66");
// 设置rabbitmq服务器运行的端口
factory.setPort(5672);
// 设置rabbitmq服务器连接用户
factory.setUsername("guest");
// 设置rabbitmq服务器连接用户密码
factory.setPassword("guest");
// 设置rabbitmq服务器节点目录(个人理解)
factory.setVirtualHost("/");
try {
// 创建工厂连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机(设置相关属性时需要和php端的一致)
channel.exchangeDeclare(exchange_name, "direct", durable);
// 声明消息队列(设置相关属性时需要和php端的一致)
channel.queueDeclare(queue_name, durable, false, true, null);
// 绑定消息队列(设置相关属性时需要和php端的一致)
channel.queueBind(queue_name, exchange_name, route_key);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// basicConsume消费模式
/*channel.basicQos(1);//消息分发处理
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue_name, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 提交消息处理完成回复
channel.basicAck(delivery.getEnvelope()。getDeliveryTag(), false);
}*/
// basicGet消费模式
while (true)
{
// get方式主动消费
GetResponse res=channel.basicGet(queue_name, false);
if (res != null && res.getMessageCount() >= 0)
{
System.out.println(res.getMessageCount());
String message = "";
message = new String(res.getBody());
channel.basicAck(res.getEnvelope()。getDeliveryTag(), false);
System.out.println(" [x] Received '" + message + "'");
}
else
{
System.out.println("消息队列中没有可消费的信息!");
break;
}
}
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
在开发的过程中,主要报的异常是:
1.创建交换机和消息队列时,设置的属性和消息产生端的php代码设置的不一样,导致不匹配和一直重写属性
2.在调用时一直没有确定到底是用basicConsume的消费模式还是basicGet消费模式(前者带有监控效果,后者没有,不知道是不是因为一者有跳出while循环,一者没有的原因)托福改分