# Source code for qubes.vm.mix.net

#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2010-2016  Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2013-2016  Marek Marczykowski-Górecki
#                              <marmarek@invisiblethingslab.com>
# Copyright (C) 2014-2016  Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#

""" This module contains the NetVMMixin """
import ipaddress
import os
import re

import libvirt  # pylint: disable=import-error
import qubes
import qubes.config
import qubes.events
import qubes.firewall
import qubes.exc


def _setter_mac(self, prop, value):
    """Validate and normalise a value assigned to the ``mac`` property.

    :param self: object the property belongs to (unused)
    :param prop: property object being set (unused)
    :param value: candidate MAC address
    :returns: the address converted to lower case
    :raises ValueError: if *value* is not a string, or is not six \
        colon-separated pairs of hexadecimal digits
    """
    # pylint: disable=unused-argument
    if not isinstance(value, str):
        raise ValueError("MAC address must be a string")

    normalized = value.lower()
    # \A and \Z anchor the whole string, so a trailing newline is rejected too
    if re.match(r"\A([0-9a-f][0-9a-f]:){5}[0-9a-f][0-9a-f]\Z", normalized):
        return normalized
    raise ValueError("Invalid MAC address value")


def _default_ip(self):
    """Compute the default value of the ``ip`` property.

    :returns: ``None`` when the domain is not networked; otherwise the \
        result of ``get_ip_for_vm(self)`` called on the domain's netvm, or \
        on the domain itself when it has no netvm
    """
    if not self.is_networked():
        return None

    # a networked domain without a netvm allocates its own address
    provider = self if self.netvm is None else self.netvm
    return provider.get_ip_for_vm(self)  # pylint: disable=no-member


def _default_ip6(self):
    """Compute the default value of the ``ip6`` property.

    :returns: ``None`` when the domain is not networked or when the \
        ``ipv6`` feature (checked with ``check_with_netvm``) is not set; \
        otherwise the result of ``get_ip6_for_vm(self)`` called on the \
        domain's netvm, or on the domain itself when it has no netvm
    """
    if not self.is_networked():
        return None
    ipv6_enabled = self.features.check_with_netvm("ipv6", False)
    if not ipv6_enabled:
        return None

    # a networked domain without a netvm allocates its own address
    provider = self if self.netvm is None else self.netvm
    return provider.get_ip6_for_vm(self)  # pylint: disable=no-member


def _setter_netvm(self, prop, value):
    """Validate a value assigned to the ``netvm`` property.

    :param self: domain whose netvm is being set
    :param prop: property object being set (unused)
    :param value: candidate network-providing domain, or ``None``
    :returns: *value* unchanged
    :raises qubes.exc.QubesValueError: if *value* does not provide network, \
        or if using it would create a loop in the network topology
    """
    # pylint: disable=unused-argument
    if value is None:
        return None

    if not value.provides_network:
        raise qubes.exc.QubesValueError(
            "The {!s} qube does not provide network".format(value)
        )

    # skip check for netvm loops during qubes.xml loading, to avoid tricky
    # loading order
    if not self.events_enabled:
        return value

    creates_loop = (
        value is self
        or value in self.app.domains.get_vms_connected_to(self)
    )
    if creates_loop:
        raise qubes.exc.QubesValueError("Loops in network are unsupported")
    return value


def _setter_provides_network(self, prop, value):
    """Validate a value assigned to the ``provides_network`` property.

    The value is first converted with :py:func:`qubes.property.bool`.

    :returns: the converted value
    :raises qubes.exc.QubesValueError: when turning the property off while \
        ``self.connected_vms`` is not empty
    """
    enabled = qubes.property.bool(self, prop, value)

    # connected_vms is only evaluated when the property is being disabled
    if not enabled and list(self.connected_vms):
        raise qubes.exc.QubesValueError(
            "The qube is still used by other qubes, change theirs "
            "'netvm' first"
        )

    return enabled


class StrSerializableTuple(tuple):
    """Tuple of strings whose ``str()`` form is its elements joined by spaces."""

    def __str__(self):
        """Return the elements joined with single spaces.

        :raises ValueError: if any element contains a space
        """
        # verify it can be deserialized later (currently 'dns'
        # property is the only using this, and it is safe)
        for element in self:
            if " " in element:
                raise ValueError(
                    "space found in a list element {!r}".format(self)
                )
        return " ".join(self)


