Prerequisites
A Hadoop cluster is installed and configured.
The Hadoop Eclipse plugin has been compiled and installed.
A local copy of the same Hadoop release as the server, already unpacked (this example uses hadoop-1.1.2).
The Hadoop cluster is started, for example as shown below.
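A minimal way to start and verify the cluster from the server's shell, assuming the standard Hadoop 1.x scripts and running from the Hadoop installation directory:

bin/start-all.sh   # starts NameNode, DataNode, JobTracker, TaskTracker, etc.
jps                # lists the running Java processes to verify the daemons are up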
Configure the Hadoop installation directory
In Eclipse, select Window → Preferences to open the Preferences window, choose Hadoop Map/Reduce in the left pane, and set Hadoop installation directory on the right.
Configure Map/Reduce Locations
In Eclipse, select Window → Open Perspective → Map/Reduce to open the Map/Reduce perspective.
Click the blue elephant icon in the upper-right corner of the view shown above; the New Hadoop Location configuration window opens, as shown below:
Location name: a name of your choice for this location
Map/Reduce Master: the value of the mapred.job.tracker property in mapred-site.xml (see the sample snippets after this list)
DFS Master: the value of the fs.default.name property in core-site.xml
User name: the user name used to connect to HDFS on the server side
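For reference, a minimal sketch of the two properties as they might appear in the cluster's configuration files. The HDFS host and port (10.120.10.11:9090) are taken from the run arguments used later in this article; the JobTracker port 9001 is only an assumed placeholder, so substitute your cluster's actual value:

<!-- core-site.xml: the DFS Master fields come from this property -->
<property>
  <name>fs.default.name</name>
  <value>hdfs://10.120.10.11:9090</value>
</property>

<!-- mapred-site.xml: the Map/Reduce Master fields come from this property;
     the port 9001 here is an assumption -->
<property>
  <name>mapred.job.tracker</name>
  <value>10.120.10.11:9001</value>
</property>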
Once the connection succeeds, the directories currently in HDFS are listed on the left, as shown above.
Create a MapReduce project
Using the NCDC maximum-temperature-per-year example, place the temperature data under /user/hadoop/ncdc/input, as shown:
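If the data is not yet in HDFS, it can be uploaded from the server's shell; the local file name ncdc-sample.txt below is only a placeholder for your actual data file:

hadoop fs -mkdir /user/hadoop/ncdc/input
hadoop fs -put ncdc-sample.txt /user/hadoop/ncdc/input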
Create the test class MaxTempTest with the following code:
package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTempTest extends Configured implements Tool {

    public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 9999;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the year and air temperature out of the fixed-width NCDC record:
            // characters 15-18 hold the year, character 87 the sign,
            // characters 88-91 the temperature, and character 92 the quality code.
            String line = value.toString();
            if (line.length() <= 92) {
                // Malformed record: log the offending split and skip it
                // instead of emitting a bogus temperature.
                System.err.println("Malformed record in " + context.getInputSplit().toString());
                return;
            }
            String year = line.substring(15, 19);
            int airTemp;
            if (line.charAt(87) == '+') {
                airTemp = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemp = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemp != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemp));
            }
        }
    }

    public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep the maximum temperature seen for this year.
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTempTest(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTempTest <input path> <output path>");
            return -1;
        }
        // Use the Configuration supplied by ToolRunner.
        Configuration config = getConf();
        Job job = new Job(config, "MaxTempTest");
        job.setJarByClass(MaxTempTest.class);
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        // Output key and value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output formats.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        // Resolve the file system from the output path (it may be a full
        // hdfs:// URI) and delete the output directory if it already exists,
        // otherwise the job fails on startup.
        FileSystem hdfs = outputDir.getFileSystem(config);
        if (hdfs.exists(outputDir)) {
            hdfs.delete(outputDir, true);
        }
        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        // Submit the job and wait for completion.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Run the test project
Right-click the test class MaxTempTest and choose Run As → Run Configurations to open the Run Configurations window; right-click Java Application → New and create a configuration named MaxTempTest, as shown below:
Click the Arguments tab on the right and enter the following in Program arguments:
hdfs://10.120.10.11:9090/user/hadoop/ncdc/input hdfs://10.120.10.11:9090/user/hadoop/ncdc/output
Right-click the test class MaxTempTest and choose Run As → Run on Hadoop; the Select Hadoop location window opens, as shown below:
Select "Choose an existing server from the list below" and click "Finish" to submit the job.
Run error 1
Solution:
Find FileUtil.java under hadoop-1.1.2\src\core\org\apache\hadoop\fs, copy it into your test project under the same package name as declared in FileUtil, and comment out the body of the checkReturnValue method, as shown below:
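For orientation, the method in hadoop-1.1.2 looks roughly like the sketch below (the exact body may differ slightly in your source tree); after the edit its body is commented out, so failed attempts to set local file permissions no longer abort the job:

private static void checkReturnValue(boolean rv, File p, FsPermission permission)
        throws IOException {
    // Original body commented out:
    // if (!rv) {
    //     throw new IOException("Failed to set permissions of path: " + p
    //             + " to " + String.format("%04o", permission.toShort()));
    // }
}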
Run error 2
Solution:
On the server, change the directory permissions by running hadoop fs -chmod -R 777 ./ncdc.
Results
As shown above, the job ran successfully. In "DFS Locations" on the left of Eclipse, right-click the "ncdc" directory and refresh it, then double-click part-r-00000 to view the result, as shown below:
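The output can also be read directly from the command line on the server:

hadoop fs -cat /user/hadoop/ncdc/output/part-r-00000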