paulwong

#

2014年值得关注的十个Hadoop大数据创业公司

开源大数据框架Apache Hadoop已经成了大数据处理的事实标准,同时也几乎成了大数据的代名词,虽然这多少有些以偏概全。

根据Gartner的估计,目前的Hadoop生态系统市场规模在7700万美元左右,2016年,该市场规模将快速增长至8.13亿美元。

但是在Hadoop这个快速扩增的蓝海中游泳并非易事,不仅开发大数据基础设施技术产品这件事很难,销售起来也很难,具体到大数据基础设施工具如 Hadoop、NoSQL数据库和流处理系统则更是难上加难。客户需要大量培训和教育,付费用户需要大量支持和及时跟进的产品开发工作。而跟企业级客户打 交道往往并非创业公司团队的强项。此外,大数据基础设施技术创业通常对风险投资规模也有较高要求。

尽管困难重重,Hadoop创业公司依然如雨后春笋冒出,除了Cloudera、Datameer、DataStax和MapR等已经功成名就的 Hadoop创业公司外,最近CIO杂志评出了2014年十大最值得关注的Hadoop创业公司,了解这些公司的产品和商业模式对企业大数据技术创业者和 大数据应用用户来说都非常有参考价值:

一、Platfora

platfora

业务:所提供的大数据分析解决方案能够将Hadoop中的原始数据转换成可互动的,基于内存计算的商业智能服务。

简介:创立于2011年,迄今已募集6500万美元。

入选理由:Platfora的目标是简化复杂难用的Hadoop,推动Hadoop在企业市场的应用。Platfora的做法是简化数据采集和分析 流程,将Hadoop中的原始数据自动转化成可以互动的商业智能服务,无需ETL或者数据仓库。(参考阅读:Hadoop只是穷人的ETL)

 

二、Alpine Data Labs

alpine data

业务:提供基于Hadoop的数据分析平台

简介:创立于2010年,迄今累计融资2350万美元。

入选理由:复杂的高级分析和机器学习应用通常都需要脚本和代码开发高手实现,这进一步推高了数据科学家的技术门槛。实际上大数据企业高管和IT经理都没时间也没兴致学习编程技术,或者去了解复杂的Hadoop。Alpine Data通过SaaS服务的方式大幅降低了预测分析的应用门槛。

 

三、Altiscale

altiscale

业务:提供Hadoop即服务(HaaS)

简介:创立于2012年3月,迄今融资1200万美元。

入选理由:大数据正在闹人才荒,而通过云计算提供Hadoop相关服务无疑是普及Hadoo的一条捷径,根据TechNavio的估计,2016年 HaaS市场规模将高达190亿美元,是块大蛋糕。但是HaaS市场的竞争已经日趋激烈,包括亚马逊EMR、微软的Hadoop on Azure,以及Rackspace的Hortonworks云服务等都是重量级玩家,Altiscale还需要与Hortonworks、 Cloudera、Mortar Data、Qubole、Xpleny展开直接竞争。

 

四、Trifacta

trifacta

业务:提供平台帮助用户将复杂的原始数据转化成干净的结构化格式供分析使用。

简介:创立于2012年,迄今融资1630万美元。

入选理由:大数据技术平台和分析工具之间存在一个巨大的瓶颈,那就是数据分析专家需要花费大量精力和时间转化数据,而且业务数据分析师们往往也并不 具备独立完成数据转化工作的技术能力。为了解决这个问题Trifacta开发出了“预测互动”技术,将数据操作可视化,而且Trifacta的机器学习算 法还能同时观察用户和数据属性,预测用户意图,并自动给出建议。Trifata的竞争对手是Paxata、Informatica和CirroHow。

 

五、Splice Machine

splice machine

业务:提供面向大数据应用的,基于Hadoop的SQL兼容数据库。

简介:创立于2012年,迄今融资1900万美元。

入选理由:新的数据技术使得传统关系型数据库的一些流行功能如ACID合规、交易一致性和标准的SQL查询语言等得以在廉价可扩展的Hadoop上 延续。Splice Machine保留了NoSQL数据库所有的优点,例如auto-sharding,容错、可扩展性等,同时又保留了SQL。

 

