posts - 28, comments - 37, trackbacks - 0, articles - 0

周末无事,故翻译sheepdog design.

原文地址: https://github.com/collie/sheepdog/wiki/Sheepdog-Design

Sheepdog 设计

Sheeepdog采用完全对称的结构,没有类似元数据服务的中心节点. 这种设计有以下的特点.

1)       性能于容量的线性的可扩展性.

当需要提升性能或容量时,只需向集群中增加新的机器便能使Sheeepdog线性成长.

2)       没有单点故障

即使某台机器发生故障,依然可以通过其他机器访问数据.

3)       容易管理

不需要配置机器角色,当管理员在新增的机器开启Sheepdog守护进程时, Sheepdog会自动检测新加入的机器并配置它成为存储系统中的一员.

结构概述



Sheepdog是一个分布式存储系统.Sheepdog客户端(QEMU的块驱动程序)提供对象存储(类似于简单的键值对). 接下来几章将更加详细的阐述Sheepdog各个部分.

1)       对象存储(Object对象存储(Object Storage)

2)       )

Sheepdog不同于一般的文件系统, Sheepdog进程为QEMU(Sheepdog进程名)创建一个分布式对象存储系统,它可以存储对象”.这里的对象数据大小可变,并有唯一的标识,通过标识可以进行读//创建/删除操作.对象存储组成网关对象管理器”.

3)       网关(getway)

Getway接收QEMU块驱动的I/O请求(对象id,偏移,长度和操作类型),通过一直散列算法获得目标节点,然后转发I/O请求至该节点.

4)       对象管理器(Object manager)

对象管理器接收getway转发过来的I/O请求,然后对磁盘执行读/写操作.

5)       集群管理器(Cluster manager)

集群管理器管理管理节点成员(探测失败/新增节点并报告节点成员的变化),以及一些需要节点一致的操作(vdi 创建, 快照 vdi).当前集群管理器使用corosync集群引擎.

6)       QEMU 块驱动

QEMU块驱动把VM image分为固定大小(默认4M),并通过其getway存储到对象存储中

对象存储(Object Storage)

每个对象使用一个64bit的整数作为全局标识,并在多台机器存有备份,QEMU块驱动并不关心存储的位置,对象存储系统负责管理存储的位置.

对象类型(object types)

Sheepdog的对象分为以下四种:

1)       数据类型(data object)

它包括虚拟磁盘映射的真实数据,虚拟磁盘映射分为固定大小的数据对象, Sheepdog客户端访问这个对象.

2)       vdi object

它包括虚拟磁盘映射的元数据(:映射名,磁盘大小,创建时间,vdi的数据对象ID).

3)       vmstate object

它存储运行中的VM状态映射.管理员通过它获取实时快照信息.

4)       vdi attr object

使用它存储各个vdi的属性,属性为键值对类型,类似于普通文件的扩展信息.

对象ID规则(object ID rules)

1) 0 - 31 (32 bits): 对象类型详细信息

2) 32 - 55 (24 bits): vdi id

3) 56 - 59 ( 4 bits): 预留

4) 60 - 63 ( 4 bits): 对象类型标识符

每个VDI有一个全局唯一的ID(vdi id), 通过VDI名求得的散列值,低三十二位使用如下:

对象类型

32位的作用

数据类型

虚拟磁盘映射的索引号

Vdi对象

未使用(0)

Vm状态对象

Vm状态映射索引

Vdi属性对象

键名的散列值

 

对象格式(object format)

1)       数据对象

虚拟磁盘映射的块

2)       Vdi对象

 

 1  struct sheepdog_inode {
 2      char name[SD_MAX_VDI_LEN];               /* the name of this VDI*/
 3      char tag[SD_MAX_VDI_TAG_LEN];           /* the snapshot tag name */
 4      uint64_t ctime;                                    /* creation time of this VDI */
 5      uint64_t snap_ctime;                            /* the time snapshot is taken */
 6      uint64_t vm_clock_nsec;                       /* vm clock (used for live snapshot) */
 7      uint64_t vdi_size;                                 /* the size of VDI */
 8      uint64_t vm_state_size;                        /* the size of vm state (used for live snapshot) */
 9      uint16_t copy_policy;                           /* reserved */
10      uint8_t  nr_copies;                              /* the number of object redundancy */
11      uint8_t  block_size_shift;                      /* info about the size of the data object */
12      uint32_t snap_id;                                /* the snapshot id */
13      uint32_t vdi_id;                                  /* the vdi id */
14      uint32_t parent_vdi_id;                        /* the parent snapshot vdi id of this VDI */
15      uint32_t child_vdi_id[MAX_CHILDREN];    /* the children VDIs of this VDI */
16      uint32_t data_vdi_id[MAX_DATA_OBJS];   /* the data object IDs this VDI contains*/
17  };

3)       Vm状态对象

Vm状态映射块

4)       Vdi属性对象

SD_MAX_VDI_ATTR_KEY_LEN(256)为属性的键名,余下的是属性指.

只读/可写对象(read-only/writable objects)

从如何访问对象的角度,我们还可以把Sheepdog对象分为以下两类.

1)       只读对象(:VDI快照数据对象)

只允许一个VM对其读写,其他vm无法访问

2)       可写对象

不允许任何VM对其写,所有VM都可读

其他功能(other features)

Sheepdog对象存储可接收正在写时复制(copy-on-write)的请求.当一个客户端发送一个创建和写的请求时,同时可以指定基本对象(CoW操作的来源),这用于快照和克隆操作.

网关(Gateway)

对象存在哪(where to store objects)

Sheepdog使用一致性哈希算法决定存放对象的位置,一致性哈希算法提供哈希表,而且增加或介绍节点不回显著的改变对象映射,通过哈希表能使I/O负载均衡.

副本(replication)

Sheepdog的数据副本很简单,我们假设只有一个写,所以写冲突不会发生,客户端可以并行发生请求到目标节点,发生读请求到一个目标节点如果客户端自己处理I/O请求顺序.

I/O(write I/O flow)

Getway使用一致性哈希算法计算目标节点并发送写请求到所有目标节点,只有所有副本均更新成功写请求才会成功,这是因为如果一个副本未更新,getway有可能从未更新的节点读取旧数据.

I/O(read I/O flow)

Getway使用一致性哈希算法计算目标节点,并发送读请求到一个目标节点.

