Source code for qubesadmin.tests.tools.qvm_template_postprocess

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import asyncio
import os
import subprocess
import tempfile
from unittest import mock
import qubesadmin.tests
import qubesadmin.tools.qvm_template_postprocess


[docs]class QubesLocalMock(qubesadmin.tests.QubesTest): def __init__(self): super(QubesLocalMock, self).__init__() self.__class__ = qubesadmin.app.QubesLocal qubesd_call = qubesadmin.tests.QubesTest.qubesd_call run_service = qubesadmin.tests.QubesTest.run_service
[docs]class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
[docs] def setUp(self): super(TC_00_qvm_template_postprocess, self).setUp() self.source_dir = tempfile.TemporaryDirectory()
[docs] def tearDown(self): try: self.source_dir.cleanup() except FileNotFoundError: pass super(TC_00_qvm_template_postprocess, self).tearDown()
[docs] def test_000_import_root_img_raw(self): root_img = os.path.join(self.source_dir.name, 'root.img') volume_data = b'volume data' with open(root_img, 'wb') as f: f.write(volume_data) self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None, None)] = \ b'0\0root\nprivate\nvolatile\nkernel\n' self.app.expected_calls[ ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \ b'0\x00pool=lvm\n' \ b'vid=qubes_dom0/vm-test-vm-root\n' \ b'size=10737418240\n' \ b'usage=0\n' \ b'rw=True\n' \ b'source=\n' \ b'save_on_stop=True\n' \ b'snap_on_start=False\n' \ b'revisions_to_keep=3\n' \ b'is_outdated=False\n' self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root', str(len(volume_data)).encode())] = \ b'0\0' self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root', volume_data)] = b'0\0' vm = self.app.domains['test-vm'] qubesadmin.tools.qvm_template_postprocess.import_root_img( vm, self.source_dir.name) self.assertAllCalled()
[docs] def test_001_import_root_img_tar(self): root_img = os.path.join(self.source_dir.name, 'root.img') volume_data = b'volume data' * 1000 with open(root_img, 'wb') as f: f.write(volume_data) subprocess.check_call(['tar', 'cf', 'root.img.tar', 'root.img'], cwd=self.source_dir.name) subprocess.check_call(['split', '-d', '-b', '1024', 'root.img.tar', 'root.img.part.'], cwd=self.source_dir.name) os.unlink(root_img) self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.expected_calls[ ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \ b'0\x00pool=lvm\n' \ b'vid=qubes_dom0/vm-test-vm-root\n' \ b'size=10737418240\n' \ b'usage=0\n' \ b'rw=True\n' \ b'source=\n' \ b'save_on_stop=True\n' \ b'snap_on_start=False\n' \ b'revisions_to_keep=3\n' \ b'is_outdated=False\n' self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None, None)] = \ b'0\0root\nprivate\nvolatile\nkernel\n' self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root', str(len(volume_data)).encode())] = \ b'0\0' self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root', volume_data)] = b'0\0' vm = self.app.domains['test-vm'] qubesadmin.tools.qvm_template_postprocess.import_root_img( vm, self.source_dir.name) self.assertAllCalled()
[docs] def test_002_import_root_img_no_overwrite(self): self.app.qubesd_connection_type = 'socket' template_dir = os.path.join(self.source_dir.name, 'vm-templates', 'test-vm') os.makedirs(template_dir) root_img = os.path.join(template_dir, 'root.img') volume_data = b'volume data' with open(root_img, 'wb') as f: f.write(volume_data) self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.expected_calls[ ('test-vm', 'admin.vm.volume.List', None, None)] = \ b'0\0root\nprivate\nvolatile\nkernel\n' self.app.expected_calls[ ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \ b'0\x00pool=default\n' \ b'vid=vm-templates/test-vm/root\n' \ b'size=10737418240\n' \ b'usage=0\n' \ b'rw=True\n' \ b'source=\n' \ b'save_on_stop=True\n' \ b'snap_on_start=False\n' \ b'revisions_to_keep=3\n' \ b'is_outdated=False\n' self.app.expected_calls[ ('dom0', 'admin.pool.List', None, None)] = \ b'0\0default\n' self.app.expected_calls[ ('dom0', 'admin.pool.Info', 'default', None)] = \ b'0\0driver=file\ndir_path=' + self.source_dir.name.encode() + b'\n' vm = self.app.domains['test-vm'] qubesadmin.tools.qvm_template_postprocess.import_root_img( vm, template_dir) self.assertAllCalled()
[docs] def test_005_reset_private_img(self): self.app.qubesd_connection_type = 'socket' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.expected_calls[ ('test-vm', 'admin.vm.volume.List', None, None)] = \ b'0\0root\nprivate\nvolatile\nkernel\n' self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'private', b'')] = b'0\0' vm = self.app.domains['test-vm'] qubesadmin.tools.qvm_template_postprocess.reset_private_img(vm) self.assertAllCalled()
[docs] def test_010_import_appmenus(self): with open(os.path.join(self.source_dir.name, 'vm-whitelisted-appmenus.list'), 'w') as f: f.write('org.gnome.Terminal.desktop\n') f.write('firefox.desktop\n') with open(os.path.join(self.source_dir.name, 'whitelisted-appmenus.list'), 'w') as f: f.write('org.gnome.Terminal.desktop\n') f.write('org.gnome.Software.desktop\n') f.write('gnome-control-center.desktop\n') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' vm = self.app.domains['test-vm'] with mock.patch('subprocess.check_call') as mock_proc: qubesadmin.tools.qvm_template_postprocess.import_appmenus( vm, self.source_dir.name) self.assertEqual(mock_proc.mock_calls, [ mock.call(['qvm-appmenus', '--set-default-whitelist=' + os.path.join(self.source_dir.name, 'vm-whitelisted-appmenus.list'), 'test-vm']), mock.call(['qvm-appmenus', '--set-whitelist=' + os.path.join( self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']), ]) self.assertAllCalled()
[docs] @mock.patch('grp.getgrnam') @mock.patch('os.getuid') def test_011_import_appmenus_as_root(self, mock_getuid, mock_getgrnam): with open(os.path.join(self.source_dir.name, 'vm-whitelisted-appmenus.list'), 'w') as f: f.write('org.gnome.Terminal.desktop\n') f.write('firefox.desktop\n') with open(os.path.join(self.source_dir.name, 'whitelisted-appmenus.list'), 'w') as f: f.write('org.gnome.Terminal.desktop\n') f.write('org.gnome.Software.desktop\n') f.write('gnome-control-center.desktop\n') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' mock_getuid.return_value = 0 mock_getgrnam.configure_mock(**{ 'return_value.gr_mem.__getitem__.return_value': 'user' }) vm = self.app.domains['test-vm'] with mock.patch('subprocess.check_call') as mock_proc: qubesadmin.tools.qvm_template_postprocess.import_appmenus( vm, self.source_dir.name) self.assertEqual(mock_proc.mock_calls, [ mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0', 'qvm-appmenus', '--set-default-whitelist=' + os.path.join(self.source_dir.name, 'vm-whitelisted-appmenus.list'), 'test-vm']), mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0', 'qvm-appmenus', '--set-whitelist=' + os.path.join( self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']), ]) self.assertAllCalled()
[docs] @mock.patch('grp.getgrnam') @mock.patch('os.getuid') def test_012_import_appmenus_missing_user(self, mock_getuid, mock_getgrnam): with open(os.path.join(self.source_dir.name, 'vm-whitelisted-appmenus.list'), 'w') as f: f.write('org.gnome.Terminal.desktop\n') f.write('firefox.desktop\n') with open(os.path.join(self.source_dir.name, 'whitelisted-appmenus.list'), 'w') as f: f.write('org.gnome.Terminal.desktop\n') f.write('org.gnome.Software.desktop\n') f.write('gnome-control-center.desktop\n') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' mock_getuid.return_value = 0 mock_getgrnam.side_effect = KeyError vm = self.app.domains['test-vm'] with mock.patch('subprocess.check_call') as mock_proc: qubesadmin.tools.qvm_template_postprocess.import_appmenus( vm, self.source_dir.name) self.assertEqual(mock_proc.mock_calls, []) self.assertAllCalled()
[docs] def add_new_vm_side_effect(self, *args, **kwargs): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.domains.clear_cache() return self.app.domains['test-vm']
[docs] @asyncio.coroutine def wait_for_shutdown(self, vm): pass
[docs] @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') def test_020_post_install(self, mock_import_root_img, mock_import_appmenus): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0' self.app.add_new_vm = mock.Mock(side_effect=self.add_new_vm_side_effect) self.app.expected_calls[ ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \ = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.Start', None, None)] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0' if qubesadmin.tools.qvm_template_postprocess.have_events: patch_domain_shutdown = mock.patch( 'qubesadmin.events.utils.wait_for_domain_shutdown') self.addCleanup(patch_domain_shutdown.stop) mock_domain_shutdown = patch_domain_shutdown.start() mock_domain_shutdown.side_effect = self.wait_for_shutdown else: self.app.expected_calls[ ('test-vm', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' asyncio.set_event_loop(asyncio.new_event_loop()) ret = qubesadmin.tools.qvm_template_postprocess.main([ '--really', 'post-install', 'test-vm', self.source_dir.name], app=self.app) self.assertEqual(ret, 0) self.app.add_new_vm.assert_called_once_with('TemplateVM', name='test-vm', label='black') mock_import_root_img.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name) mock_import_appmenus.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name) if qubesadmin.tools.qvm_template_postprocess.have_events: mock_domain_shutdown.assert_called_once_with([self.app.domains[ 'test-vm']]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.PostInstall', {}), ('test-vm', 'qubes.PostInstall', b''), ]) self.assertAllCalled()
[docs] @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img') def test_021_post_install_reinstall(self, mock_reset_private_img, mock_import_root_img, mock_import_appmenus): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.add_new_vm = mock.Mock() self.app.expected_calls[ ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \ = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.Start', None, None)] = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0' if qubesadmin.tools.qvm_template_postprocess.have_events: patch_domain_shutdown = mock.patch( 'qubesadmin.events.utils.wait_for_domain_shutdown') self.addCleanup(patch_domain_shutdown.stop) mock_domain_shutdown = patch_domain_shutdown.start() mock_domain_shutdown.side_effect = self.wait_for_shutdown else: self.app.expected_calls[ ('test-vm', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' asyncio.set_event_loop(asyncio.new_event_loop()) ret = qubesadmin.tools.qvm_template_postprocess.main([ '--really', 'post-install', 'test-vm', self.source_dir.name], app=self.app) self.assertEqual(ret, 0) self.assertFalse(self.app.add_new_vm.called) mock_import_root_img.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name) mock_reset_private_img.assert_called_once_with(self.app.domains[ 'test-vm']) mock_import_appmenus.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name) if qubesadmin.tools.qvm_template_postprocess.have_events: mock_domain_shutdown.assert_called_once_with([self.app.domains[ 'test-vm']]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.PostInstall', {}), ('test-vm', 'qubes.PostInstall', b''), ]) self.assertAllCalled()
[docs] @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img') def test_022_post_install_skip_start(self, mock_reset_private_img, mock_import_root_img, mock_import_appmenus): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \ = b'0\0' self.app.add_new_vm = mock.Mock() if qubesadmin.tools.qvm_template_postprocess.have_events: patch_domain_shutdown = mock.patch( 'qubesadmin.events.utils.wait_for_domain_shutdown') self.addCleanup(patch_domain_shutdown.stop) mock_domain_shutdown = patch_domain_shutdown.start() mock_domain_shutdown.side_effect = self.wait_for_shutdown asyncio.set_event_loop(asyncio.new_event_loop()) ret = qubesadmin.tools.qvm_template_postprocess.main([ '--really', '--skip-start', 'post-install', 'test-vm', self.source_dir.name], app=self.app) self.assertEqual(ret, 0) self.assertFalse(self.app.add_new_vm.called) mock_import_root_img.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name) mock_reset_private_img.assert_called_once_with(self.app.domains[ 'test-vm']) mock_import_appmenus.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name) if qubesadmin.tools.qvm_template_postprocess.have_events: self.assertFalse(mock_domain_shutdown.called) self.assertEqual(self.app.service_calls, []) self.assertAllCalled()
[docs] def test_030_pre_remove(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \ b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'False')]\ = b'0\0' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \ b'test-vm\0' ret = qubesadmin.tools.qvm_template_postprocess.main([ '--really', 'pre-remove', 'test-vm', self.source_dir.name], app=self.app) self.assertEqual(ret, 0) self.assertEqual(self.app.service_calls, []) self.assertAllCalled()
[docs] def test_031_pre_remove_existing_appvm(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' \ b'test-vm2 class=AppVM state=Halted\n' self.app.expected_calls[ ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \ b'test-vm\0' self.app.expected_calls[ ('test-vm2', 'admin.vm.property.Get', 'template', None)] = \ b'0\0default=no type=vm test-vm' with self.assertRaises(SystemExit): qubesadmin.tools.qvm_template_postprocess.main([ '--really', 'pre-remove', 'test-vm', self.source_dir.name], app=self.app) self.assertEqual(self.app.service_calls, []) self.assertAllCalled()
[docs] def test_040_missing_really(self): with self.assertRaises(SystemExit): qubesadmin.tools.qvm_template_postprocess.main([ 'post-install', 'test-vm', self.source_dir.name], app=self.app) self.assertAllCalled()