六、DataTorrent

datarorrent

业务:提供基于Hadoop平台的实时流处理平台

简介:创立于2012年,2013年6月获得800万美元A轮融资。

入选理由:大数据的未来是快数据,而DataTorrent正是要解决快数据的问题。

 

七、Qubole

qubole

业务:提供大数据DaaS服务,基于“真正的自动扩展Hadoop集群”。

简介:创立于2011年,累计融资700万美元。

入选理由:大数据人才一将难求,对于大多数企业来说,像使用SaaS企业应用一样使用Hadoop是一个现实的选择。

 

八、Continuuity 

continuuity

业务:提供基于Hadoop的大数据应用托管平台

简介:创立于2011年,累计获得1250万美元融资,创始人兼CEO Todd Papaioannou曾是雅虎副总裁云架构负责人,去年夏天Todd离开Continuuity后,联合创始人CTO Jonathan Gray接替担任CEO一职。

入选理由:Continuuity的商业模式非常聪明也非常独特,他们绕过非常难缠的Hadoop专家,直接向Java开发者提供应用开发平台,其 旗舰产品Reactor是一个基于Hadoop的Java集成化数据和应用框架,Continuuity将底层基础设施进行抽象处理,通过简单的Java 和REST API提供底层基础设施服务,为用户大大简化了Hadoop基础设施的复杂性。Continuuity最新发布的服务——Loom是一个集群管理方案,通 过Loom创建的集群可以使用任意硬件和软件堆叠的模板,从单一的LAMP服务器和传统应用服务器如JBoss到包含数千个节点的大规模的Hadoop集 群。集群还可以部署在多个云服务商的环境中(例如Rackspace、Joyent、Openstack等)而且还能使用常见的SCM工具。

 

九、Xplenty

xplenty

业务:提供HaaS服务

简介:创立于2012年,从Magma风险投资获得金额不详的融资。

入选理由:虽然Hadoop已经成了大数据的事实工业标准,但是Hadoop的开发、部署和维护对技术人员的技能依然有着极高要求。Xplenty 的技术通过无需编写代码的Hadoop开发环境提供Hadoop处理服务,企业无需投资软硬件和专业人才就能快速享受大数据技术。

十、Nuevora

nuevora

业务:提供大数据分析应用

简介:创立于2011年,累计获得300万早期投资。

入选理由:Nuevora的着眼点是大数据应用最早启动的两个领域:营销和客户接触。Nuevora的nBAAP(大数据分析与应用)平台的主要功 能包括基于最佳时间预测算法的定制分析应用,nBAAP基于三个关键大数据技术:Hadoop(大数据处理)、R(预测分析)和Tableau(数据可视 化)

posted @ 2014-05-23 12:15 paulwong 阅读(379) | 评论 (0)编辑 收藏

12 款最好的 Bootstrap 设计工具

作为一位设计师,会经常追寻新鲜有趣的设计工具,这些工具会提高工作的效率,使得工作更有效, 最重要的是使工作变得更方便。非常肯定的说,随着日益增长的工具和应用的数量,设计和开发变得越来越简单了。其中最普遍使用的最终框架 之一是 Bootstrap,它在 2013 年特别流行。如果你是位设计师,你可能会接触过它,甚至是使用过它。如果你是 Bootstrap 的使用者或者是相关功能的用户,这篇文章非常的适合你!

这里总共列举了 12 款最好的 Bootstrap 设计工具,这些都能很好的简化大家的工作。希望大家能从这些列表中找到适合自己的,在评论跟大家分享一下使用的感想和其他类似的工具,Enjoy :)

12. Bootstrap Designer

Bootstrap Designer

Bootstrap Designer 是一个在线工具,不需要下载就可以使用。用户可以使用它创建漂亮和迷人基于 Bootstrap 框架的 HTML5 模板。

11. Get Kickstrap

Get Kickstrap

