# encoding=utf-8
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
# Copyright (C) 2024 Piotr Bartman-Szwarc <prbartman@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Qubes volume and block device management"""
import argparse
import os
import sys
import qubesadmin
import qubesadmin.exc
import qubesadmin.tools
import qubesadmin.device_protocol
from qubesadmin.device_protocol import (
Port,
DeviceInfo,
UnknownDevice,
DeviceAssignment,
VirtualDevice,
DeviceInterface,
ProtocolError,
)
[docs]
def prepare_table(dev_list, with_sbdf=False):
    """Turn device lines into rows for :py:func:`qubes.tools.print_table`.

    Data rows are sorted. A header row is placed in front of them only when
    standard output is a TTY, so piped output contains data rows only.

    :param iterable dev_list: objects exposing ``ident``, ``description`` and
        ``assignments`` (and ``sbdf`` when *with_sbdf* is set), such as
        :py:class:`Line`
    :param bool with_sbdf: when True, add an SBDF column right after the
        device identifier
    :returns: list of tuples
    """
    if with_sbdf:
        titles = ("BACKEND:DEVID", "SBDF", "DESCRIPTION", "USED BY")
    else:
        titles = ("BACKEND:DEVID", "DESCRIPTION", "USED BY")
    # The header is only useful to a human looking at a terminal.
    table = [titles] if sys.stdout.isatty() else []

    if with_sbdf:
        rows = [
            (
                entry.ident,
                entry.sbdf,
                entry.description,
                str(entry.assignments),
            )
            for entry in dev_list
        ]
    else:
        rows = [
            (entry.ident, entry.description, str(entry.assignments))
            for entry in dev_list
        ]
    rows.sort()
    return table + rows
[docs]
class Line:
    """One row of the device listing: a device plus the qubes using it."""

    def __init__(self, device: DeviceInfo, assignment=False):
        """
        :param device: device to describe; ``backend_domain``, ``port_id``
            and ``description`` are read, ``data`` is optional
        :param bool assignment: True when the row describes an assignment
            rather than a currently present device
        """
        self.ident = f"{device.backend_domain!s}:{device.port_id!s}"
        self.description = device.description
        self.assignment = assignment
        # Filled in later by the code that builds the listing.
        self.frontends = []
        # Objects without a ``data`` mapping simply have no SBDF.
        extra = getattr(device, "data", {})
        self.sbdf = extra.get("sbdf")

    @property
    def assignments(self):
        """Comma-separated frontend descriptions, each prefixed with ``*``
        when this row describes an assignment."""
        marker = "*" if self.assignment else ""
        return ", ".join(marker + frontend for frontend in self.frontends)
[docs]
def list_devices(args):
    """Execute the ``list`` subcommand (also the parser's default action).

    Collects the devices of ``args.devclass`` and prints them as a table.

    :param args: parsed arguments; ``app`` and ``devclass`` are required,
        ``domains``, ``assignments`` and ``with_sbdf`` are optional
    """
    selected_domains = getattr(args, "domains", None)
    if args.devclass != "pci":
        # The SBDF column is PCI-only, so force the flag off otherwise.
        args.with_sbdf = False

    present = _load_lines(
        args.app, selected_domains, args.devclass, actual_devices=True
    )
    rows = list(present.values())

    # short command without (list/ls) should print just existing devices
    # short command is not a great place for introducing listing specific flags
    if getattr(args, "assignments", False):
        # we need to check assignments for all domains since
        # selected vm can be mentioned there as backend
        assigned = _load_lines(
            args.app, [], args.devclass, actual_devices=False
        )
        rows.extend(assigned.values())

    show_sbdf = getattr(args, "with_sbdf", False)
    qubesadmin.tools.print_table(prepare_table(rows, with_sbdf=show_sbdf))
def _load_lines(app, domains, devclass, actual_devices: bool):
    """Build a ``{device: Line}`` mapping with frontend info filled in.

    :param app: application object; its ``domains`` are scanned for frontends
    :param domains: domains whose devices are loaded (see ``_load_devices``)
    :param devclass: device class name
    :param bool actual_devices: True for present/attached devices, False for
        assignments (the lines are then flagged as assignments)
    :returns: dict mapping each device to its :py:class:`Line`
    """
    # pylint: disable=missing-function-docstring
    result = {}
    for device in _load_devices(app, domains, devclass, actual_devices):
        result[device] = Line(device, not actual_devices)

    for device, line in result.items():
        for vm in app.domains:
            line.frontends.extend(
                _load_frontends_info(vm, device, devclass, actual_devices)
            )
    return result