1)       修复对象一致性

当某节点在转发I/O请求时crash,有可能破坏副本的一致性,所以当getway第一次读取对象时会试图修复其一致性,从一节点读取整个对象并用它覆盖所有副本.

重试I/O请求(retrying I/O requests)

Sheepdog存储所有成员节点的历史信息,我们把历史版本号叫做”epoch”(详见章节对象恢复’). 如果getway转发I/O请求至目标节点并且getway与目标节点epoch号不相符,I/O请求失败且getway重试请求直到epcho号相符,这就需要保持副本强一致性.

I/O重试也可能发生在目标节点挂了导致无法完成I/O操作.

对象管理器(Object Manager)

对象管理器把对象存储到本地磁盘,当前把每个对象存储为一个文件,这中方法简单.我们也可以使用DBMS(: BerkeleyDB, Tokyo Cabinet) 作为对象存储器,但还不支持.

路径命名规则(path name rule)

对象存储成如下路径:

        /store_dir/obj/[epoch number]/[object ID]

所有对象文件有一个扩展属性: 副本数(sheepdog.copies),

写日志(write journaling)

sheep进程在写操作过程中失败,对象有可能至少部分更新,一般情况这不会有问题,因为如果VM未接收成功消息,不保证所写部分的内容.然而对于vdi对象,我们必须整体更新或整体未更新,因为如果vdi对象只是部分更新,VDI的元数据有可能被破坏. 为例防止这个问题,我们使用日志记录对vdi对象的写操作. 日志过程很简单:

1)       创建日志文件"/store_dir/journal/[epoch]/[vdi object id]"

2)       首先写数据到日志文件

3)       写一个数据到vdi对象

4)       删除日志文件

集群管理器(Cluster Manager)

大多情况, Sheepdo客户端单独访问它们的映射因为我们不允许两个客户端同时访问一个映射,但是某些VDI操作(:克隆VDI,创建VDI)必须做,因为这些操作更新全局信息,我们使用Corosync集群引擎完成而不是中心服务器.

我们将扩展Sheepdog以支持其他集群管理系统.

本章正在编辑

QEMU 块驱动(QEMU Block Driver)

Sheepdog卷被分为4M的数据对象,刚创建的对象未分配,也就是说,只有写对象被分配.

Open

首先QEMU块驱动通过getwaybdrv_open()从对象存储读取vdi

/(read/write)

块驱动通过请求的部分偏移量和大小计算数据对象id, 并向getway发送请求. 当块驱动发送写请求到那些不属于其当前vdi的数据对象是,块驱动发送CoW请求分配一个新的数据对象.

写入快照vdi(write to snapshot vdi)

我们可以把快照VDI附加到QEMU, 当块驱动第一次发送写请求到快照VDI, 块驱动创建一个新的可写VDI作为子快照,并发送请求到新的VDI.

VDI操作(VDI Operations)

查找(lookup)

当查找VDI对象时:

1)       通过求vdi名的哈希值得到vdi id

2)       通过vdi id计算di对象

3)       发送读请求到vdi对象

4)       如果此vdi不是请求的那个,增加vdi id并重试发送读请求

快照,克隆(snapshot, cloning)

快照可克隆操作很简单,

1)       读目标VDI

2)       创建一个与目标一样的新VDI

3)       把新vdi‘'parent_vdi_id''设为目标VDIid

4)       设置目标vdi''child_vdi_id''为新vdiid.

5)       设置目标vdi''snap_ctime''为当前时间, vdi变为当前vdi对象

删除(delete)

TODO:当前,回收未使用的数据对象是不会被执行,直到所有相关VDI对象(相关的快照VDI和克隆VDI)被删除.

所有相关VDI被删除后, Sheepdog删除所有此VDI的数据对象,设置此VDI对象名为空字符串.

对象恢复(Object Recovery)

epoch

Sheepdog把成员节点历史存储在存储路径, 路径名如下:

        /store_dir/epoch/[epoch number]

每个文件包括节点在epoch的列表信息(IP地址,端口,虚拟节点个数).

恢复过程(recovery process)

1)       从所有节点接收存储对象ID

2)       计算选择那个对象

3)       创建对象ID list文件"/store_dir/obj/[the current epoch]/list"

4)       发送一个读请求以获取id存在于list文件的对象. 这个请求被发送到包含前一次epoch的对象的节点.( The requests are sent to the node which had the object at the previous epoch.)

5)       把对象存到当前epoch路径.

冲突的I/O(conflicts I/Os)

如果QEMU发送I/O请求到某些未恢复的对象, Sheepdog阻塞此请求并优先恢复对象.

协议(Protocol)

Sheepdog的所有请求包含固定大小的头部(48)和固定大小的数据部分,头部包括协议版本,操作码,epoch,数据长度等.

between sheep and QEMU

操作码

描述

SD_OP_CREATE_AND_WRITE_OBJ

发送请求以创建新对象并写入数据,如果对象存在,操作失败

SD_OP_READ_OBJ

读取对象中的数据

SD_OP_WRITE_OBJ

向对象写入数据,如果对象不存在,失败

SD_OP_NEW_VDI

发送vdi名到对象存储并创建新vdi对象, 返回应答vdi的唯一的vdi id

SD_OP_LOCK_VDI

SD_OP_GET_VDI_INFO相同

SD_OP_RELEASE_VDI

未使用

SD_OP_GET_VDI_INFO

获取vdi信息(:vdi id)

SD_OP_READ_VDIS

获取已经使用的vdi id

between sheep and collie

操作码

描述

SD_OP_DEL_VDI

删除VDI

SD_OP_GET_NODE_LIST

获取sheepdog的节点列表

SD_OP_GET_VM_LIST

未使用

SD_OP_MAKE_FS

创建sheepdog集群

SD_OP_SHUTDOWN

停止sheepdog集群

SD_OP_STAT_SHEEP

获取本地磁盘使用量

SD_OP_STAT_CLUSTER

获取sheepdog集群信息

SD_OP_KILL_NODE

退出sheep守护进程

SD_OP_GET_VDI_ATTR

获取vdi属性对象id

between sheeps

操作码

描述

SD_OP_REMOVE_OBJ

删除对象

SD_OP_GET_OBJ_LIST

获取对象id列表,并存储到目标节点

 

