Viewing file: asynchat.py (10.5 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- Mode: Python; tab-width: 4 -*- # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp # Author: Sam Rushing <rushing@nightmare.com>
# ====================================================================== # Copyright 1996 by Sam Rushing # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of Sam # Rushing not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # ======================================================================
r"""A class supporting chat-style (command/response) protocols.
This class adds support for 'chat' style protocols - where one side sends a 'command', and the other sends a response (examples would be the common internet protocols - smtp, nntp, ftp, etc..).
The handle_read() method looks at the input stream for the current 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' for multi-line output), calling self.found_terminator() on its receipt.
for example: Say you build an async nntp client using this class. At the start of the connection, you'll have self.terminator set to '\r\n', in order to process the single-line greeting. Just before issuing a 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST command will be accumulated (using your own 'collect_incoming_data' method) up to the terminator, and then control will be returned to you - by calling your self.found_terminator() method. """
import socket import asyncore from collections import deque
class async_chat (asyncore.dispatcher): """This is an abstract class. You must derive from this class, and add the two methods collect_incoming_data() and found_terminator()"""
# these are overridable defaults
ac_in_buffer_size = 4096 ac_out_buffer_size = 4096
def __init__ (self, conn=None): self.ac_in_buffer = '' self.ac_out_buffer = '' self.producer_fifo = fifo() asyncore.dispatcher.__init__ (self, conn)
def collect_incoming_data(self, data): raise NotImplementedError, "must be implemented in subclass"
def found_terminator(self): raise NotImplementedError, "must be implemented in subclass"
def set_terminator (self, term): "Set the input delimiter. Can be a fixed string of any length, an integer, or None" self.terminator = term
def get_terminator (self): return self.terminator
# grab some more data from the socket, # throw it to the collector method, # check for the terminator, # if found, transition to the next state.
def handle_read (self):
try: data = self.recv (self.ac_in_buffer_size) except socket.error, why: self.handle_error() return
self.ac_in_buffer = self.ac_in_buffer + data
# Continue to search for self.terminator in self.ac_in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator # combos with a single recv(1024).
while self.ac_in_buffer: lb = len(self.ac_in_buffer) terminator = self.get_terminator() if terminator is None or terminator == '': # no terminator, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = '' elif isinstance(terminator, int): # numeric terminator n = terminator if lb < n: self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = '' self.terminator = self.terminator - lb else: self.collect_incoming_data (self.ac_in_buffer[:n]) self.ac_in_buffer = self.ac_in_buffer[n:] self.terminator = 0 self.found_terminator() else: # 3 cases: # 1) end of buffer matches terminator exactly: # collect data, transition # 2) end of buffer matches some prefix: # collect data to the prefix # 3) end of buffer does not match any prefix: # collect data terminator_len = len(terminator) index = self.ac_in_buffer.find(terminator) if index != -1: # we found the terminator if index > 0: # don't bother reporting the empty string (source of subtle bugs) self.collect_incoming_data (self.ac_in_buffer[:index]) self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] # This does the Right Thing if the terminator is changed here. self.found_terminator() else: # check for a prefix of the terminator index = find_prefix_at_end (self.ac_in_buffer, terminator) if index: if index != lb: # we found a prefix, collect up to the prefix self.collect_incoming_data (self.ac_in_buffer[:-index]) self.ac_in_buffer = self.ac_in_buffer[-index:] break else: # no prefix, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = ''
def handle_write (self): self.initiate_send ()
def handle_close (self): self.close()
def push (self, data): self.producer_fifo.push (simple_producer (data)) self.initiate_send()
def push_with_producer (self, producer): self.producer_fifo.push (producer) self.initiate_send()
def readable (self): "predicate for inclusion in the readable for select()" return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
def writable (self): "predicate for inclusion in the writable for select()" # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) # this is about twice as fast, though not as clear. return not ( (self.ac_out_buffer == '') and self.producer_fifo.is_empty() and self.connected )
def close_when_done (self): "automatically close this channel once the outgoing queue is empty" self.producer_fifo.push (None)
# refill the outgoing buffer by calling the more() method # of the first producer in the queue def refill_buffer (self): while 1: if len(self.producer_fifo): p = self.producer_fifo.first() # a 'None' in the producer fifo is a sentinel, # telling us to close the channel. if p is None: if not self.ac_out_buffer: self.producer_fifo.pop() self.close() return elif isinstance(p, str): self.producer_fifo.pop() self.ac_out_buffer = self.ac_out_buffer + p return data = p.more() if data: self.ac_out_buffer = self.ac_out_buffer + data return else: self.producer_fifo.pop() else: return
def initiate_send (self): obs = self.ac_out_buffer_size # try to refill the buffer if (len (self.ac_out_buffer) < obs): self.refill_buffer()
if self.ac_out_buffer and self.connected: # try to send the buffer try: num_sent = self.send (self.ac_out_buffer[:obs]) if num_sent: self.ac_out_buffer = self.ac_out_buffer[num_sent:]
except socket.error, why: self.handle_error() return
def discard_buffers (self): # Emergencies only! self.ac_in_buffer = '' self.ac_out_buffer = '' while self.producer_fifo: self.producer_fifo.pop()
class simple_producer:
def __init__ (self, data, buffer_size=512): self.data = data self.buffer_size = buffer_size
def more (self): if len (self.data) > self.buffer_size: result = self.data[:self.buffer_size] self.data = self.data[self.buffer_size:] return result else: result = self.data self.data = '' return result
class fifo: def __init__ (self, list=None): if not list: self.list = deque() else: self.list = deque(list)
def __len__ (self): return len(self.list)
def is_empty (self): return not self.list
def first (self): return self.list[0]
def push (self, data): self.list.append(data)
def pop (self): if self.list: return (1, self.list.popleft()) else: return (0, None)
# Given 'haystack', see if any prefix of 'needle' is at its end. This # assumes an exact match has already been checked. Return the number of # characters matched. # for example: # f_p_a_e ("qwerty\r", "\r\n") => 1 # f_p_a_e ("qwertydkjf", "\r\n") => 0 # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
# this could maybe be made faster with a computed regex? # [answer: no; circa Python-2.0, Jan 2001] # new python: 28961/s # old python: 18307/s # re: 12820/s # regex: 14035/s
def find_prefix_at_end (haystack, needle): l = len(needle) - 1 while l and not haystack.endswith(needle[:l]): l -= 1 return l
|