如果用户想结合先进高级的 web 技术和 Bootstrap,可以尝试一下 Get Kickstrap。这款特别的工具非常高大上,可以运行数据库驱动的 web 应用程序,而且还不用任何的后台哦 :D

10. Bootply

Bootply

这个工具拥有非常多样化的库,集成了其他 Bootstrap 基础插件,框架和库。

9. Bootstrap Button Generator

Bootstrap Button Generator

这个特别的工具能帮助用户轻松创建各种类型的按钮,用户只需要输入 CSS 类到新的按钮中,并且选择相应的颜色值就可以了。当需要使用的时候,只需要复制粘贴就能轻松创建漂亮的按钮了。 

8. Easel

Easel

这个工具能在浏览器中运行,这个非常基础的工具能帮助在小团队工作的开发者和设计者建立非常真实的 web 元素。

7. Layoutit

Layoutit

用户只需要使用拖拽界面构建器就可以轻松创建前端代码。

6. Bootswatch

Bootswatch

这是一个开源的工具,而且非常容易安装和使用。

5. Boottheme

Boottheme

这是一款在线主题生成器和 Twitter Bootstrap 的 web 设计工具。用户可以在设计 webapp 时生成和预览主题。

4. Custom Font Tool

Custom Font Tool

这个特别的工具允许用户创建非常个性化的字体,通过非常舒适的命令行方式。

3. WordPress Twitter Bootstrap CSS

WordPress Twitter Bootstrap CSS

这是另外一个基于 CSS 的 Twitter Bootstrap,让用户能很好的在 WordPress 网站上使用 Twitter Bootstrap JavaScript 库。

2. Bootmetro

Bootmetro

这是个非常灵活和简单的框架,用户可以创建现代化和经典的 web 应用。同时,也可以定义类似 Windows 8 的风格和感觉。

1. Flat UI Free–Framework and Bootstrap Theme Design

Flat UI Free–Framework and Bootstrap Theme Design

这是另外一个基于 Twitter Bootstrap 的工具,可以运行一个平滑风格,组件可以包含 PSD 文件的 UI。使用这个特殊的工具可以创建出非常有创意的 UI。

posted @ 2014-05-23 12:13 paulwong 阅读(355) | 评论 (0)编辑 收藏

简单的 6 步配置 MongoDB 复制集

     摘要: 在本次教程中,我们将创建含有三个节点的群集。第一个节点是主节点,第二个节点是Failover节点,第三个节点是仲裁节点。1. 安装Mongo并设置配置文件安装3台服务器并调整配置文件/etc/mongod.conf:1#Select your replication set name2replSet=[replication_set_name]3#Selec...  阅读全文

posted @ 2014-05-23 12:11 paulwong 阅读(361) | 评论 (0)编辑 收藏

BUTTON样式

http://www.oschina.net/p/buttons

posted @ 2014-05-20 18:19 paulwong 阅读(328) | 评论 (0)编辑 收藏

NUTCH+ELASTICSEARCH ON AWS

Elasticsearch: Indexing SQL databases. The easy way.
http://blog.comperiosearch.com/blog/2014/01/30/elasticsearch-indexing-sql-databases-the-easy-way/


Elasticsearch-HBase-River
https://github.com/mallocator/Elasticsearch-HBase-River

Elasticsearch river
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-plugins.html#river


Mongodb river
http://satishgandham.com/2012/09/a-complete-guide-to-integrating-mongodb-with-elastic-search/
http://shuminghuang.iteye.com/blog/1829432






posted @ 2014-05-15 09:14 paulwong 阅读(755) | 评论 (0)编辑 收藏