posted @ 2011-08-28 17:02 俞灵 阅读(3278) | 评论 (0)编辑 收藏

Ant是什么?
Ant是一种基于Java和XML的build工具.
1 编写build.xml

Ant的buildfile是用XML写的.每个buildfile含有一个project.

buildfile中每个task元素可以有一个id属性,可以用这个id值引用指定的任务.这个值是唯一的.(详情请参考下面的Task小节)

1.1 Projects

project有下面的属性:
Attribute Description Required
name 项目名称. No
default 当没有指定target时使用的缺省target Yes
basedir 用于计算所有其他路径的基路径.该属性可以被basedir property覆盖.当覆盖时,该属性被忽略.如果属性和basedir property都没有设定,就使用buildfile文件的父目录. No


项目的描述以一个顶级的<description>元素的形式出现(参看description小节).

一个项目可以定义一个或多个target.一个target是一系列你想要执行的.执行Ant时,你可以选择执行那个target.当没有给定target时,使用project的default属性所确定的target.

1.2 Targets

一个target可以依赖于其他的target.例如,你可能会有一个target用于编译程序,一个target用于生成可执行文件.你在生成可执行文件之前先编译通过,生成可执行文件的target依赖于编译target.Ant会处理这种依赖关系.

然而,应当注意到,Ant的depends属性只指定了target应该被执行的顺序-如果被依赖的target无法运行,这种depends对于指定了依赖关系的target就没有影响.

Ant会依照depends属性中target出现的顺序(从左到右)依次执行每个target.然而,要记住的是只要某个target依赖于一个target,后者就会被先执行.
<target name="A"/>
<target name="B" depends="A"/>
<target name="C" depends="B"/>
<target name="D" depends="C,B,A"/>
假定我们要执行target D.从它的依赖属性来看,你可能认为先执行C,然后B,A被执行.错了,C依赖于B,B依赖于A,先执行A,然后B,然后C,D被执行.

一个target只能被执行一次,即时有多个target依赖于它(看上面的例子).

如 果(或如果不)某些属性被设定,才执行某个target.这样,允许根据系统的状态(java version, OS, 命令行属性定义等等)来更好地控制build的过程.要想让一个target这样做,你就应该在target元素中,加入if(或unless)属性,带 上target因该有所判断的属性.例如:
<target name="build-module-A" if="module-A-present"/>
<target name="build-own-fake-module-A" unless="module-A-present"/>
如果没有if或unless属性,target总会被执行.

可选的description属性可用来提供关于target的一行描述,这些描述可由-projecthelp命令行选项输出.

将你的tstamp task在一个所谓的初始化target是很好的做法,其他的target依赖这个初始化target.要确保初始化target是出现在其他target依赖表中的第一个target.在本手册中大多数的初始化target的名字是"init".

target有下面的属性:
Attribute Description Required
name target的名字 Yes
depends 用逗号分隔的target的名字列表,也就是依赖表. No
if 执行target所需要设定的属性名. No
unless 执行target需要清除设定的属性名. No
description 关于target功能的简短描述. No


1.3 Tasks

一个task是一段可执行的代码.

一个task可以有多个属性(如果你愿意的话,可以将其称之为变量).属性只可能包含对property的引用.这些引用会在task执行前被解析.

下面是Task的一般构造形式:
<name attribute1="value1" attribute2="value2" ... />
这里name是task的名字,attributeN是属性名,valueN是属性值.

有一套内置的(built-in)task,以及一些可选task,但你也可以编写自己的task.

所有的task都有一个task名字属性.Ant用属性值来产生日志信息.

可以给task赋一个id属性:
<taskname id="taskID" ... />
这里taskname是task的名字,而taskID是这个task的唯一标识符.通过这个标识符,你可以在脚本中引用相应的task.例如,在脚本中你可以这样:
<script ... >
task1.setFoo("bar");
</script>
设定某个task实例的foo属性.在另一个task中(用java编写),你可以利用下面的语句存取相应的实例.
project.getReference("task1").
注意1:如果task1还没有运行,就不会被生效(例如:不设定属性),如果你在随后配置它,你所作的一切都会被覆盖.

注意2:未来的Ant版本可能不会兼容这里所提的属性,很有可能根本没有task实例,只有proxies.

1.4 Properties

一个project可以有很多的properties.可以在buildfile中用 property task来设定,或在Ant之外设定.一个property有一个名字和一个值.property可用于task的属性值.这是通过将属性名放在"${" 和"}"之间并放在属性值的位置来实现的.例如如果有一个property builddir的值是"build",这个property就可用于属性值:${builddir}/classes.这个值就可被解析为build /classes.

内置属性

如果你使用了<property> task 定义了所有的系统属性,Ant允许你使用这些属性.例如,${os.name}对应操作系统的名字.

要想得到系统属性的列表可参考the Javadoc of System.getProperties.

除了Java的系统属性,Ant还定义了一些自己的内置属性:
basedir project基目录的绝对路径 (与<project>的basedir属性一样).
ant.file buildfile的绝对路径.
ant.version Ant的版本.
ant.project.name 当前执行的project的名字;由<project>的name属性设定.
ant.java.version Ant检测到的JVM的版本; 目前的值有"1.1", "1.2", "1.3" and "1.4".

例子
<project name="MyProject" default="dist" basedir=".">

<!-- set global properties for this build -->
<property name="src" value="."/>
<property name="build" value="build"/>
<property name="dist" value="dist"/>

<target name="init">
<!-- Create the time stamp -->
<tstamp/>
<!-- Create the build directory structure used by compile -->
<mkdir dir="${build}"/>
</target>

<target name="compile" depends="init">
<!-- Compile the java code from ${src} into ${build} -->
<javac srcdir="${src}" destdir="${build}"/>
</target>

<target name="dist" depends="compile">
<!-- Create the distribution directory -->
<mkdir dir="${dist}/lib"/>
<!-- Put everything in ${build} into the MyProject-${DSTAMP}.jar file -->
<jar jarfile="${dist}/lib/MyProject-${DSTAMP}.jar" basedir="${build}"/>
</target>

<target name="clean">
<!-- Delete the ${build} and ${dist} directory trees -->
<delete dir="${build}"/>
<delete dir="${dist}"/>
</target>

</project>

