diff options
author | Kapileshwar Singh <kapileshwar.singh@arm.com> | 2015-08-13 22:07:37 +0100 |
---|---|---|
committer | Kapileshwar Singh <kapileshwar.singh@arm.com> | 2015-08-14 11:28:05 +0100 |
commit | 0329a41aca1a181ef47c98638acce7e331f1f324 (patch) | |
tree | 2dd019935531b2d4e2ed080914a1c1f7ddb6f99e /bart/sched | |
parent | e9f28ed7ce6041b58ff08a6ecf5486a8e8ac95e3 (diff) | |
download | bart-0329a41aca1a181ef47c98638acce7e331f1f324.tar.gz |
sched: Move Sched Assertions to a separate directory
Also change namespace from sheye to bart.sched
Signed-off-by: Kapileshwar Singh <kapileshwar.singh@arm.com>
Diffstat (limited to 'bart/sched')
-rwxr-xr-x | bart/sched/SchedAssert.py | 437 | ||||
-rwxr-xr-x | bart/sched/SchedMatrix.py | 210 | ||||
-rwxr-xr-x | bart/sched/SchedMultiAssert.py | 148 | ||||
-rw-r--r-- | bart/sched/__init__.py | 21 |
4 files changed, 816 insertions, 0 deletions
# Copyright 2015-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A library for asserting scheduler scenarios based on the
statistics aggregation framework"""

import itertools
import math

import numpy as np
import trappy
from trappy.plotter.Utils import listify
from trappy.stats import SchedConf as sconf
from trappy.stats.Aggregator import MultiTriggerAggregator

from bart import Utils

# pylint: disable=invalid-name
# pylint: disable=too-many-arguments


class SchedAssert(object):

    """Assert and verify predefined scheduler scenarios for a single
    task within one trace. This does not compare parameters across runs.
    """

    def __init__(self, run, topology, execname=None, pid=None):
        """Args:
            run (trappy.Run): A single trappy.Run object or a path that
                can be passed to trappy.Run
            topology (trappy.stats.Topology): The CPU topology
            execname (str, optional): execname of the task under
                consideration
            pid (int, optional): PID of the task to be checked

        One of pid or execname is mandatory. If only execname is
        specified, the current implementation will fail if there is
        more than one process with the same execname.
        """

        run = Utils.init_run(run)

        if not execname and not pid:
            raise ValueError("Need to specify at least one of pid or execname")

        self.execname = execname
        self._run = run
        self._pid = self._validate_pid(pid)
        self._aggs = {}
        self._topology = topology
        self._triggers = sconf.sched_triggers(self._run, self._pid,
                                              trappy.sched.SchedSwitch)
        self.name = "{}-{}".format(self.execname, self._pid)

    def _validate_pid(self, pid):
        """Resolve and validate the task PID.

        If pid is None, it is derived from execname (which must then map
        to exactly one PID). If both are given, pid must belong to
        execname. If only pid is given, execname is looked up from it.
        """

        if not pid:
            pids = sconf.get_pids_for_process(self._run,
                                              self.execname)

            if len(pids) != 1:
                raise RuntimeError(
                    "There should be exactly one PID {0} for {1}".format(
                        pids,
                        self.execname))

            return pids[0]

        elif self.execname:

            pids = sconf.get_pids_for_process(self._run,
                                              self.execname)
            if pid not in pids:
                raise RuntimeError(
                    "PID {0} not mapped to {1}".format(
                        pid,
                        self.execname))
        else:
            self.execname = sconf.get_task_name(self._run, pid)

        return pid

    def _aggregator(self, aggfunc):
        """Return an aggregator corresponding to aggfunc.

        Aggregators are memoized per aggfunc for performance.

        Args:
            aggfunc (function(pandas.Series)): Function that accepts a
                pandas.Series object and returns a vector/scalar result
        """

        # Direct containment test; no need to materialize .keys()
        if aggfunc not in self._aggs:
            self._aggs[aggfunc] = MultiTriggerAggregator(self._triggers,
                                                         self._topology,
                                                         aggfunc)
        return self._aggs[aggfunc]

    def getResidency(self, level, node, window=None, percent=False):
        """Return the residency of the task on a topological node.

        Residency is the amount of time the task spends executing on a
        particular node of a topological level. For example, with
        level="cluster" and node=[1, 2], this returns the residency of
        the task on the big cluster. If percent is True the result is
        normalized to the total runtime of the task.

        Args:
            level (hashable): The level to which the node belongs
            node (list): The node for which residency is calculated
            window (tuple): A (start, end) tuple to limit the scope of
                the residency calculation
            percent: If true the result is normalized to the total
                runtime of the task and returned as a percentage
        """

        # Get the index of the node in the level
        node_index = self._topology.get_index(level, node)

        agg = self._aggregator(sconf.residency_sum)
        level_result = agg.aggregate(level=level, window=window)

        node_value = level_result[node_index]

        if percent:
            total = agg.aggregate(level="all", window=window)[0]
            node_value = node_value * 100
            node_value = node_value / total

        return node_value

    def assertResidency(
            self,
            level,
            node,
            expected_value,
            operator,
            window=None,
            percent=False):
        """Assert on the residency of the task on a topological node.

        Args:
            level (hashable): The level to which the node belongs
            node (list): The node for which residency is asserted
            expected_value (double): The expected value of the residency
            operator (function): A binary operator function that returns
                a boolean
            window (tuple): A (start, end) tuple to limit the scope of
                the residency calculation
            percent: If true the result is normalized to the total
                runtime of the task and compared as a percentage
        """
        node_value = self.getResidency(level, node, window, percent)
        return operator(node_value, expected_value)

    def getStartTime(self):
        """Return the first time the task ran (across all CPUs)"""

        agg = self._aggregator(sconf.first_time)
        result = agg.aggregate(level="all", value=sconf.TASK_RUNNING)
        return min(result[0])

    def getEndTime(self):
        """Return the last time the task ran (across all CPUs)"""

        # NOTE: a redundant first_time aggregator was previously built
        # here and immediately discarded; only last_time is needed.
        agg = self._aggregator(sconf.last_time)
        result = agg.aggregate(level="all", value=sconf.TASK_RUNNING)
        return max(result[0])

    def _relax_switch_window(self, series, direction, window):
        """Find a switch time within the window, relaxing it if needed.

        direction == "left": return the last time the task was running
        in the window; if no such time exists, extend the window's left
        extent back to getStartTime().

        direction == "right": return the first time the task was running
        in the window; if no such time exists, extend the window's right
        extent to getEndTime().

        Returns None if len(series[series == TASK_RUNNING]) == 0 even in
        the extended window.
        """

        series = series[series == sconf.TASK_RUNNING]
        w_series = sconf.select_window(series, window)
        start, stop = window

        if direction == "left":
            if len(w_series):
                return w_series.index.values[-1]
            else:
                start_time = self.getStartTime()
                w_series = sconf.select_window(
                    series,
                    window=(
                        start_time,
                        start))

                if not len(w_series):
                    return None
                else:
                    return w_series.index.values[-1]

        elif direction == "right":
            if len(w_series):
                return w_series.index.values[0]
            else:
                end_time = self.getEndTime()
                w_series = sconf.select_window(series, window=(stop, end_time))

                if not len(w_series):
                    return None
                else:
                    return w_series.index.values[0]
        else:
            raise ValueError("direction should be either left or right")

    def assertSwitch(
            self,
            level,
            from_node,
            to_node,
            window,
            ignore_multiple=True):
        """Assert that the task context-switches from from_node to
        to_node within the window.

        Args:
            level (hashable): The level to which the nodes belong
            from_node (list): The node from which the task switches out
            to_node (list): The node to which the task switches
            window (tuple): A (start, end) tuple of time in which the
                switch needs to be asserted
            ignore_multiple (bool): If true, multiple switches in the
                window are ignored; if false the assert holds if and
                only if there is a single switch within the window
        """

        from_node_index = self._topology.get_index(level, from_node)
        to_node_index = self._topology.get_index(level, to_node)

        agg = self._aggregator(sconf.csum)
        level_result = agg.aggregate(level=level)

        from_node_result = level_result[from_node_index]
        to_node_result = level_result[to_node_index]

        from_time = self._relax_switch_window(from_node_result, "left", window)
        if ignore_multiple:
            to_time = self._relax_switch_window(to_node_result, "left", window)
        else:
            to_time = self._relax_switch_window(
                to_node_result,
                "right", window)

        # Both boundary times must exist and be correctly ordered
        if from_time and to_time:
            if from_time < to_time:
                return True

        return False

    def getRuntime(self, window=None, percent=False):
        """Return the total runtime of the task.

        Args:
            window (tuple): A (start, end) tuple to limit the scope of
                the calculation
            percent (boolean): If True, the result is returned as a
                percentage of the total execution time of the run
        """

        agg = self._aggregator(sconf.residency_sum)
        run_time = agg.aggregate(level="all", window=window)[0]

        if percent:

            if window:
                begin, end = window
                total_time = end - begin
            else:
                total_time = self._run.get_duration()

            run_time = run_time * 100
            run_time = run_time / total_time

        return run_time

    def assertRuntime(
            self,
            expected_value,
            operator,
            window=None,
            percent=False):
        """Assert on the total runtime of the task.

        Args:
            expected_value (double): The expected value of the runtime
            operator (func(a, b)): A binary operator function that
                returns a boolean
            window (tuple): A (start, end) tuple to limit the scope of
                the calculation
            percent (boolean): If True, the result is compared as a
                percentage of the total execution time of the run
        """

        run_time = self.getRuntime(window, percent)
        return operator(run_time, expected_value)

    def getDutyCycle(self, window):
        """Return the duty cycle of the task.

        Duty Cycle: the percentage of time the task spends executing in
        the given window.

        Args:
            window (tuple): A (start, end) tuple to limit the scope of
                the calculation
        """

        return self.getRuntime(window, percent=True)

    def assertDutyCycle(self, expected_value, operator, window):
        """Assert on the duty cycle of the task.

        Args:
            expected_value (double): The expected value of the duty
                cycle
            operator (func(a, b)): A binary operator function that
                returns a boolean
            window (tuple): A (start, end) tuple to limit the scope of
                the calculation
        """
        return self.assertRuntime(
            expected_value,
            operator,
            window,
            percent=True)

    def getFirstCpu(self, window=None):
        """Return the first CPU the task ran on, or -1 if it never ran.

        Args:
            window (tuple): A (start, end) tuple to limit the scope of
                the calculation
        """

        agg = self._aggregator(sconf.first_cpu)
        result = agg.aggregate(level="cpu", window=window)
        result = list(itertools.chain.from_iterable(result))

        min_time = min(result)
        # An infinite first-run time means the task never ran on any CPU
        if math.isinf(min_time):
            return -1
        index = result.index(min_time)
        return self._topology.get_node("cpu", index)[0]

    def assertFirstCpu(self, cpus, window=None):
        """Assert that the task first ran on one of the given CPUs.

        Args:
            cpus (int, list): A list of acceptable CPUs
            window (tuple): A (start, end) tuple to limit the scope of
                the calculation
        """
        first_cpu = self.getFirstCpu(window=window)
        cpus = listify(cpus)
        return first_cpu in cpus

    def generate_events(self, level, start_id=0, window=None):
        """Generate events for the trace plot"""

        agg = self._aggregator(sconf.trace_event)
        result = agg.aggregate(level=level, window=window)
        events = []

        for idx, level_events in enumerate(result):
            if not len(level_events):
                continue
            # Tag each event with the index of its lane (node)
            events += np.column_stack(
                (level_events, np.full(len(level_events), idx))).tolist()

        return sorted(events, key=lambda x: x[0])

    def plot(self, level="cpu", window=None, xlim=None):
        """Plot the task's execution trace.

        Returns:
            trappy.plotter.AbstractDataPlotter. Call .view() to draw
            the graph.
        """

        if not xlim:
            if not window:
                xlim = [0, self._run.get_duration()]
            else:
                xlim = list(window)

        events = {}
        events[self.name] = self.generate_events(level, window)
        names = [self.name]
        num_lanes = self._topology.level_span(level)
        lane_prefix = level.upper() + ": "
        return trappy.EventPlot(events, names, lane_prefix, num_lanes, xlim)
# Copyright 2015-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
The SchedMatrix provides an ability to compare two executions
of benchmarks with multiple processes.

For example, consider a benchmark that spawns 4 identical threads
where any two threads should exhibit one behaviour and the
remaining two an identical but different behaviour.

SchedMatrix creates a matrix of scheduler waveform correlations

A = Reference Execution
B = Execution to be Evaluated

Correlation Matrix

       B1   B2   B3   B4
A1     1    0    1    0
A2     0    1    0    1
A3     1    0    1    0
A4     0    1    0    1

Thus a success criterion can be defined as "A1 has two similar
threads in the evaluated execution":

    assertSiblings(A1, 2, operator.eq)
    assertSiblings(A2, 2, operator.eq)
    assertSiblings(A3, 2, operator.eq)
    assertSiblings(A4, 2, operator.eq)
"""


