某日,接手了同事写的从Hadoop集群拷贝数据到另外一个集群的程序,该程序是运行在Hadoop集群上的job。这个job只有map阶段,读取hdfs目录下数据的数据,然后写入到另外一个集群。
显然,这个程序没有考虑大数据量的情况,如果输入目录下文件很多或数据量很大,就会导致map数很多。而实际上我们需要拷贝的一个数据源就有近 6T,job启动起来有1w多个map,一下子整个queue的资源就占满了。虽然通过调整一些参数可以控制map数(也就是并发数),但是无法准确的控 制map数,而且换个数据源又得重新配置参数。
第一个改进的版本是,加了Reduce过程,以期望通过设置Reduce数量来控制并发数。这样虽然能精确地控制并发数,但是增加了shuffle 过程,实际运行中发现输入数据有倾斜(而partition的key由于业务需要无法更改),导致部分机器网络被打满,从而影响到了集群中的其他应用。即 使通过 mapred.reduce.parallel.copies 参数来限制shuffle也是治标不治本。这个平白增加的shuffle过程实际上浪费了很多网络带宽和IO。
最理想的情况当然是只有map阶段,而且能够准确的控制并发数了。
于是,第二个优化版本诞生了。这个job只有map阶段,采用CombineFileInputFormat, 它可以将多个小文件打包成一个InputSplit提供给一个Map处理,避免因为大量小文件问题,启动大量map。通过 mapred.max.split.size 参数可以大概地控制并发数。本以为这样就能解决问题了,结果又发现了数据倾斜的问题。这种粗略地分splits的方式,导致有的map处理的数据少,有的 map处理的数据多,并不均匀。几个拖后退的map就导致job的实际运行时间长了一倍多。
看来只有让每个map处理的数据量一样多,才能完美的解决这个问题了。
第三个版本也诞生了,这次是重写了CombineFileInputFormat,自己实现getSplits方法。由于输入数据为SequenceFile格式,因此需要一个SequenceFileRecordReaderWrapper类。
实现代码如下:
CustomCombineSequenceFileInputFormat.java
import java.io.IOException; |
import org.apache.hadoop.classification.InterfaceAudience; |
import org.apache.hadoop.classification.InterfaceStability; |
import org.apache.hadoop.mapreduce.InputSplit; |
import org.apache.hadoop.mapreduce.RecordReader; |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; |
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; |
* Input format that is a <code>CombineFileInputFormat</code>-equivalent for |
* <code>SequenceFileInputFormat</code>. |
* @see CombineFileInputFormat |
@InterfaceAudience .Public |
@InterfaceStability .Stable |
public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> { |
@SuppressWarnings ({ "rawtypes" , "unchecked" }) |
public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) |
return new CombineFileRecordReader((CombineFileSplit) split, context, |
SequenceFileRecordReaderWrapper. class ); |
* A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be |
* used in a <code>CombineFileInputFormat</code>-equivalent for |
* <code>SequenceFileInputFormat</code>. |
* @see CombineFileRecordReader |
* @see CombineFileInputFormat |
* @see SequenceFileInputFormat |
private static class SequenceFileRecordReaderWrapper<K, V> |
extends CombineFileRecordReaderWrapper<K, V> { |
// this constructor signature is required by CombineFileRecordReader |
public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, |
Integer idx) throws IOException, InterruptedException { |
super ( new SequenceFileInputFormat<K, V>(), split, context, idx); |
MultiFileInputFormat.java
import java.io.IOException; |
import java.util.ArrayList; |
import org.apache.commons.logging.Log; |
import org.apache.commons.logging.LogFactory; |
import org.apache.hadoop.fs.FileStatus; |
import org.apache.hadoop.fs.Path; |
import org.apache.hadoop.mapreduce.InputSplit; |
import org.apache.hadoop.mapreduce.Job; |
import org.apache.hadoop.mapreduce.JobContext; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; |
* multiple files can be combined in one InputSplit so that InputSplit number can be limited! |
public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> { |
private static final Log LOG = LogFactory.getLog(MultiFileInputFormat. class ); |
public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num" ; |
public static final Integer DEFAULT_MAX_SPLIT_NUM = 50 ; |
public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) { |
job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum); |
public List<InputSplit> getSplits(JobContext job) throws IOException { |
// get all the files in input path |
List<FileStatus> stats = listStatus(job); |
List<InputSplit> splits = new ArrayList<InputSplit>(); |
for (FileStatus stat : stats) { |
totalLen += stat.getLen(); |
int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM); |
int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size(); |
long averageLen = totalLen / expectSplitNum; |
LOG.info( "Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen |
+ ") expectSplitNum(" + expectSplitNum + ") " ); |
List<Path> pathLst = new ArrayList<Path>(); |
List<Long> offsetLst = new ArrayList<Long>(); |
List<Long> lengthLst = new ArrayList<Long>(); |
for ( int i = 0 ; i < stats.size(); i++) { |
FileStatus stat = stats.get(i); |
pathLst.add(stat.getPath()); |
lengthLst.add(stat.getLen()); |
currentLen += stat.getLen(); |
if (splits.size() < expectSplitNum - 1 && currentLen > averageLen) { |
Path[] pathArray = new Path[pathLst.size()]; |
CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray), |
getLongArray(offsetLst), getLongArray(lengthLst), new String[ 0 ]); |
LOG.info( "combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size() |
+ ") length(" + currentLen + ")" ); |
if (pathLst.size() > 0 ) { |
Path[] pathArray = new Path[pathLst.size()]; |
CombineFileSplit thissplit = |
new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst), |
getLongArray(lengthLst), new String[ 0 ]); |
LOG.info( "combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size() |
+ ") length(" + currentLen + ")" ); |
private long [] getLongArray(List<Long> lst) { |
long [] rst = new long [lst.size()]; |
for ( int i = 0 ; i < lst.size(); i++) { |
通过 multifileinputformat.max_split_num 参数就可以较为准确的控制map数量,而且会发现每个map处理的数据量很均匀。至此,问题总算解决了。