# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2013-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2013-2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
# Copyright (C) 2018 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
# pylint: disable=too-many-lines
"""Qrexec policy parser and evaluator"""
import abc
import collections
import collections.abc
import enum
import io
import itertools
import logging
import pathlib
import string
from typing import (
Iterable,
List,
TextIO,
Tuple,
Dict,
Optional,
Set,
Union,
Type,
NoReturn,
FrozenSet,
Sequence,
)
from .. import (
POLICYPATH,
RPCNAME_ALLOWED_CHARSET,
POLICYSUFFIX,
RUNTIME_POLICY_PATH,
)
from ..utils import FullSystemInfo, SystemInfo, uuid_to_name
from .. import exc
from ..exc import (
AccessDenied,
PolicySyntaxError,
RequestError,
)
FILENAME_ALLOWED_CHARSET = set(string.digits + string.ascii_lowercase + "_.-")
[docs]
def filter_filepaths(filepaths: Iterable[pathlib.Path]) -> List[pathlib.Path]:
"""Check if files should be considered by policy.
The file name should contain only allowed characters (latin lowercase,
digits, underscore, full stop and dash). It should not start with the dot.
Only the file name is considered, not the directories on path that leads to
it.
Args:
filepaths: the file paths
Returns:
list of pathlib.Path: sorted list of paths, without ignored ones
Raises:
qrexec.exc.AccessDenied: for invalid path which is not ignored
"""
filepaths = [
path
for path in filepaths
if path.is_file()
and path.suffix == POLICYSUFFIX
and not path.name.startswith(".")
]
# check for invalid filenames first, then return all or nothing
for path in filepaths:
if not set(path.name).issubset(FILENAME_ALLOWED_CHARSET):
raise exc.AccessDenied("invalid filename: {}".format(path))
filepaths.sort()
return filepaths
[docs]
def parse_service_and_argument(
rpcname: Union[str, pathlib.PurePath], *, no_arg: str = "+"
) -> Tuple[str, str]:
"""Parse service and argument string.
Parse ``SERVICE+ARGUMENT``. Argument may be empty (single ``+`` at the end)
or omitted (no ``+`` at all). If no argument is given, `no_arg` is returned
instead. By default this returns ``'+'``, as if argument is empty.
A `Path` from :py:mod:`pathlib` is also accepted, in which case the filename
is parsed.
"""
if isinstance(rpcname, pathlib.PurePath):
rpcname = rpcname.name
if "+" in rpcname:
service, argument = rpcname.split("+", 1)
argument = "+" + argument
else:
service, argument = rpcname, no_arg
return service, argument
[docs]
def get_invalid_characters(
s: str,
allowed: FrozenSet[str] = RPCNAME_ALLOWED_CHARSET,
disallowed: Iterable[str] = "",
) -> Sequence[str]:
"""Return characters contained in *disallowed* and/or not int *allowed*"""
# pylint: disable=invalid-name
return tuple(
sorted(set(c for c in s if c not in allowed.difference(disallowed)))
)
[docs]
def validate_service_and_argument(
service: Optional[str],
argument: Optional[str],
*,
filepath: pathlib.Path,
lineno: Optional[int],
) -> Tuple[Optional[str], Optional[str]]:
"""Check service name and argument
This is intended as policy syntax checker to discard obviously invalid
service names and arguments. There are some cases for which this function
will not signal a problem, but the call still would be invalid. One of those
cases is too long total call name.
Args:
service (str): the service as appeared in policy file
argument (str): the argument as appeared in policy file
filepath (pathlib.Path): the file path
lineno (int): the line in the file
Returns:
(str or None, str or None): service and argument
Raises:
qrexec.exc.PolicySyntaxError: for a number of forbidden cases
"""
if service == "*":
service = None
if service is not None:
invalid_chars = get_invalid_characters(service, disallowed="+")
if invalid_chars:
raise PolicySyntaxError(
filepath,
lineno,
"service {!r} contains invalid characters: {!r}".format(
service, invalid_chars
),
)
if argument == "*":
argument = None
if argument is not None:
invalid_chars = get_invalid_characters(argument)
if invalid_chars:
raise PolicySyntaxError(
filepath,
lineno,
"argument {!r} contains invalid characters: {!r}".format(
argument, invalid_chars
),
)
if not argument.startswith("+"):
raise PolicySyntaxError(
filepath,
lineno,
"argument {!r} does not start with +".format(argument),
)
if service is None:
raise PolicySyntaxError(
filepath, lineno, "only * argument allowed for * service"
)
return service, argument
def match_strings(info: SystemInfo, self: str, other: str) -> bool:
if self.startswith("uuid:"):
if other.startswith("uuid:"):
return self == other
try:
return self[5:] == info[str(other)]["uuid"]
except KeyError:
return False
if other.startswith("uuid:"):
try:
return other[5:] == info[str(self)]["uuid"]
except KeyError:
return False
return self == other
def is_dom0(token) -> bool:
return token in (
"@adminvm",
"dom0",
"uuid:00000000-0000-0000-0000-000000000000",
)
[docs]
class VMToken(str, metaclass=VMTokenMeta):
"""A domain specification
Wherever policy evaluation needs to represent a VM or a ``@something``
token, instances of this class (and subclasses) are used. Each ``@token``
has its own dedicated class.
There are 4 such contexts:
- :py:class:`Source`: for whatever was specified in policy in 3rd column
- :py:class:`Target`: 4th column in policy
- :py:class:`Redirect`: ``target=`` parameter to :py:class:`Allow` and
:py:class:`Ask`, and ``default_target=`` for the latter
- :py:class:`IntendedTarget`: for what **user** invoked the call for
Not all ``@tokens`` can be used everywhere. Where they can be used is
specified by inheritance.
All tokens are also instances of :py:class:`str` and can be compared to
other strings.
"""
def __new__(
cls,
token: str,
*,
filepath: Optional[pathlib.Path] = None,
lineno: Optional[int] = None,
) -> "VMToken":
orig_token = token
if len(token) == 0:
raise PolicySyntaxError(
filepath,
lineno or 0,
"invalid empty {} token".format(cls.__name__.lower()),
)
# if user specified just qube name or UUID, use it directly
if not (token.startswith("@") or token == "*"):
return super().__new__(cls, token)
# token starts with @, we search for right subclass
for exact, token_cls in cls.exacts.items():
if not issubclass(token_cls, cls):
# the class has to be our subclass, that's how we define which
# tokens can be used where
continue
if token == exact:
return super().__new__(token_cls, token)
# for prefixed tokens, we pass just suffixes
for prefix, token_cls in cls.prefixes.items():
if not issubclass(token_cls, cls):
continue
if token.startswith(prefix):
value = token[len(prefix) :]
if not value:
raise PolicySyntaxError(
filepath,
lineno or 0,
"invalid empty {} token: {!r}".format(prefix, token),
)
if value.startswith("@"):
# we are either part of a longer prefix (@dispvm:@tag: etc),
# or the token is invalid, in which case this will fallthru
continue
return super().__new__(token_cls, token)
# the loop didn't find any valid prefix, so this is not a valid token
raise PolicySyntaxError(
filepath,
lineno or 0,
"invalid {} token: {!r}".format(cls.__name__.lower(), orig_token),
)
value: str
filepath: Optional[pathlib.Path]
lineno: Optional[int]
def __init__(
self,
token: str,
*,
filepath: Optional[pathlib.Path] = None,
lineno: Optional[int] = None,
):
# pylint: disable=unused-argument
super().__init__()
self.filepath = filepath
self.lineno = lineno
try:
self.value = self[len(self.PREFIX) :] # type: ignore
assert self.value[0] != "@"
except AttributeError:
# self.value = self
pass
# def __repr__(self):
# return '<{} value={!r} filepath={} lineno={}>'.format(
# type(self).__name__, str(self), self.filepath, self.lineno)
# This replaces is_match() and is_match_single().
[docs]
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional["VMToken"] = None,
) -> bool:
"""Check if this token matches opposite token"""
# pylint: disable=unused-argument
if is_dom0(self):
# see note in AdminVM.match
return is_dom0(other)
return match_strings(system_info["domains"], self, other)
[docs]
def is_special_value(self) -> bool:
"""Check if the token specification is special (keyword) value"""
return self.startswith("@") or self == "*"
@property
def type(self) -> str:
"""Type of the token
``'keyword'`` for special values, ``'name'`` for qube name
"""
return "keyword" if self.is_special_value() else "name"
@property
def text(self) -> str:
"""Text of the token, without possibly '@' prefix"""
return self.lstrip("@")
[docs]
class Source(VMToken):
# pylint: disable=missing-docstring
pass
class _BaseTarget(VMToken):
# pylint: disable=missing-docstring
def expand(self, *, system_info: FullSystemInfo) -> Iterable[VMToken]:
"""An iterator over all valid domain names that this token would match
This is used as part of :py:meth:`Policy.collect_targets_for_ask()`.
"""
info = system_info["domains"]
if self in info:
yield IntendedTarget(uuid_to_name(info, self))
[docs]
class Target(_BaseTarget):
# pylint: disable=missing-docstring
pass
[docs]
class Redirect(_BaseTarget):
# pylint: disable=missing-docstring
def __new__(
cls,
value: Optional[str],
*,
filepath: Optional[pathlib.Path] = None,
lineno: Optional[int] = None,
) -> "Redirect":
if value is None:
return None # type: ignore
return super().__new__(
cls, value, filepath=filepath, lineno=lineno # type: ignore
)
# this method (with overloads in subclasses) was verify_target_value
[docs]
class IntendedTarget(VMToken):
# pylint: disable=missing-docstring
[docs]
def verify(self, *, system_info: FullSystemInfo) -> Optional[VMToken]:
"""Check if given value names valid target
This function check if given value is not only syntactically correct,
but also if names valid service call target (existing domain, or valid
``'@dispvm'`` like keyword). If the domain does not exist,
returns a DefaultVM.
Args:
system_info: information about the system
Returns:
VMToken: for successful verification
Raises:
qrexec.exc.AccessDenied: for failed verification
"""
# for subclass it has to be overloaded
# pylint: disable=unidiomatic-typecheck
if type(self) != IntendedTarget:
raise NotImplementedError()
if self not in system_info["domains"]:
logging.warning(
"qrexec: target %r does not exist, using @default instead",
str(self),
)
return DefaultVM(
"@default", filepath=self.filepath, lineno=self.lineno
)
return self
# And the tokens. Inheritance defines, where the token can be used.
class WildcardVM(Source, Target):
# any, including AdminVM
# pylint: disable=missing-docstring,unused-argument
EXACT = "*"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
return True
def expand(self, *, system_info: FullSystemInfo) -> Iterable[VMToken]:
for name, domain in system_info["domains"].items():
if name.startswith("uuid:"):
continue
yield IntendedTarget(name)
if domain["template_for_dispvms"]:
yield DispVMTemplate("@dispvm:" + name)
yield DispVM("@dispvm")
[docs]
class AdminVM(Source, Target, Redirect, IntendedTarget):
# no Source, for calls originating from AdminVM policy is not evaluated
# pylint: disable=missing-docstring,unused-argument
EXACT = "@adminvm"
def expand(self, *, system_info: FullSystemInfo) -> Iterable["AdminVM"]:
yield self
def verify(self, *, system_info: FullSystemInfo) -> "AdminVM":
return self
# Currently there's only one valid AdminVM nameley "@adminvm". It refers to
# (the local) dom0. We match dom0 and @adminvm commutatively for backward
# compatibility. If you are going to change that, you need to check all
# code that tests for dom0, uuid:0, @adminvm or AdminVM and adapt it, if
# necessary.
def match(
self,
other: Optional[str],
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
return is_dom0(other)
[docs]
class AnyVM(Source, Target):
# pylint: disable=missing-docstring,unused-argument
EXACT = "@anyvm"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
return not is_dom0(other)
def expand(self, *, system_info: FullSystemInfo) -> Iterable[VMToken]:
for name, domain in system_info["domains"].items():
if name.startswith("uuid:"):
continue
if not is_dom0(name):
yield IntendedTarget(name)
if domain["template_for_dispvms"]:
yield DispVMTemplate("@dispvm:" + name)
yield DispVM("@dispvm")
[docs]
class DefaultVM(Target, IntendedTarget):
# pylint: disable=missing-docstring,unused-argument
EXACT = "@default"
def expand(self, *, system_info: FullSystemInfo) -> Iterable[NoReturn]:
yield from ()
def verify(self, *, system_info: FullSystemInfo) -> "DefaultVM":
return self
[docs]
class TypeVM(Source, Target):
# pylint: disable=missing-docstring,unused-argument
PREFIX = "@type:"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
info = system_info["domains"]
return other in info and self.value == info[other]["type"]
def expand(
self, *, system_info: FullSystemInfo
) -> Iterable[IntendedTarget]:
for name, domain in system_info["domains"].items():
if name.startswith("uuid:"):
continue
if domain["type"] == self.value:
yield IntendedTarget(name)
[docs]
class TagVM(Source, Target):
# pylint: disable=missing-docstring,unused-argument
PREFIX = "@tag:"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
info = system_info["domains"]
return other in info and self.value in info[other]["tags"]
def expand(
self, *, system_info: FullSystemInfo
) -> Iterable[IntendedTarget]:
for name, domain in system_info["domains"].items():
if name.startswith("uuid:"):
continue
if self.value in domain["tags"]:
yield IntendedTarget(name)
[docs]
class DispVM(Target, Redirect, IntendedTarget):
# pylint: disable=missing-docstring,unused-argument
EXACT = "@dispvm"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
return self == other
def expand(self, *, system_info: FullSystemInfo) -> Iterable["DispVM"]:
yield self
def verify(self, *, system_info: FullSystemInfo) -> "DispVM":
return self
[docs]
@staticmethod
def get_dispvm_template(
source: str,
*,
system_info: FullSystemInfo,
) -> Optional["DispVMTemplate"]:
"""Given source, get appropriate template for DispVM. Maybe None."""
_system_info = system_info["domains"]
if source not in _system_info:
return None
template = _system_info[source].get("default_dispvm", None)
if template is None:
return None
return DispVMTemplate("@dispvm:" + template)
[docs]
class DispVMTemplate(Source, Target, Redirect, IntendedTarget):
# pylint: disable=missing-docstring,unused-argument
PREFIX = "@dispvm:"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
assert self.startswith("@dispvm:"), f"missing prefix in {self!r}"
if isinstance(other, DispVM) and source is not None:
return match_strings(
system_info["domains"],
self,
other.get_dispvm_template(source, system_info=system_info),
)
if not isinstance(other, DispVMTemplate):
return False # not a disposable VM template
return match_strings(system_info["domains"], self[8:], other[8:])
def expand(
self, *, system_info: FullSystemInfo
) -> Iterable["DispVMTemplate"]:
info = system_info["domains"]
if info[self.value]["template_for_dispvms"]:
yield uuid_to_name(info, self)
# else: log a warning?
def verify(self, *, system_info: FullSystemInfo) -> "DispVMTemplate":
_system_info = system_info["domains"]
if (
self.value not in _system_info
or not _system_info[self.value]["template_for_dispvms"]
):
raise AccessDenied(
"not a template for dispvm: {}".format(self.value)
)
return self
[docs]
class DispVMTag(Source, Target):
# pylint: disable=missing-docstring,unused-argument
PREFIX = "@dispvm:@tag:"
def match(
self,
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken] = None,
) -> bool:
if isinstance(other, DispVM):
assert source is not None
other = other.get_dispvm_template(source, system_info=system_info)
if not isinstance(other, DispVMTemplate):
# 1) original other may have been neither @dispvm:<name> nor @dispvm
# 2) other.get_dispvm_template() may have been None
return False
domain = system_info["domains"][other.value]
if not domain["template_for_dispvms"]:
return False
if not self.value in domain["tags"]:
return False
return True
def expand(self, *, system_info: FullSystemInfo) -> Iterable[VMToken]:
for name, domain in system_info["domains"].items():
if name.startswith("uuid:"):
continue
if self.value in domain["tags"] and domain["template_for_dispvms"]:
yield DispVMTemplate("@dispvm:" + name)
#
# resolutions
#
[docs]
class AbstractResolution(metaclass=abc.ABCMeta):
"""Object representing positive policy evaluation result -
either ask or allow action"""
notify: bool
def __init__(
self, rule: "Rule", request: "Request", *, user: Optional[str]
):
#: policy rule from which this action is derived
self.rule = rule
#: request
self.request = request
#: the user to run command as, or None for default
self.user = user
#: whether to notify the user about the action taken
self.notify = rule.action.notify
[docs]
@abc.abstractmethod
async def execute(self) -> str:
"""
Execute the action. For allow, this runs the qrexec. For ask, it asks
user and then (depending on verdict) runs the call.
Args:
caller_ident (str): Service caller ident
(``process_ident,source_name, source_id``)
"""
raise NotImplementedError()
[docs]
class AllowResolution(AbstractResolution):
"""Resolution returned for :py:class:`Rule` with :py:class:`Allow`."""
def __init__(
self,
rule: "Rule",
request: "Request",
*,
user: Optional[str],
target: str,
autostart: bool,
):
super().__init__(rule, request, user=user)
#: target domain the service should be connected to
self.target = target
self.autostart = autostart
assert isinstance(self.autostart, bool)
[docs]
@classmethod
def from_ask_resolution(
cls, ask_resolution: "AskResolution", *, target: str
) -> "AllowResolution":
"""This happens after user manually approved the call"""
if target.startswith("@dispvm:"):
target = DispVMTemplate(target)
return cls(
ask_resolution.rule,
ask_resolution.request,
user=ask_resolution.user,
target=target,
autostart=ask_resolution.autostart,
)
[docs]
async def execute(self) -> str:
"""Return the allowed action"""
request, target = self.request, self.target
assert target is not None
assert isinstance(self.autostart, bool)
# XXX remove when #951 gets fixed
if request.source == target:
raise AccessDenied("loopback qrexec connection not supported")
# Start with the common header lines
lines = []
# Adminvm/dom0 special case
if is_dom0(target):
lines.extend(
[
f"user={self.user or 'DEFAULT'}",
"result=allow",
"target=dom0",
f"autostart={self.autostart}",
f"requested_target={request.target}",
]
)
return "\n".join(lines)
# DispVM case
if target.startswith("@dispvm:"):
target_info = request.system_info["domains"][target[8:]]
lines.extend(
[
f"user={self.user or 'DEFAULT'}",
"result=allow",
f"target={self.target}",
f"target_uuid=@dispvm:uuid:{target_info['uuid']}",
f"autostart={self.autostart}",
f"requested_target={request.target}",
]
)
if request.requested_source:
lines.append(f"policy_source={request.source}")
return "\n".join(lines)
# Lookup target information for the remaining cases
target_info = request.system_info["domains"][target]
if target_info.get("type") == "RemoteVM" and self.user is not None:
logging.warning(
"Ignoring user directive in policy. This is not supported in "
"the case of RemoveVM."
)
user_line = "user=DEFAULT"
else:
user_line = f"user={self.user or 'DEFAULT'}"
lines.append(user_line)
lines.append("result=allow")
# RemoteVM case
if target_info.get("type") == "RemoteVM":
if not target_info.get("relayvm"):
raise AccessDenied(f"{self.target}: relayvm is not set")
if not target_info.get("transport_rpc"):
raise AccessDenied(f"{self.target}: transport RPC is not set")
relayvm_name = target_info["relayvm"]
relayvm_info = request.system_info["domains"][relayvm_name]
transport_rpc = target_info["transport_rpc"]
service = f"{transport_rpc}+{request.target}+{request.service}"
service += f"{request.argument}"
lines.extend(
[
f"target={relayvm_name}",
f"target_uuid=uuid:{relayvm_info['uuid']}",
f"autostart={self.autostart}",
f"requested_target={request.target}",
f"service={service}",
]
)
if request.requested_source:
lines.append(f"policy_source={request.source}")
return "\n".join(lines)
# Default case
lines.extend(
[
f"target={self.target}",
f"target_uuid=uuid:{target_info['uuid']}",
f"autostart={self.autostart}",
f"requested_target={request.target}",
]
)
if request.requested_source:
lines.append(f"policy_source={request.source}")
return "\n".join(lines)
[docs]
class AskResolution(AbstractResolution):
"""Resolution returned for :py:class:`Rule` with :py:class:`Ask`.
This base class is a dummy implementation which behaves as if user always
denied the call. The programmer is expected to inherit from this class and
overload :py:meth:`execute` to display the question to the user by
appropriate means. User should have choice among :py:attr:`targets_for_ask`.
If :py:attr:`default_target` is not :py:obj:`None`, that should be the
default. Otherwise there should be no default. After querying the user,
:py:meth:`handle_user_response` should be called. For negative answers,
raising :py:class:`qrexec.exc.AccessDenied` is also enough.
The child class should be supplied as part of :py:class:`Request`.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
rule: "Rule",
request: "Request",
*,
# targets for the user to choose from
targets_for_ask: Sequence[str],
# default target, or None
default_target: Optional[str],
autostart: bool,
user: Optional[str],
):
super().__init__(rule, request, user=user)
assert default_target is None or default_target in targets_for_ask
self.targets_for_ask = targets_for_ask
self.default_target = default_target
self.autostart = autostart
[docs]
def handle_user_response(
self, response: bool, target: str
) -> AllowResolution:
"""
Handle user response for the 'ask' action. Children class'
:py:meth:`execute` is supposed to call this method to report the
user's verdict.
Args:
response (bool): whether the call was allowed or denied
target (str): target chosen by the user (if reponse==True)
Returns:
AllowResolution: for positive answer
Raises:
qrexec.exc.AccessDenied: for negative answer
"""
# pylint: disable=redefined-variable-type
if not response:
raise AccessDenied(
"denied by the user {}:{}".format(
self.rule.filepath, self.rule.lineno
),
notify=self.notify,
)
if target not in self.targets_for_ask:
raise AccessDenied("target {} is not a valid choice".format(target))
return self.request.allow_resolution_type.from_ask_resolution(
self, target=target
)
[docs]
def handle_invalid_response(self) -> NoReturn:
"""
Handle invalid response for the 'ask' action. Throws AccessDenied.
"""
# pylint: disable=no-self-use
raise AccessDenied("invalid response")
[docs]
async def execute(self) -> NoReturn:
"""Ask the user for permission.
This method should be overloaded in children classes. This
implementation always denies the request.
Raises:
qrexec.exc.AccessDenied: always
"""
raise AccessDenied("denied for non-interactive ask")
#
# request
#
# pylint: disable=too-many-instance-attributes
[docs]
class Request:
"""Qrexec request
A request object keeps what is searched for in the policy. It keeps the
principal quadruple: service, argument, source and target that are
parameters of the qrexec call. There is also `system_info`, which represents
current state of the system, incl. the list of all domains in the system and
their respective properties that are relevant to policy.
Args:
service (str or None): Service name.
argument (str): The argument. Must start with ``'+'``.
source (str): name of source qube
target (str): target designation
system_info (dict): as returned from
:py:func:`qrexec.utils.system_info()`
allow_resolution_type (type): a child of :py:class:`AllowResolution`
ask_resolution_type (type): a child of :py:class:`AskResolution`
"""
def __init__(
self,
service: Optional[str],
argument: str,
source: str,
target: str,
*,
system_info: FullSystemInfo,
allow_resolution_type: Type[AllowResolution] = AllowResolution,
ask_resolution_type: Type[AskResolution] = AskResolution,
requested_source: Optional[str] = "",
):
if target == "":
target = "@default"
assert argument and argument[0] == "+"
#: the service that is being requested
self.service = service
#: argument for the service
self.argument = argument
#: source qube name
self.source = source
#: requested source qube name (from qrexec-client-vm)
self.requested_source = requested_source
#: relay source qube name (the original source initiating the
# qrexec-client-vm call with source qube provided
self.relayvm = None
#: target (qube or token) as requested by source qube
self.target = IntendedTarget(target).verify(system_info=system_info)
#: system info
self.system_info = system_info
#: factory for allow resolution
self.allow_resolution_type = allow_resolution_type
#: factory for ask resolution
self.ask_resolution_type = ask_resolution_type
if self.requested_source:
requested_source_info = self.system_info["domains"].get(
self.requested_source, {}
)
if not requested_source_info:
raise RequestError(
f"unknown requested source qube '{self.requested_source}'"
)
if requested_source_info.get("type") != "RemoteVM":
raise RequestError(
f"{self.requested_source}: requested source is only "
"authorized for RemoteVM"
)
if requested_source_info.get("relayvm", None) != self.source:
raise RequestError(
f"{self.source} is not a relay for {self.requested_source}"
)
self.relayvm = self.source
self.source = self.requested_source
#
# actions
#
[docs]
class ActionType(metaclass=abc.ABCMeta):
"""Base class for actions
Children of this class are types of objects representing action in policy
rule (:py:attr:`Rule.action`). Not to be confused with
:py:class:`AbstractResolution`, which happens when particular rule is
matched to a :py:class:`Request`.
Keyword arguments to __init__ are taken from parsing params in the rule, so
this defines, what params are valid for which action.
"""
target: Optional[_BaseTarget]
def __init__(self, rule: "Rule"):
#: the rule that holds this action
self.rule = rule
self.target = None
[docs]
@abc.abstractmethod
def evaluate(self, request: Request) -> AbstractResolution:
"""Evaluate the request.
Depending on action and possibly user's decision either return
a resolution or raise exception.
Args:
request (Request): the request that was matched to the rule
Returns:
AbstractResolution: for successful requests
Raises:
qrexec.exc.AccessDenied: for denied requests
"""
raise NotImplementedError()
[docs]
def actual_target(self, intended_target: VMToken) -> IntendedTarget:
"""If action has redirect, it is it. Otherwise, the rule's own target
Args:
intended_target (IntendedTarget): :py:attr:`Request.target`
Returns:
IntendedTarget: either :py:attr:`target`, if not None, or
*intended_target*
"""
return IntendedTarget(self.target or intended_target)
[docs]
@staticmethod
def allow_no_autostart(target: str, system_info: FullSystemInfo) -> bool:
"""
Should we allow this target when autostart is disabled
"""
if is_dom0(target):
return True
if target.startswith("@dispvm"):
return False
try:
return system_info["domains"][target]["power_state"] == "Running"
except KeyError as e:
raise AssertionError from e
[docs]
class Deny(ActionType):
# pylint: disable=missing-docstring
def __init__(self, rule: "Rule", *, notify: Optional[bool] = None):
super().__init__(rule)
self.notify = True if notify is None else notify
def __repr__(self) -> str:
return f"<{type(self).__name__}>"
def __str__(self) -> str:
return "deny"
[docs]
def evaluate(self, request: Request) -> NoReturn:
"""
Raises:
qrexec.exc.AccessDenied:
"""
raise AccessDenied(
"denied by policy {}:{}".format(
self.rule.filepath, self.rule.lineno
),
notify=self.notify,
)
def actual_target(self, intended_target: object) -> NoReturn:
"""""" # not documented in HTML
# pylint: disable=empty-docstring
raise AccessDenied("programmer error")
[docs]
class Allow(ActionType):
# pylint: disable=missing-docstring
autostart: bool
notify: bool
user: Optional[str]
target: Redirect
def __init__(
self,
rule: "Rule",
*,
target: Optional[str] = None,
user: Optional[str] = None,
notify: bool = False,
autostart: bool = True,
): # pylint: disable=too-many-arguments
super().__init__(rule)
self.target = Redirect(
target, filepath=self.rule.filepath, lineno=self.rule.lineno
)
self.user = user
self.notify = notify
self.autostart = autostart
def __repr__(self) -> str:
return "<{} target={!r} user={!r}>".format(
type(self).__name__, self.target, self.user
)
def __str__(self) -> str:
return_str = "allow"
if self.target:
return_str += f" target={self.target}"
if self.user:
return_str += f" user={self.user}"
return return_str
[docs]
def evaluate(self, request: Request) -> AllowResolution:
"""
Returns:
AllowResolution: for successful requests
Raises:
qrexec.exc.AccessDenied: for invalid requests
"""
assert self.rule.is_match(request)
target: str = self.actual_target(request.target).verify(
system_info=request.system_info
)
if target == "@default":
raise AccessDenied(
"policy define 'allow' action at {}:{} but no target is "
"specified by caller or policy".format(
self.rule.filepath, self.rule.lineno
)
)
if isinstance(target, DispVM):
target_ = target.get_dispvm_template( # pylint: disable=no-member
request.source, system_info=request.system_info
)
if target_ is None:
raise AccessDenied(
"policy define 'allow' action to @dispvm at {}:{} "
"but no DispVM base is set for this VM".format(
self.rule.filepath, self.rule.lineno
)
)
target = target_
del target_
# expand @adminvm but keep uuid:0
elif target == "@adminvm":
target = "dom0"
if not self.autostart and not self.allow_no_autostart(
target, request.system_info
):
raise AccessDenied(
"target {} is denied because it would require autostart".format(
target
),
notify=self.notify,
)
return request.allow_resolution_type(
self.rule,
request,
user=self.user,
target=target,
autostart=self.autostart,
)
[docs]
class Ask(ActionType):
# pylint: disable=missing-docstring,too-many-arguments
def __init__(
self,
rule: "Rule",
*,
target: Optional[str] = None,
default_target: Optional[str] = None,
user: Optional[str] = None,
notify: bool = False,
autostart: bool = True,
):
super().__init__(rule)
self.target = Redirect(
target, filepath=self.rule.filepath, lineno=self.rule.lineno
)
self.default_target = Redirect(
default_target, filepath=self.rule.filepath, lineno=self.rule.lineno
)
self.user = user
self.notify = False if notify is None else notify
self.autostart = True if autostart is None else autostart
def __repr__(self) -> str:
return "<{} target={!r} default_target={!r} user={!r}>".format(
type(self).__name__, self.target, self.default_target, self.user
)
def __str__(self) -> str:
return_str = "ask"
if self.target:
return_str += f" target={self.target}"
if self.default_target:
return_str += f" default_target={self.default_target}"
if self.user:
return_str += f" user={self.user}"
return return_str
[docs]
def evaluate(self, request: Request) -> AskResolution:
"""
Returns:
AskResolution
Raises:
qrexec.exc.AccessDenied: for invalid requests
"""
assert self.rule.is_match(request)
targets_for_ask: Iterable[str]
if self.target is not None:
if is_dom0(self.target):
targets_for_ask = ["dom0"]
else:
targets_for_ask = [self.target]
else:
targets_for_ask = list(
self.rule.policy.collect_targets_for_ask(request)
)
if not self.autostart:
targets_for_ask = [
target
for target in targets_for_ask
if self.allow_no_autostart(target, request.system_info)
]
if not targets_for_ask:
raise AccessDenied(
"policy define 'ask' action at {}:{} but no target is "
"available to choose from".format(
self.rule.filepath, self.rule.lineno
)
)
default_target: Optional[str] = self.default_target
if default_target is not None:
# expand default DispVM
if isinstance(default_target, DispVM):
# pylint is confused by the metaclass - default_target is
# constructed as Redirect(), but in fact it can be any subclass
# pylint: disable=no-member
default_target = default_target.get_dispvm_template(
request.source, system_info=request.system_info
)
# expand @adminvm and uuid:0 to dom0
elif is_dom0(default_target):
default_target = "dom0"
if default_target and default_target not in targets_for_ask:
logging.warning(
"warning: policy define default_target=%s at %s:%s "
"but it is not among allowed targets (%s)",
default_target,
self.rule.filepath,
self.rule.lineno,
", ".join(targets_for_ask),
)
default_target = None
return request.ask_resolution_type(
self.rule,
request,
user=self.user,
targets_for_ask=targets_for_ask,
default_target=default_target,
autostart=self.autostart,
)
[docs]
@enum.unique
class Action(enum.Enum):
"""Action as defined by policy"""
# pylint: disable=invalid-name
allow = Allow
deny = Deny
ask = Ask
[docs]
class Rule:
"""A single line of policy file
Avoid instantiating manually, use either :py:meth:`from_line()` or
:py:meth:`from_line_service()`.
"""
# pylint: disable=too-many-instance-attributes,too-many-positional-arguments
action: Union[Allow, Deny, Ask]
def __init__(
self,
service: str,
argument: str,
source: str,
target: str,
action: str,
params: List[str],
*,
policy: "AbstractPolicy",
filepath: pathlib.Path,
lineno: Optional[int],
):
# pylint: disable=too-many-arguments
#: the parser that this rule belongs to
self.policy = policy
#: the file path
self.filepath = filepath
#: the line number
self.lineno = lineno
service_, argument_ = validate_service_and_argument(
service, argument, filepath=filepath, lineno=lineno
)
#: the qrexec service
self.service = service_
#: the argument to the service
self.argument = argument_
#: source specification
self.source = Source(source, filepath=filepath, lineno=lineno)
#: target specification
self.target = Target(target, filepath=filepath, lineno=lineno)
try:
actiontype = Action[action].value
except KeyError as err:
raise PolicySyntaxError(
filepath, lineno, "invalid action: {}".format(action)
) from err
kwds: Dict[str, Union[str, bool]] = {}
for param in params:
try:
key, value = param.split("=", maxsplit=1)
except ValueError as err:
raise PolicySyntaxError(
filepath,
lineno,
"invalid action parameter syntax: {!r}".format(param),
) from err
if key in kwds:
raise PolicySyntaxError(
filepath, lineno, "parameter given twice: {!r}".format(key)
)
kwds[key] = value
# boolean parameters
for key in ["notify", "autostart"]:
if key in kwds:
if kwds[key] not in ["yes", "no"]:
raise PolicySyntaxError(
filepath,
lineno,
"{!r} is {!r}, but can be only 'yes' or 'no'".format(
key, kwds[key]
),
)
kwds[key] = kwds[key] == "yes"
try:
#: policy action
self.action = actiontype(rule=self, **kwds)
except TypeError as err:
raise PolicySyntaxError(
filepath,
lineno,
"invalid parameters for action {}: {}".format(
actiontype.__name__, params
),
) from err
# verify special cases
if (
isinstance(self.target, DefaultVM)
and isinstance(self.action, Allow)
and self.action.target is None
):
raise PolicySyntaxError(
filepath,
lineno,
"allow action for @default rule must specify target= option",
)
def __repr__(self) -> str:
return (
"<{} service={!r} argument={!r}"
" source={!r} target={!r} action={!r}>".format(
type(self).__name__,
self.service,
self.argument,
self.source,
self.target,
self.action,
)
)
def __str__(self) -> str:
return_str = f"{self.service}\t"
if self.argument:
return_str += f"{self.argument}\t"
else:
return_str += "*\t"
return_str += f"{self.source}\t{self.target}\t{str(self.action)}"
return return_str
[docs]
@classmethod
def from_line(cls, policy, line, *, filepath, lineno):
"""
Load a single line of qrexec policy and check its syntax.
Do not verify existence of named objects.
Args:
line: a single line of actual qrexec policy (not a comment, empty
line or ``@include``)
filepath (pathlib.Path): Path of the file from which this line is
loaded
lineno: line number from which this line is loaded
Raises:
PolicySyntaxError: when syntax error is found
"""
try:
service, argument, source, target, action, *params = line.split()
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "wrong number of fields"
) from err
return cls(
service,
argument,
source,
target,
action,
params,
policy=policy,
filepath=filepath,
lineno=lineno,
)
[docs]
@classmethod
def from_line_service(
cls, policy, service, argument, line, *, filepath, lineno
):
"""Load a single line in old format.
Args:
service: the service for which this line applies
argument: argument for the service
line (str): the line to be parsed
filepath (pathlib.Path): the file from which this line was taken
lineno (int): the line number
Raises:
PolicySyntaxError: when syntax error is found
"""
try:
source, target, *action_and_params = line.split()
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "wrong number of fields"
) from err
action_and_params = tuple(
itertools.chain(*(p.split(",") for p in action_and_params))
)
try:
action, *params = action_and_params
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "wrong number of fields"
) from err
return cls(
service,
argument,
source,
target,
action,
params,
policy=policy,
filepath=filepath,
lineno=lineno,
)
[docs]
def is_match(self, request: Request) -> bool:
"""Check if given request matches this line.
:param request: request to check against
:return: True or False
"""
return self.is_match_but_target(request) and self.target.match(
request.target,
source=request.source,
system_info=request.system_info,
)
[docs]
def is_match_but_target(self, request: Request) -> bool:
"""Check if given (service, argument source) matches this line.
Target is ignored. This is used for :py:meth:`collect_targets_for_ask`.
:param system_info: information about the system - available VMs,
their types, labels, tags etc. as returned by
:py:func:`app_to_system_info`
:param service: name of the service
:param argument: the argument
:param source: name of the source VM
:param target: name of the target VM, or None if not specified
:param system_info: the context
:return: True or False
"""
return (
(self.service is None or self.service == request.service)
and (self.argument is None or self.argument == request.argument)
and self.source.match(
request.source, system_info=request.system_info
)
)
[docs]
class AbstractParser(metaclass=abc.ABCMeta):
"""A minimal, pluggable, validating policy parser"""
#: default rule type
rule_type = Rule
@staticmethod
def _fix_filepath(file, filepath):
if filepath and not isinstance(filepath, pathlib.Path):
filepath = pathlib.Path(filepath)
if filepath is None:
try:
filepath = pathlib.Path(file.name)
except AttributeError:
if isinstance(file, io.IOBase):
filepath = "<buffer>"
return file, filepath
[docs]
def load_policy_file(self, file, filepath):
"""Parse a policy file"""
file, filepath = self._fix_filepath(file, filepath)
for lineno, line in enumerate(file, start=1):
line = line.strip()
# skip empty lines and comments
if not line or line[0] == "#":
self.handle_comment(line, filepath=filepath, lineno=lineno)
continue
if line.startswith("!"):
directive, *params = line.split()
if directive == "!include":
try:
(included_path,) = params
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "invalid number of params"
) from err
self.handle_include(
pathlib.PurePosixPath(included_path),
filepath=filepath,
lineno=lineno,
)
continue
if directive == "!include-dir":
try:
(included_path,) = params
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "invalid number of params"
) from err
self.handle_include_dir(
pathlib.PurePosixPath(included_path),
filepath=filepath,
lineno=lineno,
)
continue
if directive == "!include-service":
try:
service, argument, included_path = params
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "invalid number of params"
) from err
self.handle_include_service(
service,
argument,
pathlib.PurePosixPath(included_path),
filepath=filepath,
lineno=lineno,
)
continue
if directive == "!compat-4.0":
if params:
raise PolicySyntaxError(
filepath, lineno, "invalid number of params"
)
logging.warning(
"warning: !compat-4.0 directive in file %s line %s"
" is transitional and will be deprecated",
filepath,
lineno,
)
self.handle_compat40(filepath=filepath, lineno=lineno)
continue
raise PolicySyntaxError(filepath, lineno, "invalid directive")
# this can raise PolicySyntaxError on its own
self.handle_rule(
self.rule_type.from_line(
self, line, filepath=filepath, lineno=lineno
),
filepath=filepath,
lineno=lineno,
)
return self
[docs]
def load_policy_file_service(self, service, argument, file, filepath):
"""Parse a policy file from ``!include-service``"""
file, filepath = self._fix_filepath(file, filepath)
for lineno, line in enumerate(file, start=1):
line = line.strip()
# skip empty lines and comments
if not line or line[0] == "#":
continue
# compatibility substitutions, some may be unspecified and may be
# removed in a future version
line = line.replace("$include:", "!include ")
line = line.replace("$", "@")
line = line.replace(",", " ")
if line.startswith("!"):
directive, *params = line.split()
if directive == "!include":
try:
(included_path,) = params
except ValueError as err:
raise PolicySyntaxError(
filepath, lineno, "invalid number of params"
) from err
self.handle_include_service(
service,
argument,
pathlib.PurePosixPath(included_path),
filepath=filepath,
lineno=lineno,
)
continue
raise PolicySyntaxError(filepath, lineno, "invalid directive")
# this can raise PolicySyntaxError on its own
self.handle_rule(
self.rule_type.from_line_service(
self,
service,
argument,
line,
filepath=filepath,
lineno=lineno,
),
filepath=filepath,
lineno=lineno,
)
return self
[docs]
@abc.abstractmethod
def handle_include(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
"""Handle ``!include`` line when encountered in
:meth:`policy_load_file`.
This method is to be provided by subclass.
"""
raise NotImplementedError()
[docs]
@abc.abstractmethod
def handle_include_dir(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
"""Handle ``!include-dir`` line when encountered in
:meth:`policy_load_file`.
This method is to be provided by subclass.
"""
raise NotImplementedError()
[docs]
@abc.abstractmethod
def handle_include_service(
self,
service,
argument,
included_path: pathlib.PurePosixPath,
*,
filepath,
lineno,
):
"""Handle ``!include-service`` line when encountered in
:meth:`policy_load_file`.
This method is to be provided by subclass.
"""
raise NotImplementedError()
[docs]
@abc.abstractmethod
def handle_rule(self, rule, *, filepath, lineno):
"""Handle a line with a rule.
This method is to be provided by subclass.
"""
raise NotImplementedError()
[docs]
@abc.abstractmethod
def handle_compat40(self, *, filepath, lineno):
"""Handle ``!compat-4.0`` line when encountered in
:meth:`policy_load_file`.
This method is to be provided by subclass.
"""
raise NotImplementedError()
[docs]
class AbstractPolicy(AbstractParser):
"""This class is a parser that accumulates the rules to form policy."""
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
#: list of Rule objects
self.rules: List[Rule] = []
[docs]
def handle_rule(self, rule, *, filepath, lineno):
# pylint: disable=unused-argument
self.rules.append(rule)
[docs]
def evaluate(self, request):
"""Evaluate policy
Returns:
AbstractResolution: For allow or ask resolutions.
Raises:
AccessDenied: when action should be denied unconditionally
"""
rule = self.find_matching_rule(request)
return rule.action.evaluate(request)
[docs]
def find_matching_rule(self, request):
"""Find the first rule matching given request"""
for rule in self.rules:
if rule.is_match(request):
return rule
raise AccessDenied("no matching rule found")
def find_rules_for_service(self, service):
for rule in self.rules:
if rule.service is None or rule.service == service:
yield rule
[docs]
def collect_targets_for_ask(self, request):
"""Collect targets the user can choose from in 'ask' action
Word 'targets' is used intentionally instead of 'domains', because it
can also contains @dispvm like keywords.
"""
targets: Set[str] = set()
info: SystemInfo = request.system_info["domains"]
source = request.source
if source.startswith("uuid:"):
source_uuid, source_name = source, info[source]["name"]
else:
source_uuid, source_name = "uuid:" + info[source]["uuid"], source
# iterate over rules in reversed order to easier handle 'deny'
# actions - simply remove matching domains from allowed set
for rule in reversed(self.rules):
if rule.is_match_but_target(request):
# getattr() is for Deny, which doesn't have this attribute
rule_target = (
getattr(rule.action, "target", None) or rule.target
)
expansion = set()
for potential_target in rule_target.expand(
system_info=request.system_info
):
try:
# The policy agent cannot handle UUIDs (and rightly so,
# those are meaningless to humans). Convert them to
# names. VM names can be reused, but in practice, this
# is unlikely to happen except by user request or DispVM
# name reuse. The latter will not happen for a week and
# so is unlikely to confuse humans.
expansion.add(uuid_to_name(info, potential_target))
except KeyError:
continue
if isinstance(rule.action, Action.deny.value):
targets.difference_update(expansion)
else:
targets.update(expansion)
# expand default DispVM
if "@dispvm" in targets:
targets.remove("@dispvm")
dispvm = DispVM("@dispvm").get_dispvm_template(
source, system_info=request.system_info
)
if dispvm is not None:
targets.add(dispvm)
# expand other keywords
if "@adminvm" in targets:
targets.remove("@adminvm")
targets.add("dom0")
# XXX remove when #951 gets fixed
if source_name in targets:
targets.remove(source_name)
if source_uuid in targets:
targets.remove(source_uuid)
for unwanted_target in targets.copy():
if unwanted_target.startswith("@dispvm:"):
unwanted_target_name = unwanted_target[len("@dispvm:") :]
else:
unwanted_target_name = unwanted_target
if info[unwanted_target_name].get("internal", False):
targets.remove(unwanted_target)
return targets
[docs]
class AbstractFileLoader(AbstractParser):
"""Parser that loads next files on ``!include[-service]`` directives
This class uses regular files as accessed by :py:class:`pathlib.Path`, but
it is possible to overload those functions and use file-like objects.
"""
[docs]
def resolve_path(
self, included_path: pathlib.PurePosixPath
) -> pathlib.Path:
"""Resolve path from ``!include*`` to :py:class:`pathlib.Path`"""
raise NotImplementedError()
[docs]
def resolve_filepath(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
) -> Tuple[TextIO, pathlib.PurePath]:
"""Resolve ``!include[-service]`` to open file and filepath
The callee is responsible for closing the file descriptor.
Raises:
qrexec.exc.PolicySyntaxError: when the path does not point to a file
"""
resolved_included_path: pathlib.Path = self.resolve_path(included_path)
if not resolved_included_path.is_file():
raise exc.PolicySyntaxError(
filepath,
lineno,
"not a file: {}".format(resolved_included_path),
)
# pylint: disable=consider-using-with
return (
open(str(resolved_included_path), encoding="utf-8"),
pathlib.PurePath(resolved_included_path),
)
[docs]
def handle_include(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
file, resolved_included_path = self.resolve_filepath(
included_path, filepath=filepath, lineno=lineno
)
with file:
self.load_policy_file(file, resolved_included_path)
[docs]
def handle_include_service(
self,
service,
argument,
included_path: pathlib.PurePosixPath,
*,
filepath,
lineno,
):
service, argument = validate_service_and_argument(
service, argument, filepath=filepath, lineno=lineno
)
file, resolved_included_path = self.resolve_filepath(
included_path, filepath=filepath, lineno=lineno
)
with file:
self.load_policy_file_service(
service, argument, file, resolved_included_path
)
[docs]
class AbstractDirectoryLoader(AbstractFileLoader):
"""Parser that loads next files on ``!include-dir`` directives"""
[docs]
def resolve_dirpath(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
) -> pathlib.Path:
"""Resolve ``!include-dir`` to directory path
Returns:
pathlib.Path:
Raises:
qrexec.exc.PolicySyntaxError: when the path does not point to
a directory
"""
resolved_included_path = self.resolve_path(included_path)
if not resolved_included_path.is_dir():
raise exc.PolicySyntaxError(
filepath,
lineno,
"not a directory: {}".format(resolved_included_path),
)
return resolved_included_path
[docs]
def handle_include_dir(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
resolved_included_path = self.resolve_dirpath(
included_path, filepath=filepath, lineno=lineno
)
self.load_policy_dir(resolved_included_path)
[docs]
def load_policy_dir(self, dirpath):
"""Load all files in the directory (``!include-dir``)
Args:
dirpath (pathlib.Path): the directory to load
Raises:
OSError: for problems in opening files or directories
"""
for path in filter_filepaths(dirpath.iterdir()):
with path.open() as file:
self.load_policy_file(file, path)
[docs]
class AbstractFileSystemLoader(AbstractDirectoryLoader, AbstractFileLoader):
"""This class is used when policy is stored as regular files in a directory.
Args:
policy_path: Load these directories. Paths given to
``!include`` etc. directives in a file are interpreted relative to
the path from which the file was loaded.
"""
policy_path: Optional[pathlib.Path]
def __init__(
self,
*,
policy_path: Union[
None, pathlib.PurePath, Iterable[pathlib.PurePath]
] = None,
) -> None:
super().__init__()
if policy_path is None:
iterable_policy_paths = [RUNTIME_POLICY_PATH, POLICYPATH]
elif isinstance(policy_path, pathlib.Path):
iterable_policy_paths = [policy_path]
elif isinstance(policy_path, list):
iterable_policy_paths = policy_path
else:
raise TypeError(
"unexpected type of policy path in "
"AbstractFileSystemLoader.__init__!"
)
try:
self.load_policy_dirs(iterable_policy_paths)
except OSError as err:
raise AccessDenied(
"failed to load {} file: {!s}".format(err.filename, err)
) from err
self.policy_path = None
def load_policy_dirs(self, paths: Iterable[pathlib.PurePath]) -> None:
already_seen = set()
final_list = []
for path in paths:
for file_path in filter_filepaths(pathlib.Path(path).iterdir()):
basename = file_path.name
if basename not in already_seen:
already_seen.add(basename)
final_list.append(file_path)
final_list.sort(key=lambda x: x.name)
for file_path in final_list:
with file_path.open() as file:
self.policy_path = file_path.parent
try:
self.load_policy_file(file, file_path)
finally:
self.policy_path = None
[docs]
def resolve_path(
self, included_path: pathlib.PurePosixPath
) -> pathlib.Path:
assert (
self.policy_path is not None
), "Tried to resolve a path when not loading policy"
return (self.policy_path / included_path).resolve()
[docs]
class FilePolicy(AbstractFileSystemLoader, AbstractPolicy):
"""Full policy loaded from files.
Usage:
>>> policy = qrexec.policy.parser.FilePolicy()
>>> request = Request(
... 'qrexec.Service', '+argument', 'source-name', 'target-name',
... system_info=qrexec.utils.get_system_info())
>>> resolution = policy.evaluate(request)
>>> await resolution.execute('process-ident') # asynchroneous method
"""
def handle_compat40(self, *, filepath, lineno):
""""""
# late import for circular
from .parser_compat import Compat40Loader
subparser = Compat40Loader(master=self)
subparser.execute(filepath=filepath, lineno=lineno)
[docs]
class ValidateParser(FilePolicy):
"""
A parser that validates the policy directory along with proposed changes.
Pass files to be overriden in the ``overrides`` dictionary, with either
new content, or None if the file is to be deleted.
"""
def __init__(
self,
*,
overrides: Dict[pathlib.Path, Optional[str]],
policy_path: Union[
None, pathlib.PurePath, Iterable[pathlib.PurePath]
] = None,
) -> None:
self.overrides = overrides
super().__init__(policy_path=policy_path)
def load_policy_dirs(self, paths: Iterable[pathlib.PurePath]) -> None:
assert len(paths) == 1
(path,) = paths
self.policy_path = path
self.load_policy_dir(path)
[docs]
def load_policy_dir(self, dirpath: pathlib.Path) -> None:
for path in filter_filepaths(dirpath.iterdir()):
if path not in self.overrides:
with path.open() as file:
self.load_policy_file(file, path)
for path, content in self.overrides.items():
if path.parent == dirpath and content is not None:
self.load_policy_file(io.StringIO(content), path)
[docs]
def resolve_filepath(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
) -> Tuple[TextIO, pathlib.PurePath]:
path = self.resolve_path(included_path)
if path in self.overrides:
if self.overrides[path] is None:
raise exc.PolicySyntaxError(
filepath,
lineno,
"including a file that will be removed: {}".format(path),
)
return io.StringIO(self.overrides[path]), path
return super().resolve_filepath(
included_path, filepath=filepath, lineno=lineno
)
[docs]
def handle_rule(self, rule, *, filepath, lineno):
pass
class ToposortMixIn:
"""A helper for topological sorting the policy files"""
# pylint can't deal with mixins
# pylint: disable=no-member
@enum.unique
class State(enum.Enum):
"""State of topological sort algorithm"""
ON_PATH, IN_ORDER = object(), object()
def __init__(self, **kwds):
self.included_paths = collections.defaultdict(set)
super().__init__(**kwds)
# keys and values are paths to files
self.state = {}
self.order = []
self.queue = None
def _path_to_key(self, path):
assert isinstance(path, pathlib.PurePosixPath)
try:
path = path.relative_to(self.policy_path)
except AttributeError:
# no self.policy_path
pass
except ValueError:
# not in self.policy_path
pass
return str(path)
def toposort(self):
"""Yield (file, filename) in order suitable for mass-uploading.
A file does not include anything from any file that follows in the
sequence.
*file* is an open()'d file for reading.
"""
if not self.order:
self.queue = set(self.included_paths.keys())
self.queue.update(itertools.chain(self.included_paths.values()))
while self.queue:
self.dfs(self.queue.pop())
for path in self.order:
yield self.resolve_filepath(path, filepath=None, lineno=None)
def dfs(self, node):
"""Perform one batch of topological sort"""
self.state[node] = self.State.ON_PATH
for nextnode in self.included_paths[node]:
if self.state[nextnode] == self.State.ON_PATH:
raise ValueError(
"circular include; {} → {}".format(
node.filepath, nextnode.filepath
)
)
if self.state[nextnode] == self.State.IN_ORDER:
continue
self.queue.discard(nextnode)
self.dfs(nextnode)
self.order.append(node)
self.state[node] = self.State.IN_ORDER
def save_included_path(self, included_path, *, filepath, lineno):
"""Store the vertex in the dependency graph.
Only paths inside :py:attr:`policy_path` and ``include`` directory
(as supported by Policy API) are considered.
"""
key = self._path_to_key(included_path)
if "/" in key and (
not key.startswith("include/") or key.count("/") > 1
):
raise PolicySyntaxError(
filepath,
lineno,
"invalid path {}, only paths inside the directories "
"{policypath} and {policypath}/include are considered".format(
included_path, policypath=POLICYPATH
),
)
self.included_paths[key].add(included_path)
def handle_include(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
# pylint: disable=missing-docstring
logging.debug(
"Toposorter.handle_include(included_path=%r, filepath=%r)",
included_path,
filepath,
)
self.save_included_path(included_path, filepath=filepath, lineno=lineno)
super().handle_include( # type: ignore
included_path, filepath=filepath, lineno=lineno
)
def handle_include_service(
self,
service,
argument,
included_path: pathlib.PurePosixPath,
*,
filepath,
lineno,
):
# pylint: disable=missing-docstring
logging.debug(
"Toposorter.handle_include_service(included_path=%r, filepath=%r)",
included_path,
filepath,
)
self.save_included_path(included_path, filepath=filepath, lineno=lineno)
super().handle_include_service( # type: ignore
service, argument, included_path, filepath=filepath, lineno=lineno
)
def load_policy_file(self, file, filepath):
# pylint: disable=missing-docstring,expression-not-assigned
# add filepath as seen
self.included_paths[self._path_to_key(filepath)]
super().load_policy_file(file, filepath)
[docs]
class StringLoader(AbstractFileLoader):
"""An in-memory loader used for tests
Args:
policy (dict or str): policy dictionary. The keys are filenames to be
included. It should contain ``'__main__'`` key which is loaded. If
the argument is :py:class:`str`, it behaves as it was dict's
``'__main__'``.
"""
def __init__(self, *args, policy, **kwds):
super().__init__(*args, **kwds)
self.policy = policy
[docs]
def resolve_filepath(
self,
included_path,
*,
filepath,
lineno,
) -> Tuple[TextIO, pathlib.PurePath]:
"""
Raises:
qrexec.exc.PolicySyntaxError: when wrong path is included
"""
included_path = str(included_path)
try:
file = io.StringIO(self.policy[included_path])
except KeyError as err:
raise exc.PolicySyntaxError(
filepath,
lineno,
"no such policy file: {!r}".format(included_path),
) from err
return file, pathlib.PurePosixPath(included_path + "[in-memory]")
[docs]
def handle_include_dir(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
raise NotImplementedError(
"!include-dir is unsupported in {}".format(type(self).__name__)
)
[docs]
class StringPolicy(ToposortMixIn, StringLoader, AbstractPolicy):
"""String policy, used for tests and loading single files as policy. It
can be used to test most of the code paths used in policy parsing.
>>> testpolicy = StringPolicy(policy={
... '__main__': '!include policy2'
... 'policy2': '* * @anyvm @anyvm allow'})
"""
def __init__(self, *, policy, policy_compat=None, **kwds):
if not isinstance(policy, collections.abc.Mapping):
policy = {"__main__": policy}
super().__init__(policy=policy, **kwds)
if policy_compat is None:
policy_compat = {}
self.policy_compat = policy_compat
file, filepath = self.resolve_filepath(
"__main__", filepath=None, lineno=None
)
with file:
self.load_policy_file(file, filepath)
def handle_compat40(self, *, filepath, lineno):
""""""
# late import for circular
from .parser_compat import TestCompat40Loader
subparser = TestCompat40Loader(master=self, policy=self.policy_compat)
subparser.execute(filepath=filepath, lineno=lineno)