安装KAFKA

  1. 下载KAFKA
    wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

  2. 解压
    tar -zxf kafka_2.9.2-0.8.1.1.tgz

  3. 0.7之前的版本这时就要安装相应的包之类的,0.8.1之后就不用了。把命令加进PATH中
    export KAFKA_HOME=/home/ubuntu/java/kafka_2.9.2-0.8.1.1
    export PATH=$JAVA_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$BIN_HOME/bin:$MAVEN_HOME/bin:$PATH

  4. SOURCE一下
    source /etc/profile

  5. 制作启动命令,start-kafka.sh,并放于kafaka_hoem/bin下
    kafka-server-start.sh $KAFKA_HOME/config/server.properties &

  6. 安装ZOOKEEPER
    wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
    tar -zxf  zookeeper-3.4.6.tar.gz
    cd zookeeper-3.4.6.tar.gz/conf
    cp zoo_sample.cfg zoo.cfg

  7. 改zoo.cfg
    dataDir=$ZOOKEEPER_HOME/data

    #方便查LOG
    dataLogDir=$ZOOKEEPER_HOME/logs

    #控制客户的连接数,默认数为60,太少
    maxClientCnxns=300

    #如果有多个ZOOKEEPER INSTANCE时
    server.1=10.120.151.223:2888:3888
    server.2=10.120.151.224:2888:3888

  8. 启动ZOOKEEPER
    zkServer.sh start

  9. 更改KAFKA的配置文件server.properties, 主要改几个地方
    #这个是配置PRODUCER/CONSUMER连上来的时候使用的地址
    advertised.host.name=54.72.4.92
    #设置KAFKA LOG路径
    log.dirs=$KAFKA_HOME/logs/kafka-logs
    #设置ZOOKEEPER的连接地址
    zookeeper.connect=54.72.4.92:2181

  10. 启动KAFKA
    start-kafka.sh

  11. 新建一个TOPIC
    #KAFKA有几个,replication-factor就填几个
    kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181

  12. 发送消息至KAFKA
    kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic

  13. 另开一个终端,显示消息的消费
    kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

  14. 在发送消息的终端输入aaa,则可以在消费消息的终端显示

posted @ 2014-05-11 10:30 paulwong 阅读(2770) | 评论 (0)编辑 收藏

Auto rebalance Storm

http://stackoverflow.com/questions/15010420/storm-topology-rebalance-using-java-code


使用Nimbus获取STORM的信息
http://www.andys-sundaypink.com/i/retrieve-storm-cluster-statistic-from-nimbus-java-mode/
TSocket tsocket = new TSocket("localhost", 6627);
TFramedTransport tTransport = new TFramedTransport(tsocket);
TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
Nimbus.Client client = new Nimbus.Client(tBinaryProtocol);
String topologyId = "test-1-234232567";


