Source code for qubesadmin.tests.tools

# encoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015  Marek Marczykowski-Górecki
#                                       <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.


import io
import sys

import asyncio


[docs]class StdoutBuffer(object): def __init__(self): self.orig_stdout = None if sys.version_info[0] >= 3: self.stdout = io.StringIO() else: self.stdout = io.BytesIO() def __enter__(self): self.orig_stdout = sys.stdout sys.stdout = self.stdout return self.stdout def __exit__(self, exc_type, exc_val, exc_tb): sys.stdout = self.orig_stdout return False
[docs]class StderrBuffer(object): def __init__(self): self.orig_stderr = None if sys.version_info[0] >= 3: self.stderr = io.StringIO() else: self.stderr = io.BytesIO() def __enter__(self): self.orig_stderr = sys.stderr sys.stderr = self.stderr return self.stderr def __exit__(self, exc_type, exc_val, exc_tb): sys.stderr = self.orig_stderr return False
[docs]class MockEventsReader(object): def __init__(self, events, delay=0.05): self.events = events self.delay = delay self.current_event = None
[docs] def at_eof(self): return not bool(self.events)
[docs] @asyncio.coroutine def readuntil(self, delim): if not self.current_event: if not self.events: raise asyncio.IncompleteReadError(b'', delim) yield from asyncio.sleep(self.delay) self.current_event = self.events.pop(0) data, rest = self.current_event.split(delim, 1) self.current_event = rest return data + delim
@asyncio.coroutine def __call__(self, vm=None): return self, (lambda: None)