Source code for qubes.ext.core_features

# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <marmarek@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
import datetime
import re
import string

import qubes.ext

# Pattern for a two-component version string: two dot-separated groups of one
# to three ASCII digits each (e.g. "4.2"). It is applied with fullmatch() to
# validate the "qubes-agent-version" feature before it is stored.
_version_re = re.compile(r"[0-9]{1,3}\.[0-9]{1,3}")


class CoreFeatures(qubes.ext.Extension):
    # pylint: disable=too-few-public-methods
    """Extension handling core features advertised by a qube.

    It processes the ``features-request`` event (OS information, agent
    presence flags, agent version and boot modes) and keeps the
    ``servicevm`` feature in sync with the ``provides_network`` property.
    """

    @qubes.ext.handler("features-request")
    async def qubes_features_request(self, vm, event, untrusted_features):
        """Handle features provided by qubes-core-agent and qubes-gui-agent

        Every value in *untrusted_features* comes from the qube and is
        checked before being copied into ``vm.features``. Values failing a
        check are skipped (for ``os-version`` and ``os-eol`` a warning is
        logged); they never abort processing of the remaining features.

        :param vm: qube that sent the request; requests from a qube with \
            a truthy ``template`` attribute are ignored
        :param event: event name (unused)
        :param untrusted_features: mapping of feature name to the \
            string value requested by the qube
        """
        # pylint: disable=unused-argument
        if getattr(vm, "template", None):
            vm.log.warning("Ignoring qubes.NotifyTools for template-based VM")
            return

        # entry point already validates values for safe characters, so any
        # non-empty value is accepted as is
        for feature in ("os-distribution", "os-distribution-like"):
            untrusted_value = untrusted_features.get(feature)
            if untrusted_value:
                vm.features[feature] = untrusted_value

        untrusted_version = untrusted_features.get("os-version")
        if untrusted_version:
            # no letters in versions please
            safe_set = string.digits + ".-"
            if (
                all(c in safe_set for c in untrusted_version)
                and untrusted_version[0].isdigit()
            ):
                vm.features["os-version"] = untrusted_version
            else:
                # safe to log the value as passed preliminary filtering already
                vm.log.warning(
                    "Invalid 'os-version' value '%s', must start "
                    "with a digit and only digits and - or . are allowed",
                    untrusted_version,
                )

        untrusted_eol = untrusted_features.get("os-eol")
        if untrusted_eol:
            valid = False
            if re.match(r"\A\d{4}-\d{2}-\d{2}\Z", untrusted_eol):
                try:
                    datetime.date.fromisoformat(untrusted_eol)
                except ValueError:
                    # right shape, but not an existing calendar date
                    pass
                else:
                    valid = True
            if valid:
                vm.features["os-eol"] = untrusted_eol
            else:
                vm.log.warning(
                    "Invalid 'os-eol' value '%s', expected YYYY-MM-DD",
                    untrusted_eol,
                )

        # boolean flags: only the literal strings "1" and "0" are accepted;
        # they are applied to vm.features at the end of this handler
        requested_features = {}
        for feature in (
            "qrexec",
            "gui",
            "gui-emulated",
            "qubes-firewall",
            "vmexec",
        ):
            untrusted_value = untrusted_features.get(feature, None)
            if untrusted_value in ("1", "0"):
                requested_features[feature] = bool(int(untrusted_value))

        if "qubes-agent-version" in untrusted_features:
            untrusted_value = untrusted_features["qubes-agent-version"]
            if _version_re.fullmatch(untrusted_value):
                vm.features["qubes-agent-version"] = untrusted_value

        # handle boot mode advertisement
        old_bootmode_info = {
            feature_key: feature_val
            for feature_key, feature_val in vm.features.items()
            if feature_key.startswith(
                ("boot-mode.kernelopts.", "boot-mode.name.")
            )
        }
        new_bootmode_info = {}
        new_bootmode_names = []

        # First pass: kernel options. This has to complete before names are
        # processed, because a name is accepted only for a boot mode whose
        # kernelopts entry was accepted (or for "default").
        for (
            untrusted_feature_key,
            untrusted_feature_value,
        ) in untrusted_features.items():
            if not untrusted_feature_key.startswith("boot-mode.kernelopts."):
                continue
            bootmode_name = self._bootmode_name_from_key(untrusted_feature_key)
            if bootmode_name is None:
                # Boot mode key contains unexpected data, reject it
                continue
            if bootmode_name == "default":
                # "default" is reserved, cannot set kernelopts for it
                continue
            bootmode_feature = untrusted_feature_key
            bootmode_value = untrusted_feature_value
            new_bootmode_info[bootmode_feature] = bootmode_value

        # Second pass: human-readable names.
        for (
            untrusted_feature_key,
            untrusted_feature_value,
        ) in untrusted_features.items():
            if not untrusted_feature_key.startswith("boot-mode.name."):
                continue
            bootmode_name = self._bootmode_name_from_key(untrusted_feature_key)
            if bootmode_name is None:
                # Boot mode key contains unexpected data, reject it
                continue
            if (
                bootmode_name != "default"
                and f"boot-mode.kernelopts.{bootmode_name}"
                not in new_bootmode_info
            ):
                # name for a boot mode that has no accepted kernelopts
                continue
            bootmode_feature = untrusted_feature_key
            bootmode_value = untrusted_feature_value
            new_bootmode_info[bootmode_feature] = bootmode_value
            new_bootmode_names.append(bootmode_value)

        if (
            # Disallow duplicate boot mode names
            len(new_bootmode_names) == len(set(new_bootmode_names))
            # Don't allow more than 64 boot mode feature entries (kernelopts
            # and name entries are counted together), and don't allow
            # wiping all boot modes
            and 0 < len(new_bootmode_info) <= 64
        ):
            # Replace the stored set: drop entries no longer advertised,
            # then write the new ones.
            for feature_key in old_bootmode_info:
                if feature_key not in new_bootmode_info:
                    del vm.features[feature_key]
            for feature_key, feature_val in new_bootmode_info.items():
                vm.features[feature_key] = feature_val

        # The selected boot modes are checked against vm.features, i.e.
        # against the boot modes stored after the update above.
        if "boot-mode.active" in untrusted_features:
            untrusted_feature_value = untrusted_features["boot-mode.active"]
            if (
                f"boot-mode.kernelopts.{untrusted_feature_value}"
                in vm.features
                or untrusted_feature_value == "default"
            ):
                bootmode_value = untrusted_feature_value
                vm.features["boot-mode.active"] = bootmode_value

        if "boot-mode.appvm-default" in untrusted_features:
            untrusted_feature_value = untrusted_features[
                "boot-mode.appvm-default"
            ]
            if (
                f"boot-mode.kernelopts.{untrusted_feature_value}"
                in vm.features
                or untrusted_feature_value == "default"
            ) and hasattr(vm, "appvm_default_bootmode"):
                bootmode_value = untrusted_feature_value
                vm.features["boot-mode.appvm-default"] = bootmode_value

        # nothing below may touch the raw request any more
        del untrusted_features

        # default user for qvm-run etc
        #  starting with Qubes 4.x ignored
        # qrexec agent presence (0 or 1)
        # gui agent presence (0 or 1)

        qrexec_before = vm.features.get("qrexec", False)
        for feature in ("qrexec", "gui", "gui-emulated"):
            # do not allow (Template)VM to override setting if already set
            # some other way
            if feature in requested_features and feature not in vm.features:
                vm.features[feature] = requested_features[feature]

        # those features can be freely enabled or disabled by template
        for feature in ("qubes-firewall", "vmexec"):
            if feature in requested_features:
                vm.features[feature] = requested_features[feature]

        if not qrexec_before and vm.features.get("qrexec", False):
            # if this is the first time qrexec was advertised, now can finish
            #  template setup
            await vm.fire_event_async("template-postinstall")

    @staticmethod
    def _bootmode_name_from_key(feature_key):
        """Return the boot mode name encoded in a boot-mode feature key.

        The key must consist of exactly three dot-separated parts with a
        non-empty last part (``boot-mode.<kind>.<name>``); otherwise
        ``None`` is returned.
        """
        key_parts = feature_key.split(".")
        if len(key_parts) != 3 or key_parts[2] == "":
            return None
        return key_parts[2]

    def set_servicevm_feature(self, subject):
        """Sync the ``servicevm`` feature with ``provides_network``.

        Sets the feature to ``1`` when *subject* has a truthy
        ``provides_network`` attribute, and removes it otherwise if it is
        present. Whenever the feature is written or removed, a
        ``property-reset:icon`` event is fired on *subject*.
        """
        if getattr(subject, "provides_network", False):
            subject.features["servicevm"] = 1
            # icon is calculated based on this feature
            subject.fire_event("property-reset:icon", name="icon")
        elif "servicevm" in subject.features:
            del subject.features["servicevm"]
            # icon is calculated based on this feature
            subject.fire_event("property-reset:icon", name="icon")

    @qubes.ext.handler("property-set:provides_network")
    def on_property_set(self, subject, event, name, newvalue, oldvalue=None):
        """Refresh ``servicevm`` after ``provides_network`` was set."""
        # pylint: disable=unused-argument
        self.set_servicevm_feature(subject)

    @qubes.ext.handler("property-reset:provides_network")
    def on_property_reset(self, subject, event, name, oldvalue=None):
        """Refresh ``servicevm`` after ``provides_network`` was reset."""
        # pylint: disable=unused-argument
        self.set_servicevm_feature(subject)

    @qubes.ext.handler("domain-load")
    def on_domain_load(self, subject, event):
        """Refresh ``servicevm`` when the domain is loaded."""
        # pylint: disable=unused-argument
        self.set_servicevm_feature(subject)