summaryrefslogtreecommitdiff
path: root/python/helpers/pydev/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/helpers/pydev/tests')
-rw-r--r--python/helpers/pydev/tests/__not_in_default_pythonpath.txt1
-rw-r--r--python/helpers/pydev/tests/check_pydevconsole.py105
-rw-r--r--python/helpers/pydev/tests/test_get_referrers.py139
-rw-r--r--python/helpers/pydev/tests/test_jyserver.py165
-rw-r--r--python/helpers/pydev/tests/test_jysimpleTipper.py255
-rw-r--r--python/helpers/pydev/tests/test_pydev_ipython_010.py80
-rw-r--r--python/helpers/pydev/tests/test_pydev_ipython_011.py193
-rw-r--r--python/helpers/pydev/tests/test_pydevconsole.py231
-rw-r--r--python/helpers/pydev/tests/test_pyserver.py173
-rw-r--r--python/helpers/pydev/tests/test_simpleTipper.py209
10 files changed, 1551 insertions, 0 deletions
diff --git a/python/helpers/pydev/tests/__not_in_default_pythonpath.txt b/python/helpers/pydev/tests/__not_in_default_pythonpath.txt
new file mode 100644
index 000000000000..29cdc5bc1078
--- /dev/null
+++ b/python/helpers/pydev/tests/__not_in_default_pythonpath.txt
@@ -0,0 +1 @@
+(no __init__.py file) \ No newline at end of file
diff --git a/python/helpers/pydev/tests/check_pydevconsole.py b/python/helpers/pydev/tests/check_pydevconsole.py
new file mode 100644
index 000000000000..7d1b7eed4e0b
--- /dev/null
+++ b/python/helpers/pydev/tests/check_pydevconsole.py
@@ -0,0 +1,105 @@
+import sys
+import os
+
+#Put pydevconsole in the path.
+sys.argv[0] = os.path.dirname(sys.argv[0])
+sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+
+print('Running tests with:', sys.executable)
+print('PYTHONPATH:')
+print('\n'.join(sorted(sys.path)))
+
+import threading
+import unittest
+
+import pydevconsole
+from pydev_imports import xmlrpclib, SimpleXMLRPCServer
+
+try:
+ raw_input
+ raw_input_name = 'raw_input'
+except NameError:
+ raw_input_name = 'input'
+
+#=======================================================================================================================
+# Test
+#=======================================================================================================================
+class Test(unittest.TestCase):
+
+
+ def startClientThread(self, client_port):
+ class ClientThread(threading.Thread):
+ def __init__(self, client_port):
+ threading.Thread.__init__(self)
+ self.client_port = client_port
+
+ def run(self):
+ class HandleRequestInput:
+ def RequestInput(self):
+ return 'RequestInput: OK'
+
+ handle_request_input = HandleRequestInput()
+
+ import pydev_localhost
+ print('Starting client with:', pydev_localhost.get_localhost(), self.client_port)
+ client_server = SimpleXMLRPCServer((pydev_localhost.get_localhost(), self.client_port), logRequests=False)
+ client_server.register_function(handle_request_input.RequestInput)
+ client_server.serve_forever()
+
+ client_thread = ClientThread(client_port)
+ client_thread.setDaemon(True)
+ client_thread.start()
+ return client_thread
+
+
+ def getFreeAddresses(self):
+ import socket
+ s = socket.socket()
+ s.bind(('', 0))
+ port0 = s.getsockname()[1]
+
+ s1 = socket.socket()
+ s1.bind(('', 0))
+ port1 = s1.getsockname()[1]
+ s.close()
+ s1.close()
+ return port0, port1
+
+
+ def testServer(self):
+ client_port, server_port = self.getFreeAddresses()
+ class ServerThread(threading.Thread):
+ def __init__(self, client_port, server_port):
+ threading.Thread.__init__(self)
+ self.client_port = client_port
+ self.server_port = server_port
+
+ def run(self):
+ import pydev_localhost
+ print('Starting server with:', pydev_localhost.get_localhost(), self.server_port, self.client_port)
+ pydevconsole.StartServer(pydev_localhost.get_localhost(), self.server_port, self.client_port)
+ server_thread = ServerThread(client_port, server_port)
+ server_thread.setDaemon(True)
+ server_thread.start()
+
+ client_thread = self.startClientThread(client_port) #@UnusedVariable
+
+ import time
+ time.sleep(.3) #let's give it some time to start the threads
+
+ import pydev_localhost
+ server = xmlrpclib.Server('http://%s:%s' % (pydev_localhost.get_localhost(), server_port))
+ server.addExec("import sys; print('Running with: %s %s' % (sys.executable or sys.platform, sys.version))")
+ server.addExec('class Foo:')
+ server.addExec(' pass')
+ server.addExec('')
+ server.addExec('foo = Foo()')
+ server.addExec('a = %s()' % raw_input_name)
+ server.addExec('print (a)')
+
+#=======================================================================================================================
+# main
+#=======================================================================================================================
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/python/helpers/pydev/tests/test_get_referrers.py b/python/helpers/pydev/tests/test_get_referrers.py
new file mode 100644
index 000000000000..7fc85142b02e
--- /dev/null
+++ b/python/helpers/pydev/tests/test_get_referrers.py
@@ -0,0 +1,139 @@
+import os.path
+import sys
+import threading
+import time
+
+IS_JYTHON = sys.platform.find('java') != -1
+
+try:
+ this_file_name = __file__
+except NameError:
+ # stupid jython. plain old __file__ isnt working for some reason
+ import test_runfiles #@UnresolvedImport - importing the module itself
+ this_file_name = test_runfiles.__file__
+
+
+desired_runfiles_path = os.path.normpath(os.path.dirname(this_file_name) + "/..")
+sys.path.insert(0, desired_runfiles_path)
+
+import unittest
+import pydevd_referrers
+from pydev_imports import StringIO
+
+#=======================================================================================================================
+# Test
+#=======================================================================================================================
+class Test(unittest.TestCase):
+
+
+ def testGetReferrers1(self):
+
+ container = []
+ contained = [1, 2]
+ container.append(0)
+ container.append(contained)
+
+ # Ok, we have the contained in this frame and inside the given list (which on turn is in this frame too).
+ # we should skip temporary references inside the get_referrer_info.
+ result = pydevd_referrers.get_referrer_info(contained)
+ assert 'list[1]' in result
+ pydevd_referrers.print_referrers(contained, stream=StringIO())
+
+ def testGetReferrers2(self):
+
+ class MyClass(object):
+ def __init__(self):
+ pass
+
+ contained = [1, 2]
+ obj = MyClass()
+ obj.contained = contained
+ del contained
+
+ # Ok, we have the contained in this frame and inside the given list (which on turn is in this frame too).
+ # we should skip temporary references inside the get_referrer_info.
+ result = pydevd_referrers.get_referrer_info(obj.contained)
+ assert 'found_as="contained"' in result
+ assert 'MyClass' in result
+
+
+ def testGetReferrers3(self):
+
+ class MyClass(object):
+ def __init__(self):
+ pass
+
+ contained = [1, 2]
+ obj = MyClass()
+ obj.contained = contained
+ del contained
+
+ # Ok, we have the contained in this frame and inside the given list (which on turn is in this frame too).
+ # we should skip temporary references inside the get_referrer_info.
+ result = pydevd_referrers.get_referrer_info(obj.contained)
+ assert 'found_as="contained"' in result
+ assert 'MyClass' in result
+
+
+ def testGetReferrers4(self):
+
+ class MyClass(object):
+ def __init__(self):
+ pass
+
+ obj = MyClass()
+ obj.me = obj
+
+ # Let's see if we detect the cycle...
+ result = pydevd_referrers.get_referrer_info(obj)
+ assert 'found_as="me"' in result #Cyclic ref
+
+
+ def testGetReferrers5(self):
+ container = dict(a=[1])
+
+ # Let's see if we detect the cycle...
+ result = pydevd_referrers.get_referrer_info(container['a'])
+ assert 'testGetReferrers5' not in result #I.e.: NOT in the current method
+ assert 'found_as="a"' in result
+ assert 'dict' in result
+ assert str(id(container)) in result
+
+
+ def testGetReferrers6(self):
+ container = dict(a=[1])
+
+ def should_appear(obj):
+ # Let's see if we detect the cycle...
+ return pydevd_referrers.get_referrer_info(obj)
+
+ result = should_appear(container['a'])
+ assert 'should_appear' in result
+
+
+ def testGetReferrers7(self):
+
+ class MyThread(threading.Thread):
+ def run(self):
+ #Note: we do that because if we do
+ self.frame = sys._getframe()
+
+ t = MyThread()
+ t.start()
+ while not hasattr(t, 'frame'):
+ time.sleep(0.01)
+
+ result = pydevd_referrers.get_referrer_info(t.frame)
+ assert 'MyThread' in result
+
+
+if __name__ == "__main__":
+ #this is so that we can run it frem the jython tests -- because we don't actually have an __main__ module
+ #(so, it won't try importing the __main__ module)
+ try:
+ import gc
+ gc.get_referrers(unittest)
+ except:
+ pass
+ else:
+ unittest.TextTestRunner().run(unittest.makeSuite(Test))
diff --git a/python/helpers/pydev/tests/test_jyserver.py b/python/helpers/pydev/tests/test_jyserver.py
new file mode 100644
index 000000000000..8765400ae39d
--- /dev/null
+++ b/python/helpers/pydev/tests/test_jyserver.py
@@ -0,0 +1,165 @@
+'''
+@author Fabio Zadrozny
+'''
+import sys
+import unittest
+import socket
+import urllib
+
+
+IS_JYTHON = sys.platform.find('java') != -1
+
+if IS_JYTHON:
+ import os
+
+ #make it as if we were executing from the directory above this one (so that we can use jycompletionserver
+ #without the need for it being in the pythonpath)
+ sys.argv[0] = os.path.dirname(sys.argv[0])
+ #twice the dirname to get the previous level from this file.
+ sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+
+ import pycompletionserver as jycompletionserver
+
+
+ DEBUG = 0
+
+def dbg(s):
+ if DEBUG:
+ sys.stdout.write('TEST %s\n' % s)
+
+class Test(unittest.TestCase):
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+
+ def tearDown(self):
+ unittest.TestCase.tearDown(self)
+
+ def testIt(self):
+ dbg('ok')
+
+ def testMessage(self):
+ t = jycompletionserver.T(0)
+
+ l = []
+ l.append(('Def', 'description' , 'args'))
+ l.append(('Def1', 'description1', 'args1'))
+ l.append(('Def2', 'description2', 'args2'))
+
+ msg = t.processor.formatCompletionMessage('test_jyserver.py', l)
+
+ self.assertEquals('@@COMPLETIONS(test_jyserver.py,(Def,description,args),(Def1,description1,args1),(Def2,description2,args2))END@@', msg)
+
+ l = []
+ l.append(('Def', 'desc,,r,,i()ption', ''))
+ l.append(('Def(1', 'descriptio(n1', ''))
+ l.append(('De,f)2', 'de,s,c,ription2', ''))
+ msg = t.processor.formatCompletionMessage(None, l)
+ expected = '@@COMPLETIONS(None,(Def,desc%2C%2Cr%2C%2Ci%28%29ption, ),(Def%281,descriptio%28n1, ),(De%2Cf%292,de%2Cs%2Cc%2Cription2, ))END@@'
+
+ self.assertEquals(expected, msg)
+
+
+
+
+
+
+ def testCompletionSocketsAndMessages(self):
+ dbg('testCompletionSocketsAndMessages')
+ t, socket = self.createConnections()
+ self.socket = socket
+ dbg('connections created')
+
+ try:
+ #now that we have the connections all set up, check the code completion messages.
+ msg = urllib.quote_plus('math')
+
+ toWrite = '@@IMPORTS:%sEND@@' % msg
+ dbg('writing' + str(toWrite))
+ socket.send(toWrite) #math completions
+ completions = self.readMsg()
+ dbg(urllib.unquote_plus(completions))
+
+ start = '@@COMPLETIONS('
+ self.assert_(completions.startswith(start), '%s DOESNT START WITH %s' % (completions, start))
+ self.assert_(completions.find('@@COMPLETIONS') != -1)
+ self.assert_(completions.find('END@@') != -1)
+
+
+ msg = urllib.quote_plus('__builtin__.str')
+ toWrite = '@@IMPORTS:%sEND@@' % msg
+ dbg('writing' + str(toWrite))
+ socket.send(toWrite) #math completions
+ completions = self.readMsg()
+ dbg(urllib.unquote_plus(completions))
+
+ start = '@@COMPLETIONS('
+ self.assert_(completions.startswith(start), '%s DOESNT START WITH %s' % (completions, start))
+ self.assert_(completions.find('@@COMPLETIONS') != -1)
+ self.assert_(completions.find('END@@') != -1)
+
+
+
+ finally:
+ try:
+ self.sendKillMsg(socket)
+
+
+ while not t.ended:
+ pass #wait until it receives the message and quits.
+
+
+ socket.close()
+ except:
+ pass
+
+
+
+
+ def createConnections(self, p1=50001):
+ '''
+ Creates the connections needed for testing.
+ '''
+ t = jycompletionserver.T(p1)
+
+ t.start()
+
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.bind((jycompletionserver.HOST, p1))
+ server.listen(1)
+
+ sock, _addr = server.accept()
+
+ return t, sock
+
+
+ def readMsg(self):
+ msg = '@@PROCESSING_END@@'
+ while msg.startswith('@@PROCESSING'):
+ msg = self.socket.recv(1024)
+ if msg.startswith('@@PROCESSING:'):
+ dbg('Status msg:' + str(msg))
+
+ while msg.find('END@@') == -1:
+ msg += self.socket.recv(1024)
+
+ return msg
+
+ def sendKillMsg(self, socket):
+ socket.send(jycompletionserver.MSG_KILL_SERVER)
+
+
+
+
+#"C:\Program Files\Java\jdk1.5.0_04\bin\java.exe" -Dpython.path="C:\bin\jython21\Lib";"C:\bin\jython21";"C:\Program Files\Java\jdk1.5.0_04\jre\lib\rt.jar" -classpath C:/bin/jython21/jython.jar org.python.util.jython D:\eclipse_workspace\org.python.pydev\pysrc\pycompletionserver.py 53795 58659
+#
+#"C:\Program Files\Java\jdk1.5.0_04\bin\java.exe" -Dpython.path="C:\bin\jython21\Lib";"C:\bin\jython21";"C:\Program Files\Java\jdk1.5.0_04\jre\lib\rt.jar" -classpath C:/bin/jython21/jython.jar org.python.util.jython D:\eclipse_workspace\org.python.pydev\pysrc\tests\test_jyserver.py
+#
+#"C:\Program Files\Java\jdk1.5.0_04\bin\java.exe" -Dpython.path="C:\bin\jython21\Lib";"C:\bin\jython21";"C:\Program Files\Java\jdk1.5.0_04\jre\lib\rt.jar" -classpath C:/bin/jython21/jython.jar org.python.util.jython d:\runtime-workbench-workspace\jython_test\src\test.py
+if __name__ == '__main__':
+ if IS_JYTHON:
+ suite = unittest.makeSuite(Test)
+ unittest.TextTestRunner(verbosity=1).run(suite)
+ else:
+ sys.stdout.write('Not running jython tests for non-java platform: %s' % sys.platform)
+
diff --git a/python/helpers/pydev/tests/test_jysimpleTipper.py b/python/helpers/pydev/tests/test_jysimpleTipper.py
new file mode 100644
index 000000000000..4a755634bdc7
--- /dev/null
+++ b/python/helpers/pydev/tests/test_jysimpleTipper.py
@@ -0,0 +1,255 @@
+#line to run:
+#java -classpath D:\bin\jython-2.1\jython.jar;D:\bin\eclipse331_1\plugins\org.junit_3.8.2.v200706111738\junit.jar;D:\bin\eclipse331_1\plugins\org.apache.ant_1.7.0.v200706080842\lib\ant.jar org.python.util.jython w:\org.python.pydev\pysrc\tests\test_jysimpleTipper.py
+
+import unittest
+import os
+import sys
+#make it as if we were executing from the directory above this one (so that we can use pycompletionserver
+#without the need for it being in the pythonpath)
+sys.argv[0] = os.path.dirname(sys.argv[0])
+#twice the dirname to get the previous level from this file.
+sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+
+#this does not work (they must be in the system pythonpath)
+#sys.path.insert(1, r"D:\bin\eclipse321\plugins\org.junit_3.8.1\junit.jar" ) #some late loading jar tests
+#sys.path.insert(1, r"D:\bin\eclipse331_1\plugins\org.apache.ant_1.7.0.v200706080842\lib\ant.jar" ) #some late loading jar tests
+
+if sys.platform.find('java') != -1:
+ from _pydev_jy_imports_tipper import ismethod
+ from _pydev_jy_imports_tipper import isclass
+ from _pydev_jy_imports_tipper import dirObj
+ import _pydev_jy_imports_tipper
+ from java.lang.reflect import Method #@UnresolvedImport
+ from java.lang import System #@UnresolvedImport
+ from java.lang import String #@UnresolvedImport
+ from java.lang.System import arraycopy #@UnresolvedImport
+ from java.lang.System import out #@UnresolvedImport
+ import java.lang.String #@UnresolvedImport
+
+__DBG = 0
+def dbg(s):
+ if __DBG:
+ sys.stdout.write('%s\n' % (s,))
+
+
+
+class TestMod(unittest.TestCase):
+
+ def assertArgs(self, tok, args, tips):
+ for a in tips:
+ if tok == a[0]:
+ self.assertEquals(args, a[2])
+ return
+ raise AssertionError('%s not in %s', tok, tips)
+
+ def assertIn(self, tok, tips):
+ self.assertEquals(4, len(tips[0]))
+ for a in tips:
+ if tok == a[0]:
+ return a
+ s = ''
+ for a in tips:
+ s += str(a)
+ s += '\n'
+ raise AssertionError('%s not in %s' % (tok, s))
+
+ def testImports1a(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('java.util.HashMap')
+ assert f.endswith('rt.jar')
+
+ def testImports1c(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('java.lang.Class')
+ assert f.endswith('rt.jar')
+
+ def testImports1b(self):
+ try:
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('__builtin__.m')
+ self.fail('err')
+ except:
+ pass
+
+ def testImports1(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('junit.framework.TestCase')
+ assert f.endswith('junit.jar')
+ ret = self.assertIn('assertEquals', tip)
+# self.assertEquals('', ret[2])
+
+ def testImports2(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('junit.framework')
+ assert f.endswith('junit.jar')
+ ret = self.assertIn('TestCase', tip)
+ self.assertEquals('', ret[2])
+
+ def testImports2a(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('org.apache.tools.ant')
+ assert f.endswith('ant.jar')
+ ret = self.assertIn('Task', tip)
+ self.assertEquals('', ret[2])
+
+ def testImports3(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('os')
+ assert f.endswith('os.py')
+ ret = self.assertIn('path', tip)
+ self.assertEquals('', ret[2])
+
+ def testTipOnString(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('string')
+ self.assertIn('join', tip)
+ self.assertIn('uppercase', tip)
+
+ def testImports(self):
+ tip = _pydev_jy_imports_tipper.GenerateTip('__builtin__')[1]
+ self.assertIn('tuple' , tip)
+ self.assertIn('RuntimeError' , tip)
+ self.assertIn('RuntimeWarning' , tip)
+
+ def testImports5(self):
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('java.lang')
+ assert f.endswith('rt.jar')
+ tup = self.assertIn('String' , tip)
+ self.assertEquals(str(_pydev_jy_imports_tipper.TYPE_CLASS), tup[3])
+
+ tip = _pydev_jy_imports_tipper.GenerateTip('java')[1]
+ tup = self.assertIn('lang' , tip)
+ self.assertEquals(str(_pydev_jy_imports_tipper.TYPE_IMPORT), tup[3])
+
+ tip = _pydev_jy_imports_tipper.GenerateTip('java.lang.String')[1]
+ tup = self.assertIn('indexOf' , tip)
+ self.assertEquals(str(_pydev_jy_imports_tipper.TYPE_FUNCTION), tup[3])
+
+ tip = _pydev_jy_imports_tipper.GenerateTip('java.lang.String')[1]
+ tup = self.assertIn('charAt' , tip)
+ self.assertEquals(str(_pydev_jy_imports_tipper.TYPE_FUNCTION), tup[3])
+ self.assertEquals('(int)', tup[2])
+
+ tup = self.assertIn('format' , tip)
+ self.assertEquals(str(_pydev_jy_imports_tipper.TYPE_FUNCTION), tup[3])
+ self.assertEquals('(string, objectArray)', tup[2])
+ self.assert_(tup[1].find('[Ljava.lang.Object;') == -1)
+
+ tup = self.assertIn('getBytes' , tip)
+ self.assertEquals(str(_pydev_jy_imports_tipper.TYPE_FUNCTION), tup[3])
+ self.assert_(tup[1].find('[B') == -1)
+ self.assert_(tup[1].find('byte[]') != -1)
+
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('__builtin__.str')
+ assert f.endswith('jython.jar')
+ self.assertIn('find' , tip)
+
+ f, tip = _pydev_jy_imports_tipper.GenerateTip('__builtin__.dict')
+ assert f.endswith('jython.jar')
+ self.assertIn('get' , tip)
+
+
+class TestSearch(unittest.TestCase):
+
+ def testSearchOnJython(self):
+ self.assertEqual('javaos.py', _pydev_jy_imports_tipper.Search('os')[0][0].split(os.sep)[-1])
+ self.assertEqual(0, _pydev_jy_imports_tipper.Search('os')[0][1])
+
+ self.assertEqual('javaos.py', _pydev_jy_imports_tipper.Search('os.makedirs')[0][0].split(os.sep)[-1])
+ self.assertNotEqual(0, _pydev_jy_imports_tipper.Search('os.makedirs')[0][1])
+
+ #print _pydev_jy_imports_tipper.Search('os.makedirs')
+
+class TestCompl(unittest.TestCase):
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+
+ def tearDown(self):
+ unittest.TestCase.tearDown(self)
+
+ def testGettingInfoOnJython(self):
+
+ dbg('\n\n--------------------------- java')
+ assert not ismethod(java)[0]
+ assert not isclass(java)
+ assert _pydev_jy_imports_tipper.ismodule(java)
+
+ dbg('\n\n--------------------------- java.lang')
+ assert not ismethod(java.lang)[0]
+ assert not isclass(java.lang)
+ assert _pydev_jy_imports_tipper.ismodule(java.lang)
+
+ dbg('\n\n--------------------------- Method')
+ assert not ismethod(Method)[0]
+ assert isclass(Method)
+
+ dbg('\n\n--------------------------- System')
+ assert not ismethod(System)[0]
+ assert isclass(System)
+
+ dbg('\n\n--------------------------- String')
+ assert not ismethod(System)[0]
+ assert isclass(String)
+ assert len(dirObj(String)) > 10
+
+ dbg('\n\n--------------------------- arraycopy')
+ isMet = ismethod(arraycopy)
+ assert isMet[0]
+ assert isMet[1][0].basicAsStr() == "function:arraycopy args=['java.lang.Object', 'int', 'java.lang.Object', 'int', 'int'], varargs=None, kwargs=None, docs:None"
+ assert not isclass(arraycopy)
+
+ dbg('\n\n--------------------------- out')
+ isMet = ismethod(out)
+ assert not isMet[0]
+ assert not isclass(out)
+
+ dbg('\n\n--------------------------- out.println')
+ isMet = ismethod(out.println) #@UndefinedVariable
+ assert isMet[0]
+ assert len(isMet[1]) == 10
+ self.assertEquals(isMet[1][0].basicAsStr(), "function:println args=[], varargs=None, kwargs=None, docs:None")
+ assert isMet[1][1].basicAsStr() == "function:println args=['long'], varargs=None, kwargs=None, docs:None"
+ assert not isclass(out.println) #@UndefinedVariable
+
+ dbg('\n\n--------------------------- str')
+ isMet = ismethod(str)
+ #the code below should work, but is failing on jython 22a1
+ #assert isMet[0]
+ #assert isMet[1][0].basicAsStr() == "function:str args=['org.python.core.PyObject'], varargs=None, kwargs=None, docs:None"
+ assert not isclass(str)
+
+
+ def met1():
+ a = 3
+ return a
+
+ dbg('\n\n--------------------------- met1')
+ isMet = ismethod(met1)
+ assert isMet[0]
+ assert isMet[1][0].basicAsStr() == "function:met1 args=[], varargs=None, kwargs=None, docs:None"
+ assert not isclass(met1)
+
+ def met2(arg1, arg2, *vararg, **kwarg):
+ '''docmet2'''
+
+ a = 1
+ return a
+
+ dbg('\n\n--------------------------- met2')
+ isMet = ismethod(met2)
+ assert isMet[0]
+ assert isMet[1][0].basicAsStr() == "function:met2 args=['arg1', 'arg2'], varargs=vararg, kwargs=kwarg, docs:docmet2"
+ assert not isclass(met2)
+
+
+
+if __name__ == '__main__':
+ if sys.platform.find('java') != -1:
+ #Only run if jython
+ suite = unittest.makeSuite(TestCompl)
+ suite2 = unittest.makeSuite(TestMod)
+ suite3 = unittest.makeSuite(TestSearch)
+
+ unittest.TextTestRunner(verbosity=1).run(suite)
+ unittest.TextTestRunner(verbosity=1).run(suite2)
+ unittest.TextTestRunner(verbosity=1).run(suite3)
+
+# suite.addTest(Test('testCase12'))
+# suite = unittest.TestSuite()
+# unittest.TextTestRunner(verbosity=1).run(suite)
+
+ else:
+ sys.stdout.write('Not running jython tests for non-java platform: %s' % sys.platform)
diff --git a/python/helpers/pydev/tests/test_pydev_ipython_010.py b/python/helpers/pydev/tests/test_pydev_ipython_010.py
new file mode 100644
index 000000000000..5ce1dc32c34a
--- /dev/null
+++ b/python/helpers/pydev/tests/test_pydev_ipython_010.py
@@ -0,0 +1,80 @@
+#TODO: This test no longer works (check if it should be fixed or removed altogether).
+
+#import unittest
+#import sys
+#import os
+##make it as if we were executing from the directory above this one
+#sys.argv[0] = os.path.dirname(sys.argv[0])
+##twice the dirname to get the previous level from this file.
+#sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+#
+#from pydev_localhost import get_localhost
+#
+#
+#IS_JYTHON = sys.platform.find('java') != -1
+#
+##=======================================================================================================================
+## TestCase
+##=======================================================================================================================
+#class TestCase(unittest.TestCase):
+#
+# def setUp(self):
+# unittest.TestCase.setUp(self)
+#
+# def tearDown(self):
+# unittest.TestCase.tearDown(self)
+#
+# def testIPython(self):
+# try:
+# from pydev_ipython_console import PyDevFrontEnd
+# except:
+# if IS_JYTHON:
+# return
+# front_end = PyDevFrontEnd(get_localhost(), 0)
+#
+# front_end.input_buffer = 'if True:'
+# self.assert_(not front_end._on_enter())
+#
+# front_end.input_buffer = 'if True:\n' + \
+# front_end.continuation_prompt() + ' a = 10\n'
+# self.assert_(not front_end._on_enter())
+#
+#
+# front_end.input_buffer = 'if True:\n' + \
+# front_end.continuation_prompt() + ' a = 10\n\n'
+# self.assert_(front_end._on_enter())
+#
+#
+## front_end.input_buffer = ' print a'
+## self.assert_(not front_end._on_enter())
+## front_end.input_buffer = ''
+## self.assert_(front_end._on_enter())
+#
+#
+## front_end.input_buffer = 'a.'
+## front_end.complete_current_input()
+## front_end.input_buffer = 'if True:'
+## front_end._on_enter()
+# front_end.input_buffer = 'a = 30'
+# front_end._on_enter()
+# front_end.input_buffer = 'print a'
+# front_end._on_enter()
+# front_end.input_buffer = 'a?'
+# front_end._on_enter()
+# print front_end.complete('%')
+# print front_end.complete('%e')
+# print front_end.complete('cd c:/t')
+# print front_end.complete('cd c:/temp/')
+## front_end.input_buffer = 'print raw_input("press enter\\n")'
+## front_end._on_enter()
+##
+#
+##=======================================================================================================================
+## main
+##=======================================================================================================================
+#if __name__ == '__main__':
+# if sys.platform.find('java') == -1:
+# #IPython not available for Jython
+# unittest.main()
+# else:
+# print('not supported on Jython')
diff --git a/python/helpers/pydev/tests/test_pydev_ipython_011.py b/python/helpers/pydev/tests/test_pydev_ipython_011.py
new file mode 100644
index 000000000000..3cfa70fd0112
--- /dev/null
+++ b/python/helpers/pydev/tests/test_pydev_ipython_011.py
@@ -0,0 +1,193 @@
+import sys
+import unittest
+import threading
+import os
+from nose.tools import eq_
+from pydev_imports import StringIO, SimpleXMLRPCServer
+from pydev_localhost import get_localhost
+from pydev_console_utils import StdIn
+import socket
+
+# make it as if we were executing from the directory above this one
+sys.argv[0] = os.path.dirname(sys.argv[0])
+# twice the dirname to get the previous level from this file.
+sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+
+# PyDevFrontEnd depends on singleton in IPython, so you
+# can't make multiple versions. So we reuse front_end for
+# all the tests
+
+orig_stdout = sys.stdout
+orig_stderr = sys.stderr
+
+stdout = sys.stdout = StringIO()
+stderr = sys.stderr = StringIO()
+
+from pydev_ipython_console_011 import PyDevFrontEnd
+s = socket.socket()
+s.bind(('', 0))
+client_port = s.getsockname()[1]
+s.close()
+front_end = PyDevFrontEnd(get_localhost(), client_port)
+
+
+def addExec(code, expected_more=False):
+ more = front_end.addExec(code)
+ eq_(expected_more, more)
+
+class TestBase(unittest.TestCase):
+ def setUp(self):
+ front_end.input_splitter.reset()
+ stdout.truncate(0)
+ stdout.seek(0)
+ stderr.truncate(0)
+ stderr.seek(0)
+ def tearDown(self):
+ pass
+
+
+class TestPyDevFrontEnd(TestBase):
+ def testAddExec_1(self):
+ addExec('if True:', True)
+ def testAddExec_2(self):
+ addExec('if True:\n testAddExec_a = 10\n', True)
+ def testAddExec_3(self):
+ assert 'testAddExec_a' not in front_end.getNamespace()
+ addExec('if True:\n testAddExec_a = 10\n\n')
+ assert 'testAddExec_a' in front_end.getNamespace()
+ eq_(front_end.getNamespace()['testAddExec_a'], 10)
+
+ def testGetNamespace(self):
+ assert 'testGetNamespace_a' not in front_end.getNamespace()
+ addExec('testGetNamespace_a = 10')
+ assert 'testGetNamespace_a' in front_end.getNamespace()
+ eq_(front_end.getNamespace()['testGetNamespace_a'], 10)
+
+ def testComplete(self):
+ unused_text, matches = front_end.complete('%')
+ assert len(matches) > 1, 'at least one magic should appear in completions'
+
+ def testCompleteDoesNotDoPythonMatches(self):
+ # Test that IPython's completions do not do the things that
+ # PyDev's completions will handle
+ addExec('testComplete_a = 5')
+ addExec('testComplete_b = 10')
+ addExec('testComplete_c = 15')
+ unused_text, matches = front_end.complete('testComplete_')
+ assert len(matches) == 0
+
+ def testGetCompletions_1(self):
+ # Test the merged completions include the standard completions
+ addExec('testComplete_a = 5')
+ addExec('testComplete_b = 10')
+ addExec('testComplete_c = 15')
+ res = front_end.getCompletions('testComplete_', 'testComplete_')
+ matches = [f[0] for f in res]
+ assert len(matches) == 3
+ eq_(set(['testComplete_a', 'testComplete_b', 'testComplete_c']), set(matches))
+
+ def testGetCompletions_2(self):
+ # Test that we get IPython completions in results
+ # we do this by checking kw completion which PyDev does
+ # not do by default
+ addExec('def ccc(ABC=123): pass')
+ res = front_end.getCompletions('ccc(', '')
+ matches = [f[0] for f in res]
+ assert 'ABC=' in matches
+
+ def testGetCompletions_3(self):
+ # Test that magics return IPYTHON magic as type
+ res = front_end.getCompletions('%cd', '%cd')
+ assert len(res) == 1
+ eq_(res[0][3], '12') # '12' == IToken.TYPE_IPYTHON_MAGIC
+ assert len(res[0][1]) > 100, 'docstring for %cd should be a reasonably long string'
+
+class TestRunningCode(TestBase):
+ def testPrint(self):
+ addExec('print("output")')
+ eq_(stdout.getvalue(), 'output\n')
+
+ def testQuestionMark_1(self):
+ addExec('?')
+ assert len(stdout.getvalue()) > 1000, 'IPython help should be pretty big'
+
+ def testQuestionMark_2(self):
+ addExec('int?')
+ assert stdout.getvalue().find('Convert') != -1
+
+
+ def testGui(self):
+ from pydev_ipython.inputhook import get_inputhook, set_stdin_file
+ set_stdin_file(sys.stdin)
+ assert get_inputhook() is None
+ addExec('%gui tk')
+ # we can't test the GUI works here because we aren't connected to XML-RPC so
+ # nowhere for hook to run
+ assert get_inputhook() is not None
+ addExec('%gui none')
+ assert get_inputhook() is None
+
+ def testHistory(self):
+ ''' Make sure commands are added to IPython's history '''
+ addExec('a=1')
+ addExec('b=2')
+ _ih = front_end.getNamespace()['_ih']
+ eq_(_ih[-1], 'b=2')
+ eq_(_ih[-2], 'a=1')
+
+ addExec('history')
+ hist = stdout.getvalue().split('\n')
+ eq_(hist[-1], '')
+ eq_(hist[-2], 'history')
+ eq_(hist[-3], 'b=2')
+ eq_(hist[-4], 'a=1')
+
+ def testEdit(self):
+ ''' Make sure we can issue an edit command '''
+ called_RequestInput = [False]
+ called_IPythonEditor = [False]
+ def startClientThread(client_port):
+ class ClientThread(threading.Thread):
+ def __init__(self, client_port):
+ threading.Thread.__init__(self)
+ self.client_port = client_port
+ def run(self):
+ class HandleRequestInput:
+ def RequestInput(self):
+ called_RequestInput[0] = True
+ return '\n'
+ def IPythonEditor(self, name, line):
+ called_IPythonEditor[0] = (name, line)
+ return True
+
+ handle_request_input = HandleRequestInput()
+
+ import pydev_localhost
+ client_server = SimpleXMLRPCServer((pydev_localhost.get_localhost(), self.client_port), logRequests=False)
+ client_server.register_function(handle_request_input.RequestInput)
+ client_server.register_function(handle_request_input.IPythonEditor)
+ client_server.serve_forever()
+
+ client_thread = ClientThread(client_port)
+ client_thread.setDaemon(True)
+ client_thread.start()
+ return client_thread
+
+ startClientThread(client_port)
+ orig_stdin = sys.stdin
+ sys.stdin = StdIn(self, get_localhost(), client_port)
+ try:
+ filename = 'made_up_file.py'
+ addExec('%edit ' + filename)
+ eq_(called_IPythonEditor[0], (os.path.abspath(filename), 0))
+ assert called_RequestInput[0], "Make sure the 'wait' parameter has been respected"
+ finally:
+ sys.stdin = orig_stdin
+
+if __name__ == '__main__':
+
+ #Just doing: unittest.main() was not working for me when run directly (not sure why)
+ #And doing it the way below the test with the import: from pydev_ipython.inputhook import get_inputhook, set_stdin_file
+ #is failing (but if I do a Ctrl+F9 in PyDev to run it, it works properly, so, I'm a bit puzzled here).
+ unittest.TextTestRunner(verbosity=1).run(unittest.makeSuite(TestRunningCode))
+ unittest.TextTestRunner(verbosity=1).run(unittest.makeSuite(TestPyDevFrontEnd))
diff --git a/python/helpers/pydev/tests/test_pydevconsole.py b/python/helpers/pydev/tests/test_pydevconsole.py
new file mode 100644
index 000000000000..9a9e3edaf124
--- /dev/null
+++ b/python/helpers/pydev/tests/test_pydevconsole.py
@@ -0,0 +1,231 @@
+import threading
+import unittest
+import sys
+import os
+
+sys.argv[0] = os.path.dirname(sys.argv[0])
+sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+import pydevconsole
+from pydev_imports import xmlrpclib, SimpleXMLRPCServer, StringIO
+
+try:
+ raw_input
+ raw_input_name = 'raw_input'
+except NameError:
+ raw_input_name = 'input'
+
+#=======================================================================================================================
+# Test
+#=======================================================================================================================
+class Test(unittest.TestCase):
+
+ def setUp(self):
+ self.original_stdout = sys.stdout
+ sys.stdout = StringIO()
+
+
+ def tearDown(self):
+ ret = sys.stdout #@UnusedVariable
+ sys.stdout = self.original_stdout
+ #print_ ret.getvalue() -- use to see test output
+
+ def testConsoleHello(self):
+ client_port, _server_port = self.getFreeAddresses()
+ client_thread = self.startClientThread(client_port) #@UnusedVariable
+ import time
+ time.sleep(.3) #let's give it some time to start the threads
+
+ import pydev_localhost
+ interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, server=None)
+
+ (result,) = interpreter.hello("Hello pydevconsole")
+ self.assertEqual(result, "Hello eclipse")
+
+
+ def testConsoleRequests(self):
+ client_port, _server_port = self.getFreeAddresses()
+ client_thread = self.startClientThread(client_port) #@UnusedVariable
+ import time
+ time.sleep(.3) #let's give it some time to start the threads
+
+ import pydev_localhost
+ interpreter = pydevconsole.InterpreterInterface(pydev_localhost.get_localhost(), client_port, server=None)
+ interpreter.addExec('class Foo:')
+ interpreter.addExec(' CONSTANT=1')
+ interpreter.addExec('')
+ interpreter.addExec('foo=Foo()')
+ interpreter.addExec('foo.__doc__=None')
+ interpreter.addExec('val = %s()' % (raw_input_name,))
+ interpreter.addExec('50')
+ interpreter.addExec('print (val)')
+ found = sys.stdout.getvalue().split()
+ try:
+ self.assertEqual(['50', 'input_request'], found)
+ except:
+ self.assertEqual(['input_request'], found) #IPython
+
+ comps = interpreter.getCompletions('foo.', 'foo.')
+ self.assert_(
+ ('CONSTANT', '', '', '3') in comps or ('CONSTANT', '', '', '4') in comps, \
+ 'Found: %s' % comps
+ )
+
+ comps = interpreter.getCompletions('"".', '"".')
+ self.assert_(
+ ('__add__', 'x.__add__(y) <==> x+y', '', '3') in comps or
+ ('__add__', '', '', '4') in comps or
+ ('__add__', 'x.__add__(y) <==> x+y\r\nx.__add__(y) <==> x+y', '()', '2') in comps or
+ ('__add__', 'x.\n__add__(y) <==> x+yx.\n__add__(y) <==> x+y', '()', '2'),
+ 'Did not find __add__ in : %s' % (comps,)
+ )
+
+
+ completions = interpreter.getCompletions('', '')
+ for c in completions:
+ if c[0] == 'AssertionError':
+ break
+ else:
+ self.fail('Could not find AssertionError')
+
+ completions = interpreter.getCompletions('Assert', 'Assert')
+ for c in completions:
+ if c[0] == 'RuntimeError':
+ self.fail('Did not expect to find RuntimeError there')
+
+ self.assert_(('__doc__', None, '', '3') not in interpreter.getCompletions('foo.CO', 'foo.'))
+
+ comps = interpreter.getCompletions('va', 'va')
+ self.assert_(('val', '', '', '3') in comps or ('val', '', '', '4') in comps)
+
+ interpreter.addExec('s = "mystring"')
+
+ desc = interpreter.getDescription('val')
+ self.assert_(desc.find('str(object) -> string') >= 0 or
+ desc == "'input_request'" or
+ desc.find('str(string[, encoding[, errors]]) -> str') >= 0 or
+ desc.find('str(Char* value)') >= 0 or
+ desc.find('str(value: Char*)') >= 0,
+ 'Could not find what was needed in %s' % desc)
+
+ desc = interpreter.getDescription('val.join')
+ self.assert_(desc.find('S.join(sequence) -> string') >= 0 or
+ desc.find('S.join(sequence) -> str') >= 0 or
+ desc.find('S.join(iterable) -> string') >= 0 or
+ desc == "<builtin method 'join'>" or
+ desc == "<built-in method join of str object>" or
+ desc.find('str join(str self, list sequence)') >= 0 or
+ desc.find('S.join(iterable) -> str') >= 0 or
+ desc.find('join(self: str, sequence: list) -> str') >= 0,
+ "Could not recognize: %s" % (desc,))
+
+
+ def startClientThread(self, client_port):
+ class ClientThread(threading.Thread):
+ def __init__(self, client_port):
+ threading.Thread.__init__(self)
+ self.client_port = client_port
+ def run(self):
+ class HandleRequestInput:
+ def RequestInput(self):
+ return 'input_request'
+
+ handle_request_input = HandleRequestInput()
+
+ import pydev_localhost
+ client_server = SimpleXMLRPCServer((pydev_localhost.get_localhost(), self.client_port), logRequests=False)
+ client_server.register_function(handle_request_input.RequestInput)
+ client_server.serve_forever()
+
+ client_thread = ClientThread(client_port)
+ client_thread.setDaemon(True)
+ client_thread.start()
+ return client_thread
+
+
+ def startDebuggerServerThread(self, debugger_port, socket_code):
+ class DebuggerServerThread(threading.Thread):
+ def __init__(self, debugger_port, socket_code):
+ threading.Thread.__init__(self)
+ self.debugger_port = debugger_port
+ self.socket_code = socket_code
+ def run(self):
+ import socket
+ s = socket.socket()
+ s.bind(('', debugger_port))
+ s.listen(1)
+ socket, unused_addr = s.accept()
+ socket_code(socket)
+
+ debugger_thread = DebuggerServerThread(debugger_port, socket_code)
+ debugger_thread.setDaemon(True)
+ debugger_thread.start()
+ return debugger_thread
+
+
+ def getFreeAddresses(self):
+ import socket
+ s = socket.socket()
+ s.bind(('', 0))
+ port0 = s.getsockname()[1]
+
+ s1 = socket.socket()
+ s1.bind(('', 0))
+ port1 = s1.getsockname()[1]
+ s.close()
+ s1.close()
+
+ if port0 <= 0 or port1 <= 0:
+ #This happens in Jython...
+ from java.net import ServerSocket
+ s0 = ServerSocket(0)
+ port0 = s0.getLocalPort()
+
+ s1 = ServerSocket(0)
+ port1 = s1.getLocalPort()
+
+ s0.close()
+ s1.close()
+
+ assert port0 != port1
+ assert port0 > 0
+ assert port1 > 0
+
+ return port0, port1
+
+
+ def testServer(self):
+ client_port, server_port = self.getFreeAddresses()
+ class ServerThread(threading.Thread):
+ def __init__(self, client_port, server_port):
+ threading.Thread.__init__(self)
+ self.client_port = client_port
+ self.server_port = server_port
+
+ def run(self):
+ import pydev_localhost
+ pydevconsole.StartServer(pydev_localhost.get_localhost(), self.server_port, self.client_port)
+ server_thread = ServerThread(client_port, server_port)
+ server_thread.setDaemon(True)
+ server_thread.start()
+
+ client_thread = self.startClientThread(client_port) #@UnusedVariable
+
+ import time
+ time.sleep(.3) #let's give it some time to start the threads
+
+ import pydev_localhost
+ server = xmlrpclib.Server('http://%s:%s' % (pydev_localhost.get_localhost(), server_port))
+ server.addExec('class Foo:')
+ server.addExec(' pass')
+ server.addExec('')
+ server.addExec('foo = Foo()')
+ server.addExec('a = %s()' % (raw_input_name,))
+ server.addExec('print (a)')
+ self.assertEqual(['input_request'], sys.stdout.getvalue().split())
+
+#=======================================================================================================================
+# main
+#=======================================================================================================================
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/python/helpers/pydev/tests/test_pyserver.py b/python/helpers/pydev/tests/test_pyserver.py
new file mode 100644
index 000000000000..a74876b8c8b9
--- /dev/null
+++ b/python/helpers/pydev/tests/test_pyserver.py
@@ -0,0 +1,173 @@
+'''
+@author Fabio Zadrozny
+'''
+import sys
+import os
+
+#make it as if we were executing from the directory above this one (so that we can use pycompletionserver
+#without the need for it being in the pythonpath)
+sys.argv[0] = os.path.dirname(sys.argv[0])
+#twice the dirname to get the previous level from this file.
+sys.path.insert(1, os.path.join(os.path.dirname(sys.argv[0])))
+
+IS_PYTHON_3K = 0
+if sys.platform.find('java') == -1:
+
+
+ try:
+ import inspect
+ import pycompletionserver
+ import socket
+ try:
+ from urllib import quote_plus, unquote_plus
+ def send(s, msg):
+ s.send(msg)
+ except ImportError:
+ IS_PYTHON_3K = 1
+ from urllib.parse import quote_plus, unquote_plus #Python 3.0
+ def send(s, msg):
+ s.send(bytearray(msg, 'utf-8'))
+ except ImportError:
+ pass #Not available in jython
+
+ import unittest
+
+ class Test(unittest.TestCase):
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+
+ def tearDown(self):
+ unittest.TestCase.tearDown(self)
+
+ def testMessage(self):
+ t = pycompletionserver.T(0)
+
+ l = []
+ l.append(('Def', 'description' , 'args'))
+ l.append(('Def1', 'description1', 'args1'))
+ l.append(('Def2', 'description2', 'args2'))
+
+ msg = t.processor.formatCompletionMessage(None, l)
+ self.assertEquals('@@COMPLETIONS(None,(Def,description,args),(Def1,description1,args1),(Def2,description2,args2))END@@', msg)
+
+ l = []
+ l.append(('Def', 'desc,,r,,i()ption', ''))
+ l.append(('Def(1', 'descriptio(n1', ''))
+ l.append(('De,f)2', 'de,s,c,ription2', ''))
+ msg = t.processor.formatCompletionMessage(None, l)
+ self.assertEquals('@@COMPLETIONS(None,(Def,desc%2C%2Cr%2C%2Ci%28%29ption, ),(Def%281,descriptio%28n1, ),(De%2Cf%292,de%2Cs%2Cc%2Cription2, ))END@@', msg)
+
+ def createConnections(self, p1=50002):
+ '''
+ Creates the connections needed for testing.
+ '''
+ t = pycompletionserver.T(p1)
+
+ t.start()
+
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.bind((pycompletionserver.HOST, p1))
+ server.listen(1) #socket to receive messages.
+
+ s, addr = server.accept()
+
+ return t, s
+
+
+ def readMsg(self):
+ finish = False
+ msg = ''
+ while finish == False:
+ m = self.socket.recv(1024 * 4)
+ if IS_PYTHON_3K:
+ m = m.decode('utf-8')
+ if m.startswith('@@PROCESSING'):
+ sys.stdout.write('Status msg: %s\n' % (msg,))
+ else:
+ msg += m
+
+ if msg.find('END@@') != -1:
+ finish = True
+
+ return msg
+
+ def testCompletionSocketsAndMessages(self):
+ t, socket = self.createConnections()
+ self.socket = socket
+
+ try:
+ #now that we have the connections all set up, check the code completion messages.
+ msg = quote_plus('math')
+ send(socket, '@@IMPORTS:%sEND@@' % msg) #math completions
+ completions = self.readMsg()
+ #print_ unquote_plus(completions)
+
+ #math is a builtin and because of that, it starts with None as a file
+ start = '@@COMPLETIONS(None,(__doc__,'
+ start_2 = '@@COMPLETIONS(None,(__name__,'
+ self.assert_(completions.startswith(start) or completions.startswith(start_2), '%s DOESNT START WITH %s' % (completions, (start, start_2)))
+
+ self.assert_('@@COMPLETIONS' in completions)
+ self.assert_('END@@' in completions)
+
+
+ #now, test i
+ msg = quote_plus('__builtin__.list')
+ send(socket, "@@IMPORTS:%s\nEND@@" % msg)
+ found = self.readMsg()
+ self.assert_('sort' in found, 'Could not find sort in: %s' % (found,))
+
+ #now, test search
+ msg = quote_plus('inspect.ismodule')
+ send(socket, '@@SEARCH%sEND@@' % msg) #math completions
+ found = self.readMsg()
+ self.assert_('inspect.py' in found)
+ self.assert_('33' in found or '34' in found or '51' in found or '50' in found, 'Could not find 33, 34, 50 or 51 in %s' % found)
+
+ #now, test search
+ msg = quote_plus('inspect.CO_NEWLOCALS')
+ send(socket, '@@SEARCH%sEND@@' % msg) #math completions
+ found = self.readMsg()
+ self.assert_('inspect.py' in found)
+ self.assert_('CO_NEWLOCALS' in found)
+
+ #now, test search
+ msg = quote_plus('inspect.BlockFinder.tokeneater')
+ send(socket, '@@SEARCH%sEND@@' % msg)
+ found = self.readMsg()
+ self.assert_('inspect.py' in found)
+ # self.assert_('CO_NEWLOCALS' in found)
+
+ #reload modules test
+ # send(socket, '@@RELOAD_MODULES_END@@')
+ # ok = self.readMsg()
+ # self.assertEquals('@@MSG_OK_END@@' , ok)
+ # this test is not executed because it breaks our current enviroment.
+
+
+ finally:
+ try:
+ sys.stdout.write('succedded...sending kill msg\n')
+ self.sendKillMsg(socket)
+
+
+ # while not hasattr(t, 'ended'):
+ # pass #wait until it receives the message and quits.
+
+
+ socket.close()
+ self.socket.close()
+ except:
+ pass
+
+ def sendKillMsg(self, socket):
+ socket.send(pycompletionserver.MSG_KILL_SERVER)
+
+
+if __name__ == '__main__':
+ if sys.platform.find('java') == -1:
+ unittest.main()
+ else:
+ sys.stdout.write('Not running python tests in platform: %s\n' % (sys.platform,))
+
diff --git a/python/helpers/pydev/tests/test_simpleTipper.py b/python/helpers/pydev/tests/test_simpleTipper.py
new file mode 100644
index 000000000000..f759ad60f678
--- /dev/null
+++ b/python/helpers/pydev/tests/test_simpleTipper.py
@@ -0,0 +1,209 @@
+'''
+@author Fabio Zadrozny
+'''
+import os
+import sys
+#make it as if we were executing from the directory above this one (so that we can use pycompletionserver
+#without the need for it being in the pythonpath)
+#twice the dirname to get the previous level from this file.
+sys.path.insert(1, os.path.split(os.path.split(__file__)[0])[0])
+
+try:
+ import __builtin__ #@UnusedImport
+ BUILTIN_MOD = '__builtin__'
+except ImportError:
+ BUILTIN_MOD = 'builtins'
+
+
+if sys.platform.find('java') == -1:
+
+ HAS_WX = False
+
+ import unittest
+ import _pydev_imports_tipper
+ import inspect
+
+ class Test(unittest.TestCase):
+
+ def p(self, t):
+ for a in t:
+ sys.stdout.write('%s\n' % (a,))
+
+ def testImports3(self):
+ tip = _pydev_imports_tipper.GenerateTip('os')
+ ret = self.assertIn('path', tip)
+ self.assertEquals('', ret[2])
+
+ def testImports2(self):
+ try:
+ tip = _pydev_imports_tipper.GenerateTip('OpenGL.GLUT')
+ self.assertIn('glutDisplayFunc', tip)
+ self.assertIn('glutInitDisplayMode', tip)
+ except ImportError:
+ pass
+
+ def testImports4(self):
+ try:
+ tip = _pydev_imports_tipper.GenerateTip('mx.DateTime.mxDateTime.mxDateTime')
+ self.assertIn('now', tip)
+ except ImportError:
+ pass
+
+ def testImports5(self):
+ tip = _pydev_imports_tipper.GenerateTip('__builtin__.list')
+ s = self.assertIn('sort', tip)
+ self.CheckArgs(
+ s,
+ '(cmp=None, key=None, reverse=False)',
+ '(self, object cmp, object key, bool reverse)',
+ '(self, cmp: object, key: object, reverse: bool)'
+ )
+
+ def testImports2a(self):
+ tips = _pydev_imports_tipper.GenerateTip('%s.RuntimeError' % BUILTIN_MOD)
+ self.assertIn('__doc__', tips)
+
+ def testImports2b(self):
+ tips = _pydev_imports_tipper.GenerateTip('%s' % BUILTIN_MOD)
+ t = self.assertIn('file' , tips)
+ self.assert_('->' in t[1].strip() or 'file' in t[1])
+
+ def testImports2c(self):
+ tips = _pydev_imports_tipper.GenerateTip('%s.file' % BUILTIN_MOD)
+ t = self.assertIn('readlines' , tips)
+ self.assert_('->' in t[1] or 'sizehint' in t[1])
+
+ def testImports(self):
+ '''
+ You can print_ the results to check...
+ '''
+ if HAS_WX:
+ tip = _pydev_imports_tipper.GenerateTip('wxPython.wx')
+ self.assertIn('wxApp' , tip)
+
+ tip = _pydev_imports_tipper.GenerateTip('wxPython.wx.wxApp')
+
+ try:
+ tip = _pydev_imports_tipper.GenerateTip('qt')
+ self.assertIn('QWidget' , tip)
+ self.assertIn('QDialog' , tip)
+
+ tip = _pydev_imports_tipper.GenerateTip('qt.QWidget')
+ self.assertIn('rect' , tip)
+ self.assertIn('rect' , tip)
+ self.assertIn('AltButton' , tip)
+
+ tip = _pydev_imports_tipper.GenerateTip('qt.QWidget.AltButton')
+ self.assertIn('__xor__' , tip)
+
+ tip = _pydev_imports_tipper.GenerateTip('qt.QWidget.AltButton.__xor__')
+ self.assertIn('__class__' , tip)
+ except ImportError:
+ pass
+
+ tip = _pydev_imports_tipper.GenerateTip(BUILTIN_MOD)
+ # for t in tip[1]:
+ # print_ t
+ self.assertIn('object' , tip)
+ self.assertIn('tuple' , tip)
+ self.assertIn('list' , tip)
+ self.assertIn('RuntimeError' , tip)
+ self.assertIn('RuntimeWarning' , tip)
+
+ t = self.assertIn('cmp' , tip)
+
+ self.CheckArgs(t, '(x, y)', '(object x, object y)', '(x: object, y: object)') #args
+
+ t = self.assertIn('isinstance' , tip)
+ self.CheckArgs(t, '(object, class_or_type_or_tuple)', '(object o, type typeinfo)', '(o: object, typeinfo: type)') #args
+
+ t = self.assertIn('compile' , tip)
+ self.CheckArgs(t, '(source, filename, mode)', '()', '(o: object, name: str, val: object)') #args
+
+ t = self.assertIn('setattr' , tip)
+ self.CheckArgs(t, '(object, name, value)', '(object o, str name, object val)', '(o: object, name: str, val: object)') #args
+
+ try:
+ import compiler
+ compiler_module = 'compiler'
+ except ImportError:
+ try:
+ import ast
+ compiler_module = 'ast'
+ except ImportError:
+ compiler_module = None
+
+ if compiler_module is not None: #Not available in iron python
+ tip = _pydev_imports_tipper.GenerateTip(compiler_module)
+ if compiler_module == 'compiler':
+ self.assertArgs('parse', '(buf, mode)', tip)
+ self.assertArgs('walk', '(tree, visitor, walker, verbose)', tip)
+ self.assertIn('parseFile' , tip)
+ else:
+ self.assertArgs('parse', '(source, filename, mode)', tip)
+ self.assertArgs('walk', '(node)', tip)
+ self.assertIn('parse' , tip)
+
+
+ def CheckArgs(self, t, *expected):
+ for x in expected:
+ if x == t[2]:
+ return
+ self.fail('Found: %s. Expected: %s' % (t[2], expected))
+
+
+ def assertArgs(self, tok, args, tips):
+ for a in tips[1]:
+ if tok == a[0]:
+ self.assertEquals(args, a[2])
+ return
+ raise AssertionError('%s not in %s', tok, tips)
+
+ def assertIn(self, tok, tips):
+ for a in tips[1]:
+ if tok == a[0]:
+ return a
+ raise AssertionError('%s not in %s' % (tok, tips))
+
+
+ def testSearch(self):
+ s = _pydev_imports_tipper.Search('inspect.ismodule')
+ (f, line, col), foundAs = s
+ self.assert_(line > 0)
+
+
+ def testDotNetLibraries(self):
+ if sys.platform == 'cli':
+ tip = _pydev_imports_tipper.GenerateTip('System.Drawing')
+ self.assertIn('Brushes' , tip)
+
+ tip = _pydev_imports_tipper.GenerateTip('System.Drawing.Brushes')
+ self.assertIn('Aqua' , tip)
+
+
+ def testInspect(self):
+
+ class C(object):
+ def metA(self, a, b):
+ pass
+
+ obj = C.metA
+ if inspect.ismethod (obj):
+ pass
+ # print_ obj.im_func
+ # print_ inspect.getargspec(obj.im_func)
+
+
+ def suite():
+ s = unittest.TestSuite()
+ s.addTest(Test("testImports5"))
+ unittest.TextTestRunner(verbosity=2).run(s)
+
+
+if __name__ == '__main__':
+ if sys.platform.find('java') == -1:
+# suite()
+ unittest.main()
+ else:
+ sys.stdout.write('Not running python tests in platform: %s\n' % (sys.platform,))
+