1.5 Path-like Structures
你可以用":"和";"作为分隔符,指定类似PATH和CLASSPATH的引用.Ant会把分隔符转换为当前系统所用的分隔符.

当需要指定类似路径的值时,可以使用嵌套元素.一般的形式是
<classpath>
<pathelement path="${classpath}"/>
<pathelement location="lib/helper.jar"/>
</classpath>
location属性指定了相对于project基目录的一个文件和目录,而path属性接受逗号或分号分隔的一个位置列表.path属性一般用作预定义的路径--其他情况下,应该用多个location属性.

为简洁起见,classpath标签支持自己的path和location属性.
<classpath>
<pathelement path="${classpath}"/>
</classpath>
可以被简写作:
<classpath path="${classpath}"/>
也可通过<fileset>元素指定路径.构成一个fileset的多个文件加入path-like structure的顺序是未定的.
<classpath>
<pathelement path="${classpath}"/>
<fileset dir="lib">
<include name="**/*.jar"/>
</fileset>
<pathelement location="classes"/>
</classpath>
上面的例子构造了一个路径值包括:${classpath}的路径,跟着lib目录下的所有jar文件,接着是classes目录.

如果你想在多个task中使用相同的path-like structure,你可以用<path>元素定义他们(与target同级),然后通过id属性引用--参考Referencs例子.

path-like structure可能包括对另一个path-like structurede的引用(通过嵌套<path>元素):
<path id="base.path">
<pathelement path="${classpath}"/>
<fileset dir="lib">
<include name="**/*.jar"/>
</fileset>
<pathelement location="classes"/>
</path>
<path id="tests.path">
<path refid="base.path"/>
<pathelement location="testclasses"/>
</path>
前面所提的关于<classpath>的简洁写法对于<path>也是有效的,如:
<path id="tests.path">
<path refid="base.path"/>
<pathelement location="testclasses"/>
</path>
可写成:
<path id="base.path" path="${classpath}"/>
命令行变量

有些task可接受参数,并将其传递给另一个进程.为了能在变量中包含空格字符,可使用嵌套的arg元素.
Attribute Description Required
value 一个命令行变量;可包含空格字符. 只能用一个
line 空格分隔的命令行变量列表.
file 作为命令行变量的文件名;会被文件的绝对名替代.
path 一个作为单个命令行变量的path-like的字符串;或作为分隔符,Ant会将其转变为特定平台的分隔符.

例子
<arg value="-l -a"/>
是一个含有空格的单个的命令行变量.
<arg line="-l -a"/>
是两个空格分隔的命令行变量.
<arg path="/dir;/dir2:dir3"/>
是一个命令行变量,其值在DOS系统上为dir;dir2;dir3;在Unix系统上为/dir:/dir2:/dir3 .

References

buildfile元素的id属性可用来引用这些元素.如果你需要一遍遍的复制相同的XML代码块,这一属性就很有用--如多次使用<classpath>结构.

下面的例子:
<project ... >
<target ... >
<rmic ...>
<classpath>
<pathelement location="lib/"/>
<pathelement path="${java.class.path}/"/>
<pathelement path="${additional.path}"/>
</classpath>
</rmic>
</target>
<target ... >
<javac ...>
<classpath>
<pathelement location="lib/"/>
<pathelement path="${java.class.path}/"/>
<pathelement path="${additional.path}"/>

</classpath>
</javac>
</target>
</project>
可以写成如下形式:
<project ... >
<path id="project.class.path">
<pathelement location="lib/"/>
<pathelement path="${java.class.path}/"/>
<pathelement path="${additional.path}"/>
</path>
<target ... >
<rmic ...>
<classpath refid="project.class.path"/>
</rmic>
</target>
<target ... >
<javac ...>
<classpath refid="project.class.path"/>
</javac>
</target>
</project>
所有使用PatternSets, FileSets 或 path-like structures嵌套元素的task也接受这种类型的引用.

转自:
http://www.linux521.com/2009/java/200904/1760.html

posted @ 2011-08-07 11:46 俞灵 阅读(1143) | 评论 (0)编辑 收藏

最近看了hadoop进度显示部分代码,做了个ppt算是一个总结吧.
/Files/shenh062326/hadoop进度计算.ppt

posted @ 2011-07-03 09:44 俞灵 阅读(320) | 评论 (0)编辑 收藏

本文转自 http://coolshell.cn/articles/3463.html

对于SQL的Join,在学习起来可能是比较乱的。我们知道,SQL的Join语法有很多inner的,有outer的,有left的,有时候,对于Select出来的结果集是什么样子有点不是很清楚。Coding Horror上有一篇文章(实在不清楚为什么Coding Horror也被墙)通过 文氏图 Venn diagrams 解释了SQL的Join。我觉得清楚易懂,转过来。

假设我们有两张表。

  • Table A 是左边的表。
  • Table B 是右边的表。

其各有四条记录,其中有两条记录是相同的,如下所示:

id name       id  name
-- ----       --  ----
1  Pirate     1   Rutabaga
2  Monkey     2   Pirate
3  Ninja      3   Darth Vader
4  Spaghetti  4   Ninja

下面让我们来看看不同的Join会产生什么样的结果。

SELECT * FROM TableA
INNER JOIN TableB
ON TableA.name = TableB.name

id  name       id   name
--  ----       --   ----
1   Pirate     2    Pirate
3   Ninja      4    Ninja

Inner join
产生的结果集中,是A和B的交集。

Venn diagram of SQL inner join
SELECT * FROM TableA
FULL OUTER JOIN TableB
ON TableA.name = TableB.name

id    name       id    name
--    ----       --    ----
1     Pirate     2     Pirate
2     Monkey     null  null
3     Ninja      4     Ninja
4     Spaghetti  null  null
null  null       1     Rutabaga
null  null       3     Darth Vader

Full outer join 产生A和B的并集。但是需要注意的是,对于没有匹配的记录,则会以null做为值。

Venn diagram of SQL cartesian join
SELECT * FROM TableA
LEFT OUTER JOIN TableB
ON TableA.name = TableB.name

id  name       id    name
--  ----       --    ----
1   Pirate     2     Pirate
2   Monkey     null  null
3   Ninja      4     Ninja
4   Spaghetti  null  null

Left outer join 产生表A的完全集,而B表中匹配的则有值,没有匹配的则以null值取代。

