diff options
-rw-r--r-- | tests/test_base.py | 8 | ||||
-rw-r--r-- | tests/test_common_clk.py | 6 | ||||
-rw-r--r-- | tests/test_filesystem.py | 8 | ||||
-rw-r--r-- | trappy/base.py | 12 | ||||
-rw-r--r-- | trappy/ftrace.py | 17 | ||||
-rw-r--r-- | trappy/utils.py | 59 |
6 files changed, 89 insertions, 21 deletions
diff --git a/tests/test_base.py b/tests/test_base.py index a0a4920..8bebfba 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -85,7 +85,7 @@ class TestBase(utils_tests.SetupDirectory): in_data = """ kworker/4:1-397 [004] 720.741315: thermal_power_cpu_get: cpus=000000f0 freq=1900000 raw_cpu_power=1259 load={} power=61 kworker/4:1-397 [004] 720.741349: thermal_power_cpu_get: cpus=0000000f freq=1400000 raw_cpu_power=189 load={} power=14""" - expected_columns = set(["__comm", "__pid", "__cpu", "__line", "cpus", "freq", + expected_columns = set(["__comm", "__pid", "__tgid", "__cpu", "__line", "cpus", "freq", "raw_cpu_power", "power"]) with open("trace.txt", "w") as fout: @@ -131,7 +131,7 @@ class TestBase(utils_tests.SetupDirectory): timestamp ) - expected_columns = set(["__comm", "__pid", "__cpu", "__line", "tag"]) + expected_columns = set(["__comm", "__pid", "__tgid", "__cpu", "__line", "tag"]) with open("trace.txt", "w") as fout: fout.write(in_data) @@ -157,7 +157,7 @@ class TestBase(utils_tests.SetupDirectory): in_data = """ rcu_preempt-7 [000] 73.604532: my_sched_stat_runtime: comm=Space separated taskname pid=7 runtime=262875 [ns] vruntime=17096359856 [ns]""" - expected_columns = set(["__comm", "__pid", "__cpu", "__line", "comm", "pid", "runtime", "vruntime"]) + expected_columns = set(["__comm", "__pid", "__tgid", "__cpu", "__line", "comm", "pid", "runtime", "vruntime"]) with open("trace.txt", "w") as fout: fout.write(in_data) @@ -234,7 +234,7 @@ class TestBase(utils_tests.SetupDirectory): df = trace.equals_event.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__pid", "__cpu", "__line", "my_field"])) + set(["__comm", "__pid", "__tgid", "__cpu", "__line", "my_field"])) self.assertListEqual(df["my_field"].tolist(), ["foo", "foo=bar", "foo=bar=baz", 1, "1=2", "1=foo", "1foo=2"]) diff --git a/tests/test_common_clk.py b/tests/test_common_clk.py index afc270e..3d4cfca 100644 --- a/tests/test_common_clk.py +++ b/tests/test_common_clk.py @@ -33,18 +33,18 @@ class TestCommonClk(utils_tests.SetupDirectory): trace = trappy.FTrace("trace_common_clk.txt", events=['clock_set_rate']) df = trace.clock_set_rate.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "cpu_id", "clk_name", "rate"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "cpu_id", "clk_name", "rate"])) def test_common_clk_enable_can_be_parsed(self): """TestCommonClk: test that clock_enable events can be parsed""" trace = trappy.FTrace("trace_common_clk.txt", events=['clock_enable']) df = trace.clock_enable.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "cpu_id", "clk_name", "state"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "cpu_id", "clk_name", "state"])) def test_common_clk_disable_can_be_parsed(self): """TestCommonClk: test that clock_disable events can be parsed""" trace = trappy.FTrace("trace_common_clk.txt", events=['clock_disable']) df = trace.clock_disable.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "cpu_id", "clk_name", "state"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "cpu_id", "clk_name", "state"])) diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index a2921f0..212b2f5 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -33,25 +33,25 @@ class TestFilesystem(utils_tests.SetupDirectory): trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_da_write_begin']) df = trace.ext4_da_write_begin.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "pos", "len", "flags"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "pos", "len", "flags"])) def test_filesystem_ext_da_write_end_can_be_parsed(self): """TestFilesystem: test that ext4_da_write_end events can be parsed""" trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_da_write_end']) df = trace.ext4_da_write_end.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "pos", "len", "copied"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "pos", "len", "copied"])) def test_filesystem_ext_sync_file_enter_can_be_parsed(self): """TestFilesystem: test that ext4_sync_file_enter events can be parsed""" trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_sync_file_enter']) df = trace.ext4_sync_file_enter.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "parent", "datasync"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "parent", "datasync"])) def test_filesystem_ext_sync_file_exit_can_be_parsed(self): """TestFilesystem: test that ext4_sync_file_exit events can be parsed""" trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_sync_file_exit']) df = trace.ext4_sync_file_exit.data_frame self.assertSetEqual(set(df.columns), - set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "ret"])) + set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "ret"])) diff --git a/trappy/base.py b/trappy/base.py index 06857b5..8a7fb38 100644 --- a/trappy/base.py +++ b/trappy/base.py @@ -111,6 +111,7 @@ class Base(object): self.time_array = [] self.comm_array = [] self.pid_array = [] + self.tgid_array = [] self.cpu_array = [] self.parse_raw = parse_raw self.cached = False @@ -152,7 +153,7 @@ class Base(object): return ret - def append_data(self, time, comm, pid, cpu, line, data): + def append_data(self, time, comm, pid, tgid, cpu, line, data): """Append data parsed from a line to the corresponding arrays The :mod:`DataFrame` will be created from this when the whole trace @@ -176,6 +177,7 @@ class Base(object): self.time_array.append(time) self.comm_array.append(comm) self.pid_array.append(pid) + self.tgid_array.append(tgid) self.cpu_array.append(cpu) self.line_array.append(line) self.data_array.append(data) @@ -226,10 +228,10 @@ class Base(object): check_memory_usage = True check_memory_count = 1 - for (comm, pid, cpu, line, data_str) in zip(self.comm_array, self.pid_array, - self.cpu_array, self.line_array, - self.data_array): - data_dict = {"__comm": comm, "__pid": pid, "__cpu": cpu, "__line": line} + for (comm, pid, tgid, cpu, line, data_str) in zip(self.comm_array, self.pid_array, + self.tgid_array, self.cpu_array, + self.line_array, self.data_array): + data_dict = {"__comm": comm, "__pid": pid, "__tgid": tgid, "__cpu": cpu, "__line": line} data_dict.update(self.generate_data_dict(data_str)) # When running out of memory, Pandas has been observed to segfault diff --git a/trappy/ftrace.py b/trappy/ftrace.py index 7d23432..ec7b002 100644 --- a/trappy/ftrace.py +++ b/trappy/ftrace.py @@ -51,8 +51,8 @@ def _plot_freq_hists(allfreqs, what, axis, title): "Frequency", xlim, "default") SPECIAL_FIELDS_RE = re.compile( - r"^\s*(?P<comm>.*)-(?P<pid>\d+)(?:\s+\(.*\))"\ - r"?\s+\[(?P<cpu>\d+)\](?:\s+....)?\s+"\ + r"^\s*(?P<comm>.*)-(?P<pid>\d+)\s+\(?(?P<tgid>.*?)?\)"\ + r"?\s*\[(?P<cpu>\d+)\](?:\s+....)?\s+"\ r"(?P<timestamp>[0-9]+(?P<us>\.[0-9]+)?): (\w+:\s+)+(?P<data>.+)" ) @@ -294,6 +294,8 @@ subclassed by FTrace (for parsing FTrace coming from trace-cmd) and SysTrace.""" comm = fields_match.group('comm') pid = int(fields_match.group('pid')) cpu = int(fields_match.group('cpu')) + tgid = fields_match.group('tgid') + tgid = -1 if (not tgid or '-' in tgid) else int(tgid) # The timestamp, depending on the trace_clock configuration, can be # reported either in [s].[us] or [ns] format. Let's ensure that we @@ -320,7 +322,7 @@ subclassed by FTrace (for parsing FTrace coming from trace-cmd) and SysTrace.""" if "={}" in data_str: data_str = re.sub(r"[A-Za-z0-9_]+=\{\} ", r"", data_str) - trace_class.append_data(timestamp, comm, pid, cpu, self.lines, data_str) + trace_class.append_data(timestamp, comm, pid, tgid, cpu, self.lines, data_str) self.lines += 1 def trace_hasnt_started(self): @@ -413,7 +415,7 @@ is part of the trace. return ret - def apply_callbacks(self, fn_map): + def apply_callbacks(self, fn_map, *kwarg): """ Apply callback functions to trace events in chronological order. @@ -455,7 +457,12 @@ is part of the trace. event_dict = { col: event_tuple[idx] for col, idx in col_idxs[event_name].iteritems() } - fn_map[event_name](event_dict) + + if kwarg: + fn_map[event_name](event_dict, kwarg) + else: + fn_map[event_name](event_dict) + event_row = next(iters[event_name], None) if event_row: next_rows[event_name] = event_row diff --git a/trappy/utils.py b/trappy/utils.py index eb73752..a06ff1d 100644 --- a/trappy/utils.py +++ b/trappy/utils.py @@ -13,6 +13,9 @@ # limitations under the License. # +import pandas as pd +import numpy as np + """Generic functions that can be used in multiple places in trappy """ @@ -102,3 +105,59 @@ def handle_duplicate_index(data, dup_index_left += 1 return data.reindex(new_index) + +# Iterate fast over all rows in a data frame and apply fn +def apply_callback(df, fn, *kwargs): + iters = df.itertuples() + event_tuple = iters.next() + + # Column names beginning with underscore will not be preserved in tuples + # due to constraints on namedtuple field names, so store mappings from + # column name to column number for each trace event. + col_idxs = { name: idx for idx, name in enumerate(['Time'] + df.columns.tolist()) } + + while True: + if not event_tuple: + break + event_dict = { col: event_tuple[idx] for col, idx in col_idxs.iteritems() } + + if kwargs: + fn(event_dict, kwargs) + else: + fn(event_dict) + + event_tuple = next(iters, None) + + +def merge_dfs(pr_df, sec_df, pivot): + # Keep track of last secondary event + pivot_map = {} + + # An array accumating dicts with merged data + merged_data = [] + def df_fn(data): + # Store the latest secondary info + if data['Time'][0] == 'secondary': + pivot_map[data[pivot]] = data + # Get rid of primary/secondary labels + data['Time'] = data['Time'][1] + return + + # Propogate latest secondary info + for key, value in data.iteritems(): + if key == pivot: + continue + # Fast check for if value is nan (faster than np.isnan + try/except) + if value != value and pivot_map.has_key(data[pivot]): + data[key] = pivot_map[data[pivot]][key] + + # Get rid of primary/secondary labels + data['Time'] = data['Time'][1] + merged_data.append(data) + + df = pd.concat([pr_df, sec_df], keys=['primary', 'secondary']).sort_values(by='__line') + apply_callback(df, df_fn) + merged_df = pd.DataFrame.from_dict(merged_data) + merged_df.set_index('Time', inplace=True) + + return merged_df |