diff options
Diffstat (limited to 'lib/python2.7/test/test_socketserver.py')
-rw-r--r-- | lib/python2.7/test/test_socketserver.py | 319 |
1 files changed, 0 insertions, 319 deletions
diff --git a/lib/python2.7/test/test_socketserver.py b/lib/python2.7/test/test_socketserver.py deleted file mode 100644 index 0dbb8e2..0000000 --- a/lib/python2.7/test/test_socketserver.py +++ /dev/null @@ -1,319 +0,0 @@ -""" -Test suite for SocketServer.py. -""" - -import contextlib -import imp -import os -import select -import signal -import socket -import select -import errno -import tempfile -import unittest -import SocketServer - -import test.test_support -from test.test_support import reap_children, reap_threads, verbose -try: - import threading -except ImportError: - threading = None - -test.test_support.requires("network") - -TEST_STR = "hello world\n" -HOST = test.test_support.HOST - -HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") -HAVE_FORKING = hasattr(os, "fork") and os.name != "os2" - -def signal_alarm(n): - """Call signal.alarm when it exists (i.e. not on Windows).""" - if hasattr(signal, 'alarm'): - signal.alarm(n) - -# Remember real select() to avoid interferences with mocking -_real_select = select.select - -def receive(sock, n, timeout=20): - r, w, x = _real_select([sock], [], [], timeout) - if sock in r: - return sock.recv(n) - else: - raise RuntimeError, "timed out on %r" % (sock,) - -if HAVE_UNIX_SOCKETS: - class ForkingUnixStreamServer(SocketServer.ForkingMixIn, - SocketServer.UnixStreamServer): - pass - - class ForkingUnixDatagramServer(SocketServer.ForkingMixIn, - SocketServer.UnixDatagramServer): - pass - - -@contextlib.contextmanager -def simple_subprocess(testcase): - pid = os.fork() - if pid == 0: - # Don't raise an exception; it would be caught by the test harness. - os._exit(72) - yield None - pid2, status = os.waitpid(pid, 0) - testcase.assertEqual(pid2, pid) - testcase.assertEqual(72 << 8, status) - - -@unittest.skipUnless(threading, 'Threading required for this test.') -class SocketServerTest(unittest.TestCase): - """Test all socket servers.""" - - def setUp(self): - signal_alarm(60) # Kill deadlocks after 60 seconds. - self.port_seed = 0 - self.test_files = [] - - def tearDown(self): - signal_alarm(0) # Didn't deadlock. - reap_children() - - for fn in self.test_files: - try: - os.remove(fn) - except os.error: - pass - self.test_files[:] = [] - - def pickaddr(self, proto): - if proto == socket.AF_INET: - return (HOST, 0) - else: - # XXX: We need a way to tell AF_UNIX to pick its own name - # like AF_INET provides port==0. - dir = None - if os.name == 'os2': - dir = '\socket' - fn = tempfile.mktemp(prefix='unix_socket.', dir=dir) - if os.name == 'os2': - # AF_UNIX socket names on OS/2 require a specific prefix - # which can't include a drive letter and must also use - # backslashes as directory separators - if fn[1] == ':': - fn = fn[2:] - if fn[0] in (os.sep, os.altsep): - fn = fn[1:] - if os.sep == '/': - fn = fn.replace(os.sep, os.altsep) - else: - fn = fn.replace(os.altsep, os.sep) - self.test_files.append(fn) - return fn - - def make_server(self, addr, svrcls, hdlrbase): - class MyServer(svrcls): - def handle_error(self, request, client_address): - self.close_request(request) - self.server_close() - raise - - class MyHandler(hdlrbase): - def handle(self): - line = self.rfile.readline() - self.wfile.write(line) - - if verbose: print "creating server" - server = MyServer(addr, MyHandler) - self.assertEqual(server.server_address, server.socket.getsockname()) - return server - - @reap_threads - def run_server(self, svrcls, hdlrbase, testfunc): - server = self.make_server(self.pickaddr(svrcls.address_family), - svrcls, hdlrbase) - # We had the OS pick a port, so pull the real address out of - # the server. - addr = server.server_address - if verbose: - print "server created" - print "ADDR =", addr - print "CLASS =", svrcls - t = threading.Thread( - name='%s serving' % svrcls, - target=server.serve_forever, - # Short poll interval to make the test finish quickly. - # Time between requests is short enough that we won't wake - # up spuriously too many times. - kwargs={'poll_interval':0.01}) - t.daemon = True # In case this function raises. - t.start() - if verbose: print "server running" - for i in range(3): - if verbose: print "test client", i - testfunc(svrcls.address_family, addr) - if verbose: print "waiting for server" - server.shutdown() - t.join() - if verbose: print "done" - - def stream_examine(self, proto, addr): - s = socket.socket(proto, socket.SOCK_STREAM) - s.connect(addr) - s.sendall(TEST_STR) - buf = data = receive(s, 100) - while data and '\n' not in buf: - data = receive(s, 100) - buf += data - self.assertEqual(buf, TEST_STR) - s.close() - - def dgram_examine(self, proto, addr): - s = socket.socket(proto, socket.SOCK_DGRAM) - s.sendto(TEST_STR, addr) - buf = data = receive(s, 100) - while data and '\n' not in buf: - data = receive(s, 100) - buf += data - self.assertEqual(buf, TEST_STR) - s.close() - - def test_TCPServer(self): - self.run_server(SocketServer.TCPServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - - def test_ThreadingTCPServer(self): - self.run_server(SocketServer.ThreadingTCPServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - - if HAVE_FORKING: - def test_ForkingTCPServer(self): - with simple_subprocess(self): - self.run_server(SocketServer.ForkingTCPServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - - if HAVE_UNIX_SOCKETS: - def test_UnixStreamServer(self): - self.run_server(SocketServer.UnixStreamServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - - def test_ThreadingUnixStreamServer(self): - self.run_server(SocketServer.ThreadingUnixStreamServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - - if HAVE_FORKING: - def test_ForkingUnixStreamServer(self): - with simple_subprocess(self): - self.run_server(ForkingUnixStreamServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - - def test_UDPServer(self): - self.run_server(SocketServer.UDPServer, - SocketServer.DatagramRequestHandler, - self.dgram_examine) - - def test_ThreadingUDPServer(self): - self.run_server(SocketServer.ThreadingUDPServer, - SocketServer.DatagramRequestHandler, - self.dgram_examine) - - if HAVE_FORKING: - def test_ForkingUDPServer(self): - with simple_subprocess(self): - self.run_server(SocketServer.ForkingUDPServer, - SocketServer.DatagramRequestHandler, - self.dgram_examine) - - @contextlib.contextmanager - def mocked_select_module(self): - """Mocks the select.select() call to raise EINTR for first call""" - old_select = select.select - - class MockSelect: - def __init__(self): - self.called = 0 - - def __call__(self, *args): - self.called += 1 - if self.called == 1: - # raise the exception on first call - raise select.error(errno.EINTR, os.strerror(errno.EINTR)) - else: - # Return real select value for consecutive calls - return old_select(*args) - - select.select = MockSelect() - try: - yield select.select - finally: - select.select = old_select - - def test_InterruptServerSelectCall(self): - with self.mocked_select_module() as mock_select: - pid = self.run_server(SocketServer.TCPServer, - SocketServer.StreamRequestHandler, - self.stream_examine) - # Make sure select was called again: - self.assertGreater(mock_select.called, 1) - - # Alas, on Linux (at least) recvfrom() doesn't return a meaningful - # client address so this cannot work: - - # if HAVE_UNIX_SOCKETS: - # def test_UnixDatagramServer(self): - # self.run_server(SocketServer.UnixDatagramServer, - # SocketServer.DatagramRequestHandler, - # self.dgram_examine) - # - # def test_ThreadingUnixDatagramServer(self): - # self.run_server(SocketServer.ThreadingUnixDatagramServer, - # SocketServer.DatagramRequestHandler, - # self.dgram_examine) - # - # if HAVE_FORKING: - # def test_ForkingUnixDatagramServer(self): - # self.run_server(SocketServer.ForkingUnixDatagramServer, - # SocketServer.DatagramRequestHandler, - # self.dgram_examine) - - @reap_threads - def test_shutdown(self): - # Issue #2302: shutdown() should always succeed in making an - # other thread leave serve_forever(). - class MyServer(SocketServer.TCPServer): - pass - - class MyHandler(SocketServer.StreamRequestHandler): - pass - - threads = [] - for i in range(20): - s = MyServer((HOST, 0), MyHandler) - t = threading.Thread( - name='MyServer serving', - target=s.serve_forever, - kwargs={'poll_interval':0.01}) - t.daemon = True # In case this function raises. - threads.append((t, s)) - for t, s in threads: - t.start() - s.shutdown() - for t, s in threads: - t.join() - - -def test_main(): - if imp.lock_held(): - # If the import lock is held, the threads will hang - raise unittest.SkipTest("can't run when import lock is held") - - test.test_support.run_unittest(SocketServerTest) - -if __name__ == "__main__": - test_main() |