posts - 495,comments - 227,trackbacks - 0
http://www.rigongyizu.com/hadoop-job-optimize-combinefileinputformat/

某日,接手了同事写的从Hadoop集群拷贝数据到另外一个集群的程序,该程序是运行在Hadoop集群上的job。这个job只有map阶段,读取hdfs目录下数据的数据,然后写入到另外一个集群。

显然,这个程序没有考虑大数据量的情况,如果输入目录下文件很多或数据量很大,就会导致map数很多。而实际上我们需要拷贝的一个数据源就有近 6T,job启动起来有1w多个map,一下子整个queue的资源就占满了。虽然通过调整一些参数可以控制map数(也就是并发数),但是无法准确的控 制map数,而且换个数据源又得重新配置参数。

第一个改进的版本是,加了Reduce过程,以期望通过设置Reduce数量来控制并发数。这样虽然能精确地控制并发数,但是增加了shuffle 过程,实际运行中发现输入数据有倾斜(而partition的key由于业务需要无法更改),导致部分机器网络被打满,从而影响到了集群中的其他应用。即 使通过 mapred.reduce.parallel.copies 参数来限制shuffle也是治标不治本。这个平白增加的shuffle过程实际上浪费了很多网络带宽和IO。

最理想的情况当然是只有map阶段,而且能够准确的控制并发数了。

于是,第二个优化版本诞生了。这个job只有map阶段,采用CombineFileInputFormat, 它可以将多个小文件打包成一个InputSplit提供给一个Map处理,避免因为大量小文件问题,启动大量map。通过 mapred.max.split.size 参数可以大概地控制并发数。本以为这样就能解决问题了,结果又发现了数据倾斜的问题。这种粗略地分splits的方式,导致有的map处理的数据少,有的 map处理的数据多,并不均匀。几个拖后退的map就导致job的实际运行时间长了一倍多。

看来只有让每个map处理的数据量一样多,才能完美的解决这个问题了。

第三个版本也诞生了,这次是重写了CombineFileInputFormat,自己实现getSplits方法。由于输入数据为SequenceFile格式,因此需要一个SequenceFileRecordReaderWrapper类。

实现代码如下:
CustomCombineSequenceFileInputFormat.java

import java.io.IOException;
 
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
/**
 * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
 * <code>SequenceFileInputFormat</code>.
 *
 * @see CombineFileInputFormat
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> {
    @SuppressWarnings({"rawtypes", "unchecked"})
    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, context,
                SequenceFileRecordReaderWrapper.class);
    }
 
    /**
     * A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be
     * used in a <code>CombineFileInputFormat</code>-equivalent for
     * <code>SequenceFileInputFormat</code>.
     *
     * @see CombineFileRecordReader
     * @see CombineFileInputFormat
     * @see SequenceFileInputFormat
     */
    private static class SequenceFileRecordReaderWrapper<K, V>
            extends CombineFileRecordReaderWrapper<K, V> {
        // this constructor signature is required by CombineFileRecordReader
        public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context,
                Integer idx) throws IOException, InterruptedException {
            super(new SequenceFileInputFormat<K, V>(), split, context, idx);
        }
    }
}

MultiFileInputFormat.java

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 
/**
 * multiple files can be combined in one InputSplit so that InputSplit number can be limited!
 */
public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
 
    private static final Log LOG = LogFactory.getLog(MultiFileInputFormat.class);
    public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num";
    public static final Integer DEFAULT_MAX_SPLIT_NUM = 50;
 
    public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) {
        job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum);
    }
 
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // get all the files in input path
        List<FileStatus> stats = listStatus(job);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        if (stats.size() == 0) {
            return splits;
        }
        // 计算split的平均长度
        long totalLen = 0;
        for (FileStatus stat : stats) {
            totalLen += stat.getLen();
        }
        int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM);
        int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size();
        long averageLen = totalLen / expectSplitNum;
        LOG.info("Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen
                + ") expectSplitNum(" + expectSplitNum + ") ");
        // 设置inputSplit
        List<Path> pathLst = new ArrayList<Path>();
        List<Long> offsetLst = new ArrayList<Long>();
        List<Long> lengthLst = new ArrayList<Long>();
        long currentLen = 0;
        for (int i = 0; i < stats.size(); i++) {
            FileStatus stat = stats.get(i);
            pathLst.add(stat.getPath());
            offsetLst.add(0L);
            lengthLst.add(stat.getLen());
            currentLen += stat.getLen();
            if (splits.size() < expectSplitNum - 1   && currentLen > averageLen) {
                Path[] pathArray = new Path[pathLst.size()];
                CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                    getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
                LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                        + ") length(" + currentLen + ")");
                splits.add(thissplit);
                //
                pathLst.clear();
                offsetLst.clear();
                lengthLst.clear();
                currentLen = 0;
            }
        }
        if (pathLst.size() > 0) {
            Path[] pathArray = new Path[pathLst.size()];
            CombineFileSplit thissplit =
                    new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst),
                            getLongArray(lengthLst), new String[0]);
            LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                    + ") length(" + currentLen + ")");
            splits.add(thissplit);
        }
        return splits;
    }
 
    private long[] getLongArray(List<Long> lst) {
        long[] rst = new long[lst.size()];
        for (int i = 0; i < lst.size(); i++) {
            rst[i] = lst.get(i);
        }
        return rst;
    }
}

通过 multifileinputformat.max_split_num 参数就可以较为准确的控制map数量,而且会发现每个map处理的数据量很均匀。至此,问题总算解决了。

posted on 2014-09-16 09:25 SIMONE 阅读(679) 评论(1)  编辑  收藏 所属分类: hadoop

FeedBack:
# re: 一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat[未登录]
2014-12-20 11:34 | 哈哈
看了楼主的代码,但是这种做法以文件为单位,一个文件至多分给一个map处理。如果某个目录下有许多小文件, 另外还有一个超大文件, 处理大文件的map会严重偏慢,这个该怎么办呢?  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航:
博客园   IT新闻   Chat2DB   C++博客   博问