posts - 56,  comments - 12,  trackbacks - 0

Encoder 是一种 Handler 类(关于 Handler类,请参看前面的分析文章)。它在 download.py 中被初始化。它与 Connection类一起,完成“BT对等连接”的建立,以及“BT对等协议”的分析。
为了有助于理解,我添加了一些用圆圈括起来的序号,建议你按照这个顺序去阅读。

class Connection:
    ②def __init__(self, Encoder, connection, id, is_local):
        self.encoder = Encoder
        self.connection = connection
如果是本地发起连接,那么 id 是对方的 id,否则 id 为 None
        self.id = id
如果连接是由本地发起的,那么 is_local 为 True,否则为 False
        self.locally_initiated = is_local
        self.complete = False
        self.closed = False
        self.buffer = StringIO()
        self.next_len = 1
        self.next_func = self.read_header_len
如果连接是由本地主动发起建立的,那么需要向对方发送一个握手消息。(如果不是由本地主动发起的,那么就是被动建立的,那么不能在这里发送握手消息,而必须在分析完对方的握手消息之后,再去回应一个握手西消息,请看read_download_id() 中的处理。
        if self.locally_initiated:
            connection.write(chr(len(protocol_name)) + protocol_name +
                (chr(0) * 8) + self.encoder.download_id)
            if self.id is not None:
                connection.write(self.encoder.my_id)

    def get_ip(self):
        return self.connection.get_ip()

    def get_id(self):
        return self.id

    def is_locally_initiated(self):
        return self.locally_initiated

    def is_flushed(self):
        return self.connection.is_flushed()

    ⑦def read_header_len(self, s):
        if ord(s) != len(protocol_name):
            return None
        return len(protocol_name), self.read_header

    def read_header(self, s):
        if s != protocol_name:
            return None
        return 8, self.read_reserved

    def read_reserved(self, s):
        return 20, self.read_download_id

    def read_download_id(self, s):
        if s != self.encoder.download_id:
            return None
这一步很重要, 如果连接是由对方发起的,那么,给对方发送一个握手消息。为什么不在读完了 peer id 之后才发送这个消息了?这是因为 peer id 是可选的,所以只要分析完 download id 之后,就要立即发送握手消息。
        if not self.locally_initiated:
self.connection.write(chr(len(protocol_name)) +
protocol_name +
(chr(0) * 8) +
self.encoder.download_id + self.encoder.my_id)
        return 20, self.read_peer_id

    def read_peer_id(self, s):
        if not self.id:
            如果 peer id 是自己,那么出错了
            if s == self.encoder.my_id:
                return None
            for v in self.encoder.connections.s():
                如果已经跟该 peer 建立了连接了,那么也出错了
                if s and v.id == s:
                    return None
            self.id = s
            if self.locally_initiated:
                self.connection.write(self.encoder.my_id)
            else:
                self.encoder.everinc = True
        else:
            如果 peer id 和 xxx 不符,那么出错了。
            if s != self.id:
                return None

“BT对等连接”的握手过程正式宣告完成,此后,双方就可以通过这个连接互相发送消息了。
        self.complete = True

调用Connecter::connection_made(),这个函数的意义,我们到分析 Connecter 类的时候,再记得分析。
        self.encoder.connecter.connection_made(self)

下面进入 BT 消息的处理过程。
        return 4, self.read_len

    def read_len(self, s):
        l = toint(s)
        if l > self.encoder.max_len:
            return None
        return l, self.read_message

消息处理,交给了 Connecter::got_message(),所以下一篇我们要分析 Connecter 类。
    def read_message(self, s):
        if s != '':
            self.encoder.connecter.got_message(self, s)
        return 4, self.read_len

    def read_dead(self, s):
        return None

    def close(self):
        if not self.closed:
            self.connection.close()
            self.sever()

    def sever(self):
        self.closed = True
        del self.encoder.connections[self.connection]
        if self.complete:
            self.encoder.connecter.connection_lost(self)

    def send_message(self, message):
        self.connection.write(tobinary(len(message)) + message)


⑤在 Encoder::data_came_in() 中调用下面这个函数,表示某个连接上有数据可读。如果有数据可读,那么我们就按照 BT 对等协议的规范来进行分析。。。
def data_came_in(self, s):

进入协议分析循环。。。
        while True:
            if self.closed:
                return

self.next_len表示按照BT对等协议规范,下一段要分析的数据的长度
self.buffer.tell() 表示缓冲区中剩下数据的长度
那么 i 就表示:为了完成接下来的协议分析,还需要多少数据?
i = self.next_len - self.buffer.tell()
如果 i 大于所读到的数据的长度,那表示数据还没有读够,无法继续协议分析,需要等读到足够多的数据才能继续,所以只能退出。
            if i > len(s):
                self.buffer.write(s)
                return
否则表示这次读到的数据已经足够完成一步协议分析。
只取满足这一步协议分析的数据放入 buffer 中(因为 buffer中可能还有上一步协议分析后留下的一些数据,要加在一起),剩下的数据保留在 s 中。
            self.buffer.write(s[:i])
s = s[i:]

从 buffer 中取出数据,这些数据就是这一步协议分析所需要的数据。然后把 buffer 清空。
            m = self.buffer.get()
            self.buffer.reset()
self.buffer.truncate()

next_func 就是用于这一步协议分析的函数。
返回的 x 是一个二元组,包含了下一步协议分析的数据长度和协议分析函数。这样,就形成了一个协议分析循环。
            try:
                x = self.next_func(m)
            except:
                self.next_len, self.next_func = 1, self.read_dead
                raise
            if x is None:
                self.close()
                return
从 x 中分解出 next_len和 next_func。
self.next_len, self.next_func = x
⑥那么BT对等协议分析的第一步是什么了?
请看初始化函数:
self.next_len = 1
self.next_func = self.read_header_len
显然,第一步协议分析是由 read_header_len() 来完成的。
在BT源码中,有多处采用了这种协议分析的处理方式。

class Encoder:
    def __init__(self, connecter, raw_server, my_id, max_len,
            schedulefunc, keepalive_delay, download_id,
            max_initiate = 40):
        self.raw_server = raw_server
        self.connecter = connecter
        self.my_id = my_id
        self.max_len = max_len
        self.schedulefunc = schedulefunc
        self.keepalive_delay = keepalive_delay
        self.download_id = download_id
        最大发起的连接数
        self.max_initiate = max_initiate
        self.everinc = False
       
        self.connections = {}
        self.spares = []
       
        schedulefunc(self.send_keepalives, keepalive_delay)

为了保持连接不因为超时而被关闭,所以可能需要随机的发送一些空消息,它的目的纯粹是为了保证连接的“活力”
    def send_keepalives(self):
        self.schedulefunc(self.send_keepalives, self.keepalive_delay)
        for c in self.connections.s():
            if c.complete:
                c.send_message('')

③主动向对方发起一个连接,这个函数什么时候调用?
请看 download.py 中 Rerequester 类的初始化函数,其中传递的一个参数是 encoder.start_connection。
再看 Rerequester.py 中,Rerequester::postrequest() 的最后,
for x in peers:
self.connect((x[0], x[1]), x[2])
这里调用的 connect() 就是初始化的时候传递进来的 encoder.start_connection,也就是下面这个函数了。
也就是说,当客户端从 tracker 服务器那里获取了 peers 列表之后,就逐一向这些 peers 主动发起连接。
    def start_connection(self, dns, id):
        if id:
            跟自己不用建立连接。
            if id == self.my_id:
                return
            如果已经与对方建立起连接,也不再建立连接
            for v in self.connections.s():
                if v.id == id:
                    return

        如果当前连接数,已经超过设定的“最大发起连接数”,那么就暂时不建立连接。
  
        if len(self.connections) >= self.max_initiate:
如果空闲连接数还小于 “最大发起连接数”,那么把对方的 ip 先放到spares中,等以后网络稍微空闲一点的时候,再从 spares 中取出来,实际去建立连接。
            if len(self.spares) < self.max_initiate and dns not in self.spares:
                self.spares.append(dns)
            return
        try:
调用 RawServer::start_connection 与对方建立TCP连接
            c = self.raw_server.start_connection(dns)
           
创建 Connection 对象,加入到 connections 字典中,注意,最后一个参数是 True,表示是这个连接是由本地主动发起的。这样,在 Connection 的初始化函数中,会与对方进行 BT 对等协议的握手。

            self.connections[c] = Connection(self, c, id, True)
        except socketerror:
            pass
   
这个内部函数好像没有用到
    def _start_connection(self, dns, id):
        def foo(self=self, dns=dns, id=id):
            self.start_connection(dns, id)
       
        self.schedulefunc(foo, 0)
       
    def got_id(self, connection):
        for v in self.connections.s():
            if connection is not v and connection.id == v.id:
                connection.close()
                return
        self.connecter.connection_made(connection)

    def ever_got_incoming(self):
        return self.everinc

①在 RawServer 中,当从外部发起的一个TCP成功建立后,调用此函数。
这里传递进来的参数 connection 是 SingleSocket 类型
def external_connection_made(self, connection):
 创建一个 Connection 对象,加入到 connections 字典中。
        self.connections[connection] = Connection(self, connection, None, False)

    def connection_flushed(self, connection):
        c = self.connections[connection]
        if c.complete:
            self.connecter.connection_flushed(c)

关闭连接的时候调用此函数
    def connection_lost(self, connection):
        self.connections[connection].sever()
关闭一个连接之后,连接数量可能就没有达到“最大连接数”,所以如果 spares 中有一些等待建立的 ip ,现在可以取出来,主动向对方发起连接。
        while len(self.connections) < self.max_initiate and self.spares:
            self.start_connection(self.spares.pop(), None)

④某个连接上(无论该连接上主动建立还是被动建立的)有数据可读的时候,调用此函数。在 RawServer 中被调用。转而去调 Connection::data_came_in()。
    def data_came_in(self, connection, data):
        self.connections[connection].data_came_in(data)
posted on 2007-01-19 00:22 苦笑枯 阅读(346) 评论(0)  编辑  收藏 所属分类: P2P

只有注册用户登录后才能发表评论。


网站导航:
 
收藏来自互联网,仅供学习。若有侵权,请与我联系!

<2007年1月>
31123456
78910111213
14151617181920
21222324252627
28293031123
45678910

常用链接

留言簿(2)

随笔分类(56)

随笔档案(56)

搜索

  •  

最新评论

阅读排行榜

评论排行榜