paulwong

#

Auto rebalance Storm

http://stackoverflow.com/questions/15010420/storm-topology-rebalance-using-java-code


使用Nimbus获取STORM的信息
http://www.andys-sundaypink.com/i/retrieve-storm-cluster-statistic-from-nimbus-java-mode/
TSocket tsocket = new TSocket("localhost", 6627);
TFramedTransport tTransport = new TFramedTransport(tsocket);
TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
Nimbus.Client client = new Nimbus.Client(tBinaryProtocol);
String topologyId = "test-1-234232567";


try {

tTransport.open();
ClusterSummary clusterSummary = client.getClusterInfo();
StormTopology stormTopology = client.getTopology(topologyId);
TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
List<ExecutorSummary> executorSummaries = topologyInfo.get_executors();

List<TopologySummary> topologies = clusterSummary.get_topologies();
for(ExecutorSummary executorSummary : executorSummaries){

String id = executorSummary.get_component_id();
ExecutorInfo executorInfo = executorSummary.get_executor_info();
ExecutorStats executorStats = executorSummary.get_stats();
System.out.println("executorSummary :: " + id + " emit size :: " + executorStats.get_emitted_size());
}
catch (TTransportException e) {
e.printStackTrace();
catch (TException e) {
e.printStackTrace();
catch (NotAliveException e) {
e.printStackTrace();
}




posted @ 2014-05-09 23:48 paulwong 阅读(530) | 评论 (0)编辑 收藏

浅释STORM

STORM是一个消息处理引擎,可以处理源源不断的进来的消息,这些消息的处理是可以按步骤的。

处理的方式有各种自定义:

  1. 可自定义消息处理的步骤

  2. 可自定义每种类型的消息需要多少个进程来处理

  3. 每个步骤里的消息是在某个进程里的线程来做处理的

  4. 可自定义每个步骤里的消息的线程数

  5. 可以增加和删除要处理的消息类型
如果要处理某种消息了,要怎么办呢?

  1. 定义数据来源组件(SPOUT)

  2. 定义处理步骤(BOLT)

  3. 组合成一个消息处理流程框架TOPOLOGY

  4. 定义处理消息的进程的数量、定义每个步骤并发时可用的线程数

  5. 部署TOPOLOGY
当一个TOPOLOGY被部署到STORM时,STORM会查找配置对象的WORKER数量,根据这个数量相应的启动N个JVM,然后根据每个步骤配置的NUMTASKS生成相应个数的线程,然后每个步骤中配置的数量实例化相应个数的对象,然后就启动一个线程不断的执行SPOUT中的nextTuple()方法,如果这个方法中有输出结果,就启动另一线程,并在此线程中将这个结果作为参数传到下一个对象的excue方法中。

如果此时又有一个步骤BOLT需要执行的话,也是新取一个线程去执行BOLT中的方法启动的线程不会越过NUMTASKS的数量。



posted @ 2014-05-09 22:56 paulwong 阅读(247) | 评论 (0)编辑 收藏

Storm performance

The configuration is used to tune various aspects of the running topology. The two configurations specified here are very common:

  1. TOPOLOGY_WORKERS (set with setNumWorkers) specifies how many processes you want allocated around the cluster to execute the topology. Each component in the topology will execute as many threads. The number of threads allocated to a given component is configured through the setBolt and setSpout methods. Those threadsexist within worker processes. Each worker process contains within it some number of threads for some number of components. For instance, you may have 300 threads specified across all your components and 50 worker processes specified in your config. Each worker process will execute 6 threads, each of which of could belong to a different component. You tune the performance of Storm topologies by tweaking the parallelism for each component and the number of worker processes those threads should run within.
  2. TOPOLOGY_DEBUG (set with setDebug), when set to true, tells Storm to log every message every emitted by a component. This is useful in local mode when testing topologies, but you probably want to keep this turned off when running topologies on the cluster.

There's many other configurations you can set for the topology. The various configurations are detailed on the Javadoc for Config.


Common configurations


There are a variety of configurations you can set per topology. A list of all the configurations you can set can be found here. The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the other ones are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology:

  1. Config.TOPOLOGY_WORKERS: This sets the number of worker processes to use to execute the topology. For example, if you set this to 25, there will be 25 Java processes across the cluster executing all the tasks. If you had a combined 150 parallelism across all components in the topology, each worker process will have 6 tasks running within it as threads.
  2. Config.TOPOLOGY_ACKERS: This sets the number of tasks that will track tuple trees and detect when a spout tuple has been fully processed. Ackers are an integral part of Storm's reliability model and you can read more about them onGuaranteeing message processing.
  3. Config.TOPOLOGY_MAX_SPOUT_PENDING: This sets the maximum number of spout tuples that can be pending on a single spout task at once (pending means the tuple has not been acked or failed yet). It is highly recommended you set this config to prevent queue explosion.
  4. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: This is the maximum amount of time a spout tuple has to be fully completed before it is considered failed. This value defaults to 30 seconds, which is sufficient for most topologies. SeeGuaranteeing message processing for more information on how Storm's reliability model works.
  5. Config.TOPOLOGY_SERIALIZATIONS: You can register more serializers to Storm using this config so that you can use custom types within tuples.

Reference:
http://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html

storm rebalance 命令调整topology并行数及问题分析
http://blog.csdn.net/jmppok/article/details/17243857

flume+kafka+storm+mysql 数据流
http://blog.csdn.net/jmppok/article/details/17259145



http://storm.incubator.apache.org/documentation/Tutorial.html

posted @ 2014-05-08 09:19 paulwong 阅读(273) | 评论 (0)编辑 收藏

KMEANS PAGERANK ON HADOOP

https://github.com/keokilee/kmeans-hadoop

https://github.com/rorlig/hadoop-pagerank-java

http://wuyanzan60688.blog.163.com/blog/static/12777616320131011426159/

http://codecloud.net/hadoop-k-means-591.html


import java.io.*;
import java.net.URI;
import java.util.Iterator;
import java.util.Random;
import java.util.Vector;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class KMeans {
    static enum Counter { CENTERS, CHANGE, ITERATIONS }

    public static class Point implements WritableComparable<Point> {
        // Longs because this will store sum of many ints
        public LongWritable x;
        public LongWritable y;
        public IntWritable num; // For summation points

        public Point() {
            this.x = new LongWritable(0);
            this.y = new LongWritable(0);
            this.num = new IntWritable(0);
        }

        public Point(int x, int y) {
            this.x = new LongWritable(x);
            this.y = new LongWritable(y);
            this.num = new IntWritable(1);
        }

        public Point(IntWritable x, IntWritable y) {
            this.x = new LongWritable(x.get());
            this.y = new LongWritable(y.get());
            this.num = new IntWritable(1);
        }

        public void add(Point that) {
            x.set(x.get() + that.x.get());
            y.set(y.get() + that.y.get());
            num.set(num.get() + that.num.get());
        }

        public void norm() {
            x.set(x.get() / num.get());
            y.set(y.get() / num.get());
            num.set(1);
        }

        public void write(DataOutput out) throws IOException {
            x.write(out);
            y.write(out);
            num.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            x.readFields(in);
            y.readFields(in);
            num.readFields(in);
        }

        public long distance(Point that) {
            long dx = that.x.get() - x.get();
            long dy = that.y.get() - y.get();

            return dx * dx + dy * dy;
        }

        public String toString() {
            String ret = x.toString() + '\t' + y.toString();
            if (num.get() != 1)
                ret += '\t' + num.toString();
            return ret;
        }

        public int compareTo(Point that) {
            int ret = x.compareTo(that.x);
            if (ret == 0)
                ret = y.compareTo(that.y);
            if (ret == 0)
                ret = num.compareTo(that.num);
            return ret;
        }
    }

    public static class Map
            extends MapReduceBase
            implements Mapper<Text, Text, Point, Point>
    {
        private Vector<Point> centers;
        private IOException error;

        public void configure(JobConf conf) {
            try {
                Path paths[] = DistributedCache.getLocalCacheFiles(conf);
                if (paths.length != 1)
                    throw new IOException("Need exactly 1 centers file");

                FileSystem fs = FileSystem.getLocal(conf);
                SequenceFile.Reader in = new SequenceFile.Reader(fs, paths[0], conf);

                centers = new Vector<Point>();
                IntWritable x = new IntWritable();
                IntWritable y = new IntWritable();
                while(in.next(x, y))
                    centers.add(new Point(x, y));
                in.close();

                // Generate new points if we don't have enough.
                int k = conf.getInt("k", 0);
                Random rand = new Random();
                final int MAX = 1024*1024;
                for (int i = centers.size(); i < k; i++) {
                    x.set(rand.nextInt(MAX));
                    y.set(rand.nextInt(MAX));
                    centers.add(new Point(x, y));
                }
            } catch (IOException e) {
                error = e;
            }
        }

        public void map(Text xt, Text yt,
                OutputCollector<Point, Point> output, Reporter reporter)
            throws IOException
        {
            if (error != null)
                throw error;

            int x = Integer.valueOf(xt.toString());
            int y = Integer.valueOf(yt.toString());
            Point p = new Point(x, y);
            Point center = null;
            long distance = Long.MAX_VALUE;

            for (Point c : centers) {
                long d = c.distance(p);
                if (d <= distance) {
                    distance = d;
                    center = c;
                }
            }

            output.collect(center, p);
        }
    }

    public static class Combine
            extends MapReduceBase
            implements Reducer<Point, Point, Point, Point>
    {
        public void reduce(Point center, Iterator<Point> points,
                OutputCollector<Point, Point> output, Reporter reporter)
            throws IOException
        {
            Point sum = new Point();
            while(points.hasNext()) {
                sum.add(points.next());
            }

            output.collect(center, sum);
        }
    }

    public static class Reduce
            extends MapReduceBase
            implements Reducer<Point, Point, IntWritable, IntWritable>
    {
        public void reduce(Point center, Iterator<Point> points,
                OutputCollector<IntWritable, IntWritable> output,
                Reporter reporter)
            throws IOException
        {
            Point sum = new Point();
            while (points.hasNext()) {
                sum.add(points.next());
            }
            sum.norm();

            IntWritable x = new IntWritable((int) sum.x.get());
            IntWritable y = new IntWritable((int) sum.y.get());

            output.collect(x, y);

            reporter.incrCounter(Counter.CHANGE, sum.distance(center));
            reporter.incrCounter(Counter.CENTERS, 1);
        }
    }

    public static void error(String msg) {
        System.err.println(msg);
        System.exit(1);
    }

    public static void initialCenters(
            int k, JobConf conf, FileSystem fs,
            Path in, Path out)
        throws IOException
    {
        BufferedReader input = new BufferedReader(
                new InputStreamReader(fs.open(in)));
        SequenceFile.Writer output = new SequenceFile.Writer(
                fs, conf, out, IntWritable.class, IntWritable.class);
        IntWritable x = new IntWritable();
        IntWritable y = new IntWritable();
        for (int i = 0; i < k; i++) {
            String line = input.readLine();
            if (line == null)
                error("Not enough points for number of means");

            String parts[] = line.split("\t");
            if (parts.length != 2)
                throw new IOException("Found a point without two parts");

            x.set(Integer.valueOf(parts[0]));
            y.set(Integer.valueOf(parts[1]));
            output.append(x, y);
        }
        output.close();
        input.close();
    }

    public static void main(String args[]) throws IOException {
        JobConf conf = new JobConf(KMeans.class);
        GenericOptionsParser opts = new GenericOptionsParser(conf, args);
        String paths[] = opts.getRemainingArgs();

        FileSystem fs = FileSystem.get(conf);

        if (paths.length < 3)
            error("Usage:\n"
                    + "\tKMeans <file to display>\n"
                    + "\tKMeans <output> <k> <input file>"
                 );

        Path outdir  = new Path(paths[0]);
        int k = Integer.valueOf(paths[1]);
        Path firstin = new Path(paths[2]);
        
        if (k < 1 || k > 20)
            error("Strange number of means: " + paths[1]);

        if (fs.exists(outdir)) {
            if (!fs.getFileStatus(outdir).isDir())
                error("Output directory \"" + outdir.toString()
                        + "\" exists and is not a directory.");
        } else {
            fs.mkdirs(outdir);
        }

        // Input: text file, each line "x\ty"
        conf.setInputFormat(KeyValueTextInputFormat.class);
        for (int i = 2; i < paths.length; i++)
            FileInputFormat.addInputPath(conf, new Path(paths[i]));

        conf.setInt("k", k);

        // Map: (x,y) -> (centroid, point)
        conf.setMapperClass(Map.class);
        conf.setMapOutputKeyClass(Point.class);
        conf.setMapOutputValueClass(Point.class);

        // Combine: (centroid, points) -> (centroid, weighted point)
        conf.setCombinerClass(Combine.class);

        // Reduce: (centroid, weighted points) -> (x, y) new centroid
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        // Output
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        // Chose initial centers
        Path centers = new Path(outdir, "initial.seq");
        initialCenters(k, conf, fs, firstin, centers);

        // Iterate
        long change  = Long.MAX_VALUE;
        URI cache[] = new URI[1];
        for (int iter = 1; iter <= 1000 && change > 100 * k; iter++) {
            Path jobdir = new Path(outdir, Integer.toString(iter));
            FileOutputFormat.setOutputPath(conf, jobdir);

            conf.setJobName("k-Means " + iter);
            conf.setJarByClass(KMeans.class);

            cache[0] = centers.toUri();
            DistributedCache.setCacheFiles( cache, conf );

            RunningJob result = JobClient.runJob(conf);
            System.out.println("Iteration: " + iter);

            change   = result.getCounters().getCounter(Counter.CHANGE);
            centers  = new Path(jobdir, "part-00000");
        }
    }
}

192.5.53.208

posted @ 2014-05-07 23:57 paulwong 阅读(382) | 评论 (0)编辑 收藏

SVN中tag branch trunk用法详解

本节主要讲解一下SVN中tag branch trunk的用法,在SVN中Branch/tag在一个功能选项中,在使用中也往往产生混淆。这里就向大家简单介绍一下,欢迎大家能和我一起学习SVN中tag branch trunk的用法。

在实现上,branch和tag,对于svn都是使用copy实现的,所以他们在默认的权限上和一般的目录没有区别。至于何时用tag,何时用branch,完全由人主观的根据规范和需要来选择,而不是强制的(比如cvs)。一般情况下,tag,是用来做一个milestone的,不管是不是release,都是一个可用的版本。这里,应该是只读的。更多的是一个显示用的,给人一个可读(readable)的标记。branch,是用来做并行开发的,这里的并行是指和trunk进行比较。比如,3.0开发完成,这个时候要做一个tag,tag_release_3_0,然后基于这个tag做release,比如安装程序等。trunk进入3.1的开发,但是3.0发现了bug,那么就需要基于tag_release_3_0做一个branch,branch_bugfix_3_0,基于这个branch进行bugfix,等到bugfix结束,做一个tag,tag_release_3_0_1,然后,根据需要决定branch_bugfix_3_0是否并入trunk。对于svn还要注意的一点,就是它是全局版本号,其实这个就是一个tag的标记,所以我们经常可以看到,什么什么release,基于xxx项目的2xxxx版本。就是这个意思了。但是,它还明确的给出一个tag的概念,就是因为这个更加的可读,毕竟记住tag_release_1_0要比记住一个很大的版本号容易的多。

branches:分枝
SVN中tag branch trunk的用法,首先看一下branches的介绍。当多个人合作,可能有这样的情况出现:John突然有个想法,跟原先的设计不太一致,可能是功能的添加或者日志格式的改进等等,总而言之,这个想法可能需要花一段时间来完成,而这个过程中,John的一些操作可能会影响Sally的工作,John从现有的状态单独出一个project的话,又不能及时得到Sally对已有代码做的修正,而且独立出来的话,John的尝试成功时,跟原来的合并也存在困难。这时最好的实践方法是使用branches。John建立一个自己的branch,然后在里面实验,必要的时候从Sally的trunk里取得更新,或者将自己的阶段成果汇集到trunk中。
(svncopySourceURL/trunkDestinationURL/branchName-m"Creatingaprivatebranchofxxxx/trunk.")

trunk:主干
主干,一般来说就是开发的主要呆的地方,

tag: 图标
在经过了一段时间的开发后,项目到达了一个里程碑阶段,你可能想记录这一阶段的代码的状态,那么你就需要给代码打上标签。
(svncpfile:///svnroot/mojavescripts/trunkfile:///svnroot/mojavescripts/tags/mirrorutils_rel_0_0_1-m"tagedmirrorutils_rel_0_0_1")另有一说,无所谓谁对谁错。

trunk:表示开发时版本存放的目录,即在开发阶段的代码都提交到该目录上。

branches:表示发布的版本存放的目录,即项目上线时发布的稳定版本存放在该目录中。

tags:表示标签存放的目录。

在这需要说明下分三个目录的原因,如果项目分为一期、二期、三期等,那么一期上线时的稳定版本就应该在一期完成时将代码copy到branches上,这样二期开发的代码就对一期的代码没有影响,如新增的模块就不会部署到生产环境上。而branches上的稳定的版本就是发布到生产环境上的代码,如果用户使用的过程中发现有bug,则只要在branches上修改该bug,修改完bug后再编译branches上最新的代码发布到生产环境即可。tags的作用是将在branches上修改的bug的代码合并到trunk上时创建个版本标识,以后branches上修改的bug代码再合并到trunk上时就从tags的version到branches最新的version合并到trunk,以保证前期修改的bug代码不会再合并。

-------------------------------------------------------------------------------------------
介绍SVN中tag branch trunk用法时,一直以来用svn只是当作cvs,也从来没有仔细看过文档,直到今天用到,才去翻看svnbook文档,惭愧

需求一:
有一个客户想对产品做定制,但是我们并不想修改原有的svn中trunk的代码。
方法:
用svn建立一个新的branches,从这个branche做为一个新的起点来开发
svncopysvn://server/trunksvn://server/branches/ep-m"initep"
Tip:
如果你的svn中以前没有branches这个的目录,只有trunk这个,你可以用
svnmkdirbranches新建个目录

需求二:
产品开发已经基本完成,并且通过很严格的测试,这时候我们就想发布给客户使用,发布我们的1.0版本
svncopysvn://server/trunksvn://server/tags/release-1.0-m"1.0released"咦,这个和branches有什么区别,好像啥区别也没有?
是的,branches和tags是一样的,都是目录,只是我们不会对这个release-1.0的tag做修改了,不再提交了,如果提交那么就是branches

需求三:
有一天,突然在trunk下的core中发现一个致命的bug,那么所有的branches一定也一样了,该怎么办?
svn-r148:149mergesvn://server/trunkbranches/ep其中148和149是两次修改的版本号。SVN中tag branch trunk用法介绍完毕。

posted @ 2014-05-07 00:10 paulwong 阅读(316) | 评论 (0)编辑 收藏

SVN的标准目录结构:trunk、branches、tags

我们在一些著名开源项目的版本库中,通常可以看到trunk, branches, tags等三个目录。由于SVN固有的特点,目录在SVN中并没有特别的意义,但是这三个目录却在大多数开源项目中存在,这是因为这三个目录反映了软件开发的通常模式。

trunk是主分支,是日常开发进行的地方。

branches是分支。一些阶段性的release版本,这些版本是可以继续进行开发和维护的,则放在branches目录中。又比如为不同用户客制化的版本,也可以放在分支中进行开发。

tags目录一般是只读的,这里存储阶段性的发布版本,只是作为一个里程碑的版本进行存档。

比如一个项目有main.cpp, common.h两个文件,假设目前在开发的是最新的3.0版本,而且1.0/2.0版本也在进行维护,那么项目树将类似如下样子:

project
|
+-- trunk
+ |
+ +----- main.cpp (3.0版本的最新文件)
+ +----- common.h
+
+-- branches
+ |
+ +-- r1.0
+ + |
+ + +---- main.cpp (1.x版本的最新文件)
+ + +---- common.h
+ +
+ +-- r2.0
+ |
+ +---- main.cpp (2.x版本的最新文件)
+ +---- common.h
+
+-- tags (此目录只读)
|
+-- r1.0
+ |
+ +---- main.cpp (1.0版本的发布文件)
+ +---- common.h
+
+-- r1.1
+ |
+ +---- main.cpp (1.1版本的发布文件)
+ +---- common.h
+
+-- r1.2
+ |
+ +---- main.cpp (1.2版本的发布文件)
+ +---- common.h
+
+-- r1.3
+ |
+ +---- main.cpp (1.3版本的发布文件)
+ +---- common.h
+
+-- r2.0
+ |
+ +---- main.cpp (2.0版本的发布文件)
+ +---- common.h
+
+-- r2.1
|
+---- main.cpp (2.1版本的发布文件)
+---- common.h


要使用这样的文件夹结构,在建立项目版本库时,可首先建好项目文件夹,并在其中建立trunk, branches, tags三个空的子目录,再将项目文件夹连同这三个子目录一起导入版本库。

这样在trunk中开始进行开发,当需要建立branch或tag时,使用SVN的copy操作进行。

其中tags目录需要只读,可以使用SVN中的authz文件控制该目录的访问权限为只读。

posted @ 2014-05-07 00:07 paulwong 阅读(275) | 评论 (0)编辑 收藏

Packt celebrates International Day Against DRM, May 6th 2014

Packt celebrates International Day Against DRM, May 6th 2014

 

 

 

 

 

According to the definition of DRM on Wikipedia, Digital Rights Management (DRM) is a class of technologies that are used by hardware manufacturers, publishers, copyright holders, and individuals with the intent to control the use of digital content and devices after sale.

 

However, Packt Publishing firmly believes that you should be able to read and interact with your content when you want, where you want, and how you want – to that end they have been advocates of DRM-free content since their very first eBook was published back in 2004.

 

To show their continuing support for Day Against DRM, Packt Publishing is offering all its DRM-free content at $10 for 24 hours only on May 6th – that’s all 2000+ eBooks and Videos. Check it out at: http://bit.ly/1q6bpha.

posted @ 2014-05-06 20:05 paulwong 阅读(176) | 评论 (0)编辑 收藏

LINUX下安装SVN客户端及MAVEN

  1. 安装SVN客户端
    apt-get install subversion subversion-tools

  2. 下载代码
    svn co http://svn.shiwan.com/svn/shiwan/branches/program/


  3. 更新代码
    cd 到欲更新代码的目录
    svn update


  4. 安装MAVEN,下载安装包并解压
    wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.2.1/binaries/apache-maven-3.2.1-bin.tar.gz
    tar -zxf apache-maven-3.0.3-bin.tar.gz


  5. 编辑/etc/profile
    export JAVA_HOME=/usr/lib/jvm/java-7-oracle
    export STORM_HOME=/home/ubuntu/java/storm-0.8.1
    export KAFKA_HOME=/home/ubuntu/java/kafka_2.9.2-0.8.1.1
    export ZOOKEEPER_HOME=/home/ubuntu/java/zookeeper-3.4.6
    export BIN_HOME=/home/ubuntu/java
    export MAVEN_HOME=/home/ubuntu/java/apache-maven-3.2.1

    export PATH=$JAVA_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$BIN_HOME/bin:$MAVEN_HOME/bin:$PATH


  6. 更新SOURCE
    source /etc/profile


  7. CD到含有pom.xml的文件夹,运行
    mvn package


  8. CD到target文件夹即可看到已编辑的文件
    cd 到欲更新代码的目录
    ll


posted @ 2014-05-05 17:06 paulwong 阅读(425) | 评论 (0)编辑 收藏

非ORACLE网站下载JDK

http://ghaffarian.net/downloads/Java/JDK/

posted @ 2014-05-04 18:48 paulwong 阅读(236) | 评论 (0)编辑 收藏

安装STORM

  1. install ZeroMQ
    wget http://download.zeromq.org/historic/zeromq-2.1.7.tar.gz
    tar -xzf zeromq-2.1.7.tar.gz
    cd zeromq-2.1.7
    ./configure
     //在configure时可能会报缺包,安装即可:sudo apt-get install g++ uuid-dev
    make
    sudo make install
  2. install JZMQ
    git clone https://github.com/nathanmarz/jzmq.git
    cd jzmq
    ./autogen.sh
    ./configure
    make
    sudo make install

  3. 下载并解压STORM

  4. 编辑conf/storm.yaml
    storm.zookeeper.servers:
    "1.2.3.5"
    "1.2.3.6"
    "1.2.3.7"
    storm.local.dir: "/opt/folder"
    nimbus.host: "54.72.4.92"
    supervisor.slots.ports:
    6700
    6701
    6702
  5. 编辑/etc/profile
    export JAVA_HOME=/usr/lib/jvm/java-7-oracle
    export STORM_HOME=/home/ubuntu/java/storm-0.8.1
    export KAFKA_HOME=/home/ubuntu/java/kafka_2.9.2-0.8.1.1
    export ZOOKEEPER_HOME=/home/ubuntu/java/zookeeper-3.4.6

    export PATH=$JAVA_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH

  6. 制作启动命令: start-storm.sh
    storm nimbus &
    storm supervisor &
    storm ui &

安装途中如果遇到问题
http://my.oschina.net/mingdongcheng/blog/43009

posted @ 2014-05-04 18:01 paulwong 阅读(253) | 评论 (0)编辑 收藏

仅列出标题
共112页: First 上一页 50 51 52 53 54 55 56 57 58 下一页 Last