Source code for qubesadmin.tools

# encoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015  Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2015  Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

'''Qubes' command line tools
'''

from __future__ import print_function

import argparse
import fnmatch
import importlib
import importlib.metadata
import logging
import os
import subprocess
import sys
from argparse import Namespace
from collections.abc import Sequence

import qubesadmin.log
import qubesadmin.exc
import qubesadmin.vm

[docs] class QubesAction(argparse.Action): """ Custom Action for Qubes """
[docs] def parse_qubes_app(self, parser, namespace): """ This method is called by :py:class:`qubes.tools.QubesArgumentParser` after `namespace.app` is instantiated. Used to initialize values based on `namespace.app`. """ raise NotImplementedError
[docs] class PropertyAction(argparse.Action): """Action for argument parser that stores a property. Format: `--<option_name> property=value` """ # pylint: disable=redefined-builtin def __init__(self, option_strings: list[str], dest: str, *, metavar: str='NAME=VALUE', required: bool=False, help: str='set property to a value'): super().__init__(option_strings, 'properties', metavar=metavar, help=help, required=required) def __call__(self, parser: argparse.ArgumentParser, namespace: Namespace, values: str | Sequence | None, option_string: str | None=None) -> None: if not isinstance(values, str): parser.error(f'Invalid property token: {values!r}') try: prop, value = values.split('=', 1) except ValueError: parser.error('Invalid property token: {!r}'.format(values)) properties = getattr(namespace, self.dest) if properties is None: properties = {} properties[prop] = value setattr(namespace, self.dest, properties)
[docs] class SinglePropertyAction(argparse.Action): """Action for argument parser that stores a property. Format: `--property_name value` or `--property_name` """ # pylint: disable=redefined-builtin def __init__(self, option_strings, dest, *, metavar: str='VALUE', const: object=None, nargs: int | str | None=None, required: bool=False, help: str | None=None): if help is None: help = 'set {!r} property to a value'.format(dest) if const is not None: help += ' {!r}'.format(const) if const is not None and nargs is None: nargs = argparse.OPTIONAL super().__init__(option_strings, 'properties', required=required, metavar=metavar, help=help, const=const, nargs=nargs) self.name = dest def __call__(self, parser: argparse.ArgumentParser, namespace: Namespace, values: str | Sequence | None, option_string: str | None=None)\ -> None: if not values: # If we omit the argument and it's optional, __call__ will # not be called. # If we omit it and it's positional, `values` will be either # `None` or `[]` depending on `nargs`. # We don't want to modify `propertie` in either case, unless # `const` is set. if self.const is None or self.const == []: return values = self.const properties = getattr(namespace, self.dest) if properties is None: properties = {} properties[self.name] = values or self.const setattr(namespace, self.dest, properties)
[docs] class VmNameAction(QubesAction): """ Action for parsing one or multiple domains from provided VMNAMEs """ # pylint: disable=redefined-builtin def __init__(self, option_strings, nargs=1, dest='vmnames', help=None, **kwargs): if help is None: if nargs == argparse.OPTIONAL: help = 'at most one domain name' elif nargs == 1: help = 'a domain name' elif nargs == argparse.ZERO_OR_MORE: help = 'zero or more domain names' elif nargs == argparse.ONE_OR_MORE: help = 'one or more domain names' elif nargs > 1: help = '%s domain names' % nargs else: raise argparse.ArgumentError( nargs, "Passed unexpected value {!s} as {!s} nargs ".format( nargs, dest)) super().__init__(option_strings, dest=dest, help=help, nargs=nargs, **kwargs) def __call__(self, parser, namespace, values, option_string=None): ''' Set ``namespace.vmname`` to ``values`` ''' setattr(namespace, self.dest, values)
[docs] def parse_qubes_app(self, parser, namespace): ''' Set ``namespace.domains`` to ``values`` ''' # pylint: disable=too-many-nested-blocks assert hasattr(namespace, 'app') setattr(namespace, 'domains', []) app = namespace.app if hasattr(namespace, 'all_domains') and namespace.all_domains: namespace.domains = [ vm for vm in app.domains if not vm.klass == 'AdminVM' and vm.name not in namespace.exclude ] else: if hasattr(namespace, 'exclude') and namespace.exclude: parser.error('--exclude can only be used with --all') if self.nargs == argparse.OPTIONAL: if getattr(namespace, 'dispvm', None) is not None: return vm_name = getattr(namespace, self.dest, None) if vm_name is not None: try: namespace.domains += [app.domains[vm_name]] except KeyError: parser.error('no such domain: {!r}'.format(vm_name)) else: destinations = set() for destination in getattr(namespace, self.dest): if any(wildcard in destination for wildcard in '*?[!]'): for domain in app.domains: if fnmatch.fnmatch(domain.name, destination): destinations.add(domain.name) else: destinations.add(destination) for vm_name in destinations: try: namespace.domains += [app.domains[vm_name]] except KeyError: parser.error('no such domain: {!r}'.format(vm_name))
[docs] class RunningVmNameAction(VmNameAction): ''' Action for argument parser that gets a running domain from VMNAME ''' def __init__(self, option_strings, nargs=1, dest='vmnames', help=None, **kwargs): # pylint: disable=redefined-builtin if help is None: if nargs == argparse.OPTIONAL: help = 'at most one running domain' elif nargs == 1: help = 'running domain name' elif nargs == argparse.ZERO_OR_MORE: help = 'zero or more running domains' elif nargs == argparse.ONE_OR_MORE: help = 'one or more running domains' elif nargs > 1: help = '%s running domains' % nargs else: raise argparse.ArgumentError( nargs, "Passed unexpected value {!s} as {!s} nargs ".format( nargs, dest)) super().__init__( option_strings, dest=dest, help=help, nargs=nargs, **kwargs)
[docs] def parse_qubes_app(self, parser, namespace): super().parse_qubes_app(parser, namespace) for vm in namespace.domains: if not vm.is_running(): parser.error_runtime("domain {!r} is not running".format( vm.name))
[docs] class VMVolumeAction(QubesAction): ''' Action for argument parser that gets the :py:class:``qubes.storage.Volume`` from a VM:VOLUME string. ''' def __init__(self, help='A pool & volume id combination', required=True, **kwargs): # pylint: disable=redefined-builtin super().__init__(help=help, required=required, **kwargs) def __call__(self, parser, namespace, values, option_string=None): ''' Set ``namespace.vmname`` to ``values`` ''' setattr(namespace, self.dest, values)
[docs] def parse_qubes_app(self, parser, namespace): ''' Acquire the :py:class:``qubes.storage.Volume`` object from ``namespace.app``. ''' assert hasattr(namespace, 'app') app = namespace.app try: vm_name, vol_name = getattr(namespace, self.dest).split(':') try: vm = app.domains[vm_name] try: volume = vm.volumes[vol_name] setattr(namespace, self.dest, volume) except KeyError: parser.error_runtime('vm {!r} has no volume {!r}'.format( vm_name, vol_name)) except KeyError: parser.error_runtime('no vm {!r}'.format(vm_name)) except ValueError: parser.error('expected a vm & volume combination like foo:bar')
[docs] class PoolsAction(QubesAction): ''' Action for argument parser to gather multiple pools ''' def __call__(self, parser, namespace, values, option_string=None): ''' Set ``namespace.vmname`` to ``values`` ''' if hasattr(namespace, self.dest) and getattr(namespace, self.dest): names = getattr(namespace, self.dest) else: names = [] names += [values] setattr(namespace, self.dest, names)
[docs] def parse_qubes_app(self, parser, namespace): app = namespace.app pool_names = getattr(namespace, self.dest) if pool_names: try: pools = [app.pools[name] for name in pool_names] setattr(namespace, self.dest, pools) except qubesadmin.exc.QubesException as e: parser.error(str(e)) sys.exit(2) except KeyError: parser.error('No such pools: %s' % pool_names) sys.exit(2)
[docs] class QubesArgumentParser(argparse.ArgumentParser): '''Parser preconfigured for use in most of the Qubes command-line tools. :param mixed vmname_nargs: The number of ``VMNAME`` arguments that should be consumed. Values include: * N (an integer) consumes N arguments (and produces a list) * '?' consumes zero or one arguments * '*' consumes zero or more arguments (and produces a list) * '+' consumes one or more arguments (and produces a list) :param show_forceroot: don't hide --force-root parameter, prevent running as root unless it is given *kwargs* are passed to :py:class:`argparser.ArgumentParser`. Currenty supported options: ``--force-root`` (optional, ignored, help is suppressed) ``--offline-mode`` do not talk to hypervisor (help is suppressed) ``--verbose`` and ``--quiet`` Calling program should set the ``version`` argument for ``--version`` option The default is extracted from `qubesadmin` package information. Setting ``version`` argument to '' will disable ``--version`` option. ''' def __init__(self, vmname_nargs=None, show_forceroot=False, version=None, \ **kwargs): super().__init__(add_help=False, **kwargs) self._vmname_nargs = vmname_nargs self.add_argument('--verbose', '-v', action='count', help='increase verbosity') self.add_argument('--quiet', '-q', action='count', help='decrease verbosity') if show_forceroot: self.add_argument( '--force-root', action='store_true', default=False, help="Force running the tool even if called as root") else: self.add_argument('--force-root', action='store_true', default=False, help=argparse.SUPPRESS) self._complain_if_root = show_forceroot self.add_argument('--help', '-h', action=SubParsersHelpAction, help='show this help message and exit') if version is not None: self.version = version else: _metadata_ = importlib.metadata.metadata('qubesadmin') self.version = '{} ({}) {}'.format(os.path.basename(sys.argv[0]), \ _metadata_['summary'], _metadata_['version']) self.version += '\nCopyright (C) {}'.format(_metadata_['author']) self.version += '\nLicense: {}'.format(_metadata_['license']) if self.version != '': self.add_argument('--version', action='version') if self._vmname_nargs in [argparse.ZERO_OR_MORE, argparse.ONE_OR_MORE]: vm_name_group = VmNameGroup(self, required=(self._vmname_nargs not in [argparse.ZERO_OR_MORE, argparse.OPTIONAL])) self._mutually_exclusive_groups.append(vm_name_group) elif self._vmname_nargs is not None: self.add_argument('VMNAME', nargs=self._vmname_nargs, action=VmNameAction) self.set_defaults(verbose=1, quiet=0)
[docs] def parse_args(self, *args, **kwargs): # pylint: disable=arguments-differ,signature-differs # hack for tests app = kwargs.pop('app', None) namespace = super().parse_args(*args, **kwargs) if self._complain_if_root and \ os.getuid() == 0 and \ not namespace.force_root: self.error('refusing to run as root; add --force-root to override') self.set_qubes_verbosity(namespace) if app is not None: namespace.app = app else: namespace.app = qubesadmin.Qubes() for action in self._actions: # pylint: disable=protected-access if isinstance(action, QubesAction): action.parse_qubes_app(self, namespace) elif isinstance(action, argparse._SubParsersAction): # pylint: disable=no-member assert hasattr(namespace, 'command') command = namespace.command if command is None: continue subparser = action._name_parser_map[command] for subaction in subparser._actions: if isinstance(subaction, QubesAction): subaction.parse_qubes_app(self, namespace) return namespace
[docs] def error_runtime(self, message, exit_code=1): '''Runtime error, without showing usage. :param str message: message to show ''' self.exit(exit_code, '{}: error: {}\n'.format(self.prog, message))
[docs] @staticmethod def get_loglevel_from_verbosity(namespace): ''' Return loglevel calculated from quiet and verbose arguments ''' return (namespace.quiet - namespace.verbose) * 10 + logging.WARNING
[docs] @staticmethod def set_qubes_verbosity(namespace): '''Apply a verbosity setting. This is done by configuring global logging. :param argparse.Namespace args: args as parsed by parser ''' verbose = namespace.verbose - namespace.quiet if verbose >= 2: qubesadmin.log.enable_debug() elif verbose >= 1: qubesadmin.log.enable()
[docs] def print_error(self, *args, **kwargs): """ Print to ``sys.stderr``""" print("Error:", *args, file=sys.stderr, **kwargs)
[docs] class SubParsersHelpAction(argparse._HelpAction): ''' Print help for all options and all subparsers ''' # source https://stackoverflow.com/a/24122778 # pylint: disable=protected-access @staticmethod def _indent(indent, text): '''Indent *text* by *indent* spaces''' return '\n'.join((' ' * indent) + l for l in text.splitlines()) def __call__(self, parser, namespace, values, option_string=None): parser.print_help() # retrieve subparsers from parser subparsers_actions = [ action for action in parser._actions if isinstance(action, argparse._SubParsersAction)] # there will probably only be one subparser_action, # but better save than sorry for subparsers_action in subparsers_actions: # get all subparsers and print help for pseudo_action in subparsers_action._choices_actions: choice = pseudo_action.dest.split(' ', 1)[0] subparser = subparsers_action.choices[choice] print("\nCommand '{}':".format(choice)) choice_help = subparser.format_usage() choice_help = self._indent(2, choice_help) print(choice_help) parser.exit()
[docs] class AliasedSubParsersAction(argparse._SubParsersAction): '''SubParser with support for action aliases''' # source https://gist.github.com/sampsyo/471779 # pylint: disable=protected-access,missing-docstring class _AliasedPseudoAction(argparse.Action): # pylint: disable=redefined-builtin def __init__(self, name, aliases, help): dest = name if aliases: dest += ' (%s)' % ','.join(aliases) super().__init__(option_strings=[], dest=dest, help=help) def __call__(self, parser, namespace, values, option_string=None): pass
[docs] def add_parser(self, name, **kwargs): if 'aliases' in kwargs: aliases = kwargs['aliases'] del kwargs['aliases'] else: aliases = [] local_parser = super().add_parser(name, **kwargs) # Make the aliases work. for alias in aliases: self._name_parser_map[alias] = local_parser # Make the help text reflect them, first removing old help entry. if 'help' in kwargs: self._choices_actions.pop() pseudo_action = self._AliasedPseudoAction(name, aliases, kwargs.pop('help')) self._choices_actions.append(pseudo_action) return local_parser
[docs] def get_parser_for_command(command): '''Get parser for given qvm-tool. :param str command: command name :rtype: argparse.ArgumentParser :raises ImportError: when command's module is not found :raises AttributeError: when parser was not found ''' module = importlib.import_module( '.' + command.replace('-', '_'), 'qubesadmin.tools') try: parser = module.parser except AttributeError: try: parser = module.get_parser() except AttributeError: raise AttributeError('cannot find parser in module') return parser
# pylint: disable=protected-access
[docs] class VmNameGroup(argparse._MutuallyExclusiveGroup): ''' Adds an a VMNAME, --all & --exclude parameters to a :py:class:``argparse.ArgumentParser```. ''' def __init__(self, container, required, vm_action=VmNameAction, help=None): # pylint: disable=redefined-builtin super().__init__(container, required=required) if not help: help = 'perform the action on all qubes' self.add_argument('--all', action='store_true', dest='all_domains', help=help) container.add_argument('--exclude', action='append', default=[], help='exclude the qube from --all') # ⚠ the default parameter below is important! ⚠ # See https://stackoverflow.com/questions/35044288 and # `argparse.ArgumentParser.parse_args()` implementation self.add_argument('VMNAME', action=vm_action, nargs='*', default=[])