# Source code for qubes.ext.custom_persist
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2024 Guillaume Chinal <guiiix@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
import os
import qubes.ext
import qubes.config
# Prefix identifying the features handled by this extension; the part of the
# feature name after this prefix is used as the QubesDB key.
FEATURE_PREFIX = "custom-persist."
# QubesDB path prefix under which the extracted keys are written and removed.
QDB_PREFIX = "/persist/"
# QubesDB key length limit; the longest accepted key is this value minus
# len(QDB_PREFIX).
QDB_KEY_LIMIT = 63
[docs]
class CustomPersist(qubes.ext.Extension):
    """Export ``custom-persist.*`` features of a qube to its QubesDB.

    This extension allows creating a minimal-state app by configuring an
    exhaustive list of bind dirs (and files).

    Every feature named ``custom-persist.<key>`` is validated and written to
    QubesDB under ``/persist/<key>``. A feature value is accepted in one of
    two forms:

    * a path starting with ``/``;
    * ``<type>:<field>:<field>:<mode>:<path>`` where ``<type>`` is ``file``
      or ``dir``, ``<mode>`` is an octal number between ``0`` and ``7777``
      and ``<path>`` starts with ``/`` (it may itself contain ``:``).
      The second and third fields are not validated here (presumably owner
      and group -- confirm against the consumer of ``/persist/``).
    """

    @staticmethod
    def _extract_key_from_feature(feature) -> str:
        """Return the part of *feature* that follows ``FEATURE_PREFIX``."""
        return feature[len(FEATURE_PREFIX) :]

    @staticmethod
    def _is_expected_feature(feature) -> bool:
        """Tell whether *feature* is one handled by this extension."""
        return feature.startswith(FEATURE_PREFIX)

    @staticmethod
    def _check_key(key):
        """Validate a QubesDB key extracted from a feature name.

        :raises qubes.exc.QubesValueError: if *key* is empty or too long to
            fit in a QubesDB path once prefixed with ``QDB_PREFIX``.
        """
        if not key:
            raise qubes.exc.QubesValueError(
                "custom-persist key cannot be empty"
            )

        # QubesDB key length limit (the prefix counts towards it)
        key_maxlen = QDB_KEY_LIMIT - len(QDB_PREFIX)
        if len(key) > key_maxlen:
            raise qubes.exc.QubesValueError(
                f"custom-persist key is too long (max {key_maxlen}), "
                f"ignoring: {key}"
            )

    @staticmethod
    def _check_value_path(value):
        """Ensure *value* is an absolute path.

        :raises qubes.exc.QubesValueError: if *value* is not absolute.
        """
        if not os.path.isabs(value):
            raise qubes.exc.QubesValueError(f"invalid path '{value}'")

    def _check_value(self, value):
        """Validate a feature value (see the class docstring for the formats).

        :raises qubes.exc.QubesValueError: if the value has fewer than five
            ``:``-separated fields, an unknown resource type, a mode that is
            not an octal number in ``0..7777``, or an invalid path.
        """
        # Short form: the value is just the path.
        if value.startswith("/"):
            self._check_value_path(value)
            return

        options = value.split(":")
        if len(options) < 5 or not options[4].startswith("/"):
            raise qubes.exc.QubesValueError(
                f"invalid value format: '{value}'"
            )

        resource_type = options[0]
        mode = options[3]
        if resource_type not in ("file", "dir"):
            raise qubes.exc.QubesValueError(
                f"invalid resource type option '{resource_type}' "
                f"in value '{value}'"
            )

        # Only int() is guarded: the error below is raised outside of the
        # ``except`` clause so that it is never caught by its own handler
        # and never carries a misleading chained traceback.
        try:
            mode_is_valid = 0 <= int(mode, 8) <= 0o7777
        except ValueError:
            mode_is_valid = False
        if not mode_is_valid:
            raise qubes.exc.QubesValueError(
                f"invalid mode option '{mode}' in value '{value}'"
            )

        # The path itself may contain ':'; re-assemble everything after the
        # fourth separator before checking it.
        self._check_value_path(":".join(options[4:]))

    def _write_db_value(self, feature, value, vm):
        """Write *value* to the QubesDB of *vm* under the key of *feature*."""
        vm.untrusted_qdb.write(
            f"{QDB_PREFIX}{self._extract_key_from_feature(feature)}",
            str(value),
        )

    @qubes.ext.handler("domain-qdb-create")
    def on_domain_qdb_create(self, vm, event):
        """Actually export features

        Each ``custom-persist.*`` feature of *vm* is validated and then
        written to QubesDB.

        :raises qubes.exc.QubesValueError: on the first invalid key or value.
        """
        # pylint: disable=unused-argument
        for feature, value in vm.features.items():
            if not self._is_expected_feature(feature):
                continue
            self._check_key(self._extract_key_from_feature(feature))
            self._check_value(value)
            self._write_db_value(feature, value, vm)

    @qubes.ext.handler("domain-feature-set:*")
    def on_domain_feature_set(self, vm, event, feature, value, oldvalue=None):
        """Inject persist keys in QubesDB in runtime

        Validation runs whether or not the qube is running; QubesDB is only
        updated when it is.

        :raises qubes.exc.QubesValueError: if the key or value is invalid.
        """
        # pylint: disable=unused-argument
        if not self._is_expected_feature(feature):
            return

        self._check_key(self._extract_key_from_feature(feature))
        self._check_value(value)

        if not vm.is_running():
            return
        self._write_db_value(feature, value, vm)

    @qubes.ext.handler("domain-feature-delete:*")
    def on_domain_feature_delete(self, vm, event, feature):
        """Update /persist/ QubesDB tree in runtime

        The matching QubesDB entry is removed, provided the feature is
        handled by this extension and the qube is running.
        """
        # pylint: disable=unused-argument
        # Filter on the feature name first (same helper as the other
        # handlers) so unrelated features never query the qube state.
        if not self._is_expected_feature(feature):
            return
        if not vm.is_running():
            return
        vm.untrusted_qdb.rm(
            f"{QDB_PREFIX}{self._extract_key_from_feature(feature)}"
        )