引言
DistCp命令是hadoop用户最常使用的命令之一,它位于hadoop tools包中,代码不多,约1300多行,主要用于在两个HDFS集群之间快速拷贝数据。DistCp工具代码结构清晰易懂,通过分析该工具的代码有助于我们更好的理解MR编程框架,并可以对hdfs文件系统有一个初步的了解。
用法
DistCp使用方法如下表所示:
OPTIONS: -p[rbugp] Preserve status r: replication number b: block size u: user g: group p: permission -p alone is equivalent to -prbugp -i Ignore failures -log <logdir> Write logs to <logdir> -m <num_maps> Maximum number of simultaneous copies -overwrite Overwrite destination -update Overwrite if src size different from dst size -f <urilist_uri> Use list at <urilist_uri> as src list -filelimit <n> Limit the total number of files to be <= n -sizelimit <n> Limit the total size to be <= n bytes -delete Delete the files existing in the dst but not in src |
这里-p、-m、-overwrite都是常用参数,大多数情况下我们期望拷贝后数据权限保持一致,通过-p参数来完成权限一致性,拷贝并行度则由-m参数来调节。至于-overwrite往往和-delete合用,用来起到dst和src的一个diff功能。至于-update是很不靠谱的参数,因为只有当源和目标文件的大小不一致时distcp才会覆盖拷贝,如果大小一致,虽然内容不同distcp也依然会跳过这个文件不做拷贝。
源代码与过程分析
DistCp实现了org.apache.hadoop.util.Tool这个接口,这个接口实际只有一个有用的方法声明,即“int run(InputStream in, OutputStream out, OutputStream err,String... arguments);”通过ToolRunner这个类调度运行。
DistCp解析完参数后,首先通过源路径检测并获得文件系统句柄。然后进入setup方法:
private static void setup(Configuration conf, JobConf jobConf,
final Arguments args)
该方法是DistCp做准备
工作的地方,首先是结合一个随机数生成一个工作目录,并将该目录路径作为参数传递给Mapper,在这个目录下会生成两个文件“_distcp_src_files”和“_distcp_dst_files”,这两个文件都是SequenceFile,即Key/Value结构的序列化文件,这里将记录所有需要拷贝的源目录/文件信息列表。其中_distcp_src_files 的key是源文件的size,如果是目录则记录为0,value是自己实现的Writable接口类FilePair,记录目标节点的org.apache.hadoop.fs.FileStatus和路径。_distcp_dst_files的key是目标路径,和节点的FileStatus。这两个文件是DistCp工具的关键点,在setup方法中,DistCp通过递归遍历了要拷贝的所有源头数据列表,生成了这两个文件。
随后,DistCp会以268435456字节(256MB)为切分单位计算map数,这个数值可以通过-sizelimit参数进行人为修改。DistCp构造了自己的InputSplit,将_distcp_src_files文件以刚才所说的值为单位进行切分,如果设定了-m参数,则会按照该参数设定的map数为基准进行切分。这里需要注意切分的map数不会恰好等于-m参数设定的值,由于不能整除的原因,总会或多或少的偏离一点设定值。
map数的确定算法如下:
private static void setMapCount(long totalBytes, JobConf job) throws IOException { int numMaps = (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP)); numMaps = Math.min(numMaps, job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE * new JobClient(job).getClusterStatus().getTaskTrackers())); job.setNumMapTasks(Math.max(numMaps, 1)); } |
这里可以看到,DistCp其实还判断了集群实际tasktracker数量,防止map数设置的太多,导致很多map需要等待一轮轮的调度。
切分代码如下:
SequenceFile.Reader sl = null; try { sl = new SequenceFile.Reader(fs, src, job); for (; sl.next(key, value); last = sl.getPosition()) { // if adding this split would put this split past the target size, // cut the last split and put this next file in the next split. if (acc + key.get() > targetsize && acc != 0) { long splitsize = last - pos; splits.add(new FileSplit(src, pos, splitsize, (String[])null)); cbrem -= splitsize; pos = last; acc = 0L; } acc += key.get(); } } finally { checkAndClose(sl); } |
split之后就进入Mapper执行阶段,map task起来后就会根据自己分配到的那段文件列表来进行点对点的拷贝,拷贝过程会保持Permission、Replication、Block Size的一致性,如果设定了-update则会做一下是否需要update的判断,如果设定了-overwrite则会删除已有的文件。这里Owner信息没有保持一致,而是放到了服务端所有map执行完之后,这一点很让我觉得纠结,为什么不在map里面拷贝完之后直接同步文件Owner呢?如果有哪位大师知道希望可以提点我一下。因为是实现了Tool接口,而Tool接口是继承了Configurable接口的,所以-D指定的值对于DistCp来说也是可以生效的。例如设定“-Ddfs.replication=1”,那么拷贝时目标文件的replication数就将保持为1,这样一来由于目标端需要写入的数据变少了,拷贝速度就可以大大加快,但是不推荐这么做,因为拷贝过程中如果有一台Datanode挂了,那么丢失的数据由于无备份,就将真正丢了,这台机器恢复不了的话,整个distcp过程就会因为丢数据而失败了。
拷贝过程关键代码如下:
FSDataInputStream in = null; FSDataOutputStream out = null; try { // open src file in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath()); reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); // open tmp file out = create(tmpfile, reporter, srcstat); // copy file for(int cbread; (cbread = in.read(buffer)) >= 0; ) { out.write(buffer, 0, cbread); cbcopied += cbread; reporter.setStatus( String.format("%.2f ", cbcopied*100.0/srcstat.getLen()) + absdst + " [ " + StringUtils.humanReadableInt(cbcopied) + " / " + StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); } } finally { checkAndClose(in); checkAndClose(out); } |
Mapper执行完之后,DistCp工具的服务端执行过程就全部完成了,回到客户端还会做一些扫尾的工作,例如同步Owner权限。这里会有一些问题,稍后我们一并分析。
问题分析
DistCp存在三大问题,下面来一一剖析:
1.任务失败,map task报“DFS Read: java.io.IOException: Could not obtain block”
这是由于“_distcp_src_files”这个文件的备份数是系统默认值,例如hadoop-site.xml里面设置了dfs.replication=3,那么_distcp_src_files文件的备份数则创建之后就为3了。当map数非常多,以至于超过了_distcp_src_files文件三个副本所在datanode最大容纳上限的时候,部分map task就会出现获取不了block的问题。对于DistCp来说“-i”参数一般是绝对不能使用的,因为设置了该参数,这个问题就会被掩盖,带来的后果就是拷贝完缺失了部分数据。比较好的做法是在计算了总map数之后,自动增加_distcp_src_files这个文件的备份数,这样一来访问容纳上限也会跟着提高,上述问题就不会再出现了。当前社区已对此有了简单fix,直接将备份数设置成了一个较高的数值。一般说来对于计算资源有限的集群来说,过多的maptask并不会提高拷贝的效率,因此我们可以通过-m参数来设定合理的map数量。一般说来通过观察ganglia,bytes_in、bytes_out达到上限就可以了。
2.Owner同步问题
DistCp工具的提示信息非常少,对于海量数据来说,DistCp初始阶段准备拷贝文件列表和结束阶段设定Owner同步耗时都比较长,但却没有任何提示信息。这是一个很奇怪的地方,拷贝过程中,mapred会打印进度信息到客户端,这时候可以看到百分比,等结束的时候可以看到过程中的一些统计信息。如果你设置了-p参数,此时就会处于一个停滞的状态,没有任何输出了。由于Owner同步没有在map task里面去做,放在客户端就必然成为一个单线程的工作,耗时也会比较长。我以前犯过的错误就是启动distcp后看jobtracker页面出现作业了,就kill了客户端的进程,这样一来就导致Owner不会同步。现在做法都是用“nohup nice -n 0”把进程放到后台让其自动结束。
3.长尾问题
DistCp切分map的时候,充分考虑了每个map需要拷贝的数据量,尽量保持平均,但是却完全没有考虑碎文件和整块文件拷贝耗时不同的问题。此外,某些task所在tasktracker机器由于故障之类原因也会导致性能较差,拖慢了整体节奏。拷贝大量数据的时候总会因为这些原因出现长尾。通过在InputSplit的时候同时考虑数据量和文件个数的均衡可以解决碎文件和整文件拷贝耗时不同的问题。而部分task运行慢的问题,目前看起来则没有很好的解决方案。
用途
DistCp这个工具不仅可以用来做数据拷贝迁移工作,同时也是一个很好的制造集群负载的工具。用来模拟一定压力下的集成测试是非常有效的。在跨机房项目中,我们使用该工具负载两个机房之间的带宽,通过控制同时工作map数来调整带宽的增减是非常有效的。拓展该工具的代码思路,我们在跨机房项目中制作出来的很多压力测试、性能测试工具也都发挥了作用。下面简单用一幅流程图来说明一下distcp工具的思想:
总结
DistCp工具是一个非常易于使用的拷贝工具,在Hadoop生态圈众多怪兽级应用中,DistCp的代码是优美且短小精悍的。也因为其代码易读性非常好,因此作为MR编程框架的入门教材也非常适合。小心的使用这个工具,我们可以在很多测试场景下模拟真实的线上情况。因此建议每位刚入Hadoop门的码农都能钻研一下DistCp的源码,增加对MR编程框架和HDFS文件系统的深入理解。