diff options
Diffstat (limited to 'lib/python2.7/site-packages/sepolgen/interfaces.py')
-rw-r--r-- | lib/python2.7/site-packages/sepolgen/interfaces.py | 509 |
1 files changed, 509 insertions, 0 deletions
diff --git a/lib/python2.7/site-packages/sepolgen/interfaces.py b/lib/python2.7/site-packages/sepolgen/interfaces.py new file mode 100644 index 0000000..48ae4f2 --- /dev/null +++ b/lib/python2.7/site-packages/sepolgen/interfaces.py @@ -0,0 +1,509 @@ +# Authors: Karl MacMillan <kmacmillan@mentalrootkit.com> +# +# Copyright (C) 2006 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Classes for representing and manipulating interfaces. +""" + +import copy +import itertools + +from . import access +from . import refpolicy +from . import objectmodel +from . import matching +from .sepolgeni18n import _ + + +class Param: + """ + Object representing a paramater for an interface. + """ + def __init__(self): + self.__name = "" + self.type = refpolicy.SRC_TYPE + self.obj_classes = refpolicy.IdSet() + self.required = True + + def set_name(self, name): + if not access.is_idparam(name): + raise ValueError("Name [%s] is not a param" % name) + self.__name = name + + def get_name(self): + return self.__name + + name = property(get_name, set_name) + + num = property(fget=lambda self: int(self.name[1:])) + + def __repr__(self): + return "<sepolgen.policygen.Param instance [%s, %s, %s]>" % \ + (self.name, refpolicy.field_to_str[self.type], " ".join(self.obj_classes)) + + +# Helper for extract perms +def __param_insert(name, type, av, params): + ret = 0 + if name in params: + p = params[name] + # The entries are identical - we're done + if type == p.type: + return + # Hanldle implicitly typed objects (like process) + if (type == refpolicy.SRC_TYPE or type == refpolicy.TGT_TYPE) and \ + (p.type == refpolicy.TGT_TYPE or p.type == refpolicy.SRC_TYPE): + #print name, refpolicy.field_to_str[p.type] + # If the object is not implicitly typed, tell the + # caller there is a likely conflict. + ret = 1 + if av: + avobjs = [av.obj_class] + else: + avobjs = [] + for obj in itertools.chain(p.obj_classes, avobjs): + if obj in objectmodel.implicitly_typed_objects: + ret = 0 + break + # "Promote" to a SRC_TYPE as this is the likely usage. + # We do this even if the above test fails on purpose + # as there is really no sane way to resolve the conflict + # here. The caller can take other actions if needed. + p.type = refpolicy.SRC_TYPE + else: + # There is some conflict - no way to resolve it really + # so we just leave the first entry and tell the caller + # there was a conflict. + ret = 1 + else: + p = Param() + p.name = name + p.type = type + params[p.name] = p + + if av: + p.obj_classes.add(av.obj_class) + return ret + + + +def av_extract_params(av, params): + """Extract the paramaters from an access vector. + + Extract the paramaters (in the form $N) from an access + vector, storing them as Param objects in a dictionary. + Some attempt is made at resolving conflicts with other + entries in the dict, but if an unresolvable conflict is + found it is reported to the caller. + + The goal here is to figure out how interface paramaters are + actually used in the interface - e.g., that $1 is a domain used as + a SRC_TYPE. In general an interface will look like this: + + interface(`foo', ` + allow $1 foo : file read; + ') + + This is simple to figure out - $1 is a SRC_TYPE. A few interfaces + are more complex, for example: + + interface(`foo_trans',` + domain_auto_trans($1,fingerd_exec_t,fingerd_t) + + allow $1 fingerd_t:fd use; + allow fingerd_t $1:fd use; + allow fingerd_t $1:fifo_file rw_file_perms; + allow fingerd_t $1:process sigchld; + ') + + Here the usage seems ambigious, but it is not. $1 is still domain + and therefore should be returned as a SRC_TYPE. + + Returns: + 0 - success + 1 - conflict found + """ + ret = 0 + found_src = False + if access.is_idparam(av.src_type): + if __param_insert(av.src_type, refpolicy.SRC_TYPE, av, params) == 1: + ret = 1 + + if access.is_idparam(av.tgt_type): + if __param_insert(av.tgt_type, refpolicy.TGT_TYPE, av, params) == 1: + ret = 1 + + if access.is_idparam(av.obj_class): + if __param_insert(av.obj_class, refpolicy.OBJ_CLASS, av, params) == 1: + ret = 1 + + for perm in av.perms: + if access.is_idparam(perm): + if __param_insert(perm, PERM) == 1: + ret = 1 + + return ret + +def role_extract_params(role, params): + if access.is_idparam(role.role): + return __param_insert(role.role, refpolicy.ROLE, None, params) + +def type_rule_extract_params(rule, params): + def extract_from_set(set, type): + ret = 0 + for x in set: + if access.is_idparam(x): + if __param_insert(x, type, None, params): + ret = 1 + return ret + + ret = 0 + if extract_from_set(rule.src_types, refpolicy.SRC_TYPE): + ret = 1 + + if extract_from_set(rule.tgt_types, refpolicy.TGT_TYPE): + ret = 1 + + if extract_from_set(rule.obj_classes, refpolicy.OBJ_CLASS): + ret = 1 + + if access.is_idparam(rule.dest_type): + if __param_insert(rule.dest_type, refpolicy.DEST_TYPE, None, params): + ret = 1 + + return ret + +def ifcall_extract_params(ifcall, params): + ret = 0 + for arg in ifcall.args: + if access.is_idparam(arg): + # Assume interface arguments are source types. Fairly safe + # assumption for most interfaces + if __param_insert(arg, refpolicy.SRC_TYPE, None, params): + ret = 1 + + return ret + +class AttributeVector: + def __init__(self): + self.name = "" + self.access = access.AccessVectorSet() + + def add_av(self, av): + self.access.add_av(av) + +class AttributeSet: + def __init__(self): + self.attributes = { } + + def add_attr(self, attr): + self.attributes[attr.name] = attr + + def from_file(self, fd): + def parse_attr(line): + fields = line[1:-1].split() + if len(fields) != 2 or fields[0] != "Attribute": + raise SyntaxError("Syntax error Attribute statement %s" % line) + a = AttributeVector() + a.name = fields[1] + + return a + + a = None + for line in fd: + line = line[:-1] + if line[0] == "[": + if a: + self.add_attr(a) + a = parse_attr(line) + elif a: + l = line.split(",") + av = access.AccessVector(l) + a.add_av(av) + if a: + self.add_attr(a) + +class InterfaceVector: + def __init__(self, interface=None, attributes={}): + # Enabled is a loose concept currently - we are essentially + # not enabling interfaces that we can't handle currently. + # See InterfaceVector.add_ifv for more information. + self.enabled = True + self.name = "" + # The access that is enabled by this interface - eventually + # this will include indirect access from typeattribute + # statements. + self.access = access.AccessVectorSet() + # Paramaters are stored in a dictionary (key: param name + # value: Param object). + self.params = { } + if interface: + self.from_interface(interface, attributes) + self.expanded = False + + def from_interface(self, interface, attributes={}): + self.name = interface.name + + # Add allow rules + for avrule in interface.avrules(): + if avrule.rule_type != refpolicy.AVRule.ALLOW: + continue + # Handle some policy bugs + if "dontaudit" in interface.name: + #print "allow rule in interface: %s" % interface + continue + avs = access.avrule_to_access_vectors(avrule) + for av in avs: + self.add_av(av) + + # Add typeattribute access + if attributes: + for typeattribute in interface.typeattributes(): + for attr in typeattribute.attributes: + if attr not in attributes.attributes: + # print "missing attribute " + attr + continue + attr_vec = attributes.attributes[attr] + for a in attr_vec.access: + av = copy.copy(a) + if av.src_type == attr_vec.name: + av.src_type = typeattribute.type + if av.tgt_type == attr_vec.name: + av.tgt_type = typeattribute.type + self.add_av(av) + + + # Extract paramaters from roles + for role in interface.roles(): + if role_extract_params(role, self.params): + pass + #print "found conflicting role param %s for interface %s" % \ + # (role.name, interface.name) + # Extract paramaters from type rules + for rule in interface.typerules(): + if type_rule_extract_params(rule, self.params): + pass + #print "found conflicting params in rule %s in interface %s" % \ + # (str(rule), interface.name) + + for ifcall in interface.interface_calls(): + if ifcall_extract_params(ifcall, self.params): + pass + #print "found conflicting params in ifcall %s in interface %s" % \ + # (str(ifcall), interface.name) + + + def add_av(self, av): + if av_extract_params(av, self.params) == 1: + pass + #print "found conflicting perms [%s]" % str(av) + self.access.add_av(av) + + def to_string(self): + s = [] + s.append("[InterfaceVector %s]" % self.name) + for av in self.access: + s.append(str(av)) + return "\n".join(s) + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return "<InterfaceVector %s:%s>" % (self.name, self.enabled) + + +class InterfaceSet: + def __init__(self, output=None): + self.interfaces = { } + self.tgt_type_map = { } + self.tgt_type_all = [] + self.output = output + + def o(self, str): + if self.output: + self.output.write(str + "\n") + + def to_file(self, fd): + for iv in sorted(self.interfaces.values(), key=lambda x: x.name): + fd.write("[InterfaceVector %s " % iv.name) + for param in sorted(iv.params.values(), key=lambda x: x.name): + fd.write("%s:%s " % (param.name, refpolicy.field_to_str[param.type])) + fd.write("]\n") + avl = sorted(iv.access.to_list()) + for av in avl: + fd.write(",".join(av)) + fd.write("\n") + + def from_file(self, fd): + def parse_ifv(line): + fields = line[1:-1].split() + if len(fields) < 2 or fields[0] != "InterfaceVector": + raise SyntaxError("Syntax error InterfaceVector statement %s" % line) + ifv = InterfaceVector() + ifv.name = fields[1] + if len(fields) == 2: + return + for field in fields[2:]: + p = field.split(":") + if len(p) != 2: + raise SyntaxError("Invalid param in InterfaceVector statement %s" % line) + param = Param() + param.name = p[0] + param.type = refpolicy.str_to_field[p[1]] + ifv.params[param.name] = param + return ifv + + ifv = None + for line in fd: + line = line[:-1] + if line[0] == "[": + if ifv: + self.add_ifv(ifv) + ifv = parse_ifv(line) + elif ifv: + l = line.split(",") + av = access.AccessVector(l) + ifv.add_av(av) + if ifv: + self.add_ifv(ifv) + + self.index() + + def add_ifv(self, ifv): + self.interfaces[ifv.name] = ifv + + def index(self): + for ifv in self.interfaces.values(): + tgt_types = set() + for av in ifv.access: + if access.is_idparam(av.tgt_type): + self.tgt_type_all.append(ifv) + tgt_types = set() + break + tgt_types.add(av.tgt_type) + + for type in tgt_types: + l = self.tgt_type_map.setdefault(type, []) + l.append(ifv) + + def add(self, interface, attributes={}): + ifv = InterfaceVector(interface, attributes) + self.add_ifv(ifv) + + def add_headers(self, headers, output=None, attributes={}): + for i in itertools.chain(headers.interfaces(), headers.templates()): + self.add(i, attributes) + + self.expand_ifcalls(headers) + self.index() + + def map_param(self, id, ifcall): + if access.is_idparam(id): + num = int(id[1:]) + if num > len(ifcall.args): + # Tell caller to drop this because it must have + # been generated from an optional param. + return None + else: + arg = ifcall.args[num - 1] + if isinstance(arg, list): + return arg + else: + return [arg] + else: + return [id] + + def map_add_av(self, ifv, av, ifcall): + src_types = self.map_param(av.src_type, ifcall) + if src_types is None: + return + + tgt_types = self.map_param(av.tgt_type, ifcall) + if tgt_types is None: + return + + obj_classes = self.map_param(av.obj_class, ifcall) + if obj_classes is None: + return + + new_perms = refpolicy.IdSet() + for perm in av.perms: + p = self.map_param(perm, ifcall) + if p is None: + continue + else: + new_perms.update(p) + if len(new_perms) == 0: + return + + for src_type in src_types: + for tgt_type in tgt_types: + for obj_class in obj_classes: + ifv.access.add(src_type, tgt_type, obj_class, new_perms) + + def do_expand_ifcalls(self, interface, if_by_name): + # Descend an interface call tree adding the access + # from each interface. This is a depth first walk + # of the tree. + + stack = [(interface, None)] + ifv = self.interfaces[interface.name] + ifv.expanded = True + + while len(stack) > 0: + cur, cur_ifcall = stack.pop(-1) + + cur_ifv = self.interfaces[cur.name] + if cur != interface: + + for av in cur_ifv.access: + self.map_add_av(ifv, av, cur_ifcall) + + # If we have already fully expanded this interface + # there is no reason to descend further. + if cur_ifv.expanded: + continue + + for ifcall in cur.interface_calls(): + if ifcall.ifname == interface.name: + self.o(_("Found circular interface class")) + return + try: + newif = if_by_name[ifcall.ifname] + except KeyError: + self.o(_("Missing interface definition for %s" % ifcall.ifname)) + continue + + stack.append((newif, ifcall)) + + + def expand_ifcalls(self, headers): + # Create a map of interface names to interfaces - + # this mirrors the interface vector map we already + # have. + if_by_name = { } + + for i in itertools.chain(headers.interfaces(), headers.templates()): + if_by_name[i.name] = i + + + for interface in itertools.chain(headers.interfaces(), headers.templates()): + self.do_expand_ifcalls(interface, if_by_name) + |