Source code for qubesadmin.utils

# encoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2010-2015  Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2013-2015  Marek Marczykowski-Górecki
#                              <marmarek@invisiblethingslab.com>
# Copyright (C) 2014-2015  Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

"""Various utility functions."""

from __future__ import annotations

import argparse
import asyncio
import fcntl
import os
import re
import string
import subprocess
import time
import typing
from collections.abc import Iterable

import qubesadmin.exc
from qubesadmin.exc import QubesValueError, QubesVMAlreadyStartedError
from qubesadmin.device_protocol import DeviceAssignment, UnknownDevice

if typing.TYPE_CHECKING:
    from qubesadmin.app import QubesBase
    from qubesadmin.vm import QubesVM


def parse_size(size: str) -> int:
    """Convert a human readable size string into a number of bytes.

    Surrounding whitespace and letter case are ignored. A bare run of
    digits is taken as a byte count; otherwise the value has to end with a
    decimal (``K``/``KB``, ``M``/``MB``, ``G``/``GB``) or binary
    (``Ki``/``KiB``, ``Mi``/``MiB``, ``Gi``/``GiB``) unit suffix.

    :param size: size with an optional unit suffix, e.g. ``"10 MiB"``
    :return: size in bytes
    :raises qubesadmin.exc.QubesException: if no known suffix matches
    :raises ValueError: if the text before the suffix is not an integer
    """
    kilo, kibi = 1000, 1024
    # Suffixes are kept upper-cased because the input is upper-cased before
    # matching; the order is the order in which they are tried.
    multipliers = (
        ("K", kilo),
        ("KB", kilo),
        ("M", kilo * kilo),
        ("MB", kilo * kilo),
        ("G", kilo * kilo * kilo),
        ("GB", kilo * kilo * kilo),
        ("KI", kibi),
        ("KIB", kibi),
        ("MI", kibi * kibi),
        ("MIB", kibi * kibi),
        ("GI", kibi * kibi * kibi),
        ("GIB", kibi * kibi * kibi),
    )

    normalized = size.strip().upper()
    if normalized.isdigit():
        return int(normalized)

    for suffix, factor in multipliers:
        if not normalized.endswith(suffix):
            continue
        number = normalized[: -len(suffix)].strip()
        return int(number) * factor

    raise qubesadmin.exc.QubesException(
        "Invalid size: {0}.".format(normalized)
    )
def mbytes_to_kmg(size: float | int) -> str:
    """Render a size given in MiB as a short human readable string.

    Values above 1024 are shown in GiB, everything else in MiB. The number
    is formatted with ``%d``, so any fractional part is dropped.

    :param size: size in MiB
    :return: text such as ``"512 MiB"`` or ``"2 GiB"``
    """
    use_gib = size > 1024
    return "%d GiB" % (size / 1024) if use_gib else "%d MiB" % size
def kbytes_to_kmg(size: float | int) -> str:
    """Render a size given in KiB as a short human readable string.

    Values above 1024 are handed to :py:func:`mbytes_to_kmg` after being
    divided by 1024; smaller values are printed in KiB with ``%d``.

    :param size: size in KiB
    :return: text such as ``"100 KiB"``
    """
    if not size > 1024:
        return "%d KiB" % size
    return mbytes_to_kmg(size / 1024)
def bytes_to_kmg(size: int) -> str:
    """Render a size given in bytes as a short human readable string.

    Values above 1024 are handed to :py:func:`kbytes_to_kmg` after being
    divided by 1024; smaller values are printed in bytes with ``%d``.

    :param size: size in bytes
    :return: text such as ``"10 B"``
    """
    if not size > 1024:
        return "%d B" % size
    return kbytes_to_kmg(size / 1024)
def size_to_human(size: int) -> str:
    """Return a human readable size, rounded to one decimal place.

    Sizes below 1024 are returned as a plain number without a unit; larger
    ones are scaled to KiB, MiB or GiB.

    :param size: size in bytes
    :return: text such as ``"512"``, ``"1.5 KiB"`` or ``"5.0 GiB"``
    """
    if size < 1024:
        return str(size)

    # (exclusive upper bound, divisor, unit suffix)
    scales = (
        (1024 * 1024, 1024.0, " KiB"),
        (1024 * 1024 * 1024, 1024.0 * 1024, " MiB"),
    )
    for upper_bound, divisor, suffix in scales:
        if size < upper_bound:
            return str(round(size / divisor, 1)) + suffix

    # Everything from 1 GiB upwards is reported in GiB.
    return str(round(size / (1024.0 * 1024 * 1024), 1)) + " GiB"
