#
- For disable IPv6 on Ubuntu Server, we need add these configuration lines in /etc/sysctl.conf
# IPv6 configuration
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
-
Reload configurationf or sysctl.conf
sysctl -p
-
Check IPv6 is disabled on Ubuntu Server
root@ip-10-48-234-13:/# ip addr1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
link/ether 12:31:3c:03:e9:fb brd ff:ff:ff:ff:ff:ff
inet 10.48.234.13/23 brd 10.48.235.255 scope global eth0
先查看
rpm -qa | grep java
显示如下信息:
javapackages-tools-0.9.1-1.2.amzn1.noarch
java-1.6.0-openjdk-1.6.0.0-62.1.11.11.90.55.amzn1.x86_64
tzdata-java-2013c-2.18.amzn1.noarch
卸载:
rpm -e --nodeps javapackages-tools-0.9.1-1.2.amzn1.noarch
rpm -e --nodeps java-1.6.0-openjdk-1.6.0.0-62.1.11.11.90.55.amzn1.x86_64
rpm -e --nodeps tzdata-java-2013c-2.18.amzn1.noarch
还有一些其他的命令
rpm -qa | grep gcj
rpm -qa | grep jdk
如果出现找不到openjdk source的话,那么还可以这样卸载
yum -y remove java java-1.4.2-gcj-compat-1.4.2.0-40jpp.115
yum -y remove java java-1.6.0-openjdk-1.6.0.0-1.7.b09.el5
<1># rpm -qa|grep jdk ← 查看jdk的信息或直接执行
或
# rpm -q jdk
或
# java -version
<2># rpm -qa | grep gcj ← 确认gcj的版本号
<3># yum -y remove java-1.4.2-gcj-compat ← 卸载gcj
安装SUN JDK
http://www.oracle.com/technetwork/java/javasebusiness/downloads/java-archive-downloads-javase6-419409.html#jdk-6u45-oth-JPR
- 启动ZOOPKEEPER
zkServer.sh start - 启动NIMBUS
storm nimbus & - 启动SUPERVISOR
storm supervisor & - 启动UI
storm ui & - 部署TOPOLOGY
storm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt - 删除TOPOLOGY
storm kill {toponame} - 激活TOPOLOGY
storm active {toponame} - 不激活TOPOLOGY
storm deactive {toponame} - 列出所有TOPOLOGY
storm list
对于一堆时刻在增长的数据,如果要统计,可以采取什么方法呢?
- 等数据增长到一定程度的时候,跑一个统计程序进行统计。适用于实时性要求不高的场景。
如将数据导到HDFS,再运行一个MAP REDUCE JOB。
- 如果实时性要求高的,上面的方法就不行了。因此就带来第二种方法。
在数据每次增长一笔的时候,就进行统计JOB,结果放到DB或搜索引擎的INDEX中。
STORM就是完成这种工作的。
HADOOP与STORM比较
- 数据来源:HADOOP是HDFS上某个文件夹下的可能是成TB的数据,STORM是实时新增的某一笔数据
- 处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,
流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT) - 是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新
数据进入时再从头开始 - 处理速度:HADOOP是以处理HDFS上大量数据为目的,速度慢,STORM是只要处理新增的某一笔数据即可
可以做到很快。 - 适用场景:HADOOP是在要处理一批数据时用的,不讲究时效性,要处理就提交一个JOB,STORM是要处理
某一新增数据时用的,要讲时效性
- 与MQ对比:HADOOP没有对比性,STORM可以看作是有N个步骤,每个步骤处理完就向下一个MQ发送消息,
监听这个MQ的消费者继续处理
首先要明白Storm和Hadoop的应用领域,注意加粗、标红的关键字。
Hadoop是基于Map/Reduce模型的,处理海量数据的离线分析工具。
Storm是分布式的、实时数据流分析工具,数据是源源不断产生的,例如Twitter的Timeline。
再回到你说的速度问题,只能说Storm更适用于实时数据流,Map/Reduce模型在实时领域很难有所发挥,不能简单粗暴的说谁快谁慢。
这里的快主要是指的时延。
storm的网络直传、内存计算,其时延必然比hadoop的通过hdfs传输低得多;当计算模型比较适合流式时,storm的流式处理,省去了批处理的收集数据的时间;因为storm是服务型的作业,也省去了作业调度的时延。所以从时延上来看,storm要快于hadoop。
说一个典型的场景,几千个日志生产方产生日志文件,需要进行一些ETL操作存入一个数据库。
假设利用hadoop,则需要先存入hdfs,按每一分钟切一个文件的粒度来算(这个粒度已经极端的细了,再小的话hdfs上会一堆小文件),hadoop开始计算时,1分钟已经过去了,然后再开始调度任务又花了一分钟,然后作业运行起来,假设机器特别多,几钞钟就算完了,然后写数据库假设也花了很少的时间,这样,从数据产生到最后可以使用已经过去了至少两分多钟。
而流式计算则是数据产生时,则有一个程序去一直监控日志的产生,产生一行就通过一个传输系统发给流式计算系统,然后流式计算系统直接处理,处理完之后直接写入数据库,每条数据从产生到写入数据库,在资源充足时可以在毫秒级别完成。
当然,跑一个大文件的wordcount,本来就是一个批处理计算的模型,你非要把它放到storm上进行流式的处理,然后又非要让等所有已有数据处理完才让storm输出结果,这时候,你再把它和hadoop比较快慢,这时,其实比较的不是时延,而是比较的吞吐了。
Hadoop M/R基于HDFS,需要切分输入数据、产生中间数据文件、排序、数据压缩、多份复制等,效率较低。
Storm 基于ZeroMQ这个高性能的消息通讯库,不持久化数据。
kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。
kakfa的consumer使用拉的方式工作。
安装kafka下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。
使用kafka
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.SyncProducer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
Properties props =
new Properties();
props.put(“zk.connect”, “127.0.0.1:2181”);
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config =
new ProducerConfig(props);
Producer<String, String> producer =
new Producer<String, String>(config);
Send a single message
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);
producer.close();
这样就是一个标准的producer。
consumer的代码
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for(final KafkaMessageStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(Message message: stream) {
// process message
}
}
});
}
日志抓取端:
apache kafka在数据处理中特别是日志和消息的处理上会有很多出色的表现,这里写个索引,关于kafka的文章暂时就更新到这里,最近利用空闲时间在对kafka做一些功能性增强,并java化,虽然现在已经有很多这样的版本,但是根据实际需求来改变才是最适合的。
首先当然推荐的是kafka的官网 http://kafka.apache.org/
在官网最值得参考的文章就是kafka design:http://kafka.apache.org/design.html,我的文章也基本都是参照这里的说明,大家要特别重视这篇文章,里面有好多理念都特别好,推荐多读几遍。
在OSC的翻译频道有kafka design全中文的翻译,翻得挺好的,推荐一下:http://www.oschina.net/translate/kafka-design
kafka的wiki是很不错的学习文档:https://cwiki.apache.org/confluence/display/KAFKA/Index
——————————————————————————————————
接下来就是我写的一系列文章,文章都是循序渐进的方式带你了解kafka:
关于kafka的基本知识,分布式的基础:《分布式消息系统Kafka初步》
kafka的分布式搭建,quick start:《kafka分布式环境搭建》
关于kafka的实现细节,这主要就是讲design的部分:《细节上》、《细节下》
关于kafka开发环境,scala环境的搭建:《开发环境搭建》
数据生产者,producer的用法:《producer的用法》、《producer使用注意》
数据消费者,consumer的用法:《consumer的用法》
还有些零碎的,关于通信段的源码解读:《net包源码解读》、《broker配置》
——————————————————————————————————
扩展的阅读还有下面这些:
我的好友写的关于kafka和jafka的相关博客,特别好,我有很多问题也都找他解决的,大神一般的存在:http://rockybean.github.com/ @rockybean
kafka的java化版本jafka:https://github.com/adyliu/jafka
淘宝的metaQ:https://github.com/killme2008/Metamorphosis
我最近在写的inforQ,刚开始写,我也纯粹是为了读下源码,不定期更新哈:https://github.com/ielts0909/inforq
后面一阶段可能更新点儿关于cas的东西吧,具体也没想好,最近一直出差,写代码的时间都很少
--------------------------------------------------------------------------------
0.8版本的相关更新如下:
0.8更新内容介绍:《kafka0.8版本的一些更新》