march alex's blog
hello,I am march alex
posts - 52,comments - 7,trackbacks - 0
MapReduce是一种可用于数据处理的变成模型。
这里主要讲述Java语言实现MapReduce。
一个投票模型:
Jobs很喜欢给女生打分,好看的打100分,难看的打0分。有一次他给Lucy打了0分,结果被Lucy痛打了一顿。
还有一次,Jobs给两个美女打分,给美女Alice打了99分,给美女Candice打了98分。
这个时候Alex就看不下去了,他于是站起来说:“明明是Candice比较好看嘛!”。
两人于是争执起来,为了证明自己的观点,结果爆发了一场大战!什么降龙十八掌啊,黯然销魂掌啊,他们都不会。
那么怎么才能让对方输的心服口服呢?他们想到“群众的眼睛是雪亮的”!于是他们发动了班上的20名同学进行投票。
结果出来了,Alice的平均分是98.5,Candice的平均分是99.7,以压倒性的优势获得了胜利。
但是Jobs不服,于是把班上每个女生的照片放到了网上,让大家来评分,最后他得到了好多文件,用自己的电脑算根本忙不过来,于是他想到了用Hadoop写一个MapReduce程序。
一直输入文件的格式是:"[user]\t[girlname]\t[point]".例:
alex    alice   88
alex    candice 100
jobs    alice   100
john    lucy    89

在这里,我们假设每个人的评分都为0到100的整数,最终的结果向下取整。那么MapReduce程序可以写成如下:

我们需要三样东西:一个map函数,一个reduce函数,和一些用来运行作业的代码。
map函数由Mapper接口实现来表示,后者声明了一个map()方法。
AverageMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class AverageMapper extends MapReduceBase
  implements Mapper<IntWritable, Text, Text, IntWritable> {
        
        public void map(IntWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                        throws IOException {
                
                String s = value.toString();
                String name = new String();
                int point = 0;
                int i;
                for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
                for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
                        name += s.charAt(i);
                }
                for(i++;i<s.length();i++) {
                        point = point * 10 + (s.charAt(i) - '0');
                }
                if(name.length() != 0 && point >=0 && point <= 100) {
                        output.collect(new Text(name), new IntWritable(point));
                }
        }
}

该Mapper接口是一个泛型类型,他有四个形参类型,分别指定map函数的输入键、输入值、输出键、输出值的类型。
以示例来说,输入键是一个整形偏移量(表示行号),输入值是一行文本,输出键是美女的姓名,输出值是美女的得分。
Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。
这里我们使用IntWritable类型(相当于Java中的Integer类型)和Text类型(想到与Java中的String类型)。
map()方法还提供了OutputCollector示例用于输出内容的写入。
我们只在输入内容格式正确的时候才将其写入输出记录中。

reduce函数通过Reducer进行类似的定义。
AverageReducer.java

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;


public class AverageReducer extends MapReduceBase
  implements Reducer<Text, IntWritable, Text, IntWritable> {
        
        public void reduce(Text key, Iterator<IntWritable> values,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                        throws IOException {
                
                long tot_point = 0, num = 0;
                while(values.hasNext()) {
                        tot_point += values.next().get();
                        num ++;
                }
                int ave_point = (int)(tot_point/num);
                output.collect(key, new IntWritable(ave_point));
        }
}

第三部分代码负责运行MapReduce作业。
Average.java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;


public class Average {
        public static void main(String[] args) throws IOException {
                if(args.length != 2) {
                        System.err.println("Usage: Average <input path> <output path>");
                        System.exit(-1);
                }
                JobConf conf = new JobConf(Average.class);
                conf.setJobName("Average");
                FileInputFormat.addInputPath(conf, new Path(args[0]));
                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
                conf.setMapperClass(AverageMapper.class);
                conf.setReducerClass(AverageReducer.class);
                conf.setOutputKeyClass(Text.class);
                conf.setOutputValueClass(IntWritable.class);
        }
}

JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。
在Hadoop作业上运行着写作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这些文件)。
我们无需明确指定JAR文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找JAR文件进而找到相关的JAR文件。
构造JobCOnf对象之后,需要指定输入和输出数据的路径。
调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径。
可以多次调用addInputOath()实现多路径的输入。
调用FileOutputFormat类的静态函数SetOutputPath()来指定输出路径。
该函数指定了reduce函数输出文件的写入目录。
在运行任务前该目录不应该存在,否则Hadoop会报错并拒绝运行该任务。
这种预防措施是为了防止数据丢失。
接着,通过SetMapperClass()和SetReducerClass()指定map和reduce类型。
输入类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。

新增的Java MapReduce API


新的Hadoop在版本0.20.0包含有一个新的Java MapReduce API,又是也称为"上下文对象"(context object),旨在使API在今后更容易扩展。
新特性:
倾向于使用虚类,而不是接口;
新的API放在org.apache.hadoop.mapreduce包中(旧版本org.apache.hadoop.mapred包);
新的API充分使用上下文对象,使用户代码能与MapReduce通信。例如:MapContext基本具备了JobConf,OutputCollector和Reporter的功能;
新的API同时支持“推”(push)和“拉”(pull)的迭代。这两类API,均可以将键/值对记录推给mapper,但除此之外,新的API也允许把记录从map()方法中拉出、对reducer来说是一样的。拉式处理的好处是可以实现批量处理,而非逐条记录的处理。
新的API实现了配置的统一。所有作业的配置均通过Configuration来完成。(区别于旧API的JobConf)。
新API中作业控制由Job类实现,而非JobClient类,新API中删除了JobClient类。
输出文件的命名稍有不同。

用新上下文对象来重写Average应用

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewAverage {
        
        static class NewAverageMapper
          extends Mapper<IntWritable, Text, Text, IntWritable> {
                
                public void map(IntWritable key, Text value, Context context) 
                        throws IOException, InterruptedException {
                        
                        String s = value.toString();
                        String name = new String();
                        int point = 0;
                        int i;
                        for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
                        for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
                                name += s.charAt(i);
                        }
                        for(i++;i<s.length();i++) {
                                point = point * 10 + (s.charAt(i) - '0');
                        }
                        if(name.length() != 0 && point >=0 && point <= 100) {
                                context.write(new Text(name), new IntWritable(point));
                        }
                }
        }
        
        static class NewAverageReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
                
                public void reduce(Text key, Iterable<IntWritable> values,
                                Context context)
                                throws IOException, InterruptedException {
                        
                        long tot_point = 0, num = 0;
                        for(IntWritable value : values) {
                                tot_point += value.get();
                                num ++;
                        }
                        int ave_point = (int)(tot_point/num);
                        context.write(key, new IntWritable(ave_point));
                }
        }
        
        public static void main(String[] args) throws Exception {
                if(args.length != 2) {
                        System.err.println("Usage: NewAverage <input path> <output path>");
                        System.exit(-1);
                }
                Job job = new Job();
                job.setJarByClass(NewAverage.class);
                
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                
                job.setMapperClass(NewAverageMapper.class);
                job.setReducerClass(NewAverageReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
}
posted on 2015-03-08 13:27 marchalex 阅读(1368) 评论(0)  编辑  收藏 所属分类: java小程序

只有注册用户登录后才能发表评论。


网站导航: