so true

心怀未来,开创未来!
随笔 - 160, 文章 - 0, 评论 - 40, 引用 - 0
数据加载中……

InputFormat学习

InputFormat,就是为了能够从一个jobconf中得到一个split集合(InputSplit[]),然后再为这个split集合配上一个合适的RecordReader(getRecordReader)来读取每个split中的数据。

InputSplit,继承自Writable接口,因此一个InputSplit实则包含了四个接口函数,读和写(readFields和write),getLength能够给出这个split中所记录的数据大小,getLocations能够得到这个split位于哪些主机之上(blkLocations[blkIndex].getHosts()),这里需要说明的是一个block要么对应一个split,要么对应多个split,因此每个split都可以从它所属的block中获取主机信息,而且我猜测block的大小应该是split的整数倍,否则有可能一个split跨越两个block。

对于RecordReader,其实这个接口主要就是为了维护一组<K,V>键值对,任何一个实现了该接口的类的构造函数都需要是“(Configuration conf, Class< ? extends InputSplit> split)”的形式,因为一个RecordReader是有针对性的,就是针对某种split来进行的,因此必须得与某种split绑定起来。这个接口中最重要的方法就是next,在利用next进行读取K和V时,需要先通过createKey和createValue来创建K和V的对象,然后再传给next作为参数,使得next对形参中的数据成员进行修改。

一个file(FileStatus)分成多个block存储(BlockLocation[]),每个block都有固定的大小(file.getBlockSize()),然后计算出每个split所需的大小(computeSplitSize(goalSize, minSize, blockSize)),然后将长度为length(file.getLen())的file分割为多个split,最后一个不足一个split大小的部分单独为其分配一个split,最后返回这个file分割的最终结果(return splits.toArray(new FileSplit[splits.size()]))。

一个job,会得到输入的文件路径(conf.get("mapred.input.dir", "")),然后据此可以得到一个Path[],对于每个Path,都可以得到一个fs(FileSystem fs = p.getFileSystem(job)),然后再得到一个FileStatus[](FileStatus[] matches = fs.globStatus(p, inputFilter)),再把里面的每个FileStatus拿出来,判断其是否为dir,如果是的话就FileStatus stat:fs.listStatus(globStat.getPath(), inputFilter),然后再将stat加入到最终的结果集中result;如果是文件的话,那就直接加入到结果集中。说得简洁一些,就是一个job会得到input.dir中的所有文件,每个文件都用FileStatus来记录。

MultiFileSplit的官方描述是“A sub-collection of input files. Unlike {@link FileSplit}, MultiFileSplit class does not represent a split of a file, but a split of input files into smaller sets. The atomic unit of split is a file.”,一个MultiFileSplit中含有多个小文件,每个文件应该只隶属于一个block,然后getLocations就返回所有小文件对应的block的getHosts;getLength返回所有文件的大小总和。

对于MultiFileInputFormat,它的getSplits返回的是一个MultiFileSplit的集合,也就是一个个的小文件簇,举个简单的例子就会很清楚了:假定这个job中有5个小文件,大小分别为2,3,5,1,4;假定我们期望split的总数目为3的话,先算出个double avgLengthPerSplit = ((double)totLength) / numSplits,结果应该为5;然后再切分,因此得到的三个文件簇为:{文件1和2}、{文件3}、{文件4和5}。如果这五个文件的大小分别为2,5,3,1,4;那么应该得到四个文件簇为:{文件1}、{文件2}、{文件3和4}、{文件5}。此外,这个类的getRecordReader依然是个abstract的方法,因此其子类必须得实现这个函数。

posted on 2009-01-07 09:40 so true 阅读(1644) 评论(0)  编辑  收藏 所属分类: Hadoop


只有注册用户登录后才能发表评论。


网站导航: