diff options
Diffstat (limited to 'trappy/utils.py')
-rw-r--r-- | trappy/utils.py | 59 |
1 files changed, 59 insertions, 0 deletions
"""Generic functions that can be used in multiple places in trappy
"""

# Reconstructed additions to trappy/utils.py: the imports below plus the
# apply_callback() and merge_dfs() helpers.

import numpy as np
import pandas as pd


def apply_callback(df, fn, *kwargs):
    """Iterate fast over all rows of a data frame and call ``fn`` on each.

    Every row is handed to ``fn`` as a dict mapping column name to value,
    with the row's index value stored under the extra key ``'Time'``.

    :param df: the data frame to iterate over
    :param fn: callback invoked once per row, in row order, as
        ``fn(event_dict)``, or as ``fn(event_dict, extra)`` when extra
        arguments were supplied
    :param kwargs: optional extra *positional* arguments (despite the
        name); they are forwarded to ``fn`` packed in a single tuple
    :return: None; an empty data frame results in no calls to ``fn``
    """
    # Column names beginning with underscore will not be preserved in tuples
    # due to constraints on namedtuple field names, so rows are addressed by
    # position instead: slot 0 is the index, followed by the columns.
    names = ['Time'] + df.columns.tolist()

    for event_tuple in df.itertuples():
        event_dict = dict(zip(names, event_tuple))

        if kwargs:
            fn(event_dict, kwargs)
        else:
            fn(event_dict)


def merge_dfs(pr_df, sec_df, pivot):
    """Fill the gaps of primary events with the latest secondary event.

    Both frames are concatenated and walked in ``'__line'`` order. Each
    secondary row is remembered per ``pivot`` value; each primary row has
    its NaN fields replaced by the values of the most recent secondary row
    sharing the same ``pivot`` value (if any was seen yet).

    :param pr_df: primary data frame; its rows make up the result
    :param sec_df: secondary data frame providing the fill-in values
    :param pivot: name of the column used to match primary and secondary
        rows
    :return: a new data frame, indexed by ``'Time'``, with one row per
        primary event; empty when there are no primary events
    :raises KeyError: if the concatenated frame has no ``'__line'`` column
    """
    # Keep track of the last secondary event for each pivot value
    pivot_map = {}

    # A list accumulating dicts with merged data
    merged_data = []

    def df_fn(data):
        # After the keyed concat below, 'Time' holds (label, timestamp)
        label = data['Time'][0]
        timestamp = data['Time'][1]

        # Store the latest secondary info
        if label == 'secondary':
            pivot_map[data[pivot]] = data
            # Get rid of primary/secondary labels
            data['Time'] = timestamp
            return

        # Propagate latest secondary info
        latest = pivot_map.get(data[pivot])
        if latest is not None:
            for key in list(data):
                if key == pivot:
                    continue
                value = data[key]
                # Fast check for NaN, the only value unequal to itself
                # (faster than np.isnan + try/except)
                if value != value:
                    data[key] = latest[key]

        # Get rid of primary/secondary labels
        data['Time'] = timestamp
        merged_data.append(data)

    df = pd.concat([pr_df, sec_df], keys=['primary', 'secondary'])
    df = df.sort_values(by='__line')
    apply_callback(df, df_fn)

    if not merged_data:
        # No primary events: a frame built from an empty list would have
        # no 'Time' column to promote to the index.
        return pd.DataFrame(columns=df.columns,
                            index=pd.Index([], name='Time'))

    merged_df = pd.DataFrame(merged_data)
    merged_df.set_index('Time', inplace=True)

    return merged_df