Venn diagram of SQL left join
SELECT * FROM TableA
LEFT OUTER JOIN TableB
ON TableA.name = TableB.name
WHERE TableB.id IS null 

id  name       id     name
--  ----       --     ----
2   Monkey     null   null
4   Spaghetti  null   null

产生在A表中有而在B表中没有的集合。

join-left-outer.png
SELECT * FROM TableA
FULL OUTER JOIN TableB
ON TableA.name = TableB.name
WHERE TableA.id IS null
OR TableB.id IS null

id    name       id    name
--    ----       --    ----
2     Monkey     null  null
4     Spaghetti  null  null
null  null       1     Rutabaga
null  null       3     Darth Vader

产生A表和B表都没有出现的数据集。

join-outer.png

还需要注册的是我们还有一个是“交差集” cross join, 这种Join没有办法用文式图表示,因为其就是把表A和表B的数据进行一个N*M的组合,即笛卡尔积。表达式如下:

SELECT * FROM TableA
CROSS JOIN TableB

这个笛卡尔乘积会产生 4 x 4 = 16 条记录,一般来说,我们很少用到这个语法。但是我们得小心,如果不是使用嵌套的select语句,一般系统都会产生笛卡尔乘积然再做过滤。这是对于性能来说是非常危险的,尤其是表很大的时候。

posted @ 2011-07-01 15:06 俞灵 阅读(299) | 评论 (0)编辑 收藏

简单的jar使用方法
JAR files are packaged with the ZIP file format, so you can use them for "ZIP-like" tasks such as lossless data compression, archiving, decompression, and archive unpacking. These are among the most common uses of JAR files, and you can realize many JAR file benefits using only these basic features.

Even if you want to take advantage of advanced functionality provided by the JAR file format such as electronic signing, you'll first need to become familiar with the fundamental operations.

To perform basic tasks with JAR files, you use the Java Archive Tool provided as part of the Java Development Kit. Because the Java Archive tool is invoked by using the jar command, for convenience we'll call it the "Jar tool".

As a synopsis and preview of some of the topics to be covered in this lesson, the following table summarizes common JAR-file operations:

OperationCommand
To create a JAR filejar cf jar-file input-file(s)
To view the contents of a JAR filejar tf jar-file
To extract the contents of a JAR filejar xf jar-file
To extract specific files from a JAR filejar xf jar-file archived-file(s)
To run an application packaged as a JAR file 
(version 1.1)
jre -cp app.jar MainClass
To run an application packaged as a JAR file 
(version 1.2 -- requires Main-Class
manifest header)
java -jar app.jar
To invoke an applet packaged as a JAR file
<applet code=AppletClassName.class
        archive="JarFileName.jar"
        width=width height=height>
</applet>

posted @ 2011-07-01 11:18 俞灵 阅读(754) | 评论 (0)编辑 收藏

SIEGE is an http load tester and benchmarking utility. It was designed to let web developers measure the performance of their code under duress, to see how it will stand up to load on the internet. It lets the user hit a web server with a configurable number of concurrent simulated users. Those users place the server "under siege." 

posted @ 2011-03-23 11:15 俞灵 阅读(165) | 评论 (0)编辑 收藏

下载与安装 
去官网下载tar  或Google 找RPM 皆可,个人都习惯用tar  装,安装方法同一般的程式 
$>./configure --prefix=/usr/local 
$>make 
$>make install 
Complier 过程中会有几个Warning,但是对整个环境并没有影响.基本上安装部份都不会有什么问题, rrdtool 的tarball 内即可附了libgd,zlib 等自用的lib,不会像mrtg FAQ一样装好了试一下打rrdtool ,看会不会出现类似讯息 
以下是rrdtool中一些参数的解释:
1. DS :DS 用于定义 Data Soure 。也就是用于存放脚本的结果的变量名(DSN)。
2. DST :DST 就是 Data Source Type 的意思。有 COUNTER、GUAGE、DERIVE、ABSOLUTE、COMPUTE 5种。
3. RRA :RRA 用于指定数据如何存放。我们可以把一个RRA 看成一个表,各保存不同 interval 的统计结果
4. PDP :Primary Data Point 。正常情况下每个 interval RRDtool 都会收到一个值;RRDtool 在收到脚本给来的值后 会计算出另外一个值(例如平均值),这个 值就是 PDP,这个值代表的一般是“xxx/秒”的含义
5. CF :CF 就是 Consolidation Function 的缩写。也就是合并(统计)功能。有 AVERAGE、MAX、MIN、LAST 四种分别表示对多个PDP 进行取平均、取最大值、取最小值、取当前值四种类型
6. CDP :Consolidation Data Point 。RRDtool 使用多个 PDP 合并为(计算出)一个 CDP。也就是执行上面 的CF 操作后的结果。这个值就是存入 RRA的数据,绘图时使用的也是这些数据
7. xff:是 xfile factor 的缩写。定义:he xfiles factor defines what part of a consolidation interval may be made up from *UNKNOWN* data while the consolidated value is still regarded as known. It is given as the ratio of allowed *UNKNOWN* PDPs to。Xff 字段实际就是一个比例值。0.5 表示一个 CDP 中的所有 PDP 如果超过一半的值为 UNKNOWN ,则该 CDP 的值就被标为 UNKNOWN。
8. step :就是 RRDtool “期望” 每隔多长时间就收到一个值

posted @ 2011-03-09 08:55 俞灵 阅读(593) | 评论 (1)编辑 收藏

最近看了hadoop的mapreduce部分代码,看了之后总结了一下,算是成果吧。以下是程序执行的主要流程,其中参考了网上的一些文章。


概括

Hadoop包括hdfsmapreduce两部分,在试用期期间我主要看了mapreduce部分,即hadoop执行作业的部分。

  1. mapreduce中几个主要的概念

       mapreduce整体上可以分为这么几条执行的线索,jobclientJobTrackerTaskTracker

    1. JobClient

               每一个job都会在客户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS,然后向JobTracker提交作业,JobTracker创建Task(即MapTaskReduceTask)并将它们分发到各个TaskTracker服务中去执行。


    1. JobTracker

                JobTracker是一个master服务,hadoop服务端启动之后JobTracker接收job,负责调度job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。


    1. TaskTracker

               TaskTracker是运行于多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。

下图简单的描述了三者之间的关系:(上传不了图片,抱歉!)


  1. 数据结构

