MapReduce是一种可用于数据处理的变成模型。
这里主要讲述Java语言实现MapReduce。
一个投票模型:
Jobs很喜欢给女生打分,好看的打100分,难看的打0分。有一次他给Lucy打了0分,结果被Lucy痛打了一顿。
还有一次,Jobs给两个美女打分,给美女Alice打了99分,给美女Candice打了98分。
这个时候Alex就看不下去了,他于是站起来说:“明明是Candice比较好看嘛!”。
两人于是争执起来,为了证明自己的观点,结果爆发了一场大战!什么降龙十八掌啊,黯然销魂掌啊,他们都不会。
那么怎么才能让对方输的心服口服呢?他们想到“群众的眼睛是雪亮的”!于是他们发动了班上的20名同学进行投票。
结果出来了,Alice的平均分是98.5,Candice的平均分是99.7,以压倒性的优势获得了胜利。
但是Jobs不服,于是把班上每个女生的照片放到了网上,让大家来评分,最后他得到了好多文件,用自己的电脑算根本忙不过来,于是他想到了用Hadoop写一个MapReduce程序。
一直输入文件的格式是:"[user]\t[girlname]\t[point]".例:
alex alice 88
alex candice 100
jobs alice 100
john lucy 89
在这里,我们假设每个人的评分都为0到100的整数,最终的结果向下取整。那么MapReduce程序可以写成如下:
我们需要三样东西:一个map函数,一个reduce函数,和一些用来运行作业的代码。
map函数由Mapper接口实现来表示,后者声明了一个map()方法。
AverageMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class AverageMapper extends MapReduceBase
implements Mapper<IntWritable, Text, Text, IntWritable> {
public void map(IntWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String s = value.toString();
String name = new String();
int point = 0;
int i;
for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
name += s.charAt(i);
}
for(i++;i<s.length();i++) {
point = point * 10 + (s.charAt(i) - '0');
}
if(name.length() != 0 && point >=0 && point <= 100) {
output.collect(new Text(name), new IntWritable(point));
}
}
}
该Mapper接口是一个泛型类型,他有四个形参类型,分别指定map函数的输入键、输入值、输出键、输出值的类型。
以示例来说,输入键是一个整形偏移量(表示行号),输入值是一行文本,输出键是美女的姓名,输出值是美女的得分。
Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。
这里我们使用IntWritable类型(相当于Java中的Integer类型)和Text类型(想到与Java中的String类型)。
map()方法还提供了OutputCollector示例用于输出内容的写入。
我们只在输入内容格式正确的时候才将其写入输出记录中。
reduce函数通过Reducer进行类似的定义。
AverageReducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class AverageReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
long tot_point = 0, num = 0;
while(values.hasNext()) {
tot_point += values.next().get();
num ++;
}
int ave_point = (int)(tot_point/num);
output.collect(key, new IntWritable(ave_point));
}
}
第三部分代码负责运行MapReduce作业。
Average.java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
public class Average {
public static void main(String[] args) throws IOException {
if(args.length != 2) {
System.err.println("Usage: Average <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(Average.class);
conf.setJobName("Average");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(AverageMapper.class);
conf.setReducerClass(AverageReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
}
}
JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。
在Hadoop作业上运行着写作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这些文件)。
我们无需明确指定JAR文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找JAR文件进而找到相关的JAR文件。
构造JobCOnf对象之后,需要指定输入和输出数据的路径。
调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径。
可以多次调用addInputOath()实现多路径的输入。
调用FileOutputFormat类的静态函数SetOutputPath()来指定输出路径。
该函数指定了reduce函数输出文件的写入目录。
在运行任务前该目录不应该存在,否则Hadoop会报错并拒绝运行该任务。
这种预防措施是为了防止数据丢失。
接着,通过SetMapperClass()和SetReducerClass()指定map和reduce类型。
输入类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。
新增的Java MapReduce API
新的Hadoop在版本0.20.0包含有一个新的Java MapReduce API,又是也称为"上下文对象"(context object),旨在使API在今后更容易扩展。
新特性:
倾向于使用虚类,而不是接口;
新的API放在org.apache.hadoop.mapreduce包中(旧版本org.apache.hadoop.mapred包);
新的API充分使用上下文对象,使用户代码能与MapReduce通信。例如:MapContext基本具备了JobConf,OutputCollector和Reporter的功能;
新的API同时支持“推”(push)和“拉”(pull)的迭代。这两类API,均可以将键/值对记录推给mapper,但除此之外,新的API也允许把记录从map()方法中拉出、对reducer来说是一样的。拉式处理的好处是可以实现批量处理,而非逐条记录的处理。
新的API实现了配置的统一。所有作业的配置均通过Configuration来完成。(区别于旧API的JobConf)。
新API中作业控制由Job类实现,而非JobClient类,新API中删除了JobClient类。
输出文件的命名稍有不同。
用新上下文对象来重写Average应用
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NewAverage {
static class NewAverageMapper
extends Mapper<IntWritable, Text, Text, IntWritable> {
public void map(IntWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s = value.toString();
String name = new String();
int point = 0;
int i;
for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
name += s.charAt(i);
}
for(i++;i<s.length();i++) {
point = point * 10 + (s.charAt(i) - '0');
}
if(name.length() != 0 && point >=0 && point <= 100) {
context.write(new Text(name), new IntWritable(point));
}
}
}
static class NewAverageReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
long tot_point = 0, num = 0;
for(IntWritable value : values) {
tot_point += value.get();
num ++;
}
int ave_point = (int)(tot_point/num);
context.write(key, new IntWritable(ave_point));
}
}
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: NewAverage <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewAverage.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewAverageMapper.class);
job.setReducerClass(NewAverageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
posted on 2015-03-08 13:27
marchalex 阅读(1371)
评论(0) 编辑 收藏 所属分类:
java小程序