#    Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
from trappy.stats.Topology import Topology
from trappy.stats.Trigger import Trigger
from trappy.stats.Aggregator import MultiTriggerAggregator
import collections
import trappy
from trappy.base import Base
import pandas as pd

# pandas.util.testing was deprecated in favour of pandas.testing and later
# removed; prefer the supported location and fall back for old pandas.
try:
    from pandas.testing import assert_series_equal
except ImportError:
    from pandas.util.testing import assert_series_equal


class TestTopology(unittest.TestCase):
    """Tests for level handling in trappy.stats.Topology."""

    def test_add_to_level(self):
        """Test level creation"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)
        check_groups = topology.get_level(level)

        self.assertTrue(topology.has_level(level))
        self.assertEqual(level_groups, check_groups)

    def test_flatten(self):
        """Test Topology: flatten"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)
        flattened = [0, 1, 2, 3, 4, 5]

        self.assertEqual(flattened, topology.flatten())

    def test_cpu_topology_construction(self):
        """Test CPU Topology Construction"""

        cluster_0 = [0, 3, 4, 5]
        cluster_1 = [1, 2]
        clusters = [cluster_0, cluster_1]
        topology = Topology(clusters=clusters)

        # Check cluster level creation
        cluster_groups = [[0, 3, 4, 5], [1, 2]]
        self.assertTrue(topology.has_level("cluster"))
        self.assertEqual(cluster_groups, topology.get_level("cluster"))

        # Check cpu level creation
        cpu_groups = [[0], [1], [2], [3], [4], [5]]
        self.assertTrue(topology.has_level("cpu"))
        self.assertEqual(cpu_groups, topology.get_level("cpu"))

        # Check "all" level
        all_groups = [[0, 1, 2, 3, 4, 5]]
        self.assertEqual(all_groups, topology.get_level("all"))

    def test_level_span(self):
        """TestTopology: level_span"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)

        # Two groups were added to the level
        self.assertEqual(topology.level_span(level), 2)

    def test_group_index(self):
        """TestTopology: get_index"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)

        # Indices follow the order in which the groups were supplied
        self.assertEqual(topology.get_index(level, [1, 2]), 0)
        self.assertEqual(topology.get_index(level, [0, 3, 4, 5]), 1)


class BaseTestStats(unittest.TestCase):
    """Shared fixture for the Trigger and Aggregator tests.

    Provides ``self._trace``, a BareTrace holding one "aim_and_fire"
    event, and ``self.topology``, a two-cluster topology ([[0], [1]]).
    """

    def setUp(self):
        trace = trappy.BareTrace()

        # Identifier 0 owns timestamps 0.1-0.3 and identifier 1 owns
        # 0.4-0.6; "result" alternates fire/blank across all six rows.
        data = {
            "identifier": [0, 0, 0, 1, 1, 1],
            "result": ["fire", "blank", "fire", "blank", "fire", "blank"],
        }

        index = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], name="Time")
        data_frame = pd.DataFrame(data, index=index)
        trace.add_parsed_event("aim_and_fire", data_frame)

        self._trace = trace
        self.topology = Topology(clusters=[[0], [1]])


