Source code for qubesadmin.tools.qvm_run

# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

"""qvm-run tool"""
import argparse
import contextlib
import io
import os
import shlex
import signal
import subprocess
import sys

import qubesadmin.tools
import qubesadmin.exc
import qubesadmin.utils

parser = qubesadmin.tools.QubesArgumentParser()

parser.add_argument(
    "--user",
    "-u",
    metavar="USER",
    help="run command in a qube as USER (available only from dom0)",
)

parser.add_argument(
    "--autostart",
    "--auto",
    "-a",
    action="store_true",
    default=True,
    help="option ignored, this is default",
)

parser.add_argument(
    "--no-autostart",
    "--no-auto",
    "-n",
    action="store_false",
    dest="autostart",
    help="do not autostart/unpause qube",
)

parser.add_argument(
    "--pass-io",
    "-p",
    action="store_true",
    dest="passio",
    default=False,
    help="pass stdio from remote program",
)

parser.add_argument(
    "--localcmd",
    metavar="COMMAND",
    help="with --pass-io, pass stdio to the given program",
)

parser.add_argument(
    "--gui",
    action="store_true",
    default=None,
    help='run the command with GUI (default on if "DISPLAY" is set, guivm '
    "property is set and gui feature is True)",
)

parser.add_argument(
    "--no-gui",
    "--nogui",
    action="store_false",
    dest="gui",
    help="run the command without GUI",
)

parser.add_argument(
    "--colour-output",
    "--color-output",
    metavar="COLOUR",
    action="store",
    dest="color_output",
    default=None,
    help='mark the qube output with given ANSI colour (ie. "31" for red)',
)

parser.add_argument(
    "--colour-stderr",
    "--color-stderr",
    metavar="COLOUR",
    action="store",
    dest="color_stderr",
    default=None,
    help='mark the qube stderr with given ANSI colour (ie. "31" for red)',
)

parser.add_argument(
    "--no-colour-output",
    "--no-color-output",
    action="store_false",
    dest="color_output",
    help="disable colouring the stdio",
)

parser.add_argument(
    "--no-colour-stderr",
    "--no-color-stderr",
    action="store_false",
    dest="color_stderr",
    help="disable colouring the stderr",
)

try:
    _stdout_isatty = os.isatty(sys.stdout.fileno())
except io.UnsupportedOperation:
    _stdout_isatty = False

parser.add_argument(
    "--filter-escape-chars",
    action="store_true",
    dest="filter_esc",
    default=_stdout_isatty,
    help="filter terminal escape sequences (default if output is terminal)",
)

parser.add_argument(
    "--no-filter-escape-chars",
    action="store_false",
    dest="filter_esc",
    help="do not filter terminal escape sequences; DANGEROUS when output is a"
    " terminal emulator",
)

parser.add_argument(
    "--service",
    action="store_true",
    dest="service",
    help="run a qrexec service (named by COMMAND) instead of shell command",
)

parser.add_argument(
    "--no-shell",
    action="store_true",
    help="treat COMMAND as a simple executable, not a shell command",
)

target_parser = parser.add_mutually_exclusive_group()

target_parser.add_argument(
    "--dispvm",
    action="store",
    nargs="?",
    const=True,
    metavar="DISPOSABLE_TEMPLATE",
    help="start a service in new disposable qube; optionally specify a "
    "disposable template, else the system default will be used",
)
parser.add_argument("VMNAME", nargs="?", action=qubesadmin.tools.VmNameAction)

# add those manually instead of vmname_args, because of mutually exclusive
# group with --dispvm; parsing is still handled by QubesArgumentParser
target_parser.add_argument(
    "--all",
    action="store_true",
    dest="all_domains",
    help="run command on all running qubes",
)

parser.add_argument(
    "--exclude", action="append", default=[], help="exclude the qube from --all"
)

parser.add_argument("cmd", metavar="COMMAND", help="command or service to run")

# use argparse.REMAINDER here, not '*' ― the latter swallows a leading "--".
parser.add_argument(
    "cmd_args",
    nargs=argparse.REMAINDER,
    metavar="ARG",
    help="command arguments (implies --no-shell)",
)






