Diffstat (limited to 'scripts/generate_python_backend.py')
 -rwxr-xr-x  scripts/generate_python_backend.py | 144
 1 file changed, 130 insertions(+), 14 deletions(-)
diff --git a/scripts/generate_python_backend.py b/scripts/generate_python_backend.py
index 475154d..8938173 100755
--- a/scripts/generate_python_backend.py
+++ b/scripts/generate_python_backend.py
@@ -130,11 +130,11 @@ class FieldParser:
after parsing is completed."""
self.unchecked_code.append(line)
- def append_(self, line: str):
+ def append_(self, code: str):
"""Append field parsing code.
There must be no unchecked code left before this function is called."""
assert len(self.unchecked_code) == 0
- self.code.append(line)
+ self.code.extend(code.split('\n'))
def check_size_(self, size: str):
"""Generate a check of the current span size."""
@@ -318,6 +318,60 @@ class FieldParser:
if padded_size:
self.append_(f"span = remaining_span")
+ def parse_optional_field_(self, field: ast.Field):
+ """Parse the selected optional field.
+ Optional fields must start and end on a byte boundary."""
+
+ if self.shift != 0:
+ raise Exception('Optional field does not start on an octet boundary')
+ if (isinstance(field, ast.TypedefField) and
+ isinstance(field.type, ast.StructDeclaration) and
+ field.type.parent_id is not None):
+ raise Exception('Derived struct used in optional typedef field')
+
+ self.consume_span_()
+
+ if isinstance(field, ast.ScalarField):
+ self.append_(dedent("""
+ if {cond_id} == {cond_value}:
+ if len(span) < {size}:
+ raise Exception('Invalid packet size')
+ fields['{field_id}'] = int.from_bytes(span[:{size}], byteorder='{byteorder}')
+ span = span[{size}:]
+ """.format(size=int(field.width / 8),
+ field_id=field.id,
+ cond_id=field.cond.id,
+ cond_value=field.cond.value,
+ byteorder=self.byteorder)))
+
+ elif isinstance(field, ast.TypedefField) and isinstance(field.type, ast.EnumDeclaration):
+ self.append_(dedent("""
+ if {cond_id} == {cond_value}:
+ if len(span) < {size}:
+ raise Exception('Invalid packet size')
+ fields['{field_id}'] = {type_id}(
+ int.from_bytes(span[:{size}], byteorder='{byteorder}'))
+ span = span[{size}:]
+ """.format(size=int(field.type.width / 8),
+ field_id=field.id,
+ type_id=field.type_id,
+ cond_id=field.cond.id,
+ cond_value=field.cond.value,
+ byteorder=self.byteorder)))
+
+ elif isinstance(field, ast.TypedefField):
+ self.append_(dedent("""
+ if {cond_id} == {cond_value}:
+ {field_id}, span = {type_id}.parse(span)
+ fields['{field_id}'] = {field_id}
+ """.format(field_id=field.id,
+ type_id=field.type_id,
+ cond_id=field.cond.id,
+ cond_value=field.cond.value)))
+
+ else:
+ raise Exception(f"unsupported field type {field.__class__.__name__}")
+
def parse_bit_field_(self, field: ast.Field):
"""Parse the selected field as a bit field.
The field is added to the current chunk. When a byte boundary
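
To illustrate what the scalar branch of parse_optional_field_ emits, assume a hypothetical packet with a 16-bit optional scalar `value` gated by a condition field `has_value` with cond value 1, little-endian (names, widths, and byte order are illustrative, not taken from the patch). The template expands to roughly:

    if has_value == 1:
        if len(span) < 2:
            raise Exception('Invalid packet size')
        fields['value'] = int.from_bytes(span[:2], byteorder='little')
        span = span[2:]
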
@@ -347,7 +401,9 @@ class FieldParser:
for shift, width, field in self.chunk:
v = (value if len(self.chunk) == 1 and shift == 0 else f"({value} >> {shift}) & {mask(width)}")
- if isinstance(field, ast.ScalarField):
+ if field.cond_for:
+ self.unchecked_append_(f"{field.id} = {v}")
+ elif isinstance(field, ast.ScalarField):
self.unchecked_append_(f"fields['{field.id}'] = {v}")
elif isinstance(field, ast.FixedField) and field.enum_id:
self.unchecked_append_(f"if {v} != {field.enum_id}.{field.tag_id}:")
@@ -511,10 +567,13 @@ class FieldParser:
f" {{computed_{value_field.id}}} != {{{value_field.id}}}')")
def parse(self, field: ast.Field):
+ if field.cond:
+ self.parse_optional_field_(field)
+
# Field has bit granularity.
# Append the field to the current chunk,
# check if a byte boundary was reached.
- if core.is_bit_field(field):
+ elif core.is_bit_field(field):
self.parse_bit_field_(field)
# Padding fields.
@@ -600,6 +659,35 @@ class FieldSerializer:
if field.padded_size:
self.append_(f"_span.extend([0] * ({field.padded_size} - len(_span) + _{field.id}_start))")
+ def serialize_optional_field_(self, field: ast.Field):
+ if isinstance(field, ast.ScalarField):
+ self.append_(dedent(
+ """
+ if self.{field_id} is not None:
+ _span.extend(int.to_bytes(self.{field_id}, length={size}, byteorder='{byteorder}'))
+ """.format(field_id=field.id,
+ size=int(field.width / 8),
+ byteorder=self.byteorder)))
+
+ elif isinstance(field, ast.TypedefField) and isinstance(field.type, ast.EnumDeclaration):
+ self.append_(dedent(
+ """
+ if self.{field_id} is not None:
+ _span.extend(int.to_bytes(self.{field_id}, length={size}, byteorder='{byteorder}'))
+ """.format(field_id=field.id,
+ size=int(field.type.width / 8),
+ byteorder=self.byteorder)))
+
+ elif isinstance(field, ast.TypedefField):
+ self.append_(dedent(
+ """
+ if self.{field_id} is not None:
+ _span.extend(self.{field_id}.serialize())
+ """.format(field_id=field.id)))
+
+ else:
+ raise Exception(f"unsupported field type {field.__class__.__name__}")
+
def serialize_bit_field_(self, field: ast.Field):
"""Serialize the selected field as a bit field.
The field is added to the current chunk. When a byte boundary
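
The serializer mirrors the parser: an optional field is written only when it is not None. For the same hypothetical 16-bit scalar `value` and an optional struct-typed field `extra` (names assumed), the generated output would be roughly:

    if self.value is not None:
        _span.extend(int.to_bytes(self.value, length=2, byteorder='little'))
    if self.extra is not None:
        _span.extend(self.extra.serialize())
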
@@ -611,6 +699,13 @@ class FieldSerializer:
if isinstance(field, str):
self.value.append(f"({field} << {shift})")
+ elif field.cond_for:
+ # Scalar field used as condition for an optional field.
+ # The width is always 1; the value is determined from
+ # the presence or absence of the optional field.
+ value_present = field.cond_for.cond.value
+ value_absent = 0 if field.cond_for.cond.value else 1
+ self.value.append(f"(({value_absent} if self.{field.cond_for.id} is None else {value_present}) << {shift})")
elif isinstance(field, ast.ScalarField):
max_value = (1 << field.width) - 1
self.append_(f"if self.{field.id} > {max_value}:")
@@ -741,10 +836,13 @@ class FieldSerializer:
self.append_("_checksum_start = len(_span)")
def serialize(self, field: ast.Field):
+ if field.cond:
+ self.serialize_optional_field_(field)
+
# Field has bit granularity.
# Append the field to the current chunk,
# check if a byte boundary was reached.
- if core.is_bit_field(field):
+ elif core.is_bit_field(field):
self.serialize_bit_field_(field)
# Padding fields.
@@ -806,9 +904,10 @@ def generate_packet_parser(packet: ast.Declaration) -> List[str]:
# Convert the packet constraints to a boolean expression.
validation = []
- if packet.constraints:
+ constraints = core.get_all_packet_constraints(packet)
+ if constraints:
cond = []
- for c in packet.constraints:
+ for c in constraints:
if c.value is not None:
cond.append(f"fields['{c.id}'] != {hex(c.value)}")
else:
@@ -848,7 +947,16 @@ def generate_packet_size_getter(packet: ast.Declaration) -> List[str]:
variable_width = []
for f in packet.fields:
field_size = core.get_field_size(f)
- if field_size is not None:
+ if f.cond:
+ if isinstance(f, ast.ScalarField):
+ return f"(0 if self.{f.id} is None else {f.width})"
+ elif isinstance(f, ast.TypedefField) and isinstance(f.type, ast.EnumDeclaration):
+ return f"(0 if self.{f.id} is None else {f.type.width})"
+ elif isinstance(f, ast.TypedefField):
+ return f"(0 if self.{f.id} is None else self.{f.id}.size)"
+ else:
+ raise Exception(f"unsupported field type {f.__class__.__name__}")
+ elif field_size is not None:
constant_width += field_size
elif isinstance(f, (ast.PayloadField, ast.BodyField)):
variable_width.append("len(self.payload)")
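
Continuing the hypothetical example, a packet whose other fields occupy one constant byte plus the optional 16-bit `value` would get a size getter along these lines (how the generator joins the constant and variable terms is only sketched here):

    @property
    def size(self) -> int:
        return (0 if self.value is None else 2) + 1
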
@@ -882,11 +990,7 @@ def generate_packet_post_init(decl: ast.Declaration) -> List[str]:
"""Generate __post_init__ function to set constraint field values."""
# Gather all constraints from parent packets.
- constraints = []
- current = decl
- while current.parent_id:
- constraints.extend(current.constraints)
- current = current.parent
+ constraints = core.get_all_packet_constraints(decl)
if constraints:
code = []
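
core.get_all_packet_constraints replaces the open-coded parent walk removed above. A sketch of such a helper, mirroring the removed loop (the real implementation lives in the core module and its signature is assumed):

    def get_all_packet_constraints(decl):
        # Collect constraint fields from the declaration and its ancestors,
        # following parent links up the packet hierarchy.
        constraints = []
        current = decl
        while current.parent_id:
            constraints.extend(current.constraints)
            current = current.parent
        return constraints
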
@@ -928,7 +1032,19 @@ def generate_packet_declaration(packet: ast.Declaration) -> str:
packet_name = packet.id
field_decls = []
for f in packet.fields:
- if isinstance(f, ast.ScalarField):
+ if f.cond:
+ if isinstance(f, ast.ScalarField):
+ field_decls.append(f"{f.id}: Optional[int] = field(kw_only=True, default=None)")
+ elif isinstance(f, ast.TypedefField):
+ field_decls.append(f"{f.id}: Optional[{f.type_id}] = field(kw_only=True, default=None)")
+ else:
+ pass
+ elif f.cond_for:
+ # The fields used as condition for optional fields are
+ # not generated since their value is tied to the value of the
+ # optional field.
+ pass
+ elif isinstance(f, ast.ScalarField):
field_decls.append(f"{f.id}: int = field(kw_only=True, default=0)")
elif isinstance(f, ast.TypedefField):
if isinstance(f.type, ast.EnumDeclaration):
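
Putting the declaration rules together, a hypothetical packet with a 1-bit `has_value` condition, a 7-bit scalar `flags`, and an optional 16-bit scalar `value` would produce a dataclass along these lines (class and field names assumed):

    from dataclasses import dataclass, field
    from typing import Optional

    @dataclass
    class ExamplePacket:
        flags: int = field(kw_only=True, default=0)
        value: Optional[int] = field(kw_only=True, default=None)
        # no attribute is generated for has_value; during serialization the
        # condition bit is derived from whether value is None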