class TestTrigger(BaseTestStats):
    """Tests for trappy.stats.Trigger using the BaseTestStats fixture."""

    def test_trigger_generation(self):
        """TestTrigger: generate"""

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        # Identifier 0 has "fire" at 0.1 and 0.3
        expected = pd.Series([1, 1], index=pd.Index([0.1, 0.3], name="Time"))
        assert_series_equal(expected, trigger.generate(0))

        # Identifier 1 has "fire" only at 0.5
        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

    def test_trigger_with_func(self):
        """Trigger works with a function or lambda as filter"""

        def my_filter(val):
            return val.startswith("fi")

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"result": my_filter}, value=1,
                          pivot="identifier")

        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

        my_filters = {"result": lambda x: x.startswith("bl")}
        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters=my_filters, value=1, pivot="identifier")

        # Identifier 1 has "blank" at 0.4 and 0.6
        expected = pd.Series([1, 1], index=pd.Index([0.4, 0.6], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

    def test_trigger_with_callable_class(self):
        """Trigger works with a callable class as filter"""

        class my_filter(object):
            """Stateful filter: true when the previously seen value
            equals val_out."""

            def __init__(self, val_out):
                self.prev_val = 0
                self.val_out = val_out

            def __call__(self, val):
                ret = self.prev_val == self.val_out
                self.prev_val = val

                return ret

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"identifier": my_filter(1)}, value=1,
                          pivot="result")

        expected = pd.Series([1], index=pd.Index([0.6], name="Time"))
        assert_series_equal(expected, trigger.generate("blank"))

    def test_filter_prev_values(self):
        """Trigger works with a filter that depends on previous values of the same pivot"""

        # We generate an example in which we want a trigger whenever the
        # identifier is no longer 1 for blank

        class my_filter(object):
            """Stateful filter: true when the previously seen value
            equals val_out."""

            def __init__(self, val_out):
                self.prev_val = 0
                self.val_out = val_out

            def __call__(self, val):
                ret = self.prev_val == self.val_out
                self.prev_val = val

                return ret

        trace = trappy.BareTrace()
        data = collections.OrderedDict([
            (0.1, ["blank", 1]),
            (0.2, ["fire", 1]),
            (0.3, ["blank", 0]),  # value is no longer 1, trigger
            (0.4, ["blank", 1]),
            (0.5, ["fire", 0]),  # This should NOT trigger
            (0.6, ["blank", 0]),  # value is no longer 1 for blank, trigger
        ])
        data_frame = pd.DataFrame.from_dict(data, orient="index", )
        data_frame.columns = ["result", "identifier"]
        trace.add_parsed_event("aim_and_fire", data_frame)

        trigger = Trigger(trace, trace.aim_and_fire,
                          filters={"identifier": my_filter(1)}, value=-1,
                          pivot="result")

        expected = pd.Series([-1, -1], index=[0.3, 0.6])
        assert_series_equal(expected, trigger.generate("blank"))


class TestAggregator(BaseTestStats):
    """Tests for MultiTriggerAggregator using the BaseTestStats fixture."""

    def test_scalar_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger scalar aggfunc"""

        def aggfunc(series):
            return series.sum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        aggregator = MultiTriggerAggregator([trigger],
                                            self.topology,
                                            aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        self.assertEqual(result, [3.0])

        # There are two "fire" on the first node group and
        # a single "fire" on the second node group at the cluster
        # level which looks like
        # [[0], [1]]
        result = aggregator.aggregate(level="cluster")
        self.assertEqual(result, [2.0, 1.0])

    def test_vector_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace, event_class, filters, value, pivot)

        aggregator = MultiTriggerAggregator([trigger],
                                            self.topology,
                                            aggfunc=aggfunc)

        # "fire" occurs at 0.1, 0.3 and 0.5, so the running total over
        # the all level ([[0, 1]]) steps up at those timestamps
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series([1.0, 1.0, 2.0, 2.0, 3.0, 3.0],
                                    index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
                                    )
        assert_series_equal(result[0], expected_result)

    def test_vector_aggfunc_multiple_trigger(self):
        """TestAggregator: multi trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger_fire = Trigger(self._trace,
                               event_class,
                               filters,
                               value,
                               pivot)

        filters = {
            "result": "blank"
        }
        value = -1
        trigger_blank = Trigger(self._trace, event_class, filters, value,
                                pivot)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology,
                                            aggfunc=aggfunc)

        # "fire" contributes +1 and "blank" contributes -1; they alternate
        # in the fixture, so the running total over the all level
        # ([[0, 1]]) oscillates between 1 and 0
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series([1.0, 0.0, 1.0, 0.0, 1.0, 0.0],
                                    index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
                                    )
        assert_series_equal(result[0], expected_result)

    def test_default_aggfunc_multiple_trigger(self):
        """MultiTriggerAggregator with the default aggfunc"""

        trigger_fire = Trigger(self._trace, self._trace.aim_and_fire,
                               filters={"result": "fire"},
                               pivot="identifier", value=1)

        trigger_blank = Trigger(self._trace, self._trace.aim_and_fire,
                                filters={"result": "blank"},
                                pivot="identifier", value=2)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology)

        results = aggregator.aggregate(level="cpu")

        # One series per cpu: "fire" shows up as 1 and "blank" as 2 at
        # the timestamps owned by that cpu, 0 everywhere else
        expected_results = [
            pd.Series([1., 2., 1., 0., 0., 0.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
            pd.Series([0., 0., 0., 2., 1., 2.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
        ]

        # assertEqual, not the deprecated assertEquals alias
        self.assertEqual(len(results), len(expected_results))
        for result, expected in zip(results, expected_results):
            assert_series_equal(result, expected)