# Flag file consulted by updates_vms_status(): when this path exists, the
# default "check-updates" status is taken to be disabled.
UPDATES_DEFAULT_VM_DISABLE_FLAG = "/var/lib/qubes/updates/vm-default-disable-updates"
def updates_vms_status(qvm_collection: QubesBase) -> bool | None:
    """Report the common ``check-updates`` setting of all qubes, if any.

    The expected value is ``True`` unless the
    ``UPDATES_DEFAULT_VM_DISABLE_FLAG`` file exists. Every domain except
    the one with ``qid == 0`` is compared against it.

    :param qvm_collection: object whose ``domains`` attribute is iterated
    :return: the expected value when every checked qube matches it,
        ``None`` when at least one qube differs ("mixed")
    """
    expected = not os.path.exists(UPDATES_DEFAULT_VM_DISABLE_FLAG)

    differs = any(
        qube.features.get("check-updates", True) != expected
        for qube in qvm_collection.domains
        if qube.qid != 0  # qid 0 is skipped
    )
    return None if differs else expected
def vm_dependencies(
    app: QubesBase, reference_vm: QubesVM
) -> list[tuple[QubesVM | None, str]]:
    """List every place in which *reference_vm* is used.

    Both global properties of *app* and per-qube properties of every other
    domain are inspected.

    :param app: application object providing the global properties and
        ``domains``
    :param reference_vm: the qube to look for
    :return: list of ``(property_holder, property_name)`` tuples, where
        ``property_holder`` is ``None`` for global properties
    """
    global_props = (
        "default_dispvm",
        "default_netvm",
        "default_guivm",
        "default_audiovm",
        "default_template",
        "clockvm",
        "updatevm",
        "management_dispvm",
    )
    per_vm_props = (
        "template",
        "netvm",
        "guivm",
        "audiovm",
        "default_dispvm",
        "management_dispvm",
    )

    usages = [
        (None, name)
        for name in global_props
        if reference_vm == getattr(app, name, None)
    ]

    for holder in app.domains:
        if holder == reference_vm:
            continue
        preloaded = getattr(holder, "is_preload", False)
        for name in per_vm_props:
            if not hasattr(holder, name):
                continue
            try:
                uses_default = holder.property_is_default(name)
            except qubesadmin.exc.QubesPropertyAccessError:
                # Default-ness cannot be determined; treat as explicitly set.
                uses_default = False

            if not reference_vm == getattr(holder, name, None):
                continue
            if uses_default:
                # Only explicitly set properties are reported.
                continue
            if preloaded and name == "template":
                # The template of a preloaded qube is not reported.
                continue
            # NOTE(review): this compares the holder's template with the
            # holder itself - confirm this is the intended check.
            if (
                name == "default_dispvm"
                and getattr(holder, "template", None) == holder
            ):
                continue
            usages.append((holder, name))

    return usages
def encode_for_vmexec(args: Iterable[str]) -> str:
    """Encode an argument list for a qubes.VMExec call.

    Each argument is UTF-8 encoded; every byte outside ``a-zA-Z0-9_.`` is
    replaced by ``-XX`` (two upper-case hex digits), except ``-`` itself,
    which becomes ``--``. The encoded arguments are joined with ``+``.

    :param args: arguments to encode
    :return: ASCII string holding the encoded argument list
    """

    def escape(match: re.Match) -> bytes:
        # Called by re.sub for each single byte that needs escaping.
        char = match.group(0)
        if char == b"-":
            return b"--"
        return "-{:02X}".format(ord(char)).encode("ascii")

    encoded_args = (
        re.sub(rb"[^a-zA-Z0-9_.]", escape, arg.encode("utf-8"))
        for arg in args
    )
    return b"+".join(encoded_args).decode("ascii")
