main class:

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration (true = load the default *-site.xml resources)
        Configuration conf = new Configuration(true);

        // Create job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(WordCount.class);

        // Setup MapReduce
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);

        // Specify key / value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete the output directory if it already exists, so the job can be rerun
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job and exit with its status code
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);
    }
}
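Because the reduce function below just sums integers, it is both associative and commutative, so it can also run map-side as a combiner to shrink the data shuffled to the reducer. This line is not in the original driver, but it is a standard Hadoop optimization and a minimal sketch of how to enable it:

// Optional: run the reducer map-side as a combiner to cut shuffle traffic.
// Safe here only because summing counts is associative and commutative.
job.setCombinerClass(WordCountReducer.class);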
mapper class:

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse the same Writable instances to avoid allocating one per record
    private final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // TextInputFormat delivers one line per call; tokenize it on commas
        String[] csv = value.toString().split(",");
        for (String str : csv) {
            word.set(str);
            context.write(word, ONE);
        }
    }
}
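Note that this mapper splits each line on commas, so the job as written counts comma-separated fields rather than words in free text. For a conventional word count, a variant of the map body (an assumption on my part, not part of the original post) could split on runs of whitespace instead:

// Variant map body: assumes whitespace-delimited words rather than CSV input
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
    if (!token.isEmpty()) {   // split can yield an empty leading token
        word.set(token);
        context.write(word, ONE);
    }
}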
reducer class:

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // Sum the counts emitted for this word across all map tasks
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
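The reducer logic can be exercised without a cluster. A minimal sketch of a unit test, assuming the Apache MRUnit library and JUnit are on the test classpath (this test class and its names are my addition, not part of the original post):

package com.aamend.hadoop.MapReduce;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class WordCountReducerTest {

    @Test
    public void sumsCountsForAKey() throws Exception {
        // Feed one key with three 1s and expect a single ("hadoop", 3) pair
        ReduceDriver.newReduceDriver(new WordCountReducer())
                .withInput(new Text("hadoop"),
                        Arrays.asList(new IntWritable(1),
                                new IntWritable(1), new IntWritable(1)))
                .withOutput(new Text("hadoop"), new IntWritable(3))
                .runTest();
    }
}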