diff options
author | Wyatt Hepler <hepler@google.com> | 2022-02-16 16:13:38 -0800 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-04-05 00:11:59 +0000 |
commit | 03d14dcf193da5d5634322cf4070f7bc4df15e40 (patch) | |
tree | 9c4ac0b202d9f4ed7130533797a66cfd391f5cdf | |
parent | ff9933f2cf97eddb625d547ef9a86e9af95326ac (diff) | |
download | pigweed-03d14dcf193da5d5634322cf4070f7bc4df15e40.tar.gz |
pw_tokenizer: Message parser for undecoded messages
pw_tokenizer.parse_message decodes Base64 format tokenized messages as
much as possible. It attempts to decode the arguments by trying
different combinations of format specifiers.
Change-Id: I5c876d0b253e4a46772b00ae2686806aaa15f7e4
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/44260
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
-rw-r--r-- | pw_tokenizer/docs.rst | 38 | ||||
-rw-r--r-- | pw_tokenizer/py/BUILD.gn | 1 | ||||
-rwxr-xr-x | pw_tokenizer/py/decode_test.py | 56 | ||||
-rw-r--r-- | pw_tokenizer/py/pw_tokenizer/decode.py | 30 | ||||
-rwxr-xr-x | pw_tokenizer/py/pw_tokenizer/detokenize.py | 21 | ||||
-rw-r--r-- | pw_tokenizer/py/pw_tokenizer/parse_message.py | 182 |
6 files changed, 299 insertions, 29 deletions
diff --git a/pw_tokenizer/docs.rst b/pw_tokenizer/docs.rst index e124e125e..dc908ea4b 100644 --- a/pw_tokenizer/docs.rst +++ b/pw_tokenizer/docs.rst @@ -954,6 +954,44 @@ functions. TransmitLogMessage(base64_buffer, base64_size); } +Investigating undecoded messages +-------------------------------- +Tokenized messages cannot be decoded if the token is not recognized. The Python +package includes the ``parse_message`` tool, which parses tokenized Base64 +messages without looking up the token in a database. This tool attempts to guess +the types of the arguments and displays potential ways to decode them. + +This tool can be used to extract argument information from an otherwise unusable +message. It could help identify which statement in the code produced the +message. This tool is not particularly helpful for tokenized messages without +arguments, since all it can do is show the value of the unknown token. + +The tool is executed by passing Base64 tokenized messages, with or without the +``$`` prefix, to ``pw_tokenizer.parse_message``. Pass ``-h`` or ``--help`` to +see full usage information. + +Example +^^^^^^^ +.. 
code-block:: + + $ python -m pw_tokenizer.parse_message '$329JMwA=' koSl524TRkFJTEVEX1BSRUNPTkRJVElPTgJPSw== --specs %s %d + + INF Decoding arguments for '$329JMwA=' + INF Binary: b'\xdfoI3\x00' [df 6f 49 33 00] (5 bytes) + INF Token: 0x33496fdf + INF Args: b'\x00' [00] (1 bytes) + INF Decoding with up to 8 %s or %d arguments + INF Attempt 1: [%s] + INF Attempt 2: [%d] 0 + + INF Decoding arguments for '$koSl524TRkFJTEVEX1BSRUNPTkRJVElPTgJPSw==' + INF Binary: b'\x92\x84\xa5\xe7n\x13FAILED_PRECONDITION\x02OK' [92 84 a5 e7 6e 13 46 41 49 4c 45 44 5f 50 52 45 43 4f 4e 44 49 54 49 4f 4e 02 4f 4b] (28 bytes) + INF Token: 0xe7a58492 + INF Args: b'n\x13FAILED_PRECONDITION\x02OK' [6e 13 46 41 49 4c 45 44 5f 50 52 45 43 4f 4e 44 49 54 49 4f 4e 02 4f 4b] (24 bytes) + INF Decoding with up to 8 %s or %d arguments + INF Attempt 1: [%d %s %d %d %d] 55 FAILED_PRECONDITION 1 -40 -38 + INF Attempt 2: [%d %s %s] 55 FAILED_PRECONDITION OK + Command line utilities ^^^^^^^^^^^^^^^^^^^^^^ ``pw_tokenizer`` provides two standalone command line utilities for detokenizing diff --git a/pw_tokenizer/py/BUILD.gn b/pw_tokenizer/py/BUILD.gn index b0006ed10..2ac30448c 100644 --- a/pw_tokenizer/py/BUILD.gn +++ b/pw_tokenizer/py/BUILD.gn @@ -40,6 +40,7 @@ pw_python_package("py") { "pw_tokenizer/detokenize.py", "pw_tokenizer/elf_reader.py", "pw_tokenizer/encode.py", + "pw_tokenizer/parse_message.py", "pw_tokenizer/proto/__init__.py", "pw_tokenizer/serial_detokenizer.py", "pw_tokenizer/tokens.py", diff --git a/pw_tokenizer/py/decode_test.py b/pw_tokenizer/py/decode_test.py index c0c436616..be08eb824 100755 --- a/pw_tokenizer/py/decode_test.py +++ b/pw_tokenizer/py/decode_test.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2020 The Pigweed Authors +# Copyright 2022 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of @@ -14,6 +14,7 @@ # the License. 
"""Tests the tokenized string decode module.""" +from datetime import datetime import unittest import tokenized_string_decoding_test_data as tokenized_string @@ -21,7 +22,7 @@ import varint_test_data from pw_tokenizer import decode -def error(msg, value=None): +def error(msg, value=None) -> str: """Formats msg as the message for an argument that failed to parse.""" if value is None: return '<[{}]>'.format(msg) @@ -30,13 +31,13 @@ def error(msg, value=None): class TestDecodeTokenized(unittest.TestCase): """Tests decoding tokenized strings with various arguments.""" - def test_decode_generated_data(self): + def test_decode_generated_data(self) -> None: self.assertGreater(len(tokenized_string.TEST_DATA), 100) for fmt, decoded, encoded in tokenized_string.TEST_DATA: self.assertEqual(decode.decode(fmt, encoded, True), decoded) - def test_unicode_decode_errors(self): + def test_unicode_decode_errors(self) -> None: """Tests unicode errors, which do not occur in the C++ decoding code.""" self.assertEqual(decode.decode('Why, %c', b'\x01', True), 'Why, ' + error('%c ERROR', -1)) @@ -55,12 +56,12 @@ class TestDecodeTokenized(unittest.TestCase): self.assertEqual(decode.decode('%c', b'\xff\xff\xff\xff\x0f', True), error('%c ERROR', -2147483648)) - def test_ignore_errors(self): + def test_ignore_errors(self) -> None: self.assertEqual(decode.decode('Why, %c', b'\x01'), 'Why, %c') self.assertEqual(decode.decode('%s %d', b'\x01!'), '! 
%d') - def test_pointer(self): + def test_pointer(self) -> None: """Tests pointer args, which are not natively supported in Python.""" self.assertEqual(decode.decode('Hello: %p', b'\x00', True), 'Hello: 0x00000000') @@ -69,8 +70,8 @@ class TestDecodeTokenized(unittest.TestCase): class TestIntegerDecoding(unittest.TestCase): - """Test decoding variable-length integers.""" - def test_decode_generated_data(self): + """Tests decoding variable-length integers.""" + def test_decode_generated_data(self) -> None: test_data = varint_test_data.TEST_DATA self.assertGreater(len(test_data), 100) @@ -86,5 +87,44 @@ class TestIntegerDecoding(unittest.TestCase): bytearray(encoded)).value) +class TestFormattedString(unittest.TestCase): + """Tests scoring how successfully a formatted string decoded.""" + def test_no_args(self) -> None: + result = decode.FormatString('string').format(b'') + + self.assertTrue(result.ok()) + self.assertEqual(result.score(), (True, True, 0, 0, datetime.max)) + + def test_one_arg(self) -> None: + result = decode.FormatString('%d').format(b'\0') + + self.assertTrue(result.ok()) + self.assertEqual(result.score(), (True, True, 0, 1, datetime.max)) + + def test_missing_args(self) -> None: + result = decode.FormatString('%p%d%d').format(b'\x02\x80') + + self.assertFalse(result.ok()) + self.assertEqual(result.score(), (False, True, -2, 3, datetime.max)) + self.assertGreater(result.score(), result.score(datetime.now())) + self.assertGreater(result.score(datetime.now()), + result.score(datetime.min)) + + def test_compare_score(self) -> None: + all_args_ok = decode.FormatString('%d%d%d').format(b'\0\0\0') + missing_one_arg = decode.FormatString('%d%d%d').format(b'\0\0') + missing_two_args = decode.FormatString('%d%d%d').format(b'\0') + all_args_extra_data = decode.FormatString('%d%d%d').format(b'\0\0\0\1') + missing_one_arg_extra_data = decode.FormatString('%d%d%d').format( + b'\0' + b'\x80' * 100) + + self.assertGreater(all_args_ok.score(), 
missing_one_arg.score()) + self.assertGreater(missing_one_arg.score(), missing_two_args.score()) + self.assertGreater(missing_two_args.score(), + all_args_extra_data.score()) + self.assertGreater(all_args_extra_data.score(), + missing_one_arg_extra_data.score()) + + if __name__ == '__main__': unittest.main() diff --git a/pw_tokenizer/py/pw_tokenizer/decode.py b/pw_tokenizer/py/pw_tokenizer/decode.py index 30d87711c..f6ca50364 100644 --- a/pw_tokenizer/py/pw_tokenizer/decode.py +++ b/pw_tokenizer/py/pw_tokenizer/decode.py @@ -20,6 +20,7 @@ Missing, truncated, or otherwise corrupted arguments are handled and displayed in the resulting string with an error message. """ +from datetime import datetime import re import struct from typing import Iterable, List, NamedTuple, Match, Sequence, Tuple @@ -275,7 +276,7 @@ class DecodedArg: return self.format() def __repr__(self) -> str: - return 'DecodedArg({!r})'.format(self) + return f'DecodedArg({self})' def parse_format_specifiers(format_string: str) -> Iterable[FormatSpec]: @@ -288,6 +289,33 @@ class FormattedString(NamedTuple): args: Sequence[DecodedArg] remaining: bytes + def ok(self) -> bool: + """Arg data decoded successfully and all expected args were found.""" + return all(arg.ok() for arg in self.args) and not self.remaining + + def score(self, date_removed: datetime = None) -> tuple: + """Returns a key for sorting by how successful a decode was. + + Decoded strings are sorted by whether they + + 1. decoded all bytes for all arguments without errors, + 2. decoded all data, + 3. have the fewest decoding errors, + 4. decoded the most arguments successfully, or + 5. have the most recent removal date, if they were removed. + + This must match the collision resolution logic in detokenize.cc. + + To format a list of FormattedStrings from most to least successful, + use sort(key=FormattedString.score, reverse=True). 
+ """ + return ( + self.ok(), # decocoded all data and all expected args were found + not self.remaining, # decoded all data + -sum(not arg.ok() for arg in self.args), # fewest errors + len(self.args), # decoded the most arguments + date_removed or datetime.max) # most recently present + class FormatString: """Represents a printf-style format string.""" diff --git a/pw_tokenizer/py/pw_tokenizer/detokenize.py b/pw_tokenizer/py/pw_tokenizer/detokenize.py index f4c59f040..8f94fa04e 100755 --- a/pw_tokenizer/py/pw_tokenizer/detokenize.py +++ b/pw_tokenizer/py/pw_tokenizer/detokenize.py @@ -34,7 +34,6 @@ messages from a file or stdin. import argparse import base64 import binascii -from datetime import datetime import io import logging import os @@ -83,25 +82,7 @@ class DetokenizedString: for entry, fmt in format_string_entries: result = fmt.format(encoded_message[ENCODED_TOKEN.size:], show_errors) - - # Sort competing entries so the most likely matches appear first. - # Decoded strings are prioritized by whether they - # - # 1. decoded all bytes for all arguments without errors, - # 2. decoded all data, - # 3. have the fewest decoding errors, - # 4. decoded the most arguments successfully, or - # 5. have the most recent removal date, if they were removed. - # - # This must match the collision resolution logic in detokenize.cc. - score: Tuple = ( - all(arg.ok() for arg in result.args) and not result.remaining, - not result.remaining, # decoded all data - -sum(not arg.ok() for arg in result.args), # fewest errors - len(result.args), # decoded the most arguments - entry.date_removed or datetime.max) # most recently present - - decode_attempts.append((score, result)) + decode_attempts.append((result.score(entry.date_removed), result)) # Sort the attempts by the score so the most likely results are first. 
decode_attempts.sort(key=lambda value: value[0], reverse=True) diff --git a/pw_tokenizer/py/pw_tokenizer/parse_message.py b/pw_tokenizer/py/pw_tokenizer/parse_message.py new file mode 100644 index 000000000..f8655e1f3 --- /dev/null +++ b/pw_tokenizer/py/pw_tokenizer/parse_message.py @@ -0,0 +1,182 @@ +# Copyright 2022 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Parses the arguments in a Base64-encoded tokenized message. + +This is useful for attempting to decode tokenized messages with arguments for +which the token is not recognized. +""" + +import argparse +import base64 +from dataclasses import dataclass +import logging +import sys +from typing import Collection, Iterable, Iterator, Sequence + +import pw_cli.log +from pw_tokenizer.decode import FormatString, FormattedString + +_LOG: logging.Logger = logging.getLogger('pw_tokenizer') + +DEFAULT_FORMAT_SPECS = ( + '%s', + '%d', + '%f', +) + +DEFAULT_MAX_ARGS = 8 +PREFIX = '$' + + +def attempt_to_decode( + arg_data: bytes, + format_specs: Collection[str] = DEFAULT_FORMAT_SPECS, + max_args: int = DEFAULT_MAX_ARGS, + yield_failures: bool = False) -> Iterator[FormattedString]: + """Attempts to decode arguments using the provided format specifiers.""" + format_strings = [(0, '')] # (argument count, format string) + + # Each argument requires at least 1 byte. 
+ max_args = min(max_args, len(arg_data)) + + while format_strings: + arg_count, string = format_strings.pop(0) + decode_attempt = FormatString(string).format(arg_data) + + if yield_failures or decode_attempt.ok(): + yield decode_attempt + + if arg_count < max_args: + format_strings.extend( + (arg_count + 1, string + spec) for spec in format_specs) + + +@dataclass(frozen=True) +class TokenizedMessage: + string: str + binary: bytes + + @property + def token(self) -> int: + return int.from_bytes(self.binary[:4], 'little') + + @property + def binary_args(self) -> bytes: + return self.binary[4:] + + @classmethod + def parse(cls, message: str, prefix: str = '$') -> 'TokenizedMessage': + if not message.startswith(prefix): + raise ValueError( + f'{message} does not start with {prefix!r} as expected') + + binary = base64.b64decode(message[1:]) + + if len(binary) < 4: + raise ValueError(f'{message} is only {len(binary)} bytes; ' + 'tokenized messages must be at least 4 bytes') + + return cls(message, binary) + + +def _read_stdin(): + try: + while True: + yield input() + except KeyboardInterrupt: + return + + +def _text_list(items: Sequence, conjunction: str = 'or') -> str: + if len(items) == 1: + return str(items[0]) + + return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}' + + +def main(messages: Iterable[str], max_args: int, specs: Sequence[str], + show_failures: bool) -> int: + """Parses the arguments for a series of tokenized messages.""" + exit_code = 0 + + for message in iter(messages) if messages else _read_stdin(): + if not message: + continue + + if not message.startswith(PREFIX): + message = PREFIX + message + + _LOG.info('Decoding arguments for %r', message) + try: + parsed = TokenizedMessage.parse(message) + except ValueError as exc: + _LOG.error('%s', exc) + exit_code = 2 + continue + + _LOG.info('Binary: %r [%s] (%d bytes)', parsed.binary, + parsed.binary.hex(' ', 1), len(parsed.binary)) + _LOG.info('Token: 0x%08x', parsed.token) + 
_LOG.info('Args: %r [%s] (%d bytes)', parsed.binary_args, + parsed.binary_args.hex(' ', 1), len(parsed.binary_args)) + _LOG.info('Decoding with up to %d %s arguments', max_args, + _text_list(specs)) + + results = sorted(attempt_to_decode(parsed.binary_args, specs, max_args, + show_failures), + key=FormattedString.score, + reverse=True) + + if not any(result.ok() for result in results): + _LOG.warning( + ' No combinations of up to %d %s arguments decoded ' + 'successfully', max_args, _text_list(specs)) + exit_code = 1 + + for i, result in enumerate(results, 1): + _LOG.info( # pylint: disable=logging-fstring-interpolation + f' Attempt %{len(str(len(results)))}d: [%s] %s', i, + ' '.join(str(a.specifier) for a in result.args), + ' '.join(str(a) for a in result.args)) + print() + + return exit_code + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('--max-args', + default=DEFAULT_MAX_ARGS, + type=int, + help='Maximum number of printf-style arguments') + parser.add_argument('--specs', + nargs='*', + default=DEFAULT_FORMAT_SPECS, + help='Which printf-style format specifiers to check') + parser.add_argument('--show-failures', + action='store_true', + help='Show argument combinations that fail to decode') + parser.add_argument( + 'messages', + nargs='*', + help= + 'Base64-encoded tokenized messages to decode; omit to read from stdin') + return parser.parse_args() + + +if __name__ == '__main__': + pw_cli.log.install() + sys.exit(main(**vars(_parse_args()))) |