diff options
Diffstat (limited to 'tools/parser.py')
-rw-r--r-- | tools/parser.py | 483 |
1 files changed, 482 insertions, 1 deletions
diff --git a/tools/parser.py b/tools/parser.py index 05b6628..41d2f52 100644 --- a/tools/parser.py +++ b/tools/parser.py @@ -21,8 +21,12 @@ from __future__ import division from __future__ import print_function import collections +import itertools +import os.path import re +import bpf + Token = collections.namedtuple('token', ['type', 'value', 'filename', 'line', 'column']) @@ -30,7 +34,9 @@ Token = collections.namedtuple('token', _TOKEN_SPECIFICATION = ( ('COMMENT', r'#.*$'), ('WHITESPACE', r'\s+'), + ('DEFAULT', r'@default'), ('INCLUDE', r'@include'), + ('FREQUENCY', r'@frequency'), ('PATH', r'(?:\.)?/\S+'), ('NUMERIC_CONSTANT', r'-?0[xX][0-9a-fA-F]+|-?0[Oo][0-7]+|-?[0-9]+'), ('COLON', r':'), @@ -137,12 +143,51 @@ class ParserState: return tokens +Atom = collections.namedtuple('Atom', ['argument_index', 'op', 'value']) +"""A single boolean comparison within a filter expression.""" + +Filter = collections.namedtuple('Filter', ['expression', 'action']) +"""The result of parsing a DNF filter expression, with its action. + +Since the expression is in Disjunctive Normal Form, it is composed of two levels +of lists, one for disjunctions and the inner one for conjunctions. The elements +of the inner list are Atoms. +""" + +Syscall = collections.namedtuple('Syscall', ['name', 'number']) +"""A system call.""" + +ParsedFilterStatement = collections.namedtuple('ParsedFilterStatement', + ['syscalls', 'filters']) +"""The result of parsing a filter statement. + +Statements have a list of syscalls, and an associated list of filters that will +be evaluated sequentially when any of the syscalls is invoked. +""" + +FilterStatement = collections.namedtuple('FilterStatement', + ['syscall', 'frequency', 'filters']) +"""The filter list for a particular syscall. + +This is a mapping from one syscall to a list of filters that are evaluated +sequentially. The last filter is always an unconditional action. +""" + +ParsedPolicy = collections.namedtuple('ParsedPolicy', + ['default_action', 'filter_statements']) +"""The result of parsing a minijail .policy file.""" + + # pylint: disable=too-few-public-methods class PolicyParser: """A parser for the Minijail seccomp policy file format.""" - def __init__(self, arch): + def __init__(self, arch, *, kill_action, include_depth_limit=10): self._parser_states = [ParserState("<memory>")] + self._kill_action = kill_action + self._include_depth_limit = include_depth_limit + self._default_action = self._kill_action + self._frequency_mapping = collections.defaultdict(int) self._arch = arch @property @@ -228,3 +273,439 @@ class PolicyParser: else: self._parser_state.error('empty constant') return value + + # atom = argument , op , value + # ; + def _parse_atom(self, tokens): + if not tokens: + self._parser_state.error('missing argument') + argument = tokens.pop(0) + if argument.type != 'ARGUMENT': + self._parser_state.error('invalid argument', token=argument) + + if not tokens: + self._parser_state.error('missing operator') + operator = tokens.pop(0) + if operator.type != 'OP': + self._parser_state.error('invalid operator', token=operator) + + value = self.parse_value(tokens) + argument_index = int(argument.value[3:]) + if not (0 <= argument_index < bpf.MAX_SYSCALL_ARGUMENTS): + self._parser_state.error('invalid argument', token=argument) + return Atom(argument_index, operator.value, value) + + # clause = atom , [ { '&&' , atom } ] + # ; + def _parse_clause(self, tokens): + atoms = [] + while tokens: + atoms.append(self._parse_atom(tokens)) + if not tokens or tokens[0].type != 'AND': + break + tokens.pop(0) + else: + self._parser_state.error('empty clause') + return atoms + + # argument-expression = clause , [ { '||' , clause } ] + # ; + def parse_argument_expression(self, tokens): + """Parse a argument expression in Disjunctive Normal Form. + + Since BPF disallows back jumps, we build the basic blocks in reverse + order so that all the jump targets are known by the time we need to + reference them. + """ + + clauses = [] + while tokens: + clauses.append(self._parse_clause(tokens)) + if not tokens or tokens[0].type != 'OR': + break + tokens.pop(0) + else: + self._parser_state.error('empty argument expression') + return clauses + + # default-action = 'kill-process' + # | 'kill-thread' + # | 'kill' + # | 'trap' + # ; + def _parse_default_action(self, tokens): + if not tokens: + self._parser_state.error('missing default action') + action_token = tokens.pop(0) + if action_token.type != 'ACTION': + return self._parser_state.error( + 'invalid default action', token=action_token) + if action_token.value == 'kill-process': + return bpf.KillProcess() + if action_token.value == 'kill-thread': + return bpf.KillThread() + if action_token.value == 'kill': + return self._kill_action + if action_token.value == 'trap': + return bpf.Trap() + return self._parser_state.error( + 'invalid permissive default action', token=action_token) + + # action = 'allow' | '1' + # | 'kill-process' + # | 'kill-thread' + # | 'kill' + # | 'trap' + # | 'trace' + # | 'log' + # | 'return' , single-constant + # ; + def _parse_action(self, tokens): + if not tokens: + self._parser_state.error('missing action') + action_token = tokens.pop(0) + if action_token.type == 'ACTION': + if action_token.value == 'allow': + return bpf.Allow() + if action_token.value == 'kill': + return self._kill_action + if action_token.value == 'kill-process': + return bpf.KillProcess() + if action_token.value == 'kill-thread': + return bpf.KillThread() + if action_token.value == 'trap': + return bpf.Trap() + if action_token.value == 'trace': + return bpf.Trace() + if action_token.value == 'log': + return bpf.Log() + elif action_token.type == 'NUMERIC_CONSTANT': + constant = self._parse_single_constant(action_token) + if constant == 1: + return bpf.Allow() + elif action_token.type == 'RETURN': + if not tokens: + self._parser_state.error('missing return value') + return bpf.ReturnErrno(self._parse_single_constant(tokens.pop(0))) + return self._parser_state.error('invalid action', token=action_token) + + # single-filter = action + # | argument-expression , [ ';' , action ] + # ; + def _parse_single_filter(self, tokens): + if not tokens: + self._parser_state.error('missing filter') + if tokens[0].type == 'ARGUMENT': + # Only argument expressions can start with an ARGUMENT token. + argument_expression = self.parse_argument_expression(tokens) + if tokens and tokens[0].type == 'SEMICOLON': + tokens.pop(0) + action = self._parse_action(tokens) + else: + action = bpf.Allow() + return Filter(argument_expression, action) + else: + return Filter(None, self._parse_action(tokens)) + + # filter = '{' , single-filter , [ { ',' , single-filter } ] , '}' + # | single-filter + # ; + def parse_filter(self, tokens): + """Parse a filter and return a list of Filter objects.""" + if not tokens: + self._parser_state.error('missing filter') + filters = [] + if tokens[0].type == 'LBRACE': + opening_brace = tokens.pop(0) + while tokens: + filters.append(self._parse_single_filter(tokens)) + if not tokens or tokens[0].type != 'COMMA': + break + tokens.pop(0) + if not tokens or tokens[0].type != 'RBRACE': + self._parser_state.error('unclosed brace', token=opening_brace) + tokens.pop(0) + else: + filters.append(self._parse_single_filter(tokens)) + return filters + + # key-value-pair = identifier , '=', identifier , [ { ',' , identifier } ] + # ; + def _parse_key_value_pair(self, tokens): + if not tokens: + self._parser_state.error('missing key') + key = tokens.pop(0) + if key.type != 'IDENTIFIER': + self._parser_state.error('invalid key', token=key) + if not tokens: + self._parser_state.error('missing equal') + if tokens[0].type != 'EQUAL': + self._parser_state.error('invalid equal', token=tokens[0]) + tokens.pop(0) + value_list = [] + while tokens: + value = tokens.pop(0) + if value.type != 'IDENTIFIER': + self._parser_state.error('invalid value', token=value) + value_list.append(value.value) + if not tokens or tokens[0].type != 'COMMA': + break + tokens.pop(0) + else: + self._parser_state.error('empty value') + return (key.value, value_list) + + # metadata = '[' , key-value-pair , [ { ';' , key-value-pair } ] , ']' + # ; + def _parse_metadata(self, tokens): + if not tokens: + self._parser_state.error('missing opening bracket') + opening_bracket = tokens.pop(0) + if opening_bracket.type != 'LBRACKET': + self._parser_state.error( + 'invalid opening bracket', token=opening_bracket) + metadata = {} + while tokens: + first_token = tokens[0] + key, value = self._parse_key_value_pair(tokens) + if key in metadata: + self._parser_state.error( + 'duplicate metadata key: "%s"' % key, token=first_token) + metadata[key] = value + if not tokens or tokens[0].type != 'SEMICOLON': + break + tokens.pop(0) + if not tokens or tokens[0].type != 'RBRACKET': + self._parser_state.error('unclosed bracket', token=opening_bracket) + tokens.pop(0) + return metadata + + # syscall-descriptor = syscall-name , [ metadata ] + # | libc-function , [ metadata ] + # ; + def _parse_syscall_descriptor(self, tokens): + if not tokens: + self._parser_state.error('missing syscall descriptor') + syscall_descriptor = tokens.pop(0) + if syscall_descriptor.type != 'IDENTIFIER': + self._parser_state.error( + 'invalid syscall descriptor', token=syscall_descriptor) + # TODO(lhchavez): Support libc function names. + if tokens and tokens[0].type == 'LBRACKET': + metadata = self._parse_metadata(tokens) + if 'arch' in metadata and self._arch.arch_name not in metadata['arch']: + return () + if syscall_descriptor.value not in self._arch.syscalls: + self._parser_state.error( + 'nonexistent syscall', token=syscall_descriptor) + return (Syscall(syscall_descriptor.value, + self._arch.syscalls[syscall_descriptor.value]), ) + + # filter-statement = '{' , syscall-descriptor , [ { ',', syscall-descriptor } ] , '}' , + # ':' , filter + # | syscall-descriptor , ':' , filter + # ; + def parse_filter_statement(self, tokens): + """Parse a filter statement and return a ParsedFilterStatement.""" + if not tokens: + self._parser_state.error('empty filter statement') + syscall_descriptors = [] + if tokens[0].type == 'LBRACE': + opening_brace = tokens.pop(0) + while tokens: + syscall_descriptors.extend( + self._parse_syscall_descriptor(tokens)) + if not tokens or tokens[0].type != 'COMMA': + break + tokens.pop(0) + if not tokens or tokens[0].type != 'RBRACE': + self._parser_state.error('unclosed brace', token=opening_brace) + tokens.pop(0) + else: + syscall_descriptors.extend(self._parse_syscall_descriptor(tokens)) + if not tokens: + self._parser_state.error('missing colon') + if tokens[0].type != 'COLON': + self._parser_state.error('invalid colon', token=tokens[0]) + tokens.pop(0) + parsed_filter = self.parse_filter(tokens) + if not syscall_descriptors: + return None + return ParsedFilterStatement(tuple(syscall_descriptors), parsed_filter) + + # include-statement = '@include' , posix-path + # ; + def _parse_include_statement(self, tokens): + if not tokens: + self._parser_state.error('empty filter statement') + if tokens[0].type != 'INCLUDE': + self._parser_state.error('invalid include', token=tokens[0]) + tokens.pop(0) + if not tokens: + self._parser_state.error('empty include path') + include_path = tokens.pop(0) + if include_path.type != 'PATH': + self._parser_state.error( + 'invalid include path', token=include_path) + if len(self._parser_states) == self._include_depth_limit: + self._parser_state.error('@include statement nested too deep') + include_filename = os.path.normpath( + os.path.join( + os.path.dirname(self._parser_state.filename), + include_path.value)) + if not os.path.isfile(include_filename): + self._parser_state.error( + 'Could not @include %s' % include_filename, token=include_path) + return self._parse_policy_file(include_filename) + + def _parse_frequency_file(self, filename): + self._parser_states.append(ParserState(filename)) + try: + frequency_mapping = collections.defaultdict(int) + with open(filename) as frequency_file: + for line in frequency_file: + self._parser_state.set_line(line.rstrip()) + tokens = self._parser_state.tokenize() + + if not tokens: + continue + + syscall_numbers = self._parse_syscall_descriptor(tokens) + if not tokens: + self._parser_state.error('missing colon') + if tokens[0].type != 'COLON': + self._parser_state.error( + 'invalid colon', token=tokens[0]) + tokens.pop(0) + + if not tokens: + self._parser_state.error('missing number') + number = tokens.pop(0) + if number.type != 'NUMERIC_CONSTANT': + self._parser_state.error( + 'invalid number', token=number) + number_value = int(number.value, base=0) + if number_value < 0: + self._parser_state.error( + 'invalid number', token=number) + + for syscall_number in syscall_numbers: + frequency_mapping[syscall_number] += number_value + return frequency_mapping + finally: + self._parser_states.pop() + + # frequency-statement = '@frequency' , posix-path + # ; + def _parse_frequency_statement(self, tokens): + if not tokens: + self._parser_state.error('empty frequency statement') + if tokens[0].type != 'FREQUENCY': + self._parser_state.error('invalid frequency', token=tokens[0]) + tokens.pop(0) + if not tokens: + self._parser_state.error('empty frequency path') + frequency_path = tokens.pop(0) + if frequency_path.type != 'PATH': + self._parser_state.error( + 'invalid frequency path', token=frequency_path) + frequency_filename = os.path.normpath( + os.path.join( + os.path.dirname(self._parser_state.filename), + frequency_path.value)) + if not os.path.isfile(frequency_filename): + self._parser_state.error( + 'Could not open frequency file %s' % frequency_filename, + token=frequency_path) + return self._parse_frequency_file(frequency_filename) + + # default-statement = '@default' , default-action + # ; + def _parse_default_statement(self, tokens): + if not tokens: + self._parser_state.error('empty default statement') + if tokens[0].type != 'DEFAULT': + self._parser_state.error('invalid default', token=tokens[0]) + tokens.pop(0) + if not tokens: + self._parser_state.error('empty action') + return self._parse_default_action(tokens) + + def _parse_policy_file(self, filename): + self._parser_states.append(ParserState(filename)) + try: + statements = [] + with open(filename) as policy_file: + for line in policy_file: + self._parser_state.set_line(line.rstrip()) + tokens = self._parser_state.tokenize() + + if not tokens: + # Allow empty lines. + continue + + if tokens[0].type == 'INCLUDE': + statements.extend( + self._parse_include_statement(tokens)) + elif tokens[0].type == 'FREQUENCY': + for syscall_number, frequency in self._parse_frequency_statement( + tokens).items(): + self._frequency_mapping[ + syscall_number] += frequency + elif tokens[0].type == 'DEFAULT': + self._default_action = self._parse_default_statement( + tokens) + else: + statement = self.parse_filter_statement(tokens) + if statement is None: + # If all the syscalls in the statement are for + # another arch, skip the whole statement. + continue + statements.append(statement) + + if tokens: + self._parser_state.error( + 'extra tokens', token=tokens[0]) + return statements + finally: + self._parser_states.pop() + + def parse_file(self, filename): + """Parse a file and return the list of FilterStatements.""" + self._frequency_mapping = collections.defaultdict(int) + try: + statements = [x for x in self._parse_policy_file(filename)] + except RecursionError: + raise ParseException('recursion limit exceeded', filename, + self._parser_states[-1].line) + + # Collapse statements into a single syscall-to-filter-list. + syscall_filter_mapping = {} + filter_statements = [] + for syscalls, filters in statements: + for syscall in syscalls: + if syscall not in syscall_filter_mapping: + filter_statements.append( + FilterStatement( + syscall, self._frequency_mapping.get(syscall, 1), + [])) + syscall_filter_mapping[syscall] = filter_statements[-1] + syscall_filter_mapping[syscall].filters.extend(filters) + for filter_statement in filter_statements: + unconditional_actions_suffix = list( + itertools.dropwhile(lambda filt: filt.expression is not None, + filter_statement.filters)) + if len(unconditional_actions_suffix) == 1: + # The last filter already has an unconditional action, no need + # to add another one. + continue + if len(unconditional_actions_suffix) > 1: + raise ParseException(('Syscall %s (number %d) already had ' + 'an unconditional action applied') % + (filter_statement.syscall.name, + filter_statement.syscall.number), + filename, self._parser_states[-1].line) + assert not unconditional_actions_suffix + filter_statement.filters.append( + Filter(expression=None, action=self._default_action)) + return ParsedPolicy(self._default_action, filter_statements) |