posts - 2,  comments - 0,  trackbacks - 0
main:
 1 package com.aamend.hadoop.MapReduce;
 2  
 3 import java.io.IOException;
 4  
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
15  
16 public class WordCount {
17  
18     public static void main(String[] args) throws IOException,
19             InterruptedException, ClassNotFoundException {
20  
21         Path inputPath = new Path(args[0]);
22         Path outputDir = new Path(args[1]);
23  
24         // Create configuration
25         Configuration conf = new Configuration(true);
26  
27         // Create job
28         Job job = new Job(conf, "WordCount");
29         job.setJarByClass(WordCountMapper.class);
30  
31         // Setup MapReduce
32         job.setMapperClass(WordCountMapper.class);
33         job.setReducerClass(WordCountReducer.class);
34         job.setNumReduceTasks(1);
35  
36         // Specify key / value
37         job.setOutputKeyClass(Text.class);
38         job.setOutputValueClass(IntWritable.class);
39  
40         // Input
41         FileInputFormat.addInputPath(job, inputPath);
42         job.setInputFormatClass(TextInputFormat.class);
43  
44         // Output
45         FileOutputFormat.setOutputPath(job, outputDir);
46         job.setOutputFormatClass(TextOutputFormat.class);
47  
48         // Delete output if exists
49         FileSystem hdfs = FileSystem.get(conf);
50         if (hdfs.exists(outputDir))
51             hdfs.delete(outputDir, true);
52  
53         // Execute job
54         int code = job.waitForCompletion(true? 0 : 1;
55         System.exit(code);
56  
57     }
58  
59 }

mapper class:
 1 package com.aamend.hadoop.MapReduce;
 2  
 3 import java.io.IOException;
 4  
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Mapper;
 8  
 9 public class WordCountMapper extends
10         Mapper<Object, Text, Text, IntWritable> {
11  
12     private final IntWritable ONE = new IntWritable(1);
13     private Text word = new Text();
14  
15     public void map(Object key, Text value, Context context)
16             throws IOException, InterruptedException {
17  
18         String[] csv = value.toString().split(",");
19         for (String str : csv) {
20             word.set(str);
21             context.write(word, ONE);
22         }
23     }
24 }

reducer class:
 1 package com.aamend.hadoop.MapReduce;
 2  
 3 import java.io.IOException;
 4  
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8  
 9 public class WordCountReducer extends
10         Reducer<Text, IntWritable, Text, IntWritable> {
11  
12     public void reduce(Text text, Iterable<IntWritable> values, Context context)
13             throws IOException, InterruptedException {
14         int sum = 0;
15         for (IntWritable value : values) {
16             sum += value.get();
17         }
18         context.write(text, new IntWritable(sum));
19     }
20 }




posted on 2014-09-24 21:48 hqjma 阅读(84) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航:
 
<2024年12月>
24252627282930
1234567
891011121314
15161718192021
22232425262728
2930311234

常用链接

留言簿

随笔分类

随笔档案

文章分类

文章档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