1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
|
from pydevd_constants import * #@UnusedWildImport
from pydev_imports import xmlrpclib, _queue
Queue = _queue.Queue
import traceback
from pydev_runfiles_coverage import StartCoverageSupportFromParams
#=======================================================================================================================
# ParallelNotification
#=======================================================================================================================
class ParallelNotification(object):
    """A notification call captured as a method name plus its arguments."""

    def __init__(self, method, args, kwargs):
        """Remember which method to call and the arguments to call it with.

        :param method: name of the notification method.
        :param args: positional arguments captured from the caller.
        :param kwargs: keyword arguments captured from the caller.
        """
        self.method, self.args, self.kwargs = method, args, kwargs

    def ToTuple(self):
        """Return the notification as a ``(method, args, kwargs)`` tuple."""
        as_tuple = (self.method, self.args, self.kwargs)
        return as_tuple
#=======================================================================================================================
# KillServer
#=======================================================================================================================
class KillServer(object):
    """Marker object: carries no data, only its type is checked by consumers."""
#=======================================================================================================================
# ServerComm
#=======================================================================================================================
class ServerComm(threading.Thread):
    """Thread that forwards queued notifications to the server in batches.

    Items put on ``notifications_queue`` must be ``ParallelNotification``
    instances or a ``KillServer`` marker. Every notification found in the queue
    at a given moment is sent with a single ``server.notifyCommands`` call; a
    ``KillServer`` marker makes the thread finish after the current batch is sent.
    """

    def __init__(self, job_id, server):
        """Create the (not yet started) communication thread.

        :param job_id: identifier passed to the server with each batch; must not be None.
        :param server: object providing ``lock`` and ``notifyCommands(job_id, commands)``;
            must not be None.
        """
        self.notifications_queue = Queue()
        threading.Thread.__init__(self)
        self.setDaemon(False) #Wait for all the notifications to be passed before exiting!
        assert job_id is not None
        #The original check referenced an undefined name ('port'); the server is what must be given.
        assert server is not None
        self.job_id = job_id
        self.finished = False
        self.server = server

    def run(self):
        """Consume the queue until a ``KillServer`` is found, notifying the server in batches."""
        get_command = self.notifications_queue.get
        while True:
            kill_found = False
            commands = []

            #Block until there's at least one command, then drain whatever else is already queued
            #so that multiple notifications are sent at once.
            command = get_command(block=True)
            while True:
                if isinstance(command, KillServer):
                    kill_found = True
                else:
                    assert isinstance(command, ParallelNotification)
                    commands.append(command.ToTuple())

                try:
                    command = get_command(block=False) #No block to create a batch.
                except _queue.Empty:
                    break #Only an empty queue ends the batch (other errors are not hidden).

            if commands:
                try:
                    #Batch notification.
                    self.server.lock.acquire()
                    try:
                        self.server.notifyCommands(self.job_id, commands)
                    finally:
                        self.server.lock.release()
                except:
                    #Best effort: a failed notification is reported but doesn't stop the thread.
                    traceback.print_exc()

            if kill_found:
                self.finished = True
                return
#=======================================================================================================================
# ServerFacade
#=======================================================================================================================
class ServerFacade(object):
    """Exposes the notify* methods and turns some of them into queued notifications.

    ``notifyStartTest`` and ``notifyTest`` are wrapped in a
    ``ParallelNotification`` and put on the given queue; the other
    notifications are accepted and discarded.
    """

    def __init__(self, notifications_queue):
        """:param notifications_queue: queue-like object providing ``put_nowait``."""
        self.notifications_queue = notifications_queue

    def notifyTestsCollected(self, *args, **kwargs):
        """Accept the call and do nothing: this notification won't be passed."""

    def notifyTestRunFinished(self, *args, **kwargs):
        """Accept the call and do nothing: this notification won't be passed."""

    def notifyStartTest(self, *args, **kwargs):
        """Queue a 'notifyStartTest' notification with the given arguments."""
        notification = ParallelNotification('notifyStartTest', args, kwargs)
        self.notifications_queue.put_nowait(notification)

    def notifyTest(self, *args, **kwargs):
        """Queue a 'notifyTest' notification with the given arguments."""
        notification = ParallelNotification('notifyTest', args, kwargs)
        self.notifications_queue.put_nowait(notification)
