author     Wyatt Hepler <hepler@google.com>                                                2021-06-01 21:58:25 -0700
committer  CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>  2021-06-04 21:39:22 +0000
commit     5f53d27f84e151969015f7fa91b5516c3e8582e8 (patch)
tree       68f46937dadf518ce8df1adcb962c88c14fe0b0f /pw_tokenizer
parent     b2062fd8d667a6f46462adb28d6b13847d271cb2 (diff)
download   pigweed-5f53d27f84e151969015f7fa91b5516c3e8582e8.tar.gz
pw_tokenizer: Move Base64 functions to class
- Move the Base64 tokenization functions to the tokenizer class, which is
  much cleaner to work with.
- Have AutoUpdatingDetokenizer derive from Detokenizer. This makes working
  with detokenizers simpler.

Change-Id: Ic6bd9354c34f21a9931c83200e7c98e05911b6a2
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/47761
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Keir Mierle <keir@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
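A minimal usage sketch of the reworked API (not part of the commit), assuming a hypothetical token database at tokens.csv and a made-up example token:

    from pw_tokenizer import detokenize

    # Hypothetical token database path; an ELF or CSV token database works here.
    detok = detokenize.Detokenizer('tokens.csv')

    # Base64 detokenization is now a method on the Detokenizer class rather than
    # a module-level function that takes the detokenizer as its first argument.
    print(detok.detokenize_base64(b'Device said: $jEhbUQ=='))

    # AutoUpdatingDetokenizer now derives from Detokenizer, so it exposes the
    # same methods and can be used anywhere a plain Detokenizer is expected.
    auto_detok = detokenize.AutoUpdatingDetokenizer('tokens.csv')
    print(auto_detok.detokenize_base64(b'Device said: $jEhbUQ=='))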
Diffstat (limited to 'pw_tokenizer')
-rw-r--r--  pw_tokenizer/docs.rst                                 6
-rw-r--r--  pw_tokenizer/py/BUILD.gn                              5
-rwxr-xr-x  pw_tokenizer/py/detokenize_test.py                   25
-rwxr-xr-x  pw_tokenizer/py/pw_tokenizer/detokenize.py          204
-rw-r--r--  pw_tokenizer/py/pw_tokenizer/serial_detokenizer.py    3
5 files changed, 121 insertions, 122 deletions
diff --git a/pw_tokenizer/docs.rst b/pw_tokenizer/docs.rst
index e2d714ab6..8967f1232 100644
--- a/pw_tokenizer/docs.rst
+++ b/pw_tokenizer/docs.rst
@@ -876,8 +876,8 @@ in the tokenizer handler function. For example,
Decoding
--------
-Base64 decoding and detokenizing is supported in the Python detokenizer through
-the ``detokenize_base64`` and related functions.
+The Python ``Detokenizer`` class supports decoding and detokenizing prefixed
+Base64 messages with ``detokenize_base64`` and related methods.
.. tip::
The Python detokenization tools support recursive detokenization for prefixed
@@ -1019,7 +1019,7 @@ Decoding tooling deployment
* Provide simple wrapper shell scripts that fill in arguments for the
project. For example, point ``detokenize.py`` to the project's token
databases.
- * Use ``pw_tokenizer.AutoReloadingDetokenizer`` to decode in
+ * Use ``pw_tokenizer.AutoUpdatingDetokenizer`` to decode in
continuously-running tools, so that users don't have to restart the tool
when the token database updates.
* Integrate detokenization everywhere it is needed. Integrating the tools
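The updated docs point continuously-running tools at AutoUpdatingDetokenizer. A short sketch of that deployment pattern, assuming a hypothetical database path and a tool that reads binary log data from stdin:

    import sys

    from pw_tokenizer import detokenize

    # Hypothetical database path; AutoUpdatingDetokenizer re-checks the file's
    # modification time and reloads it when it changes, so a long-running tool
    # picks up new tokens without restarting.
    detok = detokenize.AutoUpdatingDetokenizer('out/tokens.csv')

    # Decode prefixed Base64 messages from a pipe, one character at a time.
    detok.detokenize_base64_live(sys.stdin.buffer, sys.stdout.buffer, '$')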
diff --git a/pw_tokenizer/py/BUILD.gn b/pw_tokenizer/py/BUILD.gn
index e9e446871..00128d104 100644
--- a/pw_tokenizer/py/BUILD.gn
+++ b/pw_tokenizer/py/BUILD.gn
@@ -31,8 +31,6 @@ pw_python_package("py") {
"pw_tokenizer/encode.py",
"pw_tokenizer/serial_detokenizer.py",
"pw_tokenizer/tokens.py",
- "tokenized_string_decoding_test_data.py",
- "varint_test_data.py",
]
tests = [
"database_test.py",
@@ -40,9 +38,12 @@ pw_python_package("py") {
"detokenize_test.py",
"elf_reader_test.py",
"encode_test.py",
+ "tokenized_string_decoding_test_data.py",
"tokens_test.py",
+ "varint_test_data.py",
]
inputs = [
+ "elf_reader_test_binary.elf",
"example_binary_with_tokenized_strings.elf",
"example_legacy_binary_with_tokenized_strings.elf",
]
diff --git a/pw_tokenizer/py/detokenize_test.py b/pw_tokenizer/py/detokenize_test.py
index 1de616070..23f0f643b 100755
--- a/pw_tokenizer/py/detokenize_test.py
+++ b/pw_tokenizer/py/detokenize_test.py
@@ -526,22 +526,21 @@ class DetokenizeBase64(unittest.TestCase):
def test_detokenize_base64_live(self):
for data, expected in self.TEST_CASES:
output = io.BytesIO()
- detokenize.detokenize_base64_live(self.detok, io.BytesIO(data),
- output, '$')
+ self.detok.detokenize_base64_live(io.BytesIO(data), output, '$')
self.assertEqual(expected, output.getvalue())
def test_detokenize_base64_to_file(self):
for data, expected in self.TEST_CASES:
output = io.BytesIO()
- detokenize.detokenize_base64_to_file(self.detok, data, output, '$')
+ self.detok.detokenize_base64_to_file(data, output, '$')
self.assertEqual(expected, output.getvalue())
def test_detokenize_base64(self):
for data, expected in self.TEST_CASES:
- self.assertEqual(
- expected, detokenize.detokenize_base64(self.detok, data, b'$'))
+ self.assertEqual(expected,
+ self.detok.detokenize_base64(data, b'$'))
class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
@@ -559,28 +558,24 @@ class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
def test_detokenize_self_recursion(self):
for depth in range(5):
self.assertEqual(
- detokenize.detokenize_base64(self.detok,
- b'This one is deep: $AAAAAA==',
+ self.detok.detokenize_base64(b'This one is deep: $AAAAAA==',
recursion=depth),
b'This one is deep: $AAAAAA==')
def test_detokenize_self_recursion_default(self):
self.assertEqual(
- detokenize.detokenize_base64(self.detok,
- b'This one is deep: $AAAAAA=='),
+ self.detok.detokenize_base64(b'This one is deep: $AAAAAA=='),
b'This one is deep: $AAAAAA==')
def test_detokenize_cyclic_recursion_even(self):
self.assertEqual(
- detokenize.detokenize_base64(self.detok,
- b'I said "$AQAAAA=="',
- recursion=2), b'I said "$AgAAAA=="')
+ self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2),
+ b'I said "$AgAAAA=="')
def test_detokenize_cyclic_recursion_odd(self):
self.assertEqual(
- detokenize.detokenize_base64(self.detok,
- b'I said "$AQAAAA=="',
- recursion=3), b'I said "$AwAAAA=="')
+ self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=3),
+ b'I said "$AwAAAA=="')
if __name__ == '__main__':
diff --git a/pw_tokenizer/py/pw_tokenizer/detokenize.py b/pw_tokenizer/py/pw_tokenizer/detokenize.py
index 770a4f003..f9b459fe5 100755
--- a/pw_tokenizer/py/pw_tokenizer/detokenize.py
+++ b/pw_tokenizer/py/pw_tokenizer/detokenize.py
@@ -56,9 +56,12 @@ except ImportError:
os.path.abspath(__file__))))
from pw_tokenizer import database, decode, encode, tokens
-ENCODED_TOKEN = struct.Struct('<I')
_LOG = logging.getLogger('pw_tokenizer')
+ENCODED_TOKEN = struct.Struct('<I')
+BASE64_PREFIX = encode.BASE64_PREFIX.encode()
+DEFAULT_RECURSION = 9
+
class DetokenizedString:
"""A detokenized string, with all results if there are collisions."""
@@ -179,12 +182,17 @@ class Detokenizer:
show_errors: if True, an error message is used in place of the %
conversion specifier when an argument fails to decode
"""
- self.database = database.load_token_database(*token_database_or_elf)
self.show_errors = show_errors
# Cache FormatStrings for faster lookup & formatting.
self._cache: Dict[int, List[_TokenizedFormatString]] = {}
+ self._initialize_database(token_database_or_elf)
+
+ def _initialize_database(self, token_sources: Iterable) -> None:
+ self.database = database.load_token_database(*token_sources)
+ self._cache.clear()
+
def lookup(self, token: int) -> List[_TokenizedFormatString]:
"""Returns (TokenizedStringEntry, FormatString) list for matches."""
try:
@@ -207,8 +215,88 @@ class Detokenizer:
return DetokenizedString(token, self.lookup(token), encoded_message,
self.show_errors)
+ def detokenize_base64(self,
+ data: bytes,
+ prefix: Union[str, bytes] = BASE64_PREFIX,
+ recursion: int = DEFAULT_RECURSION) -> bytes:
+ """Decodes and replaces prefixed Base64 messages in the provided data.
+
+ Args:
+ data: the binary data to decode
+ prefix: one-character byte string that signals the start of a message
+ recursion: how many levels to recursively decode
+
+ Returns:
+ copy of the data with all recognized tokens decoded
+ """
+ output = io.BytesIO()
+ self.detokenize_base64_to_file(data, output, prefix, recursion)
+ return output.getvalue()
+
+ def detokenize_base64_to_file(self,
+ data: bytes,
+ output: BinaryIO,
+ prefix: Union[str, bytes] = BASE64_PREFIX,
+ recursion: int = DEFAULT_RECURSION) -> None:
+ """Decodes prefixed Base64 messages in data; decodes to output file."""
+ prefix = prefix.encode() if isinstance(prefix, str) else prefix
+ output.write(
+ _base64_message_regex(prefix).sub(
+ self._detokenize_prefixed_base64(prefix, recursion), data))
+
+ def detokenize_base64_live(self,
+ input_file: BinaryIO,
+ output: BinaryIO,
+ prefix: Union[str, bytes] = BASE64_PREFIX,
+ recursion: int = DEFAULT_RECURSION) -> None:
+ """Reads chars one-at-a-time, decoding messages; SLOW for big files."""
+ prefix_bytes = prefix.encode() if isinstance(prefix, str) else prefix
+
+ base64_message = _base64_message_regex(prefix_bytes)
+
+ def transform(data: bytes) -> bytes:
+ return base64_message.sub(
+ self._detokenize_prefixed_base64(prefix_bytes, recursion),
+ data)
+
+ for message in PrefixedMessageDecoder(
+ prefix,
+ string.ascii_letters + string.digits + '+/-_=').transform(
+ input_file, transform):
+ output.write(message)
+
+ # Flush each line to prevent delays when piping between processes.
+ if b'\n' in message:
+ output.flush()
-class AutoUpdatingDetokenizer:
+ def _detokenize_prefixed_base64(
+ self, prefix: bytes,
+ recursion: int) -> Callable[[Match[bytes]], bytes]:
+ """Returns a function that decodes prefixed Base64."""
+ def decode_and_detokenize(match: Match[bytes]) -> bytes:
+ """Decodes prefixed base64 with this detokenizer."""
+ original = match.group(0)
+
+ try:
+ detokenized_string = self.detokenize(
+ base64.b64decode(original[1:], validate=True))
+ if detokenized_string.matches():
+ result = str(detokenized_string).encode()
+
+ if recursion > 0 and original != result:
+ result = self.detokenize_base64(
+ result, prefix, recursion - 1)
+
+ return result
+ except binascii.Error:
+ pass
+
+ return original
+
+ return decode_and_detokenize
+
+
+class AutoUpdatingDetokenizer(Detokenizer):
"""Loads and updates a detokenizer from database paths."""
class _DatabasePath:
"""Tracks the modified time of a path or file object."""
@@ -243,22 +331,19 @@ class AutoUpdatingDetokenizer:
self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
self.min_poll_period_s = min_poll_period_s
self._last_checked_time: float = time.time()
- self._detokenizer = Detokenizer(*(path.load() for path in self.paths))
+ super().__init__(*(path.load() for path in self.paths))
- def detokenize(self, data: bytes) -> DetokenizedString:
- """Updates the token database if it has changed, then detokenizes."""
+ def _reload_if_changed(self) -> None:
if time.time() - self._last_checked_time >= self.min_poll_period_s:
self._last_checked_time = time.time()
if any(path.updated() for path in self.paths):
_LOG.info('Changes detected; reloading token database')
- self._detokenizer = Detokenizer(*(path.load()
- for path in self.paths))
-
- return self._detokenizer.detokenize(data)
+ self._initialize_database(path.load() for path in self.paths)
-
-_Detokenizer = Union[Detokenizer, AutoUpdatingDetokenizer]
+ def lookup(self, token: int) -> List[_TokenizedFormatString]:
+ self._reload_if_changed()
+ return super().lookup(token)
class PrefixedMessageDecoder:
@@ -328,37 +413,6 @@ class PrefixedMessageDecoder:
yield transform(chunk) if is_message else chunk
-def _detokenize_prefixed_base64(
- detokenizer: _Detokenizer, prefix: bytes,
- recursion: int) -> Callable[[Match[bytes]], bytes]:
- """Returns a function that decodes prefixed Base64 with the detokenizer."""
- def decode_and_detokenize(match: Match[bytes]) -> bytes:
- """Decodes prefixed base64 with the provided detokenizer."""
- original = match.group(0)
-
- try:
- detokenized_string = detokenizer.detokenize(
- base64.b64decode(original[1:], validate=True))
- if detokenized_string.matches():
- result = str(detokenized_string).encode()
-
- if recursion > 0 and original != result:
- result = detokenize_base64(detokenizer, result, prefix,
- recursion - 1)
-
- return result
- except binascii.Error:
- pass
-
- return original
-
- return decode_and_detokenize
-
-
-BASE64_PREFIX = encode.BASE64_PREFIX.encode()
-DEFAULT_RECURSION = 9
-
-
def _base64_message_regex(prefix: bytes) -> Pattern[bytes]:
"""Returns a regular expression for prefixed base64 tokenized strings."""
return re.compile(
@@ -370,64 +424,14 @@ def _base64_message_regex(prefix: bytes) -> Pattern[bytes]:
br'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'))
-def detokenize_base64_live(detokenizer: _Detokenizer,
- input_file: BinaryIO,
- output: BinaryIO,
- prefix: Union[str, bytes] = BASE64_PREFIX,
- recursion: int = DEFAULT_RECURSION) -> None:
- """Reads chars one-at-a-time and decodes messages; SLOW for big files."""
- prefix_bytes = prefix.encode() if isinstance(prefix, str) else prefix
-
- base64_message = _base64_message_regex(prefix_bytes)
-
- def transform(data: bytes) -> bytes:
- return base64_message.sub(
- _detokenize_prefixed_base64(detokenizer, prefix_bytes, recursion),
- data)
-
- for message in PrefixedMessageDecoder(
- prefix, string.ascii_letters + string.digits + '+/-_=').transform(
- input_file, transform):
- output.write(message)
-
- # Flush each line to prevent delays when piping between processes.
- if b'\n' in message:
- output.flush()
-
-
-def detokenize_base64_to_file(detokenizer: _Detokenizer,
- data: bytes,
- output: BinaryIO,
- prefix: Union[str, bytes] = BASE64_PREFIX,
- recursion: int = DEFAULT_RECURSION) -> None:
- """Decodes prefixed Base64 messages in data; decodes to an output file."""
- prefix = prefix.encode() if isinstance(prefix, str) else prefix
- output.write(
- _base64_message_regex(prefix).sub(
- _detokenize_prefixed_base64(detokenizer, prefix, recursion), data))
-
-
-def detokenize_base64(detokenizer: _Detokenizer,
+def detokenize_base64(detokenizer: Detokenizer,
data: bytes,
prefix: Union[str, bytes] = BASE64_PREFIX,
recursion: int = DEFAULT_RECURSION) -> bytes:
- """Decodes and replaces prefixed Base64 messages in the provided data.
-
- Args:
- detokenizer: the detokenizer with which to decode messages
- data: the binary data to decode
- prefix: one-character byte string that signals the start of a message
- recursion: how many levels to recursively decode
-
- Returns:
- copy of the data with all recognized tokens decoded
- """
- output = io.BytesIO()
- detokenize_base64_to_file(detokenizer, data, output, prefix, recursion)
- return output.getvalue()
+ return detokenizer.detokenize_base64(data, prefix, recursion)
-def _follow_and_detokenize_file(detokenizer: _Detokenizer,
+def _follow_and_detokenize_file(detokenizer: Detokenizer,
file: BinaryIO,
output: BinaryIO,
prefix: Union[str, bytes],
@@ -438,7 +442,7 @@ def _follow_and_detokenize_file(detokenizer: _Detokenizer,
while True:
data = file.read()
if data:
- detokenize_base64_to_file(detokenizer, data, output, prefix)
+ detokenizer.detokenize_base64_to_file(data, output, prefix)
output.flush()
else:
time.sleep(poll_period_s)
@@ -463,11 +467,11 @@ def _handle_base64(databases, input_file: BinaryIO, output: BinaryIO,
_follow_and_detokenize_file(detokenizer, input_file, output, prefix)
elif input_file.seekable():
# Process seekable files all at once, which is MUCH faster.
- detokenize_base64_to_file(detokenizer, input_file.read(), output,
- prefix)
+ detokenizer.detokenize_base64_to_file(input_file.read(), output,
+ prefix)
else:
# For non-seekable inputs (e.g. pipes), read one character at a time.
- detokenize_base64_live(detokenizer, input_file, output, prefix)
+ detokenizer.detokenize_base64_live(input_file, output, prefix)
def _parse_args() -> argparse.Namespace:
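A sketch of the recursion behavior implemented by _detokenize_prefixed_base64, using a hypothetical database and the made-up tokens from the tests; each decoded match is re-scanned for prefixed Base64 up to the given recursion depth (default 9):

    from pw_tokenizer import detokenize

    # Hypothetical database in which token 0x00000001 expands to a string that
    # itself contains another prefixed Base64 token.
    detok = detokenize.Detokenizer('tokens.csv')

    # Each match is Base64-decoded, detokenized, and the result re-scanned, so
    # nested tokens are resolved up to the requested recursion depth.
    print(detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2))

    # The module-level detokenize_base64() is kept and now simply forwards to
    # the method, so existing callers keep working.
    print(detokenize.detokenize_base64(detok, b'I said "$AQAAAA=="'))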
diff --git a/pw_tokenizer/py/pw_tokenizer/serial_detokenizer.py b/pw_tokenizer/py/pw_tokenizer/serial_detokenizer.py
index e0102251b..234ca64de 100644
--- a/pw_tokenizer/py/pw_tokenizer/serial_detokenizer.py
+++ b/pw_tokenizer/py/pw_tokenizer/serial_detokenizer.py
@@ -74,8 +74,7 @@ def _detokenize_serial(databases: Iterable, device: serial.Serial,
serial_device = serial.Serial(port=device, baudrate=baudrate)
try:
- detokenize.detokenize_base64_live(detokenizer, serial_device, output,
- prefix)
+ detokenizer.detokenize_base64_live(serial_device, output, prefix)
except KeyboardInterrupt:
output.flush()