Source code for qubesadmin.vm

# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

"""Qubes VM objects."""
from __future__ import annotations
import logging
import shlex

import subprocess
import typing
import warnings
from logging import Logger
from typing import Literal, TypeVar

import qubesadmin.exc
import qubesadmin.storage
import qubesadmin.features
import qubesadmin.devices
import qubesadmin.device_protocol
import qubesadmin.firewall
import qubesadmin.tags

if typing.TYPE_CHECKING:
    import qubesadmin.base


# ["AppVM", "AdminVM", "TemplateVM", "DispVM", "StandaloneVM"]
# but can be extended
Klass = str
PowerState = Literal["Transient", "Running", "Halted", "Paused",
"Suspended", "Halting", "Dying", "Crashed", "NA"]


[docs] class QubesVM(qubesadmin.base.PropertyHolder): """Qubes domain.""" log: Logger tags: qubesadmin.tags.Tags features: qubesadmin.features.Features devices: qubesadmin.devices.DeviceManager firewall: qubesadmin.firewall.Firewall def __init__(self, app, name, klass=None, power_state=None): super().__init__(app, "admin.vm.property.", name) self._volumes = None self._klass = klass # the cache is maintained by EventsDispatcher(), # through helper functions in QubesBase() self._power_state_cache = power_state self.log = logging.getLogger(name) self.tags = qubesadmin.tags.Tags(self) self.features = qubesadmin.features.Features(self) self.devices = qubesadmin.devices.DeviceManager(self) self.firewall = qubesadmin.firewall.Firewall(self) @property def name(self) -> str: """Domain name""" return self._method_dest @name.setter def name(self, new_value): self.qubesd_call( self._method_dest, self._method_prefix + "Set", "name", str(new_value).encode("utf-8"), ) self._method_dest = new_value self._volumes = None self.app.domains.clear_cache() def __str__(self) -> str: return self._method_dest def __lt__(self, other): if isinstance(other, QubesVM): return self.name < other.name return NotImplemented def __eq__(self, other): if isinstance(other, QubesVM): return self.name == other.name if isinstance(other, str): return self.name == other return NotImplemented def __hash__(self): return hash(self.name)
[docs] def start(self): """ Start domain. :return: """ self.qubesd_call(self._method_dest, "admin.vm.Start")
[docs] def shutdown(self, force=False, wait=False): """ Shutdown domain. :return: """ arg_list = [] if force: arg_list.append("force") if wait: arg_list.append("wait") args = "+".join(arg_list) params = [self._method_dest, "admin.vm.Shutdown"] if args: params.append(args) self.qubesd_call(*params)
[docs] def kill(self): """ Kill domain (forcefuly shutdown). :return: """ self.qubesd_call(self._method_dest, "admin.vm.Kill")
[docs] def force_shutdown(self): """Deprecated alias for :py:meth:`kill`""" warnings.warn( "Call to deprecated function force_shutdown(), use kill() instead", DeprecationWarning, stacklevel=2, ) return self.kill()
[docs] def pause(self): """ Pause domain. Pause its execution without any prior notification. :return: """ self.qubesd_call(self._method_dest, "admin.vm.Pause")
[docs] def unpause(self): """ Unpause domain. Opposite to :py:meth:`pause`. :return: """ self.qubesd_call(self._method_dest, "admin.vm.Unpause")
[docs] def suspend(self): """ Suspend domain. Suspend domain (S3). Pauses it if it does not suspend. :return: """ self.qubesd_call(self._method_dest, "admin.vm.Suspend")
[docs] def resume(self): """ Resume domain (from S3). Opposite to :py:meth:`suspend`. :return: """ self.qubesd_call(self._method_dest, "admin.vm.Resume")
[docs] def get_power_state(self): """Return power state description string. Return value may be one of those: =============== ======================================================== return value meaning =============== ======================================================== ``'Halted'`` Machine is not active. ``'Transient'`` Machine is running, but does not have :program:`guid` or :program:`qrexec` available. ``'Running'`` Machine is ready and running. ``'Paused'`` Machine is paused. ``'Suspended'`` Machine is S3-suspended. ``'Halting'`` Machine is in process of shutting down (OS shutdown). ``'Dying'`` Machine is in process of shutting down (cleanup). ``'Crashed'`` Machine crashed and is unusable. ``'NA'`` Machine is in unknown state. =============== ======================================================== .. seealso:: http://wiki.libvirt.org/page/VM_lifecycle Description of VM life cycle from the point of view of libvirt. https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainState Libvirt's enum describing precise state of a domain. """ if self._power_state_cache is not None: return self._power_state_cache try: power_state = self._get_current_state()["power_state"] if self.app.cache_enabled: self._power_state_cache = power_state return power_state except ( qubesadmin.exc.QubesDaemonNoResponseError, qubesadmin.exc.QubesVMNotFoundError, ): return "NA"
[docs] def get_mem(self): """Get current memory usage from VM.""" return int(self._get_current_state()["mem"])
def _get_current_state(self): """Call admin.vm.CurrentState, and return the result as a dict.""" state = {} response = self.qubesd_call(self._method_dest, "admin.vm.CurrentState") for part in response.decode("ascii").split(): name, value = part.split("=", 1) state[name] = value return state
[docs] def is_halted(self): """ Check whether this domain's state is 'Halted' :returns: :py:obj:`True` if this domain is halted, \ :py:obj:`False` otherwise. :rtype: bool """ return self.get_power_state() == "Halted"
[docs] def is_paused(self): """Check whether this domain is paused. :returns: :py:obj:`True` if this domain is paused, \ :py:obj:`False` otherwise. :rtype: bool """ return self.get_power_state() == "Paused"
[docs] def is_suspended(self): """Check whether this domain is suspended. :returns: :py:obj:`True` if this domain is suspended, \ :py:obj:`False` otherwise. :rtype: bool """ return self.get_power_state() == "Suspended"
[docs] def is_running(self): """Check whether this domain is running. :returns: :py:obj:`True` if this domain is started, \ :py:obj:`False` otherwise. :rtype: bool """ return self.get_power_state() not in ("Halted", "NA")
[docs] def is_networked(self): """Check whether this VM can reach network (firewall notwithstanding). :returns: :py:obj:`True` if is machine can reach network, \ :py:obj:`False` otherwise. :rtype: bool """ if self.provides_network: return True return self.netvm is not None
@property def volumes(self): """VM disk volumes""" if self._volumes is None: volumes_list = self.qubesd_call( self._method_dest, "admin.vm.volume.List" ) self._volumes = {} for volname in volumes_list.decode("ascii").splitlines(): if not volname: continue self._volumes[volname] = qubesadmin.storage.Volume( self.app, vm=self.name, vm_name=volname ) return self._volumes
[docs] def get_disk_utilization(self): """Get total disk usage of the VM""" return sum(vol.usage for vol in self.volumes.values())
[docs] def run_service(self, service, **kwargs): """Run service on this VM :param str service: service name :rtype: subprocess.Popen """ return self.app.run_service(self._method_dest, service, **kwargs)
[docs] def run_service_for_stdio( self, service, input=None, timeout=None, **kwargs ): """Run a service, pass an optional input and return (stdout, stderr). Raises an exception if return code != 0. *args* and *kwargs* are passed verbatim to :py:meth:`run_service`. .. warning:: There are some combinations if stdio-related *kwargs*, which are not filtered for problems originating between the keyboard and the chair. """ # pylint: disable=redefined-builtin p = self.run_service(service, **kwargs) # this one is actually a tuple, but there is no need to unpack it stdouterr = p.communicate(input=input, timeout=timeout) if p.returncode: exc = subprocess.CalledProcessError(p.returncode, service) # Python < 3.5 didn't have those exc.output, exc.stderr = stdouterr raise exc return stdouterr
[docs] def prepare_input_for_vmshell(self, command, input=None): """Prepare shell input for the given command and optional (real) input""" # pylint: disable=redefined-builtin if input is None: input = b"" close_shell_suffix = b"; exit\n" if self.features.check_with_template("os", "Linux") == "Windows": close_shell_suffix = b"& exit\n" return b"".join( (command.rstrip("\n").encode("utf-8"), close_shell_suffix, input) )
[docs] def run(self, command, input=None, **kwargs): """Run a shell command inside the domain using qubes.VMShell qrexec.""" # pylint: disable=redefined-builtin try: service = "qubes.VMShell" # intentionally check for qubes.VMRootExec, as this is when # qubes.VMRootShell got force-user='root' in its config if kwargs.get( "user", None ) == "root" and self.features.check_with_template( "supported-rpc.qubes.VMRootExec", False ): kwargs.pop("user") service = "qubes.VMRootShell" return self.run_service_for_stdio( service, input=self.prepare_input_for_vmshell(command, input), **kwargs, ) except subprocess.CalledProcessError as e: e.cmd = command raise e
[docs] def run_with_args(self, *args, **kwargs): """Run a single command inside the domain. Use the qubes.VMExec qrexec, if available. This method execute a single command, without interpreting any shell special characters. """ # pylint: disable=redefined-builtin if self.features.check_with_template("vmexec", False): try: service = "qubes.VMExec+" if kwargs.get("user", None) == "root": if self.features.check_with_template( "supported-rpc.qubes.VMRootExec", False ): kwargs.pop("user") service = "qubes.VMRootExec+" return self.run_service_for_stdio( service + qubesadmin.utils.encode_for_vmexec(args), **kwargs ) except subprocess.CalledProcessError as e: e.cmd = str(args) raise e return self.run(" ".join(shlex.quote(arg) for arg in args), **kwargs)
@property def appvms(self): """Returns a generator containing all domains based on the current TemplateVM. Do not check vm type of self, core (including its extentions) have ultimate control what can be a template of what. """ for vm in self.app.domains: try: if vm.template == self: yield vm except AttributeError: pass @property def derived_vms(self): """ Return list of all domains based on the current TemplateVM at any level of inheritance. """ return list(QubesVM._get_derived_vms(self)) @staticmethod def _get_derived_vms(vm): """ Return `set` of all domains based on the current TemplateVM at any level of inheritance. """ result = set(vm.appvms) for appvm in vm.appvms: result.update(QubesVM._get_derived_vms(appvm)) return result @property def connected_vms(self): """Return a generator containing all domains connected to the current NetVM. """ for vm in self.app.domains: try: if vm.netvm == self: yield vm except AttributeError: pass @property def klass(self): """Qube class""" # use cached value if available if self._klass is None: try: # use List method as that should be allowed for VMs that are # visible vm_list_data = self.qubesd_call( self._method_dest, "admin.vm.List" ) assert vm_list_data.count(b"\n") == 1 vm_name, props = vm_list_data.decode("ascii").split(" ", 1) assert vm_name == self.name props = props.split(" ") props_dict = dict([vm_prop.split("=", 1) for vm_prop in props]) self._klass = props_dict["class"] except qubesadmin.exc.QubesDaemonAccessError: # pylint: disable=no-member self._klass = super().klass return self._klass
[docs] def get_notes(self) -> str: """Get qube notes""" response = self.qubesd_call(self._method_dest, "admin.vm.notes.Get") return response.decode()
[docs] def set_notes(self, notes: str): """Set qube notes""" self.qubesd_call( self._method_dest, "admin.vm.notes.Set", payload=str(notes).encode(encoding="utf-8"), )
[docs] class DispVMWrapper(QubesVM): """ Wrapper class for new disposable, supporting only service call. Note that when running in dom0, one need to manually kill the disposable after service call ends. """ def __init__(self, *args, **kwargs): self._redirect_dispvm_calls = kwargs.pop("redirect_dispvm_calls", False) super().__init__(*args, **kwargs)
[docs] def run_service(self, service, **kwargs): """Create disposable if absent and run service.""" if ( self.app.qubesd_connection_type == "socket" and self._method_dest.startswith("@dispvm") ): self.create_disposable() # Service call may wait for session start, give it more time # than default 5s kwargs["connect_timeout"] = self.qrexec_timeout return super().run_service(service, **kwargs)
[docs] def cleanup(self): """ Cleanup after disposable usage. Disposable is cleaned up automatically in 'remote' case. """ if ( self.app.qubesd_connection_type == "socket" and not self._method_dest.startswith("@dispvm") ): try: self.kill() except qubesadmin.exc.QubesVMNotRunningError: pass
[docs] def start(self): """Create disposable if absent and start it.""" if self._method_dest.startswith("@dispvm"): self.create_disposable() super().start()
[docs] def create_disposable(self): """Create disposable.""" if self._method_dest.startswith("@dispvm"): if self._method_dest.startswith("@dispvm:"): method_dest = self._method_dest[len("@dispvm:") :] else: method_dest = "dom0" dispvm = self.app.qubesd_call( method_dest, "admin.vm.CreateDisposable" ) dispvm = dispvm.decode("ascii") self._method_dest = dispvm return self
[docs] class DispVM(QubesVM): """Disposable VM"""
[docs] @classmethod def from_appvm(cls, app, appvm, redirect_dispvm_calls=False): """Returns a wrapper for calling service in a new DispVM based on given AppVM. If *appvm* is none, use default DispVM template If **redirect_dispvm_calls** is used, calls made before the disposable is created will instead guess the disposable template to query for features, properties etc. """ if appvm: method_dest = "@dispvm:" + str(appvm) else: method_dest = "@dispvm" wrapper = DispVMWrapper( app, method_dest, redirect_dispvm_calls=redirect_dispvm_calls ) return wrapper