diff options
Diffstat (limited to 'lib/python2.7/test/test_ssl.py')
-rw-r--r-- | lib/python2.7/test/test_ssl.py | 1394 |
1 files changed, 0 insertions, 1394 deletions
diff --git a/lib/python2.7/test/test_ssl.py b/lib/python2.7/test/test_ssl.py deleted file mode 100644 index 7552199..0000000 --- a/lib/python2.7/test/test_ssl.py +++ /dev/null @@ -1,1394 +0,0 @@ -# Test the support for SSL and sockets - -import sys -import unittest -from test import test_support -import asyncore -import socket -import select -import time -import gc -import os -import errno -import pprint -import urllib, urlparse -import traceback -import weakref -import functools -import platform - -from BaseHTTPServer import HTTPServer -from SimpleHTTPServer import SimpleHTTPRequestHandler - -ssl = test_support.import_module("ssl") - -HOST = test_support.HOST -CERTFILE = None -SVN_PYTHON_ORG_ROOT_CERT = None - -def handle_error(prefix): - exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) - if test_support.verbose: - sys.stdout.write(prefix + exc_format) - - -class BasicTests(unittest.TestCase): - - def test_sslwrap_simple(self): - # A crude test for the legacy API - try: - ssl.sslwrap_simple(socket.socket(socket.AF_INET)) - except IOError, e: - if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that - pass - else: - raise - try: - ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) - except IOError, e: - if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that - pass - else: - raise - -# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 -def skip_if_broken_ubuntu_ssl(func): - if hasattr(ssl, 'PROTOCOL_SSLv2'): - # We need to access the lower-level wrapper in order to create an - # implicit SSL context without trying to connect or listen. - try: - import _ssl - except ImportError: - # The returned function won't get executed, just ignore the error - pass - @functools.wraps(func) - def f(*args, **kwargs): - try: - s = socket.socket(socket.AF_INET) - _ssl.sslwrap(s._sock, 0, None, None, - ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None) - except ssl.SSLError as e: - if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and - platform.linux_distribution() == ('debian', 'squeeze/sid', '') - and 'Invalid SSL protocol variant specified' in str(e)): - raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") - return func(*args, **kwargs) - return f - else: - return func - - -class BasicSocketTests(unittest.TestCase): - - def test_constants(self): - #ssl.PROTOCOL_SSLv2 - ssl.PROTOCOL_SSLv23 - ssl.PROTOCOL_SSLv3 - ssl.PROTOCOL_TLSv1 - ssl.CERT_NONE - ssl.CERT_OPTIONAL - ssl.CERT_REQUIRED - - def test_random(self): - v = ssl.RAND_status() - if test_support.verbose: - sys.stdout.write("\n RAND_status is %d (%s)\n" - % (v, (v and "sufficient randomness") or - "insufficient randomness")) - self.assertRaises(TypeError, ssl.RAND_egd, 1) - self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) - ssl.RAND_add("this is a random string", 75.0) - - def test_parse_cert(self): - # note that this uses an 'unofficial' function in _ssl.c, - # provided solely for this test, to exercise the certificate - # parsing code - p = ssl._ssl._test_decode_cert(CERTFILE, False) - if test_support.verbose: - sys.stdout.write("\n" + pprint.pformat(p) + "\n") - self.assertEqual(p['subject'], - ((('countryName', 'XY'),), - (('localityName', 'Castle Anthrax'),), - (('organizationName', 'Python Software Foundation'),), - (('commonName', 'localhost'),)) - ) - self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) - # Issue #13034: the subjectAltName in some certificates - # (notably projects.developer.nokia.com:443) wasn't parsed - p = ssl._ssl._test_decode_cert(NOKIACERT) - if test_support.verbose: - sys.stdout.write("\n" + pprint.pformat(p) + "\n") - self.assertEqual(p['subjectAltName'], - (('DNS', 'projects.developer.nokia.com'), - ('DNS', 'projects.forum.nokia.com')) - ) - - def test_DER_to_PEM(self): - with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: - pem = f.read() - d1 = ssl.PEM_cert_to_DER_cert(pem) - p2 = ssl.DER_cert_to_PEM_cert(d1) - d2 = ssl.PEM_cert_to_DER_cert(p2) - self.assertEqual(d1, d2) - if not p2.startswith(ssl.PEM_HEADER + '\n'): - self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) - if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): - self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) - - def test_openssl_version(self): - n = ssl.OPENSSL_VERSION_NUMBER - t = ssl.OPENSSL_VERSION_INFO - s = ssl.OPENSSL_VERSION - self.assertIsInstance(n, (int, long)) - self.assertIsInstance(t, tuple) - self.assertIsInstance(s, str) - # Some sanity checks follow - # >= 0.9 - self.assertGreaterEqual(n, 0x900000) - # < 2.0 - self.assertLess(n, 0x20000000) - major, minor, fix, patch, status = t - self.assertGreaterEqual(major, 0) - self.assertLess(major, 2) - self.assertGreaterEqual(minor, 0) - self.assertLess(minor, 256) - self.assertGreaterEqual(fix, 0) - self.assertLess(fix, 256) - self.assertGreaterEqual(patch, 0) - self.assertLessEqual(patch, 26) - self.assertGreaterEqual(status, 0) - self.assertLessEqual(status, 15) - # Version string as returned by OpenSSL, the format might change - self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), - (s, t)) - - def test_ciphers(self): - if not test_support.is_resource_enabled('network'): - return - remote = ("svn.python.org", 443) - with test_support.transient_internet(remote[0]): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="ALL") - s.connect(remote) - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") - s.connect(remote) - # Error checking occurs when connecting, because the SSL context - # isn't created before. - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") - with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): - s.connect(remote) - - @test_support.cpython_only - def test_refcycle(self): - # Issue #7943: an SSL object doesn't create reference cycles with - # itself. - s = socket.socket(socket.AF_INET) - ss = ssl.wrap_socket(s) - wr = weakref.ref(ss) - del ss - self.assertEqual(wr(), None) - - def test_wrapped_unconnected(self): - # The _delegate_methods in socket.py are correctly delegated to by an - # unconnected SSLSocket, so they will raise a socket.error rather than - # something unexpected like TypeError. - s = socket.socket(socket.AF_INET) - ss = ssl.wrap_socket(s) - self.assertRaises(socket.error, ss.recv, 1) - self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) - self.assertRaises(socket.error, ss.recvfrom, 1) - self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) - self.assertRaises(socket.error, ss.send, b'x') - self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) - - -class NetworkedTests(unittest.TestCase): - - def test_connect(self): - with test_support.transient_internet("svn.python.org"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE) - s.connect(("svn.python.org", 443)) - c = s.getpeercert() - if c: - self.fail("Peer cert %s shouldn't be here!") - s.close() - - # this should fail because we have no verification certs - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED) - try: - s.connect(("svn.python.org", 443)) - except ssl.SSLError: - pass - finally: - s.close() - - # this should succeed because we specify the root cert - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=SVN_PYTHON_ORG_ROOT_CERT) - try: - s.connect(("svn.python.org", 443)) - finally: - s.close() - - def test_connect_ex(self): - # Issue #11326: check connect_ex() implementation - with test_support.transient_internet("svn.python.org"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=SVN_PYTHON_ORG_ROOT_CERT) - try: - self.assertEqual(0, s.connect_ex(("svn.python.org", 443))) - self.assertTrue(s.getpeercert()) - finally: - s.close() - - def test_non_blocking_connect_ex(self): - # Issue #11326: non-blocking connect_ex() should allow handshake - # to proceed after the socket gets ready. - with test_support.transient_internet("svn.python.org"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=SVN_PYTHON_ORG_ROOT_CERT, - do_handshake_on_connect=False) - try: - s.setblocking(False) - rc = s.connect_ex(('svn.python.org', 443)) - # EWOULDBLOCK under Windows, EINPROGRESS elsewhere - self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) - # Wait for connect to finish - select.select([], [s], [], 5.0) - # Non-blocking handshake - while True: - try: - s.do_handshake() - break - except ssl.SSLError as err: - if err.args[0] == ssl.SSL_ERROR_WANT_READ: - select.select([s], [], [], 5.0) - elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: - select.select([], [s], [], 5.0) - else: - raise - # SSL established - self.assertTrue(s.getpeercert()) - finally: - s.close() - - def test_timeout_connect_ex(self): - # Issue #12065: on a timeout, connect_ex() should return the original - # errno (mimicking the behaviour of non-SSL sockets). - with test_support.transient_internet("svn.python.org"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=SVN_PYTHON_ORG_ROOT_CERT, - do_handshake_on_connect=False) - try: - s.settimeout(0.0000001) - rc = s.connect_ex(('svn.python.org', 443)) - if rc == 0: - self.skipTest("svn.python.org responded too quickly") - self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) - finally: - s.close() - - def test_connect_ex_error(self): - with test_support.transient_internet("svn.python.org"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=SVN_PYTHON_ORG_ROOT_CERT) - try: - self.assertEqual(errno.ECONNREFUSED, - s.connect_ex(("svn.python.org", 444))) - finally: - s.close() - - @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") - def test_makefile_close(self): - # Issue #5238: creating a file-like object with makefile() shouldn't - # delay closing the underlying "real socket" (here tested with its - # file descriptor, hence skipping the test under Windows). - with test_support.transient_internet("svn.python.org"): - ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) - ss.connect(("svn.python.org", 443)) - fd = ss.fileno() - f = ss.makefile() - f.close() - # The fd is still open - os.read(fd, 0) - # Closing the SSL socket should close the fd too - ss.close() - gc.collect() - with self.assertRaises(OSError) as e: - os.read(fd, 0) - self.assertEqual(e.exception.errno, errno.EBADF) - - def test_non_blocking_handshake(self): - with test_support.transient_internet("svn.python.org"): - s = socket.socket(socket.AF_INET) - s.connect(("svn.python.org", 443)) - s.setblocking(False) - s = ssl.wrap_socket(s, - cert_reqs=ssl.CERT_NONE, - do_handshake_on_connect=False) - count = 0 - while True: - try: - count += 1 - s.do_handshake() - break - except ssl.SSLError, err: - if err.args[0] == ssl.SSL_ERROR_WANT_READ: - select.select([s], [], []) - elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: - select.select([], [s], []) - else: - raise - s.close() - if test_support.verbose: - sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) - - def test_get_server_certificate(self): - with test_support.transient_internet("svn.python.org"): - pem = ssl.get_server_certificate(("svn.python.org", 443)) - if not pem: - self.fail("No server certificate on svn.python.org:443!") - - try: - pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) - except ssl.SSLError: - #should fail - pass - else: - self.fail("Got server certificate %s for svn.python.org!" % pem) - - pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) - if not pem: - self.fail("No server certificate on svn.python.org:443!") - if test_support.verbose: - sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) - - def test_algorithms(self): - # Issue #8484: all algorithms should be available when verifying a - # certificate. - # SHA256 was added in OpenSSL 0.9.8 - if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): - self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) - self.skipTest("remote host needs SNI, only available on Python 3.2+") - # NOTE: https://sha2.hboeck.de is another possible test host - remote = ("sha256.tbs-internet.com", 443) - sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") - with test_support.transient_internet("sha256.tbs-internet.com"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=sha256_cert,) - try: - s.connect(remote) - if test_support.verbose: - sys.stdout.write("\nCipher with %r is %r\n" % - (remote, s.cipher())) - sys.stdout.write("Certificate is:\n%s\n" % - pprint.pformat(s.getpeercert())) - finally: - s.close() - - -try: - import threading -except ImportError: - _have_threads = False -else: - _have_threads = True - - class ThreadedEchoServer(threading.Thread): - - class ConnectionHandler(threading.Thread): - - """A mildly complicated class, because we want it to work both - with and without the SSL wrapper around the socket connection, so - that we can test the STARTTLS functionality.""" - - def __init__(self, server, connsock): - self.server = server - self.running = False - self.sock = connsock - self.sock.setblocking(1) - self.sslconn = None - threading.Thread.__init__(self) - self.daemon = True - - def show_conn_details(self): - if self.server.certreqs == ssl.CERT_REQUIRED: - cert = self.sslconn.getpeercert() - if test_support.verbose and self.server.chatty: - sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") - cert_binary = self.sslconn.getpeercert(True) - if test_support.verbose and self.server.chatty: - sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") - cipher = self.sslconn.cipher() - if test_support.verbose and self.server.chatty: - sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") - - def wrap_conn(self): - try: - self.sslconn = ssl.wrap_socket(self.sock, server_side=True, - certfile=self.server.certificate, - ssl_version=self.server.protocol, - ca_certs=self.server.cacerts, - cert_reqs=self.server.certreqs, - ciphers=self.server.ciphers) - except ssl.SSLError as e: - # XXX Various errors can have happened here, for example - # a mismatching protocol version, an invalid certificate, - # or a low-level bug. This should be made more discriminating. - self.server.conn_errors.append(e) - if self.server.chatty: - handle_error("\n server: bad connection attempt from " + - str(self.sock.getpeername()) + ":\n") - self.close() - self.running = False - self.server.stop() - return False - else: - return True - - def read(self): - if self.sslconn: - return self.sslconn.read() - else: - return self.sock.recv(1024) - - def write(self, bytes): - if self.sslconn: - return self.sslconn.write(bytes) - else: - return self.sock.send(bytes) - - def close(self): - if self.sslconn: - self.sslconn.close() - else: - self.sock._sock.close() - - def run(self): - self.running = True - if not self.server.starttls_server: - if isinstance(self.sock, ssl.SSLSocket): - self.sslconn = self.sock - elif not self.wrap_conn(): - return - self.show_conn_details() - while self.running: - try: - msg = self.read() - if not msg: - # eof, so quit this handler - self.running = False - self.close() - elif msg.strip() == 'over': - if test_support.verbose and self.server.connectionchatty: - sys.stdout.write(" server: client closed connection\n") - self.close() - return - elif self.server.starttls_server and msg.strip() == 'STARTTLS': - if test_support.verbose and self.server.connectionchatty: - sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") - self.write("OK\n") - if not self.wrap_conn(): - return - elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS': - if test_support.verbose and self.server.connectionchatty: - sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") - self.write("OK\n") - self.sslconn.unwrap() - self.sslconn = None - if test_support.verbose and self.server.connectionchatty: - sys.stdout.write(" server: connection is now unencrypted...\n") - else: - if (test_support.verbose and - self.server.connectionchatty): - ctype = (self.sslconn and "encrypted") or "unencrypted" - sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n" - % (repr(msg), ctype, repr(msg.lower()), ctype)) - self.write(msg.lower()) - except ssl.SSLError: - if self.server.chatty: - handle_error("Test server failure:\n") - self.close() - self.running = False - # normally, we'd just stop here, but for the test - # harness, we want to stop the server - self.server.stop() - - def __init__(self, certificate, ssl_version=None, - certreqs=None, cacerts=None, - chatty=True, connectionchatty=False, starttls_server=False, - wrap_accepting_socket=False, ciphers=None): - - if ssl_version is None: - ssl_version = ssl.PROTOCOL_TLSv1 - if certreqs is None: - certreqs = ssl.CERT_NONE - self.certificate = certificate - self.protocol = ssl_version - self.certreqs = certreqs - self.cacerts = cacerts - self.ciphers = ciphers - self.chatty = chatty - self.connectionchatty = connectionchatty - self.starttls_server = starttls_server - self.sock = socket.socket() - self.flag = None - if wrap_accepting_socket: - self.sock = ssl.wrap_socket(self.sock, server_side=True, - certfile=self.certificate, - cert_reqs = self.certreqs, - ca_certs = self.cacerts, - ssl_version = self.protocol, - ciphers = self.ciphers) - if test_support.verbose and self.chatty: - sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock)) - self.port = test_support.bind_port(self.sock) - self.active = False - self.conn_errors = [] - threading.Thread.__init__(self) - self.daemon = True - - def __enter__(self): - self.start(threading.Event()) - self.flag.wait() - return self - - def __exit__(self, *args): - self.stop() - self.join() - - def start(self, flag=None): - self.flag = flag - threading.Thread.start(self) - - def run(self): - self.sock.settimeout(0.05) - self.sock.listen(5) - self.active = True - if self.flag: - # signal an event - self.flag.set() - while self.active: - try: - newconn, connaddr = self.sock.accept() - if test_support.verbose and self.chatty: - sys.stdout.write(' server: new connection from ' - + str(connaddr) + '\n') - handler = self.ConnectionHandler(self, newconn) - handler.start() - handler.join() - except socket.timeout: - pass - except KeyboardInterrupt: - self.stop() - self.sock.close() - - def stop(self): - self.active = False - - class AsyncoreEchoServer(threading.Thread): - - class EchoServer(asyncore.dispatcher): - - class ConnectionHandler(asyncore.dispatcher_with_send): - - def __init__(self, conn, certfile): - asyncore.dispatcher_with_send.__init__(self, conn) - self.socket = ssl.wrap_socket(conn, server_side=True, - certfile=certfile, - do_handshake_on_connect=False) - self._ssl_accepting = True - - def readable(self): - if isinstance(self.socket, ssl.SSLSocket): - while self.socket.pending() > 0: - self.handle_read_event() - return True - - def _do_ssl_handshake(self): - try: - self.socket.do_handshake() - except ssl.SSLError, err: - if err.args[0] in (ssl.SSL_ERROR_WANT_READ, - ssl.SSL_ERROR_WANT_WRITE): - return - elif err.args[0] == ssl.SSL_ERROR_EOF: - return self.handle_close() - raise - except socket.error, err: - if err.args[0] == errno.ECONNABORTED: - return self.handle_close() - else: - self._ssl_accepting = False - - def handle_read(self): - if self._ssl_accepting: - self._do_ssl_handshake() - else: - data = self.recv(1024) - if data and data.strip() != 'over': - self.send(data.lower()) - - def handle_close(self): - self.close() - if test_support.verbose: - sys.stdout.write(" server: closed connection %s\n" % self.socket) - - def handle_error(self): - raise - - def __init__(self, certfile): - self.certfile = certfile - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = test_support.bind_port(self.socket) - self.listen(5) - - def handle_accept(self): - sock_obj, addr = self.accept() - if test_support.verbose: - sys.stdout.write(" server: new connection from %s:%s\n" %addr) - self.ConnectionHandler(sock_obj, self.certfile) - - def handle_error(self): - raise - - def __init__(self, certfile): - self.flag = None - self.active = False - self.server = self.EchoServer(certfile) - self.port = self.server.port - threading.Thread.__init__(self) - self.daemon = True - - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, self.server) - - def __enter__(self): - self.start(threading.Event()) - self.flag.wait() - return self - - def __exit__(self, *args): - if test_support.verbose: - sys.stdout.write(" cleanup: stopping server.\n") - self.stop() - if test_support.verbose: - sys.stdout.write(" cleanup: joining server thread.\n") - self.join() - if test_support.verbose: - sys.stdout.write(" cleanup: successfully joined.\n") - - def start(self, flag=None): - self.flag = flag - threading.Thread.start(self) - - def run(self): - self.active = True - if self.flag: - self.flag.set() - while self.active: - asyncore.loop(0.05) - - def stop(self): - self.active = False - self.server.close() - - class SocketServerHTTPSServer(threading.Thread): - - class HTTPSServer(HTTPServer): - - def __init__(self, server_address, RequestHandlerClass, certfile): - HTTPServer.__init__(self, server_address, RequestHandlerClass) - # we assume the certfile contains both private key and certificate - self.certfile = certfile - self.allow_reuse_address = True - - def __str__(self): - return ('<%s %s:%s>' % - (self.__class__.__name__, - self.server_name, - self.server_port)) - - def get_request(self): - # override this to wrap socket with SSL - sock, addr = self.socket.accept() - sslconn = ssl.wrap_socket(sock, server_side=True, - certfile=self.certfile) - return sslconn, addr - - class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): - # need to override translate_path to get a known root, - # instead of using os.curdir, since the test could be - # run from anywhere - - server_version = "TestHTTPS/1.0" - - root = None - - def translate_path(self, path): - """Translate a /-separated PATH to the local filename syntax. - - Components that mean special things to the local file system - (e.g. drive or directory names) are ignored. (XXX They should - probably be diagnosed.) - - """ - # abandon query parameters - path = urlparse.urlparse(path)[2] - path = os.path.normpath(urllib.unquote(path)) - words = path.split('/') - words = filter(None, words) - path = self.root - for word in words: - drive, word = os.path.splitdrive(word) - head, word = os.path.split(word) - if word in self.root: continue - path = os.path.join(path, word) - return path - - def log_message(self, format, *args): - - # we override this to suppress logging unless "verbose" - - if test_support.verbose: - sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % - (self.server.server_address, - self.server.server_port, - self.request.cipher(), - self.log_date_time_string(), - format%args)) - - - def __init__(self, certfile): - self.flag = None - self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] - self.server = self.HTTPSServer( - (HOST, 0), self.RootedHTTPRequestHandler, certfile) - self.port = self.server.server_port - threading.Thread.__init__(self) - self.daemon = True - - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, self.server) - - def start(self, flag=None): - self.flag = flag - threading.Thread.start(self) - - def run(self): - if self.flag: - self.flag.set() - self.server.serve_forever(0.05) - - def stop(self): - self.server.shutdown() - - - def bad_cert_test(certfile): - """ - Launch a server with CERT_REQUIRED, and check that trying to - connect to it with the given client certificate fails. - """ - server = ThreadedEchoServer(CERTFILE, - certreqs=ssl.CERT_REQUIRED, - cacerts=CERTFILE, chatty=False) - with server: - try: - s = ssl.wrap_socket(socket.socket(), - certfile=certfile, - ssl_version=ssl.PROTOCOL_TLSv1) - s.connect((HOST, server.port)) - except ssl.SSLError, x: - if test_support.verbose: - sys.stdout.write("\nSSLError is %s\n" % x[1]) - except socket.error, x: - if test_support.verbose: - sys.stdout.write("\nsocket.error is %s\n" % x[1]) - else: - raise AssertionError("Use of invalid cert should have failed!") - - def server_params_test(certfile, protocol, certreqs, cacertsfile, - client_certfile, client_protocol=None, indata="FOO\n", - ciphers=None, chatty=True, connectionchatty=False, - wrap_accepting_socket=False): - """ - Launch a server, connect a client to it and try various reads - and writes. - """ - server = ThreadedEchoServer(certfile, - certreqs=certreqs, - ssl_version=protocol, - cacerts=cacertsfile, - ciphers=ciphers, - chatty=chatty, - connectionchatty=connectionchatty, - wrap_accepting_socket=wrap_accepting_socket) - with server: - # try to connect - if client_protocol is None: - client_protocol = protocol - s = ssl.wrap_socket(socket.socket(), - certfile=client_certfile, - ca_certs=cacertsfile, - ciphers=ciphers, - cert_reqs=certreqs, - ssl_version=client_protocol) - s.connect((HOST, server.port)) - for arg in [indata, bytearray(indata), memoryview(indata)]: - if connectionchatty: - if test_support.verbose: - sys.stdout.write( - " client: sending %s...\n" % (repr(arg))) - s.write(arg) - outdata = s.read() - if connectionchatty: - if test_support.verbose: - sys.stdout.write(" client: read %s\n" % repr(outdata)) - if outdata != indata.lower(): - raise AssertionError( - "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" - % (outdata[:min(len(outdata),20)], len(outdata), - indata[:min(len(indata),20)].lower(), len(indata))) - s.write("over\n") - if connectionchatty: - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - s.close() - - def try_protocol_combo(server_protocol, - client_protocol, - expect_success, - certsreqs=None): - if certsreqs is None: - certsreqs = ssl.CERT_NONE - certtype = { - ssl.CERT_NONE: "CERT_NONE", - ssl.CERT_OPTIONAL: "CERT_OPTIONAL", - ssl.CERT_REQUIRED: "CERT_REQUIRED", - }[certsreqs] - if test_support.verbose: - formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" - sys.stdout.write(formatstr % - (ssl.get_protocol_name(client_protocol), - ssl.get_protocol_name(server_protocol), - certtype)) - try: - # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client - # will send an SSLv3 hello (rather than SSLv2) starting from - # OpenSSL 1.0.0 (see issue #8322). - server_params_test(CERTFILE, server_protocol, certsreqs, - CERTFILE, CERTFILE, client_protocol, - ciphers="ALL", chatty=False) - # Protocol mismatch can result in either an SSLError, or a - # "Connection reset by peer" error. - except ssl.SSLError: - if expect_success: - raise - except socket.error as e: - if expect_success or e.errno != errno.ECONNRESET: - raise - else: - if not expect_success: - raise AssertionError( - "Client protocol %s succeeded with server protocol %s!" - % (ssl.get_protocol_name(client_protocol), - ssl.get_protocol_name(server_protocol))) - - - class ThreadedTests(unittest.TestCase): - - def test_rude_shutdown(self): - """A brutal shutdown of an SSL server should raise an IOError - in the client when attempting handshake. - """ - listener_ready = threading.Event() - listener_gone = threading.Event() - - s = socket.socket() - port = test_support.bind_port(s, HOST) - - # `listener` runs in a thread. It sits in an accept() until - # the main thread connects. Then it rudely closes the socket, - # and sets Event `listener_gone` to let the main thread know - # the socket is gone. - def listener(): - s.listen(5) - listener_ready.set() - s.accept() - s.close() - listener_gone.set() - - def connector(): - listener_ready.wait() - c = socket.socket() - c.connect((HOST, port)) - listener_gone.wait() - try: - ssl_sock = ssl.wrap_socket(c) - except IOError: - pass - else: - self.fail('connecting to closed SSL socket should have failed') - - t = threading.Thread(target=listener) - t.start() - try: - connector() - finally: - t.join() - - @skip_if_broken_ubuntu_ssl - def test_echo(self): - """Basic test of an SSL client connecting to a server""" - if test_support.verbose: - sys.stdout.write("\n") - server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, - CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, - chatty=True, connectionchatty=True) - - def test_getpeercert(self): - if test_support.verbose: - sys.stdout.write("\n") - s2 = socket.socket() - server = ThreadedEchoServer(CERTFILE, - certreqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_SSLv23, - cacerts=CERTFILE, - chatty=False) - with server: - s = ssl.wrap_socket(socket.socket(), - certfile=CERTFILE, - ca_certs=CERTFILE, - cert_reqs=ssl.CERT_REQUIRED, - ssl_version=ssl.PROTOCOL_SSLv23) - s.connect((HOST, server.port)) - cert = s.getpeercert() - self.assertTrue(cert, "Can't get peer certificate.") - cipher = s.cipher() - if test_support.verbose: - sys.stdout.write(pprint.pformat(cert) + '\n') - sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') - if 'subject' not in cert: - self.fail("No subject field in certificate: %s." % - pprint.pformat(cert)) - if ((('organizationName', 'Python Software Foundation'),) - not in cert['subject']): - self.fail( - "Missing or invalid 'organizationName' field in certificate subject; " - "should be 'Python Software Foundation'.") - s.close() - - def test_empty_cert(self): - """Connecting with an empty cert file""" - bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, - "nullcert.pem")) - def test_malformed_cert(self): - """Connecting with a badly formatted certificate (syntax error)""" - bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, - "badcert.pem")) - def test_nonexisting_cert(self): - """Connecting with a non-existing cert file""" - bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, - "wrongcert.pem")) - def test_malformed_key(self): - """Connecting with a badly formatted key (syntax error)""" - bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, - "badkey.pem")) - - @skip_if_broken_ubuntu_ssl - def test_protocol_sslv2(self): - """Connecting to an SSLv2 server with various client options""" - if test_support.verbose: - sys.stdout.write("\n") - if not hasattr(ssl, 'PROTOCOL_SSLv2'): - self.skipTest("PROTOCOL_SSLv2 needed") - try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) - try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) - try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) - try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) - try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) - try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) - - @skip_if_broken_ubuntu_ssl - def test_protocol_sslv23(self): - """Connecting to an SSLv23 server with various client options""" - if test_support.verbose: - sys.stdout.write("\n") - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) - - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) - - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) - try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) - - @skip_if_broken_ubuntu_ssl - def test_protocol_sslv3(self): - """Connecting to an SSLv3 server with various client options""" - if test_support.verbose: - sys.stdout.write("\n") - try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) - try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) - try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) - if hasattr(ssl, 'PROTOCOL_SSLv2'): - try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) - try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) - - @skip_if_broken_ubuntu_ssl - def test_protocol_tlsv1(self): - """Connecting to a TLSv1 server with various client options""" - if test_support.verbose: - sys.stdout.write("\n") - try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) - try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) - try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) - if hasattr(ssl, 'PROTOCOL_SSLv2'): - try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) - try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) - - def test_starttls(self): - """Switching from clear text to encrypted and back again.""" - msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") - - server = ThreadedEchoServer(CERTFILE, - ssl_version=ssl.PROTOCOL_TLSv1, - starttls_server=True, - chatty=True, - connectionchatty=True) - wrapped = False - with server: - s = socket.socket() - s.setblocking(1) - s.connect((HOST, server.port)) - if test_support.verbose: - sys.stdout.write("\n") - for indata in msgs: - if test_support.verbose: - sys.stdout.write( - " client: sending %s...\n" % repr(indata)) - if wrapped: - conn.write(indata) - outdata = conn.read() - else: - s.send(indata) - outdata = s.recv(1024) - if (indata == "STARTTLS" and - outdata.strip().lower().startswith("ok")): - # STARTTLS ok, switch to secure mode - if test_support.verbose: - sys.stdout.write( - " client: read %s from server, starting TLS...\n" - % repr(outdata)) - conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) - wrapped = True - elif (indata == "ENDTLS" and - outdata.strip().lower().startswith("ok")): - # ENDTLS ok, switch back to clear text - if test_support.verbose: - sys.stdout.write( - " client: read %s from server, ending TLS...\n" - % repr(outdata)) - s = conn.unwrap() - wrapped = False - else: - if test_support.verbose: - sys.stdout.write( - " client: read %s from server\n" % repr(outdata)) - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - if wrapped: - conn.write("over\n") - else: - s.send("over\n") - s.close() - - def test_socketserver(self): - """Using a SocketServer to create and manage SSL connections.""" - server = SocketServerHTTPSServer(CERTFILE) - flag = threading.Event() - server.start(flag) - # wait for it to start - flag.wait() - # try to connect - try: - if test_support.verbose: - sys.stdout.write('\n') - with open(CERTFILE, 'rb') as f: - d1 = f.read() - d2 = '' - # now fetch the same data from the HTTPS server - url = 'https://127.0.0.1:%d/%s' % ( - server.port, os.path.split(CERTFILE)[1]) - with test_support.check_py3k_warnings(): - f = urllib.urlopen(url) - dlen = f.info().getheader("content-length") - if dlen and (int(dlen) > 0): - d2 = f.read(int(dlen)) - if test_support.verbose: - sys.stdout.write( - " client: read %d bytes from remote server '%s'\n" - % (len(d2), server)) - f.close() - self.assertEqual(d1, d2) - finally: - server.stop() - server.join() - - def test_wrapped_accept(self): - """Check the accept() method on SSL sockets.""" - if test_support.verbose: - sys.stdout.write("\n") - server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, - CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, - chatty=True, connectionchatty=True, - wrap_accepting_socket=True) - - def test_asyncore_server(self): - """Check the example asyncore integration.""" - indata = "TEST MESSAGE of mixed case\n" - - if test_support.verbose: - sys.stdout.write("\n") - server = AsyncoreEchoServer(CERTFILE) - with server: - s = ssl.wrap_socket(socket.socket()) - s.connect(('127.0.0.1', server.port)) - if test_support.verbose: - sys.stdout.write( - " client: sending %s...\n" % (repr(indata))) - s.write(indata) - outdata = s.read() - if test_support.verbose: - sys.stdout.write(" client: read %s\n" % repr(outdata)) - if outdata != indata.lower(): - self.fail( - "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" - % (outdata[:min(len(outdata),20)], len(outdata), - indata[:min(len(indata),20)].lower(), len(indata))) - s.write("over\n") - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - s.close() - - def test_recv_send(self): - """Test recv(), send() and friends.""" - if test_support.verbose: - sys.stdout.write("\n") - - server = ThreadedEchoServer(CERTFILE, - certreqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_TLSv1, - cacerts=CERTFILE, - chatty=True, - connectionchatty=False) - with server: - s = ssl.wrap_socket(socket.socket(), - server_side=False, - certfile=CERTFILE, - ca_certs=CERTFILE, - cert_reqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_TLSv1) - s.connect((HOST, server.port)) - # helper methods for standardising recv* method signatures - def _recv_into(): - b = bytearray("\0"*100) - count = s.recv_into(b) - return b[:count] - - def _recvfrom_into(): - b = bytearray("\0"*100) - count, addr = s.recvfrom_into(b) - return b[:count] - - # (name, method, whether to expect success, *args) - send_methods = [ - ('send', s.send, True, []), - ('sendto', s.sendto, False, ["some.address"]), - ('sendall', s.sendall, True, []), - ] - recv_methods = [ - ('recv', s.recv, True, []), - ('recvfrom', s.recvfrom, False, ["some.address"]), - ('recv_into', _recv_into, True, []), - ('recvfrom_into', _recvfrom_into, False, []), - ] - data_prefix = u"PREFIX_" - - for meth_name, send_meth, expect_success, args in send_methods: - indata = data_prefix + meth_name - try: - send_meth(indata.encode('ASCII', 'strict'), *args) - outdata = s.read() - outdata = outdata.decode('ASCII', 'strict') - if outdata != indata.lower(): - self.fail( - "While sending with <<%s>> bad data " - "<<%r>> (%d) received; " - "expected <<%r>> (%d)\n" % ( - meth_name, outdata[:20], len(outdata), - indata[:20], len(indata) - ) - ) - except ValueError as e: - if expect_success: - self.fail( - "Failed to send with method <<%s>>; " - "expected to succeed.\n" % (meth_name,) - ) - if not str(e).startswith(meth_name): - self.fail( - "Method <<%s>> failed with unexpected " - "exception message: %s\n" % ( - meth_name, e - ) - ) - - for meth_name, recv_meth, expect_success, args in recv_methods: - indata = data_prefix + meth_name - try: - s.send(indata.encode('ASCII', 'strict')) - outdata = recv_meth(*args) - outdata = outdata.decode('ASCII', 'strict') - if outdata != indata.lower(): - self.fail( - "While receiving with <<%s>> bad data " - "<<%r>> (%d) received; " - "expected <<%r>> (%d)\n" % ( - meth_name, outdata[:20], len(outdata), - indata[:20], len(indata) - ) - ) - except ValueError as e: - if expect_success: - self.fail( - "Failed to receive with method <<%s>>; " - "expected to succeed.\n" % (meth_name,) - ) - if not str(e).startswith(meth_name): - self.fail( - "Method <<%s>> failed with unexpected " - "exception message: %s\n" % ( - meth_name, e - ) - ) - # consume data - s.read() - - s.write("over\n".encode("ASCII", "strict")) - s.close() - - def test_handshake_timeout(self): - # Issue #5103: SSL handshake must respect the socket timeout - server = socket.socket(socket.AF_INET) - host = "127.0.0.1" - port = test_support.bind_port(server) - started = threading.Event() - finish = False - - def serve(): - server.listen(5) - started.set() - conns = [] - while not finish: - r, w, e = select.select([server], [], [], 0.1) - if server in r: - # Let the socket hang around rather than having - # it closed by garbage collection. - conns.append(server.accept()[0]) - - t = threading.Thread(target=serve) - t.start() - started.wait() - - try: - try: - c = socket.socket(socket.AF_INET) - c.settimeout(0.2) - c.connect((host, port)) - # Will attempt handshake and time out - self.assertRaisesRegexp(ssl.SSLError, "timed out", - ssl.wrap_socket, c) - finally: - c.close() - try: - c = socket.socket(socket.AF_INET) - c.settimeout(0.2) - c = ssl.wrap_socket(c) - # Will attempt handshake and time out - self.assertRaisesRegexp(ssl.SSLError, "timed out", - c.connect, (host, port)) - finally: - c.close() - finally: - finish = True - t.join() - server.close() - - def test_default_ciphers(self): - with ThreadedEchoServer(CERTFILE, - ssl_version=ssl.PROTOCOL_SSLv23, - chatty=False) as server: - sock = socket.socket() - try: - # Force a set of weak ciphers on our client socket - try: - s = ssl.wrap_socket(sock, - ssl_version=ssl.PROTOCOL_SSLv23, - ciphers="DES") - except ssl.SSLError: - self.skipTest("no DES cipher available") - with self.assertRaises((OSError, ssl.SSLError)): - s.connect((HOST, server.port)) - finally: - sock.close() - self.assertIn("no shared cipher", str(server.conn_errors[0])) - - -def test_main(verbose=False): - global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert.pem") - SVN_PYTHON_ORG_ROOT_CERT = os.path.join( - os.path.dirname(__file__) or os.curdir, - "https_svn_python_org_root.pem") - NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir, - "nokia.pem") - - if (not os.path.exists(CERTFILE) or - not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or - not os.path.exists(NOKIACERT)): - raise test_support.TestFailed("Can't read certificate files!") - - tests = [BasicTests, BasicSocketTests] - - if test_support.is_resource_enabled('network'): - tests.append(NetworkedTests) - - if _have_threads: - thread_info = test_support.threading_setup() - if thread_info and test_support.is_resource_enabled('network'): - tests.append(ThreadedTests) - - try: - test_support.run_unittest(*tests) - finally: - if _have_threads: - test_support.threading_cleanup(*thread_info) - -if __name__ == "__main__": - test_main() |