summaryrefslogtreecommitdiff
path: root/python/helpers/pydev/pydev_runfiles_parallel_client.py
blob: 7e5187ea8461dd462180cec432bad99ae9d7ee6b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from pydevd_constants import * #@UnusedWildImport
from pydev_imports import xmlrpclib, _queue
Queue = _queue.Queue
import traceback
from pydev_runfiles_coverage import StartCoverageSupportFromParams



#=======================================================================================================================
# ParallelNotification
#=======================================================================================================================
class ParallelNotification(object):

    def __init__(self, method, args, kwargs):
        self.method = method
        self.args = args
        self.kwargs = kwargs

    def ToTuple(self):
        return self.method, self.args, self.kwargs


#=======================================================================================================================
# KillServer
#=======================================================================================================================
class KillServer(object):
    pass



#=======================================================================================================================
# ServerComm
#=======================================================================================================================
class ServerComm(threading.Thread):



    def __init__(self, job_id, server):
        self.notifications_queue = Queue()
        threading.Thread.__init__(self)
        self.setDaemon(False) #Wait for all the notifications to be passed before exiting!
        assert job_id is not None
        assert port is not None
        self.job_id = job_id

        self.finished = False
        self.server = server


    def run(self):
        while True:
            kill_found = False
            commands = []
            command = self.notifications_queue.get(block=True)
            if isinstance(command, KillServer):
                kill_found = True
            else:
                assert isinstance(command, ParallelNotification)
                commands.append(command.ToTuple())

            try:
                while True:
                    command = self.notifications_queue.get(block=False) #No block to create a batch.
                    if isinstance(command, KillServer):
                        kill_found = True
                    else:
                        assert isinstance(command, ParallelNotification)
                        commands.append(command.ToTuple())
            except:
                pass #That's OK, we're getting it until it becomes empty so that we notify multiple at once.


            if commands:
                try:
                    #Batch notification.
                    self.server.lock.acquire()
                    try:
                        self.server.notifyCommands(self.job_id, commands)
                    finally:
                        self.server.lock.release()
                except:
                    traceback.print_exc()

            if kill_found:
                self.finished = True
                return



#=======================================================================================================================
# ServerFacade
#=======================================================================================================================
class ServerFacade(object):


    def __init__(self, notifications_queue):
        self.notifications_queue = notifications_queue


    def notifyTestsCollected(self, *args, **kwargs):
        pass #This notification won't be passed


    def notifyTestRunFinished(self, *args, **kwargs):
        pass #This notification won't be passed


    def notifyStartTest(self, *args, **kwargs):
        self.notifications_queue.put_nowait(ParallelNotification('notifyStartTest', args, kwargs))


    def notifyTest(self, *args, **kwargs):
        self.notifications_queue.put_nowait(ParallelNotification('notifyTest', args, kwargs))



#=======================================================================================================================
# run_client
#=======================================================================================================================
def run_client(job_id, port, verbosity, coverage_output_file, coverage_include):
    job_id = int(job_id)

    import pydev_localhost
    server = xmlrpclib.Server('http://%s:%s' % (pydev_localhost.get_localhost(), port))
    server.lock = threading.Lock()


    server_comm = ServerComm(job_id, server)
    server_comm.start()

    try:
        server_facade = ServerFacade(server_comm.notifications_queue)
        import pydev_runfiles
        import pydev_runfiles_xml_rpc
        pydev_runfiles_xml_rpc.SetServer(server_facade)

        #Starts None and when the 1st test is gotten, it's started (because a server may be initiated and terminated
        #before receiving any test -- which would mean a different process got all the tests to run).
        coverage = None

        try:
            tests_to_run = [1]
            while tests_to_run:
                #Investigate: is it dangerous to use the same xmlrpclib server from different threads?
                #It seems it should be, as it creates a new connection for each request...
                server.lock.acquire()
                try:
                    tests_to_run = server.GetTestsToRun(job_id)
                finally:
                    server.lock.release()

                if not tests_to_run:
                    break

                if coverage is None:
                    _coverage_files, coverage = StartCoverageSupportFromParams(
                        None, coverage_output_file, 1, coverage_include)


                files_to_tests = {}
                for test in tests_to_run:
                    filename_and_test = test.split('|')
                    if len(filename_and_test) == 2:
                        files_to_tests.setdefault(filename_and_test[0], []).append(filename_and_test[1])

                configuration = pydev_runfiles.Configuration(
                    '',
                    verbosity,
                    None,
                    None,
                    None,
                    files_to_tests,
                    1, #Always single job here
                    None,

                    #The coverage is handled in this loop.
                    coverage_output_file=None,
                    coverage_include=None,
                )
                test_runner = pydev_runfiles.PydevTestRunner(configuration)
                sys.stdout.flush()
                test_runner.run_tests(handle_coverage=False)
        finally:
            if coverage is not None:
                coverage.stop()
                coverage.save()


    except:
        traceback.print_exc()
    server_comm.notifications_queue.put_nowait(KillServer())



#=======================================================================================================================
# main
#=======================================================================================================================
if __name__ == '__main__':
    if len(sys.argv) -1 == 3:
        job_id, port, verbosity = sys.argv[1:]
        coverage_output_file, coverage_include = None, None

    elif len(sys.argv) -1 == 5:
        job_id, port, verbosity, coverage_output_file, coverage_include = sys.argv[1:]

    else:
        raise AssertionError('Could not find out how to handle the parameters: '+sys.argv[1:])

    job_id = int(job_id)
    port = int(port)
    verbosity = int(verbosity)
    run_client(job_id, port, verbosity, coverage_output_file, coverage_include)