diff options
Diffstat (limited to 'lib/python2.7/test/test_socket.py')
-rw-r--r-- | lib/python2.7/test/test_socket.py | 1724 |
1 files changed, 0 insertions, 1724 deletions
diff --git a/lib/python2.7/test/test_socket.py b/lib/python2.7/test/test_socket.py deleted file mode 100644 index 111e553..0000000 --- a/lib/python2.7/test/test_socket.py +++ /dev/null @@ -1,1724 +0,0 @@ -#!/usr/bin/env python - -import unittest -from test import test_support - -import errno -import socket -import select -import _testcapi -import time -import traceback -import Queue -import sys -import os -import array -import contextlib -from weakref import proxy -import signal -import math - -def try_address(host, port=0, family=socket.AF_INET): - """Try to bind a socket on the given host:port and return True - if that has been possible.""" - try: - sock = socket.socket(family, socket.SOCK_STREAM) - sock.bind((host, port)) - except (socket.error, socket.gaierror): - return False - else: - sock.close() - return True - -HOST = test_support.HOST -MSG = b'Michael Gilfix was here\n' -SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6) - -try: - import thread - import threading -except ImportError: - thread = None - threading = None - -HOST = test_support.HOST -MSG = 'Michael Gilfix was here\n' - -class SocketTCPTest(unittest.TestCase): - - def setUp(self): - self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = test_support.bind_port(self.serv) - self.serv.listen(1) - - def tearDown(self): - self.serv.close() - self.serv = None - -class SocketUDPTest(unittest.TestCase): - - def setUp(self): - self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.port = test_support.bind_port(self.serv) - - def tearDown(self): - self.serv.close() - self.serv = None - -class ThreadableTest: - """Threadable Test class - - The ThreadableTest class makes it easy to create a threaded - client/server pair from an existing unit test. To create a - new threaded class from an existing unit test, use multiple - inheritance: - - class NewClass (OldClass, ThreadableTest): - pass - - This class defines two new fixture functions with obvious - purposes for overriding: - - clientSetUp () - clientTearDown () - - Any new test functions within the class must then define - tests in pairs, where the test name is preceeded with a - '_' to indicate the client portion of the test. Ex: - - def testFoo(self): - # Server portion - - def _testFoo(self): - # Client portion - - Any exceptions raised by the clients during their tests - are caught and transferred to the main thread to alert - the testing framework. - - Note, the server setup function cannot call any blocking - functions that rely on the client thread during setup, - unless serverExplicitReady() is called just before - the blocking call (such as in setting up a client/server - connection and performing the accept() in setUp(). - """ - - def __init__(self): - # Swap the true setup function - self.__setUp = self.setUp - self.__tearDown = self.tearDown - self.setUp = self._setUp - self.tearDown = self._tearDown - - def serverExplicitReady(self): - """This method allows the server to explicitly indicate that - it wants the client thread to proceed. This is useful if the - server is about to execute a blocking routine that is - dependent upon the client thread during its setup routine.""" - self.server_ready.set() - - def _setUp(self): - self.server_ready = threading.Event() - self.client_ready = threading.Event() - self.done = threading.Event() - self.queue = Queue.Queue(1) - - # Do some munging to start the client test. - methodname = self.id() - i = methodname.rfind('.') - methodname = methodname[i+1:] - test_method = getattr(self, '_' + methodname) - self.client_thread = thread.start_new_thread( - self.clientRun, (test_method,)) - - self.__setUp() - if not self.server_ready.is_set(): - self.server_ready.set() - self.client_ready.wait() - - def _tearDown(self): - self.__tearDown() - self.done.wait() - - if not self.queue.empty(): - msg = self.queue.get() - self.fail(msg) - - def clientRun(self, test_func): - self.server_ready.wait() - self.clientSetUp() - self.client_ready.set() - if not callable(test_func): - raise TypeError("test_func must be a callable function.") - try: - test_func() - except Exception, strerror: - self.queue.put(strerror) - self.clientTearDown() - - def clientSetUp(self): - raise NotImplementedError("clientSetUp must be implemented.") - - def clientTearDown(self): - self.done.set() - thread.exit() - -class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketTCPTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - -class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketUDPTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - -class SocketConnectedTest(ThreadedTCPSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedTCPSocketTest.__init__(self, methodName=methodName) - - def setUp(self): - ThreadedTCPSocketTest.setUp(self) - # Indicate explicitly we're ready for the client thread to - # proceed and then perform the blocking call to accept - self.serverExplicitReady() - conn, addr = self.serv.accept() - self.cli_conn = conn - - def tearDown(self): - self.cli_conn.close() - self.cli_conn = None - ThreadedTCPSocketTest.tearDown(self) - - def clientSetUp(self): - ThreadedTCPSocketTest.clientSetUp(self) - self.cli.connect((HOST, self.port)) - self.serv_conn = self.cli - - def clientTearDown(self): - self.serv_conn.close() - self.serv_conn = None - ThreadedTCPSocketTest.clientTearDown(self) - -class SocketPairTest(unittest.TestCase, ThreadableTest): - - def __init__(self, methodName='runTest'): - unittest.TestCase.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def setUp(self): - self.serv, self.cli = socket.socketpair() - - def tearDown(self): - self.serv.close() - self.serv = None - - def clientSetUp(self): - pass - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - - -####################################################################### -## Begin Tests - -class GeneralModuleTests(unittest.TestCase): - - def test_weakref(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - p = proxy(s) - self.assertEqual(p.fileno(), s.fileno()) - s.close() - s = None - try: - p.fileno() - except ReferenceError: - pass - else: - self.fail('Socket proxy still exists') - - def testSocketError(self): - # Testing socket module exceptions - def raise_error(*args, **kwargs): - raise socket.error - def raise_herror(*args, **kwargs): - raise socket.herror - def raise_gaierror(*args, **kwargs): - raise socket.gaierror - self.assertRaises(socket.error, raise_error, - "Error raising socket exception.") - self.assertRaises(socket.error, raise_herror, - "Error raising socket exception.") - self.assertRaises(socket.error, raise_gaierror, - "Error raising socket exception.") - - def testSendtoErrors(self): - # Testing that sendto doens't masks failures. See #10169. - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.addCleanup(s.close) - s.bind(('', 0)) - sockname = s.getsockname() - # 2 args - with self.assertRaises(UnicodeEncodeError): - s.sendto(u'\u2620', sockname) - with self.assertRaises(TypeError) as cm: - s.sendto(5j, sockname) - self.assertIn('not complex', str(cm.exception)) - with self.assertRaises(TypeError) as cm: - s.sendto('foo', None) - self.assertIn('not NoneType', str(cm.exception)) - # 3 args - with self.assertRaises(UnicodeEncodeError): - s.sendto(u'\u2620', 0, sockname) - with self.assertRaises(TypeError) as cm: - s.sendto(5j, 0, sockname) - self.assertIn('not complex', str(cm.exception)) - with self.assertRaises(TypeError) as cm: - s.sendto('foo', 0, None) - self.assertIn('not NoneType', str(cm.exception)) - with self.assertRaises(TypeError) as cm: - s.sendto('foo', 'bar', sockname) - self.assertIn('an integer is required', str(cm.exception)) - with self.assertRaises(TypeError) as cm: - s.sendto('foo', None, None) - self.assertIn('an integer is required', str(cm.exception)) - # wrong number of args - with self.assertRaises(TypeError) as cm: - s.sendto('foo') - self.assertIn('(1 given)', str(cm.exception)) - with self.assertRaises(TypeError) as cm: - s.sendto('foo', 0, sockname, 4) - self.assertIn('(4 given)', str(cm.exception)) - - - def testCrucialConstants(self): - # Testing for mission critical constants - socket.AF_INET - socket.SOCK_STREAM - socket.SOCK_DGRAM - socket.SOCK_RAW - socket.SOCK_RDM - socket.SOCK_SEQPACKET - socket.SOL_SOCKET - socket.SO_REUSEADDR - - def testHostnameRes(self): - # Testing hostname resolution mechanisms - hostname = socket.gethostname() - try: - ip = socket.gethostbyname(hostname) - except socket.error: - # Probably name lookup wasn't set up right; skip this test - return - self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") - try: - hname, aliases, ipaddrs = socket.gethostbyaddr(ip) - except socket.error: - # Probably a similar problem as above; skip this test - return - all_host_names = [hostname, hname] + aliases - fqhn = socket.getfqdn(ip) - if not fqhn in all_host_names: - self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) - - def testRefCountGetNameInfo(self): - # Testing reference count for getnameinfo - if hasattr(sys, "getrefcount"): - try: - # On some versions, this loses a reference - orig = sys.getrefcount(__name__) - socket.getnameinfo(__name__,0) - except TypeError: - self.assertEqual(sys.getrefcount(__name__), orig, - "socket.getnameinfo loses a reference") - - def testInterpreterCrash(self): - # Making sure getnameinfo doesn't crash the interpreter - try: - # On some versions, this crashes the interpreter. - socket.getnameinfo(('x', 0, 0, 0), 0) - except socket.error: - pass - - def testNtoH(self): - # This just checks that htons etc. are their own inverse, - # when looking at the lower 16 or 32 bits. - sizes = {socket.htonl: 32, socket.ntohl: 32, - socket.htons: 16, socket.ntohs: 16} - for func, size in sizes.items(): - mask = (1L<<size) - 1 - for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): - self.assertEqual(i & mask, func(func(i&mask)) & mask) - - swapped = func(mask) - self.assertEqual(swapped & mask, mask) - self.assertRaises(OverflowError, func, 1L<<34) - - def testNtoHErrors(self): - good_values = [ 1, 2, 3, 1L, 2L, 3L ] - bad_values = [ -1, -2, -3, -1L, -2L, -3L ] - for k in good_values: - socket.ntohl(k) - socket.ntohs(k) - socket.htonl(k) - socket.htons(k) - for k in bad_values: - self.assertRaises(OverflowError, socket.ntohl, k) - self.assertRaises(OverflowError, socket.ntohs, k) - self.assertRaises(OverflowError, socket.htonl, k) - self.assertRaises(OverflowError, socket.htons, k) - - def testGetServBy(self): - eq = self.assertEqual - # Find one service that exists, then check all the related interfaces. - # I've ordered this by protocols that have both a tcp and udp - # protocol, at least for modern Linuxes. - if (sys.platform.startswith('linux') or - sys.platform.startswith('freebsd') or - sys.platform.startswith('netbsd') or - sys.platform == 'darwin'): - # avoid the 'echo' service on this platform, as there is an - # assumption breaking non-standard port/protocol entry - services = ('daytime', 'qotd', 'domain') - else: - services = ('echo', 'daytime', 'domain') - for service in services: - try: - port = socket.getservbyname(service, 'tcp') - break - except socket.error: - pass - else: - raise socket.error - # Try same call with optional protocol omitted - port2 = socket.getservbyname(service) - eq(port, port2) - # Try udp, but don't barf if it doesn't exist - try: - udpport = socket.getservbyname(service, 'udp') - except socket.error: - udpport = None - else: - eq(udpport, port) - # Now make sure the lookup by port returns the same service name - eq(socket.getservbyport(port2), service) - eq(socket.getservbyport(port, 'tcp'), service) - if udpport is not None: - eq(socket.getservbyport(udpport, 'udp'), service) - # Make sure getservbyport does not accept out of range ports. - self.assertRaises(OverflowError, socket.getservbyport, -1) - self.assertRaises(OverflowError, socket.getservbyport, 65536) - - def testDefaultTimeout(self): - # Testing default timeout - # The default timeout should initially be None - self.assertEqual(socket.getdefaulttimeout(), None) - s = socket.socket() - self.assertEqual(s.gettimeout(), None) - s.close() - - # Set the default timeout to 10, and see if it propagates - socket.setdefaulttimeout(10) - self.assertEqual(socket.getdefaulttimeout(), 10) - s = socket.socket() - self.assertEqual(s.gettimeout(), 10) - s.close() - - # Reset the default timeout to None, and see if it propagates - socket.setdefaulttimeout(None) - self.assertEqual(socket.getdefaulttimeout(), None) - s = socket.socket() - self.assertEqual(s.gettimeout(), None) - s.close() - - # Check that setting it to an invalid value raises ValueError - self.assertRaises(ValueError, socket.setdefaulttimeout, -1) - - # Check that setting it to an invalid type raises TypeError - self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") - - def testIPv4_inet_aton_fourbytes(self): - if not hasattr(socket, 'inet_aton'): - return # No inet_aton, nothing to check - # Test that issue1008086 and issue767150 are fixed. - # It must return 4 bytes. - self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) - self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) - - def testIPv4toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform - from socket import inet_aton as f, inet_pton, AF_INET - g = lambda a: inet_pton(AF_INET, a) - - self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0')) - self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0')) - self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170')) - self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4')) - self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255')) - - self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0')) - self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0')) - self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) - self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) - - def testIPv6toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform - try: - from socket import inet_pton, AF_INET6, has_ipv6 - if not has_ipv6: - return - except ImportError: - return - f = lambda a: inet_pton(AF_INET6, a) - - self.assertEqual('\x00' * 16, f('::')) - self.assertEqual('\x00' * 16, f('0::0')) - self.assertEqual('\x00\x01' + '\x00' * 14, f('1::')) - self.assertEqual( - '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', - f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') - ) - - def testStringToIPv4(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform - from socket import inet_ntoa as f, inet_ntop, AF_INET - g = lambda a: inet_ntop(AF_INET, a) - - self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00')) - self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55')) - self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff')) - self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04')) - - self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00')) - self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) - self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) - - def testStringToIPv6(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform - try: - from socket import inet_ntop, AF_INET6, has_ipv6 - if not has_ipv6: - return - except ImportError: - return - f = lambda a: inet_ntop(AF_INET6, a) - - self.assertEqual('::', f('\x00' * 16)) - self.assertEqual('::1', f('\x00' * 15 + '\x01')) - self.assertEqual( - 'aef:b01:506:1001:ffff:9997:55:170', - f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') - ) - - # XXX The following don't test module-level functionality... - - def _get_unused_port(self, bind_address='0.0.0.0'): - """Use a temporary socket to elicit an unused ephemeral port. - - Args: - bind_address: Hostname or IP address to search for a port on. - - Returns: A most likely to be unused port. - """ - tempsock = socket.socket() - tempsock.bind((bind_address, 0)) - host, port = tempsock.getsockname() - tempsock.close() - return port - - def testSockName(self): - # Testing getsockname() - port = self._get_unused_port() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(sock.close) - sock.bind(("0.0.0.0", port)) - name = sock.getsockname() - # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate - # it reasonable to get the host's addr in addition to 0.0.0.0. - # At least for eCos. This is required for the S/390 to pass. - try: - my_ip_addr = socket.gethostbyname(socket.gethostname()) - except socket.error: - # Probably name lookup wasn't set up right; skip this test - return - self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) - self.assertEqual(name[1], port) - - def testGetSockOpt(self): - # Testing getsockopt() - # We know a socket should start without reuse==0 - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(sock.close) - reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) - self.assertFalse(reuse != 0, "initial mode is reuse") - - def testSetSockOpt(self): - # Testing setsockopt() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(sock.close) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) - self.assertFalse(reuse == 0, "failed to set reuse mode") - - def testSendAfterClose(self): - # testing send() after close() with timeout - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(1) - sock.close() - self.assertRaises(socket.error, sock.send, "spam") - - def testNewAttributes(self): - # testing .family, .type and .protocol - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.assertEqual(sock.family, socket.AF_INET) - self.assertEqual(sock.type, socket.SOCK_STREAM) - self.assertEqual(sock.proto, 0) - sock.close() - - def test_getsockaddrarg(self): - host = '0.0.0.0' - port = self._get_unused_port(bind_address=host) - big_port = port + 65536 - neg_port = port - 65536 - sock = socket.socket() - try: - self.assertRaises(OverflowError, sock.bind, (host, big_port)) - self.assertRaises(OverflowError, sock.bind, (host, neg_port)) - sock.bind((host, port)) - finally: - sock.close() - - @unittest.skipUnless(os.name == "nt", "Windows specific") - def test_sock_ioctl(self): - self.assertTrue(hasattr(socket.socket, 'ioctl')) - self.assertTrue(hasattr(socket, 'SIO_RCVALL')) - self.assertTrue(hasattr(socket, 'RCVALL_ON')) - self.assertTrue(hasattr(socket, 'RCVALL_OFF')) - self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) - s = socket.socket() - self.addCleanup(s.close) - self.assertRaises(ValueError, s.ioctl, -1, None) - s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) - - def testGetaddrinfo(self): - try: - socket.getaddrinfo('localhost', 80) - except socket.gaierror as err: - if err.errno == socket.EAI_SERVICE: - # see http://bugs.python.org/issue1282647 - self.skipTest("buggy libc version") - raise - # len of every sequence is supposed to be == 5 - for info in socket.getaddrinfo(HOST, None): - self.assertEqual(len(info), 5) - # host can be a domain name, a string representation of an - # IPv4/v6 address or None - socket.getaddrinfo('localhost', 80) - socket.getaddrinfo('127.0.0.1', 80) - socket.getaddrinfo(None, 80) - if SUPPORTS_IPV6: - socket.getaddrinfo('::1', 80) - # port can be a string service name such as "http", a numeric - # port number (int or long), or None - socket.getaddrinfo(HOST, "http") - socket.getaddrinfo(HOST, 80) - socket.getaddrinfo(HOST, 80L) - socket.getaddrinfo(HOST, None) - # test family and socktype filters - infos = socket.getaddrinfo(HOST, None, socket.AF_INET) - for family, _, _, _, _ in infos: - self.assertEqual(family, socket.AF_INET) - infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) - for _, socktype, _, _, _ in infos: - self.assertEqual(socktype, socket.SOCK_STREAM) - # test proto and flags arguments - socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) - socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) - # a server willing to support both IPv4 and IPv6 will - # usually do this - socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, - socket.AI_PASSIVE) - - - def check_sendall_interrupted(self, with_timeout): - # socketpair() is not stricly required, but it makes things easier. - if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): - self.skipTest("signal.alarm and socket.socketpair required for this test") - # Our signal handlers clobber the C errno by calling a math function - # with an invalid domain value. - def ok_handler(*args): - self.assertRaises(ValueError, math.acosh, 0) - def raising_handler(*args): - self.assertRaises(ValueError, math.acosh, 0) - 1 // 0 - c, s = socket.socketpair() - old_alarm = signal.signal(signal.SIGALRM, raising_handler) - try: - if with_timeout: - # Just above the one second minimum for signal.alarm - c.settimeout(1.5) - with self.assertRaises(ZeroDivisionError): - signal.alarm(1) - c.sendall(b"x" * (1024**2)) - if with_timeout: - signal.signal(signal.SIGALRM, ok_handler) - signal.alarm(1) - self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2)) - finally: - signal.signal(signal.SIGALRM, old_alarm) - c.close() - s.close() - - def test_sendall_interrupted(self): - self.check_sendall_interrupted(False) - - def test_sendall_interrupted_with_timeout(self): - self.check_sendall_interrupted(True) - - def test_listen_backlog(self): - for backlog in 0, -1: - srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv.bind((HOST, 0)) - srv.listen(backlog) - srv.close() - - # Issue 15989 - srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv.bind((HOST, 0)) - self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) - srv.close() - - @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.') - def test_flowinfo(self): - self.assertRaises(OverflowError, socket.getnameinfo, - ('::1',0, 0xffffffff), 0) - s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - try: - self.assertRaises(OverflowError, s.bind, ('::1', 0, -10)) - finally: - s.close() - - -@unittest.skipUnless(thread, 'Threading required for this test.') -class BasicTCPTest(SocketConnectedTest): - - def __init__(self, methodName='runTest'): - SocketConnectedTest.__init__(self, methodName=methodName) - - def testRecv(self): - # Testing large receive over TCP - msg = self.cli_conn.recv(1024) - self.assertEqual(msg, MSG) - - def _testRecv(self): - self.serv_conn.send(MSG) - - def testOverFlowRecv(self): - # Testing receive in chunks over TCP - seg1 = self.cli_conn.recv(len(MSG) - 3) - seg2 = self.cli_conn.recv(1024) - msg = seg1 + seg2 - self.assertEqual(msg, MSG) - - def _testOverFlowRecv(self): - self.serv_conn.send(MSG) - - def testRecvFrom(self): - # Testing large recvfrom() over TCP - msg, addr = self.cli_conn.recvfrom(1024) - self.assertEqual(msg, MSG) - - def _testRecvFrom(self): - self.serv_conn.send(MSG) - - def testOverFlowRecvFrom(self): - # Testing recvfrom() in chunks over TCP - seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) - seg2, addr = self.cli_conn.recvfrom(1024) - msg = seg1 + seg2 - self.assertEqual(msg, MSG) - - def _testOverFlowRecvFrom(self): - self.serv_conn.send(MSG) - - def testSendAll(self): - # Testing sendall() with a 2048 byte string over TCP - msg = '' - while 1: - read = self.cli_conn.recv(1024) - if not read: - break - msg += read - self.assertEqual(msg, 'f' * 2048) - - def _testSendAll(self): - big_chunk = 'f' * 2048 - self.serv_conn.sendall(big_chunk) - - def testFromFd(self): - # Testing fromfd() - if not hasattr(socket, "fromfd"): - return # On Windows, this doesn't exist - fd = self.cli_conn.fileno() - sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(sock.close) - msg = sock.recv(1024) - self.assertEqual(msg, MSG) - - def _testFromFd(self): - self.serv_conn.send(MSG) - - def testDup(self): - # Testing dup() - sock = self.cli_conn.dup() - self.addCleanup(sock.close) - msg = sock.recv(1024) - self.assertEqual(msg, MSG) - - def _testDup(self): - self.serv_conn.send(MSG) - - def testShutdown(self): - # Testing shutdown() - msg = self.cli_conn.recv(1024) - self.assertEqual(msg, MSG) - # wait for _testShutdown to finish: on OS X, when the server - # closes the connection the client also becomes disconnected, - # and the client's shutdown call will fail. (Issue #4397.) - self.done.wait() - - def _testShutdown(self): - self.serv_conn.send(MSG) - # Issue 15989 - self.assertRaises(OverflowError, self.serv_conn.shutdown, - _testcapi.INT_MAX + 1) - self.assertRaises(OverflowError, self.serv_conn.shutdown, - 2 + (_testcapi.UINT_MAX + 1)) - self.serv_conn.shutdown(2) - -@unittest.skipUnless(thread, 'Threading required for this test.') -class BasicUDPTest(ThreadedUDPSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedUDPSocketTest.__init__(self, methodName=methodName) - - def testSendtoAndRecv(self): - # Testing sendto() and Recv() over UDP - msg = self.serv.recv(len(MSG)) - self.assertEqual(msg, MSG) - - def _testSendtoAndRecv(self): - self.cli.sendto(MSG, 0, (HOST, self.port)) - - def testRecvFrom(self): - # Testing recvfrom() over UDP - msg, addr = self.serv.recvfrom(len(MSG)) - self.assertEqual(msg, MSG) - - def _testRecvFrom(self): - self.cli.sendto(MSG, 0, (HOST, self.port)) - - def testRecvFromNegative(self): - # Negative lengths passed to recvfrom should give ValueError. - self.assertRaises(ValueError, self.serv.recvfrom, -1) - - def _testRecvFromNegative(self): - self.cli.sendto(MSG, 0, (HOST, self.port)) - -@unittest.skipUnless(thread, 'Threading required for this test.') -class TCPCloserTest(ThreadedTCPSocketTest): - - def testClose(self): - conn, addr = self.serv.accept() - conn.close() - - sd = self.cli - read, write, err = select.select([sd], [], [], 1.0) - self.assertEqual(read, [sd]) - self.assertEqual(sd.recv(1), '') - - def _testClose(self): - self.cli.connect((HOST, self.port)) - time.sleep(1.0) - -@unittest.skipUnless(thread, 'Threading required for this test.') -class BasicSocketPairTest(SocketPairTest): - - def __init__(self, methodName='runTest'): - SocketPairTest.__init__(self, methodName=methodName) - - def testRecv(self): - msg = self.serv.recv(1024) - self.assertEqual(msg, MSG) - - def _testRecv(self): - self.cli.send(MSG) - - def testSend(self): - self.serv.send(MSG) - - def _testSend(self): - msg = self.cli.recv(1024) - self.assertEqual(msg, MSG) - -@unittest.skipUnless(thread, 'Threading required for this test.') -class NonBlockingTCPTests(ThreadedTCPSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedTCPSocketTest.__init__(self, methodName=methodName) - - def testSetBlocking(self): - # Testing whether set blocking works - self.serv.setblocking(True) - self.assertIsNone(self.serv.gettimeout()) - self.serv.setblocking(False) - self.assertEqual(self.serv.gettimeout(), 0.0) - start = time.time() - try: - self.serv.accept() - except socket.error: - pass - end = time.time() - self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") - # Issue 15989 - if _testcapi.UINT_MAX < _testcapi.ULONG_MAX: - self.serv.setblocking(_testcapi.UINT_MAX + 1) - self.assertIsNone(self.serv.gettimeout()) - - def _testSetBlocking(self): - pass - - def testAccept(self): - # Testing non-blocking accept - self.serv.setblocking(0) - try: - conn, addr = self.serv.accept() - except socket.error: - pass - else: - self.fail("Error trying to do non-blocking accept.") - read, write, err = select.select([self.serv], [], []) - if self.serv in read: - conn, addr = self.serv.accept() - conn.close() - else: - self.fail("Error trying to do accept after select.") - - def _testAccept(self): - time.sleep(0.1) - self.cli.connect((HOST, self.port)) - - def testConnect(self): - # Testing non-blocking connect - conn, addr = self.serv.accept() - conn.close() - - def _testConnect(self): - self.cli.settimeout(10) - self.cli.connect((HOST, self.port)) - - def testRecv(self): - # Testing non-blocking recv - conn, addr = self.serv.accept() - conn.setblocking(0) - try: - msg = conn.recv(len(MSG)) - except socket.error: - pass - else: - self.fail("Error trying to do non-blocking recv.") - read, write, err = select.select([conn], [], []) - if conn in read: - msg = conn.recv(len(MSG)) - conn.close() - self.assertEqual(msg, MSG) - else: - self.fail("Error during select call to non-blocking socket.") - - def _testRecv(self): - self.cli.connect((HOST, self.port)) - time.sleep(0.1) - self.cli.send(MSG) - -@unittest.skipUnless(thread, 'Threading required for this test.') -class FileObjectClassTestCase(SocketConnectedTest): - - bufsize = -1 # Use default buffer size - - def __init__(self, methodName='runTest'): - SocketConnectedTest.__init__(self, methodName=methodName) - - def setUp(self): - SocketConnectedTest.setUp(self) - self.serv_file = self.cli_conn.makefile('rb', self.bufsize) - - def tearDown(self): - self.serv_file.close() - self.assertTrue(self.serv_file.closed) - SocketConnectedTest.tearDown(self) - self.serv_file = None - - def clientSetUp(self): - SocketConnectedTest.clientSetUp(self) - self.cli_file = self.serv_conn.makefile('wb') - - def clientTearDown(self): - self.cli_file.close() - self.assertTrue(self.cli_file.closed) - self.cli_file = None - SocketConnectedTest.clientTearDown(self) - - def testSmallRead(self): - # Performing small file read test - first_seg = self.serv_file.read(len(MSG)-3) - second_seg = self.serv_file.read(3) - msg = first_seg + second_seg - self.assertEqual(msg, MSG) - - def _testSmallRead(self): - self.cli_file.write(MSG) - self.cli_file.flush() - - def testFullRead(self): - # read until EOF - msg = self.serv_file.read() - self.assertEqual(msg, MSG) - - def _testFullRead(self): - self.cli_file.write(MSG) - self.cli_file.close() - - def testUnbufferedRead(self): - # Performing unbuffered file read test - buf = '' - while 1: - char = self.serv_file.read(1) - if not char: - break - buf += char - self.assertEqual(buf, MSG) - - def _testUnbufferedRead(self): - self.cli_file.write(MSG) - self.cli_file.flush() - - def testReadline(self): - # Performing file readline test - line = self.serv_file.readline() - self.assertEqual(line, MSG) - - def _testReadline(self): - self.cli_file.write(MSG) - self.cli_file.flush() - - def testReadlineAfterRead(self): - a_baloo_is = self.serv_file.read(len("A baloo is")) - self.assertEqual("A baloo is", a_baloo_is) - _a_bear = self.serv_file.read(len(" a bear")) - self.assertEqual(" a bear", _a_bear) - line = self.serv_file.readline() - self.assertEqual("\n", line) - line = self.serv_file.readline() - self.assertEqual("A BALOO IS A BEAR.\n", line) - line = self.serv_file.readline() - self.assertEqual(MSG, line) - - def _testReadlineAfterRead(self): - self.cli_file.write("A baloo is a bear\n") - self.cli_file.write("A BALOO IS A BEAR.\n") - self.cli_file.write(MSG) - self.cli_file.flush() - - def testReadlineAfterReadNoNewline(self): - end_of_ = self.serv_file.read(len("End Of ")) - self.assertEqual("End Of ", end_of_) - line = self.serv_file.readline() - self.assertEqual("Line", line) - - def _testReadlineAfterReadNoNewline(self): - self.cli_file.write("End Of Line") - - def testClosedAttr(self): - self.assertTrue(not self.serv_file.closed) - - def _testClosedAttr(self): - self.assertTrue(not self.cli_file.closed) - - -class FileObjectInterruptedTestCase(unittest.TestCase): - """Test that the file object correctly handles EINTR internally.""" - - class MockSocket(object): - def __init__(self, recv_funcs=()): - # A generator that returns callables that we'll call for each - # call to recv(). - self._recv_step = iter(recv_funcs) - - def recv(self, size): - return self._recv_step.next()() - - @staticmethod - def _raise_eintr(): - raise socket.error(errno.EINTR) - - def _test_readline(self, size=-1, **kwargs): - mock_sock = self.MockSocket(recv_funcs=[ - lambda : "This is the first line\nAnd the sec", - self._raise_eintr, - lambda : "ond line is here\n", - lambda : "", - ]) - fo = socket._fileobject(mock_sock, **kwargs) - self.assertEqual(fo.readline(size), "This is the first line\n") - self.assertEqual(fo.readline(size), "And the second line is here\n") - - def _test_read(self, size=-1, **kwargs): - mock_sock = self.MockSocket(recv_funcs=[ - lambda : "This is the first line\nAnd the sec", - self._raise_eintr, - lambda : "ond line is here\n", - lambda : "", - ]) - fo = socket._fileobject(mock_sock, **kwargs) - self.assertEqual(fo.read(size), "This is the first line\n" - "And the second line is here\n") - - def test_default(self): - self._test_readline() - self._test_readline(size=100) - self._test_read() - self._test_read(size=100) - - def test_with_1k_buffer(self): - self._test_readline(bufsize=1024) - self._test_readline(size=100, bufsize=1024) - self._test_read(bufsize=1024) - self._test_read(size=100, bufsize=1024) - - def _test_readline_no_buffer(self, size=-1): - mock_sock = self.MockSocket(recv_funcs=[ - lambda : "aa", - lambda : "\n", - lambda : "BB", - self._raise_eintr, - lambda : "bb", - lambda : "", - ]) - fo = socket._fileobject(mock_sock, bufsize=0) - self.assertEqual(fo.readline(size), "aa\n") - self.assertEqual(fo.readline(size), "BBbb") - - def test_no_buffer(self): - self._test_readline_no_buffer() - self._test_readline_no_buffer(size=4) - self._test_read(bufsize=0) - self._test_read(size=100, bufsize=0) - - -class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): - - """Repeat the tests from FileObjectClassTestCase with bufsize==0. - - In this case (and in this case only), it should be possible to - create a file object, read a line from it, create another file - object, read another line from it, without loss of data in the - first file object's buffer. Note that httplib relies on this - when reading multiple requests from the same socket.""" - - bufsize = 0 # Use unbuffered mode - - def testUnbufferedReadline(self): - # Read a line, create a new file object, read another line with it - line = self.serv_file.readline() # first line - self.assertEqual(line, "A. " + MSG) # first line - self.serv_file = self.cli_conn.makefile('rb', 0) - line = self.serv_file.readline() # second line - self.assertEqual(line, "B. " + MSG) # second line - - def _testUnbufferedReadline(self): - self.cli_file.write("A. " + MSG) - self.cli_file.write("B. " + MSG) - self.cli_file.flush() - -class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): - - bufsize = 1 # Default-buffered for reading; line-buffered for writing - - class SocketMemo(object): - """A wrapper to keep track of sent data, needed to examine write behaviour""" - def __init__(self, sock): - self._sock = sock - self.sent = [] - - def send(self, data, flags=0): - n = self._sock.send(data, flags) - self.sent.append(data[:n]) - return n - - def sendall(self, data, flags=0): - self._sock.sendall(data, flags) - self.sent.append(data) - - def __getattr__(self, attr): - return getattr(self._sock, attr) - - def getsent(self): - return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent] - - def setUp(self): - FileObjectClassTestCase.setUp(self) - self.serv_file._sock = self.SocketMemo(self.serv_file._sock) - - def testLinebufferedWrite(self): - # Write two lines, in small chunks - msg = MSG.strip() - print >> self.serv_file, msg, - print >> self.serv_file, msg - - # second line: - print >> self.serv_file, msg, - print >> self.serv_file, msg, - print >> self.serv_file, msg - - # third line - print >> self.serv_file, '' - - self.serv_file.flush() - - msg1 = "%s %s\n"%(msg, msg) - msg2 = "%s %s %s\n"%(msg, msg, msg) - msg3 = "\n" - self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3]) - - def _testLinebufferedWrite(self): - msg = MSG.strip() - msg1 = "%s %s\n"%(msg, msg) - msg2 = "%s %s %s\n"%(msg, msg, msg) - msg3 = "\n" - l1 = self.cli_file.readline() - self.assertEqual(l1, msg1) - l2 = self.cli_file.readline() - self.assertEqual(l2, msg2) - l3 = self.cli_file.readline() - self.assertEqual(l3, msg3) - - -class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): - - bufsize = 2 # Exercise the buffering code - - -class NetworkConnectionTest(object): - """Prove network connection.""" - def clientSetUp(self): - # We're inherited below by BasicTCPTest2, which also inherits - # BasicTCPTest, which defines self.port referenced below. - self.cli = socket.create_connection((HOST, self.port)) - self.serv_conn = self.cli - -class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): - """Tests that NetworkConnection does not break existing TCP functionality. - """ - -class NetworkConnectionNoServer(unittest.TestCase): - class MockSocket(socket.socket): - def connect(self, *args): - raise socket.timeout('timed out') - - @contextlib.contextmanager - def mocked_socket_module(self): - """Return a socket which times out on connect""" - old_socket = socket.socket - socket.socket = self.MockSocket - try: - yield - finally: - socket.socket = old_socket - - def test_connect(self): - port = test_support.find_unused_port() - cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(cli.close) - with self.assertRaises(socket.error) as cm: - cli.connect((HOST, port)) - self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) - - def test_create_connection(self): - # Issue #9792: errors raised by create_connection() should have - # a proper errno attribute. - port = test_support.find_unused_port() - with self.assertRaises(socket.error) as cm: - socket.create_connection((HOST, port)) - - # Issue #16257: create_connection() calls getaddrinfo() against - # 'localhost'. This may result in an IPV6 addr being returned - # as well as an IPV4 one: - # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) - # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), - # (26, 2, 0, '', ('::1', 41230, 0, 0))] - # - # create_connection() enumerates through all the addresses returned - # and if it doesn't successfully bind to any of them, it propagates - # the last exception it encountered. - # - # On Solaris, ENETUNREACH is returned in this circumstance instead - # of ECONNREFUSED. So, if that errno exists, add it to our list of - # expected errnos. - expected_errnos = [ errno.ECONNREFUSED, ] - if hasattr(errno, 'ENETUNREACH'): - expected_errnos.append(errno.ENETUNREACH) - - self.assertIn(cm.exception.errno, expected_errnos) - - def test_create_connection_timeout(self): - # Issue #9792: create_connection() should not recast timeout errors - # as generic socket errors. - with self.mocked_socket_module(): - with self.assertRaises(socket.timeout): - socket.create_connection((HOST, 1234)) - - -@unittest.skipUnless(thread, 'Threading required for this test.') -class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketTCPTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.source_port = test_support.find_unused_port() - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - - def _justAccept(self): - conn, addr = self.serv.accept() - conn.close() - - testFamily = _justAccept - def _testFamily(self): - self.cli = socket.create_connection((HOST, self.port), timeout=30) - self.addCleanup(self.cli.close) - self.assertEqual(self.cli.family, 2) - - testSourceAddress = _justAccept - def _testSourceAddress(self): - self.cli = socket.create_connection((HOST, self.port), timeout=30, - source_address=('', self.source_port)) - self.addCleanup(self.cli.close) - self.assertEqual(self.cli.getsockname()[1], self.source_port) - # The port number being used is sufficient to show that the bind() - # call happened. - - testTimeoutDefault = _justAccept - def _testTimeoutDefault(self): - # passing no explicit timeout uses socket's global default - self.assertTrue(socket.getdefaulttimeout() is None) - socket.setdefaulttimeout(42) - try: - self.cli = socket.create_connection((HOST, self.port)) - self.addCleanup(self.cli.close) - finally: - socket.setdefaulttimeout(None) - self.assertEqual(self.cli.gettimeout(), 42) - - testTimeoutNone = _justAccept - def _testTimeoutNone(self): - # None timeout means the same as sock.settimeout(None) - self.assertTrue(socket.getdefaulttimeout() is None) - socket.setdefaulttimeout(30) - try: - self.cli = socket.create_connection((HOST, self.port), timeout=None) - self.addCleanup(self.cli.close) - finally: - socket.setdefaulttimeout(None) - self.assertEqual(self.cli.gettimeout(), None) - - testTimeoutValueNamed = _justAccept - def _testTimeoutValueNamed(self): - self.cli = socket.create_connection((HOST, self.port), timeout=30) - self.assertEqual(self.cli.gettimeout(), 30) - - testTimeoutValueNonamed = _justAccept - def _testTimeoutValueNonamed(self): - self.cli = socket.create_connection((HOST, self.port), 30) - self.addCleanup(self.cli.close) - self.assertEqual(self.cli.gettimeout(), 30) - -@unittest.skipUnless(thread, 'Threading required for this test.') -class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketTCPTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - pass - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - - def testInsideTimeout(self): - conn, addr = self.serv.accept() - self.addCleanup(conn.close) - time.sleep(3) - conn.send("done!") - testOutsideTimeout = testInsideTimeout - - def _testInsideTimeout(self): - self.cli = sock = socket.create_connection((HOST, self.port)) - data = sock.recv(5) - self.assertEqual(data, "done!") - - def _testOutsideTimeout(self): - self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) - self.assertRaises(socket.timeout, lambda: sock.recv(5)) - - -class Urllib2FileobjectTest(unittest.TestCase): - - # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that - # it close the socket if the close c'tor argument is true - - def testClose(self): - class MockSocket: - closed = False - def flush(self): pass - def close(self): self.closed = True - - # must not close unless we request it: the original use of _fileobject - # by module socket requires that the underlying socket not be closed until - # the _socketobject that created the _fileobject is closed - s = MockSocket() - f = socket._fileobject(s) - f.close() - self.assertTrue(not s.closed) - - s = MockSocket() - f = socket._fileobject(s, close=True) - f.close() - self.assertTrue(s.closed) - -class TCPTimeoutTest(SocketTCPTest): - - def testTCPTimeout(self): - def raise_timeout(*args, **kwargs): - self.serv.settimeout(1.0) - self.serv.accept() - self.assertRaises(socket.timeout, raise_timeout, - "Error generating a timeout exception (TCP)") - - def testTimeoutZero(self): - ok = False - try: - self.serv.settimeout(0.0) - foo = self.serv.accept() - except socket.timeout: - self.fail("caught timeout instead of error (TCP)") - except socket.error: - ok = True - except: - self.fail("caught unexpected exception (TCP)") - if not ok: - self.fail("accept() returned success when we did not expect it") - - def testInterruptedTimeout(self): - # XXX I don't know how to do this test on MSWindows or any other - # plaform that doesn't support signal.alarm() or os.kill(), though - # the bug should have existed on all platforms. - if not hasattr(signal, "alarm"): - return # can only test on *nix - self.serv.settimeout(5.0) # must be longer than alarm - class Alarm(Exception): - pass - def alarm_handler(signal, frame): - raise Alarm - old_alarm = signal.signal(signal.SIGALRM, alarm_handler) - try: - signal.alarm(2) # POSIX allows alarm to be up to 1 second early - try: - foo = self.serv.accept() - except socket.timeout: - self.fail("caught timeout instead of Alarm") - except Alarm: - pass - except: - self.fail("caught other exception instead of Alarm:" - " %s(%s):\n%s" % - (sys.exc_info()[:2] + (traceback.format_exc(),))) - else: - self.fail("nothing caught") - finally: - signal.alarm(0) # shut off alarm - except Alarm: - self.fail("got Alarm in wrong place") - finally: - # no alarm can be pending. Safe to restore old handler. - signal.signal(signal.SIGALRM, old_alarm) - -class UDPTimeoutTest(SocketUDPTest): - - def testUDPTimeout(self): - def raise_timeout(*args, **kwargs): - self.serv.settimeout(1.0) - self.serv.recv(1024) - self.assertRaises(socket.timeout, raise_timeout, - "Error generating a timeout exception (UDP)") - - def testTimeoutZero(self): - ok = False - try: - self.serv.settimeout(0.0) - foo = self.serv.recv(1024) - except socket.timeout: - self.fail("caught timeout instead of error (UDP)") - except socket.error: - ok = True - except: - self.fail("caught unexpected exception (UDP)") - if not ok: - self.fail("recv() returned success when we did not expect it") - -class TestExceptions(unittest.TestCase): - - def testExceptionTree(self): - self.assertTrue(issubclass(socket.error, Exception)) - self.assertTrue(issubclass(socket.herror, socket.error)) - self.assertTrue(issubclass(socket.gaierror, socket.error)) - self.assertTrue(issubclass(socket.timeout, socket.error)) - -class TestLinuxAbstractNamespace(unittest.TestCase): - - UNIX_PATH_MAX = 108 - - def testLinuxAbstractNamespace(self): - address = "\x00python-test-hello\x00\xff" - s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s1.bind(address) - s1.listen(1) - s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s2.connect(s1.getsockname()) - s1.accept() - self.assertEqual(s1.getsockname(), address) - self.assertEqual(s2.getpeername(), address) - - def testMaxName(self): - address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.bind(address) - self.assertEqual(s.getsockname(), address) - - def testNameOverflow(self): - address = "\x00" + "h" * self.UNIX_PATH_MAX - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.assertRaises(socket.error, s.bind, address) - - -@unittest.skipUnless(thread, 'Threading required for this test.') -class BufferIOTest(SocketConnectedTest): - """ - Test the buffer versions of socket.recv() and socket.send(). - """ - def __init__(self, methodName='runTest'): - SocketConnectedTest.__init__(self, methodName=methodName) - - def testRecvIntoArray(self): - buf = array.array('c', ' '*1024) - nbytes = self.cli_conn.recv_into(buf) - self.assertEqual(nbytes, len(MSG)) - msg = buf.tostring()[:len(MSG)] - self.assertEqual(msg, MSG) - - def _testRecvIntoArray(self): - with test_support.check_py3k_warnings(): - buf = buffer(MSG) - self.serv_conn.send(buf) - - def testRecvIntoBytearray(self): - buf = bytearray(1024) - nbytes = self.cli_conn.recv_into(buf) - self.assertEqual(nbytes, len(MSG)) - msg = buf[:len(MSG)] - self.assertEqual(msg, MSG) - - _testRecvIntoBytearray = _testRecvIntoArray - - def testRecvIntoMemoryview(self): - buf = bytearray(1024) - nbytes = self.cli_conn.recv_into(memoryview(buf)) - self.assertEqual(nbytes, len(MSG)) - msg = buf[:len(MSG)] - self.assertEqual(msg, MSG) - - _testRecvIntoMemoryview = _testRecvIntoArray - - def testRecvFromIntoArray(self): - buf = array.array('c', ' '*1024) - nbytes, addr = self.cli_conn.recvfrom_into(buf) - self.assertEqual(nbytes, len(MSG)) - msg = buf.tostring()[:len(MSG)] - self.assertEqual(msg, MSG) - - def _testRecvFromIntoArray(self): - with test_support.check_py3k_warnings(): - buf = buffer(MSG) - self.serv_conn.send(buf) - - def testRecvFromIntoBytearray(self): - buf = bytearray(1024) - nbytes, addr = self.cli_conn.recvfrom_into(buf) - self.assertEqual(nbytes, len(MSG)) - msg = buf[:len(MSG)] - self.assertEqual(msg, MSG) - - _testRecvFromIntoBytearray = _testRecvFromIntoArray - - def testRecvFromIntoMemoryview(self): - buf = bytearray(1024) - nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) - self.assertEqual(nbytes, len(MSG)) - msg = buf[:len(MSG)] - self.assertEqual(msg, MSG) - - _testRecvFromIntoMemoryview = _testRecvFromIntoArray - - -TIPC_STYPE = 2000 -TIPC_LOWER = 200 -TIPC_UPPER = 210 - -def isTipcAvailable(): - """Check if the TIPC module is loaded - - The TIPC module is not loaded automatically on Ubuntu and probably - other Linux distros. - """ - if not hasattr(socket, "AF_TIPC"): - return False - if not os.path.isfile("/proc/modules"): - return False - with open("/proc/modules") as f: - for line in f: - if line.startswith("tipc "): - return True - if test_support.verbose: - print "TIPC module is not loaded, please 'sudo modprobe tipc'" - return False - -class TIPCTest (unittest.TestCase): - def testRDM(self): - srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) - cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) - - srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, - TIPC_LOWER, TIPC_UPPER) - srv.bind(srvaddr) - - sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, - TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) - cli.sendto(MSG, sendaddr) - - msg, recvaddr = srv.recvfrom(1024) - - self.assertEqual(cli.getsockname(), recvaddr) - self.assertEqual(msg, MSG) - - -class TIPCThreadableTest (unittest.TestCase, ThreadableTest): - def __init__(self, methodName = 'runTest'): - unittest.TestCase.__init__(self, methodName = methodName) - ThreadableTest.__init__(self) - - def setUp(self): - self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) - self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, - TIPC_LOWER, TIPC_UPPER) - self.srv.bind(srvaddr) - self.srv.listen(5) - self.serverExplicitReady() - self.conn, self.connaddr = self.srv.accept() - - def clientSetUp(self): - # The is a hittable race between serverExplicitReady() and the - # accept() call; sleep a little while to avoid it, otherwise - # we could get an exception - time.sleep(0.1) - self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) - addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, - TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) - self.cli.connect(addr) - self.cliaddr = self.cli.getsockname() - - def testStream(self): - msg = self.conn.recv(1024) - self.assertEqual(msg, MSG) - self.assertEqual(self.cliaddr, self.connaddr) - - def _testStream(self): - self.cli.send(MSG) - self.cli.close() - - -def test_main(): - tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, - TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, - UDPTimeoutTest ] - - tests.extend([ - NonBlockingTCPTests, - FileObjectClassTestCase, - FileObjectInterruptedTestCase, - UnbufferedFileObjectClassTestCase, - LineBufferedFileObjectClassTestCase, - SmallBufferedFileObjectClassTestCase, - Urllib2FileobjectTest, - NetworkConnectionNoServer, - NetworkConnectionAttributesTest, - NetworkConnectionBehaviourTest, - ]) - if hasattr(socket, "socketpair"): - tests.append(BasicSocketPairTest) - if sys.platform == 'linux2': - tests.append(TestLinuxAbstractNamespace) - if isTipcAvailable(): - tests.append(TIPCTest) - tests.append(TIPCThreadableTest) - - thread_info = test_support.threading_setup() - test_support.run_unittest(*tests) - test_support.threading_cleanup(*thread_info) - -if __name__ == "__main__": - test_main() |