try {

tTransport.open();
ClusterSummary clusterSummary = client.getClusterInfo();
StormTopology stormTopology = client.getTopology(topologyId);
TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
List<ExecutorSummary> executorSummaries = topologyInfo.get_executors();

List<TopologySummary> topologies = clusterSummary.get_topologies();
for(ExecutorSummary executorSummary : executorSummaries){

String id = executorSummary.get_component_id();
ExecutorInfo executorInfo = executorSummary.get_executor_info();
ExecutorStats executorStats = executorSummary.get_stats();
System.out.println("executorSummary :: " + id + " emit size :: " + executorStats.get_emitted_size());
}
catch (TTransportException e) {
e.printStackTrace();
catch (TException e) {
e.printStackTrace();
catch (NotAliveException e) {
e.printStackTrace();
}




posted @ 2014-05-09 23:48 paulwong 阅读(552) | 评论 (0)编辑 收藏

浅释STORM

STORM是一个消息处理引擎,可以处理源源不断的进来的消息,这些消息的处理是可以按步骤的。

处理的方式有各种自定义:

  1. 可自定义消息处理的步骤

  2. 可自定义每种类型的消息需要多少个进程来处理

  3. 每个步骤里的消息是在某个进程里的线程来做处理的

  4. 可自定义每个步骤里的消息的线程数

  5. 可以增加和删除要处理的消息类型
如果要处理某种消息了,要怎么办呢?

  1. 定义数据来源组件(SPOUT)

  2. 定义处理步骤(BOLT)

  3. 组合成一个消息处理流程框架TOPOLOGY

  4. 定义处理消息的进程的数量、定义每个步骤并发时可用的线程数

  5. 部署TOPOLOGY
当一个TOPOLOGY被部署到STORM时,STORM会查找配置对象的WORKER数量,根据这个数量相应的启动N个JVM,然后根据每个步骤配置的NUMTASKS生成相应个数的线程,然后每个步骤中配置的数量实例化相应个数的对象,然后就启动一个线程不断的执行SPOUT中的nextTuple()方法,如果这个方法中有输出结果,就启动另一线程,并在此线程中将这个结果作为参数传到下一个对象的excue方法中。

如果此时又有一个步骤BOLT需要执行的话,也是新取一个线程去执行BOLT中的方法启动的线程不会越过NUMTASKS的数量。



posted @ 2014-05-09 22:56 paulwong 阅读(279) | 评论 (0)编辑 收藏

Storm performance

The configuration is used to tune various aspects of the running topology. The two configurations specified here are very common:

  1. TOPOLOGY_WORKERS (set with setNumWorkers) specifies how many processes you want allocated around the cluster to execute the topology. Each component in the topology will execute as many threads. The number of threads allocated to a given component is configured through the setBolt and setSpout methods. Those threadsexist within worker processes. Each worker process contains within it some number of threads for some number of components. For instance, you may have 300 threads specified across all your components and 50 worker processes specified in your config. Each worker process will execute 6 threads, each of which of could belong to a different component. You tune the performance of Storm topologies by tweaking the parallelism for each component and the number of worker processes those threads should run within.
  2. TOPOLOGY_DEBUG (set with setDebug), when set to true, tells Storm to log every message every emitted by a component. This is useful in local mode when testing topologies, but you probably want to keep this turned off when running topologies on the cluster.

There's many other configurations you can set for the topology. The various configurations are detailed on the Javadoc for Config.


Common configurations


There are a variety of configurations you can set per topology. A list of all the configurations you can set can be found here. The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the other ones are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology:

  1. Config.TOPOLOGY_WORKERS: This sets the number of worker processes to use to execute the topology. For example, if you set this to 25, there will be 25 Java processes across the cluster executing all the tasks. If you had a combined 150 parallelism across all components in the topology, each worker process will have 6 tasks running within it as threads.
  2. Config.TOPOLOGY_ACKERS: This sets the number of tasks that will track tuple trees and detect when a spout tuple has been fully processed. Ackers are an integral part of Storm's reliability model and you can read more about them onGuaranteeing message processing.
  3. Config.TOPOLOGY_MAX_SPOUT_PENDING: This sets the maximum number of spout tuples that can be pending on a single spout task at once (pending means the tuple has not been acked or failed yet). It is highly recommended you set this config to prevent queue explosion.
  4. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: This is the maximum amount of time a spout tuple has to be fully completed before it is considered failed. This value defaults to 30 seconds, which is sufficient for most topologies. SeeGuaranteeing message processing for more information on how Storm's reliability model works.
  5. Config.TOPOLOGY_SERIALIZATIONS: You can register more serializers to Storm using this config so that you can use custom types within tuples.

Reference:
http://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html

storm rebalance 命令调整topology并行数及问题分析
http://blog.csdn.net/jmppok/article/details/17243857

flume+kafka+storm+mysql 数据流
http://blog.csdn.net/jmppok/article/details/17259145



http://storm.incubator.apache.org/documentation/Tutorial.html

posted @ 2014-05-08 09:19 paulwong 阅读(294) | 评论 (0)编辑 收藏

KMEANS PAGERANK ON HADOOP

https://github.com/keokilee/kmeans-hadoop

https://github.com/rorlig/hadoop-pagerank-java

http://wuyanzan60688.blog.163.com/blog/static/12777616320131011426159/

http://codecloud.net/hadoop-k-means-591.html


import java.io.*;
import java.net.URI;
import java.util.Iterator;
import java.util.Random;
import java.util.Vector;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class KMeans {
    static enum Counter { CENTERS, CHANGE, ITERATIONS }

    public static class Point implements WritableComparable<Point> {
        // Longs because this will store sum of many ints
        public LongWritable x;
        public LongWritable y;
        public IntWritable num; // For summation points

        public Point() {
            this.x = new LongWritable(0);
            this.y = new LongWritable(0);
            this.num = new IntWritable(0);
        }

        public Point(int x, int y) {
            this.x = new LongWritable(x);
            this.y = new LongWritable(y);
            this.num = new IntWritable(1);
        }

        public Point(IntWritable x, IntWritable y) {
            this.x = new LongWritable(x.get());
            this.y = new LongWritable(y.get());
            this.num = new IntWritable(1);
        }

        public void add(Point that) {
            x.set(x.get() + that.x.get());
            y.set(y.get() + that.y.get());
            num.set(num.get() + that.num.get());
        }

        public void norm() {
            x.set(x.get() / num.get());
            y.set(y.get() / num.get());
            num.set(1);
        }

        public void write(DataOutput out) throws IOException {
            x.write(out);
            y.write(out);
            num.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            x.readFields(in);
            y.readFields(in);
            num.readFields(in);
        }

        public long distance(Point that) {
            long dx = that.x.get() - x.get();
            long dy = that.y.get() - y.get();

            return dx * dx + dy * dy;
        }

        public String toString() {
            String ret = x.toString() + '\t' + y.toString();
            if (num.get() != 1)
                ret += '\t' + num.toString();
            return ret;
        }

        public int compareTo(Point that) {
            int ret = x.compareTo(that.x);
            if (ret == 0)
                ret = y.compareTo(that.y);
            if (ret == 0)
                ret = num.compareTo(that.num);
            return ret;
        }
    }

    public static class Map
            extends MapReduceBase
            implements Mapper<Text, Text, Point, Point>
    {
        private Vector<Point> centers;
        private IOException error;

        public void configure(JobConf conf) {
            try {
                Path paths[] = DistributedCache.getLocalCacheFiles(conf);
                if (paths.length != 1)
                    throw new IOException("Need exactly 1 centers file");

                FileSystem fs = FileSystem.getLocal(conf);
                SequenceFile.Reader in = new SequenceFile.Reader(fs, paths[0], conf);

                centers = new Vector<Point>();
                IntWritable x = new IntWritable();
                IntWritable y = new IntWritable();
                while(in.next(x, y))
                    centers.add(new Point(x, y));
                in.close();

                // Generate new points if we don't have enough.
                int k = conf.getInt("k", 0);
                Random rand = new Random();
                final int MAX = 1024*1024;
                for (int i = centers.size(); i < k; i++) {
                    x.set(rand.nextInt(MAX));
                    y.set(rand.nextInt(MAX));
                    centers.add(new Point(x, y));
                }
            } catch (IOException e) {
                error = e;
            }
        }

        public void map(Text xt, Text yt,
                OutputCollector<Point, Point> output, Reporter reporter)
            throws IOException
        {
            if (error != null)
                throw error;

            int x = Integer.valueOf(xt.toString());
            int y = Integer.valueOf(yt.toString());
            Point p = new Point(x, y);
            Point center = null;
            long distance = Long.MAX_VALUE;

            for (Point c : centers) {
                long d = c.distance(p);
                if (d <= distance) {
                    distance = d;
                    center = c;
                }
            }

            output.collect(center, p);
        }
    }

    public static class Combine
            extends MapReduceBase
            implements Reducer<Point, Point, Point, Point>
    {
        public void reduce(Point center, Iterator<Point> points,
                OutputCollector<Point, Point> output, Reporter reporter)
            throws IOException
        {
            Point sum = new Point();
            while(points.hasNext()) {
                sum.add(points.next());
            }

            output.collect(center, sum);
        }
    }

    public static class Reduce
            extends MapReduceBase
            implements Reducer<Point, Point, IntWritable, IntWritable>
    {
        public void reduce(Point center, Iterator<Point> points,
                OutputCollector<IntWritable, IntWritable> output,
                Reporter reporter)
            throws IOException
        {
            Point sum = new Point();
            while (points.hasNext()) {
                sum.add(points.next());
            }
            sum.norm();

            IntWritable x = new IntWritable((int) sum.x.get());
            IntWritable y = new IntWritable((int) sum.y.get());

            output.collect(x, y);

            reporter.incrCounter(Counter.CHANGE, sum.distance(center));
            reporter.incrCounter(Counter.CENTERS, 1);
        }
    }

    public static void error(String msg) {
        System.err.println(msg);
        System.exit(1);
    }

    public static void initialCenters(
            int k, JobConf conf, FileSystem fs,
            Path in, Path out)
        throws IOException
    {
        BufferedReader input = new BufferedReader(
                new InputStreamReader(fs.open(in)));
        SequenceFile.Writer output = new SequenceFile.Writer(
                fs, conf, out, IntWritable.class, IntWritable.class);
        IntWritable x = new IntWritable();
        IntWritable y = new IntWritable();
        for (int i = 0; i < k; i++) {
            String line = input.readLine();
            if (line == null)
                error("Not enough points for number of means");

            String parts[] = line.split("\t");
            if (parts.length != 2)
                throw new IOException("Found a point without two parts");

            x.set(Integer.valueOf(parts[0]));
            y.set(Integer.valueOf(parts[1]));
            output.append(x, y);
        }
        output.close();
        input.close();
    }

    public static void main(String args[]) throws IOException {
        JobConf conf = new JobConf(KMeans.class);
        GenericOptionsParser opts = new GenericOptionsParser(conf, args);
        String paths[] = opts.getRemainingArgs();

        FileSystem fs = FileSystem.get(conf);

        if (paths.length < 3)
            error("Usage:\n"
                    + "\tKMeans <file to display>\n"
                    + "\tKMeans <output> <k> <input file>"
                 );

        Path outdir  = new Path(paths[0]);
        int k = Integer.valueOf(paths[1]);
        Path firstin = new Path(paths[2]);
        
        if (k < 1 || k > 20)
            error("Strange number of means: " + paths[1]);

        if (fs.exists(outdir)) {
            if (!fs.getFileStatus(outdir).isDir())
                error("Output directory \"" + outdir.toString()
                        + "\" exists and is not a directory.");
        } else {
            fs.mkdirs(outdir);
        }

        // Input: text file, each line "x\ty"
        conf.setInputFormat(KeyValueTextInputFormat.class);
        for (int i = 2; i < paths.length; i++)
            FileInputFormat.addInputPath(conf, new Path(paths[i]));

        conf.setInt("k", k);

        // Map: (x,y) -> (centroid, point)
        conf.setMapperClass(Map.class);
        conf.setMapOutputKeyClass(Point.class);
        conf.setMapOutputValueClass(Point.class);

        // Combine: (centroid, points) -> (centroid, weighted point)
        conf.setCombinerClass(Combine.class);

        // Reduce: (centroid, weighted points) -> (x, y) new centroid
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        // Output
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        // Chose initial centers
        Path centers = new Path(outdir, "initial.seq");
        initialCenters(k, conf, fs, firstin, centers);

        // Iterate
        long change  = Long.MAX_VALUE;
        URI cache[] = new URI[1];
        for (int iter = 1; iter <= 1000 && change > 100 * k; iter++) {
            Path jobdir = new Path(outdir, Integer.toString(iter));
            FileOutputFormat.setOutputPath(conf, jobdir);

            conf.setJobName("k-Means " + iter);
            conf.setJarByClass(KMeans.class);

            cache[0] = centers.toUri();
            DistributedCache.setCacheFiles( cache, conf );

            RunningJob result = JobClient.runJob(conf);
            System.out.println("Iteration: " + iter);

            change   = result.getCounters().getCounter(Counter.CHANGE);
            centers  = new Path(jobdir, "part-00000");
        }
    }
}

192.5.53.208

posted @ 2014-05-07 23:57 paulwong 阅读(409) | 评论 (0)编辑 收藏

仅列出标题
共119页: First 上一页 57 58 59 60 61 62 63 64 65 下一页 Last