class LockFile:
    """Context manager around an exclusive advisory lock (fcntl.lockf).

    The file is opened when the object is created; the lock itself is only
    taken by :py:meth:`acquire` or on entering the ``with`` block."""

    def __init__(self, path: str, nonblock: bool = False):
        """Open *path* for writing without locking it yet.

        :param path: lock file path; it is opened in ``"w"`` mode
        :param nonblock: if true, request the lock with ``LOCK_NB``
        """
        # pylint: disable=consider-using-with
        self.file = open(path, "w", encoding="ascii")
        self.nonblock = nonblock

    def __enter__(self, *args, **kwargs) -> LockFile:
        self.acquire()
        return self

    def acquire(self) -> None:
        """Take the exclusive lock on the opened file"""
        mode = fcntl.LOCK_EX
        if self.nonblock:
            mode |= fcntl.LOCK_NB
        fcntl.lockf(self.file, mode)

    def __exit__(
        self,
        exc_type: object | None = None,
        exc_value: object | None = None,
        traceback: object | None = None,
    ) -> None:
        self.release()

    def release(self) -> None:
        """Drop the lock, then close the file object"""
        fcntl.lockf(self.file, fcntl.LOCK_UN)
        self.file.close()
def qbool(value: str | int | bool) -> bool:
    """
    Property setter for boolean properties.

    Strings are matched case-insensitively: ``'0'``, ``'no'``, ``'false'``
    and ``'off'`` give :py:obj:`False`; ``'1'``, ``'yes'``, ``'true'`` and
    ``'on'`` give :py:obj:`True`. Any other string is rejected. Non-string
    values are converted with :py:func:`bool`.

    :raises QubesValueError: if *value* is a string that is not one of the
        accepted literals
    """
    if not isinstance(value, str):
        return bool(value)

    lowered = value.lower()
    for literals, result in (
        (("0", "no", "false", "off"), False),
        (("1", "yes", "true", "on"), True),
    ):
        if lowered in literals:
            return result

    raise QubesValueError(
        "Invalid literal for boolean property: {!r}".format(value)
    )
