
Submitting a MapReduce Job from Eclipse on Windows to a Remote HBase Cluster

  1. Assume the remote Hadoop host is named ubuntu. Map that name to its IP address in the local hosts file, as shown below.
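     On Windows the hosts file is C:\Windows\System32\drivers\etc\hosts (on
     Linux, /etc/hosts); the IP below is the example address used in this post:

         192.168.58.130       ubuntu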


  2. Create a new Maven project and add the configuration below.
    pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.cloudputing</groupId>
      <artifactId>bigdata</artifactId>
      <version>1.0</version>
      <packaging>jar</packaging>

      <name>bigdata</name>
      <url>http://maven.apache.org</url>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-hadoop</artifactId>
                <version>0.9.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase</artifactId>
                <version>0.94.1</version>
            </dependency>
            
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-core</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>3.0.5.RELEASE</version>
            </dependency>
        </dependencies>
    </project>
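    With the POM in place, build the project so that the jar referenced later by
    addTmpJar (target/bigdata-1.0.jar) actually exists:

        mvn clean package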


  3. hbase-site.xml (put it on the classpath, e.g. under src/main/resources, so HBaseConfiguration.create() picks it up)
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!--
    /**
     * Copyright 2010 The Apache Software Foundation
     *
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    -->
    <configuration>

        <property>
            <name>hbase.rootdir</name>
            <value>hdfs://ubuntu:9000/hbase</value>
        </property>

        <!-- When the job is constructed, a staging directory is created for the
             required files. If this property is omitted, the local environment is
             assumed to be Linux and Linux commands are used, which fails on
             Windows. -->
        <property>
            <name>mapred.job.tracker</name>
            <value>ubuntu:9001</value>
        </property>
        
        <property>
            <name>hbase.cluster.distributed</name>
            <value>true</value>
        </property>
        
        <!-- ZooKeeper is consulted here for the addresses of the HBase cluster -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ubuntu</value>
        </property>
        <property skipInDoc="true">
            <name>hbase.defaults.for.version</name>
            <value>0.94.1</value>
        </property>

    </configuration>
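    HBaseConfiguration.create() reads this hbase-site.xml from the classpath
    automatically. Before submitting a job, a quick connectivity check such as
    the sketch below can confirm that the client reaches the cluster (the class
    name HBasePing is made up for illustration; checkHBaseAvailable is part of
    the HBase 0.94 client API):

        package com.cloudputing.mapreduce;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.client.HBaseAdmin;

        public class HBasePing {
            public static void main(String[] args) throws Exception {
                // Loads hbase-site.xml from the classpath (src/main/resources)
                Configuration config = HBaseConfiguration.create();
                // Throws MasterNotRunningException / ZooKeeperConnectionException
                // if the cluster cannot be reached
                HBaseAdmin.checkHBaseAvailable(config);
                System.out.println("HBase is available");
            }
        }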


  4. Test class: MapreduceTest.java
    package com.cloudputing.mapreduce;

    import java.io.IOException;

    import junit.framework.TestCase;

    public class MapreduceTest extends TestCase {
        
        public void testReadJob() throws IOException, InterruptedException, ClassNotFoundException
        {
            MapreduceRead.read();
        }

    }
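    The test can be run from Eclipse as a plain JUnit test, or from the command
    line through the Surefire plugin:

        mvn -Dtest=MapreduceTest test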


  5. MapreduceRead.java
    package com.cloudputing.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class MapreduceRead {
        
        public static void read() throws IOException, InterruptedException, ClassNotFoundException
        {
            // Add these statements. XXX
    //        File jarFile = EJob.createTempJar("target/classes");
    //        EJob.addClasspath("D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/src/main/resources");
    //        ClassLoader classLoader = EJob.getClassLoader();
    //        Thread.currentThread().setContextClassLoader(classLoader);

            Configuration config = HBaseConfiguration.create();
            addTmpJar("file:/D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/target/bigdata-1.0.jar",config);
            
            Job job = new Job(config, "ExampleRead");
            // And add this statement. XXX
    //        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

    //        TableMapReduceUtil.addDependencyJars(job);
    //        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
    //                MapreduceRead.class,MyMapper.class);
            
            job.setJarByClass(MapreduceRead.class);     // class that contains mapper
            
            Scan scan = new Scan();
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs
            
            // set other scan attrs
            
            TableMapReduceUtil.initTableMapperJob(
                    "wiki",        // input HBase table name
                    scan,             // Scan instance to control CF and attribute selection
                    MapreduceRead.MyMapper.class,   // mapper
                    null,             // mapper output key 
                    null,             // mapper output value
                    job);
            job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper
            
    //        DistributedCache.addFileToClassPath(new Path("hdfs://node.tracker1:9000/user/root/lib/stat-analysis-mapred-1.0-SNAPSHOT.jar"),job.getConfiguration());
            
            boolean b = job.waitForCompletion(true);
            if (!b) {
                throw new IOException("error with job!");
            }
            
        }
        
        /**
         * Adds a third-party jar to the MapReduce job's distributed classpath.
         *
         * @param jarPath e.g. D:/Java/new_java_workspace/scm/lib/guava-r08.jar
         * @param conf the job configuration
         * @throws IOException
         */
        public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
            // Force the Unix path separator so the classpath built for the
            // Linux cluster is valid even though the client runs on Windows.
            System.setProperty("path.separator", ":");
            FileSystem fs = FileSystem.getLocal(conf);
            String newJarPath = new Path(jarPath).makeQualified(fs).toString();
            String tmpjars = conf.get("tmpjars");
            if (tmpjars == null || tmpjars.length() == 0) {
                conf.set("tmpjars", newJarPath);
            } else {
                // tmpjars is a comma-separated list of jar URIs
                conf.set("tmpjars", tmpjars + "," + newJarPath);
            }
        }
        
        public static class MyMapper extends TableMapper<Text, Text> {

            public void map(ImmutableBytesWritable row, Result value,
                    Context context) throws InterruptedException, IOException {
                String val1 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual1")));
                String val2 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual2")));
                System.out.println(val1 + " -- " + val2);
            }
            
            private String getValue(byte[] value) {
                return value == null ? "null" : new String(value);
            }
        } 

    }
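    For the scan to return anything, the wiki table must already exist with a
    text column family (created beforehand, e.g. in the HBase shell) and hold
    values under qual1/qual2. Below is a minimal sketch that seeds one test
    row; the class name, row key, and values are made up for illustration:

        package com.cloudputing.mapreduce;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.client.HTable;
        import org.apache.hadoop.hbase.client.Put;
        import org.apache.hadoop.hbase.util.Bytes;

        public class SeedWikiTable {
            public static void main(String[] args) throws Exception {
                Configuration config = HBaseConfiguration.create();
                HTable table = new HTable(config, "wiki");
                Put put = new Put(Bytes.toBytes("row1"));  // hypothetical row key
                put.add(Bytes.toBytes("text"), Bytes.toBytes("qual1"), Bytes.toBytes("hello"));
                put.add(Bytes.toBytes("text"), Bytes.toBytes("qual2"), Bytes.toBytes("world"));
                table.put(put);
                table.close();
            }
        }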

posted on 2013-01-29 00:19 paulwong