// MapreduceRead.java
package com.cloudputing.mapreduce;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
public class MapreduceRead {
public static void read() throws IOException, InterruptedException, ClassNotFoundException
{
// Add these statements. XXX
// File jarFile = EJob.createTempJar("target/classes");
// EJob.addClasspath("D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/src/main/resources");
// ClassLoader classLoader = EJob.getClassLoader();
// Thread.currentThread().setContextClassLoader(classLoader);
Configuration config = HBaseConfiguration.create();
addTmpJar("file:/D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/target/bigdata-1.0.jar",config);
Job job = new Job(config, "ExampleRead");
// And add this statement. XXX
// ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
// TableMapReduceUtil.addDependencyJars(job);
// TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
// MapreduceRead.class,MyMapper.class);
job.setJarByClass(MapreduceRead.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
"wiki", // input HBase table name
scan, // Scan instance to control CF and attribute selection
MapreduceRead.MyMapper.class, // mapper
null, // mapper output key
null, // mapper output value
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
// DistributedCache.addFileToClassPath(new Path("hdfs://node.tracker1:9000/user/root/lib/stat-analysis-mapred-1.0-SNAPSHOT.jar"),job.getConfiguration());
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
}
/**
* 为Mapreduce添加第三方jar包
*
* @param jarPath
* 举例:D:/Java/new_java_workspace/scm/lib/guava-r08.jar
* @param conf
* @throws IOException
*/
public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
System.setProperty("path.separator", ":");
FileSystem fs = FileSystem.getLocal(conf);
String newJarPath = new Path(jarPath).makeQualified(fs).toString();
String tmpjars = conf.get("tmpjars");
if (tmpjars == null || tmpjars.length() == 0) {
conf.set("tmpjars", newJarPath);
} else {
conf.set("tmpjars", tmpjars + ":" + newJarPath);
}
}
public static class MyMapper extends TableMapper<Text, Text> {
public void map(ImmutableBytesWritable row, Result value,
Context context) throws InterruptedException, IOException {
String val1 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual1")));
String val2 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual2")));
System.out.println(val1 + " -- " + val2);
}
private String getValue(byte [] value)
{
return value == null? "null" : new String(value);
}
}
}