统计域名(实际是host)的计数器。
输入:一个文件夹中有一堆的文本文件,内容是一行一个的url,可以想像为数据库中的一条记录
流程:提取url的domain,对domain计数+1
输出:域名,域名计数
代码如下:
Mapper
package com.keseek.hadoop;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Mapper;

/**
 * Mapper: for every input line (one URL per line) extracts the host
 * component and emits a (host, 1) pair for downstream counting.
 */
public class DomainCountMapper implements
        Mapper<LongWritable, Text, Text, LongWritable>
{

    @Override
    public void configure(JobConf arg0)
    {
        // Allocate the reusable output key/value objects once per task
        // instead of once per record.
        domain = new Text();
        one = new LongWritable(1);
    }

    @Override
    public void close() throws IOException
    {
        // No resources to release.
    }

    /**
     * Emits (domain, 1) for the URL on this line.
     *
     * @param key    byte offset of the line within the split (unused)
     * @param value  a single URL
     */
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException
    {
        // Get URL
        String url = value.toString().trim();

        // URL -> domain; records whose host cannot be determined are
        // silently skipped via the empty-length check below.
        domain.set(ParseDomain(url));

        if (domain.getLength() != 0)
        {
            output.collect(domain, one);
        }
    }

    /**
     * Parses the host component of a URL.
     *
     * @param url the raw URL string
     * @return the host, or "" if the URL is malformed or has no host.
     *         Never returns null: URI.getHost() yields null for host-less
     *         URIs (e.g. relative paths), and passing that null to
     *         Text.set() in map() would throw a NullPointerException.
     */
    public String ParseDomain(String url)
    {
        try
        {
            String host = URI.create(url).getHost();
            return host == null ? "" : host;
        } catch (Exception e)
        {
            // Malformed URL -> treat as "no domain".
            return "";
        }
    }

    // Reusable output key (the extracted host).
    private Text domain;

    // Constant count of 1 emitted for every valid URL.
    private LongWritable one;

}
Reducer
package com.keseek.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Reducer;

/**
 * Reducer: sums the per-domain partial counts emitted by the mapper and
 * outputs one (domain, total) record per key. Because addition is
 * associative and commutative, this class also serves as the combiner.
 */
public class DomainCountReducer implements
        Reducer<Text, LongWritable, Text, LongWritable>
{

    @Override
    public void configure(JobConf arg0)
    {
        // Nothing to configure.
    }

    @Override
    public void close() throws IOException
    {
        // Nothing to release.
    }

    /**
     * Accumulates every count for a single domain and emits the total.
     *
     * @param key    the domain
     * @param values partial counts (1s from the mapper, or partial sums
     *               from combiner runs)
     */
    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException
    {
        long total = 0;
        for (; values.hasNext();)
        {
            total += values.next().get();
        }
        output.collect(key, new LongWritable(total));
    }

}
Main
package com.keseek.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


/**
 * Driver: configures and submits the domain-count MapReduce job.
 *
 * Usage: DomainCountMain.jar &lt;Input_Path&gt; &lt;Output_Path&gt;
 */
public class DomainCountMain
{

    public static void main(String[] args) throws Exception
    {
        // Validate command-line arguments: input dir and output dir.
        if (args.length != 2)
        {
            System.out.println("Usage:");
            System.out
                    .println("DomainCountMain.jar <Input_Path> <Output_Path>");
            System.exit(-1);
        }

        // Configure JobConf
        JobConf jobconf = new JobConf(DomainCountMain.class);

        jobconf.setJobName("Domain Counter by Coder4");

        FileInputFormat.setInputPaths(jobconf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));

        jobconf.setInputFormat(TextInputFormat.class);
        jobconf.setOutputFormat(TextOutputFormat.class);

        jobconf.setMapperClass(DomainCountMapper.class);
        jobconf.setReducerClass(DomainCountReducer.class);
        // Summation is associative and commutative, so the reducer can
        // safely double as the combiner to reduce shuffle traffic.
        jobconf.setCombinerClass(DomainCountReducer.class);

        jobconf.setMapOutputKeyClass(Text.class);
        jobconf.setMapOutputValueClass(LongWritable.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(LongWritable.class);

        // JobClient.runJob() blocks until the job completes (and throws
        // on submission failure), so no extra waitForCompletion() call
        // is needed afterwards.
        RunningJob run = JobClient.runJob(jobconf);

        if (run.isSuccessful())
        {
            System.out.println("<<<DomainCount Main>>> success.");

        } else
        {
            System.out.println("<<<DomainCount Main>>> error.");
        }
    }
}