import threading import unittest import sys import pydevconsole from pydev_imports import xmlrpclib, SimpleXMLRPCServer, StringIO try: raw_input raw_input_name = 'raw_input' except NameError: raw_input_name = 'input' #======================================================================================================================= # Test #======================================================================================================================= class Test(unittest.TestCase): def testConsoleHello(self): self.original_stdout = sys.stdout sys.stdout = StringIO() try: client_port, _server_port = self.getFreeAddresses() client_thread = self.startClientThread(client_port) #@UnusedVariable import time time.sleep(.3) #let's give it some time to start the threads import pydev_localhost interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, threading.currentThread()) (result,) = interpreter.hello("Hello pydevconsole") self.assertEqual(result, "Hello eclipse") finally: sys.stdout = self.original_stdout def testConsoleRequests(self): self.original_stdout = sys.stdout sys.stdout = StringIO() try: client_port, _server_port = self.getFreeAddresses() client_thread = self.startClientThread(client_port) #@UnusedVariable import time time.sleep(.3) #let's give it some time to start the threads import pydev_localhost from pydev_console_utils import CodeFragment interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, threading.currentThread()) sys.stdout = StringIO() interpreter.addExec(CodeFragment('class Foo:')) interpreter.addExec(CodeFragment(' CONSTANT=1')) interpreter.addExec(CodeFragment('')) interpreter.addExec(CodeFragment('foo=Foo()')) interpreter.addExec(CodeFragment('foo.__doc__=None')) interpreter.addExec(CodeFragment('val = %s()' % (raw_input_name,))) interpreter.addExec(CodeFragment('50')) interpreter.addExec(CodeFragment('print (val)')) found = sys.stdout.getvalue().split() try: self.assertEqual(['50', 'input_request'], found) except: self.assertEqual(['input_request'], found) #IPython comps = interpreter.getCompletions('foo.', 'foo.') self.assert_( ('CONSTANT', '', '', '3') in comps or ('CONSTANT', '', '', '4') in comps, \ 'Found: %s' % comps ) comps = interpreter.getCompletions('"".', '"".') self.assert_( ('__add__', 'x.__add__(y) <==> x+y', '', '3') in comps or ('__add__', '', '', '4') in comps or ('__add__', 'x.__add__(y) <==> x+y\r\nx.__add__(y) <==> x+y', '()', '2') in comps or ('__add__', 'x.\n__add__(y) <==> x+yx.\n__add__(y) <==> x+y', '()', '2'), 'Did not find __add__ in : %s' % (comps,) ) completions = interpreter.getCompletions('', '') for c in completions: if c[0] == 'AssertionError': break else: self.fail('Could not find AssertionError') completions = interpreter.getCompletions('Assert', 'Assert') for c in completions: if c[0] == 'RuntimeError': self.fail('Did not expect to find RuntimeError there') self.assert_(('__doc__', None, '', '3') not in interpreter.getCompletions('foo.CO', 'foo.')) comps = interpreter.getCompletions('va', 'va') self.assert_(('val', '', '', '3') in comps or ('val', '', '', '4') in comps) interpreter.addExec(CodeFragment('s = "mystring"')) desc = interpreter.getDescription('val') self.assert_(desc.find('str(object) -> string') >= 0 or desc == "'input_request'" or desc.find('str(string[, encoding[, errors]]) -> str') >= 0 or desc.find('str(Char* value)') >= 0 or desc.find('str(object=\'\') -> string') >= 0 or desc.find('str(value: Char*)') >= 0 or desc.find('str(object=\'\') -> str') >= 0 , 'Could not find what was needed in %s' % desc) desc = interpreter.getDescription('val.join') self.assert_(desc.find('S.join(sequence) -> string') >= 0 or desc.find('S.join(sequence) -> str') >= 0 or desc.find('S.join(iterable) -> string') >= 0 or desc == "" or desc == "" or desc.find('str join(str self, list sequence)') >= 0 or desc.find('S.join(iterable) -> str') >= 0 or desc.find('join(self: str, sequence: list) -> str') >= 0, "Could not recognize: %s" % (desc,)) finally: sys.stdout = self.original_stdout def startClientThread(self, client_port): class ClientThread(threading.Thread): def __init__(self, client_port): threading.Thread.__init__(self) self.client_port = client_port def run(self): class HandleRequestInput: def RequestInput(self): client_thread.requested_input = True return 'input_request' def NotifyFinished(self, *args, **kwargs): client_thread.notified_finished += 1 return 1 handle_request_input = HandleRequestInput() import pydev_localhost client_server = SimpleXMLRPCServer((pydev_localhost.get_localhost(), self.client_port), logRequests=False) client_server.register_function(handle_request_input.RequestInput) client_server.register_function(handle_request_input.NotifyFinished) client_server.serve_forever() client_thread = ClientThread(client_port) client_thread.requested_input = False client_thread.notified_finished = 0 client_thread.setDaemon(True) client_thread.start() return client_thread def startDebuggerServerThread(self, debugger_port, socket_code): class DebuggerServerThread(threading.Thread): def __init__(self, debugger_port, socket_code): threading.Thread.__init__(self) self.debugger_port = debugger_port self.socket_code = socket_code def run(self): import socket s = socket.socket() s.bind(('', debugger_port)) s.listen(1) socket, unused_addr = s.accept() socket_code(socket) debugger_thread = DebuggerServerThread(debugger_port, socket_code) debugger_thread.setDaemon(True) debugger_thread.start() return debugger_thread def getFreeAddresses(self): import socket s = socket.socket() s.bind(('', 0)) port0 = s.getsockname()[1] s1 = socket.socket() s1.bind(('', 0)) port1 = s1.getsockname()[1] s.close() s1.close() if port0 <= 0 or port1 <= 0: #This happens in Jython... from java.net import ServerSocket s0 = ServerSocket(0) port0 = s0.getLocalPort() s1 = ServerSocket(0) port1 = s1.getLocalPort() s0.close() s1.close() assert port0 != port1 assert port0 > 0 assert port1 > 0 return port0, port1 def testServer(self): self.original_stdout = sys.stdout sys.stdout = StringIO() try: client_port, server_port = self.getFreeAddresses() class ServerThread(threading.Thread): def __init__(self, client_port, server_port): threading.Thread.__init__(self) self.client_port = client_port self.server_port = server_port def run(self): import pydev_localhost pydevconsole.StartServer(pydev_localhost.get_localhost(), self.server_port, self.client_port) server_thread = ServerThread(client_port, server_port) server_thread.setDaemon(True) server_thread.start() client_thread = self.startClientThread(client_port) #@UnusedVariable import time time.sleep(.3) #let's give it some time to start the threads sys.stdout = StringIO() import pydev_localhost server = xmlrpclib.Server('http://%s:%s' % (pydev_localhost.get_localhost(), server_port)) server.execLine('class Foo:') server.execLine(' pass') server.execLine('') server.execLine('foo = Foo()') server.execLine('a = %s()' % (raw_input_name,)) server.execLine('print (a)') initial = time.time() while not client_thread.requested_input: if time.time() - initial > 2: raise AssertionError('Did not get the return asked before the timeout.') time.sleep(.1) while ['input_request'] != sys.stdout.getvalue().split(): if time.time() - initial > 2: break time.sleep(.1) self.assertEqual(['input_request'], sys.stdout.getvalue().split()) finally: sys.stdout = self.original_stdout #======================================================================================================================= # main #======================================================================================================================= if __name__ == '__main__': unittest.main()