import itertools
import sys

import numpy as np
import trappy
from trappy.plotter.Utils import listify
from trappy.stats import SchedConf as sconf
from trappy.stats.Aggregator import MultiTriggerAggregator
from trappy.stats.Correlator import Correlator

from bart import Utils

# Correlation above this threshold counts as "similar" behaviour
POSITIVE_TOLERANCE = 0.80

# pylint: disable=invalid-name
# pylint: disable=too-many-arguments


class SchedMatrix(object):

    """Compare the scheduler behaviour of a set of processes between a
    reference trace and a trace under evaluation.

    Valid cases are:

    * Single execname, multiple PIDs
    * PID List
    * Multiple execnames, one-to-one PID association
    """

    def __init__(
            self,
            reference_trace,
            trace,
            topology,
            execnames,
            aggfunc=sconf.csum):
        """Args:
            reference_trace: trappy.Run (or path) of the reference run
            trace: trappy.Run (or path) of the run to evaluate
            topology (trappy.stats.Topology): The CPU topology
            execnames (str, list): execname(s) of the processes
            aggfunc: aggregation function used for the waveforms
        """

        run = Utils.init_run(trace)
        reference_run = Utils.init_run(reference_trace)

        self._execnames = listify(execnames)
        self._reference_pids = self._populate_pids(reference_run)
        self._pids = self._populate_pids(run)
        self._dimension = len(self._pids)
        self._topology = topology

        # Validate BEFORE generating the matrix: _generate_matrix
        # indexes self._reference_pids up to self._dimension and would
        # raise an unhelpful IndexError on a PID-count mismatch.
        if len(self._pids) != len(self._reference_pids):
            raise RuntimeError(
                "The runs do not have the same number of PIDs for {0}".format(
                    str(execnames)))

        self._matrix = self._generate_matrix(run, reference_run, aggfunc)

    def _populate_pids(self, run):
        """Populate the qualifying PIDs from the run"""

        if len(self._execnames) == 1:
            return sconf.get_pids_for_process(run, self._execnames[0])

        pids = []

        for proc in self._execnames:
            pids += sconf.get_pids_for_process(run, proc)

        # De-duplicate PIDs shared across execnames
        return list(set(pids))

    def _generate_matrix(self, run, reference_run, aggfunc):
        """Generate the correlation matrix (reference rows, test columns)"""

        reference_aggs = []
        aggs = []

        for idx in range(self._dimension):

            reference_aggs.append(
                MultiTriggerAggregator(
                    sconf.sched_triggers(
                        reference_run,
                        self._reference_pids[idx],
                        trappy.sched.SchedSwitch
                    ),
                    self._topology,
                    aggfunc))

            aggs.append(
                MultiTriggerAggregator(
                    sconf.sched_triggers(
                        run,
                        self._pids[idx],
                        trappy.sched.SchedSwitch
                    ),
                    self._topology,
                    aggfunc))

        # pylint fails to recognize numpy members.
        # pylint: disable=no-member
        matrix = np.zeros((self._dimension, self._dimension))
        # pylint: enable=no-member

        # Iterate over index pairs directly instead of looking the
        # indices back up with list.index() (O(n) per pair).
        for i, j in itertools.product(range(self._dimension), repeat=2):
            corr = Correlator(
                reference_aggs[i],
                aggs[j],
                corrfunc=sconf.binary_correlate,
                filter_gaps=True)
            _, total = corr.correlate(level="cluster")

            matrix[i][j] = total

        return matrix

    def print_matrix(self):
        """Print the correlation matrix to stdout"""

        # pylint fails to recognize numpy members.
        # pylint: disable=no-member
        np.set_printoptions(precision=5)
        np.set_printoptions(suppress=False)
        np.savetxt(sys.stdout, self._matrix, "%5.5f")
        # pylint: enable=no-member

    def getSiblings(self, pid, tolerance=POSITIVE_TOLERANCE):
        """Return the number of processes in the evaluated trace whose
        correlation with the reference PID is greater than tolerance.

        Args:
            pid: The PID in the reference trace
            tolerance (double): correlation threshold for "similar"
        """

        ref_pid_idx = self._reference_pids.index(pid)
        pid_result = self._matrix[ref_pid_idx]
        return len(pid_result[pid_result > tolerance])

    def assertSiblings(self, pid, expected_value, operator,
                       tolerance=POSITIVE_TOLERANCE):
        """Assert that the number of siblings of the reference PID
        matches the expected value under the operator.

        Args:
            pid: The PID in the reference trace
            expected_value: the second argument to the operator
            operator: a function of the type f(a, b) that returns
                a boolean
            tolerance (double): correlation threshold for "similar"
        """
        num_siblings = self.getSiblings(pid, tolerance)
        return operator(num_siblings, expected_value)