[docs] def run_command_single(args, vm): """Handle a single VM to run the command in""" run_kwargs = {} if not args.passio: run_kwargs["stdin"] = subprocess.DEVNULL run_kwargs["stdout"] = subprocess.DEVNULL run_kwargs["stderr"] = subprocess.DEVNULL elif args.localcmd: run_kwargs["stdin"] = subprocess.PIPE run_kwargs["stdout"] = subprocess.PIPE run_kwargs["stderr"] = None else: # connect process output to stdin/stdout/err directly if --pass-io is # given run_kwargs["stdin"] = None run_kwargs["stdout"] = None run_kwargs["stderr"] = None if args.filter_esc: run_kwargs["filter_esc"] = True if ( isinstance(args.app, qubesadmin.app.QubesLocal) and not args.passio and not args.localcmd and args.service and not args.dispvm ): # wait=False works only in dom0; but it's still useful, to save on # simultaneous vchan connections run_kwargs["wait"] = False # Ugly hack: if args.dispvm is not None, then args.VMNAME is actually the # command, and args.command the first argument to the command. if args.dispvm is not None and args.VMNAME is not None: args.cmd_args.insert(0, args.cmd) args.cmd = args.VMNAME args.VMNAME = None if args.dispvm and args.gui is None: args.gui = has_gui(vm) use_exec = len(args.cmd_args) > 0 or args.no_shell local_proc = None shell_cmd = None if args.service: service = args.cmd elif use_exec: all_args = [args.cmd] + args.cmd_args if vm.features.check_with_template("vmexec", False): service = "qubes.VMExec" if args.gui and args.dispvm: service = "qubes.VMExecGUI" elif args.user == "root" and vm.features.check_with_template( "supported-rpc.qubes.VMRootExec", False ): service = "qubes.VMRootExec" args.user = None service += "+" + qubesadmin.utils.encode_for_vmexec(all_args) else: service = "qubes.VMShell" if args.gui and args.dispvm: service += "+WaitForSession" # if there is no vmexec, surely qubes.VMRootShell is also too old # - see comment below shell_cmd = " ".join(shlex.quote(arg) for arg in all_args) else: service = "qubes.VMShell" if args.gui and args.dispvm: service += "+WaitForSession" elif args.user == "root" and vm.features.check_with_template( "supported-rpc.qubes.VMRootExec", False ): # The above intentionally checks for VMRootExec, not VMRootShell. # When VMRootExec was introduced, both VMRootExec and VMRootShell # got also force-user=root setting necessary for the below to work. service = "qubes.VMRootShell" args.user = None shell_cmd = args.cmd if shell_cmd: run_kwargs["prefix_data"] = vm.prepare_input_for_vmshell(shell_cmd) proc = vm.run_service(service, user=args.user, **run_kwargs) if args.localcmd: # pylint: disable=consider-using-with local_proc = subprocess.Popen( args.localcmd, shell=True, stdout=proc.stdin, stdin=proc.stdout ) proc.stdin.close() proc.stdout.close() else: assert proc.stdin is None, repr(proc.stdin) return proc, local_proc
[docs] def has_gui(qube) -> bool: """Returns ``True`` if qube can have a GUI.""" return ( os.environ.get("DISPLAY") is not None and getattr(qube, "guivm", None) and qube.features.check_with_template("gui", True) )
# pylint: disable=too-many-statements
[docs] def main(args=None, app=None): """Main function of qvm-run tool""" args = parser.parse_args(args, app=app) # pylint: disable=unidiomatic-typecheck if type(args.cmd) is not str: # pylint: disable=unidiomatic-typecheck if type(args.cmd) is list and not args.cmd: # Work around an argparse bug: if COMMAND is literally "--", it is # misparsed as an empty list! args.cmd = "--" else: raise AssertionError("args.cmd misparsed somehow? (this is a bug)") if args.passio: if args.color_output is None and args.filter_esc: args.color_output = 31 try: stderr_isatty = os.isatty(sys.stderr.fileno()) except io.UnsupportedOperation: stderr_isatty = False if args.color_stderr is None and stderr_isatty: args.color_stderr = 31 if len(args.domains) > 1 and args.passio and not args.localcmd: parser.error( "--passio cannot be used when more than 1 qube is chosen " "and no --localcmd is used" ) if args.localcmd and not args.passio: parser.error("--localcmd have no effect without --pass-io") if args.color_output and not args.filter_esc: parser.error("--color-output must be used with --filter-escape-chars") if args.service and args.no_shell: parser.error("--no-shell does not apply to --service") retcode = 0 verbose = args.verbose - args.quiet if args.passio: verbose -= 1 # --all and --exclude are handled by QubesArgumentParser domains = args.domains dispvm = None gui_per_domain = {} if args.dispvm: if args.exclude: parser.error("Cannot use --exclude with --dispvm") dispvm = qubesadmin.vm.DispVM.from_appvm( args.app, None if args.dispvm is True else args.dispvm, redirect_dispvm_calls=True ) domains = [dispvm] elif args.all_domains: # --all consider only running VMs domains = [ vm for vm in domains if vm.is_running() and not vm.features.get("internal") and not vm.is_paused() ] if args.gui is None: for qube in domains: gui_per_domain[qube] = has_gui(qube) else: if len(domains) != 1: parser.print_usage(sys.stderr) sys.exit(2) if args.gui is None: args.gui = has_gui(domains[0]) if args.color_output: sys.stdout.write("\033[0;{}m".format(args.color_output)) sys.stdout.flush() if args.color_stderr: sys.stderr.write("\033[0;{}m".format(args.color_stderr)) sys.stderr.flush() try: procs = [] for vm in domains: if not args.autostart and not vm.is_running(): if verbose > 0: print_no_color( "Qube '{}' not started".format(vm.name), file=sys.stderr, color=args.color_stderr, ) retcode = max(retcode, 1) continue if not args.dispvm and vm.is_paused(): if not args.autostart: if verbose > 0: print_no_color( "Qube '{}' is paused".format(vm.name), file=sys.stderr, color=args.color_stderr, ) retcode = max(retcode, 1) continue try: vm.unpause() except qubesadmin.exc.QubesException: if verbose > 0: print_no_color( "Qube '{}' cannot be unpaused".format(vm.name), file=sys.stderr, color=args.color_stderr, ) retcode = max(retcode, 1) continue try: if verbose > 0: print_no_color( "Running '{}' on {}".format(args.cmd, vm.name), file=sys.stderr, color=args.color_stderr, ) if not args.dispvm and ( args.gui or (args.all_domains and gui_per_domain.get(vm)) ): wait_session = vm.run_service( "qubes.WaitForSession", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) try: wait_session.communicate(vm.default_user.encode()) except KeyboardInterrupt: with contextlib.suppress(ProcessLookupError): wait_session.send_signal(signal.SIGINT) break proc, local_proc = run_command_single(args, vm) procs.append((vm, proc)) if local_proc: procs.append((vm, local_proc)) except qubesadmin.exc.QubesException as e: if args.color_output: sys.stdout.write("\033[0m") sys.stdout.flush() vm.log.error(str(e)) return -1 try: for vm, proc in procs: this_retcode = proc.wait() if this_retcode and verbose > 0: print_no_color( "{}: command failed with code: {}".format( vm.name, this_retcode ), file=sys.stderr, color=args.color_stderr, ) retcode = max(retcode, proc.wait()) except KeyboardInterrupt: for vm, proc in procs: with contextlib.suppress(ProcessLookupError): proc.send_signal(signal.SIGINT) for vm, proc in procs: retcode = max(retcode, proc.wait()) finally: if dispvm: dispvm.cleanup() if args.color_output: sys.stdout.write("\033[0m") sys.stdout.flush() if args.color_stderr: sys.stderr.write("\033[0m") sys.stderr.flush() return retcode
if __name__ == "__main__": sys.exit(main())