Source code for qubesadmin.storage

# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

"""Storage subsystem."""
import qubesadmin.exc


[docs] class Volume(object): """Storage volume.""" def __init__(self, app, pool=None, vid=None, vm=None, vm_name=None): """Construct a Volume object. Volume may be identified using pool+vid, or vm+vm_name. Either of those argument pairs must be given. :param Qubes app: application instance :param str pool: pool name :param str vid: volume id (within pool) :param str vm: owner VM name :param str vm_name: name within owning VM (like 'private', 'root' etc) """ self.app = app if pool is None and vm is None: raise ValueError('Either pool or vm must be given') if pool is not None and vid is None: raise ValueError('If pool is given, vid must be too.') if vm is not None and vm_name is None: raise ValueError('If vm is given, vm_name must be too.') self._pool = pool self._vid = vid self._vm = vm self._vm_name = vm_name self._info = None def _qubesd_call(self, func_name, payload=None, payload_stream=None): """Make a call to qubesd regarding this volume :param str func_name: API function name, like `Info` or `Resize` :param bytes payload: Payload to send. :param file payload_stream: Stream to pipe payload from. Only one of `payload` and `payload_stream` can be used. """ if self._vm is not None: method = 'admin.vm.volume.' + func_name dest = self._vm arg = self._vm_name else: if payload_stream: raise NotImplementedError( 'payload_stream not implemented for ' 'admin.pool.volume.* calls') method = 'admin.pool.volume.' + func_name dest = 'dom0' arg = self._pool if payload is not None: payload = self._vid.encode('ascii') + b' ' + payload else: payload = self._vid.encode('ascii') return self.app.qubesd_call( dest, method, arg, payload=payload, payload_stream=payload_stream) def _fetch_info(self, force=True): """Fetch volume properties Populate self._info dict :param bool force: refresh self._info, even if already populated. """ if not force and self._info is not None: return info = self._qubesd_call('Info') info = info.decode('ascii') self._info = dict([line.split('=', 1) for line in info.splitlines()]) def __eq__(self, other): if isinstance(other, Volume): return self.pool == other.pool and self.vid == other.vid return NotImplemented def __lt__(self, other): # pylint: disable=protected-access if isinstance(other, Volume): if self._vm and other._vm: return (self._vm, self._vm_name) < (other._vm, other._vm_name) if self._vid and other._vid: return (self._pool, self._vid) < (other._pool, other._vid) return NotImplemented @property def name(self): """per-VM volume name, if available""" return self._vm_name @property def pool(self): """Storage volume pool name.""" if self._pool is not None: return self._pool try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('pool') return str(self._info['pool']) @property def vid(self): """Storage volume id, unique within given pool.""" if self._vid is not None: return self._vid try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('vid') return str(self._info['vid']) @property def size(self): """Size of volume, in bytes.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('size') return int(self._info['size']) @property def usage(self): """Used volume space, in bytes.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('usage') return int(self._info['usage']) @property def rw(self): """True if volume is read-write.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('rw') return self._info['rw'] == 'True' @rw.setter def rw(self, value): """Set rw property""" self._qubesd_call('Set.rw', str(value).encode('ascii')) self._info = None @property def ephemeral(self): """True if volume is read-write.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('ephemeral') return self._info.get('ephemeral', 'False') == 'True' @ephemeral.setter def ephemeral(self, value): """Set rw property""" self._qubesd_call('Set.ephemeral', str(value).encode('ascii')) self._info = None @property def snap_on_start(self): """Create a snapshot from source on VM start.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('snap_on_start') return self._info['snap_on_start'] == 'True' @property def save_on_stop(self): """Commit changes to original volume on VM stop.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('save_on_stop') return self._info['save_on_stop'] == 'True' @property def source(self): """Volume ID of source volume (for :py:attr:`snap_on_start`). If None, this volume itself will be used. """ try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('source') if self._info['source']: return self._info['source'] return None @property def revisions_to_keep(self): """Number of revisions to keep around""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('revisions_to_keep') return int(self._info['revisions_to_keep']) @revisions_to_keep.setter def revisions_to_keep(self, value): """Set revisions_to_keep property""" self._qubesd_call('Set.revisions_to_keep', str(value).encode('ascii')) self._info = None
[docs] def is_outdated(self): """Returns `True` if this snapshot of a source volume (for `snap_on_start`=True) is outdated. """ try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('is_outdated') return self._info.get('is_outdated', False) == 'True'
[docs] def resize(self, size): """Resize volume. Currently only extending is supported. :param int size: new size in bytes. """ self._qubesd_call('Resize', str(size).encode('ascii'))
@property def revisions(self): """ Returns iterable containing revision identifiers""" revisions = self._qubesd_call('ListSnapshots') return revisions.decode('ascii').splitlines()
[docs] def revert(self, revision): """ Revert volume to previous revision :param str revision: Revision identifier to revert to """ if not isinstance(revision, str): raise TypeError('revision must be a str') self._qubesd_call('Revert', revision.encode('ascii'))
[docs] def import_data(self, stream): """ Import volume data from a given file-like object. This function overrides existing volume content. :param stream: file-like object, must support fileno() """ self._qubesd_call('Import', payload_stream=stream)
[docs] def import_data_with_size(self, stream, size): """ Import volume data from a given file-like object, informing qubesd that data has a specific size. This function overrides existing volume content. :param stream: file-like object, must support fileno() :param size: size of data in bytes """ size_line = str(size) + '\n' self._qubesd_call( 'ImportWithSize', payload=size_line.encode(), payload_stream=stream)
[docs] def clear_data(self): """ Clear existing volume content. """ self._qubesd_call('Clear')
[docs] def clone(self, source): """ Clone data from sane volume of another VM. This function override existing volume content. This operation is implemented for VM volumes - those in vm.volumes collection (not pool.volumes). :param source: source volume object """ # pylint: disable=protected-access # get a token from source volume token = source._qubesd_call('CloneFrom') # and use it to actually clone volume data self._qubesd_call('CloneTo', payload=token)
[docs] class Pool(object): """ A Pool is used to manage different kind of volumes (File based/LVM/Btrfs/...). """ def __init__(self, app, name=None): """ Initialize storage pool wrapper :param app: Qubes() object :param name: name of the pool """ self.app = app self.name = name self._config = None def __str__(self): return self.name def __eq__(self, other): if isinstance(other, Pool): return self.name == other.name if isinstance(other, str): return self.name == other return NotImplemented def __lt__(self, other): if isinstance(other, Pool): return self.name < other.name return NotImplemented @property def usage_details(self): """ Storage pool usage details (current - not cached) """ try: pool_usage_data = self.app.qubesd_call( 'dom0', 'admin.pool.UsageDetails', self.name, None) except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('usage_details') pool_usage_data = pool_usage_data.decode('utf-8') assert pool_usage_data.endswith('\n') or pool_usage_data == '' pool_usage_data = pool_usage_data[:-1] def _int_split(text): # pylint: disable=missing-docstring key, value = text.split("=", 1) return key, int(value) return dict(_int_split(l) for l in pool_usage_data.splitlines()) @property def config(self): """ Storage pool config """ if self._config is None: try: pool_info_data = self.app.qubesd_call( 'dom0', 'admin.pool.Info', self.name, None) except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('config') pool_info_data = pool_info_data.decode('utf-8') assert pool_info_data.endswith('\n') pool_info_data = pool_info_data[:-1] self._config = dict( l.split('=', 1) for l in pool_info_data.splitlines()) return self._config @property def size(self): """ Storage pool size, in bytes""" try: return int(self.usage_details['data_size']) except KeyError: # pool driver does not provide size information return None @property def usage(self): """ Space used in the pool, in bytes """ try: return int(self.usage_details['data_usage']) except KeyError: # pool driver does not provide usage information return None @property def driver(self): """ Storage pool driver """ return self.config['driver'] @property def revisions_to_keep(self): """Number of revisions to keep around""" return int(self.config['revisions_to_keep']) @revisions_to_keep.setter def revisions_to_keep(self, value): """Set revisions_to_keep property""" self.app.qubesd_call( 'dom0', 'admin.pool.Set.revisions_to_keep', self.name, str(value).encode('ascii')) self._config = None @property def ephemeral_volatile(self): """Whether volatile volumes in this pool should be encrypted with an ephemeral key in dom0""" return bool(self.config['ephemeral_volatile']) @ephemeral_volatile.setter def ephemeral_volatile(self, value): """Set ephemeral_volatile property""" self.app.qubesd_call( 'dom0', 'admin.pool.Set.ephemeral_volatile', self.name, str(value).encode('ascii')) self._config = None @property def volumes(self): """ Volumes managed by this pool """ try: volumes_data = self.app.qubesd_call( 'dom0', 'admin.pool.volume.List', self.name, None) except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('volumes') if volumes_data == b'': return assert volumes_data.endswith(b'\n') volumes_data = volumes_data[:-1].decode('ascii') for vid in volumes_data.splitlines(): yield Volume(self.app, self.name, vid)