Encoder 是一种 Handler 类(关于 Handler类,请参看前面的分析文章)。它在 download.py 中被初始化。它与 Connection类一起,完成“BT对等连接”的建立,以及“BT对等协议”的分析。
为了有助于理解,我添加了一些用圆圈括起来的序号,建议你按照这个顺序去阅读。
class Connection:
②def __init__(self, Encoder, connection, id, is_local):
self.encoder = Encoder
self.connection = connection
如果是本地发起连接,那么 id 是对方的 id,否则 id 为 None
self.id = id
如果连接是由本地发起的,那么 is_local 为 True,否则为 False
self.locally_initiated = is_local
self.complete = False
self.closed = False
self.buffer = StringIO()
self.next_len = 1
self.next_func = self.read_header_len
如果连接是由本地主动发起建立的,那么需要向对方发送一个握手消息。(如果不是由本地主动发起的,那么就是被动建立的,那么不能在这里发送握手消息,而必须在分析完对方的握手消息之后,再去回应一个握手西消息,请看read_download_id() 中的处理。
if self.locally_initiated:
connection.write(chr(len(protocol_name)) + protocol_name +
(chr(0) * 8) + self.encoder.download_id)
if self.id is not None:
connection.write(self.encoder.my_id)
def get_ip(self):
return self.connection.get_ip()
def get_id(self):
return self.id
def is_locally_initiated(self):
return self.locally_initiated
def is_flushed(self):
return self.connection.is_flushed()
⑦def read_header_len(self, s):
if ord(s) != len(protocol_name):
return None
return len(protocol_name), self.read_header
def read_header(self, s):
if s != protocol_name:
return None
return 8, self.read_reserved
def read_reserved(self, s):
return 20, self.read_download_id
def read_download_id(self, s):
if s != self.encoder.download_id:
return None
这一步很重要, 如果连接是由对方发起的,那么,给对方发送一个握手消息。为什么不在读完了 peer id 之后才发送这个消息了?这是因为 peer id 是可选的,所以只要分析完 download id 之后,就要立即发送握手消息。
if not self.locally_initiated:
self.connection.write(chr(len(protocol_name)) +
protocol_name +
(chr(0) * 8) +
self.encoder.download_id + self.encoder.my_id)
return 20, self.read_peer_id
def read_peer_id(self, s):
if not self.id:
如果 peer id 是自己,那么出错了
if s == self.encoder.my_id:
return None
for v in self.encoder.connections.s():
如果已经跟该 peer 建立了连接了,那么也出错了
if s and v.id == s:
return None
self.id = s
if self.locally_initiated:
self.connection.write(self.encoder.my_id)
else:
self.encoder.everinc = True
else:
如果 peer id 和 xxx 不符,那么出错了。
if s != self.id:
return None
“BT对等连接”的握手过程正式宣告完成,此后,双方就可以通过这个连接互相发送消息了。
self.complete = True
调用Connecter::connection_made(),这个函数的意义,我们到分析 Connecter 类的时候,再记得分析。
self.encoder.connecter.connection_made(self)
下面进入 BT 消息的处理过程。
return 4, self.read_len
def read_len(self, s):
l = toint(s)
if l > self.encoder.max_len:
return None
return l, self.read_message
消息处理,交给了 Connecter::got_message(),所以下一篇我们要分析 Connecter 类。
def read_message(self, s):
if s != '':
self.encoder.connecter.got_message(self, s)
return 4, self.read_len
def read_dead(self, s):
return None
def close(self):
if not self.closed:
self.connection.close()
self.sever()
def sever(self):
self.closed = True
del self.encoder.connections[self.connection]
if self.complete:
self.encoder.connecter.connection_lost(self)
def send_message(self, message):
self.connection.write(tobinary(len(message)) + message)
⑤在 Encoder::data_came_in() 中调用下面这个函数,表示某个连接上有数据可读。如果有数据可读,那么我们就按照 BT 对等协议的规范来进行分析。。。
def data_came_in(self, s):
进入协议分析循环。。。
while True:
if self.closed:
return
self.next_len表示按照BT对等协议规范,下一段要分析的数据的长度
self.buffer.tell() 表示缓冲区中剩下数据的长度
那么 i 就表示:为了完成接下来的协议分析,还需要多少数据?
i = self.next_len - self.buffer.tell()
如果 i 大于所读到的数据的长度,那表示数据还没有读够,无法继续协议分析,需要等读到足够多的数据才能继续,所以只能退出。
if i > len(s):
self.buffer.write(s)
return
否则表示这次读到的数据已经足够完成一步协议分析。
只取满足这一步协议分析的数据放入 buffer 中(因为 buffer中可能还有上一步协议分析后留下的一些数据,要加在一起),剩下的数据保留在 s 中。
self.buffer.write(s[:i])
s = s[i:]
从 buffer 中取出数据,这些数据就是这一步协议分析所需要的数据。然后把 buffer 清空。
m = self.buffer.get()
self.buffer.reset()
self.buffer.truncate()
next_func 就是用于这一步协议分析的函数。
返回的 x 是一个二元组,包含了下一步协议分析的数据长度和协议分析函数。这样,就形成了一个协议分析循环。
try:
x = self.next_func(m)
except:
self.next_len, self.next_func = 1, self.read_dead
raise
if x is None:
self.close()
return
从 x 中分解出 next_len和 next_func。
self.next_len, self.next_func = x
⑥那么BT对等协议分析的第一步是什么了?
请看初始化函数:
self.next_len = 1
self.next_func = self.read_header_len
显然,第一步协议分析是由 read_header_len() 来完成的。
在BT源码中,有多处采用了这种协议分析的处理方式。
class Encoder:
def __init__(self, connecter, raw_server, my_id, max_len,
schedulefunc, keepalive_delay, download_id,
max_initiate = 40):
self.raw_server = raw_server
self.connecter = connecter
self.my_id = my_id
self.max_len = max_len
self.schedulefunc = schedulefunc
self.keepalive_delay = keepalive_delay
self.download_id = download_id
最大发起的连接数
self.max_initiate = max_initiate
self.everinc = False
self.connections = {}
self.spares = []
schedulefunc(self.send_keepalives, keepalive_delay)
为了保持连接不因为超时而被关闭,所以可能需要随机的发送一些空消息,它的目的纯粹是为了保证连接的“活力”
def send_keepalives(self):
self.schedulefunc(self.send_keepalives, self.keepalive_delay)
for c in self.connections.s():
if c.complete:
c.send_message('')
③主动向对方发起一个连接,这个函数什么时候调用?
请看 download.py 中 Rerequester 类的初始化函数,其中传递的一个参数是 encoder.start_connection。
再看 Rerequester.py 中,Rerequester::postrequest() 的最后,
for x in peers:
self.connect((x[0], x[1]), x[2])
这里调用的 connect() 就是初始化的时候传递进来的 encoder.start_connection,也就是下面这个函数了。
也就是说,当客户端从 tracker 服务器那里获取了 peers 列表之后,就逐一向这些 peers 主动发起连接。
def start_connection(self, dns, id):
if id:
跟自己不用建立连接。
if id == self.my_id:
return
如果已经与对方建立起连接,也不再建立连接
for v in self.connections.s():
if v.id == id:
return
如果当前连接数,已经超过设定的“最大发起连接数”,那么就暂时不建立连接。
if len(self.connections) >= self.max_initiate:
如果空闲连接数还小于 “最大发起连接数”,那么把对方的 ip 先放到spares中,等以后网络稍微空闲一点的时候,再从 spares 中取出来,实际去建立连接。
if len(self.spares) < self.max_initiate and dns not in self.spares:
self.spares.append(dns)
return
try:
调用 RawServer::start_connection 与对方建立TCP连接
c = self.raw_server.start_connection(dns)
创建 Connection 对象,加入到 connections 字典中,注意,最后一个参数是 True,表示是这个连接是由本地主动发起的。这样,在 Connection 的初始化函数中,会与对方进行 BT 对等协议的握手。
self.connections[c] = Connection(self, c, id, True)
except socketerror:
pass
这个内部函数好像没有用到
def _start_connection(self, dns, id):
def foo(self=self, dns=dns, id=id):
self.start_connection(dns, id)
self.schedulefunc(foo, 0)
def got_id(self, connection):
for v in self.connections.s():
if connection is not v and connection.id == v.id:
connection.close()
return
self.connecter.connection_made(connection)
def ever_got_incoming(self):
return self.everinc
①在 RawServer 中,当从外部发起的一个TCP成功建立后,调用此函数。
这里传递进来的参数 connection 是 SingleSocket 类型
def external_connection_made(self, connection):
创建一个 Connection 对象,加入到 connections 字典中。
self.connections[connection] = Connection(self, connection, None, False)
def connection_flushed(self, connection):
c = self.connections[connection]
if c.complete:
self.connecter.connection_flushed(c)
关闭连接的时候调用此函数
def connection_lost(self, connection):
self.connections[connection].sever()
关闭一个连接之后,连接数量可能就没有达到“最大连接数”,所以如果 spares 中有一些等待建立的 ip ,现在可以取出来,主动向对方发起连接。
while len(self.connections) < self.max_initiate and self.spares:
self.start_connection(self.spares.pop(), None)
④某个连接上(无论该连接上主动建立还是被动建立的)有数据可读的时候,调用此函数。在 RawServer 中被调用。转而去调 Connection::data_came_in()。
def data_came_in(self, connection, data):
self.connections[connection].data_came_in(data)
posted on 2007-01-19 00:22
苦笑枯 阅读(344)
评论(0) 编辑 收藏 所属分类:
P2P