2.1 JobInProgress

JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1ReduceTask


2.2 TaskInProgress

JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTaskReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。


2.3 MapTaskReduceTask

一个完整的job会自动依次执行MapperCombiner(在JobConf指定了Combiner时执行)和Reducer,其中MapperCombiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了CombinerMapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。

 

  1. 整体流程

一道MapRedcue作业是通过JobClient.rubJob(job)master节点的JobTracker提交的, JobTracker接到JobClient的请求后把其加入作业队列中。JobTracker一直在等待JobClient通过RPC提交作业,TaskTracker一直通过RPCJobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程。slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。以下是简略示意图:



下图比较详细的解释了程序的流程:



 

  1. Jobclient

在编写MapReduce程序时通常是上是这样写的:

Configuration conf = new Configuration(); // 读取hadoop配置

Job job = new Job(conf, "作业名称"); // 实例化一道作业

job.setMapperClass(Mapper类型);

job.setCombinerClass(Combiner类型);

job.setReducerClass(Reducer类型);

job.setOutputKeyClass(输出Key的类型);

job.setOutputValueClass(输出Value的类型);

FileInputFormat.addInputPath(job, new Path(输入hdfs路径));

FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径));

// 其它初始化配置

JobClient.runJob(job);

4.1 配置Job

JobConf是用户描述一个job的接口。下面的信息是MapReduce过程中一些较关键的定制信息:


4.2 JobClient.runJob():运行Job并分解输入数据集


runJob()提交作业,如何等待返回的状态,根据状态返回不同的结构给客户端。

其中runJob()使用submitJob(job)方法向 master提交作业。

submitJob(Job)方法的流程



 

一个MapReduceJob会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及偏移量和Split大小。这些信息会统一打包到jobFilejar中。


hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.splitjob.xml。 

job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。 

job.jar: jar,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。 

job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。 

这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文 件之后, 此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,等待返回状态,此时作业已经提交完成。

接下来转到JobTracker上执行。

(事实上这里还涉及到一些相关的类与方法)

4.3 提交Job

jobFile的提交过程是通过RPC(远程进程调用)模块来实现的。大致过程是,JobClient类中通过RPC实现的Proxy接口调用JobTrackersubmitJob()方法,而JobTracker必须实现JobSubmissionProtocol接口。

JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、MapReduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个NetworkedJobRunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。

与创建Job过程相关的类和方法如下图所示


 

  1. JobTracker

5.1 JobTracker启动

JobTracker类中有一个main()函数,在软件启动的时候执行此main()函数启动JobTracker进程,main()中生成一个JobTracker的对象,然后通过tracker.offerService()语句启动服务,即启动一些线程,下面是几个主要的线程:

taskScheduler:一个抽象类,被JobTracker用于安排执行在TaskTrackers上的task任务,它使用一个或多个JobInProgressListeners接收jobs的通知。另外一个任务是调用JobInProgress.initTask()job初始化tasks。启动,提交作业,设置配置参数,终止等方法。


completedJobsStoreThread对应completedJobStatusStoreCompletedJobStatusStore类:把JobInProgress中的job信息存储到DFS中;提供一些读取状态信息的方法;是一个守护进程,用于删除DFS中的保存时间超过规定时间的job status删除,


interTrackerServer,抽象类Server类型的实例。一个IPC (Inter-Process Communication,进程间通信)服务器,IPC调用一个以一个参数的形式调用Writable,然后返回一个Writable作为返回值,在某个端口上运行。提供了call,listener,responder,connection,handle类。包括start(),stop(),join(),getListenerAddress(),call()等方法。

这些线程启动之后,便可开始工作了。



job是统一由JobTracker来调度的,把具体的Task分发给各个TaskTracker节点来执行。下面来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。

    1. JobTracker初始化Job

5.2.1 JobTracker.submitJob() 收到请求

JobTracker接收到新的job请求(即submitJob()函数被调用)后,会创建一个JobInProgress对象并通过它来管理和调度任务。JobInProgress在创建的时候会初始化一系列与任务有关的参数,调用到FileSystem,把在JobClient端上传的所有任务文件下载到本地的文件系统中的临时目录里。这其中包括上传的*.jar文件包、记录配置信息的xml、记录分割信息的文件。

5.2 JobTracker.JobInitThread 通知初始化线程

JobTracker 中的监听器类EagerTaskInitializationListener负责任务Task的初始化。JobTracker使用jobAdded(job)加入jobEagerTaskInitializationListener中一个专门管理需要初始化的队列里,即一个list成员变量jobInitQueue里。resortInitQueue方法根据作业的优先级排序。然后调用notifyAll()函数,会唤起一个用于初始化job的线程JobInitThread来处理???JobInitThread收到信号后即取出最靠前的job,即优先级别最高的job,调用TaskTrackerManagerinitJob最终调用JobInProgress.initTasks()执行真正的初始化工作。

5.3 JobInProgress.initTasks() 初始化TaskInProgress

任务Task分两种: MapTask reduceTask,它们的管理对象都是TaskInProgress

首先JobInProgress会创建Map的监控对象。在initTasks()函数里通过调用JobClientreadSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在HDFS里的blocks所在的DataNode节点的host,这个会在RawSplit创建时通过FileSplitgetLocations()函数获取,该函数会调用DistributedFileSystem的getFileCacheHints()获得。当然如果是存储在本地文件系统中,即使用LocalFileSystem时当然只有一个location即“localhost”了。

创建这些TaskInProgress对象完毕后,initTasks()方法会通 过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCacheslave端的 TaskTrackermaster发送心跳时,就可以直接从这个cache中取任务去执行。

其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法产生nonRunningReduces成员。

JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束。


5.3.2 JobTracker调度Job

hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量 jobQueueJobInProgressListener与上面说的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一个监听器类,它包含了一个映射,用来管理和调度所有的JobInProgress。jobAdded(job)同时会加入job到JobQueueJobInProgressListener中的映射。

JobQueueTaskScheduler最重要的方法是assignTasks ,他实现了工作调度。具体实现:JobTracker 接到TaskTracker heartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。首先它会检查 TaskTracker 端还可以做多少个 map reduce 任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask ReduceTask 。产生 Map 任务使用 JobInProgress obtainNewMapTask() 方法,实质上最后调用了 JobInProgress findNewMapTask() 访问 nonRunningMapCache

