diff options
Diffstat (limited to 'catapult/tracing/tracing/trace_data/trace_data_unittest.py')
-rw-r--r-- | catapult/tracing/tracing/trace_data/trace_data_unittest.py | 177 |
1 files changed, 96 insertions, 81 deletions
diff --git a/catapult/tracing/tracing/trace_data/trace_data_unittest.py b/catapult/tracing/tracing/trace_data/trace_data_unittest.py index dd91596c..fdc48e2c 100644 --- a/catapult/tracing/tracing/trace_data/trace_data_unittest.py +++ b/catapult/tracing/tracing/trace_data/trace_data_unittest.py @@ -2,98 +2,113 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -import datetime -import exceptions +import base64 +import json import os -import shutil import tempfile import unittest +from py_utils import tempfile_ext from tracing.trace_data import trace_data -from tracing_build import html2trace class TraceDataTest(unittest.TestCase): - def testSerialize(self): - test_dir = tempfile.mkdtemp() - trace_path = os.path.join(test_dir, 'test_trace.json') - try: - ri = trace_data.CreateTraceDataFromRawData({'traceEvents': [1, 2, 3]}) - ri.Serialize(trace_path) - with open(trace_path) as f: - json_traces = html2trace.ReadTracesFromHTMLFile(f) - self.assertEqual(json_traces, [{'traceEvents': [1, 2, 3]}]) - finally: - shutil.rmtree(test_dir) - - def testEmptyArrayValue(self): - # We can import empty lists and empty string. - d = trace_data.CreateTraceDataFromRawData([]) - self.assertFalse(d.HasTracesFor(trace_data.CHROME_TRACE_PART)) - - def testInvalidTrace(self): - with self.assertRaises(AssertionError): - trace_data.CreateTraceDataFromRawData({'hello': 1}) - - def testListForm(self): - d = trace_data.CreateTraceDataFromRawData([{'ph': 'B'}]) + def testHasTracesForChrome(self): + d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}]) self.assertTrue(d.HasTracesFor(trace_data.CHROME_TRACE_PART)) - events = d.GetTracesFor(trace_data.CHROME_TRACE_PART)[0].get( - 'traceEvents', []) - self.assertEquals(1, len(events)) - def testStringForm(self): - d = trace_data.CreateTraceDataFromRawData('[{"ph": "B"}]') - self.assertTrue(d.HasTracesFor(trace_data.CHROME_TRACE_PART)) - events = d.GetTracesFor(trace_data.CHROME_TRACE_PART)[0].get( - 'traceEvents', []) - self.assertEquals(1, len(events)) - - -class TraceDataBuilderTest(unittest.TestCase): - def testBasicChrome(self): - builder = trace_data.TraceDataBuilder() - builder.AddTraceFor(trace_data.CHROME_TRACE_PART, - {'traceEvents': [1, 2, 3]}) - - d = builder.AsData() - self.assertTrue(d.HasTracesFor(trace_data.CHROME_TRACE_PART)) - - self.assertRaises(Exception, builder.AsData) + def testHasNotTracesForCpu(self): + d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}]) + self.assertFalse(d.HasTracesFor(trace_data.CPU_TRACE_DATA)) - def testSetTraceFor(self): - telemetry_trace = { - 'traceEvents': [1, 2, 3], - 'metadata': { - 'field1': 'value1' - } - } + def testGetTracesForChrome(self): + d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}]) + ts = d.GetTracesFor(trace_data.CHROME_TRACE_PART) + self.assertEqual(len(ts), 1) + self.assertEqual(ts[0], {'traceEvents': [{'ph': 'B'}]}) - builder = trace_data.TraceDataBuilder() - builder.AddTraceFor(trace_data.TELEMETRY_PART, telemetry_trace) - d = builder.AsData() + def testGetNoTracesForCpu(self): + d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}]) + ts = d.GetTracesFor(trace_data.CPU_TRACE_DATA) + self.assertEqual(ts, []) - self.assertEqual(d.GetTracesFor(trace_data.TELEMETRY_PART), - [telemetry_trace]) - def testSetTraceForRaisesWithInvalidPart(self): - builder = trace_data.TraceDataBuilder() - - self.assertRaises(exceptions.AssertionError, - lambda: builder.AddTraceFor('not_a_trace_part', {})) - - def testSetTraceForRaisesWithInvalidTrace(self): - builder = trace_data.TraceDataBuilder() - - self.assertRaises( - exceptions.AssertionError, - lambda: builder.AddTraceFor(trace_data.TELEMETRY_PART, - datetime.time.min)) - - def testSetTraceForRaisesAfterAsData(self): - builder = trace_data.TraceDataBuilder() - builder.AsData() - - self.assertRaises( - exceptions.Exception, - lambda: builder.AddTraceFor(trace_data.TELEMETRY_PART, {})) +class TraceDataBuilderTest(unittest.TestCase): + def testAddTraceDataAndSerialize(self): + with tempfile_ext.TemporaryFileName() as trace_path: + with trace_data.TraceDataBuilder() as builder: + builder.AddTraceFor(trace_data.CHROME_TRACE_PART, + {'traceEvents': [1, 2, 3]}) + builder.Serialize(trace_path) + self.assertTrue(os.path.exists(trace_path)) + self.assertGreater(os.stat(trace_path).st_size, 0) # File not empty. + + def testAddTraceForRaisesWithInvalidPart(self): + with trace_data.TraceDataBuilder() as builder: + with self.assertRaises(TypeError): + builder.AddTraceFor('not_a_trace_part', {}) + + def testAddTraceWithUnstructuredData(self): + with trace_data.TraceDataBuilder() as builder: + builder.AddTraceFor(trace_data.TELEMETRY_PART, 'unstructured trace', + allow_unstructured=True) + + def testAddTraceRaisesWithImplicitUnstructuredData(self): + with trace_data.TraceDataBuilder() as builder: + with self.assertRaises(ValueError): + builder.AddTraceFor(trace_data.TELEMETRY_PART, 'unstructured trace') + + def testAddTraceFileFor(self): + original_data = {'msg': 'The answer is 42'} + with tempfile.NamedTemporaryFile(delete=False) as source: + json.dump(original_data, source) + with trace_data.TraceDataBuilder() as builder: + builder.AddTraceFileFor(trace_data.CHROME_TRACE_PART, source.name) + self.assertFalse(os.path.exists(source.name)) + out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART) + + self.assertEqual(original_data, out_data) + + def testOpenTraceHandleFor(self): + original_data = {'msg': 'The answer is 42'} + with trace_data.TraceDataBuilder() as builder: + with builder.OpenTraceHandleFor(trace_data.CHROME_TRACE_PART) as handle: + handle.write(json.dumps(original_data)) + out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART) + + # Trace handle should be cleaned up. + self.assertFalse(os.path.exists(handle.name)) + self.assertEqual(original_data, out_data) + + def testOpenTraceHandleForCompressedData(self): + original_data = {'msg': 'The answer is 42'} + # gzip.compress() does not work in python 2, so hardcode the encoded data. + compressed_data = base64.b64decode( + 'H4sIAIDMblwAA6tWyi1OV7JSUArJSFVIzCsuTy1SyCxWMDFSquUCAA4QMtscAAAA') + with trace_data.TraceDataBuilder() as builder: + with builder.OpenTraceHandleFor( + trace_data.CHROME_TRACE_PART, compressed=True) as handle: + handle.write(compressed_data) + out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART) + + # Trace handle should be cleaned up. + self.assertFalse(os.path.exists(handle.name)) + self.assertEqual(original_data, out_data) + + def testCantWriteAfterCleanup(self): + with trace_data.TraceDataBuilder() as builder: + builder.AddTraceFor(trace_data.CHROME_TRACE_PART, + {'traceEvents': [1, 2, 3]}) + builder.CleanUpTraceData() + with self.assertRaises(RuntimeError): + builder.AddTraceFor(trace_data.CHROME_TRACE_PART, + {'traceEvents': [1, 2, 3]}) + + def testCantWriteAfterFreeze(self): + with trace_data.TraceDataBuilder() as builder: + builder.AddTraceFor(trace_data.CHROME_TRACE_PART, + {'traceEvents': [1, 2, 3]}) + builder.Freeze() + with self.assertRaises(RuntimeError): + builder.AddTraceFor(trace_data.CHROME_TRACE_PART, + {'traceEvents': [1, 2, 3]}) |