qileilove

blog已经转移至github,大家请访问 http://qaseven.github.io/

MetaQ集群安装测试

  1,ZooKeeper集群安装,自己搜索安装
  2,下载https://github.com/killme2008/Metamorphosis/tree/metamorphosis-all-1.4.6.2,如果不想自己编译可以直接下载http://fnil.net/downloads/index.html,我这里选择自己编译,主要是以后如果出现问题自己可以修改其源码,重新编译
  3,maven编译,maven环境自己搜索配置好,下载all项目后需要编译其子项目metamorphosis-server-wrapper。dos环境进入其目录下mvn eclipse:eclipse,完成后导入到eclipse,用eclipse插件编译。或者直接dos该目录下执行mvn clean install -Dmaven.test.skip=true。完成后target目录下生产其jar包;
  可以在工程创建lib文件夹,输入以下命令:mvn dependency:copy-dependencies -DoutputDirectory=lib (不加DoutputDirectory会默认输出到targed/dependency下)。再把install的jar包也copy到lib下。
  4,完成编译后上传到服务器
  需要修改conf/server.ini文件
[system]brokerId=2
numPartitions=1
serverPort=8123
ashboardHttpPort=8120
unflushThreshold=0
unflushInterval=10000
maxSegmentSize=1073741824
maxTransferSize=1048576
deletePolicy=delete,168
deleteWhen=0 0 6,18 * * ?
flushTxLogAtCommit=1
stat=true
dataPath=/data1/metaq/data
dataLogPath=/data1/metaq/log
[zookeeper]
zk.zkConnect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
zk.zkSessionTimeoutMs=30000
zk.zkConnectionTimeoutMs=30000
zk.zkSyncTimeMs=5000
;; Topics section
[topic=test]
[topic=meta-test]
  集群的话需要修改上面标红部分,brokerId保证每个服务器节点上不一样就行
  dataPath,dataLogPath如果自己制定,需要每台服务器mkdir
  分发到个节点,在每台节点的bin下都执行metaServer.sh start
  需要停止时执行metaServer.sh stop
  查看状态sh metaServer.sh  status


  5,应用例子
package com.test.metaq;
import java.util.concurrent.Executor;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class AsyncConsum {
public static void main(String[] args) {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = "192.168.1.1:2181";
metaClientConfig.setZkConfig(zkConfig);
MessageSessionFactory sessionFactory = null;
try {
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
final String topic = "test";
final String group = "meta-example";
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
try {
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
public void recieveMessages(Message message) {
System.out.println("Receive message " + new String(message.getData()));
}
public Executor getExecutor() {
return null;
}
});
consumer.completeSubscribe();
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package com.test.metaq;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class Products {
public static void main(String[] args) {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = "192.168.1.1:2181";
metaClientConfig.setZkConfig(zkConfig);
MessageSessionFactory sessionFactory = null;
try {
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (MetaClientException e) {
e.printStackTrace();
}
MessageProducer producer = sessionFactory.createProducer();
final String topic = "test";
producer.publish(topic);
BufferedReader reader = new BufferedReader(new InputStreamReader(
System.in));
String line = "qiujinyong";
try {
while ((line = reader.readLine()) != null) {
SendResult sendResult = producer.sendMessage(new Message(topic,
line.getBytes()));
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:"
+ sendResult.getErrorMessage());
} else {
System.out.println("Send message successfully,sent to "
+ sendResult.getPartition());
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
  打包test.jar后,传服务器上 java -cp test.jar com.test.metaq.Products 命令行输入message
  打包test.jar后,传服务器上 java -cp test.jar com.test.metaq.AsyncConsum 命令行会接收到message

posted on 2014-04-24 10:18 顺其自然EVO 阅读(235) 评论(0)  编辑  收藏 所属分类: 测试学习专栏


只有注册用户登录后才能发表评论。


网站导航:
 
<2014年4月>
303112345
6789101112
13141516171819
20212223242526
27282930123
45678910

导航

统计

常用链接

留言簿(55)

随笔分类

随笔档案

文章分类

文章档案

搜索

最新评论

阅读排行榜

评论排行榜