Source code for qubesadmin.tools.qvm_device

# encoding=utf-8

#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
# Copyright (C) 2016 Marek Marczykowski-Górecki
#                              <marmarek@invisiblethingslab.com>
# Copyright (C) 2024 Piotr Bartman-Szwarc <prbartman@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Qubes volume and block device management"""

import argparse
import os
import sys

import qubesadmin
import qubesadmin.exc
import qubesadmin.tools
import qubesadmin.device_protocol
from qubesadmin.device_protocol import (
    Port,
    DeviceInfo,
    UnknownDevice,
    DeviceAssignment,
    VirtualDevice,
    DeviceInterface,
    ProtocolError,
)


[docs] def prepare_table(dev_list, with_sbdf=False): """Converts a list of :py:class:`qubes.devices.DeviceInfo` objects to a list of tuples for the :py:func:`qubes.tools.print_table`. If :program:`qvm-devices` is running in a TTY, it will omit duplicate data. :param iterable dev_list: List of :py:class:`qubes.devices.DeviceInfo` objects. :param bool with_sbdf: when True, include SBDF identifier of PCI device :returns: list of tuples """ output = [] header = [] if sys.stdout.isatty(): if with_sbdf: header += [("BACKEND:DEVID", "SBDF", "DESCRIPTION", "USED BY")] else: header += [("BACKEND:DEVID", "DESCRIPTION", "USED BY")] for line in dev_list: if with_sbdf: output += [ ( line.ident, line.sbdf, line.description, str(line.assignments), ) ] else: output += [ ( line.ident, line.description, str(line.assignments), ) ] return header + sorted(output)
[docs] class Line: """Helper class to hold single device info for listing""" def __init__(self, device: DeviceInfo, assignment=False): self.ident = "{!s}:{!s}".format(device.backend_domain, device.port_id) self.description = device.description self.assignment = assignment self.frontends = [] self.sbdf = getattr(device, "data", {}).get("sbdf") @property def assignments(self): """list of frontends the device is assigned to""" fronts = ( f'{"*" if self.assignment else ""}' + front for front in self.frontends ) return ", ".join(fronts)
[docs] def list_devices(args): """ Called by the parser to execute the qubes-devices list subcommand.""" domains = args.domains if hasattr(args, "domains") else None if args.devclass != "pci": args.with_sbdf = False lines = _load_lines(args.app, domains, args.devclass, actual_devices=True) lines = list(lines.values()) # short command without (list/ls) should print just existing devices # short command is not a great place for introducing listing specific flags if getattr(args, "assignments", False): # we need to check assignments for all domains since # selected vm can be mentioned there as backend extra_lines = _load_lines( args.app, [], args.devclass, actual_devices=False ) lines += list(extra_lines.values()) qubesadmin.tools.print_table( prepare_table(lines, with_sbdf=getattr(args, "with_sbdf", False)) )
def _load_lines(app, domains, devclass, actual_devices: bool): # pylint: disable=missing-function-docstring devices = _load_devices(app, domains, devclass, actual_devices) result = {dev: Line(dev, not actual_devices) for dev in devices} for dev in result: for vm in app.domains: frontends = _load_frontends_info(vm, dev, devclass, actual_devices) result[dev].frontends.extend(frontends) return result def _load_devices(app, domains, devclass, actual_devices): """ Loads device exposed or connected to given domains. If `domains` is empty/`None` load all devices. If `actual_devices` is True only devices currently present will be included, otherwise only device assignments """ devices = set() if domains: ignore_errors = False else: ignore_errors = True domains = app.domains try: for vm in domains: try: if actual_devices: for ass in vm.devices[devclass].get_attached_devices(): devices.add(ass.device) for dev in vm.devices[devclass].get_exposed_devices(): devices.add(dev) else: for ass in vm.devices[devclass].get_assigned_devices(): devices.add(ass.virtual_device) except qubesadmin.exc.QubesVMNotFoundError: if ignore_errors: continue raise except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesException( "Failed to list '%s' devices, this device type either " "does not exist or you do not have access to it.", devclass, ) return devices def _load_frontends_info(vm, dev, devclass, actual_devices): """ Returns string of vms to which a device is connected or `None`. """ if vm == dev.backend_domain: return try: if actual_devices: for assignment in vm.devices[devclass].get_attached_devices(): if dev in assignment.devices: yield _frontend_desc(vm, assignment) else: for assignment in vm.devices[devclass].get_assigned_devices(): if assignment.matches(dev): yield _frontend_desc(vm, assignment, virtual=True) except qubesadmin.exc.QubesVMNotFoundError: pass def _frontend_desc(vm, assignment, virtual=False): """ Generate description of frontend vm with optional device connection options. """ mode = assignment.mode.value if not virtual: mode = "attached" if assignment.options: return "{!s} ({}: {})".format( vm, mode, ", ".join( "{}={}".format(key, value) for key, value in assignment.options.items() ), ) return f"{vm} ({mode})"
[docs] def attach_device(args): """Called by the parser to execute the :program:`qvm-devices attach` subcommand. """ vm = args.domains[0] device = args.device assignment = DeviceAssignment( device, # backward compatibility mode="required" if args.required else "manual", ) options = dict(opt.split("=", 1) for opt in args.option or []) if args.ro: options["read-only"] = "yes" parse_ro_option_as_read_only(options) assignment.options = options try: try: dev = assignment.device if isinstance(dev, UnknownDevice): raise qubesadmin.exc.QubesException( "Unknown device, skipping attachment of device from the " f"port {assignment}") except ProtocolError as exc: raise qubesadmin.exc.QubesException(str(exc)) if not assignment.matches(dev): raise qubesadmin.exc.QubesException( "Unrecognized identity, skipping attachment of device " f"from the port {assignment}" ) if isinstance(dev, UnknownDevice): raise qubesadmin.exc.QubesException( f"{device.devclass} device not recognized " f"at {device.port_id} port." ) vm.devices[args.devclass].attach(assignment) except qubesadmin.exc.QubesException as exc: # backward compatibility # if `--persistent` we ignore if attachment fails, # we want at least assign device if not args.required: raise exc # backward compatibility if args.required: vm.devices[args.devclass].assign(assignment)
[docs] def parse_ro_option_as_read_only(options): """ For backward compatibility. Read-only option could be represented as `--ro`, `-o read-only=yes` or `-o ro=True` etc. """ if "ro" in options.keys(): if options["ro"].lower() in ("1", "true", "yes"): options["read-only"] = "yes" del options["ro"] elif options["ro"].lower() in ("0", "false", "no"): options["read-only"] = "no" del options["ro"] else: raise ValueError( f"Unknown `read-only` option value: {options['ro']}" )
[docs] def detach_device(args): """Called by the parser to execute the :program:`qvm-devices detach` subcommand. """ vm = args.domains[0] device = args.device if device and device.port_id != "*": assignment = DeviceAssignment(device) try: actual_dev = assignment.device except ProtocolError as exc: raise qubesadmin.exc.QubesException(str(exc)) if not assignment.matches(actual_dev): raise qubesadmin.exc.QubesException(f"{device} is not attached.") if isinstance(actual_dev, UnknownDevice): raise qubesadmin.exc.QubesException(f"{device} not found.") vm.devices[args.devclass].detach(assignment) elif args.device: assignment = DeviceAssignment(device) no_device_detached = True for ass in vm.devices[args.devclass].get_attached_devices(): if assignment.matches(ass.device): vm.devices[args.devclass].detach(ass) no_device_detached = False if no_device_detached: raise qubesadmin.exc.QubesException(f"{device} is not attached.") else: for ass in vm.devices[args.devclass].get_attached_devices(): vm.devices[args.devclass].detach(ass)
[docs] def assign_device(args): """Called by the parser to execute the :program:`qvm-devices assign` subcommand. """ vm = args.domains[0] device = args.device if args.only_port: device = device.clone(device_id="*") elif device.device_id == UnknownDevice(device.port).device_id: raise qubesadmin.exc.QubesException( f"backend vm {device.backend_name} doesn't expose " f"{device.devclass} device {device.port_id!r}" ) if args.only_device: device = device.clone( port=Port(device.backend_domain, "*", device.devclass) ) options = dict(opt.split("=", 1) for opt in args.option or []) if args.ro: options["read-only"] = "yes" parse_ro_option_as_read_only(options) mode = "auto-attach" if args.required: mode = "required" if args.ask: mode = "ask-to-attach" assignment = DeviceAssignment(device, mode=mode, options=options) vm.devices[args.devclass].assign(assignment) # retrieve current port info assignment = DeviceAssignment( args.device, mode=mode, options=options, frontend_domain=vm ) if is_on_deny_list(args.device, vm) and not args.quiet: print( "Warning: The assigned device is on the denied list." "\n Auto-attach will work, " "but make sure that the assignment is correct." ) if vm.is_running() and not args.quiet and device.devclass != "pci": _print_attach_hint(assignment, vm)
def _build_options_str(options): """Build CLI option flags string from assignment options dict.""" parts = [f"-o {key}={value}" for key, value in options.items()] return (" " + " ".join(parts)) if parts else "" def _print_attach_hint(assignment, vm): # pylint: disable=missing-function-docstring attached = vm.devices[assignment.devclass].get_attached_devices() options_str = _build_options_str(assignment.options) ports = [ f"\tqvm-{assignment.devclass} attach{options_str} {vm} " f"{assignment.backend_domain}:{dev.port_id}" for dev in assignment.devices if dev not in attached and not isinstance(dev, UnknownDevice) ] if ports: print( "Assigned. To attach you can now restart domain or run: \n" + "\n".join(ports) )
[docs] def is_on_deny_list(device, dest_vm): """ Checks if *any* interface of the device is on the deny list for `dest_vm` vm. """ deny_list = [] try: deny_list = DeviceInterface.from_str_bulk(dest_vm.devices_denied) except (qubesadmin.exc.QubesValueError, qubesadmin.exc.QubesPropertyAccessError, AttributeError): pass # check if any presented interface is on deny list for interface in deny_list: for devint in device.interfaces: if interface.matches(devint): return True return False
[docs] def unassign_device(args): """Called by the parser to execute the :program:`qvm-devices unassign` subcommand. """ vm = args.domains[0] if args.device: device = args.device if args.only_port: device = device.clone(device_id="*") if args.only_device: device = device.clone( port=Port(device.backend_domain, "*", device.devclass) ) assignment = DeviceAssignment(device, frontend_domain=vm) _unassign_and_show_message(assignment, vm, args) else: for assignment in vm.devices[args.devclass].get_assigned_devices(): _unassign_and_show_message(assignment, vm, args)
def _unassign_and_show_message(assignment, vm, args): """ Helper for informing a user. """ vm.devices[args.devclass].unassign(assignment) attached = vm.devices[assignment.devclass].get_attached_devices() ports = [ f"\tqvm-{assignment.devclass} detach {vm} " f"{assignment.backend_domain}:{dev.port_id}" for dev in assignment.devices if dev in attached ] if ports and not args.quiet: print( "Unassigned. To detach you can now restart domain or run: \n", "\n".join(ports), )
[docs] def info_device(args): """Called by the parser to execute the :program:`qvm-devices info` subcommand. """ if args.device: device = args.device print(device.description) print(f"device ID: {device.device_id}") for key, value in device.data.items(): print(key.replace("_", " ") + ":", value)
[docs] def init_list_parser(sub_parsers): """Configures the parser for the :program:`qvm-devices list` subcommand""" # pylint: disable=protected-access list_parser = sub_parsers.add_parser( "list", aliases=("ls", "l"), help="list devices" ) list_parser.add_argument( "--assignments", "-s", action="store_true", default=False, help="Include info about device assignments, " "indicated by '*' before qube name.", ) list_parser.add_argument( "--with-sbdf", "--resolve-paths", action="store_true", help="Include resolved PCI path (SBDF) identifier of the PCI " "devices; ignored for non-PCI devices", ) vm_name_group = qubesadmin.tools.VmNameGroup( list_parser, required=False, vm_action=qubesadmin.tools.VmNameAction, help="list devices assigned to specific domain(s)", ) list_parser._mutually_exclusive_groups.append(vm_name_group) list_parser.set_defaults(func=list_devices)
[docs] class DeviceAction(qubesadmin.tools.QubesAction): """Action for argument parser that gets the :py:class:``qubesadmin.device_protocol.VirtualDevice`` from a BACKEND:PORT_ID:DEVICE_ID string. """ def __init__( self, help="A backend, port & device id combination", required=True, allow_unknown=False, only_port=False, **kwargs, ): # pylint: disable=redefined-builtin self.allow_unknown = allow_unknown self.only_port = only_port super().__init__(help=help, required=required, **kwargs) def __call__(self, parser, namespace, values, option_string=None): """Set ``namespace.device`` to ``values``""" setattr(namespace, self.dest, values)
[docs] def parse_qubes_app(self, parser, namespace): app = namespace.app representation = getattr(namespace, self.dest) devclass = namespace.devclass if representation is None: return try: try: dev = VirtualDevice.from_str( representation, devclass, app.domains ) except KeyError: parser.error_runtime("no such backend vm!") return try: # load device info _dev = dev.backend_domain.devices[devclass][dev.port_id] if not dev.is_device_id_set or dev.device_id == _dev.device_id: dev = _dev elif self.only_port: parser.error_runtime( "this option works only for explicitly given port ID " "and does not support device ID" ) else: dev = UnknownDevice.from_device(dev) if not self.allow_unknown and isinstance(dev, UnknownDevice): raise KeyError(dev.port_id) except KeyError: parser.error_runtime( f"backend vm {dev.backend_name} doesn't expose " f"{devclass} device {dev.port_id!r}" ) dev = UnknownDevice.from_device(dev) setattr(namespace, self.dest, dev) except ValueError: parser.error( "expected a backend vm, port id and [optional] device id " f"combination like foo:bar[:baz] got {representation}" )
[docs] def get_parser(device_class=None): """Create :py:class:`argparse.ArgumentParser` suitable for :program:`qvm-block`. """ parser = qubesadmin.tools.QubesArgumentParser(description=__doc__) parser.register( "action", "parsers", qubesadmin.tools.AliasedSubParsersAction ) parser.allow_abbrev = False if device_class: parser.add_argument( "devclass", const=device_class, action="store_const", help=argparse.SUPPRESS, ) else: parser.add_argument( "devclass", metavar="DEVICE_CLASS", action="store", help="Device class to manage ('pci', 'usb', etc)", ) # default action parser.set_defaults(func=list_devices) sub_parsers = parser.add_subparsers( title="commands", description="For more information see qvm-device command -h", dest="command", ) init_list_parser(sub_parsers) attach_parser = sub_parsers.add_parser( "attach", help="Attach device to domain", aliases=("at", "a") ) detach_parser = sub_parsers.add_parser( "detach", help="Detach device from domain", aliases=("d", "dt") ) assign_parser = sub_parsers.add_parser( "assign", help="Assign device to domain or edit existing assignment", aliases=("s",), ) unassign_parser = sub_parsers.add_parser( "unassign", help="Remove assignment of device from domain", aliases=("u",), ) info_parser = sub_parsers.add_parser( "info", help="Show info about device from domain", aliases=("i",) ) attach_parser.add_argument( "VMNAME", nargs=1, action=qubesadmin.tools.VmNameAction ) detach_parser.add_argument( "VMNAME", nargs=1, action=qubesadmin.tools.VmNameAction ) assign_parser.add_argument( "VMNAME", nargs=1, action=qubesadmin.tools.VmNameAction ) unassign_parser.add_argument( "VMNAME", nargs=1, action=qubesadmin.tools.VmNameAction ) attach_parser.add_argument( metavar="BACKEND:DEVICE_ID", dest="device", action=DeviceAction ) detach_parser.add_argument( metavar="BACKEND:DEVICE_ID", dest="device", nargs=argparse.OPTIONAL, action=DeviceAction, allow_unknown=True, ) assign_parser.add_argument( metavar="BACKEND:DEVICE_ID", dest="device", action=DeviceAction, allow_unknown=True, ) unassign_parser.add_argument( metavar="BACKEND:DEVICE_ID", dest="device", nargs=argparse.OPTIONAL, action=DeviceAction, allow_unknown=True, ) info_parser.add_argument( metavar="BACKEND:DEVICE_ID", dest="device", nargs=argparse.OPTIONAL, action=DeviceAction, only_port=True, ) option = ( ( "--option", "-o", ), { "action": "append", "help": "Set option for the device in opt=value form " "(can be specified multiple times), " "see man qvm-device for details", }, ) attach_parser.add_argument(*option[0], **option[1]) assign_parser.add_argument(*option[0], **option[1]) read_only = ( ("--ro",), { "action": "store_true", "default": False, "help": "Attach device read-only (alias for read-only=yes " "option, takes precedence)", }, ) attach_parser.add_argument(*read_only[0], **read_only[1]) assign_parser.add_argument(*read_only[0], **read_only[1]) attach_parser.add_argument( "--persistent", "-p", dest="required", action="store_true", default=False, help="Alias to `assign --required` for backward " "compatibility", ) mode_parser = assign_parser.add_mutually_exclusive_group() mode_parser.add_argument( "--ask", "--ask-to-attach", action="store_true", default=False, help="Always ask before auto-attachment", ) mode_parser.add_argument( "--required", "-r", dest="required", action="store_true", default=False, help="Mark device as required so it will " "be required to the qube's startup and then" " automatically attached)", ) for pars in (assign_parser, unassign_parser): id_parser = pars.add_mutually_exclusive_group() id_parser.add_argument( "--port", "--only-port", dest="only_port", action="store_true", default=False, help="Ignore device presented identity", ) id_parser.add_argument( "--device", "--only-device", dest="only_device", action="store_true", default=False, help="Ignore current port identity", ) attach_parser.set_defaults(func=attach_device) detach_parser.set_defaults(func=detach_device) assign_parser.set_defaults(func=assign_device) unassign_parser.set_defaults(func=unassign_device) info_parser.set_defaults(func=info_device) parser.add_argument( "--list-device-classes", action="store_true", default=False ) return parser
[docs] def main(args=None, app=None): """Main routine of :program:`qvm-block`.""" app = app or qubesadmin.Qubes() app.cache_enabled = True basename = os.path.basename(sys.argv[0]) devclass = None if basename.startswith("qvm-") and basename != "qvm-device": devclass = basename[4:] # Special treatment for '--list-device-classes' (alias --list-classes) sys_args = ["--" + arg for arg in args] if args else [] curr_action = sys.argv[1:] + sys_args if set(curr_action).intersection( {"--list-device-classes", "--list-classes"} ): print("\n".join(app.list_deviceclass())) return 0 parser = get_parser(devclass) args = parser.parse_args(args, app=app) try: args.func(args) except qubesadmin.exc.QubesException as e: parser.print_error(str(e)) return 1 return 0
if __name__ == "__main__": sys.exit(main())