
Source: http://www.nightmare.com/medusa/programming.html

Introduction

Why Asynchronous?

There are only two ways to have a program on a single processor do 'more than one thing at a time'. Multi-threaded programming is the simplest and most popular way to do it, but there is another very different technique, one that lets you have nearly all the advantages of multi-threading without actually using multiple threads. It's really only practical if your program is I/O bound (I/O is the principal bottleneck). If your program is CPU bound, then pre-emptive scheduled threads are probably what you really need. Network servers are rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O library (and nearly all do), then you can use it to juggle multiple communication channels at once; doing other work while your I/O is taking place in the "background". Although this strategy can seem strange and complex (especially at first), it is in many ways easier to understand and control than multi-threaded programming. The library documented here solves many of the difficult problems for you, making the task of building sophisticated high-performance network servers and clients a snap.

Select-based multiplexing in the real world

Several well-known Web servers (and other programs) are written using exactly this technique: thttpd, Zeus, and the Squid Internet Object Cache are excellent examples. The InterNetNews server (INN) used this technique for several years before the web exploded.

An interesting web server comparison chart is available at the thttpd web site.

Variations on a Theme: poll() and WaitForMultipleObjects

Of similar (but better) design is the poll() system call. The main advantage of poll() (for our purposes) is that it does not use fixed-size file-descriptor tables, and is thus more easily scalable than select(). poll() has only recently become widely available, so you need to check for it on your particular operating system.
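For reference, Python exposes poll() through the select module (where available). Here is a minimal hedged sketch, assuming 'sock' is an already-connected socket object; note that, unlike select(), the timeout is given in milliseconds:

import select

# sock: assumed to be an already-connected socket object.
p = select.poll()
p.register (sock.fileno(), select.POLLIN | select.POLLOUT)
for fd, event in p.poll (30000):        # timeout in milliseconds
    if event & select.POLLIN:
        print 'fd %d is readable' % fd
    if event & select.POLLOUT:
        print 'fd %d is writable' % fd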

In the Windows world, the Win32 API provides a bewildering array of features for multiplexing. Although slightly different in semantics, the combination of Event objects and the WaitForMultipleObjects() interface gives essentially the same power as select() on Unix. A version of this library specific to Win32 has not been written yet, mostly because Win32 also provides select() (at least for sockets). If such an interface were written, it would have the advantage of letting us multiplex on other object types, like named pipes and files.

select()

Here's what select() does: you pass in a set of file descriptors, in effect asking the operating system, "let me know when anything happens to any of these descriptors". (A descriptor is simply a numeric handle used by the operating system to keep track of a file, socket, pipe, or other I/O object. It is usually an index into a system table of some kind). You can also use a timeout, so that if nothing happens in the allotted period, select() will return control to your program.

select() takes three fd_set arguments, one for each of the following possible states/events: readability, writability, and exceptional conditions. The last set is less useful than it sounds; in the context of TCP/IP it refers to the presence of out-of-band (OOB) data. OOB is a relatively unportable and rarely used feature that you can (and should) ignore unless you really need it.

So that leaves only two types of events to build our programs around: read events and write events. As it turns out, this is actually enough to get by with, because other types of events can be implied by the sequencing of these two. It also keeps the low-level interface as simple as possible - always a good thing in my book.
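To make that concrete, here is a minimal sketch of a single select() call in Python (the host names are just illustrative):

import select, socket

# Watch two non-blocking sockets with a single select() call.
sockets = []
for host in ('www.nightmare.com', 'www.python.org'):
    s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
    s.setblocking (0)
    try:
        s.connect ((host, 80))
    except socket.error:
        pass    # EINPROGRESS is expected for a non-blocking connect
    sockets.append (s)

# readability, writability, exceptional conditions; 30-second timeout
r, w, e = select.select (sockets, sockets, [], 30)
print 'readable:', r
print 'writable:', w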

The polling loop

Now that you know what select() does, you're ready for the final piece of the puzzle: the main polling loop. This is nothing more than a simple while loop that continually calls select() with a timeout (I usually use a 30-second timeout). Such a program will use virtually no CPU if your server is idle; it spends most of its time letting the operating system do the waiting for it. This is much more efficient than a busy-wait loop.

Here is a pseudo-code example of a polling loop:

while (any_descriptors_left):
    events = select (descriptors, timeout)
    for event in events:
        handle_event (event)

If you take a look at the code used by the library, it looks very similar to this. (see the file asyncore.py, the functions poll() and loop()). Now, on to the magic that must take place to handle the events...

The Code

Blocking vs. Non-Blocking

File descriptors can be in either blocking or non-blocking mode. A descriptor in blocking mode will stop (or 'block') your entire program until the requested event takes place. For example, if you ask to read 64 bytes from a descriptor attached to a socket which is ultimately connected to a modem deep in the backwaters of the Internet, you may wait a while for those 64 bytes.

If you put the descriptor in non-blocking mode, then one of two things might happen: if the data is sitting in a local buffer, it will be returned to you immediately; otherwise you will get back a code (usually EWOULDBLOCK) telling you that the read is in progress, and you should check back later to see if it's done.
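A minimal sketch of that behavior, again assuming 'sock' is an already-connected socket object:

import errno, socket

# sock: assumed to be an already-connected socket object.
sock.setblocking (0)
try:
    data = sock.recv (64)
except socket.error, why:
    if why[0] == errno.EWOULDBLOCK:
        data = ''       # nothing buffered locally yet; try again later
    else:
        raise           # a real error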

sockets vs. other kinds of descriptors

Although most of our discussion will be about TCP/IP sockets, on Unix you can use select() to multiplex other kinds of communications objects, like pipes and ttys. (Unfortunately, select() cannot be used to do non-blocking file I/O. Please correct me if you have information to the contrary!)

The socket_map

We use a global dictionary (asyncore.socket_map) to keep track of all the active socket objects. The keys for this dictionary are the objects themselves. Nothing is stored in the value slot. Each time through the loop, this dictionary is scanned. Each object is asked which fd_sets it wants to be in. These sets are then passed on to select().
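A simplified sketch of that scan (the real logic lives in asyncore.poll, which also handles error conditions and empty maps):

import asyncore, select

# Simplified from asyncore.poll: ask each object in the map which
# fd_sets it wants to join, then hand the sets to select().
# (In this version of asyncore, the map's keys are the channel
# objects themselves.)  timeout: assumed loop timeout, e.g. 30.
r = []
w = []
for s in asyncore.socket_map.keys():
    if s.readable():
        r.append (s)
    if s.writable():
        w.append (s)
r, w, e = select.select (r, w, [], timeout)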

asyncore.dispatcher

The first class we'll introduce you to is the dispatcher class. This is a thin wrapper around a low-level socket object. We have attached a few methods for event-handling to it. Otherwise, it can be treated as a normal non-blocking socket object.

The direct interface between the select loop and the socket object consists of the handle_read_event and handle_write_event methods. These are called whenever the corresponding low-level event fires for an object.

The firing of these low-level events can tell us whether certain higher-level events have taken place, depending on the timing and state of the connection. For example, if we have asked for a socket to connect to another host, we know that the connection has been made when the socket fires a write event (at this point you know that you may write to it with the expectation of success).
The implied events are:

  • handle_connect
    implied by a write event.
  • handle_close
    implied by a read event with no data available.
  • handle_accept
    implied by a read event on a listening socket.
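For example, here is a simplified sketch of the dispatch logic asyncore uses to turn the first write event into handle_connect (see asyncore.py for the library's actual code):

import asyncore

class sketch_dispatcher (asyncore.dispatcher):

    # Simplified sketch of asyncore's internal logic: the first
    # write event after connect() implies that the connection
    # has been established.
    def handle_write_event (self):
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()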

Thus, the set of user-level events is a little larger than simply readable and writable. The full set of events your code may handle is:

  • handle_read
  • handle_write
  • handle_expt (OOB data)
  • handle_connect
  • handle_close
  • handle_accept

A quick terminology note: In order to distinguish between low-level socket objects and those based on the async library classes, I call these higher-level objects channels.

Enough Gibberish, let's write some code

OK, that's enough abstract talk. Let's do something useful and concrete with this stuff. We'll write a simple HTTP client that demonstrates how easy it is to build a powerful tool in only a few lines of code.

# -*- Mode: Python; tab-width: 4 -*-

import asyncore
import socket

class http_client (asyncore.dispatcher):

    def __init__ (self, host, path):
        asyncore.dispatcher.__init__ (self)
        self.path = path
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.connect ((host, 80))

    def handle_connect (self):
        # the connection is up; send the request
        self.send ('GET %s HTTP/1.0\r\n\r\n' % self.path)

    def handle_read (self):
        # print whatever the server sends back
        data = self.recv (8192)
        print data

    def handle_write (self):
        pass

if __name__ == '__main__':
    import sys
    import urlparse
    for url in sys.argv[1:]:
        parts = urlparse.urlparse (url)
        if parts[0] != 'http':
            raise ValueError, "HTTP URLs only, please"
        else:
            host = parts[1]
            path = parts[2]
            http_client (host, path)
    asyncore.loop()



HTTP is (in theory, at least) a very simple protocol. You connect to the web server, send the string "GET /some/path HTTP/1.0", and the server will send a short header, followed by the file you asked for. It will then close the connection.

We have defined a single new class, http_client, derived from the abstract class asyncore.dispatcher. There are three event handlers defined.

  • handle_connect
    Once we have made the connection, we send the request string.
  • handle_read
    As the server sends data back to us, we simply print it out.
  • handle_write
    Ignore this for the moment; I'm glossing over a technical detail we'll clean up shortly.

Go ahead and run this demo, giving a single URL as an argument, like this:

$ python asynhttp.py http://www.nightmare.com/

You should see something like this:

[rushing@gnome demo]$ python asynhttp.py http://www.nightmare.com/
log: adding channel <http_client at 80ef3e8>
HTTP/1.0 200 OK
Server: Medusa/3.19
Content-Type: text/html
Content-Length: 1649
Last-Modified: Sun, 26 Jul 1998 23:57:51 GMT
Date: Sat, 16 Jan 1999 13:04:30 GMT

[... body of the file ...]

log: unhandled close event
log: closing channel 4:<http_client connected at 80ef3e8>



The 'log' messages are there to help: they are useful when debugging, but you will want to disable them later. The first log message tells you that a new http_client object has been added to the socket map. At the end, you'll notice a warning that you haven't bothered to handle the close event. No big deal, for now.

Now at this point we haven't seen anything revolutionary, but that's because we've only looked at one URL. Go ahead and add a few other URLs to the argument list, as many as you like, and make sure they're on different hosts...

Now you begin to see why select() is so powerful. Depending on your operating system (and its configuration), select() can be fed hundreds, or even thousands of descriptors like this. (I've recently tested select() on a FreeBSD box with over 10,000 descriptors).

A really good way to understand select() is to put a print statement into the asyncore.poll() function:

    [...]
    (r, w, e) = select.select (r, w, e, timeout)
    print '---'
    print 'read', r
    print 'write', w
    [...]

Each time through the loop you will see which channels have fired which events. If you haven't skipped ahead, you'll also notice a pointless barrage of events, with all your http_client objects in the 'writable' set. This is because we were a bit lazy earlier, sweeping some ugliness under the rug. Let's fix that now.

Buffered Output

In our handle_connect, we cheated a bit by calling send without examining its return code. In truth, since we are using a non-blocking socket, it is (theoretically) possible that our data didn't get sent. To do this correctly, we need to set up a buffer of outgoing data, and then send as much of the buffer as we can whenever we see a write event:

class http_client (asyncore.dispatcher):

    def __init__ (self, host, path):
        asyncore.dispatcher.__init__ (self)
        self.path = path
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.connect ((host, 80))
        # queue the request; handle_write will drain this buffer
        self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % self.path

    def handle_connect (self):
        pass

    def handle_read (self):
        data = self.recv (8192)
        print data

    def writable (self):
        # only ask for write events while there is data to send
        return (len(self.buffer) > 0)

    def handle_write (self):
        sent = self.send (self.buffer)
        self.buffer = self.buffer[sent:]
The handle_connect method no longer assumes it can send its request string successfully. We move its work over to handle_write, which trims self.buffer as pieces of it are sent successfully.

We also introduce the writable method. Each time through the loop, the set of sockets is scanned, and the readable and writable methods of each object are called to see if it is interested in those events. The default methods simply return 1, indicating that by default all channels will be in both sets. In this case, however, we are only interested in writing as long as we have something to write. So we override this method, making its behavior dependent on the length of self.buffer.

If you try the client now (with the print statements in asyncore.poll()), you'll see that select() is firing more efficiently.

asynchat.py

The dispatcher class is useful, but somewhat limited in capability. As you might guess, managing input and output buffers manually can get complex, especially if you're working with a protocol more complicated than HTTP.

The async_chat class does a lot of the heavy lifting for you. It automatically handles the buffering of both input and output, and provides a "line terminator" facility that partitions an input stream into logical lines for you. It is also carefully designed to support pipelining - a nice feature that we'll explain later.

There are four new methods to introduce:

  • set_terminator (self, <eol-string>)
    Set the string used to identify end-of-line. For most Internet protocols this is the string \r\n, that is, a carriage return followed by a line feed. To turn off input scanning, use None.
  • collect_incoming_data (self, data)
    Called whenever data is available from a socket. Usually, your implementation will accumulate this data into a buffer of some kind.
  • found_terminator (self)
    Called whenever an end-of-line marker has been seen. Typically your code will process and clear the input buffer.
  • push (data)
    This is a buffered version of send. It will place the data in an outgoing buffer.

These methods build on the underlying capabilities of dispatcher by providing implementations of handle_read, handle_write, etc. handle_read collects data into an input buffer, which is continually scanned for the terminator string. Data in between terminators is fed to your collect_incoming_data method.

The implementations of handle_write and writable examine an outgoing-data queue, and automatically send data whenever possible.
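As a quick illustration of these methods, here is a minimal sketch of a line-echo channel; the connection object would come from a listening dispatcher's handle_accept, much as in the proxy example below:

import asynchat

class echo_channel (asynchat.async_chat):
    # A minimal sketch: collect CRLF-terminated lines and echo them.

    def __init__ (self, conn):
        asynchat.async_chat.__init__ (self, conn)
        self.set_terminator ('\r\n')
        self.buffer = ''

    def collect_incoming_data (self, data):
        # accumulate data until found_terminator fires
        self.buffer = self.buffer + data

    def found_terminator (self):
        # a complete line is ready; echo it back (buffered)
        line = self.buffer
        self.buffer = ''
        self.push (line + '\r\n')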

A Proxy Server

In order to demonstrate the async_chat class, we will put together a simple proxy server. A proxy server combines a server and a client, in effect sitting between the real server and the real client. You can use one to monitor or debug protocol traffic.
# -*- Mode: Python; tab-width: 4 -*-

import asynchat
import asyncore
import socket
import string

class proxy_server (asyncore.dispatcher):

    def __init__ (self, host, port):
        asyncore.dispatcher.__init__ (self)
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.there = (host, port)
        here = ('', port + 8000)
        self.bind (here)
        self.listen (5)

    def handle_accept (self):
        proxy_receiver (self, self.accept())

class proxy_sender (asynchat.async_chat):

    def __init__ (self, receiver, address):
        asynchat.async_chat.__init__ (self)
        self.receiver = receiver
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.buffer = ''
        self.set_terminator ('\n')
        self.connect (address)

    def handle_connect (self):
        print 'Connected'

    def collect_incoming_data (self, data):
        self.buffer = self.buffer + data

    def found_terminator (self):
        data = self.buffer
        self.buffer = ''
        print '==> (%d) %s' % (self.id, repr(data))
        self.receiver.push (data + '\n')

    def handle_close (self):
        self.receiver.close()
        self.close()

class proxy_receiver (asynchat.async_chat):

    channel_counter = 0

    def __init__ (self, server, (conn, addr)):
        asynchat.async_chat.__init__ (self, conn)
        self.set_terminator ('\n')
        self.server = server
        self.id = self.channel_counter
        # bump the class-level counter, so each channel gets a unique
        # id (an instance assignment here would merely shadow it)
        proxy_receiver.channel_counter = proxy_receiver.channel_counter + 1
        self.sender = proxy_sender (self, server.there)
        self.sender.id = self.id
        self.buffer = ''

    def collect_incoming_data (self, data):
        self.buffer = self.buffer + data

    def found_terminator (self):
        data = self.buffer
        self.buffer = ''
        print '<== (%d) %s' % (self.id, repr(data))
        self.sender.push (data + '\n')

    def handle_close (self):
        print 'Closing'
        self.sender.close()
        self.close()

if __name__ == '__main__':
    import sys
    if len(sys.argv) < 3:
        print 'Usage: %s <server-host> <server-port>' % sys.argv[0]
    else:
        ps = proxy_server (sys.argv[1], string.atoi (sys.argv[2]))
        asyncore.loop()



To try out the proxy, find a server (any SMTP, NNTP, or HTTP server should do fine), and give its hostname and port as arguments:

python proxy.py localhost 25

The proxy server will start up its server on port n + 8000, in this case port 8025. Now, use a telnet program to connect to that port on the machine running the proxy. Issue a few commands and watch the whole session being echoed by your proxy server. Try opening several simultaneous connections through your proxy. You might also try pointing a real client (a news reader [port 119] or web browser [port 80]) at it.

Pipelining

Pipelining refers to a protocol capability. Normally, a conversation with a server has a back-and-forth quality to it. The client sends a command, and waits for the response. If a client needs to send many commands over a high-latency connection, waiting for each response can take a long time.

For example, when sending a mail message to many recipients with SMTP, the client will send a series of RCPT commands, one for each recipient. For each of these commands, the server will send back a reply indicating whether the mailbox specified is valid. If you want to send a message to several hundred recipients, this can be rather tedious if the round-trip time for each command is long. You'd like to be able to send a bunch of RCPT commands in one batch, and then count off the responses to them as they come.

I have a favorite visual when explaining the advantages of pipelining. Imagine each request to the server is a boxcar on a train. The client is in Los Angeles, and the server is in New York. Pipelining lets you hook all your cars in one long chain; send them to New York, where they are filled and sent back to you. Without pipelining you have to send one car at a time.

Not all protocols allow pipelining. Not all servers support it; Sendmail, for example, does not support pipelining because it tends to fork unpredictably, leaving buffered data in a questionable state. A recent extension to the SMTP protocol allows a server to specify whether it supports pipelining. HTTP/1.1 explicitly requires that a server support pipelining.

Servers built on top of async_chat automatically support pipelining. It is even possible to change the terminator repeatedly when processing data already in the input buffer. See the handle_read method if you're interested in the gory details.
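Here is a hedged sketch of that terminator-switching capability: a channel that reads newline-terminated header lines, then switches to a numeric terminator (a byte count) for the body. It is deliberately simplified, not a complete protocol handler:

import asynchat, string

class counted_body_channel (asynchat.async_chat):
    # Sketch: switch terminators mid-stream.  Header lines are
    # CRLF-terminated; after the blank line we switch to a byte
    # count for the body, then back to line mode.

    def __init__ (self, conn):
        asynchat.async_chat.__init__ (self, conn)
        self.set_terminator ('\r\n')
        self.in_headers = 1
        self.body_size = 0
        self.buffer = ''

    def collect_incoming_data (self, data):
        self.buffer = self.buffer + data

    def found_terminator (self):
        data = self.buffer
        self.buffer = ''
        if self.in_headers:
            if string.lower (data)[:15] == 'content-length:':
                self.body_size = string.atoi (string.strip (data[15:]))
            elif data == '':
                # a blank line ends the headers; the next
                # self.body_size bytes are the message body
                self.in_headers = 0
                self.set_terminator (self.body_size)
        else:
            print 'body: %d bytes' % len (data)
            self.set_terminator ('\r\n')    # back to line mode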

Producers

async_chat supports a sophisticated output buffering model, using a queue of data-producing objects. For most purposes you will use the push() method to send string data, but for more sophisticated usage you can push a producer.

A producer is a very simple object, requiring only a single method in its implementation, more(). See the code for simple_producer in asynchat.py for an example. Many more examples are available in the Medusa distribution, in the file producers.py.
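As a sketch of the interface, here is a hypothetical file_producer that streams a file in fixed-size blocks (simple_producer and the examples in producers.py are the authoritative versions):

class file_producer:
    # Hypothetical example of the producer interface: the only
    # required method is more(), which returns the next block of
    # output, or '' when the producer is exhausted.

    def __init__ (self, file, block_size=4096):
        self.file = file
        self.block_size = block_size

    def more (self):
        data = self.file.read (self.block_size)
        if not data:
            self.file.close()
        return data

# Usage, inside an async_chat subclass:
#     self.push_with_producer (file_producer (open ('some.file', 'rb')))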
