posts - 56,  comments - 12,  trackbacks - 0

概要
 上一节我们分析了BT客户端主程序的框架:一个循环的服务器程序。在这个循环过程中,客户端要完成如下任务:
? 与 tracker 服? 务器通信;汇报自己的状态,同? 时获取其它 peers 的信息;
? 接受其它 peers 的连接请求;
? 主动向某些 peers 发起连接请求;
? 对BT对等协议的分析处理;
? 启用片断选择算法,? 通过这些主动或被动的连接去下载所需要的片断;
? 为其它 peers 提供片断上传;启用阻塞算法,? 阻塞某些 peers 的上传请求;
? 将下载的片断写入磁盘;
? 其它任务;

  这一节我们分析BT客户端与 tracker 服务器的通信过程。在整个下载过程中,客户端必须不断的与 tracker 通信,报告自己的状态,同时也从 tracker 那里获取当前参与下载的其它 peers 的信息(ip地址、端口等)。一旦不能与 tracker 通信,则下载过程无法继续下去。
 客户端与 tracker 之间采用 HTTP 协议通信,客户端把状态信息放在 HTTP协议的GET命令的参数中传递给 tracker,tracker 则把 peers列表等信息放在 中传递给 客户端。这种设计非常简捷巧妙。采用HTTP协议也更使得穿越防火墙的阻拦更容易(不过实际应用中,tracker通常不会使用 80 端口)。
 请务必参考服务器端源码分析的几篇相关文章(论坛中有)。
 Rerequester 类全权负责与 tracker 的通信任务,我们首先看客户端主程序 download.py中相应的代码:

【download.py】
①rerequest = Rerequester(response['announce'], config['rerequest_interval'],
        rawserver.add_task, connecter.how_many_connections,
        config['min_peers'], encoder.start_connection,
        rawserver.add_task, storagewrapper.get_amount_left,
        upmeasure.get_total, downmeasure.get_total, listen_port,
        config['ip'], myid, infohash, config['http_timeout'], errorfunc,
        config['max_initiate'], doneflag, upmeasure.get_rate,
downmeasure.get_rate,
        encoder.ever_got_incoming
②rerequest.begin()

Rerequester 类的构造函数中传递了一大堆参数,重点是 rawserver.add_task和 encoder.start_connection(rawserver和encoder 这两个对象,在上一节中,我们已经看到了它们的构造过程),分别对应RawServer::add_task() 和 Encoder::start_connection(),稍后会看到它们的作用。
一切从 Rerequest::begin() 开始。

【rerequester.py】

class Rerequester:
 # 这个构造函数传递了一大堆参数,现在没必要搞的那么清楚,关键是 sched 和 connect 这两个参数,对照前面的代码,很容易知道它们是什么。
    def __init__(self, url, interval, sched, howmany, minpeers,
            connect, externalsched, amount_left, up, down,
            port, ip, myid, infohash, timeout, errorfunc, maxpeers, doneflag,
            upratefunc, downratefunc, ever_got_incoming):
        self.url = ('%s?info_hash=%s&peer_id=%s&port=%s&key=%s' %
            (url, quote(infohash), quote(myid), str(port),
            b2a_hex(''.join([chr(randrange(256)) for i in xrange(4)]))))
        if ip != '':
            self.url += '&ip=' + quote(ip)
        self.interval = interval
        self.last = None
        self.trackerid = None
        self.announce_interval = 30 * 60
        self.sched = sched # -》RawServer::add_task()
        self.howmany = howmany # 当前有多少个连接 peer
        self.minpeers = minpeers
        self.connect = connect #-》 Encoder::start_connection()
        self.externalsched = externalsched
        self.amount_left = amount_left
        self.up = up
        self.down = down
        self.timeout = timeout
        self.errorfunc = errorfunc
        self.maxpeers = maxpeers
        self.doneflag = doneflag
        self.upratefunc = upratefunc
        self.downratefunc = downratefunc
        self.ever_got_incoming = ever_got_incoming
        self.last_failed = True
        self.last_time = 0

④ def c(self):
  # 再调用一次 add_task(),把自身加入到客户端的任务队列中,这样就形成了任务循环,每隔一段时间,就会执行这个函数。从而,客户端与 tracker 的通信能够持续的进行下去。
        self.sched(self.c, self.interval)
       
        if self.ever_got_incoming():
            # 如果曾经接受到过外来的连接,那么说明这个客户端是可以接受外来连接的,也就是说很可能处于“公网”之中,所以并非很迫切的需要从 tracker 那里获取 peers 列表。(既可以主动连接别人,也可以接受别人的连接,所以有点不紧不慢的味道)。
            getmore = self.howmany() <= self.minpeers / 3
        else:
            # 如果从来没有接受到过外来的连接,那么很有可能处于内网之中,根本无法接受外来的连接。这种情况下,只有主动的去连接外部的 peers,所以要尽可能的向 tracker 请求更多的 peers。(只能去连接别人,所以更迫切一些)。
            getmore = self.howmany() < self.minpeers
        if getmore or time() - self.last_time > self.announce_interval:
            # 如果有必要从 tracker 那里获取 peers 列表,且任务执行时间已到,则调用 announce() 开始与 tracker 通信
            self.announce()

③    def begin(self):
        # 调用前面提到的 RawServer::add_task(),把 Rerequester::c() 加入到 rawserver对象的任务队列中
        self.sched(self.c, self.interval)
  # 宣布与 tracker 的通信开始。。。。
        self.announce(0)

⑤    def announce(self, event = None):
        self.last_time = time()
  # 传递给 tracker 的信息,放在 HTTP GET 命令的参数中。所以,首先就是构造这个参数。
  # 传递给 tracker 的信息,必须包括 uploaded、downloaded、left 信息。其它入 last、trackerid、numwant、compact、event 是可选的。这些参数的解释请看 BT协议规范。
        s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
            (self.url, str(self.up()), str(self.down()),
            str(self.amount_left())))
        if self.last is not None:
            s += '&last=' + quote(str(self.last))
        if self.trackerid is not None:
            s += '&trackerid=' + quote(str(self.trackerid))
        if self.howmany() >= self.maxpeers:
            s += '&numwant=0'
        else:
            s += '&compact=1'
        if event != None:
            s += '&event=' + ['started', 'completed', 'stopped'][event]
        set = SetOnce().set

  # 函数中可以嵌套定义函数,这是 python 语法特别的地方。
  # 调用 RawServer::add_task() 把 checkfail() 加入到任务队列中。在 timeout 时间之后,检查向 tracker 发的GET命令是否失败。
        def checkfail(self = self, set = set):
            if set():
                if self.last_failed and self.upratefunc() < 100 and self.downratefunc() < 100:
                    self.errorfunc('Problem connecting to tracker - timeout exceeded')
                self.last_failed = True
        self.sched(checkfail, self.timeout)

        # 创建一个线程,执行 Rerequester::rerequest()
  # 可见,客户端与 tracker之间的通信是由单独的线程完成的
        Thread(target = self.rerequest, args = [s, set]).start()

