author     Joel Fernandes <joelaf@google.com>   2017-06-29 14:28:20 -0700
committer  Joel Fernandes <joelaf@google.com>   2017-06-29 14:28:20 -0700
commit     f0c228a018cd332d28451f3e851b5e0640d917a6 (patch)
tree       7c624b5404d1c61ae93dfa71ff4499ad6e97ca0e
parent     b0e013c24a4d59d21e8d078298f2d0f266bda611 (diff)
parent     180ba383f2ab4e544273a1e75e8bee61df654ceb (diff)
download   trappy-f0c228a018cd332d28451f3e851b5e0640d917a6.tar.gz
Merge remote-tracking branch 'origin/master' into HEAD
24 files changed, 429 insertions(+), 20 deletions(-)
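A minimal sketch of the trace-caching workflow this merge brings in (the trace file name is hypothetical; the GenericFTrace.disable_cache class attribute and the ".<trace>.cache" directory layout come from the diff below):

    import trappy
    from trappy.ftrace import GenericFTrace

    GenericFTrace.disable_cache = False   # caching is gated on this class attribute
    trace1 = trappy.FTrace('trace.txt')   # first parse writes .trace.txt.cache/ next to the trace
    trace2 = trappy.FTrace('trace.txt')   # later parses read the per-event CSVs from that cache

The cache directory holds one CSV per event class plus an md5sum of the trace and the parse parameters; a checksum or parameter mismatch deletes and rebuilds the cache.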
diff --git a/tests/test_caching.py b/tests/test_caching.py
new file mode 100644
index 0000000..d0893b7
--- /dev/null
+++ b/tests/test_caching.py
@@ -0,0 +1,193 @@
+# Copyright 2015-2017 ARM Limited, Google and contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import shutil
+import sys
+import unittest
+import utils_tests
+import trappy
+from trappy.ftrace import GenericFTrace
+
+class TestCaching(utils_tests.SetupDirectory):
+    def __init__(self, *args, **kwargs):
+        super(TestCaching, self).__init__(
+            [("trace_sched.txt", "trace.txt"),
+             ("trace_sched.txt", "trace.raw.txt")],
+            *args,
+            **kwargs)
+
+    def test_cache_created(self):
+        """Test cache creation when caching is enabled"""
+        GenericFTrace.disable_cache = False
+        trace = trappy.FTrace()
+
+        trace_path = os.path.abspath(trace.trace_path)
+        trace_dir = os.path.dirname(trace_path)
+        trace_file = os.path.basename(trace_path)
+        cache_dir = '.' + trace_file + '.cache'
+
+        self.assertTrue(cache_dir in os.listdir(trace_dir))
+
+    def test_cache_not_created(self):
+        """Test that no cache is created when caching is disabled"""
+        GenericFTrace.disable_cache = True
+        trace = trappy.FTrace()
+
+        trace_path = os.path.abspath(trace.trace_path)
+        trace_dir = os.path.dirname(trace_path)
+        trace_file = os.path.basename(trace_path)
+        cache_dir = '.' + trace_file + '.cache'
+
+        self.assertFalse(cache_dir in os.listdir(trace_dir))
+
+    def test_compare_cached_vs_uncached(self):
+        """Test that the cached and uncached traces are the same"""
+        # Build the cache, but the actual trace will be parsed
+        # fresh since this is a first-time parse
+        GenericFTrace.disable_cache = False
+        uncached_trace = trappy.FTrace()
+        uncached_dfr = uncached_trace.sched_wakeup.data_frame
+
+        # Now read from the previously created cache by reusing the path
+        cached_trace = trappy.FTrace(uncached_trace.trace_path)
+        cached_dfr = cached_trace.sched_wakeup.data_frame
+
+        # Test whether the timestamps are the same:
+        # The cached/uncached versions of the timestamps differ slightly
+        # due to floating point precision errors from converting back and
+        # forth between CSV and DataFrame. For all practical purposes this
+        # is irrelevant, since the rounding doesn't affect the end result.
+        # Here's an example of the error: the actual normalized time, when
+        # calculated by hand, is 0.081489; the following is what's stored
+        # in the CSV for sched_wakeup events in this trace.
+        # When converting the index to strings (also what's in the CSV):
+        # cached:   ['0.0814890000001', '1.981491']
+        # uncached: ['0.0814890000001', '1.981491']
+        #
+        # Keeping the index as numpy.float64:
+        # cached:   [0.081489000000100009, 1.9814909999999999]
+        # uncached: [0.081489000000146916, 1.9814909999995507]
+        #
+        # To make this testable, let's just convert the timestamps to
+        # strings and compare them below.
+
+        cached_times = [str(r[0]) for r in cached_dfr.iterrows()]
+        uncached_times = [str(r[0]) for r in uncached_dfr.iterrows()]
+
+        self.assertTrue(cached_times == uncached_times)
+
+        # Compare the other columns as well
+        self.assertTrue([r[1].pid for r in cached_dfr.iterrows()] ==
+                        [r[1].pid for r in uncached_dfr.iterrows()])
+
+        self.assertTrue([r[1].comm for r in cached_dfr.iterrows()] ==
+                        [r[1].comm for r in uncached_dfr.iterrows()])
+
+        self.assertTrue([r[1].prio for r in cached_dfr.iterrows()] ==
+                        [r[1].prio for r in uncached_dfr.iterrows()])
+
+    def test_invalid_cache_overwritten(self):
+        """Test that a cache with a bad checksum is overwritten"""
+        # This is a directory, so we can't use the files_to_copy arg of
+        # SetupDirectory; just do it ourselves.
+        cache_path = ".trace.txt.cache"
+        src = os.path.join(utils_tests.TESTS_DIRECTORY, "trace_sched.txt.cache")
+        shutil.copytree(src, cache_path)
+
+        md5_path = os.path.join(cache_path, "md5sum")
+        def read_md5sum():
+            with open(md5_path) as f:
+                return f.read()
+
+        # Change 1 character of the stored checksum
+        md5sum = read_md5sum()
+        # Sorry, I guess modifying strings in Python is kind of awkward?
+        md5sum_inc = "".join(list(md5sum[:-1]) + [chr(ord(md5sum[-1]) + 1)])
+        with open(md5_path, "w") as f:
+            f.write(md5sum_inc)
+
+        # Parse a trace; this should delete and overwrite the invalidated cache
+        GenericFTrace.disable_cache = False
+        trace = trappy.FTrace()
+
+        # Check that the modified md5sum was overwritten
+        self.assertNotEqual(read_md5sum(), md5sum_inc,
+                            "The invalid ftrace cache wasn't overwritten")
+
+    def test_cache_dynamic_events(self):
+        """Test that caching works if new event parsers have been registered"""
+
+        # Parse the trace to create a cache
+        GenericFTrace.disable_cache = False
+        trace1 = trappy.FTrace()
+
+        # Check that we're actually testing what we think we are
+        if hasattr(trace1, 'dynamic_event'):
+            raise RuntimeError('Test bug: found unexpected event in trace')
+
+        # Now register a new event type, call the constructor again, and check
+        # that the newly added event (which is not present in the cache) is
+        # parsed.
+
+        parse_class = trappy.register_dynamic_ftrace("DynamicEvent",
+                                                     "dynamic_test_key")
+
+        trace2 = trappy.FTrace()
+        self.assertTrue(len(trace2.dynamic_event.data_frame) == 1)
+
+        trappy.unregister_dynamic_ftrace(parse_class)
+
+    def test_cache_normalize_time(self):
+        """Test that caching doesn't break normalize_time"""
+        GenericFTrace.disable_cache = False
+
+        # Times in trace_sched.txt
+        start_time = 6550.018511
+        first_freq_event_time = 6550.056870
+
+        # Parse without normalizing time
+        trace1 = trappy.FTrace(events=['cpu_frequency', 'sched_wakeup'],
+                               normalize_time=False)
+
+        self.assertEqual(trace1.cpu_frequency.data_frame.index[0],
+                         first_freq_event_time)
+
+        # Parse with normalized time
+        trace2 = trappy.FTrace(events=['cpu_frequency', 'sched_wakeup'],
+                               normalize_time=True)
+
+        self.assertEqual(trace2.cpu_frequency.data_frame.index[0],
+                         first_freq_event_time - start_time)
+
+    def test_cache_window(self):
+        """Test that caching doesn't break the 'window' parameter"""
+        GenericFTrace.disable_cache = False
+
+        trace1 = trappy.FTrace(
+            events=['sched_wakeup'],
+            window=(0, 1))
+
+        # Check that we're testing what we think we're testing: the trace
+        # contains 2 sched_wakeup events, and this window should get rid
+        # of one of them.
+        if len(trace1.sched_wakeup.data_frame) != 1:
+            raise RuntimeError('Test bug: bad sched_wakeup event count')
+
+        # Parse again without the window
+        trace1 = trappy.FTrace(
+            events=['sched_wakeup'],
+            window=(0, None))
+
+        self.assertEqual(len(trace1.sched_wakeup.data_frame), 2)
diff --git a/tests/test_ftrace.py b/tests/test_ftrace.py
index 7d7874a..e6f6319 100644
--- a/tests/test_ftrace.py
+++ b/tests/test_ftrace.py
@@ -230,6 +230,31 @@ class TestFTrace(BaseTestThermal):
         # Make sure there are no NaNs in the middle of the array
         self.assertTrue(allfreqs[0][1]["A57_freq_in"].notnull().all())
 
+    def test_apply_callbacks(self):
+        """Test apply_callbacks()"""
+
+        counts = {
+            "cpu_in_power": 0,
+            "cpu_out_power": 0
+        }
+
+        def cpu_in_power_fn(data):
+            counts["cpu_in_power"] += 1
+
+        def cpu_out_power_fn(data):
+            counts["cpu_out_power"] += 1
+
+        fn_map = {
+            "cpu_in_power": cpu_in_power_fn,
+            "cpu_out_power": cpu_out_power_fn
+        }
+        trace = trappy.FTrace()
+
+        trace.apply_callbacks(fn_map)
+
+        self.assertEqual(counts["cpu_in_power"], 134)
+        self.assertEqual(counts["cpu_out_power"], 134)
+
     def test_plot_freq_hists(self):
         """Test that plot_freq_hists() doesn't bomb"""
diff --git a/tests/trace_sched.txt.cache/CpuIdle.csv b/tests/trace_sched.txt.cache/CpuIdle.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/CpuIdle.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/CpuInPower.csv b/tests/trace_sched.txt.cache/CpuInPower.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/CpuInPower.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/CpuOutPower.csv b/tests/trace_sched.txt.cache/CpuOutPower.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/CpuOutPower.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/DevfreqInPower.csv b/tests/trace_sched.txt.cache/DevfreqInPower.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/DevfreqInPower.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/DevfreqOutPower.csv b/tests/trace_sched.txt.cache/DevfreqOutPower.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/DevfreqOutPower.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/PIDController.csv b/tests/trace_sched.txt.cache/PIDController.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/PIDController.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/SchedContribScaleFactor.csv b/tests/trace_sched.txt.cache/SchedContribScaleFactor.csv
new file mode 100644
index 0000000..a1764fe
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedContribScaleFactor.csv
@@ -0,0 +1,2 @@
+Time,__comm,__cpu,__pid,cpu,cpu_scale_factor,freq_scale_factor
+0.000167999999576,<idle>,0,0,0,1024,426
diff --git a/tests/trace_sched.txt.cache/SchedCpuCapacity.csv b/tests/trace_sched.txt.cache/SchedCpuCapacity.csv
new file mode 100644
index 0000000..4b75c6a
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedCpuCapacity.csv
@@ -0,0 +1,2 @@
+Time,__comm,__cpu,__pid,capacity,cpu,rt_capacity
+0.000293999999485,trace-cmd,3,3519,430,3,1024
diff --git a/tests/trace_sched.txt.cache/SchedCpuFrequency.csv b/tests/trace_sched.txt.cache/SchedCpuFrequency.csv
new file mode 100644
index 0000000..dbb941d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedCpuFrequency.csv
@@ -0,0 +1,2 @@
+Time,__comm,__cpu,__pid,cpu,frequency
+0.0383590000001,kworker/0:0,0,3410,0,600000
diff --git a/tests/trace_sched.txt.cache/SchedLoadAvgCpu.csv b/tests/trace_sched.txt.cache/SchedLoadAvgCpu.csv
new file mode 100644
index 0000000..54a9596
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedLoadAvgCpu.csv
@@ -0,0 +1,2 @@
+Time,__comm,__cpu,__pid,cpu,load,utilization
+1.99999976758e-06,sshd,0,2962,0,13,18
diff --git a/tests/trace_sched.txt.cache/SchedLoadAvgSchedGroup.csv b/tests/trace_sched.txt.cache/SchedLoadAvgSchedGroup.csv
new file mode 100644
index 0000000..fc57841
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedLoadAvgSchedGroup.csv
@@ -0,0 +1,2 @@
+Time,__comm,__cpu,__pid,cpus,load,utilization
+0.0,rcuos/2,1,22,00000002,0,0
diff --git a/tests/trace_sched.txt.cache/SchedLoadAvgTask.csv b/tests/trace_sched.txt.cache/SchedLoadAvgTask.csv
new file mode 100644
index 0000000..8b3ccfe
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedLoadAvgTask.csv
@@ -0,0 +1,2 @@
+Time,__comm,__cpu,__pid,avg_period,comm,load,pid,runnable_avg_sum,running_avg_sum,utilization
+9.99999429041e-07,trace-cmd,4,2971,48595,sshd,0,2962,0,0,0
diff --git a/tests/trace_sched.txt.cache/SchedMigrateTask.csv b/tests/trace_sched.txt.cache/SchedMigrateTask.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedMigrateTask.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/SchedSwitch.csv b/tests/trace_sched.txt.cache/SchedSwitch.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedSwitch.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/SchedWakeup.csv b/tests/trace_sched.txt.cache/SchedWakeup.csv
new file mode 100644
index 0000000..6210734
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedWakeup.csv
@@ -0,0 +1,3 @@
+Time,__comm,__cpu,__pid,comm,pid,prio,success,target_cpu
+0.0814890000001,<idle>,1,0,rcu_preempt,7,120,1,1
+1.981491,<idle>,1,0,rcu_preempt,7,120,1,1
diff --git a/tests/trace_sched.txt.cache/SchedWakeupNew.csv b/tests/trace_sched.txt.cache/SchedWakeupNew.csv
new file mode 100644
index 0000000..4ea006b
--- /dev/null
+++ b/tests/trace_sched.txt.cache/SchedWakeupNew.csv
@@ -0,0 +1,3 @@
+Time,__comm,__cpu,__pid,comm,pid,prio,success,target_cpu
+0.000152999999955,<...>,0,19427,shutils,19428,120,1,2
+1.975373,<...>,0,19427,shutils,19428,120,1,2
diff --git a/tests/trace_sched.txt.cache/Thermal.csv b/tests/trace_sched.txt.cache/Thermal.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/Thermal.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/ThermalGovernor.csv b/tests/trace_sched.txt.cache/ThermalGovernor.csv
new file mode 100644
index 0000000..e16c76d
--- /dev/null
+++ b/tests/trace_sched.txt.cache/ThermalGovernor.csv
@@ -0,0 +1 @@
+""
diff --git a/tests/trace_sched.txt.cache/md5sum b/tests/trace_sched.txt.cache/md5sum
new file mode 100644
index 0000000..9b481a3
--- /dev/null
+++ b/tests/trace_sched.txt.cache/md5sum
@@ -0,0 +1 @@
+47be9ccdd36fa0c3646b0d9b0f649da4
\ No newline at end of file
diff --git a/tests/utils_tests.py b/tests/utils_tests.py
index 617cfa3..e13b868 100644
--- a/tests/utils_tests.py
+++ b/tests/utils_tests.py
@@ -19,6 +19,8 @@ import os
 import shutil
 import subprocess
 import tempfile
+import trappy
+from trappy.ftrace import GenericFTrace
 
 TESTS_DIRECTORY = os.path.dirname(os.path.realpath(__file__))
@@ -36,6 +38,7 @@ class SetupDirectory(unittest.TestCase):
     def __init__(self, files_to_copy, *args, **kwargs):
         self.files_to_copy = files_to_copy
         super(SetupDirectory, self).__init__(*args, **kwargs)
+        GenericFTrace.disable_cache = True
 
     def setUp(self):
         self.previous_dir = os.getcwd()
diff --git a/trappy/base.py b/trappy/base.py
index 4f06af2..06857b5 100644
--- a/trappy/base.py
+++ b/trappy/base.py
@@ -113,6 +113,7 @@ class Base(object):
         self.pid_array = []
         self.cpu_array = []
         self.parse_raw = parse_raw
+        self.cached = False
 
     def finalize_object(self):
         pass
@@ -179,6 +180,27 @@ class Base(object):
         self.line_array.append(line)
         self.data_array.append(data)
 
+    def string_cast(self, string, type):
+        """Attempt to convert string to another type
+
+        Here we attempt to cast string to a type. Currently only
+        integer conversion is supported, with future expansion
+        left open to other types.
+
+        :param string: The value to convert.
+        :type string: str
+
+        :param type: The type to convert to.
+        :type type: type
+        """
+        # Currently this function only supports int conversion;
+        # pass anything else through unchanged
+        if type != int:
+            return string
+        # Handle false positives for negative numbers
+        if not string.lstrip("-").isdigit():
+            return string
+        return int(string)
+
     def generate_data_dict(self, data_str):
         data_dict = {}
         prev_key = None
@@ -190,10 +212,7 @@ class Base(object):
                 data_dict[prev_key] += ' ' + field
                 continue
             (key, value) = field.split('=', 1)
-            try:
-                value = int(value)
-            except ValueError:
-                pass
+            value = self.string_cast(value, int)
             data_dict[key] = value
             prev_key = key
         return data_dict
@@ -259,6 +278,14 @@ class Base(object):
         """
         self.data_frame.to_csv(fname)
 
+    def read_csv(self, fname):
+        """Read the csv data into a DataFrame
+
+        :param fname: The name of the CSV file
+        :type fname: str
+        """
+        self.data_frame = pd.read_csv(fname, index_col=0)
+
     def normalize_time(self, basetime):
         """Subtract basetime from the Time of the data frame
diff --git a/trappy/ftrace.py b/trappy/ftrace.py
index ea435f5..c0a40c2 100644
--- a/trappy/ftrace.py
+++ b/trappy/ftrace.py
@@ -18,9 +18,13 @@
 # pylint: disable=no-member
 
 import itertools
+import json
 import os
 import re
 import pandas as pd
+import hashlib
+import shutil
+import warnings
 
 from trappy.bare_trace import BareTrace
 from trappy.utils import listify
@@ -62,6 +66,57 @@ subclassed by FTrace (for parsing FTrace coming from trace-cmd) and SysTrace."""
 
     dynamic_classes = {}
 
+    disable_cache = False
+
+    def _trace_cache_path(self):
+        trace_file = self.trace_path
+        cache_dir = '.' + os.path.basename(trace_file) + '.cache'
+        tracefile_dir = os.path.dirname(os.path.abspath(trace_file))
+        cache_path = os.path.join(tracefile_dir, cache_dir)
+        return cache_path
+
+    def _check_trace_cache(self, params):
+        cache_path = self._trace_cache_path()
+        md5file = os.path.join(cache_path, 'md5sum')
+        params_path = os.path.join(cache_path, 'params.json')
+
+        for path in [cache_path, md5file, params_path]:
+            if not os.path.exists(path):
+                return False
+
+        with open(md5file) as f:
+            cache_md5sum = f.read()
+        with open(self.trace_path, 'rb') as f:
+            trace_md5sum = hashlib.md5(f.read()).hexdigest()
+        with open(params_path) as f:
+            cache_params = json.load(f)
+
+        # Check whether the cache is valid
+        if cache_md5sum != trace_md5sum or cache_params != params:
+            shutil.rmtree(cache_path)
+            return False
+        return True
+
+    def _create_trace_cache(self, params):
+        cache_path = self._trace_cache_path()
+        md5file = os.path.join(cache_path, 'md5sum')
+        params_path = os.path.join(cache_path, 'params.json')
+
+        if os.path.exists(cache_path):
+            shutil.rmtree(cache_path)
+        os.mkdir(cache_path)
+
+        md5sum = hashlib.md5(open(self.trace_path, 'rb').read()).hexdigest()
+        with open(md5file, 'w') as f:
+            f.write(md5sum)
+
+        with open(params_path, 'w') as f:
+            json.dump(params, f)
+
+    def _get_csv_path(self, trace_class):
+        path = self._trace_cache_path()
+        return os.path.join(path, trace_class.__class__.__name__ + '.csv')
+
     def __init__(self, name="", normalize_time=True, scope="all",
                  events=[], window=(0, None), abs_window=(0, None)):
         super(GenericFTrace, self).__init__(name)
@@ -127,7 +182,36 @@ subclassed by FTrace (for parsing FTrace coming from trace-cmd) and SysTrace."""
         del scope_classes[name]
 
     def _do_parse(self):
+        params = {'window': self.window, 'abs_window': self.abs_window}
+        if not self.__class__.disable_cache and self._check_trace_cache(params):
+            # Read the cached csv files into data frames
+            for trace_class in self.trace_classes:
+                try:
+                    csv_file = self._get_csv_path(trace_class)
+                    trace_class.read_csv(csv_file)
+                    trace_class.cached = True
+                except Exception:
+                    warnstr = "TRAPpy: Couldn't read {} from cache, reading it from trace".format(trace_class)
+                    warnings.warn(warnstr)
+
         self.__parse_trace_file(self.trace_path)
+
+        if not self.__class__.disable_cache:
+            try:
+                # Recreate the basic cache directories only if nothing was cached
+                if not all([c.cached for c in self.trace_classes]):
+                    self._create_trace_cache(params)
+
+                # Write out only events that weren't cached before
+                for trace_class in self.trace_classes:
+                    if trace_class.cached:
+                        continue
+                    csv_file = self._get_csv_path(trace_class)
+                    trace_class.write_csv(csv_file)
+            except OSError as err:
+                warnings.warn(
+                    "TRAPpy: Cache not created due to OS error: {0}".format(err))
+
         self.finalize_objects()
 
         if self.normalize_time:
@@ -165,31 +249,27 @@ subclassed by FTrace (for parsing FTrace coming from trace-cmd) and SysTrace."""
         trace_class = DynamicTypeFactory(event_name, (Base,), kwords)
         self.class_definitions[event_name] = trace_class
 
+    def __get_trace_class(self, line, cls_word):
+        trace_class = None
+        for unique_word, cls in cls_word.iteritems():
+            if unique_word in line:
+                trace_class = cls
+                if not cls.fallback:
+                    return trace_class
+        return trace_class
+
     def __populate_data(self, fin, cls_for_unique_word):
         """Append to trace data from a txt trace"""
-        def contains_unique_word(line, unique_words=cls_for_unique_word.keys()):
-            for unique_word in unique_words:
-                if unique_word in line:
-                    return True
-            return False
-
         actual_trace = itertools.dropwhile(self.trace_hasnt_started(), fin)
         actual_trace = itertools.takewhile(self.trace_hasnt_finished(),
                                            actual_trace)
 
         for line in actual_trace:
-            if not contains_unique_word(line):
+            trace_class = self.__get_trace_class(line, cls_for_unique_word)
+            if not trace_class:
                 self.lines += 1
                 continue
-            for unique_word, cls in cls_for_unique_word.iteritems():
-                if unique_word in line:
-                    trace_class = cls
-                    if not cls.fallback:
-                        break
-            else:
-                if not trace_class:
-                    raise FTraceParseError("No unique word in '{}'".format(line))
 
             line = line[:-1]
@@ -222,7 +302,8 @@ subclassed by FTrace (for parsing FTrace coming from trace-cmd) and SysTrace."""
             return
 
         # Remove empty arrays from the trace
-        data_str = re.sub(r"[A-Za-z0-9_]+=\{\} ", r"", data_str)
+        if "={}" in data_str:
+            data_str = re.sub(r"[A-Za-z0-9_]+=\{\} ", r"", data_str)
 
         trace_class.append_data(timestamp, comm, pid, cpu, self.lines, data_str)
         self.lines += 1
@@ -265,6 +346,8 @@ is part of the trace.
         cls_for_unique_word = {}
         for trace_name in self.class_definitions.iterkeys():
             trace_class = getattr(self, trace_name)
+            if trace_class.cached:
+                continue
             unique_word = trace_class.unique_word
             cls_for_unique_word[unique_word] = trace_class
@@ -315,6 +398,55 @@ is part of the trace.
 
         return ret
 
+    def apply_callbacks(self, fn_map):
+        """
+        Apply callback functions to trace events in chronological order.
+
+        This method iterates over a user-specified subset of the available
+        trace event dataframes, calling different user-specified functions
+        for each event type. These functions are passed a dictionary mapping
+        'Index' and the column names to their values for that row.
+
+        For example, to iterate over trace t, applying your functions
+        callback_fn1 and callback_fn2 to each sched_switch and sched_wakeup
+        event respectively:
+
+        t.apply_callbacks({
+            "sched_switch": callback_fn1,
+            "sched_wakeup": callback_fn2
+        })
+        """
+        dfs = {event: getattr(self, event).data_frame for event in fn_map.keys()}
+        events = [event for event in fn_map.keys() if not dfs[event].empty]
+        iters = {event: dfs[event].itertuples() for event in events}
+        next_rows = {event: iterator.next()
+                     for event, iterator in iters.iteritems()}
+
+        # Column names beginning with underscore will not be preserved in
+        # tuples due to constraints on namedtuple field names, so store
+        # mappings from column name to column number for each trace event.
+        col_idxs = {event: {
+            name: idx for idx, name in enumerate(
+                ['Index'] + dfs[event].columns.tolist()
+            )
+        } for event in events}
+
+        def getLine(event):
+            line_col_idx = col_idxs[event]['__line']
+            return next_rows[event][line_col_idx]
+
+        while events:
+            event_name = min(events, key=getLine)
+            event_tuple = next_rows[event_name]
+
+            event_dict = {
+                col: event_tuple[idx]
+                for col, idx in col_idxs[event_name].iteritems()
+            }
+            fn_map[event_name](event_dict)
+
+            event_row = next(iters[event_name], None)
+            if event_row:
+                next_rows[event_name] = event_row
+            else:
+                events.remove(event_name)
+
     def plot_freq_hists(self, map_label, ax):
         """Plot histograms for each actor input and output frequency
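For reference, a minimal usage sketch of the apply_callbacks() API added above, following its docstring (the trace file name and callback are illustrative):

    import trappy

    trace = trappy.FTrace('trace.txt')
    counts = {"sched_switch": 0}

    def on_switch(event):
        # event is a dict mapping 'Index' and the column names to row values
        counts["sched_switch"] += 1

    # Walk sched_switch events in chronological (__line) order
    trace.apply_callbacks({"sched_switch": on_switch})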