统计域名(实际是host)的计数器。
输入:一个文件夹中有一堆的文本文件,内容是一行一个的url,可以想像为数据库中的一条记录
流程:提取url的domain,对domain计数+1
输出:域名,域名计数
代码如下:
Mapper
package com.keseek.hadoop;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Mapper;
public class DomainCountMapper implements
Mapper<LongWritable, Text, Text, LongWritable> {
@Override
public void configure(JobConf arg0) {
// Init Text and LongWritable
domain = new Text();
one = new LongWritable(1);
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
// Get URL
String url = value.toString().trim();
// URL->Domain && Collect
domain.set(ParseDomain(url));
if (domain.getLength() != 0) {
output.collect(domain, one);
}
}
public String ParseDomain(String url) {
try {
URI uri = URI.create(url);
return uri.getHost();
} catch (Exception e) {
return "";
}
}
// Shared used Text domain
private Text domain;
// One static
private LongWritable one;
} Reducer
package com.keseek.hadoop;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Reducer;
public class DomainCountReducer implements
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void configure(JobConf arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
@Override
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
// Count the domain
long cnt = 0;
while (values.hasNext()) {
cnt += values.next().get();
}
// Output
output.collect(key, new LongWritable(cnt));
}
} Main
package com.keseek.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class DomainCountMain {

    /**
     * Driver: configures and runs the domain-count MapReduce job.
     *
     * Usage: DomainCountMain.jar &lt;Input_Path&gt; &lt;Output_Path&gt;
     */
    public static void main(String[] args) throws Exception {
        // Validate command-line arguments.
        if (args.length != 2) {
            System.out.println("Usage:");
            // Fixed typo in the usage string: "Outpu_Path" -> "Output_Path".
            System.out.println("DomainCountMain.jar <Input_Path> <Output_Path>");
            System.exit(-1);
        }

        JobConf jobconf = new JobConf(DomainCountMain.class);
        jobconf.setJobName("Domain Counter by Coder4");

        // Input/output locations and formats (plain text, one URL per line).
        FileInputFormat.setInputPaths(jobconf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
        jobconf.setInputFormat(TextInputFormat.class);
        jobconf.setOutputFormat(TextOutputFormat.class);

        jobconf.setMapperClass(DomainCountMapper.class);
        jobconf.setReducerClass(DomainCountReducer.class);
        // The reducer is a pure sum, so it doubles as a map-side combiner.
        jobconf.setCombinerClass(DomainCountReducer.class);
        jobconf.setMapOutputKeyClass(Text.class);
        jobconf.setMapOutputValueClass(LongWritable.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(LongWritable.class);

        // JobClient.runJob() blocks until the job completes, so the extra
        // run.waitForCompletion() call in the original was redundant and
        // has been removed.
        RunningJob run = JobClient.runJob(jobconf);
        if (run.isSuccessful()) {
            System.out.println("<<<DomainCount Main>>> success.");
        } else {
            System.out.println("<<<DomainCount Main>>> error.");
        }
    }
}