+# + +"""A library for asserting scheduler scenarios based on the +statistics aggregation framework""" + +import re +import inspect +import trappy +from trappy.stats import SchedConf as sconf +from trappy.plotter.Utils import listify +from bart.sched.SchedAssert import SchedAssert +from bart import Utils + +class SchedMultiAssert(object): + + """The primary focus of this class is to assert and verify + predefined scheduler scenarios. This does not compare parameters + across runs""" + + def __init__(self, run, topology, execnames): + """Args: + run (trappy.Run): A single trappy.Run object + or a path that can be passed to trappy.Run + topology(trappy.stats.Topology): The CPU topology + execname(str, list): List of execnames or single task + """ + + self._execnames = listify(execnames) + self._run = Utils.init_run(run) + self._pids = self._populate_pids() + self._topology = topology + self._asserts = self._populate_asserts() + self._populate_methods() + + def _populate_asserts(self): + """Populate SchedAsserts for the PIDs""" + + asserts = {} + + for pid in self._pids: + asserts[pid] = SchedAssert(self._run, self._topology, pid=pid) + + return asserts + + def _populate_pids(self): + """Map the input execnames to PIDs""" + + if len(self._execnames) == 1: + return sconf.get_pids_for_process(self._run, self._execnames[0]) + + pids = [] + + for proc in self._execnames: + pids += sconf.get_pids_for_process(self._run, proc) + + return list(set(pids)) + + def _create_method(self, attr_name): + """A wrapper function to create a dispatch function""" + + return lambda *args, **kwargs: self._dispatch(attr_name, *args, **kwargs) + + def _populate_methods(self): + """Populate Methods from SchedAssert""" + + for attr_name in dir(SchedAssert): + attr = getattr(SchedAssert, attr_name) + + valid_method = attr_name.startswith("get") or \ + attr_name.startswith("assert") + if inspect.ismethod(attr) and valid_method: + func = self._create_method(attr_name) + setattr(self, attr_name, 
func) + + def get_task_name(self, pid): + """Get task name for the PID""" + return self._asserts[pid].execname + + + def _dispatch(self, func_name, *args, **kwargs): + """The dispatch function to call into the SchedAssert + Method + """ + + assert_func = func_name.startswith("assert") + num_true = 0 + + rank = kwargs.pop("rank", None) + result = kwargs.pop("result", {}) + param = kwargs.pop("param", re.sub(r"assert|get", "", func_name, count=1).lower()) + + for pid in self._pids: + + if pid not in result: + result[pid] = {} + result[pid]["task_name"] = self.get_task_name(pid) + + attr = getattr(self._asserts[pid], func_name) + result[pid][param] = attr(*args, **kwargs) + + if assert_func and result[pid][param]: + num_true += 1 + + if assert_func and rank: + return num_true == rank + else: + return result + + def generate_events(self, level, window=None): + """Generate Events for the trace plot""" + + events = {} + for s_assert in self._asserts.values(): + events[s_assert.name] = s_assert.generate_events(level, window=window) + + return events + + def plot(self, level="cpu", window=None, xlim=None): + """ + Returns: + trappy.plotter.AbstractDataPlotter. Call .view() for + displaying the plot + """ + + if not xlim: + if not window: + xlim = [0, self._run.get_duration()] + else: + xlim = list(window) + + events = self.generate_events(level, window) + names = [s.name for s in self._asserts.values()] + num_lanes = self._topology.level_span(level) + lane_prefix = level.upper() + ": " + return trappy.EventPlot(events, names, lane_prefix, num_lanes, xlim) diff --git a/bart/sched/__init__.py b/bart/sched/__init__.py new file mode 100644 index 0000000..c391ecb --- /dev/null +++ b/bart/sched/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2015-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# Copyright 2015-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Initialization for bart.sched"""

# Expose the scheduler assertion modules, alphabetically ordered.
from bart.sched import SchedAssert
from bart.sched import SchedMatrix
from bart.sched import SchedMultiAssert