上面讲解任务初始化时说过,createCache()方法会在网络拓扑结构上挂上需要执行的TaskInProgressfindNewMapTask()从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同数据中心下的节点,直到找了maxLevel层结束。这样的话,在JobTrackerTaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。

最终生成一个Task类对象,该对象被封装在一个LanuchTaskAction 中,发回给TaskTracker,让它去执行任务。

产生 Reduce 任务过程类似,使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress findNewReduceTask() 访问 nonRunningReduces

6. TaskTracker

6.1 TaskTracker的启动

JobTracker一样,里面包含一个main()方法,在hadoop启动的时候启动此进程。

Main()方法最主要的一句话

TaskTracker(conf).run()

TaskTracker(conf)获取本机的一些配置信息,初始化服务器并启动服务器(StatusHttpServer);然后调用initialize(),这个方法才是真正构造TaskTracker的地方,把它作为一个单独的方法便可以再次调用并可以在close()之后回收对象,就是初始化一些变量对象,最后启动线程:

taskMemoryManagerTaskMemoryManagerThread类的对象。管理本机上task运行时内存的使用,杀死任何溢出和超出内存限制的task-trees

mapLauncherreduceLauncher都是TaskLauncher类的对象,其作用是启动maptaskreducetask任务线程。根据tasksToLaunch判断是否需要新建任务,其中的调用的关系为:run()startNewTask()localizeJob()launchTaskForJoblaunchTask()localizeTask


run()方法中启动TaskTracker服务器然后一直循环。循环会尝试连接到的JobTracker。主要调用了两个方法startCleanupThreads(),offerService()

startCleanupThreads()启动为守护进程,可以用来删除一个独立线程的路径。

offerService()类似于JobTracker中的offerService()方法,即服务器执行的主循环。规定的时间内给JobTracker发送心跳信息,并处理返回的命令。

下面具体介绍流程中的每一步。

6.2 TaskTracker加载Task到子进程

Task的执行实际是由TaskTracker发起的,TaskTracker会定期与JobTracker进行一次通信,报告自己Task的执行状态,接收JobTracker的指令等。如果发现有自己需要执行的新任务也会在这时启动,即是在TaskTracker调用JobTrackerheartbeat()方法时进行,此调用底层是通过IPC层调用Proxy接口实现。

6.2.1 TaskTracker.run() 连接JobTracker

TaskTracker的启动过程会初始化一系列参数和服务,然后尝试连接JobTracker(即必须实现InterTrackerProtocol接口),如果连接断开,则会循环尝试连接JobTracker,并重新初始化所有成员和参数。

6.2.2 TaskTracker.offerService() 主循环

如果连接JobTracker服务成功,TaskTracker就会调用offerService()函数进入主执行循环中。这个循环会每隔10秒与JobTracker通讯一次,调用transmitHeartBeat(),获得HeartbeatResponse信息。然后调用HeartbeatResponsegetActions()函数获得JobTracker传过来的所有指令即一个TaskTrackerAction数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction则调用调用addToTaskQueue加入到待执行

队列,否则加入到tasksToCleanup队列,交给一个taskCleanupThread线程来处理,如执行KillJobAction或者KillTaskAction等。

6.2.3 TaskTracker.transmitHeartBeat() 获取JobTracker指令

transmitHeartBeat()函数处理中,TaskTracker会创建一个新的TaskTrackerStatus对象记录目前任务的执行状况,检查目前执行的Task数目以及本地磁盘的空间使用情况等,如果可以接收新的Task则设置heartbeat()askForNewTask参数为true。然后通过IPC接口调用JobTrackerheartbeat()方法发送过去,heartbeat()返回值TaskTrackerAction数组。

6.2.4 TaskTracker.addToTaskQueue,交给TaskLauncher处理

TaskLauncher是用来处理新任务的线程类,包含了一个待运行任务的队列 tasksToLaunchTaskTracker.addToTaskQueue会调用TaskTrackerregisterTask,创建TaskInProgress对象来调度和监控任务,并把它加入到runningTasks队列中。同时将这个TaskInProgress加到tasksToLaunch 中,并notifyAll()唤醒一个线程运行,该线程从队列tasksToLaunch取出一个待运行任务,调用TaskTrackerstartNewTask运行任务。

6.2.5 TaskTracker.startNewTask() 启动新任务

调用localizeJob()真正初始化Task并开始执行。

6.2.6 TaskTracker.localizeJob() 初始化job目录等

此函数主要任务是初始化工作目录workDir,再将job jar包从HDFS复制到本地文件系统中,调用RunJar.unJar()将包解压到工作目录。然后创建一个RunningJob并调用addTaskToJob()函数将它添加到runningJobs监控队列中。addTaskToJob方法把一个任务加入到该任务属于的runningJobtasks列表中。如果该任务属于的runningJob不存在,先新建,加到runningJobs中。完成后即调用launchTaskForJob()开始执行Task

6.2.7 TaskTracker.launchTaskForJob() 执行任务

启动Task的工作实际是调用TaskTracker$TaskInProgresslaunchTask()函数来执行的。

6.2.8 TaskTracker$TaskInProgress.launchTask() 执行任务

执行任务前先调用localizeTask()更新一下jobConf文件并写入到本地目录中。然后通过调用TaskcreateRunner()方法创建TaskRunner对象并调用其start()方法最后启动Task独立的java执行子进程。

6.2.9 Task.createRunner() 创建启动Runner对象

Task有两个实现版本,即MapTaskReduceTask,它们分别用于创建MapReduce任务。MapTask会创建MapTaskRunner来启动Task子进程,而ReduceTask则创建ReduceTaskRunner来启动。

6.2.10 TaskRunner.start() 启动子进程

TaskRunner负责将一个任务放到一个进程里面来执行。它会调用run()函数来处理,主要的工作就是初始化启动java子进程的一系列环境变量,包括设定工作目录workDir,设置CLASSPATH环境变量等。然后装载job jar包。JvmManager用于管理该TaskTracker上所有运行的Task子进程。每一个进程都是由JvmRunner来管理的,它也是位于单独线程中的。JvmManagerlaunchJvm方法,根据任务是map还是reduce,生成对应的JvmRunner并放到对应JvmManagerForType的进程容器中进行管理。JvmManagerForTypereapJvm()

