diff options
Diffstat (limited to 'python/helpers/pydev/tests/test_pydevconsole.py')
-rw-r--r-- | python/helpers/pydev/tests/test_pydevconsole.py | 283 |
1 files changed, 156 insertions, 127 deletions
diff --git a/python/helpers/pydev/tests/test_pydevconsole.py b/python/helpers/pydev/tests/test_pydevconsole.py index 9a9e3edaf124..18421980ae82 100644 --- a/python/helpers/pydev/tests/test_pydevconsole.py +++ b/python/helpers/pydev/tests/test_pydevconsole.py @@ -1,10 +1,6 @@ import threading import unittest import sys -import os - -sys.argv[0] = os.path.dirname(sys.argv[0]) -sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0]))) import pydevconsole from pydev_imports import xmlrpclib, SimpleXMLRPCServer, StringIO @@ -19,104 +15,112 @@ except NameError: #======================================================================================================================= class Test(unittest.TestCase): - def setUp(self): + def testConsoleHello(self): self.original_stdout = sys.stdout sys.stdout = StringIO() - - - def tearDown(self): - ret = sys.stdout #@UnusedVariable - sys.stdout = self.original_stdout - #print_ ret.getvalue() -- use to see test output - - def testConsoleHello(self): - client_port, _server_port = self.getFreeAddresses() - client_thread = self.startClientThread(client_port) #@UnusedVariable - import time - time.sleep(.3) #let's give it some time to start the threads - - import pydev_localhost - interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, server=None) - - (result,) = interpreter.hello("Hello pydevconsole") - self.assertEqual(result, "Hello eclipse") + + try: + client_port, _server_port = self.getFreeAddresses() + client_thread = self.startClientThread(client_port) #@UnusedVariable + import time + time.sleep(.3) #let's give it some time to start the threads + + import pydev_localhost + interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, threading.currentThread()) + + (result,) = interpreter.hello("Hello pydevconsole") + self.assertEqual(result, "Hello eclipse") + finally: + sys.stdout = self.original_stdout def testConsoleRequests(self): - client_port, _server_port = self.getFreeAddresses() - client_thread = self.startClientThread(client_port) #@UnusedVariable - import time - time.sleep(.3) #let's give it some time to start the threads - - import pydev_localhost - interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, server=None) - interpreter.addExec('class Foo:') - interpreter.addExec(' CONSTANT=1') - interpreter.addExec('') - interpreter.addExec('foo=Foo()') - interpreter.addExec('foo.__doc__=None') - interpreter.addExec('val = %s()' % (raw_input_name,)) - interpreter.addExec('50') - interpreter.addExec('print (val)') - found = sys.stdout.getvalue().split() + self.original_stdout = sys.stdout + sys.stdout = StringIO() + try: - self.assertEqual(['50', 'input_request'], found) - except: - self.assertEqual(['input_request'], found) #IPython - - comps = interpreter.getCompletions('foo.', 'foo.') - self.assert_( - ('CONSTANT', '', '', '3') in comps or ('CONSTANT', '', '', '4') in comps, \ - 'Found: %s' % comps - ) - - comps = interpreter.getCompletions('"".', '"".') - self.assert_( - ('__add__', 'x.__add__(y) <==> x+y', '', '3') in comps or - ('__add__', '', '', '4') in comps or - ('__add__', 'x.__add__(y) <==> x+y\r\nx.__add__(y) <==> x+y', '()', '2') in comps or - ('__add__', 'x.\n__add__(y) <==> x+yx.\n__add__(y) <==> x+y', '()', '2'), - 'Did not find __add__ in : %s' % (comps,) - ) - - - completions = interpreter.getCompletions('', '') - for c in completions: - if c[0] == 'AssertionError': - break - else: - self.fail('Could not find AssertionError') - - completions = interpreter.getCompletions('Assert', 'Assert') - for c in completions: - if c[0] == 'RuntimeError': - self.fail('Did not expect to find RuntimeError there') - - self.assert_(('__doc__', None, '', '3') not in interpreter.getCompletions('foo.CO', 'foo.')) - - comps = interpreter.getCompletions('va', 'va') - self.assert_(('val', '', '', '3') in comps or ('val', '', '', '4') in comps) - - interpreter.addExec('s = "mystring"') - - desc = interpreter.getDescription('val') - self.assert_(desc.find('str(object) -> string') >= 0 or - desc == "'input_request'" or - desc.find('str(string[, encoding[, errors]]) -> str') >= 0 or - desc.find('str(Char* value)') >= 0 or - desc.find('str(value: Char*)') >= 0, - 'Could not find what was needed in %s' % desc) - - desc = interpreter.getDescription('val.join') - self.assert_(desc.find('S.join(sequence) -> string') >= 0 or - desc.find('S.join(sequence) -> str') >= 0 or - desc.find('S.join(iterable) -> string') >= 0 or - desc == "<builtin method 'join'>" or - desc == "<built-in method join of str object>" or - desc.find('str join(str self, list sequence)') >= 0 or - desc.find('S.join(iterable) -> str') >= 0 or - desc.find('join(self: str, sequence: list) -> str') >= 0, - "Could not recognize: %s" % (desc,)) + client_port, _server_port = self.getFreeAddresses() + client_thread = self.startClientThread(client_port) #@UnusedVariable + import time + time.sleep(.3) #let's give it some time to start the threads + + import pydev_localhost + from pydev_console_utils import CodeFragment + + interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, threading.currentThread()) + sys.stdout = StringIO() + interpreter.addExec(CodeFragment('class Foo:')) + interpreter.addExec(CodeFragment(' CONSTANT=1')) + interpreter.addExec(CodeFragment('')) + interpreter.addExec(CodeFragment('foo=Foo()')) + interpreter.addExec(CodeFragment('foo.__doc__=None')) + interpreter.addExec(CodeFragment('val = %s()' % (raw_input_name,))) + interpreter.addExec(CodeFragment('50')) + interpreter.addExec(CodeFragment('print (val)')) + found = sys.stdout.getvalue().split() + try: + self.assertEqual(['50', 'input_request'], found) + except: + self.assertEqual(['input_request'], found) #IPython + + comps = interpreter.getCompletions('foo.', 'foo.') + self.assert_( + ('CONSTANT', '', '', '3') in comps or ('CONSTANT', '', '', '4') in comps, \ + 'Found: %s' % comps + ) + + comps = interpreter.getCompletions('"".', '"".') + self.assert_( + ('__add__', 'x.__add__(y) <==> x+y', '', '3') in comps or + ('__add__', '', '', '4') in comps or + ('__add__', 'x.__add__(y) <==> x+y\r\nx.__add__(y) <==> x+y', '()', '2') in comps or + ('__add__', 'x.\n__add__(y) <==> x+yx.\n__add__(y) <==> x+y', '()', '2'), + 'Did not find __add__ in : %s' % (comps,) + ) + + + completions = interpreter.getCompletions('', '') + for c in completions: + if c[0] == 'AssertionError': + break + else: + self.fail('Could not find AssertionError') + + completions = interpreter.getCompletions('Assert', 'Assert') + for c in completions: + if c[0] == 'RuntimeError': + self.fail('Did not expect to find RuntimeError there') + + self.assert_(('__doc__', None, '', '3') not in interpreter.getCompletions('foo.CO', 'foo.')) + + comps = interpreter.getCompletions('va', 'va') + self.assert_(('val', '', '', '3') in comps or ('val', '', '', '4') in comps) + + interpreter.addExec(CodeFragment('s = "mystring"')) + + desc = interpreter.getDescription('val') + self.assert_(desc.find('str(object) -> string') >= 0 or + desc == "'input_request'" or + desc.find('str(string[, encoding[, errors]]) -> str') >= 0 or + desc.find('str(Char* value)') >= 0 or + desc.find('str(object=\'\') -> string') >= 0 or + desc.find('str(value: Char*)') >= 0 or + desc.find('str(object=\'\') -> str') >= 0 + , + 'Could not find what was needed in %s' % desc) + + desc = interpreter.getDescription('val.join') + self.assert_(desc.find('S.join(sequence) -> string') >= 0 or + desc.find('S.join(sequence) -> str') >= 0 or + desc.find('S.join(iterable) -> string') >= 0 or + desc == "<builtin method 'join'>" or + desc == "<built-in method join of str object>" or + desc.find('str join(str self, list sequence)') >= 0 or + desc.find('S.join(iterable) -> str') >= 0 or + desc.find('join(self: str, sequence: list) -> str') >= 0, + "Could not recognize: %s" % (desc,)) + finally: + sys.stdout = self.original_stdout def startClientThread(self, client_port): @@ -124,19 +128,28 @@ class Test(unittest.TestCase): def __init__(self, client_port): threading.Thread.__init__(self) self.client_port = client_port + def run(self): class HandleRequestInput: def RequestInput(self): + client_thread.requested_input = True return 'input_request' - + + def NotifyFinished(self, *args, **kwargs): + client_thread.notified_finished += 1 + return 1 + handle_request_input = HandleRequestInput() - + import pydev_localhost client_server = SimpleXMLRPCServer((pydev_localhost.get_localhost(), self.client_port), logRequests=False) client_server.register_function(handle_request_input.RequestInput) + client_server.register_function(handle_request_input.NotifyFinished) client_server.serve_forever() - + client_thread = ClientThread(client_port) + client_thread.requested_input = False + client_thread.notified_finished = 0 client_thread.setDaemon(True) client_thread.start() return client_thread @@ -194,34 +207,50 @@ class Test(unittest.TestCase): def testServer(self): - client_port, server_port = self.getFreeAddresses() - class ServerThread(threading.Thread): - def __init__(self, client_port, server_port): - threading.Thread.__init__(self) - self.client_port = client_port - self.server_port = server_port - - def run(self): - import pydev_localhost - pydevconsole.StartServer(pydev_localhost.get_localhost(), self.server_port, self.client_port) - server_thread = ServerThread(client_port, server_port) - server_thread.setDaemon(True) - server_thread.start() - - client_thread = self.startClientThread(client_port) #@UnusedVariable - - import time - time.sleep(.3) #let's give it some time to start the threads - - import pydev_localhost - server = xmlrpclib.Server('http://%s:%s' % (pydev_localhost.get_localhost(), server_port)) - server.addExec('class Foo:') - server.addExec(' pass') - server.addExec('') - server.addExec('foo = Foo()') - server.addExec('a = %s()' % (raw_input_name,)) - server.addExec('print (a)') - self.assertEqual(['input_request'], sys.stdout.getvalue().split()) + self.original_stdout = sys.stdout + sys.stdout = StringIO() + try: + client_port, server_port = self.getFreeAddresses() + class ServerThread(threading.Thread): + def __init__(self, client_port, server_port): + threading.Thread.__init__(self) + self.client_port = client_port + self.server_port = server_port + + def run(self): + import pydev_localhost + pydevconsole.StartServer(pydev_localhost.get_localhost(), self.server_port, self.client_port) + server_thread = ServerThread(client_port, server_port) + server_thread.setDaemon(True) + server_thread.start() + + client_thread = self.startClientThread(client_port) #@UnusedVariable + + import time + time.sleep(.3) #let's give it some time to start the threads + sys.stdout = StringIO() + + import pydev_localhost + server = xmlrpclib.Server('http://%s:%s' % (pydev_localhost.get_localhost(), server_port)) + server.execLine('class Foo:') + server.execLine(' pass') + server.execLine('') + server.execLine('foo = Foo()') + server.execLine('a = %s()' % (raw_input_name,)) + server.execLine('print (a)') + initial = time.time() + while not client_thread.requested_input: + if time.time() - initial > 2: + raise AssertionError('Did not get the return asked before the timeout.') + time.sleep(.1) + + while ['input_request'] != sys.stdout.getvalue().split(): + if time.time() - initial > 2: + break + time.sleep(.1) + self.assertEqual(['input_request'], sys.stdout.getvalue().split()) + finally: + sys.stdout = self.original_stdout #======================================================================================================================= # main |