aboutsummaryrefslogtreecommitdiff
path: root/pw_tokenizer/py/pw_tokenizer/parse_message.py
blob: f8655e1f34d9b9d85b92cf412fa4120913ab57e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Parses the arguments in a Base64-encoded tokenized message.

This is useful for attempting to decode tokenized messages with arguments for
which the token is not recognized.
"""

import argparse
import base64
from dataclasses import dataclass
import logging
import sys
from typing import Collection, Iterable, Iterator, Sequence

import pw_cli.log
from pw_tokenizer.decode import FormatString, FormattedString

_LOG: logging.Logger = logging.getLogger('pw_tokenizer')

DEFAULT_FORMAT_SPECS = (
    '%s',
    '%d',
    '%f',
)

DEFAULT_MAX_ARGS = 8
PREFIX = '$'


def attempt_to_decode(
        arg_data: bytes,
        format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
        max_args: int = DEFAULT_MAX_ARGS,
        yield_failures: bool = False) -> Iterator[FormattedString]:
    """Attemps to decode arguments using the provided format specifiers."""
    format_strings = [(0, '')]  # (argument count, format string)

    # Each argument requires at least 1 byte.
    max_args = min(max_args, len(arg_data))

    while format_strings:
        arg_count, string = format_strings.pop(0)
        decode_attempt = FormatString(string).format(arg_data)

        if yield_failures or decode_attempt.ok():
            yield decode_attempt

        if arg_count < max_args:
            format_strings.extend(
                (arg_count + 1, string + spec) for spec in format_specs)


@dataclass(frozen=True)
class TokenizedMessage:
    string: str
    binary: bytes

    @property
    def token(self) -> int:
        return int.from_bytes(self.binary[:4], 'little')

    @property
    def binary_args(self) -> bytes:
        return self.binary[4:]

    @classmethod
    def parse(cls, message: str, prefix: str = '$') -> 'TokenizedMessage':
        if not message.startswith(prefix):
            raise ValueError(
                f'{message} does not start wtih {prefix!r} as expected')

        binary = base64.b64decode(message[1:])

        if len(binary) < 4:
            raise ValueError(f'{message} is only {len(binary)} bytes; '
                             'tokenized messages must be at least 4 bytes')

        return cls(message, binary)


def _read_stdin():
    try:
        while True:
            yield input()
    except KeyboardInterrupt:
        return


def _text_list(items: Sequence, conjunction: str = 'or') -> str:
    if len(items) == 1:
        return str(items[0])

    return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}'


def main(messages: Iterable[str], max_args: int, specs: Sequence[str],
         show_failures: bool) -> int:
    """Parses the arguments for a series of tokenized messages."""
    exit_code = 0

    for message in iter(messages) if messages else _read_stdin():
        if not message:
            continue

        if not message.startswith(PREFIX):
            message = PREFIX + message

        _LOG.info('Decoding arguments for %r', message)
        try:
            parsed = TokenizedMessage.parse(message)
        except ValueError as exc:
            _LOG.error('%s', exc)
            exit_code = 2
            continue

        _LOG.info('Binary: %r [%s] (%d bytes)', parsed.binary,
                  parsed.binary.hex(' ', 1), len(parsed.binary))
        _LOG.info('Token:  0x%08x', parsed.token)
        _LOG.info('Args:   %r [%s] (%d bytes)', parsed.binary_args,
                  parsed.binary_args.hex(' ', 1), len(parsed.binary_args))
        _LOG.info('Decoding with up to %d %s arguments', max_args,
                  _text_list(specs))

        results = sorted(attempt_to_decode(parsed.binary_args, specs, max_args,
                                           show_failures),
                         key=FormattedString.score,
                         reverse=True)

        if not any(result.ok() for result in results):
            _LOG.warning(
                '  No combinations of up to %d %s arguments decoded '
                'successfully', max_args, _text_list(specs))
            exit_code = 1

        for i, result in enumerate(results, 1):
            _LOG.info(  # pylint: disable=logging-fstring-interpolation
                f'  Attempt %{len(str(len(results)))}d: [%s] %s', i,
                ' '.join(str(a.specifier) for a in result.args),
                ' '.join(str(a) for a in result.args))
        print()

    return exit_code


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--max-args',
                        default=DEFAULT_MAX_ARGS,
                        type=int,
                        help='Maximum number of printf-style arguments')
    parser.add_argument('--specs',
                        nargs='*',
                        default=DEFAULT_FORMAT_SPECS,
                        help='Which printf-style format specifiers to check')
    parser.add_argument('--show-failures',
                        action='store_true',
                        help='Show argument combintations that fail to decode')
    parser.add_argument(
        'messages',
        nargs='*',
        help=
        'Base64-encoded tokenized messages to decode; omit to read from stdin')
    return parser.parse_args()


if __name__ == '__main__':
    pw_cli.log.install()
    sys.exit(main(**vars(_parse_args())))