def vmid_to_ipv4(prefix, vmid):
    """Map a numeric VM id onto an IPv4 address below *prefix*.

    :param prefix: first two octets of the address, e.g. ``"10.137"``
    :param vmid: numeric id of the VM
    :returns: the resulting :py:class:`ipaddress.IPv4Address`
    """
    # avoid .0 and .255 addresses, it may trip some heuristics
    # if OS assumes /24 netmask
    if vmid < 255:
        # preserve unchanged IPs for low vmid
        third, fourth = (vmid >> 8) & 0xFF, vmid & 0xFF
    else:
        # don't reserve first .1 for vmid 0, as it is invalid
        third, fourth = divmod(vmid - 1, 254)
        fourth += 1
    return ipaddress.IPv4Address("{}.{}.{}".format(prefix, third, fourth))


class NetVMMixin(qubes.events.Emitter):
    """Mixin containing network functionality

    It covers both roles a domain can have: being connected to a network
    provider (``netvm`` is not ``None``) and being a network provider for
    other domains (``provides_network`` is true).
    """

    mac = qubes.property(
        "mac",
        type=str,
        default="00:16:3e:5e:6c:00",
        setter=_setter_mac,
        doc="MAC address of the NIC emulated inside VM",
    )

    ip = qubes.property(
        "ip",
        type=ipaddress.IPv4Address,
        default=_default_ip,
        doc="IP address of this domain.",
    )

    ip6 = qubes.property(
        "ip6",
        type=ipaddress.IPv6Address,
        default=_default_ip6,
        doc="IPv6 address of this domain.",
    )

    # CORE2: swallowed uses_default_netvm
    netvm = qubes.VMProperty(
        "netvm",
        load_stage=4,
        allow_none=True,
        default=(lambda self: self.app.default_netvm),
        setter=_setter_netvm,
        doc=(
            "VM that provides network connection to this domain. When "
            "`None`, machine is disconnected. When absent, domain uses "
            "default NetVM."
        ),
    )

    provides_network = qubes.property(
        "provides_network",
        default=False,
        type=bool,
        setter=_setter_provides_network,
        doc=(
            "If this domain can act as network provider (formerly known as "
            "NetVM or ProxyVM)"
        ),
    )

    @property
    def firewall_conf(self):
        """File name of the firewall configuration, see :py:meth:`has_firewall`."""
        return "firewall.xml"

    #
    # used in networked appvms or proxyvms (netvm is not None)
    #

    @qubes.stateless_property
    def visible_ip(self):
        """IP address of this domain as seen by the domain."""
        return self.features.check_with_template("net.fake-ip", None) or self.ip

    @qubes.stateless_property
    def visible_ip6(self):
        """IPv6 address of this domain as seen by the domain."""
        return self.ip6

    @qubes.stateless_property
    def visible_gateway(self):
        """Default gateway of this domain as seen by the domain."""
        return self.features.check_with_template("net.fake-gateway", None) or (
            self.netvm.gateway if self.netvm else None
        )

    @qubes.stateless_property
    def visible_gateway6(self):
        """Default (IPv6) gateway of this domain as seen by the domain."""
        if self.features.check_with_netvm("ipv6", False):
            return self.netvm.gateway6 if self.netvm else None
        return None

    @qubes.stateless_property
    def visible_netmask(self):
        """Netmask as seen by the domain."""
        return self.features.check_with_template("net.fake-netmask", None) or (
            self.netvm.netmask if self.netvm else None
        )

    #
    # used in netvms (provides_network=True)
    # those properties and methods are most likely accessed as vm.netvm.<prop>
    #

    @staticmethod
    def get_ip_for_vm(vm):
        """Get IP address for (appvm) domain connected to this (netvm) domain.

        Disposable VMs get an address under ``10.138`` derived from their
        ``dispid``; every other domain gets one under ``10.137`` derived from
        its ``qid``.
        """
        import qubes.vm.dispvm  # pylint: disable=redefined-outer-name

        if isinstance(vm, qubes.vm.dispvm.DispVM):
            return vmid_to_ipv4("10.138", vm.dispid)

        return vmid_to_ipv4("10.137", vm.qid)

    @staticmethod
    def get_ip6_for_vm(vm):
        """Get IPv6 address for (appvm) domain connected to this (netvm)
        domain.

        Default address is constructed with Qubes-specific site-local prefix,
        and IPv4 suffix (0xa89 is 10.137., 0xa8a is 10.138. and is used for
        disposable VMs).
        """
        import qubes.vm.dispvm  # pylint: disable=redefined-outer-name

        if isinstance(vm, qubes.vm.dispvm.DispVM):
            return ipaddress.IPv6Address(
                "{}::a8a:{:x}".format(qubes.config.qubes_ipv6_prefix, vm.dispid)
            )

        return ipaddress.IPv6Address(
            "{}::a89:{:x}".format(qubes.config.qubes_ipv6_prefix, vm.qid)
        )

    @qubes.stateless_property
    def gateway(self):
        """Gateway for other domains that use this domain as netvm."""
        return self.visible_ip if self.provides_network else None

    @qubes.stateless_property
    def gateway6(self):
        """Gateway (IPv6) for other domains that use this domain as netvm."""
        if self.features.check_with_netvm("ipv6", False):
            return self.visible_ip6 if self.provides_network else None
        return None

    @property
    def netmask(self):
        """Netmask for gateway address."""
        return "255.255.255.255" if self.is_networked() else None

    @property
    def connected_vms(self):
        """Return a generator containing all domains connected to the current
        NetVM.
        """
        for vm in self.app.domains:
            # not every domain class has a netvm attribute
            if getattr(vm, "netvm", None) is self:
                yield vm

    #
    # used in both
    #

    @qubes.stateless_property
    def dns(self):
        """DNS servers set up for this domain."""
        if self.netvm is not None or self.provides_network:
            return StrSerializableTuple(
                (
                    "10.139.1.1",
                    "10.139.1.2",
                )
            )

        return None

    def __init__(self, *args, **kwargs):
        # created lazily by the ``firewall`` property
        self._firewall = None
        super().__init__(*args, **kwargs)

    @qubes.events.handler("domain-load")
    def on_domain_load_netvm_loop_check(self, event):
        """Break the network connection if loading produced a netvm loop."""
        # pylint: disable=unused-argument
        # make sure there are no netvm loops - which could cause qubesd
        # looping infinitely
        if self is self.netvm:
            self.log.error(
                "vm '%s' network-connected to itself, breaking the "
                "connection",
                self.name,
            )
            self.netvm = None
        elif self.netvm in self.app.domains.get_vms_connected_to(self):
            self.log.error(
                "netvm loop detected on '%s', breaking the connection",
                self.name,
            )
            self.netvm = None

    @qubes.events.handler("domain-shutdown")
    def on_domain_shutdown(self, event, **kwargs):
        """Cleanup network interfaces of connected, running VMs.

        This will allow re-reconnecting them cleanly later.
        """
        # pylint: disable=unused-argument
        for vm in self.connected_vms:
            if not vm.is_running():
                continue
            try:
                vm.detach_network()
            except (qubes.exc.QubesException, libvirt.libvirtError):
                # ignore errors
                pass

    @qubes.events.handler("domain-start")
    def on_domain_started(self, event, **kwargs):
        """Connect this domain to its downstream domains. Also reload firewall
        in its netvm.

        This is needed when starting netvm *after* its connected domains.
        """
        # pylint: disable=unused-argument

        if self.netvm:
            self.netvm.reload_firewall_for_vm(self)  # pylint: disable=no-member

        for vm in self.connected_vms:
            if not vm.is_running():
                continue
            vm.log.info("Attaching network")
            try:
                vm.attach_network()
            except (qubes.exc.QubesException, libvirt.libvirtError):
                vm.log.warning("Cannot attach network", exc_info=1)

    @qubes.events.handler("domain-pre-shutdown")
    def on_domain_pre_shutdown(self, event, force=False):
        """Checks before NetVM shutdown if any connected domains are running.

        :param bool force: when true, the check is skipped and shutdown is \
            not blocked
        :raises qubes.exc.QubesVMError: if running domains are connected to \
            this one and `force` is false
        """
        # pylint: disable=unused-argument

        connected_vms = [vm for vm in self.connected_vms if vm.is_running()]
        if connected_vms and not force:
            raise qubes.exc.QubesVMError(
                self,
                "There are other VMs connected to this VM: {}".format(
                    ", ".join(vm.name for vm in connected_vms)
                ),
            )

    def attach_network(self):
        """Attach network in this machine to its netvm.

        The netvm is started first if it is not running.

        :raises qubes.exc.QubesVMNotRunningError: if this domain is not running
        :raises qubes.exc.QubesVMError: if this domain has no netvm
        """

        if not self.is_running():
            raise qubes.exc.QubesVMNotRunningError(self)
        if self.netvm is None:
            raise qubes.exc.QubesVMError(
                self, "netvm should not be {}".format(self.netvm)
            )

        if not self.netvm.is_running():  # pylint: disable=no-member
            # pylint: disable=no-member
            self.log.info("Starting NetVM ({0})".format(self.netvm.name))
            self.netvm.start()

        # must be in place before the interface is attached
        self.netvm.set_mapped_ip_info_for_vm(self)
        self.libvirt_domain.attachDevice(
            self.app.env.get_template("libvirt/devices/net.xml").render(vm=self)
        )

    def detach_network(self):
        """Detach machine from its netvm.

        :raises qubes.exc.QubesVMNotRunningError: if this domain is not running
        :raises qubes.exc.QubesVMError: if this domain has no netvm
        """

        if not self.is_running():
            raise qubes.exc.QubesVMNotRunningError(self)
        if self.netvm is None:
            raise qubes.exc.QubesVMError(
                self, "netvm should not be {}".format(self.netvm)
            )

        self.libvirt_domain.detachDevice(
            self.app.env.get_template("libvirt/devices/net.xml").render(vm=self)
        )

    def is_networked(self):
        """Check whether this VM can reach network (firewall notwithstanding).

        :returns: :py:obj:`True` if the machine can reach network, \
            :py:obj:`False` otherwise.
        :rtype: bool
        """

        if self.provides_network:
            return True

        return self.netvm is not None

    def reload_firewall_for_vm(self, vm):
        """Reload the firewall rules for the vm

        Does nothing when this (network providing) domain is not running.
        Rules are written separately for the IPv4 and the IPv6 address of
        *vm*; an address family whose address is ``None`` is skipped.
        """
        if not self.is_running():
            return

        for addr_family in (4, 6):
            ip = vm.ip6 if addr_family == 6 else vm.ip
            if ip is None:
                continue
            base_dir = "/qubes-firewall/{}/".format(ip)
            # remove old entries if any (but don't touch base empty entry - it
            # would trigger reload right away)
            self.untrusted_qdb.rm(base_dir)
            # write new rules
            for key, value in vm.firewall.qdb_entries(
                addr_family=addr_family
            ).items():
                self.untrusted_qdb.write(base_dir + key, value)
            # signal its done (same path without the trailing slash)
            self.untrusted_qdb.write(base_dir[:-1], "")

    def set_mapped_ip_info_for_vm(self, vm):
        """
        Set configuration to possibly hide real IP from the VM.
        This needs to be done before executing 'script'
        (`/etc/xen/scripts/vif-route-qubes`) in network providing VM
        """
        # add info about remapped IPs (VM IP hidden from the VM itself)
        mapped_ip_base = "/mapped-ip/{}".format(vm.ip)

        if vm.visible_ip:
            self.untrusted_qdb.write(
                mapped_ip_base + "/visible-ip", str(vm.visible_ip)
            )
        else:
            self.untrusted_qdb.rm(mapped_ip_base + "/visible-ip")

        if vm.visible_gateway:
            self.untrusted_qdb.write(
                mapped_ip_base + "/visible-gateway", str(vm.visible_gateway)
            )
        else:
            self.untrusted_qdb.rm(mapped_ip_base + "/visible-gateway")

    def reload_connected_ips(self):
        """
        Update list of IPs possibly connected to this machine.
        This is used by qubes-firewall to implement anti-spoofing.
        """
        connected_ips = [
            str(vm.visible_ip)
            for vm in self.connected_vms
            if vm.visible_ip is not None
        ]
        connected_ips6 = [
            str(vm.visible_ip6)
            for vm in self.connected_vms
            if vm.visible_ip6 is not None
        ]

        self.untrusted_qdb.write("/connected-ips", " ".join(connected_ips))
        self.untrusted_qdb.write("/connected-ips6", " ".join(connected_ips6))

    @qubes.events.handler("property-pre-reset:netvm")
    def on_property_pre_reset_netvm(self, event, name, oldvalue=None):
        """Validate the default NetVM before the property is reset to it.

        Fires ``property-pre-set:netvm`` when the default differs from
        *oldvalue*.
        """
        # pylint: disable=unused-argument
        # we are changing to default netvm
        newvalue = type(self).netvm.get_default(self)
        # check for netvm loop
        _setter_netvm(self, type(self).netvm, newvalue)
        if newvalue == oldvalue:
            return
        self.fire_event(
            "property-pre-set:netvm",
            pre_event=True,
            name="netvm",
            newvalue=newvalue,
            oldvalue=oldvalue,
        )

    @qubes.events.handler("property-reset:netvm")
    def on_property_reset_netvm(self, event, name, oldvalue=None):
        """Handle the NetVM having been reset to the default NetVM.

        Fires ``property-set:netvm`` when the resulting value differs from
        *oldvalue*.
        """
        # pylint: disable=unused-argument
        # we are changing to default netvm
        newvalue = self.netvm
        if newvalue == oldvalue:
            return
        self.fire_event(
            "property-set:netvm",
            name="netvm",
            newvalue=newvalue,
            oldvalue=oldvalue,
        )

    @qubes.events.handler("property-pre-set:netvm")
    def on_property_pre_set_netvm(self, event, name, newvalue, oldvalue=None):
        """Run sanity checks before setting a new NetVM

        Also detaches this domain from its current netvm when both are
        running.

        :raises qubes.exc.QubesVMNotStartedError: if this domain is running \
            while *newvalue* is not (not checked in offline mode)
        """
        # pylint: disable=unused-argument
        if newvalue is not None:
            if (
                not self.app.vmm.offline_mode
                and self.is_running()
                and not newvalue.is_running()
            ):
                raise qubes.exc.QubesVMNotStartedError(
                    newvalue,
                    "Cannot dynamically attach to stopped qube {!s}".format(
                        newvalue
                    ),
                )

        # don't check oldvalue, because it's missing if it was default
        if self.netvm is not None:
            if self.is_running() and self.netvm.is_running():
                self.detach_network()

    @qubes.events.handler("property-set:netvm")
    def on_property_set_netvm(self, event, name, newvalue, oldvalue=None):
        """Replaces the current NetVM with a new one and fires
        net-domain-connect event
        """
        # pylint: disable=unused-argument

        if oldvalue is not None and oldvalue.is_running():
            oldvalue.reload_connected_ips()

        if newvalue is None:
            return

        if newvalue.is_running():
            newvalue.reload_connected_ips()

        if self.is_running():
            # refresh IP, DNS etc
            self.create_qdb_entries()
            self.attach_network()

            newvalue.fire_event("net-domain-connect", vm=self)

    @qubes.events.handler("net-domain-connect")
    def on_net_domain_connect(self, event, vm):
        """Reloads the firewall config for vm"""
        # pylint: disable=unused-argument
        self.reload_firewall_for_vm(vm)

    @qubes.events.handler("property-set:ip", "property-reset:ip")
    def on_property_set_ip(self, _event, name, newvalue=None, oldvalue=None):
        """Announce resets of the properties derived from ``ip``."""
        # pylint: disable=unused-argument
        if newvalue == oldvalue:
            return
        if self.provides_network:
            self.fire_event("property-reset:gateway", name="gateway")
        self.fire_event("property-reset:visible_ip", name="visible_ip")
        for vm in self.connected_vms:
            vm.fire_event(
                "property-reset:visible_gateway", name="visible_gateway"
            )

    # the property is named "ip6", so its reset event is "property-reset:ip6"
    @qubes.events.handler("property-set:ip6", "property-reset:ip6")
    def on_property_set_ip6(self, _event, name, newvalue=None, oldvalue=None):
        """Announce resets of the properties derived from ``ip6``."""
        # pylint: disable=unused-argument
        if newvalue == oldvalue:
            return
        if self.provides_network:
            self.fire_event("property-reset:gateway6", name="gateway6")
        self.fire_event("property-reset:visible_ip6", name="visible_ip6")
        for vm in self.connected_vms:
            vm.fire_event(
                "property-reset:visible_gateway6", name="visible_gateway6"
            )

    @qubes.events.handler("feature-pre-set:net.fake-ip")
    def on_feature_pre_set_net_fake_ip(
        self, event, name, newvalue, oldvalue=None
    ):
        """Reject a ``net.fake-ip`` value that is not an IPv4 address."""
        # pylint: disable=unused-argument
        # format validation
        ipaddress.IPv4Address(newvalue)

    @qubes.events.handler("feature-pre-set:net.fake-gateway")
    def on_feature_pre_set_net_fake_gw(
        self, event, name, newvalue, oldvalue=None
    ):
        """Reject a ``net.fake-gateway`` value that is not an IPv4 address."""
        # pylint: disable=unused-argument
        # format validation
        ipaddress.IPv4Address(newvalue)

    @qubes.events.handler("feature-pre-set:net.fake-netmask")
    def on_feature_pre_set_net_fake_nm(
        self, event, name, newvalue, oldvalue=None
    ):
        """Validate a ``net.fake-netmask`` value.

        All-digit values must be in the range 0..24; anything else must parse
        as an IPv4 address.
        """
        # pylint: disable=unused-argument
        # format validation
        if not newvalue.isdigit():
            ipaddress.IPv4Address(newvalue)
        elif not 0 <= int(newvalue) <= 24:
            raise qubes.exc.QubesValueError("Invalid netmask value")

    @qubes.events.handler("feature-set:net.fake-ip")
    def on_feature_set_net_fake_ip(self, event, name, newvalue, oldvalue=None):
        """Announce resets of the properties affected by ``net.fake-ip``."""
        # pylint: disable=unused-argument
        if oldvalue == newvalue:
            return
        self.fire_event("property-reset:visible_ip", name="visible_ip")
        for vm in self.connected_vms:
            vm.fire_event(
                "property-reset:visible_gateway", name="visible_gateway"
            )

    @qubes.events.handler("feature-set:ipv6")
    def on_feature_set_ipv6(self, event, name, newvalue, oldvalue=None):
        """Announce resets of the properties affected by the ``ipv6`` feature."""
        # pylint: disable=unused-argument
        if oldvalue == newvalue:
            return
        self.fire_event("property-reset:visible_ip6", name="visible_ip6")
        for vm in self.connected_vms:
            vm.fire_event(
                "property-reset:visible_gateway6", name="visible_gateway6"
            )

    @qubes.events.handler("property-set:provides_network")
    def on_property_set_provides(self, _event, name, newvalue, oldvalue=None):
        """Announce gateway resets when ``provides_network`` changes."""
        # pylint: disable=unused-argument
        if newvalue == oldvalue:
            return
        self.fire_event("property-reset:gateway", name="gateway")
        self.fire_event("property-reset:gateway6", name="gateway6")

    @qubes.events.handler("domain-qdb-create")
    def on_domain_qdb_create(self, event):
        """Fills the QubesDB with firewall entries."""
        # pylint: disable=unused-argument

        # Keep the following in sync with on_firewall_changed.
        self.reload_connected_ips()

        for vm in self.connected_vms:
            if vm.is_running():
                self.set_mapped_ip_info_for_vm(vm)
                self.reload_firewall_for_vm(vm)

    @qubes.events.handler("firewall-changed")
    def on_firewall_changed(self, event, **kwargs):
        """Reloads the firewall if vm is running and has a NetVM assigned"""
        # pylint: disable=unused-argument
        if self.is_running() and self.netvm:
            self.netvm.reload_firewall_for_vm(self)  # pylint: disable=no-member

    @qubes.events.handler("domain-pre-spawn")
    def on_pre_spawn(self, event, **kwargs):
        """Prepare qubesdb in netvm entries before relevant interface is
        created"""
        # pylint: disable=unused-argument
        if self.netvm:
            self.netvm.reload_connected_ips()
            self.netvm.set_mapped_ip_info_for_vm(self)
            self.netvm.reload_firewall_for_vm(self)  # pylint: disable=no-member

    # CORE2: swallowed get_firewall_conf, write_firewall_conf,
    # get_firewall_defaults

    @property
    def firewall(self):
        """:py:class:`qubes.firewall.Firewall` of this domain, created on
        first access."""
        if self._firewall is None:
            self._firewall = qubes.firewall.Firewall(self)
        return self._firewall

    def has_firewall(self):
        """Return `True` if there are some vm specific firewall rules set"""
        return os.path.exists(os.path.join(self.dir_path, self.firewall_conf))