package com.test.metaq; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import com.taobao.metamorphosis.Message; import com.taobao.metamorphosis.client.MessageSessionFactory; import com.taobao.metamorphosis.client.MetaClientConfig; import com.taobao.metamorphosis.client.MetaMessageSessionFactory; import com.taobao.metamorphosis.client.producer.MessageProducer; import com.taobao.metamorphosis.client.producer.SendResult; import com.taobao.metamorphosis.exception.MetaClientException; import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; public class Products { public static void main(String[] args) { final MetaClientConfig metaClientConfig = new MetaClientConfig(); final ZKConfig zkConfig = new ZKConfig(); zkConfig.zkConnect = "192.168.1.1:2181"; metaClientConfig.setZkConfig(zkConfig); MessageSessionFactory sessionFactory = null; try { sessionFactory = new MetaMessageSessionFactory(metaClientConfig); } catch (MetaClientException e) { e.printStackTrace(); } MessageProducer producer = sessionFactory.createProducer(); final String topic = "test"; producer.publish(topic); BufferedReader reader = new BufferedReader(new InputStreamReader( System.in)); String line = "qiujinyong"; try { while ((line = reader.readLine()) != null) { SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes())); if (!sendResult.isSuccess()) { System.err.println("Send message failed,error message:" + sendResult.getErrorMessage()); } else { System.out.println("Send message successfully,sent to " + sendResult.getPartition()); } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MetaClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |