# Source code for qrexec.policy.parser_compat

# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2019  Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.

"""This module is transitional and may go away at any time.

.. autofunction:: walk_compat_files

.. autoclass:: Compat40Loader
   :members:
   :member-order: bysource

.. autoclass:: Compat40Parser
   :members:
   :member-order: bysource

.. autoclass:: TestCompat40Loader
   :members:
   :member-order: bysource
"""

import abc
import collections
import functools
import logging
import pathlib

from .. import POLICYPATH_OLD
from ..exc import PolicySyntaxError
from . import parser

# File-name suffixes that _list_compat_files() skips when scanning the legacy
# policy directory. Presumably RPM leftovers (.rpmsave/.rpmnew) and editor
# swap files (.swp) -- confirm before extending the list.
IGNORED_SUFFIXES = (".rpmsave", ".rpmnew", ".swp")


@functools.total_ordering
class _NoArgumentLastKey:
    """Sort key that orders arguments ascending, with ``"*"`` always last.

    ``"*"`` is the placeholder used in this module for "no specific
    argument"; wrapping arguments in this key makes the catch-all entry sort
    after every specific argument.

    Args:
        arg: the argument value being wrapped
    """

    def __init__(self, arg):
        self.arg = arg

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing with AttributeError on ``other.arg``.
        if not isinstance(other, _NoArgumentLastKey):
            return NotImplemented
        return self.arg == other.arg

    def __lt__(self, other):
        if not isinstance(other, _NoArgumentLastKey):
            return NotImplemented
        # "*" is never smaller than anything (including another "*") ...
        if self.arg == "*":
            return False
        # ... and every other argument is smaller than "*".
        if other.arg == "*":
            return True
        return self.arg < other.arg


def _sorted_compat_files(filepaths):
    """Yield ``(service, argument, filepath)`` tuples in sorted order.

    Each path is split into service and argument by
    ``parser.parse_service_and_argument`` (called with ``no_arg="*"``).
    Services are yielded in ascending order; within a service the arguments
    are ordered with ``_NoArgumentLastKey``. If several paths map to the same
    (service, argument) pair, the one seen last in *filepaths* is kept.

    Args:
        filepaths: iterable of paths to legacy policy files

    Yields:
        (service, argument, filepath)
    """
    by_service = {}
    for path in filepaths:
        name, arg = parser.parse_service_and_argument(path, no_arg="*")
        by_service.setdefault(name, {})[arg] = path

    for name in sorted(by_service):
        by_argument = by_service[name]
        for arg in sorted(by_argument, key=_NoArgumentLastKey):
            yield name, arg, by_argument[arg]


def _list_compat_files(legacy_path):
    """Yield the entries of *legacy_path* that qualify as legacy policy files.

    An entry is skipped, with an info-level log message, when it is not a
    regular file, has a suffix listed in ``IGNORED_SUFFIXES``, is a dotfile,
    or when ``parser.get_invalid_characters`` reports anything for its name.

    Args:
        legacy_path (pathlib.Path): directory to scan

    Yields:
        pathlib.Path: each accepted file
    """
    for entry in legacy_path.iterdir():
        if not entry.is_file():
            logging.info("ignoring %s (not a file)", entry)
        elif entry.suffix in IGNORED_SUFFIXES:
            logging.info("ignoring %s (ignored suffix)", entry)
        elif entry.name.startswith("."):
            logging.info("ignoring %s (dotfile)", entry)
        else:
            # The name check runs only for entries that passed all the
            # cheaper filters above.
            bad_chars = parser.get_invalid_characters(entry.name)
            if bad_chars:
                logging.info(
                    "ignoring %s (invalid characters: %r)", entry, bad_chars
                )
            else:
                yield entry


