asyncore.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. # -*- Mode: Python -*-
  2. # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
  3. # Author: Sam Rushing <[email protected]>
  4. # ======================================================================
  5. # Copyright 1996 by Sam Rushing
  6. #
  7. # All Rights Reserved
  8. #
  9. # Permission to use, copy, modify, and distribute this software and
  10. # its documentation for any purpose and without fee is hereby
  11. # granted, provided that the above copyright notice appear in all
  12. # copies and that both that copyright notice and this permission
  13. # notice appear in supporting documentation, and that the name of Sam
  14. # Rushing not be used in advertising or publicity pertaining to
  15. # distribution of the software without specific, written prior
  16. # permission.
  17. #
  18. # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
  19. # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
  20. # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
  21. # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
  22. # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  23. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
  24. # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  25. # ======================================================================
  26. """Basic infrastructure for asynchronous socket service clients and servers.
  27. There are only two ways to have a program on a single processor do "more
  28. than one thing at a time". Multi-threaded programming is the simplest and
  29. most popular way to do it, but there is another very different technique,
  30. that lets you have nearly all the advantages of multi-threading, without
  31. actually using multiple threads. it's really only practical if your program
  32. is largely I/O bound. If your program is CPU bound, then pre-emptive
  33. scheduled threads are probably what you really need. Network servers are
  34. rarely CPU-bound, however.
  35. If your operating system supports the select() system call in its I/O
  36. library (and nearly all do), then you can use it to juggle multiple
  37. communication channels at once; doing other work while your I/O is taking
  38. place in the "background." Although this strategy can seem strange and
  39. complex, especially at first, it is in many ways easier to understand and
  40. control than multi-threaded programming. The module documented here solves
  41. many of the difficult problems for you, making the task of building
  42. sophisticated high-performance network servers and clients a snap.
  43. """
  44. import exceptions
  45. import select
  46. import socket
  47. import sys
  48. import time
  49. import os
  50. from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
  51. ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode
  52. try:
  53. socket_map
  54. except NameError:
  55. socket_map = {}
  56. class ExitNow(exceptions.Exception):
  57. pass
  58. def read(obj):
  59. try:
  60. obj.handle_read_event()
  61. except ExitNow:
  62. raise
  63. except:
  64. obj.handle_error()
  65. def write(obj):
  66. try:
  67. obj.handle_write_event()
  68. except ExitNow:
  69. raise
  70. except:
  71. obj.handle_error()
  72. def _exception (obj):
  73. try:
  74. obj.handle_expt_event()
  75. except ExitNow:
  76. raise
  77. except:
  78. obj.handle_error()
  79. def readwrite(obj, flags):
  80. try:
  81. if flags & (select.POLLIN | select.POLLPRI):
  82. obj.handle_read_event()
  83. if flags & select.POLLOUT:
  84. obj.handle_write_event()
  85. if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
  86. obj.handle_expt_event()
  87. except ExitNow:
  88. raise
  89. except:
  90. obj.handle_error()
  91. def poll(timeout=0.0, map=None):
  92. if map is None:
  93. map = socket_map
  94. if map:
  95. r = []; w = []; e = []
  96. for fd, obj in map.items():
  97. is_r = obj.readable()
  98. is_w = obj.writable()
  99. if is_r:
  100. r.append(fd)
  101. if is_w:
  102. w.append(fd)
  103. if is_r or is_w:
  104. e.append(fd)
  105. if [] == r == w == e:
  106. time.sleep(timeout)
  107. else:
  108. try:
  109. r, w, e = select.select(r, w, e, timeout)
  110. except select.error, err:
  111. if err[0] != EINTR:
  112. raise
  113. else:
  114. return
  115. for fd in r:
  116. obj = map.get(fd)
  117. if obj is None:
  118. continue
  119. read(obj)
  120. for fd in w:
  121. obj = map.get(fd)
  122. if obj is None:
  123. continue
  124. write(obj)
  125. for fd in e:
  126. obj = map.get(fd)
  127. if obj is None:
  128. continue
  129. _exception(obj)
  130. def poll2(timeout=0.0, map=None):
  131. # Use the poll() support added to the select module in Python 2.0
  132. if map is None:
  133. map = socket_map
  134. if timeout is not None:
  135. # timeout is in milliseconds
  136. timeout = int(timeout*1000)
  137. pollster = select.poll()
  138. if map:
  139. for fd, obj in map.items():
  140. flags = 0
  141. if obj.readable():
  142. flags |= select.POLLIN | select.POLLPRI
  143. if obj.writable():
  144. flags |= select.POLLOUT
  145. if flags:
  146. # Only check for exceptions if object was either readable
  147. # or writable.
  148. flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
  149. pollster.register(fd, flags)
  150. try:
  151. r = pollster.poll(timeout)
  152. except select.error, err:
  153. if err[0] != EINTR:
  154. raise
  155. r = []
  156. for fd, flags in r:
  157. obj = map.get(fd)
  158. if obj is None:
  159. continue
  160. readwrite(obj, flags)
  161. poll3 = poll2 # Alias for backward compatibility
  162. def loop(timeout=30.0, use_poll=False, map=None, count=None):
  163. if map is None:
  164. map = socket_map
  165. if use_poll and hasattr(select, 'poll'):
  166. poll_fun = poll2
  167. else:
  168. poll_fun = poll
  169. if count is None:
  170. while map:
  171. poll_fun(timeout, map)
  172. else:
  173. while map and count > 0:
  174. poll_fun(timeout, map)
  175. count = count - 1
  176. class dispatcher:
  177. debug = False
  178. connected = False
  179. accepting = False
  180. closing = False
  181. addr = None
  182. def __init__(self, sock=None, map=None):
  183. if map is None:
  184. self._map = socket_map
  185. else:
  186. self._map = map
  187. if sock:
  188. self.set_socket(sock, map)
  189. # I think it should inherit this anyway
  190. self.socket.setblocking(0)
  191. self.connected = True
  192. # XXX Does the constructor require that the socket passed
  193. # be connected?
  194. try:
  195. self.addr = sock.getpeername()
  196. except socket.error:
  197. # The addr isn't crucial
  198. pass
  199. else:
  200. self.socket = None
  201. def __repr__(self):
  202. status = [self.__class__.__module__+"."+self.__class__.__name__]
  203. if self.accepting and self.addr:
  204. status.append('listening')
  205. elif self.connected:
  206. status.append('connected')
  207. if self.addr is not None:
  208. try:
  209. status.append('%s:%d' % self.addr)
  210. except TypeError:
  211. status.append(repr(self.addr))
  212. return '<%s at %#x>' % (' '.join(status), id(self))
  213. def add_channel(self, map=None):
  214. #self.log_info('adding channel %s' % self)
  215. if map is None:
  216. map = self._map
  217. map[self._fileno] = self
  218. def del_channel(self, map=None):
  219. fd = self._fileno
  220. if map is None:
  221. map = self._map
  222. if map.has_key(fd):
  223. #self.log_info('closing channel %d:%s' % (fd, self))
  224. del map[fd]
  225. self._fileno = None
  226. def create_socket(self, family, type):
  227. self.family_and_type = family, type
  228. self.socket = socket.socket(family, type)
  229. self.socket.setblocking(0)
  230. self._fileno = self.socket.fileno()
  231. self.add_channel()
  232. def set_socket(self, sock, map=None):
  233. self.socket = sock
  234. ## self.__dict__['socket'] = sock
  235. self._fileno = sock.fileno()
  236. self.add_channel(map)
  237. def set_reuse_addr(self):
  238. # try to re-use a server port if possible
  239. try:
  240. self.socket.setsockopt(
  241. socket.SOL_SOCKET, socket.SO_REUSEADDR,
  242. self.socket.getsockopt(socket.SOL_SOCKET,
  243. socket.SO_REUSEADDR) | 1
  244. )
  245. except socket.error:
  246. pass
  247. # ==================================================
  248. # predicates for select()
  249. # these are used as filters for the lists of sockets
  250. # to pass to select().
  251. # ==================================================
  252. def readable(self):
  253. return True
  254. def writable(self):
  255. return True
  256. # ==================================================
  257. # socket object methods.
  258. # ==================================================
  259. def listen(self, num):
  260. self.accepting = True
  261. if os.name == 'nt' and num > 5:
  262. num = 1
  263. return self.socket.listen(num)
  264. def bind(self, addr):
  265. self.addr = addr
  266. return self.socket.bind(addr)
  267. def connect(self, address):
  268. self.connected = False
  269. err = self.socket.connect_ex(address)
  270. # XXX Should interpret Winsock return values
  271. if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
  272. return
  273. if err in (0, EISCONN):
  274. self.addr = address
  275. self.connected = True
  276. self.handle_connect()
  277. else:
  278. raise socket.error, (err, errorcode[err])
  279. def accept(self):
  280. # XXX can return either an address pair or None
  281. try:
  282. conn, addr = self.socket.accept()
  283. return conn, addr
  284. except socket.error, why:
  285. if why[0] == EWOULDBLOCK:
  286. pass
  287. else:
  288. raise
  289. def send(self, data):
  290. try:
  291. result = self.socket.send(data)
  292. return result
  293. except socket.error, why:
  294. if why[0] == EWOULDBLOCK:
  295. return 0
  296. else:
  297. raise
  298. return 0
  299. def recv(self, buffer_size):
  300. try:
  301. data = self.socket.recv(buffer_size)
  302. if not data:
  303. # a closed connection is indicated by signaling
  304. # a read condition, and having recv() return 0.
  305. self.handle_close()
  306. return ''
  307. else:
  308. return data
  309. except socket.error, why:
  310. # winsock sometimes throws ENOTCONN
  311. if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
  312. self.handle_close()
  313. return ''
  314. else:
  315. raise
  316. def close(self):
  317. self.del_channel()
  318. self.socket.close()
  319. # cheap inheritance, used to pass all other attribute
  320. # references to the underlying socket object.
  321. def __getattr__(self, attr):
  322. return getattr(self.socket, attr)
  323. # log and log_info may be overridden to provide more sophisticated
  324. # logging and warning methods. In general, log is for 'hit' logging
  325. # and 'log_info' is for informational, warning and error logging.
  326. def log(self, message):
  327. sys.stderr.write('log: %s\n' % str(message))
  328. def log_info(self, message, type='info'):
  329. if __debug__ or type != 'info':
  330. print '%s: %s' % (type, message)
  331. def handle_read_event(self):
  332. if self.accepting:
  333. # for an accepting socket, getting a read implies
  334. # that we are connected
  335. if not self.connected:
  336. self.connected = True
  337. self.handle_accept()
  338. elif not self.connected:
  339. self.handle_connect()
  340. self.connected = True
  341. self.handle_read()
  342. else:
  343. self.handle_read()
  344. def handle_write_event(self):
  345. # getting a write implies that we are connected
  346. if not self.connected:
  347. self.handle_connect()
  348. self.connected = True
  349. self.handle_write()
  350. def handle_expt_event(self):
  351. self.handle_expt()
  352. def handle_error(self):
  353. nil, t, v, tbinfo = compact_traceback()
  354. # sometimes a user repr method will crash.
  355. try:
  356. self_repr = repr(self)
  357. except:
  358. self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
  359. self.log_info(
  360. 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
  361. self_repr,
  362. t,
  363. v,
  364. tbinfo
  365. ),
  366. 'error'
  367. )
  368. self.close()
  369. def handle_expt(self):
  370. self.log_info('unhandled exception', 'warning')
  371. def handle_read(self):
  372. self.log_info('unhandled read event', 'warning')
  373. def handle_write(self):
  374. self.log_info('unhandled write event', 'warning')
  375. def handle_connect(self):
  376. self.log_info('unhandled connect event', 'warning')
  377. def handle_accept(self):
  378. self.log_info('unhandled accept event', 'warning')
  379. def handle_close(self):
  380. self.log_info('unhandled close event', 'warning')
  381. self.close()
  382. # ---------------------------------------------------------------------------
  383. # adds simple buffered output capability, useful for simple clients.
  384. # [for more sophisticated usage use asynchat.async_chat]
  385. # ---------------------------------------------------------------------------
  386. class dispatcher_with_send(dispatcher):
  387. def __init__(self, sock=None, map=None):
  388. dispatcher.__init__(self, sock, map)
  389. self.out_buffer = ''
  390. def initiate_send(self):
  391. num_sent = 0
  392. num_sent = dispatcher.send(self, self.out_buffer[:512])
  393. self.out_buffer = self.out_buffer[num_sent:]
  394. def handle_write(self):
  395. self.initiate_send()
  396. def writable(self):
  397. return (not self.connected) or len(self.out_buffer)
  398. def send(self, data):
  399. if self.debug:
  400. self.log_info('sending %s' % repr(data))
  401. self.out_buffer = self.out_buffer + data
  402. self.initiate_send()
  403. # ---------------------------------------------------------------------------
  404. # used for debugging.
  405. # ---------------------------------------------------------------------------
  406. def compact_traceback():
  407. t, v, tb = sys.exc_info()
  408. tbinfo = []
  409. assert tb # Must have a traceback
  410. while tb:
  411. tbinfo.append((
  412. tb.tb_frame.f_code.co_filename,
  413. tb.tb_frame.f_code.co_name,
  414. str(tb.tb_lineno)
  415. ))
  416. tb = tb.tb_next
  417. # just to be safe
  418. del tb
  419. file, function, line = tbinfo[-1]
  420. info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
  421. return (file, function, line), t, v, info
  422. def close_all(map=None):
  423. if map is None:
  424. map = socket_map
  425. for x in map.values():
  426. x.socket.close()
  427. map.clear()
  428. # Asynchronous File I/O:
  429. #
  430. # After a little research (reading man pages on various unixen, and
  431. # digging through the linux kernel), I've determined that select()
  432. # isn't meant for doing asynchronous file i/o.
  433. # Heartening, though - reading linux/mm/filemap.c shows that linux
  434. # supports asynchronous read-ahead. So _MOST_ of the time, the data
  435. # will be sitting in memory for us already when we go to read it.
  436. #
  437. # What other OS's (besides NT) support async file i/o? [VMS?]
  438. #
  439. # Regardless, this is useful for pipes, and stdin/stdout...
  440. if os.name == 'posix':
  441. import fcntl
  442. class file_wrapper:
  443. # here we override just enough to make a file
  444. # look like a socket for the purposes of asyncore.
  445. def __init__(self, fd):
  446. self.fd = fd
  447. def recv(self, *args):
  448. return os.read(self.fd, *args)
  449. def send(self, *args):
  450. return os.write(self.fd, *args)
  451. read = recv
  452. write = send
  453. def close(self):
  454. os.close(self.fd)
  455. def fileno(self):
  456. return self.fd
  457. class file_dispatcher(dispatcher):
  458. def __init__(self, fd, map=None):
  459. dispatcher.__init__(self, None, map)
  460. self.connected = True
  461. self.set_file(fd)
  462. # set it to non-blocking mode
  463. flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
  464. flags = flags | os.O_NONBLOCK
  465. fcntl.fcntl(fd, fcntl.F_SETFL, flags)
  466. def set_file(self, fd):
  467. self._fileno = fd
  468. self.socket = file_wrapper(fd)
  469. self.add_channel()