async def start(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously start qubes and return ones that failed.

    Delegates to :py:func:`generic_action` with the ``"start"`` action;
    extra keyword arguments are passed through unchanged.
    """
    failed = await generic_action(domains, action="start", **kwargs)
    return failed
async def pause(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously pause qubes and return ones that failed.

    Delegates to :py:func:`generic_action` with the ``"pause"`` action;
    extra keyword arguments are passed through unchanged.
    """
    failed = await generic_action(domains, action="pause", **kwargs)
    return failed
async def unpause(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously unpause qubes and return ones that failed.

    The method is chosen per qube: ``resume`` when ``is_suspended()``
    returns a true value, ``unpause`` otherwise. Extra keyword arguments
    are passed through to :py:func:`generic_action` unchanged.
    """

    def pick_method(qube):
        # Returns the name of the qube method to call.
        return "resume" if qube.is_suspended() else "unpause"

    return await generic_action(domains, action=pick_method, **kwargs)
async def suspend(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously suspend qubes and return ones that failed.

    Delegates to :py:func:`generic_action` with the ``"suspend"`` action;
    extra keyword arguments are passed through unchanged.
    """
    failed = await generic_action(domains, action="suspend", **kwargs)
    return failed
async def resume(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously resume qubes and return ones that failed.

    Delegates to :py:func:`generic_action` with the ``"resume"`` action;
    extra keyword arguments are passed through unchanged.
    """
    failed = await generic_action(domains, action="resume", **kwargs)
    return failed
async def shutdown(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously shut down qubes and return ones that failed.

    Delegates to :py:func:`generic_action` with the ``"shutdown"`` action.
    A :py:class:`qubesadmin.exc.QubesVMNotStartedError` is not counted as a
    failure. Extra keyword arguments are passed through unchanged.
    """
    return await generic_action(
        domains,
        action="shutdown",
        # Trailing comma makes this a real one-element tuple, matching the
        # ``tuple`` type that generic_action declares for this option.
        ignored_exceptions=(qubesadmin.exc.QubesVMNotStartedError,),
        **kwargs,
    )
async def kill(domains: list[qubesadmin.vm.QubesVM], **kwargs):
    """
    Asynchronously kill qubes and return ones that failed.

    Delegates to :py:func:`generic_action` with the ``"kill"`` action.
    A :py:class:`qubesadmin.exc.QubesVMNotStartedError` is not counted as a
    failure. Extra keyword arguments are passed through unchanged.
    """
    return await generic_action(
        domains,
        action="kill",
        # Trailing comma makes this a real one-element tuple, matching the
        # ``tuple`` type that generic_action declares for this option.
        ignored_exceptions=(qubesadmin.exc.QubesVMNotStartedError,),
        **kwargs,
    )
async def generic_action(
    domains: list[qubesadmin.vm.QubesVM],
    action: str | typing.Callable,
    *args,
    **kwargs,
):
    """
    Asynchronously run action on qubes and return ones that failed.

    :param domains: qubes to act on
    :param action: either the name of a method to call on each qube, or a
        callable that receives a qube and returns such a method name
    :param args: positional arguments passed to each method call
    :param kwargs: keyword arguments passed to each method call; the
        optional ``ignored_exceptions`` entry is removed first and names
        exception types that are not reported as failures
    :return: dict mapping each failed qube to the exception it raised
    :raises ValueError: if no method could be resolved from *action*
    """

    def resolve(qube, action):
        # Look up the bound method to run for this qube.
        if isinstance(action, str):
            bound = getattr(qube, action)
        elif callable(action):
            bound = getattr(qube, action(qube))
        else:
            bound = None
        if not bound:
            raise ValueError("Invalid action provided")
        return bound

    ignored_exceptions: tuple = kwargs.pop("ignored_exceptions", tuple())

    # Each method call runs in a worker thread; methods are resolved here,
    # before any of the calls is awaited.
    pending = [
        asyncio.to_thread(resolve(qube, action), *args, **kwargs)
        for qube in domains
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    failed: dict[qubesadmin.vm.QubesVM, BaseException] = {
        qube: outcome
        for qube, outcome in zip(domains, outcomes)
        if isinstance(outcome, BaseException)
        and not isinstance(outcome, ignored_exceptions)
    }
    return failed
[docs] class DriveAction(argparse.Action): """Action for argument parser that stores drive image path. Intended use for device attachment before domain is started.""" # pylint: disable=redefined-builtin def __init__( self, option_strings, dest="drive", *, prefix="cdrom:", metavar="IMAGE", required=False, help="Attach drive", ): super().__init__(option_strings, dest, metavar=metavar, help=help) self.prefix = prefix def __call__(self, parser, namespace, values, option_string=None): # pylint: disable=redefined-outer-name setattr(namespace, self.dest, self.prefix + values)
def get_drive_assignment(app, drive_str):
    """Prepare :py:class:`qubesadmin.device_protocol.DeviceAssignment` object
    for a given drive.

    Intended to be used before the domain is started. If running in dom0,
    it will also take care of creating the appropriate loop device (if
    necessary). Otherwise, only existing block devices are supported.

    *drive_str* has the form ``[cdrom:|hd:]VMNAME:DEVICE``, where ``DEVICE``
    is either a block device identifier or an absolute path to an image.
    Without a prefix the drive is treated as a cdrom.

    :param app: Qubes() instance
    :param drive_str: drive argument
    :return: DeviceAssignment matching *drive_str*
    :raises ValueError: if *drive_str* has no ``VMNAME:`` part
    :raises qubesadmin.exc.QubesVMNotFoundError: if the backend qube does
        not exist
    :raises qubesadmin.exc.QubesException: if a path is given while not
        running in dom0, if the loop device cannot be set up or has an
        unexpected name, or if the device does not appear in time
    """
    devtype = "cdrom"
    if drive_str.startswith("cdrom:"):
        devtype = "cdrom"
        drive_str = drive_str[len("cdrom:") :]
    elif drive_str.startswith("hd:"):
        devtype = "disk"
        drive_str = drive_str[len("hd:") :]

    try:
        backend_domain_name, port_id = drive_str.split(":", 1)
    except ValueError as exc:
        raise ValueError(
            "Incorrect image name: image must be in the format "
            "of VMNAME:full_path, for example "
            "dom0:/home/user/test.iso"
        ) from exc
    try:
        backend_domain = app.domains[backend_domain_name]
    except KeyError as exc:
        raise qubesadmin.exc.QubesVMNotFoundError(
            "No such VM: %s", backend_domain_name
        ) from exc
    if port_id.startswith("/"):
        # it is a path - if we're running in dom0, try to call losetup to
        # export the device, otherwise reject
        if app.qubesd_connection_type == "qrexec":
            raise qubesadmin.exc.QubesException(
                "Existing block device identifier needed when running from "
                "outside of dom0 (see qvm-block)"
            )
        try:
            if backend_domain.klass == "AdminVM":
                loop_name = subprocess.check_output(
                    ["sudo", "losetup", "-f", "--show", port_id]
                )
                loop_name = loop_name.strip()
            else:
                untrusted_loop_name, _ = backend_domain.run_with_args(
                    "losetup", "-f", "--show", port_id, user="root"
                )
                untrusted_loop_name = untrusted_loop_name.strip()
                # The name comes from another qube: accept only characters
                # that can appear in a loop device path.
                allowed_chars = string.ascii_lowercase + string.digits + "/"
                allowed_chars = allowed_chars.encode("ascii")
                if not all(c in allowed_chars for c in untrusted_loop_name):
                    raise qubesadmin.exc.QubesException(
                        "Invalid loop device name received from {}".format(
                            backend_domain.name
                        )
                    )
                loop_name = untrusted_loop_name
                del untrusted_loop_name
        except subprocess.CalledProcessError as exc:
            raise qubesadmin.exc.QubesException(
                "Failed to setup loop device for %s", port_id
            ) from exc
        # Explicit check instead of ``assert``: assertions are removed when
        # Python runs with -O, and this value may come from another qube.
        if not loop_name.startswith(b"/dev/loop"):
            raise qubesadmin.exc.QubesException(
                "Invalid loop device name received from {}".format(
                    backend_domain.name
                )
            )
        # "/dev/loopN" -> "loopN"
        port_id = loop_name.decode().split("/")[2]
        # wait for device to appear
        # FIXME: convert this to waiting for event
        timeout = 10
        while isinstance(
            backend_domain.devices["block"][port_id], UnknownDevice
        ):
            if timeout == 0:
                raise qubesadmin.exc.QubesException(
                    "Timeout waiting for {}:{} device to appear".format(
                        backend_domain.name, port_id
                    )
                )
            timeout -= 1
            time.sleep(1)

    # cdrom drives are attached read-only, disks read-write
    options = {"devtype": devtype, "read-only": devtype == "cdrom"}
    assignment = DeviceAssignment.new(
        backend_domain=backend_domain,
        port_id=port_id,
        devclass="block",
        options=options,
        mode="required",
    )
    return assignment
def start_expert(
    domain, skip_if_running: bool = False, drive: str | None = None
):
    """
    Start the domain, optionally specifying a drive.

    When *drive* is given, a block device assignment is created for it
    before the start and removed again once the domain has started.

    :param domain: qube to start
    :param skip_if_running: return silently instead of raising when the
        domain is already running
    :param drive: drive specification understood by
        :py:func:`get_drive_assignment`, or ``None``
    :raises QubesVMAlreadyStartedError: if the domain is running and
        *skip_if_running* is false
    """
    if domain.is_running():
        if not skip_if_running:
            raise QubesVMAlreadyStartedError("Domain is already running")
        return

    assignment = None
    try:
        if drive:
            assignment = get_drive_assignment(domain.app, drive)
            try:
                domain.devices["block"].assign(assignment)
            except Exception:
                # Nothing was assigned, so there is nothing to clean up.
                assignment = None
                raise

        domain.start()

        if assignment:
            # don't reconnect this device after VM reboot
            domain.devices["block"].unassign(assignment)
    except (IOError, OSError, qubesadmin.exc.QubesException, ValueError) as e:
        if assignment:
            # Best-effort cleanup; a failure here must not hide the
            # original error.
            try:
                domain.devices["block"].detach(assignment)
            except qubesadmin.exc.QubesException:
                pass
        raise e