分配一个新的JVM进程。如果JvmManagerForType槽满,就寻找idle的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。如果槽没有满,那么就启动新的子进程。生成新的进程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner线程的run方法,run方法用于生成一个新的进程并运行它,具体实现是调用runChild

6.3 子进程执行MapTask

真实的执行载体,是Child,它包含一个 main函数,进程执行,会将相关参数传进来,它会拆解这些参数,通过getTask(jvmId)向父进程索取任务,并且构造出相关的Task实例,然后使用Taskrun()启动任务。

6.3.1 run

方法相当简单,配置完系统的TaskReporter后,就根据情况执行runJobCleanupTaskrunJobSetupTaskrunTaskCleanupTask或执行map

6.3.2 mapper

首先构造Mapper的输出,是通过MapOutputCollector进行的,也分两种情况,如果没有Reducer,那么,用DirectMapOutputCollector,否则,用MapOutputBuffer。然后构造Mapper处理的InputSplit,然后就开始创建MapperRecordReader,最终得到map的输入。构造完Mapper的输入输出,通过构造配置文件中配置的MapRunnable,就可以执行Mapper了。目前系统有两个MapRunnableMapRunnerMultithreadedMapRunnerMapRunner是单线程执行器,比较简单,他会使用反射机制生成用户定义的Mapper接口实现类,作为他的一个成员。

6.3.3 MapRunnerrun方法

会先创建对应的keyvalue对象,然后,对InputSplit的每一对<keyvalue>,调用用户实现的Mapper接口实现类的map方法,每处理一个数据对,就要使用OutputCollector收集每次处理kv对后得到的新的kv对,把他们spill到文件或者放到内存,以做进一步的处理,比如排序,combine等。

6.3.4 OutputCollector

OutputCollector的作用是收集每次调用map后得到的新的kv对,并把他们spill到文件或者放到内存,以做进一步的处理,比如排序,combine等。

MapOutputCollector 有两个子类:MapOutputBufferDirectMapOutputCollectorDirectMapOutputCollector用在不需要Reduce阶段的时候。如果Mapper后续有reduce任务,系统会使用MapOutputBuffer做为输出, MapOutputBuffer使用了一个缓冲区对map的处理结果进行缓存,放在内存中,又使用几个数组对这个缓冲区进行管理。



在适当的时机,缓冲区中的数据会被spill到硬盘中。



向硬盘中写数据的时机:

1)当内存缓冲区不能容下一个太大的k v对时。spillSingleRecord方法。

2)内存缓冲区已满时。SpillThread线程。

3Mapper的结果都已经collect了,需要对缓冲区做最后的清理。Flush方法。

2.5 spillThread线程:将缓冲区中的数据spill到硬盘中。

1)需要spill时调用函数sortAndSpill,按照partitionkey做排序。默认使用的是快速排序QuickSort

2)如果没有combiner,则直接输出记录,否则,调用CombinerRunnercombine,先做combin然后输出。

6.4 子进程执行ReduceTask

ReduceTask.run方法开始和MapTask类似,包括initialize()初始化 ,runJobCleanupTask()runJobSetupTask()runTaskCleanupTask()。之后进入正式的工作,主要有这么三个步骤:CopySortReduce

6.4.1 Copy

就是从执行各个Map任务的服务器那里,收罗到map的输出文件。拷贝的任务,是由ReduceTask.ReduceCopier 类来负责。

6.4.1.1 类图:



6.4.1.2 流程: 使用ReduceCopier.fetchOutputs开始

1)索取任务。使用GetMapEventsThread线程。该线程的run方法不停的调用getMapCompletionEvents方法,该方法又使用RPC调用TaskUmbilicalProtocol协议的getMapCompletionEvents,方法使用所属的jobID向其父TaskTracker询问此作业个Map任务的完成状况(TaskTracker要向JobTracker询问后再转告给它...)。返回一个数组TaskCompletionEvent events[]TaskCompletionEvent包含taskidip地址之类的信息。

2)当获取到相关Map任务执行服务器的信息后,有一个线程MapOutputCopier开启,做具体的拷贝工作。 它会在一个单独的线程内,负责某个Map任务服务器上文件的拷贝工作。MapOutputCopierrun循环调用copyOutputcopyOutput又调用getMapOutput,使用HTTP远程拷贝。

3getMapOutput远程拷贝过来的内容(当然也可以是本地了...),作为MapOutput对象存在,它可以在内存中也可以序列化在磁盘上,这个根据内存使用状况来自动调节。

4) 同时,还有一个内存Merger线程InMemFSMergeThread和一个文件Merger线程LocalFSMerger在同步工作,它们将下载过来的文件(可能在内存中,简单的统称为文件...),做着归并排序,以此,节约时间,降低输入文件的数量,为后续的排序工作减 负。InMemFSMergeThreadrun循环调用doInMemMerge, 该方法使用工具类Merger实现归并,如果需要combine,则combinerRunner.combine

6.4.2 Sort

排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行。使用工具类Merger归并所有的文件。经过这一个流程,一个合并了所有所需Map任务输出文件的新文件产生了。而那些从其他各个服务器网罗过来的 Map任务输出文件,全部删除了。

6.4.3 Reduce

Reduce任务的最后一个阶段。他会准备好 keyClass"mapred.output.key.class""mapred.mapoutput.key.class", valueClass("mapred.mapoutput.value.class""mapred.output.value.class")Comparator(“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。最后调用runOldReducer方法。(也是两套API,我们分析runOldReducer

6.4.4 runReducer

1)输出方面。它会准备一个OutputCollector收集输出,与MapTask不同,这个OutputCollector更为简单,仅仅是打开一个RecordWritercollect一次,write一次。最大的不同在于,这次传入RecordWriter的文件系统,基本都是分布式文件系统, 或者说是HDFS

2)输入方面,ReduceTask会用准备好的KeyClassValueClassKeyComparator等等之类的自定义类,构造出Reducer所需的键类型, 和值的迭代类型Iterator(一个键到了这里一般是对应一组值)。

3)有了输入,有了输出,不断循环调用自定义的Reducer,最终,Reduce阶段完成。



 

 

posted @ 2011-01-14 09:05 俞灵 阅读(8367) | 评论 (7)编辑 收藏

仅列出标题
共3页: 上一页 1 2 3