aboutsummaryrefslogtreecommitdiff
path: root/run_tests.py
diff options
context:
space:
mode:
authorDan Albert <danalbert@google.com>2017-08-28 15:00:04 -0700
committerDan Albert <danalbert@google.com>2017-08-28 15:00:04 -0700
commitb140b0e5f35ff762662f65ce0498e86f67271a8f (patch)
treedca931cd850d2185758eaf44703421d54d5b5fe5 /run_tests.py
parentba1c7c3fce8beebd1f2c75d6d176a65595b20960 (diff)
downloadndk-b140b0e5f35ff762662f65ce0498e86f67271a8f.tar.gz
Refactor sharding to improve device load.
Allocating all tests to devices up front means that one device might be stuck with all the long running tests while the others sit idle. Wrap the WorkQueue in a ShardingWorkQueue that makes a WorkQueue per device, a task queue per sharding group, and a common result queue. Test: ./run_tests.py Bug: None Change-Id: I82aae68dd89e1b468d99bf7fb68b0da20a40ffd8
Diffstat (limited to 'run_tests.py')
-rwxr-xr-xrun_tests.py108
1 files changed, 80 insertions, 28 deletions
diff --git a/run_tests.py b/run_tests.py
index 48b67a70c..cbe59c48e 100755
--- a/run_tests.py
+++ b/run_tests.py
@@ -20,6 +20,7 @@ from __future__ import print_function
import argparse
import json
import logging
+import multiprocessing
import os
import posixpath
import random
@@ -183,10 +184,10 @@ class LibcxxTestCase(TestCase):
class TestRun(object):
- """A test case mapped to the device it will run on."""
- def __init__(self, test_case, device):
+ """A test case mapped to the device group it will run on."""
+ def __init__(self, test_case, device_group):
self.test_case = test_case
- self.device = device
+ self.device_group = device_group
@property
def name(self):
@@ -200,17 +201,17 @@ class TestRun(object):
def config(self):
return self.test_case.config
- def make_result(self, adb_result_tuple):
+ def make_result(self, adb_result_tuple, device):
status, out, _ = adb_result_tuple
if status == 0:
result = ndk.test.result.Success(self)
else:
- out = '\n'.join([str(self.device), out])
+ out = '\n'.join([str(device), out])
result = ndk.test.result.Failure(self, out)
- return self.fixup_xfail(result)
+ return self.fixup_xfail(result, device)
- def fixup_xfail(self, result):
- config, bug = self.test_case.check_broken(self.device)
+ def fixup_xfail(self, result, device):
+ config, bug = self.test_case.check_broken(device)
if config is not None:
if result.failed():
return ndk.test.result.ExpectedFailure(self, config, bug)
@@ -219,12 +220,12 @@ class TestRun(object):
raise ValueError('Test result must have either failed or passed.')
return result
- def run(self):
- config = self.test_case.check_unsupported(self.device)
+ def run(self, device):
+ config = self.test_case.check_unsupported(device)
if config is not None:
message = 'test unsupported for {}'.format(config)
return ndk.test.result.Skipped(self, message)
- return self.make_result(self.test_case.run(self.device))
+ return self.make_result(self.test_case.run(device), device)
def build_tests(ndk_dir, out_dir, clean, printer, config, test_filter):
@@ -320,7 +321,7 @@ def enumerate_tests(test_dir, test_filter, config_filter):
return tests
-def clear_test_directory(device):
+def clear_test_directory(_worker_data, device):
print('Clearing test directory on {}.'.format(device))
cmd = ['rm', '-r', DEVICE_TEST_BASE_DIR]
logger().info('%s: shell_nocheck "%s"', device.name, cmd)
@@ -345,7 +346,8 @@ def adb_has_feature(feature):
return feature in features
-def push_tests_to_device(src_dir, dest_dir, config, device, use_sync):
+def push_tests_to_device(_worker_data, src_dir, dest_dir, config, device,
+ use_sync):
print('Pushing {} tests to {}.'.format(config, device))
logger().info('%s: mkdir %s', device.name, dest_dir)
device.shell_nocheck(['mkdir', dest_dir])
@@ -414,7 +416,7 @@ def asan_device_setup(ndk_path, device):
device, out))
-def setup_asan_for_device(ndk_path, device):
+def setup_asan_for_device(_worker_data, ndk_path, device):
print('Performing ASAN setup for {}'.format(device))
disable_verity_and_wait_for_reboot(device)
asan_device_setup(ndk_path, device)
@@ -439,8 +441,9 @@ def perform_asan_setup(workqueue, ndk_path, groups_for_config):
workqueue.get_result()
-def run_test(test):
- return test.run()
+def run_test(worker_data, test):
+ device = worker_data[0]
+ return test.run(device)
def print_test_stats(test_groups):
@@ -484,7 +487,7 @@ def match_configs_to_device_groups(fleet, configs):
return groups_for_config
-def create_test_runs(test_groups, groups_for_config):
+def pair_test_runs(test_groups, groups_for_config):
"""Creates a TestRun object for each device/test case pairing."""
test_runs = []
for config, test_cases in test_groups.items():
@@ -492,9 +495,7 @@ def create_test_runs(test_groups, groups_for_config):
continue
for group in groups_for_config[config]:
- for shard_idx, device in enumerate(group.devices):
- sharded_tests = test_cases[shard_idx::len(group.devices)]
- test_runs.extend([TestRun(tc, device) for tc in sharded_tests])
+ test_runs.extend([TestRun(tc, group) for tc in test_cases])
return test_runs
@@ -643,6 +644,49 @@ class ConfigFilter(object):
return config_tuple in self.config_tuples
+class ShardingWorkQueue(object):
+ def __init__(self, device_groups, procs_per_device):
+ self.result_queue = multiprocessing.Queue()
+ self.task_queues = {}
+ self.work_queues = []
+ self.num_tasks = 0
+ for group in device_groups:
+ self.task_queues[group] = multiprocessing.Queue()
+ for device in group.devices:
+ self.work_queues.append(
+ ndk.workqueue.WorkQueue(
+ procs_per_device, task_queue=self.task_queues[group],
+ result_queue=self.result_queue, worker_data=[device]))
+
+ def add_task(self, group, func, *args, **kwargs):
+ self.task_queues[group].put(
+ ndk.workqueue.Task(func, args, kwargs))
+ self.num_tasks += 1
+
+ def get_result(self):
+ """Gets a result from the queue, blocking until one is available."""
+ result = self.result_queue.get()
+ if type(result) == ndk.workqueue.TaskError:
+ raise result
+ self.num_tasks -= 1
+ return result
+
+ def terminate(self):
+ for work_queue in self.work_queues:
+ work_queue.terminate()
+ for task_queue in self.task_queues.values():
+ ndk.workqueue.flush_queue(task_queue)
+ ndk.workqueue.flush_queue(self.result_queue)
+
+ def join(self):
+ for work_queue in self.work_queues:
+ work_queue.join()
+
+ def finished(self):
+ """Returns True if all tasks have completed execution."""
+ return self.num_tasks == 0
+
+
def main():
total_timer = ndk.timer.Timer()
total_timer.start()
@@ -745,25 +789,33 @@ def main():
asan_setup_timer = ndk.timer.Timer()
with asan_setup_timer:
perform_asan_setup(workqueue, args.ndk, groups_for_config)
+ finally:
+ workqueue.terminate()
+ workqueue.join()
+
+ shard_queue = ShardingWorkQueue(fleet.get_unique_device_groups(), 8)
+ try:
+ # Need an input queue per device group, a single result queue, and a
+ # pool of threads per device.
# Shuffle the test runs to distribute the load more evenly. These are
# ordered by (build config, device, test), so most of the tests running
# at any given point in time are all running on the same device.
- test_runs = create_test_runs(test_groups, groups_for_config)
+ test_runs = pair_test_runs(test_groups, groups_for_config)
random.shuffle(test_runs)
test_run_timer = ndk.timer.Timer()
with test_run_timer:
- for test in test_runs:
- workqueue.add_task(run_test, test)
+ for test_run in test_runs:
+ shard_queue.add_task(test_run.device_group, run_test, test_run)
- wait_for_results(report, workqueue, printer)
- restart_flaky_tests(report, workqueue)
- wait_for_results(report, workqueue, printer)
+ wait_for_results(report, shard_queue, printer)
+ restart_flaky_tests(report, shard_queue)
+ wait_for_results(report, shard_queue, printer)
printer.print_summary(report)
finally:
- workqueue.terminate()
- workqueue.join()
+ shard_queue.terminate()
+ shard_queue.join()
total_timer.finish()