⑥    def rerequest(self, url, set):
        try:
   # 调用 urlopen() ,向 tracker 发送命令
h = urlopen(url)
# read() 返回 tracker 的响应数据。
            r = h.read()
            h.close()
            if set():
                def add(self = self, r = r):
self.last_failed = False
# 从 tracker 响应回来的数据,由 postrequest() 处理。
self.postrequest(r)
# externalsched() 同样是调用 RawServer::add_task(),这样,由主线程调用 postrequest() 函数,处理 tracker 响应的数据。(不要忘记,我们现在是处在一个单独的子线程中。)
                self.externalsched(add, 0)
        except (IOError, error), e:
            if set():
                def fail(self = self, r = 'Problem connecting to tracker - ' + str(e)):
                    if self.last_failed:
                        self.errorfunc(r)
                    self.last_failed = True
                self.externalsched(fail, 0)

⑦    def postrequest(self, data):
        try:
   # 对服务器响应的数据解码。请参看BT协议规范。
r = bdecode(data)
# 检查 peers 有效性?
            check_peers(r)
            if r.has_key('failure reason'):
                self.errorfunc('rejected by tracker - ' + r['failure reason'])
            else:
                if r.has_key('warning message'):
self.errorfunc('warning from tracker - ' + r['warning message'])

 # 根据 tracker 的响应,调整BT客户端的参数。
                self.announce_interval = r.get('interval', self.announce_interval)
                self.interval = r.get('min interval', self.interval)
                self.trackerid = r.get('tracker id', self.trackerid)
                self.last = r.get('last')

    # 将其它下载者的信息保存到 peers 数组中。
                p = r['peers']
                peers = []
                if type(p) == type(''):
                    for x in xrange(0, len(p), 6):
                        ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
                        port = (ord(p[x+4]) << 8) | ord(p[x+5])
                        peers.append((ip, port, None))
                else:
                    for x in p:
                        peers.append((x['ip'], x['port'], x.get('peer id')))
                ps = len(peers) + self.howmany()
                if ps < self.maxpeers:
                    if self.doneflag.isSet():
                        if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
                            self.last = None
                    else:
                        if r.get('num peers', 1000) > ps * 1.2:
                            self.last = None
                # 这里的 connect,即前面的Encoder::start_connection(),
    # 至此,已经成功的完成了一次与 tracker 服务器的通信,并且从它那里获取了 peers 列表;下面就与这些 peers 挨个建立连接;至于建立连接的过程,就要追踪到 Encoder 类中了,且听下回分解。
                for x in peers:
                    # x[0]:ip, x[1]:port, x[2]:id
                    self.connect((x[0], x[1]), x[2])
        except Error, e:
            if data != '':
                self.errorfunc('bad data from tracker - ' + str(e))


小结:
  这篇文章,我们重点分析了BT客户端与 tracker 服务器通信的过程。我们知道了,客户端要每隔一段时间,就去连接 tracker 一次,它是以一个单独的线程执行的。这个线程在接受到 tracker 的响应数据后,交给主线程(既主程序)来进行分析。主程序从响应数据中,获得 peers 列表。然后调用 Encoder::start_connection() 挨个与这些 peers 尝试建立连接。

posted on 2007-01-19 00:23 苦笑枯 阅读(518) 评论(0)  编辑  收藏 所属分类: P2P

只有注册用户登录后才能发表评论。


网站导航:
 
收藏来自互联网,仅供学习。若有侵权,请与我联系!

<2007年1月>
31123456
78910111213
14151617181920
21222324252627
28293031123
45678910

常用链接

留言簿(2)

随笔分类(56)

随笔档案(56)

搜索

  •  

最新评论

阅读排行榜

评论排行榜