summaryrefslogtreecommitdiff
path: root/lib/python2.7/test/test_imaplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/test/test_imaplib.py')
-rw-r--r--lib/python2.7/test/test_imaplib.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/lib/python2.7/test/test_imaplib.py b/lib/python2.7/test/test_imaplib.py
new file mode 100644
index 0000000..2b2213d
--- /dev/null
+++ b/lib/python2.7/test/test_imaplib.py
@@ -0,0 +1,240 @@
+from test import test_support as support
+# If we end up with a significant number of tests that don't require
+# threading, this test module should be split. Right now we skip
+# them all if we don't have threading.
+threading = support.import_module('threading')
+
+from contextlib import contextmanager
+import imaplib
+import os.path
+import SocketServer
+import time
+
+from test_support import reap_threads, verbose, transient_internet
+import unittest
+
+try:
+ import ssl
+except ImportError:
+ ssl = None
+
+CERTFILE = None
+
+
+class TestImaplib(unittest.TestCase):
+
+ def test_that_Time2Internaldate_returns_a_result(self):
+ # We can check only that it successfully produces a result,
+ # not the correctness of the result itself, since the result
+ # depends on the timezone the machine is in.
+ timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
+ '"18-May-2033 05:33:20 +0200"']
+
+ for t in timevalues:
+ imaplib.Time2Internaldate(t)
+
+
+if ssl:
+
+ class SecureTCPServer(SocketServer.TCPServer):
+
+ def get_request(self):
+ newsocket, fromaddr = self.socket.accept()
+ connstream = ssl.wrap_socket(newsocket,
+ server_side=True,
+ certfile=CERTFILE)
+ return connstream, fromaddr
+
+ IMAP4_SSL = imaplib.IMAP4_SSL
+
+else:
+
+ class SecureTCPServer:
+ pass
+
+ IMAP4_SSL = None
+
+
+class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
+
+ timeout = 1
+
+ def _send(self, message):
+ if verbose: print "SENT:", message.strip()
+ self.wfile.write(message)
+
+ def handle(self):
+ # Send a welcome message.
+ self._send('* OK IMAP4rev1\r\n')
+ while 1:
+ # Gather up input until we receive a line terminator or we timeout.
+ # Accumulate read(1) because it's simpler to handle the differences
+ # between naked sockets and SSL sockets.
+ line = ''
+ while 1:
+ try:
+ part = self.rfile.read(1)
+ if part == '':
+ # Naked sockets return empty strings..
+ return
+ line += part
+ except IOError:
+ # ..but SSLSockets raise exceptions.
+ return
+ if line.endswith('\r\n'):
+ break
+
+ if verbose: print 'GOT:', line.strip()
+ splitline = line.split()
+ tag = splitline[0]
+ cmd = splitline[1]
+ args = splitline[2:]
+
+ if hasattr(self, 'cmd_%s' % (cmd,)):
+ getattr(self, 'cmd_%s' % (cmd,))(tag, args)
+ else:
+ self._send('%s BAD %s unknown\r\n' % (tag, cmd))
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send('* CAPABILITY IMAP4rev1\r\n')
+ self._send('%s OK CAPABILITY completed\r\n' % (tag,))
+
+
+class BaseThreadedNetworkedTests(unittest.TestCase):
+
+ def make_server(self, addr, hdlr):
+
+ class MyServer(self.server_class):
+ def handle_error(self, request, client_address):
+ self.close_request(request)
+ self.server_close()
+ raise
+
+ if verbose: print "creating server"
+ server = MyServer(addr, hdlr)
+ self.assertEqual(server.server_address, server.socket.getsockname())
+
+ if verbose:
+ print "server created"
+ print "ADDR =", addr
+ print "CLASS =", self.server_class
+ print "HDLR =", server.RequestHandlerClass
+
+ t = threading.Thread(
+ name='%s serving' % self.server_class,
+ target=server.serve_forever,
+ # Short poll interval to make the test finish quickly.
+ # Time between requests is short enough that we won't wake
+ # up spuriously too many times.
+ kwargs={'poll_interval':0.01})
+ t.daemon = True # In case this function raises.
+ t.start()
+ if verbose: print "server running"
+ return server, t
+
+ def reap_server(self, server, thread):
+ if verbose: print "waiting for server"
+ server.shutdown()
+ thread.join()
+ if verbose: print "done"
+
+ @contextmanager
+ def reaped_server(self, hdlr):
+ server, thread = self.make_server((support.HOST, 0), hdlr)
+ try:
+ yield server
+ finally:
+ self.reap_server(server, thread)
+
+ @reap_threads
+ def test_connect(self):
+ with self.reaped_server(SimpleIMAPHandler) as server:
+ client = self.imap_class(*server.server_address)
+ client.shutdown()
+
+ @reap_threads
+ def test_issue5949(self):
+
+ class EOFHandler(SocketServer.StreamRequestHandler):
+ def handle(self):
+ # EOF without sending a complete welcome message.
+ self.wfile.write('* OK')
+
+ with self.reaped_server(EOFHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+
+class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
+
+ server_class = SocketServer.TCPServer
+ imap_class = imaplib.IMAP4
+
+
+@unittest.skipUnless(ssl, "SSL not available")
+class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
+
+ server_class = SecureTCPServer
+ imap_class = IMAP4_SSL
+
+
+class RemoteIMAPTest(unittest.TestCase):
+ host = 'cyrus.andrew.cmu.edu'
+ port = 143
+ username = 'anonymous'
+ password = 'pass'
+ imap_class = imaplib.IMAP4
+
+ def setUp(self):
+ with transient_internet(self.host):
+ self.server = self.imap_class(self.host, self.port)
+
+ def tearDown(self):
+ if self.server is not None:
+ self.server.logout()
+
+ def test_logincapa(self):
+ self.assertTrue('LOGINDISABLED' in self.server.capabilities)
+
+ def test_anonlogin(self):
+ self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
+ rs = self.server.login(self.username, self.password)
+ self.assertEqual(rs[0], 'OK')
+
+ def test_logout(self):
+ rs = self.server.logout()
+ self.server = None
+ self.assertEqual(rs[0], 'BYE')
+
+
+@unittest.skipUnless(ssl, "SSL not available")
+class RemoteIMAP_SSLTest(RemoteIMAPTest):
+ port = 993
+ imap_class = IMAP4_SSL
+
+ def test_logincapa(self):
+ self.assertFalse('LOGINDISABLED' in self.server.capabilities)
+ self.assertTrue('AUTH=PLAIN' in self.server.capabilities)
+
+
+def test_main():
+ tests = [TestImaplib]
+
+ if support.is_resource_enabled('network'):
+ if ssl:
+ global CERTFILE
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "keycert.pem")
+ if not os.path.exists(CERTFILE):
+ raise support.TestFailed("Can't read certificate files!")
+ tests.extend([
+ ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
+ RemoteIMAPTest, RemoteIMAP_SSLTest,
+ ])
+
+ support.run_unittest(*tests)
+
+
+if __name__ == "__main__":
+ support.use_resources = ['network']
+ test_main()