def _load_devices(app, domains, devclass, actual_devices):
    """Collect the set of devices exposed by or connected to *domains*.

    :param app: application object, used for ``app.domains`` when *domains*
        is empty or ``None``
    :param domains: domains to inspect; falsy means every domain
    :param devclass: device class name
    :param bool actual_devices: when True gather attached and exposed
        devices, otherwise gather the virtual devices of assignments
    :returns: set of devices
    :raises qubesadmin.exc.QubesVMNotFoundError: a domain from an explicit
        selection could not be found
    :raises qubesadmin.exc.QubesException: access to the device class was
        refused by the daemon
    """
    # A vanished qube is tolerated only while scanning all domains; an
    # explicit selection has to resolve.
    ignore_errors = not domains
    if ignore_errors:
        domains = app.domains

    found = set()
    try:
        for vm in domains:
            try:
                if actual_devices:
                    found.update(
                        attached.device
                        for attached in
                        vm.devices[devclass].get_attached_devices()
                    )
                    found.update(vm.devices[devclass].get_exposed_devices())
                else:
                    found.update(
                        assigned.virtual_device
                        for assigned in
                        vm.devices[devclass].get_assigned_devices()
                    )
            except qubesadmin.exc.QubesVMNotFoundError:
                if not ignore_errors:
                    raise
    except qubesadmin.exc.QubesDaemonAccessError:
        raise qubesadmin.exc.QubesException(
            "Failed to list '%s' devices, this device type either "
            "does not exist or you do not have access to it.",
            devclass,
        )
    return found
def _load_frontends_info(vm, dev, devclass, actual_devices):
    """Yield a description of *vm* for every connection it has to *dev*.

    Nothing is yielded when *vm* is the device's own backend domain, or when
    *vm* can no longer be found.

    :param vm: candidate frontend domain
    :param dev: device being looked up
    :param devclass: device class name
    :param bool actual_devices: True to inspect attached devices, False to
        inspect assignments
    """
    if vm == dev.backend_domain:
        return
    try:
        if actual_devices:
            yield from (
                _frontend_desc(vm, attached)
                for attached in vm.devices[devclass].get_attached_devices()
                if dev in attached.devices
            )
        else:
            yield from (
                _frontend_desc(vm, assigned, virtual=True)
                for assigned in vm.devices[devclass].get_assigned_devices()
                if assigned.matches(dev)
            )
    except qubesadmin.exc.QubesVMNotFoundError:
        # The qube disappeared while listing; just stop yielding for it.
        return
def _frontend_desc(vm, assignment, virtual=False):
    """Describe a frontend qube together with its connection mode/options.

    :param vm: frontend domain (only its string form is used)
    :param assignment: object providing ``mode.value`` and ``options``
    :param bool virtual: True to show the assignment mode, False to show
        the literal ``attached``
    :returns: ``"<vm> (<mode>)"`` or ``"<vm> (<mode>: k=v, ...)"``
    """
    declared_mode = assignment.mode.value
    label = declared_mode if virtual else "attached"
    if not assignment.options:
        return f"{vm} ({label})"
    rendered = ", ".join(
        f"{key}={value}" for key, value in assignment.options.items()
    )
    return f"{vm!s} ({label}: {rendered})"