def walk_compat_files(legacy_path=POLICYPATH_OLD):
    """Walk legacy policy files in the order needed to build compat policy.

    Args:
        legacy_path (pathlib.Path): base path for legacy policy

    Yields:
        (service, argument, filepath)
    """
    candidates = _list_compat_files(legacy_path)
    for entry in _sorted_compat_files(candidates):
        yield entry
class Compat40Parser(parser.AbstractDirectoryLoader, parser.AbstractFileLoader):
    """Abstract parser for compat policy.

    Subclasses need to provide :py:meth:`walk_includes`.

    Args:
        master (qrexec.policy.parser.AbstractPolicyParser):
            the parser that will handle all the syntax parsed from the legacy
            policy
    """

    def __init__(self, *, master, **kwds):
        super().__init__(**kwds)
        self.master = master

    @abc.abstractmethod
    def walk_includes(self):
        """Iterate over all files to be included via the ``!compat-4.0``
        statement.

        Yields:
            (service, argument, filepath)
        """
        raise NotImplementedError()

    def execute(self, *, filepath, lineno):
        """Insert the legacy policy into the :py:attr:`master` parser.

        Args:
            filepath: passed through to ``handle_include_service`` together
                with *lineno*
            lineno: see *filepath*
        """
        implicit_denies = ("@anyvm @anyvm deny", "@anyvm @adminvm deny")

        for service, argument, path in self.walk_includes():
            self.handle_include_service(
                service, argument, path, filepath=filepath, lineno=lineno
            )

            # After each file describing a particular argument we add the
            # deny lines which were implicit. After the non-specific file we
            # don't, so the default policy will not be shadowed.
            if argument == "*":
                continue

            for line in implicit_denies:
                rule = self.rule_type.from_line_service(
                    self,
                    service,
                    argument,
                    line,
                    filepath=path,
                    lineno=None,
                )
                self.handle_rule(rule, filepath=path, lineno=None)

    # NOTE(review): the empty docstrings below are kept as in the original;
    # presumably they keep these overrides out of the generated API docs --
    # confirm before replacing them with real text.

    def handle_rule(self, rule, *, filepath, lineno):
        """"""
        # Rules are not stored here; they are forwarded to the master parser.
        master = self.master
        return master.handle_rule(rule, filepath=filepath, lineno=lineno)

    def collect_targets_for_ask(self, request):
        # Delegated to the master parser.
        master = self.master
        return master.collect_targets_for_ask(request)

    def load_policy_file(self, file, filepath):
        """"""
        raise RuntimeError("this method should not be called")

    def handle_compat40(self, *, filepath, lineno):
        """"""
        message = "!compat-4.0 is not recursive"
        raise PolicySyntaxError(filepath, lineno, message)
class Compat40Loader(Compat40Parser):
    """Helper parser for executing the compatibility statement:

        >>> class MyParser(qrexec.policy.parser.AbstractPolicyParser):
        ...     def handle_compat40(self, *, filepath, lineno):
        ...         subparser = Compat40Loader(master=self)
        ...         subparser.execute(filepath=filepath, lineno=lineno)

    Args:
        legacy_path: directory holding the legacy policy; ``POLICYPATH_OLD``
            is used when this is ``None``
    """

    def __init__(self, *, legacy_path=None, **kwds):
        super().__init__(**kwds)
        base = POLICYPATH_OLD if legacy_path is None else legacy_path
        self.legacy_path = pathlib.Path(base)

    def resolve_path(self, included_path):
        """"""
        # Included paths are interpreted relative to the legacy directory.
        joined = self.legacy_path / included_path
        return joined.resolve()

    def walk_includes(self):
        """"""
        for entry in walk_compat_files(self.legacy_path):
            yield entry
class TestCompat40Loader(Compat40Loader, parser.StringLoader):
    """Used for tests. See :py:class:`qrexec.policy.parser.StringPolicy`."""

    def walk_includes(self):
        """"""
        # Entries come from ``self.policy`` rather than from the file system.
        for entry in _sorted_compat_files(self.policy):
            yield entry