aboutsummaryrefslogtreecommitdiff
path: root/tools/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/parser.py')
-rw-r--r--tools/parser.py483
1 files changed, 482 insertions, 1 deletions
diff --git a/tools/parser.py b/tools/parser.py
index 05b6628..41d2f52 100644
--- a/tools/parser.py
+++ b/tools/parser.py
@@ -21,8 +21,12 @@ from __future__ import division
from __future__ import print_function
import collections
+import itertools
+import os.path
import re
+import bpf
+
Token = collections.namedtuple('token',
['type', 'value', 'filename', 'line', 'column'])
@@ -30,7 +34,9 @@ Token = collections.namedtuple('token',
_TOKEN_SPECIFICATION = (
('COMMENT', r'#.*$'),
('WHITESPACE', r'\s+'),
+ ('DEFAULT', r'@default'),
('INCLUDE', r'@include'),
+ ('FREQUENCY', r'@frequency'),
('PATH', r'(?:\.)?/\S+'),
('NUMERIC_CONSTANT', r'-?0[xX][0-9a-fA-F]+|-?0[Oo][0-7]+|-?[0-9]+'),
('COLON', r':'),
@@ -137,12 +143,51 @@ class ParserState:
return tokens
# NOTE: a bare string literal placed after an assignment is only an expression
# statement and is discarded at runtime, so each description below is attached
# through __doc__ to make it visible to help() and documentation tools.
Atom = collections.namedtuple('Atom', ['argument_index', 'op', 'value'])
Atom.__doc__ = """A single boolean comparison within a filter expression."""

Filter = collections.namedtuple('Filter', ['expression', 'action'])
Filter.__doc__ = """The result of parsing a DNF filter expression, with its action.

Since the expression is in Disjunctive Normal Form, it is composed of two levels
of lists, one for disjunctions and the inner one for conjunctions. The elements
of the inner list are Atoms.
"""

Syscall = collections.namedtuple('Syscall', ['name', 'number'])
Syscall.__doc__ = """A system call."""

ParsedFilterStatement = collections.namedtuple('ParsedFilterStatement',
                                               ['syscalls', 'filters'])
ParsedFilterStatement.__doc__ = """The result of parsing a filter statement.

Statements have a list of syscalls, and an associated list of filters that will
be evaluated sequentially when any of the syscalls is invoked.
"""

FilterStatement = collections.namedtuple('FilterStatement',
                                         ['syscall', 'frequency', 'filters'])
FilterStatement.__doc__ = """The filter list for a particular syscall.

This is a mapping from one syscall to a list of filters that are evaluated
sequentially. The last filter is always an unconditional action.
"""

ParsedPolicy = collections.namedtuple('ParsedPolicy',
                                      ['default_action', 'filter_statements'])
ParsedPolicy.__doc__ = """The result of parsing a minijail .policy file."""
+
+
# pylint: disable=too-few-public-methods
class PolicyParser:
"""A parser for the Minijail seccomp policy file format."""
- def __init__(self, arch):
+ def __init__(self, arch, *, kill_action, include_depth_limit=10):
self._parser_states = [ParserState("<memory>")]
+ self._kill_action = kill_action
+ self._include_depth_limit = include_depth_limit
+ self._default_action = self._kill_action
+ self._frequency_mapping = collections.defaultdict(int)
self._arch = arch
@property
@@ -228,3 +273,439 @@ class PolicyParser:
else:
self._parser_state.error('empty constant')
return value
+
+ # atom = argument , op , value
+ # ;
def _parse_atom(self, tokens):
    """Consume `argument op value` from the front of tokens; return an Atom.

    Problems are reported through self._parser_state.error() (presumably
    raising; its definition is not visible here).
    """
    if not tokens:
        self._parser_state.error('missing argument')
    arg_token = tokens.pop(0)
    if arg_token.type != 'ARGUMENT':
        self._parser_state.error('invalid argument', token=arg_token)

    if not tokens:
        self._parser_state.error('missing operator')
    op_token = tokens.pop(0)
    if op_token.type != 'OP':
        self._parser_state.error('invalid operator', token=op_token)

    # The value is parsed before the index is converted, so value errors are
    # reported first.
    parsed_value = self.parse_value(tokens)
    # Skips the first three characters of the token text (presumably the
    # "arg" prefix -- the ARGUMENT token pattern is not visible here).
    index = int(arg_token.value[3:])
    if index < 0 or index >= bpf.MAX_SYSCALL_ARGUMENTS:
        self._parser_state.error('invalid argument', token=arg_token)
    return Atom(index, op_token.value, parsed_value)
+
+ # clause = atom , [ { '&&' , atom } ]
+ # ;
+ def _parse_clause(self, tokens):
+ atoms = []
+ while tokens:
+ atoms.append(self._parse_atom(tokens))
+ if not tokens or tokens[0].type != 'AND':
+ break
+ tokens.pop(0)
+ else:
+ self._parser_state.error('empty clause')
+ return atoms
+
+ # argument-expression = clause , [ { '||' , clause } ]
+ # ;
def parse_argument_expression(self, tokens):
    """Parse an argument expression in Disjunctive Normal Form.

    Clauses separated by OR tokens are consumed from the front of tokens.
    The result is a list with one entry per clause, each entry being
    whatever _parse_clause() returned for it. Reports
    'empty argument expression' when a clause is expected but no tokens
    remain (empty input, or a dangling OR).
    """
    disjunction = []
    while tokens:
        disjunction.append(self._parse_clause(tokens))
        if tokens and tokens[0].type == 'OR':
            # Another clause must follow the OR.
            tokens.pop(0)
            continue
        return disjunction
    # Only reached when the token list ran dry where a clause was required.
    self._parser_state.error('empty argument expression')
    return disjunction
+
+ # default-action = 'kill-process'
+ # | 'kill-thread'
+ # | 'kill'
+ # | 'trap'
+ # ;
def _parse_default_action(self, tokens):
    """Consume one ACTION token and return the matching default action.

    Only 'kill-process', 'kill-thread', 'kill' and 'trap' are accepted; any
    other ACTION value is reported as 'invalid permissive default action'.
    """
    if not tokens:
        self._parser_state.error('missing default action')
    chosen = tokens.pop(0)
    if chosen.type != 'ACTION':
        return self._parser_state.error(
            'invalid default action', token=chosen)
    # Lazily-evaluated builders: nothing is constructed (or looked up on bpf)
    # unless the corresponding keyword was actually used.
    builders = {
        'kill-process': lambda: bpf.KillProcess(),
        'kill-thread': lambda: bpf.KillThread(),
        'kill': lambda: self._kill_action,
        'trap': lambda: bpf.Trap(),
    }
    build = builders.get(chosen.value)
    if build is None:
        return self._parser_state.error(
            'invalid permissive default action', token=chosen)
    return build()
+
+ # action = 'allow' | '1'
+ # | 'kill-process'
+ # | 'kill-thread'
+ # | 'kill'
+ # | 'trap'
+ # | 'trace'
+ # | 'log'
+ # | 'return' , single-constant
+ # ;
def _parse_action(self, tokens):
    """Consume an action from the front of tokens and return its object.

    Accepts an ACTION keyword, the numeric constant 1 (treated as allow), or
    a RETURN token followed by a single constant. Anything else is reported
    as 'invalid action'.
    """
    if not tokens:
        self._parser_state.error('missing action')
    head = tokens.pop(0)
    kind = head.type
    if kind == 'ACTION':
        # Lazily-evaluated builders keyed by the action keyword.
        builders = {
            'allow': lambda: bpf.Allow(),
            'kill': lambda: self._kill_action,
            'kill-process': lambda: bpf.KillProcess(),
            'kill-thread': lambda: bpf.KillThread(),
            'trap': lambda: bpf.Trap(),
            'trace': lambda: bpf.Trace(),
            'log': lambda: bpf.Log(),
        }
        build = builders.get(head.value)
        if build is not None:
            return build()
    elif kind == 'NUMERIC_CONSTANT':
        # A bare "1" is shorthand for allow; other constants are rejected
        # by falling through to the final error.
        if self._parse_single_constant(head) == 1:
            return bpf.Allow()
    elif kind == 'RETURN':
        if not tokens:
            self._parser_state.error('missing return value')
        errno_token = tokens.pop(0)
        return bpf.ReturnErrno(self._parse_single_constant(errno_token))
    return self._parser_state.error('invalid action', token=head)
+
+ # single-filter = action
+ # | argument-expression , [ ';' , action ]
+ # ;
def _parse_single_filter(self, tokens):
    """Parse one filter: a bare action, or an expression with an action.

    An argument expression without an explicit `; action` suffix gets an
    allow action. A bare action yields a Filter whose expression is None.
    """
    if not tokens:
        self._parser_state.error('missing filter')
    # Only argument expressions can start with an ARGUMENT token.
    if tokens[0].type != 'ARGUMENT':
        return Filter(None, self._parse_action(tokens))

    expression = self.parse_argument_expression(tokens)
    has_explicit_action = bool(tokens) and tokens[0].type == 'SEMICOLON'
    if not has_explicit_action:
        return Filter(expression, bpf.Allow())
    tokens.pop(0)
    return Filter(expression, self._parse_action(tokens))
+
+ # filter = '{' , single-filter , [ { ',' , single-filter } ] , '}'
+ # | single-filter
+ # ;
def parse_filter(self, tokens):
    """Parse a filter and return a list of Filter objects.

    A filter is either a single filter, or a brace-delimited,
    comma-separated sequence of single filters. Consumed tokens are removed
    from the front of tokens.
    """
    if not tokens:
        self._parser_state.error('missing filter')
    if tokens[0].type != 'LBRACE':
        return [self._parse_single_filter(tokens)]

    brace = tokens.pop(0)
    parsed = []
    while tokens:
        parsed.append(self._parse_single_filter(tokens))
        if tokens and tokens[0].type == 'COMMA':
            tokens.pop(0)
        else:
            break
    properly_closed = bool(tokens) and tokens[0].type == 'RBRACE'
    if not properly_closed:
        # Point the error at the brace that was never closed.
        self._parser_state.error('unclosed brace', token=brace)
    tokens.pop(0)
    return parsed
+
+ # key-value-pair = identifier , '=', identifier , [ { ',' , identifier } ]
+ # ;
+ def _parse_key_value_pair(self, tokens):
+ if not tokens:
+ self._parser_state.error('missing key')
+ key = tokens.pop(0)
+ if key.type != 'IDENTIFIER':
+ self._parser_state.error('invalid key', token=key)
+ if not tokens:
+ self._parser_state.error('missing equal')
+ if tokens[0].type != 'EQUAL':
+ self._parser_state.error('invalid equal', token=tokens[0])
+ tokens.pop(0)
+ value_list = []
+ while tokens:
+ value = tokens.pop(0)
+ if value.type != 'IDENTIFIER':
+ self._parser_state.error('invalid value', token=value)
+ value_list.append(value.value)
+ if not tokens or tokens[0].type != 'COMMA':
+ break
+ tokens.pop(0)
+ else:
+ self._parser_state.error('empty value')
+ return (key.value, value_list)
+
+ # metadata = '[' , key-value-pair , [ { ';' , key-value-pair } ] , ']'
+ # ;
+ def _parse_metadata(self, tokens):
+ if not tokens:
+ self._parser_state.error('missing opening bracket')
+ opening_bracket = tokens.pop(0)
+ if opening_bracket.type != 'LBRACKET':
+ self._parser_state.error(
+ 'invalid opening bracket', token=opening_bracket)
+ metadata = {}
+ while tokens:
+ first_token = tokens[0]
+ key, value = self._parse_key_value_pair(tokens)
+ if key in metadata:
+ self._parser_state.error(
+ 'duplicate metadata key: "%s"' % key, token=first_token)
+ metadata[key] = value
+ if not tokens or tokens[0].type != 'SEMICOLON':
+ break
+ tokens.pop(0)
+ if not tokens or tokens[0].type != 'RBRACKET':
+ self._parser_state.error('unclosed bracket', token=opening_bracket)
+ tokens.pop(0)
+ return metadata
+
+ # syscall-descriptor = syscall-name , [ metadata ]
+ # | libc-function , [ metadata ]
+ # ;
+ def _parse_syscall_descriptor(self, tokens):
+ if not tokens:
+ self._parser_state.error('missing syscall descriptor')
+ syscall_descriptor = tokens.pop(0)
+ if syscall_descriptor.type != 'IDENTIFIER':
+ self._parser_state.error(
+ 'invalid syscall descriptor', token=syscall_descriptor)
+ # TODO(lhchavez): Support libc function names.
+ if tokens and tokens[0].type == 'LBRACKET':
+ metadata = self._parse_metadata(tokens)
+ if 'arch' in metadata and self._arch.arch_name not in metadata['arch']:
+ return ()
+ if syscall_descriptor.value not in self._arch.syscalls:
+ self._parser_state.error(
+ 'nonexistent syscall', token=syscall_descriptor)
+ return (Syscall(syscall_descriptor.value,
+ self._arch.syscalls[syscall_descriptor.value]), )
+
+ # filter-statement = '{' , syscall-descriptor , [ { ',', syscall-descriptor } ] , '}' ,
+ # ':' , filter
+ # | syscall-descriptor , ':' , filter
+ # ;
def parse_filter_statement(self, tokens):
    """Parse a filter statement and return a ParsedFilterStatement.

    The statement is one syscall descriptor, or a brace-delimited,
    comma-separated group of them, followed by ':' and a filter. Returns
    None when no descriptor produced a syscall.
    """
    if not tokens:
        self._parser_state.error('empty filter statement')

    syscalls = []
    if tokens[0].type != 'LBRACE':
        syscalls.extend(self._parse_syscall_descriptor(tokens))
    else:
        brace = tokens.pop(0)
        while tokens:
            syscalls.extend(self._parse_syscall_descriptor(tokens))
            if tokens and tokens[0].type == 'COMMA':
                tokens.pop(0)
            else:
                break
        properly_closed = bool(tokens) and tokens[0].type == 'RBRACE'
        if not properly_closed:
            self._parser_state.error('unclosed brace', token=brace)
        tokens.pop(0)

    if not tokens:
        self._parser_state.error('missing colon')
    separator = tokens[0]
    if separator.type != 'COLON':
        self._parser_state.error('invalid colon', token=separator)
    tokens.pop(0)

    # The filter is always parsed (and validated), even if the statement
    # ends up being dropped below.
    filters = self.parse_filter(tokens)
    if syscalls:
        return ParsedFilterStatement(tuple(syscalls), filters)
    return None
+
+ # include-statement = '@include' , posix-path
+ # ;
+ def _parse_include_statement(self, tokens):
+ if not tokens:
+ self._parser_state.error('empty filter statement')
+ if tokens[0].type != 'INCLUDE':
+ self._parser_state.error('invalid include', token=tokens[0])
+ tokens.pop(0)
+ if not tokens:
+ self._parser_state.error('empty include path')
+ include_path = tokens.pop(0)
+ if include_path.type != 'PATH':
+ self._parser_state.error(
+ 'invalid include path', token=include_path)
+ if len(self._parser_states) == self._include_depth_limit:
+ self._parser_state.error('@include statement nested too deep')
+ include_filename = os.path.normpath(
+ os.path.join(
+ os.path.dirname(self._parser_state.filename),
+ include_path.value))
+ if not os.path.isfile(include_filename):
+ self._parser_state.error(
+ 'Could not @include %s' % include_filename, token=include_path)
+ return self._parse_policy_file(include_filename)
+
def _parse_frequency_file(self, filename):
    """Parse a frequency file and return a mapping of syscall to count.

    Each non-empty line has the form `syscall-descriptor : number`. Counts
    for a syscall that appears on several lines are added together. The
    mapping is a defaultdict(int) keyed by whatever
    _parse_syscall_descriptor() yields. A parser state for the file is
    pushed while it is read and always popped afterwards.
    """
    self._parser_states.append(ParserState(filename))
    try:
        frequency_mapping = collections.defaultdict(int)
        with open(filename) as frequency_file:
            for line in frequency_file:
                self._parser_state.set_line(line.rstrip())
                tokens = self._parser_state.tokenize()

                if not tokens:
                    # Allow empty lines.
                    continue

                syscalls = self._parse_syscall_descriptor(tokens)
                if not tokens:
                    self._parser_state.error('missing colon')
                if tokens[0].type != 'COLON':
                    self._parser_state.error(
                        'invalid colon', token=tokens[0])
                tokens.pop(0)

                if not tokens:
                    self._parser_state.error('missing number')
                number = tokens.pop(0)
                if number.type != 'NUMERIC_CONSTANT':
                    self._parser_state.error(
                        'invalid number', token=number)
                try:
                    number_value = int(number.value, base=0)
                except ValueError:
                    # base=0 rejects decimals with a leading zero ("010"),
                    # which the NUMERIC_CONSTANT pattern accepts. Report
                    # them as a parse error; re-raise in case error()
                    # returns instead of raising.
                    self._parser_state.error(
                        'invalid number', token=number)
                    raise
                if number_value < 0:
                    self._parser_state.error(
                        'invalid number', token=number)

                # Reject trailing garbage, as policy files already do.
                if tokens:
                    self._parser_state.error(
                        'extra tokens', token=tokens[0])

                for syscall in syscalls:
                    frequency_mapping[syscall] += number_value
        return frequency_mapping
    finally:
        self._parser_states.pop()
+
+ # frequency-statement = '@frequency' , posix-path
+ # ;
+ def _parse_frequency_statement(self, tokens):
+ if not tokens:
+ self._parser_state.error('empty frequency statement')
+ if tokens[0].type != 'FREQUENCY':
+ self._parser_state.error('invalid frequency', token=tokens[0])
+ tokens.pop(0)
+ if not tokens:
+ self._parser_state.error('empty frequency path')
+ frequency_path = tokens.pop(0)
+ if frequency_path.type != 'PATH':
+ self._parser_state.error(
+ 'invalid frequency path', token=frequency_path)
+ frequency_filename = os.path.normpath(
+ os.path.join(
+ os.path.dirname(self._parser_state.filename),
+ frequency_path.value))
+ if not os.path.isfile(frequency_filename):
+ self._parser_state.error(
+ 'Could not open frequency file %s' % frequency_filename,
+ token=frequency_path)
+ return self._parse_frequency_file(frequency_filename)
+
+ # default-statement = '@default' , default-action
+ # ;
+ def _parse_default_statement(self, tokens):
+ if not tokens:
+ self._parser_state.error('empty default statement')
+ if tokens[0].type != 'DEFAULT':
+ self._parser_state.error('invalid default', token=tokens[0])
+ tokens.pop(0)
+ if not tokens:
+ self._parser_state.error('empty action')
+ return self._parse_default_action(tokens)
+
def _parse_policy_file(self, filename):
    """Parse one policy file line by line and return its statements.

    @include statements contribute the included file's statements,
    @frequency statements add into self._frequency_mapping, and @default
    statements replace self._default_action. A parser state for the file is
    pushed while it is read and always popped afterwards.
    """
    self._parser_states.append(ParserState(filename))
    try:
        collected = []
        with open(filename) as policy_file:
            for raw_line in policy_file:
                self._parser_state.set_line(raw_line.rstrip())
                tokens = self._parser_state.tokenize()

                # Allow empty lines.
                if not tokens:
                    continue

                leading = tokens[0].type
                if leading == 'INCLUDE':
                    collected.extend(self._parse_include_statement(tokens))
                elif leading == 'FREQUENCY':
                    counts = self._parse_frequency_statement(tokens)
                    for syscall, count in counts.items():
                        self._frequency_mapping[syscall] += count
                elif leading == 'DEFAULT':
                    self._default_action = self._parse_default_statement(
                        tokens)
                else:
                    parsed = self.parse_filter_statement(tokens)
                    if parsed is None:
                        # If all the syscalls in the statement are for
                        # another arch, skip the whole statement. This
                        # also skips the extra-tokens check below.
                        continue
                    collected.append(parsed)

                if tokens:
                    self._parser_state.error(
                        'extra tokens', token=tokens[0])
        return collected
    finally:
        self._parser_states.pop()
+
def parse_file(self, filename):
    """Parse a policy file and return a ParsedPolicy.

    All statements (including those pulled in through @include) are merged
    into one FilterStatement per syscall, in order of first appearance. If
    a syscall's filter list does not already end in an unconditional
    action, one using the default action is appended.

    Raises:
        ParseException: if the interpreter's recursion limit is hit while
            parsing, or if anything follows an unconditional action in a
            syscall's filter list.
    """
    # Reset per-file state so that a parser instance can be reused: neither
    # frequencies nor an @default from a previous file may leak into this
    # parse.
    self._frequency_mapping = collections.defaultdict(int)
    self._default_action = self._kill_action
    try:
        statements = list(self._parse_policy_file(filename))
    except RecursionError as err:
        raise ParseException('recursion limit exceeded', filename,
                             self._parser_states[-1].line) from err

    # Collapse statements into a single syscall-to-filter-list.
    syscall_filter_mapping = {}
    filter_statements = []
    for syscalls, filters in statements:
        for syscall in syscalls:
            if syscall not in syscall_filter_mapping:
                filter_statements.append(
                    FilterStatement(
                        syscall, self._frequency_mapping.get(syscall, 1),
                        []))
                syscall_filter_mapping[syscall] = filter_statements[-1]
            syscall_filter_mapping[syscall].filters.extend(filters)
    for filter_statement in filter_statements:
        # Everything from the first unconditional filter onwards.
        unconditional_actions_suffix = list(
            itertools.dropwhile(lambda filt: filt.expression is not None,
                                filter_statement.filters))
        if len(unconditional_actions_suffix) == 1:
            # The last filter already has an unconditional action, no need
            # to add another one.
            continue
        if len(unconditional_actions_suffix) > 1:
            raise ParseException(('Syscall %s (number %d) already had '
                                  'an unconditional action applied') %
                                 (filter_statement.syscall.name,
                                  filter_statement.syscall.number),
                                 filename, self._parser_states[-1].line)
        assert not unconditional_actions_suffix
        filter_statement.filters.append(
            Filter(expression=None, action=self._default_action))
    return ParsedPolicy(self._default_action, filter_statements)