[docs]
def attach_device(args):
    """Called by the parser to execute the :program:`qvm-devices attach`
    subcommand.

    Attaches ``args.device`` to the first domain of ``args.domains``. With
    ``--persistent`` (``args.required``) the device is additionally assigned
    in ``required`` mode, and a failed attachment is then not fatal.

    :param args: parsed arguments (``domains``, ``device``, ``devclass``,
        ``option``, ``ro``, ``required``)
    :raises qubesadmin.exc.QubesException: the device is unknown, does not
        match, or attaching failed (only when ``--persistent`` is not given)
    :raises ValueError: malformed ``-o`` option or bad legacy ``ro`` value
    """
    vm = args.domains[0]
    device = args.device
    assignment = DeviceAssignment(
        device,
        # backward compatibility
        mode="required" if args.required else "manual",
    )
    options = dict(opt.split("=", 1) for opt in args.option or [])
    # Translate the legacy `-o ro=...` spelling first so that `--ro`, which
    # is documented as taking precedence, really has the last word.
    parse_ro_option_as_read_only(options)
    if args.ro:
        options["read-only"] = "yes"
    assignment.options = options

    try:
        try:
            dev = assignment.device
        except ProtocolError as exc:
            raise qubesadmin.exc.QubesException(str(exc)) from exc
        if isinstance(dev, UnknownDevice):
            raise qubesadmin.exc.QubesException(
                "Unknown device, skipping attachment of device from the "
                f"port {assignment}")
        if not assignment.matches(dev):
            raise qubesadmin.exc.QubesException(
                "Unrecognized identity, skipping attachment of device "
                f"from the port {assignment}"
            )
        vm.devices[args.devclass].attach(assignment)
    except qubesadmin.exc.QubesException:
        # backward compatibility
        # if `--persistent` we ignore if attachment fails,
        # we want at least assign device
        if not args.required:
            raise
    # backward compatibility
    if args.required:
        vm.devices[args.devclass].assign(assignment)
[docs]
def parse_ro_option_as_read_only(options):
    """Rewrite a legacy ``ro`` entry of *options* as ``read-only``, in place.

    For backward compatibility the read-only flag may be given as ``--ro``,
    ``-o read-only=yes`` or ``-o ro=True`` etc. Accepted ``ro`` values
    (case-insensitive) are ``1``/``true``/``yes`` and ``0``/``false``/``no``.

    :param dict options: option mapping; left untouched if it has no ``ro``
    :raises ValueError: the ``ro`` value is not recognised (``ro`` is kept)
    """
    if "ro" not in options:
        return
    normalized = options["ro"].lower()
    if normalized in ("1", "true", "yes"):
        translated = "yes"
    elif normalized in ("0", "false", "no"):
        translated = "no"
    else:
        raise ValueError(
            f"Unknown `read-only` option value: {options['ro']}"
        )
    options["read-only"] = translated
    del options["ro"]
[docs]
def detach_device(args):
    """Called by the parser to execute the :program:`qvm-devices detach`
    subcommand.

    Three cases, depending on ``args.device``:

    * no device given - every attached device of the class is detached;
    * device with port ``*`` - every attached device matching it is detached;
    * concrete device - exactly that device is detached.

    :param args: parsed arguments (``domains``, ``device``, ``devclass``)
    :raises qubesadmin.exc.QubesException: the requested device is not
        attached or cannot be found
    """
    vm = args.domains[0]
    device = args.device

    if not device:
        # No device named: detach everything of this class.
        for attached in vm.devices[args.devclass].get_attached_devices():
            vm.devices[args.devclass].detach(attached)
        return

    if device.port_id == "*":
        # Wildcard port: detach whatever matches the pattern.
        pattern = DeviceAssignment(device)
        matching = 0
        for attached in vm.devices[args.devclass].get_attached_devices():
            if pattern.matches(attached.device):
                vm.devices[args.devclass].detach(attached)
                matching += 1
        if not matching:
            raise qubesadmin.exc.QubesException(f"{device} is not attached.")
        return

    assignment = DeviceAssignment(device)
    try:
        actual_dev = assignment.device
    except ProtocolError as exc:
        raise qubesadmin.exc.QubesException(str(exc))
    if not assignment.matches(actual_dev):
        raise qubesadmin.exc.QubesException(f"{device} is not attached.")
    if isinstance(actual_dev, UnknownDevice):
        raise qubesadmin.exc.QubesException(f"{device} not found.")
    vm.devices[args.devclass].detach(assignment)
