author     Wyatt Hepler <hepler@google.com>  2022-02-16 16:13:38 -0800
committer  CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>  2022-04-05 00:11:59 +0000
commit     03d14dcf193da5d5634322cf4070f7bc4df15e40 (patch)
tree       9c4ac0b202d9f4ed7130533797a66cfd391f5cdf
parent     ff9933f2cf97eddb625d547ef9a86e9af95326ac (diff)
download   pigweed-03d14dcf193da5d5634322cf4070f7bc4df15e40.tar.gz
pw_tokenizer: Message parser for undecoded messages

pw_tokenizer.parse_message decodes Base64 format tokenized messages as much as
possible. It attempts to decode the arguments by trying different combinations
of format specifiers.

Change-Id: I5c876d0b253e4a46772b00ae2686806aaa15f7e4
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/44260
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
-rw-r--r--pw_tokenizer/docs.rst38
-rw-r--r--pw_tokenizer/py/BUILD.gn1
-rwxr-xr-xpw_tokenizer/py/decode_test.py56
-rw-r--r--pw_tokenizer/py/pw_tokenizer/decode.py30
-rwxr-xr-xpw_tokenizer/py/pw_tokenizer/detokenize.py21
-rw-r--r--pw_tokenizer/py/pw_tokenizer/parse_message.py182
6 files changed, 299 insertions, 29 deletions
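For context, a sketch (not part of the change) of the message layout the new parser assumes: a ``$`` prefix, then the Base64 encoding of a 4-byte little-endian token followed by the encoded argument bytes. ``TokenizedMessage.parse`` below splits a message on exactly this boundary:

    # Sketch using the '$329JMwA=' message from the docs example in this patch.
    import base64

    binary = base64.b64decode('329JMwA=')         # Base64 body after the '$'
    token = int.from_bytes(binary[:4], 'little')  # 0x33496fdf
    args = binary[4:]                             # b'\x00' -- argument bytes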
diff --git a/pw_tokenizer/docs.rst b/pw_tokenizer/docs.rst
index e124e125e..dc908ea4b 100644
--- a/pw_tokenizer/docs.rst
+++ b/pw_tokenizer/docs.rst
@@ -954,6 +954,44 @@ functions.
     TransmitLogMessage(base64_buffer, base64_size);
   }
+Investigating undecoded messages
+--------------------------------
+Tokenized messages cannot be decoded if the token is not recognized. The Python
+package includes the ``parse_message`` tool, which parses tokenized Base64
+messages without looking up the token in a database. This tool attempts to guess
+the types of the arguments and displays potential ways to decode them.
+
+This tool can extract argument information from an otherwise undecodable
+message, which may help identify the log statement that produced it. It is not
+particularly helpful for tokenized messages without arguments, since all it can
+do is show the value of the unknown token.
+
+The tool is executed by passing Base64 tokenized messages, with or without the
+``$`` prefix, to ``pw_tokenizer.parse_message``. Pass ``-h`` or ``--help`` to
+see full usage information.
+
+Example
+^^^^^^^
+.. code-block::
+
+   $ python -m pw_tokenizer.parse_message '$329JMwA=' koSl524TRkFJTEVEX1BSRUNPTkRJVElPTgJPSw== --specs %s %d
+
+   INF Decoding arguments for '$329JMwA='
+   INF Binary: b'\xdfoI3\x00' [df 6f 49 33 00] (5 bytes)
+   INF Token:  0x33496fdf
+   INF Args:   b'\x00' [00] (1 bytes)
+   INF Decoding with up to 8 %s or %d arguments
+   INF   Attempt 1: [%s]
+   INF   Attempt 2: [%d] 0
+
+   INF Decoding arguments for '$koSl524TRkFJTEVEX1BSRUNPTkRJVElPTgJPSw=='
+   INF Binary: b'\x92\x84\xa5\xe7n\x13FAILED_PRECONDITION\x02OK' [92 84 a5 e7 6e 13 46 41 49 4c 45 44 5f 50 52 45 43 4f 4e 44 49 54 49 4f 4e 02 4f 4b] (28 bytes)
+   INF Token:  0xe7a58492
+   INF Args:   b'n\x13FAILED_PRECONDITION\x02OK' [6e 13 46 41 49 4c 45 44 5f 50 52 45 43 4f 4e 44 49 54 49 4f 4e 02 4f 4b] (24 bytes)
+   INF Decoding with up to 8 %s or %d arguments
+   INF   Attempt 1: [%d %s %d %d %d] 55 FAILED_PRECONDITION 1 -40 -38
+   INF   Attempt 2: [%d %s %s] 55 FAILED_PRECONDITION OK
+
Command line utilities
^^^^^^^^^^^^^^^^^^^^^^
``pw_tokenizer`` provides two standalone command line utilities for detokenizing
diff --git a/pw_tokenizer/py/BUILD.gn b/pw_tokenizer/py/BUILD.gn
index b0006ed10..2ac30448c 100644
--- a/pw_tokenizer/py/BUILD.gn
+++ b/pw_tokenizer/py/BUILD.gn
@@ -40,6 +40,7 @@ pw_python_package("py") {
"pw_tokenizer/detokenize.py",
"pw_tokenizer/elf_reader.py",
"pw_tokenizer/encode.py",
+ "pw_tokenizer/parse_message.py",
"pw_tokenizer/proto/__init__.py",
"pw_tokenizer/serial_detokenizer.py",
"pw_tokenizer/tokens.py",
diff --git a/pw_tokenizer/py/decode_test.py b/pw_tokenizer/py/decode_test.py
index c0c436616..be08eb824 100755
--- a/pw_tokenizer/py/decode_test.py
+++ b/pw_tokenizer/py/decode_test.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright 2020 The Pigweed Authors
+# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
@@ -14,6 +14,7 @@
# the License.
"""Tests the tokenized string decode module."""
+from datetime import datetime
import unittest
import tokenized_string_decoding_test_data as tokenized_string
@@ -21,7 +22,7 @@ import varint_test_data
from pw_tokenizer import decode
-def error(msg, value=None):
+def error(msg, value=None) -> str:
"""Formats msg as the message for an argument that failed to parse."""
if value is None:
return '<[{}]>'.format(msg)
@@ -30,13 +31,13 @@ def error(msg, value=None):
class TestDecodeTokenized(unittest.TestCase):
"""Tests decoding tokenized strings with various arguments."""
- def test_decode_generated_data(self):
+ def test_decode_generated_data(self) -> None:
self.assertGreater(len(tokenized_string.TEST_DATA), 100)
for fmt, decoded, encoded in tokenized_string.TEST_DATA:
self.assertEqual(decode.decode(fmt, encoded, True), decoded)
-    def test_unicode_decode_errors(self):
+    def test_unicode_decode_errors(self) -> None:
        """Tests unicode errors, which do not occur in the C++ decoding code."""
        self.assertEqual(decode.decode('Why, %c', b'\x01', True),
                         'Why, ' + error('%c ERROR', -1))
@@ -55,12 +56,12 @@ class TestDecodeTokenized(unittest.TestCase):
        self.assertEqual(decode.decode('%c', b'\xff\xff\xff\xff\x0f', True),
                         error('%c ERROR', -2147483648))
-    def test_ignore_errors(self):
+    def test_ignore_errors(self) -> None:
        self.assertEqual(decode.decode('Why, %c', b'\x01'), 'Why, %c')
        self.assertEqual(decode.decode('%s %d', b'\x01!'), '! %d')
-    def test_pointer(self):
+    def test_pointer(self) -> None:
        """Tests pointer args, which are not natively supported in Python."""
        self.assertEqual(decode.decode('Hello: %p', b'\x00', True),
                         'Hello: 0x00000000')
@@ -69,8 +70,8 @@ class TestDecodeTokenized(unittest.TestCase):
class TestIntegerDecoding(unittest.TestCase):
- """Test decoding variable-length integers."""
- def test_decode_generated_data(self):
+ """Tests decoding variable-length integers."""
+ def test_decode_generated_data(self) -> None:
test_data = varint_test_data.TEST_DATA
self.assertGreater(len(test_data), 100)
@@ -86,5 +87,44 @@ class TestIntegerDecoding(unittest.TestCase):
bytearray(encoded)).value)
+class TestFormattedString(unittest.TestCase):
+    """Tests scoring how successfully a formatted string decoded."""
+    def test_no_args(self) -> None:
+        result = decode.FormatString('string').format(b'')
+
+        self.assertTrue(result.ok())
+        self.assertEqual(result.score(), (True, True, 0, 0, datetime.max))
+
+    def test_one_arg(self) -> None:
+        result = decode.FormatString('%d').format(b'\0')
+
+        self.assertTrue(result.ok())
+        self.assertEqual(result.score(), (True, True, 0, 1, datetime.max))
+
+    def test_missing_args(self) -> None:
+        result = decode.FormatString('%p%d%d').format(b'\x02\x80')
+
+        self.assertFalse(result.ok())
+        self.assertEqual(result.score(), (False, True, -2, 3, datetime.max))
+        self.assertGreater(result.score(), result.score(datetime.now()))
+        self.assertGreater(result.score(datetime.now()),
+                           result.score(datetime.min))
+
+    def test_compare_score(self) -> None:
+        all_args_ok = decode.FormatString('%d%d%d').format(b'\0\0\0')
+        missing_one_arg = decode.FormatString('%d%d%d').format(b'\0\0')
+        missing_two_args = decode.FormatString('%d%d%d').format(b'\0')
+        all_args_extra_data = decode.FormatString('%d%d%d').format(b'\0\0\0\1')
+        missing_one_arg_extra_data = decode.FormatString('%d%d%d').format(
+            b'\0' + b'\x80' * 100)
+
+        self.assertGreater(all_args_ok.score(), missing_one_arg.score())
+        self.assertGreater(missing_one_arg.score(), missing_two_args.score())
+        self.assertGreater(missing_two_args.score(),
+                           all_args_extra_data.score())
+        self.assertGreater(all_args_extra_data.score(),
+                           missing_one_arg_extra_data.score())
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/pw_tokenizer/py/pw_tokenizer/decode.py b/pw_tokenizer/py/pw_tokenizer/decode.py
index 30d87711c..f6ca50364 100644
--- a/pw_tokenizer/py/pw_tokenizer/decode.py
+++ b/pw_tokenizer/py/pw_tokenizer/decode.py
@@ -20,6 +20,7 @@ Missing, truncated, or otherwise corrupted arguments are handled and displayed
in the resulting string with an error message.
"""
+from datetime import datetime
import re
import struct
from typing import Iterable, List, NamedTuple, Match, Sequence, Tuple
@@ -275,7 +276,7 @@ class DecodedArg:
        return self.format()

    def __repr__(self) -> str:
-        return 'DecodedArg({!r})'.format(self)
+        return f'DecodedArg({self})'
def parse_format_specifiers(format_string: str) -> Iterable[FormatSpec]:
@@ -288,6 +289,33 @@ class FormattedString(NamedTuple):
    args: Sequence[DecodedArg]
    remaining: bytes
+    def ok(self) -> bool:
+        """Arg data decoded successfully and all expected args were found."""
+        return all(arg.ok() for arg in self.args) and not self.remaining
+
+    def score(self, date_removed: datetime = None) -> tuple:
+        """Returns a key for sorting by how successful a decode was.
+
+        Decoded strings are sorted by whether they
+
+        1. decoded all bytes for all arguments without errors,
+        2. decoded all data,
+        3. have the fewest decoding errors,
+        4. decoded the most arguments successfully, or
+        5. have the most recent removal date, if they were removed.
+
+        This must match the collision resolution logic in detokenize.cc.
+
+        To format a list of FormattedStrings from most to least successful,
+        use sort(key=FormattedString.score, reverse=True).
+        """
+        return (
+            self.ok(),  # decoded all data and all expected args were found
+            not self.remaining,  # decoded all data
+            -sum(not arg.ok() for arg in self.args),  # fewest errors
+            len(self.args),  # decoded the most arguments
+            date_removed or datetime.max)  # most recently present
+
class FormatString:
"""Represents a printf-style format string."""
diff --git a/pw_tokenizer/py/pw_tokenizer/detokenize.py b/pw_tokenizer/py/pw_tokenizer/detokenize.py
index f4c59f040..8f94fa04e 100755
--- a/pw_tokenizer/py/pw_tokenizer/detokenize.py
+++ b/pw_tokenizer/py/pw_tokenizer/detokenize.py
@@ -34,7 +34,6 @@ messages from a file or stdin.
import argparse
import base64
import binascii
-from datetime import datetime
import io
import logging
import os
@@ -83,25 +82,7 @@ class DetokenizedString:
        for entry, fmt in format_string_entries:
            result = fmt.format(encoded_message[ENCODED_TOKEN.size:],
                                show_errors)
-
-            # Sort competing entries so the most likely matches appear first.
-            # Decoded strings are prioritized by whether they
-            #
-            # 1. decoded all bytes for all arguments without errors,
-            # 2. decoded all data,
-            # 3. have the fewest decoding errors,
-            # 4. decoded the most arguments successfully, or
-            # 5. have the most recent removal date, if they were removed.
-            #
-            # This must match the collision resolution logic in detokenize.cc.
-            score: Tuple = (
-                all(arg.ok() for arg in result.args) and not result.remaining,
-                not result.remaining,  # decoded all data
-                -sum(not arg.ok() for arg in result.args),  # fewest errors
-                len(result.args),  # decoded the most arguments
-                entry.date_removed or datetime.max)  # most recently present
-
-            decode_attempts.append((score, result))
+            decode_attempts.append((result.score(entry.date_removed), result))

        # Sort the attempts by the score so the most likely results are first.
        decode_attempts.sort(key=lambda value: value[0], reverse=True)
diff --git a/pw_tokenizer/py/pw_tokenizer/parse_message.py b/pw_tokenizer/py/pw_tokenizer/parse_message.py
new file mode 100644
index 000000000..f8655e1f3
--- /dev/null
+++ b/pw_tokenizer/py/pw_tokenizer/parse_message.py
@@ -0,0 +1,182 @@
+# Copyright 2022 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Parses the arguments in a Base64-encoded tokenized message.
+
+This is useful for attempting to decode tokenized messages with arguments for
+which the token is not recognized.
+"""
+
+import argparse
+import base64
+from dataclasses import dataclass
+import logging
+import sys
+from typing import Collection, Iterable, Iterator, Sequence
+
+import pw_cli.log
+from pw_tokenizer.decode import FormatString, FormattedString
+
+_LOG: logging.Logger = logging.getLogger('pw_tokenizer')
+
+DEFAULT_FORMAT_SPECS = (
+    '%s',
+    '%d',
+    '%f',
+)
+
+DEFAULT_MAX_ARGS = 8
+PREFIX = '$'
+
+
+def attempt_to_decode(
+        arg_data: bytes,
+        format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
+        max_args: int = DEFAULT_MAX_ARGS,
+        yield_failures: bool = False) -> Iterator[FormattedString]:
+    """Attempts to decode arguments using the provided format specifiers."""
+    format_strings = [(0, '')]  # (argument count, format string)
+
+    # Each argument requires at least 1 byte.
+    max_args = min(max_args, len(arg_data))
+
+    while format_strings:
+        arg_count, string = format_strings.pop(0)
+        decode_attempt = FormatString(string).format(arg_data)
+
+        if yield_failures or decode_attempt.ok():
+            yield decode_attempt
+
+        if arg_count < max_args:
+            format_strings.extend(
+                (arg_count + 1, string + spec) for spec in format_specs)
+
+
+@dataclass(frozen=True)
+class TokenizedMessage:
+    string: str
+    binary: bytes
+
+    @property
+    def token(self) -> int:
+        return int.from_bytes(self.binary[:4], 'little')
+
+    @property
+    def binary_args(self) -> bytes:
+        return self.binary[4:]
+
+    @classmethod
+    def parse(cls, message: str, prefix: str = '$') -> 'TokenizedMessage':
+        if not message.startswith(prefix):
+            raise ValueError(
+                f'{message} does not start with {prefix!r} as expected')
+
+        binary = base64.b64decode(message[len(prefix):])
+
+        if len(binary) < 4:
+            raise ValueError(f'{message} is only {len(binary)} bytes; '
+                             'tokenized messages must be at least 4 bytes')
+
+        return cls(message, binary)
+
+
+def _read_stdin() -> Iterator[str]:
+    try:
+        while True:
+            yield input()
+    except KeyboardInterrupt:
+        return
+
+
+def _text_list(items: Sequence, conjunction: str = 'or') -> str:
+    if len(items) == 1:
+        return str(items[0])
+
+    return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}'
+
+
+def main(messages: Iterable[str], max_args: int, specs: Sequence[str],
+         show_failures: bool) -> int:
+    """Parses the arguments for a series of tokenized messages."""
+    exit_code = 0
+
+    for message in iter(messages) if messages else _read_stdin():
+        if not message:
+            continue
+
+        if not message.startswith(PREFIX):
+            message = PREFIX + message
+
+        _LOG.info('Decoding arguments for %r', message)
+        try:
+            parsed = TokenizedMessage.parse(message)
+        except ValueError as exc:
+            _LOG.error('%s', exc)
+            exit_code = 2
+            continue
+
+        _LOG.info('Binary: %r [%s] (%d bytes)', parsed.binary,
+                  parsed.binary.hex(' ', 1), len(parsed.binary))
+        _LOG.info('Token:  0x%08x', parsed.token)
+        _LOG.info('Args:   %r [%s] (%d bytes)', parsed.binary_args,
+                  parsed.binary_args.hex(' ', 1), len(parsed.binary_args))
+        _LOG.info('Decoding with up to %d %s arguments', max_args,
+                  _text_list(specs))
+
+        results = sorted(attempt_to_decode(parsed.binary_args, specs, max_args,
+                                           show_failures),
+                         key=FormattedString.score,
+                         reverse=True)
+
+        if not any(result.ok() for result in results):
+            _LOG.warning(
+                '  No combinations of up to %d %s arguments decoded '
+                'successfully', max_args, _text_list(specs))
+            exit_code = 1
+
+        for i, result in enumerate(results, 1):
+            _LOG.info(  # pylint: disable=logging-fstring-interpolation
+                f'  Attempt %{len(str(len(results)))}d: [%s] %s', i,
+                ' '.join(str(a.specifier) for a in result.args),
+                ' '.join(str(a) for a in result.args))
+        print()
+
+    return exit_code
+
+
+def _parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--max-args',
+                        default=DEFAULT_MAX_ARGS,
+                        type=int,
+                        help='Maximum number of printf-style arguments')
+    parser.add_argument('--specs',
+                        nargs='*',
+                        default=DEFAULT_FORMAT_SPECS,
+                        help='Which printf-style format specifiers to check')
+    parser.add_argument('--show-failures',
+                        action='store_true',
+                        help='Show argument combinations that fail to decode')
+    parser.add_argument(
+        'messages',
+        nargs='*',
+        help=
+        'Base64-encoded tokenized messages to decode; omit to read from stdin')
+    return parser.parse_args()
+
+
+if __name__ == '__main__':
+    pw_cli.log.install()
+    sys.exit(main(**vars(_parse_args())))
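As a final usage sketch (hypothetical, mirroring the CLI run shown in the docs example above), the same machinery can be driven from Python:

    from pw_tokenizer.parse_message import TokenizedMessage, attempt_to_decode

    parsed = TokenizedMessage.parse('$329JMwA=')
    for result in attempt_to_decode(parsed.binary_args,
                                    format_specs=('%s', '%d')):
        # Yields one FormattedString per plausible format string,
        # e.g. [%s] then [%d] 0 for this message's single 0x00 argument byte.
        print([str(arg) for arg in result.args])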