diff options
Diffstat (limited to 'scripts/generate_python_backend.py')
-rwxr-xr-x | scripts/generate_python_backend.py | 144 |
1 files changed, 130 insertions, 14 deletions
diff --git a/scripts/generate_python_backend.py b/scripts/generate_python_backend.py index 475154d..8938173 100755 --- a/scripts/generate_python_backend.py +++ b/scripts/generate_python_backend.py @@ -130,11 +130,11 @@ class FieldParser: after parsing is completed.""" self.unchecked_code.append(line) - def append_(self, line: str): + def append_(self, code: str): """Append field parsing code. There must be no unchecked code left before this function is called.""" assert len(self.unchecked_code) == 0 - self.code.append(line) + self.code.extend(code.split('\n')) def check_size_(self, size: str): """Generate a check of the current span size.""" @@ -318,6 +318,60 @@ class FieldParser: if padded_size: self.append_(f"span = remaining_span") + def parse_optional_field_(self, field: ast.Field): + """Parse the selected optional field. + Optional fields must start and end on a byte boundary.""" + + if self.shift != 0: + raise Exception('Optional field does not start on an octet boundary') + if (isinstance(field, ast.TypedefField) and + isinstance(field.type, ast.StructDeclaration) and + field.type.parent_id is not None): + raise Exception('Derived struct used in optional typedef field') + + self.consume_span_() + + if isinstance(field, ast.ScalarField): + self.append_(dedent(""" + if {cond_id} == {cond_value}: + if len(span) < {size}: + raise Exception('Invalid packet size') + fields['{field_id}'] = int.from_bytes(span[:{size}], byteorder='{byteorder}') + span = span[{size}:] + """.format(size=int(field.width / 8), + field_id=field.id, + cond_id=field.cond.id, + cond_value=field.cond.value, + byteorder=self.byteorder))) + + elif isinstance(field, ast.TypedefField) and isinstance(field.type, ast.EnumDeclaration): + self.append_(dedent(""" + if {cond_id} == {cond_value}: + if len(span) < {size}: + raise Exception('Invalid packet size') + fields['{field_id}'] = {type_id}( + int.from_bytes(span[:{size}], byteorder='{byteorder}')) + span = span[{size}:] + """.format(size=int(field.type.width / 8), + field_id=field.id, + type_id=field.type_id, + cond_id=field.cond.id, + cond_value=field.cond.value, + byteorder=self.byteorder))) + + elif isinstance(field, ast.TypedefField): + self.append_(dedent(""" + if {cond_id} == {cond_value}: + {field_id}, span = {type_id}.parse(span) + fields['{field_id}'] = {field_id} + """.format(field_id=field.id, + type_id=field.type_id, + cond_id=field.cond.id, + cond_value=field.cond.value))) + + else: + raise Exception(f"unsupported field type {field.__class__.__name__}") + def parse_bit_field_(self, field: ast.Field): """Parse the selected field as a bit field. The field is added to the current chunk. When a byte boundary @@ -347,7 +401,9 @@ class FieldParser: for shift, width, field in self.chunk: v = (value if len(self.chunk) == 1 and shift == 0 else f"({value} >> {shift}) & {mask(width)}") - if isinstance(field, ast.ScalarField): + if field.cond_for: + self.unchecked_append_(f"{field.id} = {v}") + elif isinstance(field, ast.ScalarField): self.unchecked_append_(f"fields['{field.id}'] = {v}") elif isinstance(field, ast.FixedField) and field.enum_id: self.unchecked_append_(f"if {v} != {field.enum_id}.{field.tag_id}:") @@ -511,10 +567,13 @@ class FieldParser: f" {{computed_{value_field.id}}} != {{{value_field.id}}}')") def parse(self, field: ast.Field): + if field.cond: + self.parse_optional_field_(field) + # Field has bit granularity. # Append the field to the current chunk, # check if a byte boundary was reached. - if core.is_bit_field(field): + elif core.is_bit_field(field): self.parse_bit_field_(field) # Padding fields. @@ -600,6 +659,35 @@ class FieldSerializer: if field.padded_size: self.append_(f"_span.extend([0] * ({field.padded_size} - len(_span) + _{field.id}_start))") + def serialize_optional_field_(self, field: ast.Field): + if isinstance(field, ast.ScalarField): + self.append_(dedent( + """ + if self.{field_id} is not None: + _span.extend(int.to_bytes(self.{field_id}, length={size}, byteorder='{byteorder}')) + """.format(field_id=field.id, + size=int(field.width / 8), + byteorder=self.byteorder))) + + elif isinstance(field, ast.TypedefField) and isinstance(field.type, ast.EnumDeclaration): + self.append_(dedent( + """ + if self.{field_id} is not None: + _span.extend(int.to_bytes(self.{field_id}, length={size}, byteorder='{byteorder}')) + """.format(field_id=field.id, + size=int(field.type.width / 8), + byteorder=self.byteorder))) + + elif isinstance(field, ast.TypedefField): + self.append_(dedent( + """ + if self.{field_id} is not None: + _span.extend(self.{field_id}.serialize()) + """.format(field_id=field.id))) + + else: + raise Exception(f"unsupported field type {field.__class__.__name__}") + def serialize_bit_field_(self, field: ast.Field): """Serialize the selected field as a bit field. The field is added to the current chunk. When a byte boundary @@ -611,6 +699,13 @@ class FieldSerializer: if isinstance(field, str): self.value.append(f"({field} << {shift})") + elif field.cond_for: + # Scalar field used as condition for an optional field. + # The width is always 1, the value is determined from + # the presence or absence of the optional field. + value_present = field.cond_for.cond.value + value_absent = 0 if field.cond_for.cond.value else 1 + self.value.append(f"(({value_absent} if self.{field.cond_for.id} is None else {value_present}) << {shift})") elif isinstance(field, ast.ScalarField): max_value = (1 << field.width) - 1 self.append_(f"if self.{field.id} > {max_value}:") @@ -741,10 +836,13 @@ class FieldSerializer: self.append_("_checksum_start = len(_span)") def serialize(self, field: ast.Field): + if field.cond: + self.serialize_optional_field_(field) + # Field has bit granularity. # Append the field to the current chunk, # check if a byte boundary was reached. - if core.is_bit_field(field): + elif core.is_bit_field(field): self.serialize_bit_field_(field) # Padding fields. @@ -806,9 +904,10 @@ def generate_packet_parser(packet: ast.Declaration) -> List[str]: # Convert the packet constraints to a boolean expression. validation = [] - if packet.constraints: + constraints = core.get_all_packet_constraints(packet) + if constraints: cond = [] - for c in packet.constraints: + for c in constraints: if c.value is not None: cond.append(f"fields['{c.id}'] != {hex(c.value)}") else: @@ -848,7 +947,16 @@ def generate_packet_size_getter(packet: ast.Declaration) -> List[str]: variable_width = [] for f in packet.fields: field_size = core.get_field_size(f) - if field_size is not None: + if f.cond: + if isinstance(f, ast.ScalarField): + return f"(0 if self.{f.id} is None else {f.width})" + elif isinstance(f, ast.TypedefField) and isinstance(f.type, ast.EnumDeclaration): + return f"(0 if self.{f.id} is None else {f.type.width})" + elif isinstance(f, ast.TypedefField): + return f"(0 if self.{f.id} is None else self.{f.id}.size)" + else: + raise Exception(f"unsupported field type {f.__class__.__name__}") + elif field_size is not None: constant_width += field_size elif isinstance(f, (ast.PayloadField, ast.BodyField)): variable_width.append("len(self.payload)") @@ -882,11 +990,7 @@ def generate_packet_post_init(decl: ast.Declaration) -> List[str]: """Generate __post_init__ function to set constraint field values.""" # Gather all constraints from parent packets. - constraints = [] - current = decl - while current.parent_id: - constraints.extend(current.constraints) - current = current.parent + constraints = core.get_all_packet_constraints(decl) if constraints: code = [] @@ -928,7 +1032,19 @@ def generate_packet_declaration(packet: ast.Declaration) -> str: packet_name = packet.id field_decls = [] for f in packet.fields: - if isinstance(f, ast.ScalarField): + if f.cond: + if isinstance(f, ast.ScalarField): + field_decls.append(f"{f.id}: Optional[int] = field(kw_only=True, default=None)") + elif isinstance(f, ast.TypedefField): + field_decls.append(f"{f.id}: Optional[{f.type_id}] = field(kw_only=True, default=None)") + else: + pass + elif f.cond_for: + # The fields used as condition for optional fields are + # not generated since their value is tied to the value of the + # optional field. + pass + elif isinstance(f, ast.ScalarField): field_decls.append(f"{f.id}: int = field(kw_only=True, default=0)") elif isinstance(f, ast.TypedefField): if isinstance(f.type, ast.EnumDeclaration): |