
20.3. Event-Driven Socket Programs

Socket programs, particularly servers, must often perform many tasks at once. Example 20-1 accepts a connection request, then serves a single client until that client has finished; other requests must wait. This is not acceptable for servers in production use: clients cannot wait too long, so the server must be able to service multiple clients at once.

One way to let your program perform several tasks at once is threading, covered in "Threads in Python" on page 341. Module SocketServer optionally supports threading, as covered in "The SocketServer Module" on page 528. An alternative to threading that can offer better performance and scalability is event-driven (also known as asynchronous) programming.

An event-driven program sits in an event loop and waits for events. In networking, typical events are "a client requests connection," "data arrived on a socket," and "a socket is available for writing." The program responds to each event by executing a small slice of work to service that event, then goes back to the event loop to wait for the next event. The Python library provides minimal support for event-driven network programming with the low-level select module and the higher-level asyncore and asynchat modules. Much richer support for event-driven programming is in the Twisted package (available at http://www.twistedmatrix.com), particularly in subpackage twisted.internet.

20.3.1. The select Module

The select module exposes a cross-platform, low-level function to implement asynchronous network servers and clients. Module select has additional functionality on Unix-like platforms, but I cover only cross-platform functionality in this book.

select

select(inputs,outputs,excepts,timeout=None)

inputs, outputs, and excepts are lists of socket objects on which to wait for input events, output events, and exceptional conditions, respectively. timeout is a float giving the maximum time to wait, in seconds. When timeout is None, there is no maximum wait: select waits until at least one object receives an event. When timeout is 0, select returns at once, without waiting, whether or not any objects have already received events.

select returns a tuple with three items (i,o,e). i is a list of zero or more of the items of inputs, those that received input events. o is a list of zero or more of the items of outputs, those that received output events. e is a list of zero or more of the items of excepts, those that received exceptional conditions (i.e., out-of-band data). Any or all of i, o, and e can be empty, but at least one of them is nonempty if timeout is None.
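The following sketch makes the return value and the timeout rules concrete without needing a server. It assumes socket.socketpair is available (Unix, and Windows from Python 3.5 on); the function name select_demo is hypothetical. It uses b'' literals and print() so that it runs on Python 3 as well as on Python 2.6 and later.

```python
import select
import socket

def select_demo():
    """Show what select reports before and after data arrives."""
    a, b = socket.socketpair()          # two already-connected sockets
    try:
        # timeout=0: poll and return at once, even if nothing is ready
        i, o, e = select.select([a, b], [a, b], [], 0)
        before = (len(i), len(o))       # nothing readable yet; both writable
        b.sendall(b'ping')              # now a has data waiting to be read
        # with a timeout, select returns as soon as an input event exists
        i, o, e = select.select([a, b], [], [], 1.0)
        readable = [s is a for s in i]  # which of our sockets became readable
        data = a.recv(8192)
        return before, readable, data
    finally:
        a.close()
        b.close()

print(select_demo())
```

The first call returns at once with no readable sockets and both sockets writable, since their send buffers are empty; the second returns as soon as data reaches a.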

In addition to sockets, lists inputs, outputs, and excepts can contain other objects that supply a method fileno, callable without arguments, that returns a socket's file descriptor. For example, the server classes of module SocketServer, covered in "The SocketServer Module" on page 528, follow this protocol, so you can have instances of these classes in the lists. On Unix-like platforms, select.select has wider applicability, since it also accepts file descriptors that do not refer to sockets. On Windows, however, select.select accepts only file descriptors that do refer to sockets.
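A minimal sketch of the fileno protocol: the hypothetical class Connection wraps a socket and delegates fileno to it. select accepts such objects and returns the very wrapper objects you passed in, so you can keep per-connection information on them.

```python
import select
import socket

class Connection(object):
    """Hypothetical wrapper: not a socket, but select accepts it
    because it supplies a no-argument fileno method."""
    def __init__(self, sock, name):
        self.sock = sock
        self.name = name
    def fileno(self):
        # delegate to the wrapped socket's file descriptor
        return self.sock.fileno()

a, b = socket.socketpair()
conns = [Connection(a, 'left'), Connection(b, 'right')]
b.sendall(b'hello')                    # data becomes readable on a
ready, _, _ = select.select(conns, [], [], 1.0)
# select hands back the wrapper objects themselves, not bare sockets
print([c.name for c in ready])
a.close()
b.close()
```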


Example 20-6 uses module select to reimplement the server of Example 20-1 with the added ability to serve any number of clients simultaneously.

Example 20-6. Asynchronous TCP echo server using select

import socket
import select

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 8881))
sock.listen(5)

# lists of sockets to watch for input and output events
ins = [sock]
ous = []
# mapping socket -> data to send on that socket when feasible
data = {}
# mapping socket -> (host, port) on which the client is running
adrs = {}

try:
    while True:
        i, o, e = select.select(ins, ous, [])  # no excepts nor timeout
        for x in i:
            if x is sock:
                # input event on sock means a client is trying to connect
                newSocket, address = sock.accept( )
                print "Connected from", address
                ins.append(newSocket)
                adrs[newSocket] = address
            else:
                # other input events mean data arrived, or disconnections
                newdata = x.recv(8192)
                if newdata:
                    # data arrived, prepare and queue the response to it
                    print "%d bytes from %s" % (len(newdata), adrs[x])
                    data[x] = data.get(x, '') + newdata
                    if x not in ous: ous.append(x)
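                else:
                    # a disconnect, give a message and clean up
                    print "disconnected from", adrs[x]
                    del adrs[x]
                    data.pop(x, None)   # drop any data still queued for x
                    try: ous.remove(x)
                    except ValueError: pass
                    x.close( )
                    ins.remove(x)
        for x in o:
            # skip sockets that were closed during the input pass above
            if x not in ous: continue
            # output events always mean we can send some data
            tosend = data.get(x)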
            if tosend:
                nsent = x.send(tosend)
                print "%d bytes to %s" % (nsent, adrs[x])
                # remember data still to be sent, if any
                tosend = tosend[nsent:]
            if tosend:
                print "%d bytes remain for %s" % (len(tosend), adrs[x])
                data[x] = tosend
            else:
                try: del data[x]
                except KeyError: pass
                ous.remove(x)
                print "No data currently remain for", adrs[x]
finally:
    sock.close( )

Programming at such a low level incurs substantial complications, as shown by the complexity of Example 20-6 and its data structures. Run the server of Example 20-6 in a terminal window and try a few runs of Example 20-2. Also try telnet localhost 8881 in other terminal windows (or other platform-dependent, Telnet-like programs) to verify the behavior of longer-lived connections.

20.3.2. The asyncore and asynchat Modules

The asyncore and asynchat modules help you implement asynchronous network servers and clients at a higher, more productive level than module select affords.

20.3.2.1. The asyncore module

Module asyncore supplies one function.

loop

loop( )

Implements the asynchronous event loop, dispatching all network events to previously instantiated dispatcher objects. loop terminates when all dispatcher objects (i.e., all communication channels) are closed.

Module asyncore also supplies class dispatcher, which supplies all methods of socket objects, plus specific methods for event-driven programming, with names starting with 'handle_'. Subclass dispatcher and override the handle_ methods for all events you need to handle. To initialize an instance d of dispatcher, you can pass an argument s, which is an already connected socket object. Otherwise, first call:

d.create_socket(socket.AF_INET,socket.SOCK_STREAM)

and then call on d either connect, to connect to a server, or bind and listen, to have d itself be a server. The most frequently used methods of an instance d of a subclass X of dispatcher are the following.

create_socket

d.create_socket(family,type)

Creates d's socket with the given family and type. family is generally socket.AF_INET. type is generally socket.SOCK_STREAM, since class dispatcher normally uses a TCP (i.e., connection-based) socket.

handle_accept

d.handle_accept( )

Called when a new client has connected. Your class X normally responds by calling self.accept, then instantiating another subclass Y of dispatcher with the resulting new socket, in order to handle the new client connection.

Your implementation of handle_accept need not return the new instance of Y: all instances of subclasses of dispatcher register themselves with the asyncore framework in dispatcher.__init__ so that asyncore later calls back to them.

handle_close

d.handle_close( )

Called when the connection is closing.

handle_connect

d.handle_connect( )

Called when d's socket, as the active opener (i.e., after a call to d.connect), actually makes a connection.

handle_read

d.handle_read( )

Called when the socket has new data that you can read without blocking.

handle_write

d.handle_write( )

Called when the socket has some free buffer space, so you can write without blocking.
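To show what self-registration buys, here is a much-simplified, hypothetical imitation of the dispatcher mechanism built directly on select. Each Channel adds itself to a module-level registry when created, and loop() dispatches input events to registered channels until all have closed, mirroring how asyncore.loop terminates. It handles only read events, and the names channels, Channel, Recorder, and loop are illustrative, not asyncore's code.

```python
import select
import socket

channels = {}   # hypothetical registry: file descriptor -> channel object

class Channel(object):
    """Hypothetical stand-in for asyncore.dispatcher: each instance
    registers itself when created, so loop() finds it automatically."""
    def __init__(self, sock):
        self.sock = sock
        channels[sock.fileno()] = self
    def fileno(self):
        return self.sock.fileno()
    def handle_read(self):
        pass                                # subclasses override handle_ methods
    def close(self):
        del channels[self.sock.fileno()]    # unregister before the fd goes away
        self.sock.close()

class Recorder(Channel):
    """Collects incoming data; closes itself when the peer disconnects."""
    def __init__(self, sock):
        Channel.__init__(self, sock)
        self.received = []
    def handle_read(self):
        data = self.sock.recv(8192)
        if data:
            self.received.append(data)
        else:
            self.close()                    # empty read: peer has disconnected

def loop():
    """Dispatch input events until every registered channel has closed."""
    while channels:
        ready, _, _ = select.select(list(channels.values()), [], [])
        for ch in ready:
            ch.handle_read()

a, b = socket.socketpair()
rec = Recorder(a)   # registers itself; we keep a reference only to inspect it
b.sendall(b'abc')
b.close()
loop()
print(b''.join(rec.received))
```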

Module asyncore also supplies class dispatcher_with_send, a subclass of dispatcher that overrides one method.

send

d.send(data)

In class dispatcher_with_send, method d.send is equivalent to a socket object's method sendall in that it sends all the data. However, d.send does not send all the data at once, and does not block; rather, d sends the data in small packets of 512 bytes each in response to handle_write events (callbacks). This strategy ensures good performance in simple cases.
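The queue-then-trickle strategy can be sketched in a few lines. BufferedSender is a hypothetical class, not asyncore's source: its send only appends to an output buffer, and each writable event sends at most 512 bytes, keeping whatever the socket did not accept for the next event.

```python
import select
import socket

CHUNK = 512   # packet size used by dispatcher_with_send, per the text above

class BufferedSender(object):
    """Hypothetical sketch of the dispatcher_with_send strategy."""
    def __init__(self, sock):
        self.sock = sock
        self.out_buffer = b''
    def send(self, data):
        self.out_buffer += data              # never blocks: just queue the data
    def writable(self):
        return len(self.out_buffer) > 0      # only ask for write events when needed
    def handle_write(self):
        nsent = self.sock.send(self.out_buffer[:CHUNK])
        self.out_buffer = self.out_buffer[nsent:]   # keep what was not sent

a, b = socket.socketpair()
sender = BufferedSender(a)
sender.send(b'x' * 2000)
events = 0
while sender.writable():
    _, o, _ = select.select([], [a], [])
    if o:
        sender.handle_write()
        events += 1
received = b''
while len(received) < 2000:
    received += b.recv(8192)
print('%d write events, %d bytes received' % (events, len(received)))
a.close()
b.close()
```

Since no single event sends more than 512 bytes, 2000 bytes need at least four write events.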


Example 20-7 uses module asyncore to reimplement the server of Example 20-1, with the added ability to serve any number of clients simultaneously.

Example 20-7. Asynchronous TCP echo server using asyncore

import asyncore
import socket

class MainServerSocket(asyncore.dispatcher):
    def __init__(self, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(('',port))
        self.listen(5)
    def handle_accept(self):
        newSocket, address = self.accept( )
        print "Connected from", address
        SecondaryServerSocket(newSocket)

class SecondaryServerSocket(asyncore.dispatcher_with_send):
    def handle_read(self):
        receivedData = self.recv(8192)
        if receivedData: self.send(receivedData)
        else: self.close( )
    def handle_close(self):
        print "Disconnected from", self.getpeername( )

MainServerSocket(8881)
asyncore.loop( )

The complexity of Example 20-7 is modest when compared to Example 20-1. The additional functionality of serving multiple clients, with the high performance and scalability of asynchronous, event-driven programming, comes cheap, thanks to asyncore's power.

Note that method handle_read of SecondaryServerSocket can freely use self.send without precautions: SecondaryServerSocket subclasses dispatcher_with_send, which overrides method send appropriately. We could not do this if we had instead chosen to subclass asyncore.dispatcher directly.

20.3.2.2. The asynchat module

The asynchat module supplies class async_chat, which subclasses asyncore.dispatcher and adds methods to support buffering and line-oriented protocols. Subclass async_chat and override some methods. The most frequently used additional methods of an instance x of a subclass of async_chat are the following.

collect_incoming_data

x.collect_incoming_data(data)

Called whenever a chunk of incoming data arrives, as string data. Normally, x adds data to some buffer that x keeps; generally, the buffer is a list, and x adds data by calling the list's append method.

found_terminator

x.found_terminator( )

Called whenever the terminator, set by method set_terminator, is found. Normally, x processes the buffer it keeps, then clears the buffer.

push

x.push(data)

Your class normally doesn't override this method. The implementation in base class async_chat adds string data to an output buffer that it sends as appropriate. Method push is therefore quite similar to method send of class asyncore.dispatcher_with_send, but method push has a more sophisticated implementation to ensure good performance in more cases.

set_terminator

x.set_terminator(terminator)

Your class normally doesn't override this method. terminator is normally '\r\n', the line terminator specified by most Internet protocols. terminator can also be None, to disable calls to found_terminator.
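The division of labor between the framework and your two callbacks can be illustrated without any sockets. In this hypothetical sketch, feed stands in for what async_chat does with each chunk that recv returns. The sketch assumes a nonempty byte-string terminator and, unlike the real async_chat (which passes data along as it arrives), holds back an incomplete line until its terminator shows up.

```python
class LineCollector(object):
    """Hypothetical sketch of async_chat's input side."""
    def __init__(self, terminator=b'\n'):
        self.terminator = terminator
        self.pending = b''      # bytes received but not yet handed over
        self.data = []          # the buffer collect_incoming_data fills
        self.lines = []         # completed lines, kept for inspection
    def feed(self, chunk):
        """What the framework would do each time recv returns a chunk."""
        self.pending += chunk
        while True:
            index = self.pending.find(self.terminator)
            if index < 0:
                break           # no complete line yet: wait for more data
            self.collect_incoming_data(self.pending[:index])
            self.pending = self.pending[index + len(self.terminator):]
            self.found_terminator()
    # the two methods a subclass of async_chat normally overrides:
    def collect_incoming_data(self, data):
        self.data.append(data)
    def found_terminator(self):
        self.lines.append(b''.join(self.data))
        self.data = []

c = LineCollector()
for chunk in [b'hel', b'lo\nwor', b'ld\n', b'tail']:
    c.feed(chunk)
print(c.lines)
```

Lines split across chunks are reassembled, and the trailing b'tail' stays pending until a terminator arrives.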


Example 20-8 uses module asynchat to reimplement the server of Example 20-7, using asynchat.async_chat instead of asyncore.dispatcher_with_send. To highlight async_chat's typical use, Example 20-8 responds (by echoing the received data back to the client, like all other server examples in this chapter) only when it has received a complete line (i.e., one ending with \n).

Example 20-8. Asynchronous TCP echo server using asynchat

import asyncore, asynchat, socket

class MainServerSocket(asyncore.dispatcher):
    def __init__(self, port):
        print 'initing MSS'
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(('',port))
        self.listen(5)
    def handle_accept(self):
        newSocket, address = self.accept( )
        print "Connected from", address
        SecondaryServerSocket(newSocket)

class SecondaryServerSocket(asynchat.async_chat):
    def __init__(self, *args):
        print 'initing SSS'
        asynchat.async_chat.__init__(self, *args)
        self.set_terminator('\n')
        self.data = []
    def collect_incoming_data(self, data):
        self.data.append(data)
    def found_terminator(self):
        self.push(''.join(self.data))
        self.data = []
    def handle_close(self):
        print "Disconnected from", self.getpeername( )
        self.close( )

MainServerSocket(8881)
asyncore.loop( )

To try out Example 20-8, we cannot use Example 20-2 as it stands because that code does not ensure that it sends only entire lines terminated with \n. It doesn't take much to fix this, however. The following client program, for example, is quite suitable for testing Example 20-8, as well as any of the other server examples in this chapter:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 8881))
print "Connected to server"
data = """A few lines of data
to test the operation
of both server and client."""
for line in data.splitlines( ):
    sock.sendall(line+'\n')
    print "Sent:", line
    response = sock.recv(8192)
    print "Received:", response
sock.close( )

The only difference in this code with respect to Example 20-2 is the change to the argument in the call to sock.sendall, in the first line of the loop body. This code simply adds a line terminator '\n' to ensure it interoperates with Example 20-8.

20.3.3. The Twisted Framework

The Twisted package (available at http://www.twistedmatrix.com) is a freely available framework for network clients and servers. Twisted includes powerful, high-level components such as web servers, user authentication systems, mail servers and clients, instant messaging, SSH clients and servers, a DNS server and client, and so on, as well as the lower-level infrastructure on which all these high-level components are built. Each component is highly scalable and easily customizable, and all are integrated to interoperate smoothly. It's a tribute to the power of Python and to the ingenuity of Twisted's developers that so much can be accomplished within two megabytes' worth of download.

20.3.3.1. The twisted.internet and twisted.protocols packages

"Twisted Core" is the low-level part of Twisted that supports event-driven clients and servers, centered on modules twisted.internet and twisted.protocols. twisted.protocols supplies protocol handlers and factories. twisted.internet supplies object reactor, embodying the concept of an event loop. To make optimal use of Twisted Core, you need a good understanding of the design patterns used in distributed computing. Douglas Schmidt, of the Center for Distributed Object Computing of Washington University, documents such design patterns at http://www.cs.wustl.edu/~schmidt/patterns-ace.html.

twisted.protocols supplies many protocols using twisted.internet's infrastructure, including FTP, HTTP, Finger, GPS-NMEA, IRC, Jabber, NNTP, POP3, IMAP4, SMTP, SIP, SocksV4, and Telnet.

20.3.3.2. Reactors

A reactor object lets you establish protocol factories as listeners (servers) on given TCP/IP ports (or other transports, such as SSL) and connect protocol handlers as clients. You can choose different reactor implementations, depending on which module you import (the reactor is instantiated at the time you import a reactor module for the first time in your program's run: no need to call any factory). The default reactor uses the select module covered in "The select Module" on page 533. Other specialized reactors integrate with many GUI toolkits' event loops, or use platform-specific techniques such as the Windows event loop, the poll system call support available in the select module on some Unix-like systems, and even more specialized system calls such as FreeBSD's kqueue. The default reactor is often sufficient, but the extra flexibility of being able to use other implementations can help you integrate GUIs or other platform-specific capabilities, or achieve even higher performance and scalability.

A reactor object r implements many interfaces, each of which supplies many methods. The reactor methods most frequently used for programs that implement TCP/IP clients and servers with twisted.internet are the following.

callLater

r.callLater(delay,callable,*args,**kwds)

Schedules a call to callable(*args,**kwds) to happen delay seconds from now. delay is a float, so it can also express fractions of a second.

callInThread

r.callInThread(callable,*args,**kwds)

Calls callable(*args,**kwds) in a worker thread separate from the reactor's.

callFromThread

r.callFromThread(callable,*args,**kwds)

Calls callable(*args,**kwds) in the reactor's thread. r.callFromThread must be called only from a separate thread and, per se, does no synchronization.

callWhenRunning

r.callWhenRunning(callable,*args,**kwds)

Schedules a call to callable(*args,**kwds) to happen when r is running.

connectTCP

r.connectTCP(host,port,factory,timeout=30,bindAddress=None)

Establishes factory, which must be an instance of class ClientFactory (or any subclass of ClientFactory), as the protocol handler factory for a TCP client that connects to the given host and port. If no connection takes place within timeout seconds, the connection attempt is deemed to have failed. When bindAddress is not None, it is a tuple with two items, (clienthost,clientport); the client locally binds to that local host and port.

listenTCP

r.listenTCP(port,factory,backlog=50,interface='')

Establishes factory, which must be an instance of class ServerFactory (or any subclass of ServerFactory), as the protocol handler factory for a TCP server that listens on the given port. No more than backlog clients can be kept queued waiting for connection at any given time. When interface is not '', binds to hostname interface.

run

r.run( )

Runs the reactor's event loop until r.stop( ) is called.

stop

r.stop( )

Stops the reactor's event loop that was started by calling r.run( ).
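To make the scheduling semantics of callLater, callWhenRunning, run, and stop concrete, here is a hypothetical, timer-only toy; it is not Twisted's implementation. Unlike a real reactor, it performs no network I/O, its callLater returns nothing (Twisted's returns an object you can use to cancel the call), and its run returns when no timers remain.

```python
import heapq
import itertools
import time

class ToyReactor(object):
    """Hypothetical timer-only event loop, for illustration only."""
    def __init__(self):
        self._timers = []               # heap of (when, seq, callable, args, kwds)
        self._seq = itertools.count()   # tie-breaker: equal times run in FIFO order
        self._startup = []
        self.running = False
    def callLater(self, delay, f, *args, **kwds):
        when = time.time() + delay
        heapq.heappush(self._timers, (when, next(self._seq), f, args, kwds))
    def callWhenRunning(self, f, *args, **kwds):
        if self.running:
            f(*args, **kwds)            # already running: call right away
        else:
            self._startup.append((f, args, kwds))
    def stop(self):
        self.running = False            # the loop exits after the current call
    def run(self):
        self.running = True
        for f, args, kwds in self._startup:
            f(*args, **kwds)
        self._startup = []
        while self.running and self._timers:
            when, _, f, args, kwds = heapq.heappop(self._timers)
            # a real reactor would wait in select() here, not just sleep
            time.sleep(max(0, when - time.time()))
            f(*args, **kwds)

r = ToyReactor()
log = []
r.callWhenRunning(log.append, 'started')
r.callLater(0.02, log.append, 'second')
r.callLater(0.01, log.append, 'first')
r.callLater(0.03, r.stop)
r.callLater(0.04, log.append, 'never')   # scheduled after stop: never runs
r.run()
print(log)
```

Calls run in order of their due times, not in the order they were scheduled, and nothing due after stop runs.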


20.3.3.3. Transports

A transport object embodies a network connection. Each protocol object calls methods on self.transport to write data to its counterpart and to disconnect. A transport object t supplies the following methods.

getHost

t.getHost( )

Returns an object a that identifies this side of the connection. a's attributes are type (a string such as 'TCP' or 'UDP'), host (a string with a dotted-quad IP address), and port (an integer that identifies the port number).

getPeer

t.getPeer( )

Returns an object that identifies the other side of the connection (easily confused by proxies, masquerading, NATting, firewalls, and so on), and is of the same type as getHost's result.

loseConnection

t.loseConnection( )

Tells t to disconnect as soon as t has finished writing all pending data.

write

t.write(data)

Transmits string data to the counterpart or queues it for transmission. t tries its best to ensure that all data you pass to write is eventually sent.

writeSequence

t.writeSequence(seq)

Transmits each string item data of iterable seq to the counterpart or queues it for eventual transmission. t tries its best to ensure that all data you pass to writeSequence is eventually sent.

Specific transports add some methods to this small set; for example, a TCP transport also has methods to let you set and get attributes SO_KEEPALIVE and TCP_NODELAY, and an SSL transport, in addition to those, also supplies a further method to let you get the certificate information for the peer (the other side of the connection).


20.3.3.4. Protocol handlers and factories

The reactor instantiates protocol handlers using a factory and calls methods on protocol handler instances when events occur. A protocol handler subclasses class Protocol and overrides some methods. A protocol handler may use its factory, available as self.factory, as a repository for state that needs to be shared among handlers or persist across multiple instantiations. A protocol factory may subclass class Factory, but this subclassing is often not necessary since in most cases the stock Factory supplies all you need. Just set the protocol attribute of a Factory instance f to a class object that is an appropriate subclass of Protocol, then pass f to the reactor.

An instance p of a subclass of Protocol supplies the following methods.

connectionLost

p.connectionLost(reason)

Called when the connection to the counterpart has been closed. Argument reason is an object that explains why the connection has been closed. reason is not an instance of a Python exception, but has an attribute reason.value that often is such an instance. You can use str(reason) to get an explanation string, including a brief traceback, or str(reason.value) to get just the explanation string without any traceback.

connectionMade

p.connectionMade( )

Called when the connection to the counterpart has just succeeded.

dataReceived

p.dataReceived(data)

Called when string data has just been received from the counterpart.

makeConnection

p.makeConnection(transport)

Initiates a connection with the given transport, and calls p.connectionMade when the connection attempt succeeds or p.connectionLost when the attempt fails.


20.3.3.5. Echo server using Twisted

Example 20-9 uses twisted.internet to implement an echo server with the ability to serve any number of clients simultaneously.

Example 20-9. Asynchronous TCP echo server using twisted

from twisted.internet import protocol, reactor

class EchoProtocol(protocol.Protocol):
    def connectionMade(self):
        p = self.transport.getPeer( )
        self.peer = '%s:%s' % (p.host, p.port)
        print "Connected from", self.peer
    def dataReceived(self, data):
        self.transport.write(data)
    def connectionLost(self, reason):
        print "Disconnected from %s: %s" % (self.peer, reason.value)

factory = protocol.Factory( )
factory.protocol = EchoProtocol

reactor.listenTCP(8881, factory)
def hello( ): print 'Listening on port', 8881
reactor.callWhenRunning(hello)
reactor.run( )

Example 20-9 exhibits scalability at least as good as Example 20-7, yet it's easily the simplest of the echo server examples in this chapter: a good indication of Twisted's power and simplicity, even when used for such low-level tasks. Note the statement:

factory.protocol = EchoProtocol

This binds the class object EchoProtocol as the attribute protocol of object factory. The righthand side of the assignment must not be EchoProtocol( ), with parentheses after the class name. Such a righthand side would call, and therefore instantiate, class EchoProtocol, and therefore the statement would bind to factory.protocol a protocol instance object rather than a protocol class object. Such a mistake would make the server fail pretty quickly.
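Why a class and not an instance? A protocol factory builds a fresh handler for each connection by calling the object bound to its protocol attribute, roughly as Twisted's Factory.buildProtocol does. The stand-in classes below are hypothetical, minimal imitations of Twisted's Protocol and Factory, so the sketch runs without Twisted installed.

```python
class Protocol(object):
    """Hypothetical stand-in for twisted.internet.protocol.Protocol."""
    factory = None
    def dataReceived(self, data):
        pass

class Factory(object):
    """Hypothetical stand-in for protocol.Factory."""
    protocol = None
    def buildProtocol(self, addr):
        p = self.protocol()     # call the class: one new handler per connection
        p.factory = self        # handlers can share state through their factory
        return p

class EchoProtocol(Protocol):
    def dataReceived(self, data):
        return data             # a real handler would call self.transport.write(data)

f = Factory()
f.protocol = EchoProtocol       # correct: bind the class object
p1 = f.buildProtocol(None)
p2 = f.buildProtocol(None)
print('distinct handlers: %s, shared factory: %s' % (p1 is not p2, p1.factory is f))

bad = Factory()
bad.protocol = EchoProtocol()   # the mistake: an instance, not a class
try:
    bad.buildProtocol(None)
except TypeError as exc:
    print('fails: %s' % exc)
```

With a class, every call yields a new handler sharing the same factory; with an instance, the very first connection fails, since the instance is not callable.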

For a task-oriented book that shows how to implement a variety of tasks with Twisted at both low and high levels, check out Twisted Network Programming Essentials, by Abe Fettig (O'Reilly).

