StorageWrapper
的作用:把文件片断进一步切割为子片断,并且为这些子片断发送
request
消息。在获得子片断后,将数据写入磁盘。
请结合
Storage
类的分析来看。
几点说明:
1、
为了获取传输性能,
BT
把文件片断切割为多个子片断。
2、
BT
为获取一个子片断,需要向拥有该子片断的
peer
发送
request
消息(关于
request
消息,参见《
BT
协议规范》)。
3、
例如一个
256k
大小的片断,索引号是
10
,被划分为
16
个
16k
大小的子片断。那么需要为这
16
个子片断分别产生一个
request
消息。这些
request
消息在发出之前,以
list
的形式保存在
inactive_requests
这个
list
中。例如对这个片断,就保存在
inactive_requests
下标为
10
(片断的索引号)的地方,值是如下的
list
:
[(0,16k),(16k,
16k), (32k, 16k), (48k, 16k), (64k, 16k), (80k, 16k), (96k, 16k),
(112k, 16k), (128k, 16k), (144k, 16k), (160k, 16k), (176k, 16k), (192k,
16k), (208k, 16k), (224k, 16k), (240k, 16k)]
。这个处理过程在
_make_inactive()
函数中。因为这些
request
还没有发送出去,所以叫做
inactive request
(未激活的请求)。如果一个
request
发送出去了,那么叫做
active request
。为每个片断已经发送出去的
request
个数记录在
numactive
中。如果收到一个子片断,那么
active request
个数就要减
1
。
amount_inactive
记录了尚没有发出
request
的子片断总的大小。
4、
每当获得一个子片段,都要写入磁盘。如果子片断所属的片断在磁盘上还没有分配空间,那么首先需要为整个片断分配空间。如何为片断分配空间?这正是
StorageWrapper
类中最难理解的一部分代码。这个“空间分配算法”说起来很简单,但是在没有任何注释的情况下去看代码,耗费了我好几天的时间。具体的算法分析,请看
_piece_came_in()
的注释。
class
StorageWrapper:
def
__init__(self, storage, request_size, hashes,
piece_size, finished, failed,
statusfunc = dummy_status, flag = Event(), check_hashes = True,
data_flunked = dummy_data_flunked):
self.storage = storage # Storage
对象
self.request_size = request_size #
子片断大小
self.hashes = hashes #
文件片断摘要信息
self.piece_size = piece_size #
片断大小
self.data_flunked = data_flunked #
一个函数,用来检查片断的完整性
self.total_length = storage.get_total_length() #
文件总大小
self.amount_left = self.total_length #
未下载完的文件大小
#
文件总大小的有效性检查
#
因为最后一个片断长度可能小于
piece_size
if self.total_length <= piece_size * (len(hashes) - 1):
raise Error, 'bad data from tracker - total too small'
if self.total_length > piece_size * len(hashes):
raise Error, 'bad data from tracker - total too big'
#
两个事件,分布在下载完成和下载失败的时候设置
self.finished = finished
self.failed = failed
这几个变量的作用在前面已经介绍过了。
self.numactive = [0] * len(hashes)
inactive_request
inactive_requests
的值全部被初始化为
1
,这表示每个片断都需要发送
request
。后面在对磁盘文件检查之后,那些已经获得的片断,在
inactive_requests
中对应的是
None
,表示不需要再为这些片断发送
request
了。
self.inactive_requests = [1] * len(hashes)
self.amount_inactive = self.total_length
#
是否进入
EndGame
模式?关于
endgame
模式,在《
Incentives Build Robustness in BitTorrent
》的“片断选择算法”中有介绍。后面可以看到,在为最后一个“子片断”产生请求后,进入
endgame
模式。
self.endgame = False
self.have = Bitfield(len(hashes))
#
该片是否检查了完整性
self.waschecked = [check_hashes] * len(hashes)
这两个变量用于“空间分配算法”
self.places = { }
self.holes = [ ]
if len(hashes) == 0:
finished()
return
targets = {}
total = len(hashes)
#
检查每一个片断,,,
for i in xrange(len(hashes)):
#
如果磁盘上,还没有完全为这个片断分配空间,那么这个片断需要被下载,在
targets
字典中添加一项(如果已经存在,就不用添加了),它的关键字(
key
)是该片断的摘要值,它的值(
)是一个列表,
这个片断的索引号被添加到这个列表中。
这里一度让我非常迷惑,因为一直以为不同的文件片断肯定具有不同的摘要值。后来才想明白了,那就是:两个不同的文件片断,可能拥有相同的摘要值。不是么?只要这两个片断的内容是一样的。
这一点,对后面的分析非常重要。
if not
self._waspre(i):
targets.setdefault(hashes[i], []).append(i)
total -= 1
numchecked = 0.0
if total and check_hashes:
statusfunc({"activity" : 'checking existing file', "fractionDone" : 0})
#
这是一个内嵌在函数中的函数。在
c++
中,可以有内部类,不过好像没有内部函数的说法。这个函数只能在
__init__()
内部使用。
这个函数在一个片段被确认获得后调用
# piece:
片断的索引号
# pos:
这个片断在磁盘上存储的位置
例如,片断
5
可能存储在片断
2
的位置上。请参看后面的“空间分配算法”
def
markgot(piece, pos, self = self, check_hashes = check_hashes):
self.places[piece] = pos
self.have[piece] = True
self.amount_left -= self._piecelen(piece)
self.amount_inactive -= self._piecelen(piece)
不用再为这个片断发送
request
消息了
self.inactive_requests[piece] = None
self.waschecked[piece] = check_hashes
lastlen = self._piecelen(len(hashes) - 1) #
最后一个片断的长度
#
对每一个片断
for i in xrange(len(hashes)):
#
如果磁盘上,还没有完全为这个片断分配空间,那么在
holes
中添加该片断的索引号。
if not
self._waspre(i):
self.holes.append(i)
#
否则,也就是空间已经分配。但是还是不能保证这个片断已经完全获得了,正如分析
Storage
时提到的那样,可能存在“空洞”
#
如果不需要进行有效性检查,那么简单调用
markgot()
表示已经获得了该片断。这显然是一种不负责任的做法。
elif not check_hashes:
markgot(i, i)
#
如果需要进行有效性检查
else:
sha
是
python
内置的模块,它封装了
SHA-1
摘要算法。
SHA-1
摘要算法对一段任意长的数据进行计算,得出一个
160bit
(也就是
20
个字节)长的消息摘要。在
torrent
文件中,保存了每个片断的消息摘要。接收方在收到一个文件片断之后,再计算一次消息摘要,然后跟
torrent
文件中对应的值进行比较,如果结果不一致,那么说明数据在传输过程中发生了变化,这样的数据应该被丢弃。
这里,首先,根据片断
i
的起始位置开始,
lastlen
长的一段数据构造一个
sha
对象。
sh = sha(self.storage.read(piece_size * i, lastlen))
计算这段数据的消息摘要
sp = sh.digest()
然后,更新
sh
这个
sha
对象,注意,是根据片断
i
剩下的数据来更新的。关于
sha::update()
的功能,请看
python
的帮助。如果有两段数据
a
和
b
,那么
sh = sha(a)
sh.update(b)
,等效于
sh = sha(a+b)
所以,下面这个表达式等于
sh.update(self.storage.read(piece_size*i, self._piecelen(i)))
sh.update(self.storage.read(piece_size * i + lastlen, self._piecelen(i) - lastlen))
所以,这次计算出来的就是片断
i
的摘要
(原来的困惑:为什么不直接计算
i
的摘要,要这么绕一下了?后来分析清楚“空间分配算法”之后,这后面一段代码也就没有什么问题了。)
s = sh.digest()
如果计算出来的摘要和
hashes[i]
一致(后者是从
torrent
文件中获得的),那么,这个片断有效且已经存在于磁盘上。
if s == hashes[i]:
markgot(i, i)
elif targets.get(s)
and
self._piecelen(i) == self._piecelen(targets[s][-1]):
markgot(targets[s].pop(), i)
elif not self.have[len(hashes) - 1]
and
sp == hashes[-1]
and
(i == len(hashes) - 1 or not self._waspre(len(hashes) - 1)):
markgot(len(hashes) - 1, i)
else:
self.places[i] = i
if flag.isSet():
return
numchecked += 1
statusfunc({'fractionDone': 1 - float(self.amount_left) / self.total_length})
#
如果所有片断都下载完了,那么结束。
if self.amount_left == 0:
finished()
#
检查某个片断,是否已经在磁盘上分配了空间,调用的是
Storage:: was_preallocated()
def
_waspre(self, piece):
return self.storage.was_preallocated(piece * self.piece_size,
self._piecelen(piece))
#
获取指定片断的长度,只有最后一个片断大小可能小于
piece_size
def
_piecelen(self, piece):
if piece < len(self.hashes) - 1:
return self.piece_size
else:
return
self.total_length - piece * self.piece_size
#
返回剩余文件的大小
def
get_amount_left(self):
return self.amount_left
#
判断是否已经获得了一些文件片断
def
do_I_have_anything(self):
return self.amount_left < self.total_length
#
将指定片断切割为“子片断”
def
_make_inactive(self, index):
#
先获取该片断的长度
length = min(self.piece_size, self.total_length - self.piece_size * index)
l = []
x = 0
#
为了获得更好的传输性能,
BT
把每个文件片断又分为更小的“子片断”,我们可以在
download.py
文件中
default
变量中,找到“子片断”大小的定义:
'download_slice_size', 2 ** 14, "How many bytes to query for per request."
这里定义的“子片断”大小是
16k
。
下面这个循环,就是将一个片断进一步切割为“子片断”的过程。
while x + self.request_size < length:
l.append((x, self.request_size))
x += self.request_size
l.append((x, length - x))
#
将
l
保存到
inactive_requests
这个列表中
self.inactive_requests[index] = l
#
是否处于
endgame
模式,关于
endgame
模式,参加《
Incentives Build Robustness in BitTorrent
》
def
is_endgame(self):
return self.endgame
def
get_have_list(self):
return self.have.tostring()
def
do_I_have(self, index):
return self.have[index]
#
判断指定的片断,是否还有
request
没有发出?如果有,那么返回
true
,否则返回
false
。
def
do_I_have_requests(self, index):
return not not self.inactive_requests[index]
为指定片断创建一个
request
消息,返回的是一个二元组,例如(
32k, 16k
),表示“子片断”的起始位置是
32k
,大小是
16k
。
def
new_request(self, index):
# returns (begin, length)
#
如果还没有为该片断创建
request
。,那么调用
_make_inactive()
创建
request
列表。(
inactive_requests[index]
初始化的值是
1
)
if self.inactive_requests[index] == 1:
self._make_inactive(index)
# numactive[index]
记录了已经为该片断发出了多少个
request
。
self.numactive[index] += 1
rs = self.inactive_requests[index]
#
从
inactive_request
中移出最小的那个
request
(也就是起始位置最小)。
r = min(rs)
rs.remove(r)
# amount_inactive
记录了尚没有发出
request
的子片断总的大小。
self.amount_inactive -= r[1]
#
如果这是最后一个“子片断”,那么进入
endgame
模式
if self.amount_inactive == 0:
self.endgame = T.rue
#
返回这个
request
return r
def
piece_came_in(self, index, begin, piece):
try:
return self._piece_came_in(index, begin, piece)
except IOError, e:
self.failed('IO Error ' + str(e))
return True
如果获得了某个“子片断”,那么调用这个函数。
index
:“子片断”所在片断的索引号,
begin
:“子片断”在片断中的起始位置,
piece
:实际数据
def
_piece_came_in(self, index, begin, piece):
#
如果之前没有获得过该片断中任何“子片断”,那么首先需要在磁盘上为整个片断分配空间。
空间分配的算法如下:
假设一共是
6
个片断,现在已经为
0
、
1
、
4
三个片断分配了空间,那么
holes
:
[2, 3, 5]
places
:
{0:0, 1:1, 4:4}
现在要为片断
5
分配空间,思路是把片断
5
的空间暂时先分配在片断
2
应该在的空间上。这样分配以后,
holes
:
[3, 5]
places: {0:0, 1:1, 4:4, 5:2}
假设下一步为片断
2
分配空间,因为
2
的空间已经被
5
占用,所以把
5
的数据转移到
3
上,
2
才可以使用自己的空间。这样分配之后,
holes
:
[5]
places
:
{0:0, 1:1, 2:2, 4:4, 5:3}
最后,为
3
分配空间,因为
3
的空间被
5
占用,所以把
5
的数据转移到
5
自己的空间上,
3
就可以使用自己的空间了。这样分配之后,
holes
:
[]
places
:
{0:0, 1:1, 2:2, 3:3, 4:4, 5:5}
下面这段比较晦涩的代码,实现的就是这种空间分配算法。
if not self.places.has_key(index):
n = self.holes.pop(0)
if self.places.has_key(n):
oldpos = self.places[n]
old = self.storage.read(self.piece_size * oldpos, self._piecelen(n))
if self.have[n] and sha(old).digest() != self.hashes[n]:
self.failed('data corrupted on disk - maybe you have two copies running?')
return True
self.storage.write(self.piece_size * n, old)
self.places[n] = n
if index == oldpos or index in self.holes:
self.places[index] = oldpos
else:
for p, v in self.places.items():
if v == index:
break
self.places[index] = index
self.places[p] = oldpos
old = self.storage.read(self.piece_size * index, self.piece_size)
self.storage.write(self.piece_size * oldpos, old)
elif
index in self.holes or index == n:
if not self._waspre(n):
self.storage.write(self.piece_size * n,
self._piecelen(n) * chr(0xFF))
self.places[index] = n
else:
for p, v in self.places.items():
if v == index:
break
self.places[index] = index
self.places[p] = n
old = self.storage.read(self.piece_size * index, self._piecelen(n))
self.storage.write(self.piece_size * n, old)
#
调用
Stoarge::write()
将这个子片断写入磁盘,注意是写到
places[index]
所在的空间上。
self.storage.write(self.places[index] * self.piece_size + begin, piece)
#
既然获得了一个子片断,那么发出的
request
个数显然要减少一个。
self.numactive[index] -= 1
#
如果既没有尚未发出的
request
,而且也没有已发出的
request
(每当获得一个子片断,
numactive[index]
减少
1
,
numactive[index]
为
0
,说明所有发出的
request
都已经接收到了响应的数据),那么显然整个片断已经全部获得了。
if not
self.inactive_requests[index] and not self.numactive[index]:
检查整个片断的有效性,如果通过检查
if
sha(self.storage.read(self.piece_size * self.places[index],
self._piecelen(index))).digest() == self.hashes[index]:
#
“我”已经拥有了这个片断
self.have[index] = True
self.inactive_requests[index] = None
#
也检查过了有效性
self.waschecked[index] = True
self.amount_left -= self._piecelen(index)
if self.amount_left == 0:
self.finished()
如果没有通过有效性检查
else:
self.data_flunked(self._piecelen(index))
得丢弃这个片断
self.inactive_requests[index] = 1
self.amount_inactive += self._piecelen(index)
return False
return True
#
如果向某个
peer
发送的获取“子片断”的请求丢失了,那么调用此函数
def
request_lost(self, index, begin, length):
self.inactive_requests[index].append((begin, length))
self.amount_inactive += length
self.numactive[index] -= 1
def
get_piece(self, index, begin, length):
try:
return self._get_piece(index, begin, length)
except IOError, e:
self.failed('IO Error ' + str(e))
return None
def
_get_piece(self, index, begin, length):
if not self.have[index]:
return None
if not self.waschecked[index]:
#
检查片断的
hash
值,如果错误,返回
None
if
sha(self.storage.read(self.piece_size * self.places[index],
self._piecelen(index))).digest() != self.hashes[index]:
self.failed('told file complete on start-up, but piece failed hash check')
return None
#
通过
hash
检查
self.waschecked[index] = True
#
检查一下“子片断”长度是否越界
if begin + length > self._piecelen(index):
return
None
#
调用
Storage::read()
,将该“子片断”数据从磁盘上读出来,返回值就是这段数据。
return self.storage.read(self.piece_size * self.places[index] + begin, length)
posted on 2007-01-19 00:21
苦笑枯 阅读(322)
评论(0) 编辑 收藏 所属分类:
P2P