# Source code for qubesadmin.tests.tools.qvm_volume

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import tempfile
import unittest.mock

import qubesadmin.tests
import qubesadmin.tests.tools
import qubesadmin.tools.qvm_volume


class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase):
    """Tests for the ``qvm-volume`` command-line tool.

    Each test registers the Admin API replies it needs in
    ``self.app.expected_calls``, runs ``qubesadmin.tools.qvm_volume.main``
    with an argument list, and checks the exit code plus any captured
    stdout/stderr.  Keys of ``expected_calls`` are 4-tuples; judging by the
    values used here they are (target qube, method name, argument, payload)
    -- confirm against ``QubesTestCase``.  Every test finishes with
    ``assertAllCalled()`` (inherited; presumably fails when a registered
    call was never issued -- confirm in ``QubesTestCase``).
    """

    # NOTE(review): the expected-output literals below are reproduced exactly
    # as found.  If the tool pads table columns with several spaces, these
    # literals must carry the same padding -- confirm against a real run.

    # Test methods carry ``#`` comments instead of docstrings so that
    # unittest's shortDescription() output stays unchanged.

    # Reply to admin.vm.List describing the single qube most tests use.
    _TESTVM_LIST = b'0\x00testvm class=AppVM state=Running\n'

    # Reply to admin.vm.volume.List: every qube here has these two volumes.
    _VOLUME_LIST = b'0\x00root\nprivate\n'

    # Reply to admin.vm.volume.ListSnapshots with three revisions.
    _REVISIONS = b'0\x00200101010000\n200201010000\n200301010000\n'

    # admin.vm.volume.Info reply for testvm:private (2 GiB, keeps 3 revisions).
    _PRIVATE_INFO = (
        b'0\x00pool=lvm\n'
        b'vid=qubes_dom0/vm-testvm-private\n'
        b'size=2147483648\n'
        b'usage=10000000\n'
        b'rw=True\n'
        b'source=\n'
        b'save_on_stop=True\n'
        b'snap_on_start=False\n'
        b'revisions_to_keep=3\n'
        b'is_outdated=False\n'
    )

    # admin.vm.volume.Info reply for testvm:root (snapshot of a template).
    _ROOT_INFO = (
        b'0\x00pool=lvm\n'
        b'vid=qubes_dom0/vm-testvm-root\n'
        b'size=2147483648\n'
        b'usage=10000000\n'
        b'rw=True\n'
        b'source=qubes_dom0/vm-fedora-26-root\n'
        b'save_on_stop=False\n'
        b'snap_on_start=True\n'
        b'revisions_to_keep=0\n'
        b'is_outdated=False\n'
    )

    # admin.vm.volume.Info reply used by the import tests: 2 GiB private
    # volume, no ``source`` line, no revisions kept.
    _IMPORT_TARGET_INFO = (
        b'0\x00pool=lvm\n'
        b'vid=qubes_dom0/vm-testvm-private\n'
        b'size=2147483648\n'
        b'usage=10000000\n'
        b'rw=True\n'
        b'save_on_stop=True\n'
        b'snap_on_start=False\n'
        b'revisions_to_keep=0\n'
        b'is_outdated=False\n'
    )

    def _expect_testvm(self):
        """Register the qube list and volume list replies for ``testvm``."""
        self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
            self._TESTVM_LIST
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.List', None, None)] = \
            self._VOLUME_LIST

    def _run(self, args):
        """Run ``qvm-volume`` with *args* against the test app.

        :param args: list of command-line arguments (without program name)
        :return: whatever ``qubesadmin.tools.qvm_volume.main`` returns
        """
        return qubesadmin.tools.qvm_volume.main(args, app=self.app)

    def _import_from_file(self, *options):
        """Run ``qvm-volume import`` on testvm:private from a temporary file.

        The file contains ``b'test-data'`` (9 bytes).  *options* are inserted
        between the ``import`` verb and the volume specification.

        :return: exit code of the tool
        """
        with tempfile.NamedTemporaryFile() as input_file:
            input_file.write(b'test-data')
            input_file.seek(0)
            input_file.flush()
            # The tool opens the file by name, so it has to run while the
            # temporary file still exists.
            return self._run(
                ['import'] + list(options) +
                ['testvm:private', input_file.name])

    def setup_expected_calls_for_list(self, vms=('vm1', 'sys-net')):
        """Register the replies needed to list volumes of *vms*.

        The qube list reply always names ``vm1`` and ``sys-net``; callers
        using other qubes override that entry afterwards.  For every qube in
        *vms* a ``root`` volume (pool ``pool-file``, no snapshots) and a
        ``private`` volume (pool ``other-pool``, one snapshot) are registered.

        :param vms: names of qubes to register volume replies for
        """
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.List', None, None)] = (
            b'0\x00vm1 class=AppVM state=Running\n'
            b'sys-net class=AppVM state=Running\n'
        )
        for vm in vms:
            for vol in ('root', 'private'):
                if vol == 'root':
                    pool_line = b'pool=pool-file\n'
                else:
                    pool_line = b'pool=other-pool\n'
                calls[(vm, 'admin.vm.volume.Info', vol, None)] = (
                    b'0\x00' + pool_line +
                    b'vid=' + vm.encode() + b'-' + vol.encode() + b'\n'
                    b'size=10737418240\n'
                )
                if vol == 'private':
                    snapshots = b'0\x00snap1\n'
                else:
                    snapshots = b'0\x00'
                calls[(vm, 'admin.vm.volume.ListSnapshots', vol, None)] = \
                    snapshots
            calls[(vm, 'admin.vm.volume.List', None, None)] = \
                self._VOLUME_LIST

    def test_000_list(self):
        # 'ls' without arguments lists the volumes of every qube.
        self.setup_expected_calls_for_list()
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['ls']))
        self.assertEqual(
            stdout.getvalue(),
            'POOL:VOLUME VMNAME VOLUME_NAME '
            'REVERT_POSSIBLE\n'
            'other-pool:sys-net-private sys-net private Yes\n'
            'other-pool:vm1-private vm1 private Yes\n'
            'pool-file:sys-net-root sys-net root No\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_001_list_domain(self):
        # 'ls VM' restricts the listing to that qube.
        self.setup_expected_calls_for_list(vms=('vm1',))
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['ls', 'vm1']))
        self.assertEqual(
            stdout.getvalue(),
            'POOL:VOLUME VMNAME VOLUME_NAME REVERT_POSSIBLE\n'
            'other-pool:vm1-private vm1 private Yes\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_002_list_domain_pool(self):
        # 'ls -p POOL VM' shows only that qube's volumes in the given pool.
        self.setup_expected_calls_for_list(vms=('vm1',))
        self.app.expected_calls[('dom0', 'admin.pool.List', None, None)] = \
            b'0\x00pool-file\nother-pool\n'
        # The private volume lives in the other pool, so its snapshots are
        # not expected to be queried.
        del self.app.expected_calls[
            ('vm1', 'admin.vm.volume.ListSnapshots', 'private', None)]
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['ls', '-p', 'pool-file', 'vm1']))
        self.assertEqual(
            stdout.getvalue(),
            'POOL:VOLUME VMNAME VOLUME_NAME REVERT_POSSIBLE\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_003_list_pool(self):
        # 'ls -p POOL' shows the volumes of all qubes in the given pool.
        self.setup_expected_calls_for_list()
        self.app.expected_calls[('dom0', 'admin.pool.List', None, None)] = \
            b'0\x00pool-file\nother-pool\n'
        # Private volumes are filtered out, so their snapshots are not
        # expected to be queried.
        del self.app.expected_calls[
            ('vm1', 'admin.vm.volume.ListSnapshots', 'private', None)]
        del self.app.expected_calls[
            ('sys-net', 'admin.vm.volume.ListSnapshots', 'private', None)]
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['ls', '-p', 'pool-file']))
        self.assertEqual(
            stdout.getvalue(),
            'POOL:VOLUME VMNAME VOLUME_NAME REVERT_POSSIBLE\n'
            'pool-file:sys-net-root sys-net root No\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_004_list_multiple_domains(self):
        # 'ls VM1 VM2' lists only the named qubes; vm3 exists but is not
        # requested, so no volume replies are registered for it.
        self.setup_expected_calls_for_list(vms=('vm1', 'vm2'))
        self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
            b'0\x00vm1 class=AppVM state=Running\n'
            b'vm2 class=AppVM state=Running\n'
            b'vm3 class=AppVM state=Running\n'
        )
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['ls', 'vm1', 'vm2']))
        self.assertEqual(
            stdout.getvalue(),
            'POOL:VOLUME VMNAME VOLUME_NAME REVERT_POSSIBLE\n'
            'other-pool:vm1-private vm1 private Yes\n'
            'other-pool:vm2-private vm2 private Yes\n'
            'pool-file:vm1-root vm1 root No\n'
            'pool-file:vm2-root vm2 root No\n'
        )
        self.assertAllCalled()

    def test_005_list_default_action(self):
        # With no sub-command at all the tool behaves like 'ls'.
        self.setup_expected_calls_for_list()
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run([]))
        self.assertEqual(
            stdout.getvalue(),
            'POOL:VOLUME VMNAME VOLUME_NAME '
            'REVERT_POSSIBLE\n'
            'other-pool:sys-net-private sys-net private Yes\n'
            'other-pool:vm1-private vm1 private Yes\n'
            'pool-file:sys-net-root sys-net root No\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_010_extend(self):
        # 'extend' to 10GiB issues a Resize with the size in bytes.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._PRIVATE_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Resize', 'private',
             b'10737418240')] = b'0\x00'
        self.assertEqual(
            0, self._run(['extend', 'testvm:private', '10GiB']))
        self.assertAllCalled()

    def test_011_extend_error(self):
        # A failing Resize yields exit code 1 and the message on stderr.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._PRIVATE_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Resize', 'private',
             b'10737418240')] = (
            b'2\x00StoragePoolException\x00\x00Failed to resize volume: '
            b'error: success\x00'
        )
        with qubesadmin.tests.tools.StderrBuffer() as stderr:
            self.assertEqual(
                1, self._run(['extend', 'testvm:private', '10GiB']))
        self.assertIn('error: success', stderr.getvalue())
        self.assertAllCalled()

    def test_012_extend_deny_shrink(self):
        # 'resize' below the current size is refused without '-f'; no Resize
        # call is registered, so none may be issued.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._PRIVATE_INFO
        with qubesadmin.tests.tools.StderrBuffer() as stderr:
            self.assertEqual(
                1, self._run(['resize', 'testvm:private', '1GiB']))
        self.assertIn('shrinking of private is disabled', stderr.getvalue())
        self.assertAllCalled()

    def test_013_resize_force_shrink(self):
        # 'resize -f' issues the Resize without an Info query being
        # registered for the current size.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Resize', 'private',
             b'1073741824')] = b'0\x00'
        self.assertEqual(
            0, self._run(['resize', '-f', 'testvm:private', '1GiB']))
        self.assertAllCalled()

    def test_020_revert(self):
        # 'revert' without a revision picks the last listed snapshot.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.ListSnapshots', 'private', None)] = \
            self._REVISIONS
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Revert', 'private',
             b'200301010000')] = b'0\x00'
        self.assertEqual(0, self._run(['revert', 'testvm:private']))
        self.assertAllCalled()

    def test_021_revert_error(self):
        # A failing Revert yields exit code 1 and the message on stderr.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.ListSnapshots', 'private', None)] = \
            self._REVISIONS
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Revert', 'private',
             b'200301010000')] = (
            b'2\x00StoragePoolException\x00\x00Failed to revert volume: '
            b'some error\x00'
        )
        with qubesadmin.tests.tools.StderrBuffer() as stderr:
            self.assertEqual(1, self._run(['revert', 'testvm:private']))
        self.assertIn('some error', stderr.getvalue())
        self.assertAllCalled()

    def test_022_revert_no_snapshots(self):
        # With an empty snapshot list 'revert' fails with an explanation.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.ListSnapshots', 'private', None)] = \
            b'0\x00'
        with qubesadmin.tests.tools.StderrBuffer() as stderr:
            self.assertEqual(1, self._run(['revert', 'testvm:private']))
        self.assertIn('No snapshots', stderr.getvalue())
        self.assertAllCalled()

    def test_023_revert_specific(self):
        # An explicit revision is passed to Revert as-is; no ListSnapshots
        # reply is registered for this case.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Revert', 'private',
             b'20050101')] = b'0\x00'
        self.assertEqual(
            0, self._run(['revert', 'testvm:private', '20050101']))
        self.assertAllCalled()

    def test_030_set_revisions_to_keep(self):
        # 'set ... revisions_to_keep N' maps to Set.revisions_to_keep.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Set.revisions_to_keep', 'private',
             b'3')] = b'0\x00'
        self.assertEqual(
            0, self._run(['set', 'testvm:private', 'revisions_to_keep', '3']))
        self.assertAllCalled()

    def test_031_set_rw(self):
        # 'set ... rw VALUE' maps to Set.rw.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Set.rw', 'private', b'True')] = \
            b'0\x00'
        self.assertEqual(
            0, self._run(['set', 'testvm:private', 'rw', 'True']))
        self.assertAllCalled()

    def test_032_set_invalid(self):
        # An unknown property name must produce a non-zero exit code.
        self._expect_testvm()
        self.assertNotEqual(
            0, self._run(['set', 'testvm:private', 'invalid', 'True']))
        self.assertAllCalled()

    def test_040_info(self):
        # 'info VM:VOLUME' prints all properties followed by the revisions.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._PRIVATE_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.ListSnapshots', 'private', None)] = \
            self._REVISIONS
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['info', 'testvm:private']))
        output = stdout.getvalue()
        # Normalize a bare 'source' line (empty value, no trailing padding)
        # to the padded form before comparing; the original comment
        # attributes the difference to Travis CI.
        output = output.replace('\nsource\n', '\nsource \n')
        self.assertEqual(
            output,
            'pool lvm\n'
            'vid qubes_dom0/vm-testvm-private\n'
            'rw True\n'
            'source \n'
            'save_on_stop True\n'
            'snap_on_start False\n'
            'size 2147483648\n'
            'usage 10000000\n'
            'revisions_to_keep 3\n'
            'is_outdated False\n'
            'Available revisions (for revert):\n'
            ' 200101010000\n'
            ' 200201010000\n'
            ' 200301010000\n'
        )
        self.assertAllCalled()

    def test_041_info_no_revisions(self):
        # With no snapshots the revisions section reads 'none'.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'root', None)] = \
            self._ROOT_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.ListSnapshots', 'root', None)] = \
            b'0\x00'
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['info', 'testvm:root']))
        self.assertEqual(
            stdout.getvalue(),
            'pool lvm\n'
            'vid qubes_dom0/vm-testvm-root\n'
            'rw True\n'
            'source qubes_dom0/vm-fedora-26-root\n'
            'save_on_stop False\n'
            'snap_on_start True\n'
            'size 2147483648\n'
            'usage 10000000\n'
            'revisions_to_keep 0\n'
            'is_outdated False\n'
            'Available revisions (for revert): none\n'
        )
        self.assertAllCalled()

    def test_042_info_single_prop(self):
        # 'info VM:VOLUME PROP' prints only that property's value.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'root', None)] = \
            self._ROOT_INFO
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run(['info', 'testvm:root', 'usage']))
        self.assertEqual(stdout.getvalue(), '10000000\n')
        self.assertAllCalled()

    def test_043_info_revisions_only(self):
        # 'info VM:VOLUME revisions' prints one revision per line; no Info
        # reply is registered for this case.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.ListSnapshots', 'private', None)] = \
            self._REVISIONS
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(
                0, self._run(['info', 'testvm:private', 'revisions']))
        self.assertEqual(
            stdout.getvalue(),
            '200101010000\n'
            '200201010000\n'
            '200301010000\n'
        )
        self.assertAllCalled()

    def test_050_import_file(self):
        # Importing a 9-byte file resizes the volume to 9 bytes first.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._IMPORT_TARGET_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Resize', 'private', b'9')] = \
            b'0\x00'
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Import', 'private',
             b'test-data')] = b'0\x00'
        self.assertEqual(0, self._import_from_file())
        self.assertAllCalled()

    def test_051_import_stdin(self):
        # '-' as the input path makes the tool read from sys.stdin.buffer.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._IMPORT_TARGET_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Resize', 'private', b'9')] = \
            b'0\x00'
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Import', 'private',
             b'test-data')] = b'0\x00'
        with tempfile.NamedTemporaryFile() as input_file:
            input_file.write(b'test-data')
            input_file.seek(0)
            with unittest.mock.patch('sys.stdin') as mock_stdin:
                # The real file object stands in for the binary stdin stream.
                mock_stdin.buffer = input_file
                self.assertEqual(
                    0, self._run(['import', 'testvm:private', '-']))
        self.assertAllCalled()

    def test_052_import_file_size(self):
        # '--size' overrides the size used for the Resize call.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._IMPORT_TARGET_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Resize', 'private', b'512')] = \
            b'0\x00'
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Import', 'private',
             b'test-data')] = b'0\x00'
        self.assertEqual(0, self._import_from_file('--size=512'))
        self.assertAllCalled()

    def test_053_import_file_noresize(self):
        # '--no-resize' imports without a Resize call being registered.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = \
            self._IMPORT_TARGET_INFO
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Import', 'private',
             b'test-data')] = b'0\x00'
        self.assertEqual(0, self._import_from_file('--no-resize'))
        self.assertAllCalled()

    def test_053_import_file_matching_size(self):
        # The volume already reports size=9, matching the 9-byte input, so
        # no Resize call is registered.
        self._expect_testvm()
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Info', 'private', None)] = (
            b'0\x00pool=lvm\n'
            b'vid=qubes_dom0/vm-testvm-private\n'
            b'size=9\n'
            b'usage=1\n'
            b'rw=True\n'
            b'save_on_stop=True\n'
            b'snap_on_start=False\n'
            b'revisions_to_keep=0\n'
            b'is_outdated=False\n'
        )
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.Import', 'private',
             b'test-data')] = b'0\x00'
        self.assertEqual(0, self._import_from_file())
        self.assertAllCalled()