[docs]
def assign_device(args):
    """Called by the parser to execute the :program:`qvm-devices assign`
    subcommand.

    Assigns ``args.device`` to the first domain of ``args.domains``. The
    assignment can be narrowed to the port only (``--port``) or to the device
    identity only (``--device``). Unless ``--quiet`` is given, a deny-list
    warning and a hint with ``attach`` commands may be printed afterwards.

    :param args: parsed arguments (``domains``, ``device``, ``devclass``,
        ``option``, ``ro``, ``required``, ``ask``, ``only_port``,
        ``only_device``, ``quiet``)
    :raises qubesadmin.exc.QubesException: the backend does not expose the
        device and ``--port`` was not requested
    :raises ValueError: malformed ``-o`` option or bad legacy ``ro`` value
    """
    vm = args.domains[0]
    device = args.device
    if args.only_port:
        device = device.clone(device_id="*")
    elif device.device_id == UnknownDevice(device.port).device_id:
        raise qubesadmin.exc.QubesException(
            f"backend vm {device.backend_name} doesn't expose "
            f"{device.devclass} device {device.port_id!r}"
        )
    if args.only_device:
        device = device.clone(
            port=Port(device.backend_domain, "*", device.devclass)
        )

    options = dict(opt.split("=", 1) for opt in args.option or [])
    # Translate the legacy `-o ro=...` spelling first so that `--ro`, which
    # is documented as taking precedence, really has the last word.
    parse_ro_option_as_read_only(options)
    if args.ro:
        options["read-only"] = "yes"

    mode = "auto-attach"
    if args.required:
        mode = "required"
    if args.ask:
        mode = "ask-to-attach"

    assignment = DeviceAssignment(device, mode=mode, options=options)
    vm.devices[args.devclass].assign(assignment)

    # retrieve current port info
    assignment = DeviceAssignment(
        args.device, mode=mode, options=options, frontend_domain=vm
    )
    # Check `quiet` first so nothing is looked up just to be discarded.
    if not args.quiet and is_on_deny_list(args.device, vm):
        print(
            "Warning: The assigned device is on the denied list."
            "\n Auto-attach will work, "
            "but make sure that the assignment is correct."
        )
    if vm.is_running() and not args.quiet and device.devclass != "pci":
        _print_attach_hint(assignment, vm)
def _build_options_str(options):
    """Render an options mapping as `` -o key=value`` command-line flags.

    :param dict options: assignment options
    :returns: the flags joined by spaces with one leading space, or an empty
        string when there are no options
    """
    if not options:
        return ""
    flags = " ".join(f"-o {name}={setting}" for name, setting in options.items())
    return " " + flags
def _print_attach_hint(assignment, vm):
    """Print ready-to-run ``attach`` commands for a fresh assignment.

    One command is suggested for every device of *assignment* that is not
    an :py:class:`UnknownDevice` and not already attached to *vm*. Nothing
    is printed when there is no such device.

    :param assignment: assignment that has just been stored
    :param vm: frontend domain the assignment belongs to
    """
    # Materialise once: the result is tested for membership repeatedly, and
    # it may be a one-shot iterator that the first test would exhaust.
    attached = list(vm.devices[assignment.devclass].get_attached_devices())
    options_str = _build_options_str(assignment.options)
    ports = [
        f"\tqvm-{assignment.devclass} attach{options_str} {vm} "
        f"{assignment.backend_domain}:{dev.port_id}"
        for dev in assignment.devices
        if dev not in attached and not isinstance(dev, UnknownDevice)
    ]
    if ports:
        print(
            "Assigned. To attach you can now restart domain or run: \n"
            + "\n".join(ports)
        )
[docs]
def is_on_deny_list(device, dest_vm):
    """Tell whether *any* interface of *device* is denied for *dest_vm*.

    The deny list is parsed from ``dest_vm.devices_denied``; if it cannot be
    read or parsed it is treated as empty.

    :param device: device whose ``interfaces`` are checked
    :param dest_vm: domain providing ``devices_denied``
    :returns: True if at least one presented interface matches a denied one
    """
    try:
        denied = DeviceInterface.from_str_bulk(dest_vm.devices_denied)
    except (qubesadmin.exc.QubesValueError,
            qubesadmin.exc.QubesPropertyAccessError,
            AttributeError):
        denied = []

    # check if any presented interface is on deny list
    return any(
        rule.matches(presented)
        for rule in denied
        for presented in device.interfaces
    )
[docs]
def unassign_device(args):
    """Called by the parser to execute the :program:`qvm-devices unassign`
    subcommand.

    Without ``args.device`` every assignment of the class is removed from
    the domain. Otherwise only the given device is unassigned, optionally
    narrowed to its port (``--port``) or its identity (``--device``).

    :param args: parsed arguments (``domains``, ``device``, ``devclass``,
        ``only_port``, ``only_device``, ``quiet``)
    """
    vm = args.domains[0]

    if not args.device:
        for existing in vm.devices[args.devclass].get_assigned_devices():
            _unassign_and_show_message(existing, vm, args)
        return

    target = args.device
    if args.only_port:
        target = target.clone(device_id="*")
    if args.only_device:
        any_port = Port(target.backend_domain, "*", target.devclass)
        target = target.clone(port=any_port)
    _unassign_and_show_message(
        DeviceAssignment(target, frontend_domain=vm), vm, args
    )