#=======================================================================================================================
# run_client
#=======================================================================================================================
def run_client(job_id, port, verbosity, coverage_output_file, coverage_include):
    """Ask the server for tests and run them until it has no more tests to give.

    Connects through xml-rpc to the server at the local host on ``port``, starts a
    ``ServerComm`` thread to send the notifications and then repeatedly requests
    tests with ``GetTestsToRun(job_id)``, running each received batch. Any error
    is printed and, in all cases, a ``KillServer`` is queued at the end.

    :param job_id: job identifier (converted with ``int``).
    :param port: port of the xml-rpc server.
    :param verbosity: verbosity passed to the test configuration.
    :param coverage_output_file: passed to ``StartCoverageSupportFromParams``.
    :param coverage_include: passed to ``StartCoverageSupportFromParams``.
    """
    job_id = int(job_id)

    import pydev_localhost
    server_url = 'http://%s:%s' % (pydev_localhost.get_localhost(), port)
    server = xmlrpclib.Server(server_url)
    server.lock = threading.Lock()

    server_comm = ServerComm(job_id, server)
    server_comm.start()

    try:
        server_facade = ServerFacade(server_comm.notifications_queue)
        import pydev_runfiles
        import pydev_runfiles_xml_rpc
        pydev_runfiles_xml_rpc.SetServer(server_facade)

        #Starts None and when the 1st test is gotten, it's started (because a server may be initiated and terminated
        #before receiving any test -- which would mean a different process got all the tests to run).
        coverage = None

        try:
            while True:
                #Investigate: is it dangerous to use the same xmlrpclib server from different threads?
                #It seems it should be, as it creates a new connection for each request...
                server.lock.acquire()
                try:
                    tests_to_run = server.GetTestsToRun(job_id)
                finally:
                    server.lock.release()

                if not tests_to_run:
                    break #Nothing else to run.

                if coverage is None:
                    _coverage_files, coverage = StartCoverageSupportFromParams(
                        None, coverage_output_file, 1, coverage_include)

                #Each entry is expected as 'filename|test'; entries in any other format are skipped.
                files_to_tests = {}
                for entry in tests_to_run:
                    parts = entry.split('|')
                    if len(parts) != 2:
                        continue
                    filename, test_name = parts
                    files_to_tests.setdefault(filename, []).append(test_name)

                configuration = pydev_runfiles.Configuration(
                    '',
                    verbosity,
                    None,
                    None,
                    None,
                    files_to_tests,
                    1, #Always single job here
                    None,

                    #The coverage is handled in this loop.
                    coverage_output_file=None,
                    coverage_include=None,
                )
                runner = pydev_runfiles.PydevTestRunner(configuration)
                sys.stdout.flush()
                runner.run_tests(handle_coverage=False)
        finally:
            if coverage is not None:
                coverage.stop()
                coverage.save()

    except:
        traceback.print_exc()

    #Always ask the communication thread to finish (after it sends what's still queued).
    server_comm.notifications_queue.put_nowait(KillServer())
#=======================================================================================================================
# main
#=======================================================================================================================
def parse_client_args(argv):
    """Parse the command line of the parallel client.

    Accepts either 3 parameters (job_id, port, verbosity) or 5 parameters
    (job_id, port, verbosity, coverage_output_file, coverage_include) after the
    program name.

    :param argv: full argument list (``argv[0]`` is the program name and is ignored).
    :return: tuple ``(job_id, port, verbosity, coverage_output_file, coverage_include)``
        where the first 3 items are converted to ``int`` and the coverage items are
        ``None`` when not given.
    :raises AssertionError: if the number of parameters is neither 3 nor 5.
    """
    params = argv[1:]
    if len(params) == 3:
        job_id, port, verbosity = params
        coverage_output_file, coverage_include = None, None

    elif len(params) == 5:
        job_id, port, verbosity, coverage_output_file, coverage_include = params

    else:
        #str() is needed: concatenating the list directly raised a TypeError instead of this error.
        raise AssertionError('Could not find out how to handle the parameters: ' + str(params))

    return int(job_id), int(port), int(verbosity), coverage_output_file, coverage_include


if __name__ == '__main__':
    #Kept as module-level names, as they were before the parsing was extracted.
    job_id, port, verbosity, coverage_output_file, coverage_include = parse_client_args(sys.argv)
    run_client(job_id, port, verbosity, coverage_output_file, coverage_include)
|