def _unassign_and_show_message(assignment, vm, args):
    """Remove *assignment* from *vm* and tell the user how to detach.

    After unassigning, a ``detach`` command is suggested for every device of
    the assignment that is still attached to *vm*. The message is suppressed
    by ``args.quiet``.

    :param assignment: assignment to remove
    :param vm: frontend domain the assignment belongs to
    :param args: parsed arguments (``devclass`` and ``quiet`` are read)
    """
    vm.devices[args.devclass].unassign(assignment)
    # Materialise once: the result is tested for membership repeatedly, and
    # it may be a one-shot iterator that the first test would exhaust.
    attached = list(vm.devices[assignment.devclass].get_attached_devices())
    ports = [
        f"\tqvm-{assignment.devclass} detach {vm} "
        f"{assignment.backend_domain}:{dev.port_id}"
        for dev in assignment.devices
        if dev in attached
    ]
    if ports and not args.quiet:
        # Concatenate rather than pass two print() arguments, which would
        # put a stray space in front of the first suggested command.
        print(
            "Unassigned. To detach you can now restart domain or run: \n"
            + "\n".join(ports)
        )
[docs]
def info_device(args):
    """Called by the parser to execute the :program:`qvm-devices info`
    subcommand.

    Prints the description, the device ID and every entry of the device's
    ``data`` mapping (underscores in keys are shown as spaces). Nothing is
    printed when no device was given.

    :param args: parsed arguments; only ``device`` is read
    """
    if not args.device:
        return
    device = args.device
    print(device.description)
    print(f"device ID: {device.device_id}")
    for field, value in device.data.items():
        label = field.replace("_", " ")
        print(f"{label}: {value!s}")
[docs]
def init_list_parser(sub_parsers):
    """Register the ``list`` subcommand (aliases ``ls``, ``l``) of
    :program:`qvm-devices` on *sub_parsers*.

    :param sub_parsers: sub-parsers action the command is added to
    """
    # pylint: disable=protected-access
    assignments_help = (
        "Include info about device assignments, "
        "indicated by '*' before qube name."
    )
    sbdf_help = (
        "Include resolved PCI path (SBDF) identifier of the PCI "
        "devices; ignored for non-PCI devices"
    )

    listing = sub_parsers.add_parser(
        "list", aliases=("ls", "l"), help="list devices"
    )
    listing.add_argument(
        "--assignments",
        "-s",
        action="store_true",
        default=False,
        help=assignments_help,
    )
    listing.add_argument(
        "--with-sbdf",
        "--resolve-paths",
        action="store_true",
        help=sbdf_help,
    )

    # The VM name group is registered by hand as a mutually exclusive group.
    domain_selector = qubesadmin.tools.VmNameGroup(
        listing,
        required=False,
        vm_action=qubesadmin.tools.VmNameAction,
        help="list devices assigned to specific domain(s)",
    )
    listing._mutually_exclusive_groups.append(domain_selector)
    listing.set_defaults(func=list_devices)
[docs]
class DeviceAction(qubesadmin.tools.QubesAction):
    """Action for argument parser that gets the
    :py:class:``qubesadmin.device_protocol.VirtualDevice`` from a
    BACKEND:PORT_ID:DEVICE_ID string.

    The raw string is stored on the namespace by :py:meth:`__call__` and is
    replaced with a device object by :py:meth:`parse_qubes_app`.
    """

    def __init__(
        self,
        help="A backend, port & device id combination",
        required=True,
        allow_unknown=False,
        only_port=False,
        **kwargs,
    ):
        """
        :param str help: help text, forwarded to the base action
        :param bool required: forwarded to the base action
        :param bool allow_unknown: when False, a device that resolves to an
            :py:class:`UnknownDevice` is reported as a parser error
        :param bool only_port: when True, giving a device ID that differs
            from the one found at the port is reported as a parser error
        :param kwargs: remaining keyword arguments for the base action
        """
        # pylint: disable=redefined-builtin
        self.allow_unknown = allow_unknown
        self.only_port = only_port
        super().__init__(help=help, required=required, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Set ``namespace.device`` to ``values``"""
        setattr(namespace, self.dest, values)

    def parse_qubes_app(self, parser, namespace):
        """Replace the raw string stored under ``self.dest`` with a device.

        Does nothing when no value was given. Problems are reported through
        ``parser.error_runtime`` / ``parser.error``.

        NOTE(review): presumably called by the Qubes argument parser once
        ``namespace.app`` is available, and ``parser.error_runtime`` is
        presumably terminating - confirm both in ``qubesadmin.tools``.

        :param parser: parser used for error reporting
        :param namespace: parsed arguments; ``app`` and ``devclass`` are read
        """
        app = namespace.app
        representation = getattr(namespace, self.dest)
        devclass = namespace.devclass
        if representation is None:
            # optional argument that was not supplied
            return
        try:
            try:
                dev = VirtualDevice.from_str(
                    representation, devclass, app.domains
                )
            except KeyError:
                parser.error_runtime("no such backend vm!")
                return
            try:
                # load device info
                _dev = dev.backend_domain.devices[devclass][dev.port_id]
                if not dev.is_device_id_set or dev.device_id == _dev.device_id:
                    # no device ID requested, or it agrees with what the
                    # backend reports: use the backend's device object
                    dev = _dev
                elif self.only_port:
                    parser.error_runtime(
                        "this option works only for explicitly given port ID "
                        "and does not support device ID"
                    )
                else:
                    # requested device ID differs from the one at the port
                    dev = UnknownDevice.from_device(dev)
                if not self.allow_unknown and isinstance(dev, UnknownDevice):
                    # funnel into the KeyError handler below so both
                    # "not exposed" cases share one error message
                    raise KeyError(dev.port_id)
            except KeyError:
                parser.error_runtime(
                    f"backend vm {dev.backend_name} doesn't expose "
                    f"{devclass} device {dev.port_id!r}"
                )
                # only reached if error_runtime() returns
                dev = UnknownDevice.from_device(dev)
            setattr(namespace, self.dest, dev)
        except ValueError:
            # a ValueError raised anywhere above is reported as a
            # malformed device specification
            parser.error(
                "expected a backend vm, port id and [optional] device id "
                f"combination like foo:bar[:baz] got {representation}"
            )
[docs]
def get_parser(device_class=None):
    """Create the :py:class:`argparse.ArgumentParser` used by
    :program:`qvm-device` and its per-class variants such as
    :program:`qvm-block`.

    :param device_class: device class to fix as a constant ``devclass``;
        when falsy, a ``DEVICE_CLASS`` positional argument is added instead
    :returns: the configured parser, with ``list`` as the default action
    """
    vm_name_action = qubesadmin.tools.VmNameAction

    parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
    parser.register(
        "action", "parsers", qubesadmin.tools.AliasedSubParsersAction
    )
    parser.allow_abbrev = False

    if not device_class:
        parser.add_argument(
            "devclass",
            metavar="DEVICE_CLASS",
            action="store",
            help="Device class to manage ('pci', 'usb', etc)",
        )
    else:
        parser.add_argument(
            "devclass",
            const=device_class,
            action="store_const",
            help=argparse.SUPPRESS,
        )

    # default action
    parser.set_defaults(func=list_devices)

    sub_parsers = parser.add_subparsers(
        title="commands",
        description="For more information see qvm-device command -h",
        dest="command",
    )
    init_list_parser(sub_parsers)
    attach_parser = sub_parsers.add_parser(
        "attach", help="Attach device to domain", aliases=("at", "a")
    )
    detach_parser = sub_parsers.add_parser(
        "detach", help="Detach device from domain", aliases=("d", "dt")
    )
    assign_parser = sub_parsers.add_parser(
        "assign",
        help="Assign device to domain or edit existing assignment",
        aliases=("s",),
    )
    unassign_parser = sub_parsers.add_parser(
        "unassign",
        help="Remove assignment of device from domain",
        aliases=("u",),
    )
    info_parser = sub_parsers.add_parser(
        "info", help="Show info about device from domain", aliases=("i",)
    )

    # Every command except `info` starts with the target domain.
    for command in (
        attach_parser,
        detach_parser,
        assign_parser,
        unassign_parser,
    ):
        command.add_argument("VMNAME", nargs=1, action=vm_name_action)

    # The device positional; only the extra keywords differ per command.
    device_arguments = (
        (attach_parser, {}),
        (detach_parser, {"nargs": argparse.OPTIONAL, "allow_unknown": True}),
        (assign_parser, {"allow_unknown": True}),
        (unassign_parser, {"nargs": argparse.OPTIONAL, "allow_unknown": True}),
        (info_parser, {"nargs": argparse.OPTIONAL, "only_port": True}),
    )
    for command, extra in device_arguments:
        command.add_argument(
            metavar="BACKEND:DEVICE_ID",
            dest="device",
            action=DeviceAction,
            **extra,
        )

    option_flags = ("--option", "-o")
    option_settings = {
        "action": "append",
        "help": "Set option for the device in opt=value form "
        "(can be specified multiple times), "
        "see man qvm-device for details",
    }
    for command in (attach_parser, assign_parser):
        command.add_argument(*option_flags, **option_settings)

    read_only_settings = {
        "action": "store_true",
        "default": False,
        "help": "Attach device read-only (alias for read-only=yes "
        "option, takes precedence)",
    }
    for command in (attach_parser, assign_parser):
        command.add_argument("--ro", **read_only_settings)

    attach_parser.add_argument(
        "--persistent",
        "-p",
        dest="required",
        action="store_true",
        default=False,
        help="Alias to `assign --required` for backward compatibility",
    )

    # `--ask` and `--required` select the assignment mode and exclude
    # each other.
    mode_parser = assign_parser.add_mutually_exclusive_group()
    mode_parser.add_argument(
        "--ask",
        "--ask-to-attach",
        action="store_true",
        default=False,
        help="Always ask before auto-attachment",
    )
    mode_parser.add_argument(
        "--required",
        "-r",
        dest="required",
        action="store_true",
        default=False,
        help="Mark device as required so it will "
        "be required to the qube's startup and then"
        " automatically attached)",
    )

    # `--port` and `--device` narrow the identity and exclude each other.
    for command in (assign_parser, unassign_parser):
        id_parser = command.add_mutually_exclusive_group()
        id_parser.add_argument(
            "--port",
            "--only-port",
            dest="only_port",
            action="store_true",
            default=False,
            help="Ignore device presented identity",
        )
        id_parser.add_argument(
            "--device",
            "--only-device",
            dest="only_device",
            action="store_true",
            default=False,
            help="Ignore current port identity",
        )

    handlers = (
        (attach_parser, attach_device),
        (detach_parser, detach_device),
        (assign_parser, assign_device),
        (unassign_parser, unassign_device),
        (info_parser, info_device),
    )
    for command, handler in handlers:
        command.set_defaults(func=handler)

    parser.add_argument(
        "--list-device-classes", action="store_true", default=False
    )
    return parser
[docs]
def main(args=None, app=None):
    """Main routine of :program:`qvm-block` and the other device tools.

    The device class is derived from the program name (``qvm-<class>``),
    except for ``qvm-device``, where it is read from the command line.

    :param args: command-line arguments, or ``None`` to let the parser use
        its default
    :param app: application object; a new :py:class:`qubesadmin.Qubes` is
        created when it is falsy
    :returns: exit status, 0 on success and 1 on a Qubes error
    """
    app = app or qubesadmin.Qubes()
    app.cache_enabled = True

    program = os.path.basename(sys.argv[0])
    if program.startswith("qvm-") and program != "qvm-device":
        devclass = program[4:]
    else:
        devclass = None

    # Special treatment for '--list-device-classes' (alias --list-classes)
    candidates = sys.argv[1:]
    if args:
        candidates = candidates + ["--" + arg for arg in args]
    list_flags = {"--list-device-classes", "--list-classes"}
    if list_flags.intersection(candidates):
        print("\n".join(app.list_deviceclass()))
        return 0

    parser = get_parser(devclass)
    args = parser.parse_args(args, app=app)
    try:
        args.func(args)
    except qubesadmin.exc.QubesException as error:
        parser.print_error(str(error))
        return 1
    return 0
if __name__ == "__main__":
    # Run as a script: use main()'s return value as the process exit status.
    sys.exit(main())