Observations:
- If I start the VPN and then restart systemd-resolved, 1.1.1.1 and 8.8.8.8 are part of Global.
- If I simply start the VPN,
10.139.1.1
and 10.139.1.2
are the only entries in Global. If the DNS entries are sorted as in the original code, 1.1.1.1 and 8.8.8.8 won't be among the first two entries.
I made the following modifications to your code:
qubes-setup-dnat-to-ns
:
#!/usr/bin/python3
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2022 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from __future__ import annotations
import subprocess
import sys
import os
import re
import socket
from ipaddress import IPv4Address, ip_address
from typing import List, Generator
from gi.repository import GObject, Gio, GLib
import qubesdb
class DNSServers(GObject.GObject):
    """Look up the IPv4 DNS servers currently configured on this system.

    systemd-resolved is queried over the system D-Bus first; when it yields
    no IPv4 servers (including when it is not reachable) the nameserver
    entries of ``RESOLVER_PATH`` are used instead.
    """

    # File parsed by the fallback lookup.
    RESOLVER_PATH = "/etc/resolv.conf"
    # D-Bus well-known name and manager interface of systemd-resolved.
    _RESOLVED_NAME = "org.freedesktop.resolve1"
    _RESOLVED_MANAGER_INTERFACE = "org.freedesktop.resolve1.Manager"

    def __init__(self) -> None:
        super().__init__()

    @classmethod
    def get_servers(cls) -> List[IPv4Address]:
        """Return the IPv4 DNS servers, most preferred first, without duplicates.

        Returns:
            Servers reported by systemd-resolved, or, if that produced
            nothing, the servers listed in ``RESOLVER_PATH``. The list may be
            empty.
        """
        # The lookups are generators and a generator object is always truthy,
        # so `resolved() or resolver()` would never reach the fallback. The
        # first result has to be materialised before it can be tested.
        servers = list(cls._get_servers_from_systemd_resolved())
        if not servers:
            servers = list(cls._get_servers_from_resolver())
        # The same server may be listed for several scopes; keep only the
        # first (most preferred) occurrence of each address.
        return list(dict.fromkeys(servers))

    @classmethod
    def _get_servers_from_systemd_resolved(cls) -> Generator[IPv4Address, None, None]:
        """Yield the IPv4 servers from the systemd-resolved ``DNS`` property.

        Entries of links whose ``DefaultRoute`` property is true come first,
        then the global entries (interface index 0), then everything else.

        Yields nothing when the bus does not reply or systemd-resolved is not
        available.

        Raises:
            GLib.Error: for any D-Bus failure other than the "service not
                available" family handled below.
        """
        try:
            bus = Gio.bus_get_sync(Gio.BusType.SYSTEM)
        except GLib.Error as e:
            if 'org.freedesktop.DBus.Error.NoReply' in e.message:
                return
            raise

        try:
            manager_proxy = Gio.DBusProxy.new_for_bus_sync(
                Gio.BusType.SYSTEM,
                Gio.DBusProxyFlags.DO_NOT_AUTO_START,
                None,
                cls._RESOLVED_NAME,
                "/org/freedesktop/resolve1",
                cls._RESOLVED_MANAGER_INTERFACE,
            )
            data = manager_proxy.call_sync(
                "org.freedesktop.DBus.Properties.Get",
                GLib.Variant("(ss)", (cls._RESOLVED_MANAGER_INTERFACE, "DNS")),
                Gio.DBusCallFlags.NONE,
                -1,
                None
            ).unpack()[0]
        except GLib.Error as e:
            # These errors mean systemd-resolved is not there to ask; the
            # caller then falls back to the resolver file.
            dbus_patterns = (
                'org.freedesktop.DBus.Error.ServiceUnknown',
                'org.freedesktop.DBus.Error.NameHasNoOwner',
                'org.freedesktop.DBus.Error.NoSuchUnit',
                'org.freedesktop.systemd1.',
            )
            if any(dbus_pattern in e.message for dbus_pattern in dbus_patterns):
                return
            raise

        # DefaultRoute flag per interface index, so that each link is queried
        # over D-Bus once rather than once per DNS entry.
        default_route_by_link = {}

        def is_default_route(ifindex):
            # Interface index 0 marks the global entries, which have no link
            # object to query.
            if ifindex == 0:
                return False
            if ifindex not in default_route_by_link:
                object_path = manager_proxy.call_sync(
                    "GetLink",
                    GLib.Variant("(i)", (ifindex,)),
                    Gio.DBusCallFlags.NONE,
                    -1,
                    None
                ).unpack()[0]
                default_route_by_link[ifindex] = bool(bus.call_sync(
                    cls._RESOLVED_NAME,
                    object_path,
                    "org.freedesktop.DBus.Properties",
                    "Get",
                    GLib.Variant("(ss)", ("org.freedesktop.resolve1.Link", "DefaultRoute")),
                    None,
                    Gio.DBusCallFlags.NONE,
                    -1,
                    None
                ).unpack()[0])
            return default_route_by_link[ifindex]

        # Place DefaultRoute first, Global second. The sort is stable, so
        # entries that compare equal keep the order systemd-resolved gave.
        data.sort(key=lambda entry: (not is_default_route(entry[0]), entry[0] != 0))

        # Only keep IPv4 entries. systemd-resolved is trusted to return valid
        # addresses.
        # ToDo: We only need abridged IPv4 DNS entries for ifindex == 0.
        # to ensure static DNS of disconnected network interfaces are not added.
        for (_interface, address_family, address) in data:
            if address_family == socket.AF_INET.value:
                yield IPv4Address(bytes(address))

    @classmethod
    def _get_servers_from_resolver(cls) -> Generator[IPv4Address, None, None]:
        """Yield the IPv4 ``nameserver`` entries found in ``RESOLVER_PATH``.

        Yields nothing when the file cannot be opened. Lines that look like a
        dotted quad but are not a valid IPv4 address are skipped.
        """
        try:
            # Undecodable bytes are replaced instead of aborting the lookup.
            resolv = open(cls.RESOLVER_PATH, encoding="utf-8", errors="replace")
        except OSError:
            return
        with resolv:
            for line in resolv:
                match = re.search(r"^nameserver\s+((?:\d{1,3}\.){3}\d{1,3}$)", line)
                if not match:
                    continue
                try:
                    server = IPv4Address(match.group(1))
                except ValueError:
                    # The pattern also matches out-of-range octets such as
                    # 999.1.1.1; ignore those rather than crash.
                    continue
                yield server
def install_firewall_rules(dns):
    """Rebuild the ``dnat-dns`` nftables chain for the given upstream servers.

    The nameserver addresses stored in QubesDB are each redirected (DNAT, UDP
    and TCP port 53) to one entry of *dns*. When *dns* is empty, DNS requests
    to those addresses are dropped instead.

    If the chain already holds exactly the rules that would be written, the
    process exits with status 100. Otherwise the current process is replaced
    by ``nft``, so this function does not return.
    """
    qdb = qubesdb.QubesDB()

    # Nameserver addresses read from QubesDB; keys that are unset or do not
    # hold a valid ASCII IPv4 address are ignored.
    vm_nameservers = []
    for key in ('/qubes-netvm-primary-dns', '/qubes-netvm-secondary-dns'):
        raw = qdb.read(key)
        if raw is not None:
            try:
                vm_nameservers.append(IPv4Address(raw.decode("ascii", "strict")))
            except (UnicodeDecodeError, ValueError):
                pass

    rules = [
        'table ip qubes {',
        'chain dnat-dns {',
        'type nat hook prerouting priority dstnat; policy accept;',
    ]

    if dns:
        # Ensure that upstream DNS pool is larger than qubesdb_dns pool
        upstream = dns
        while len(upstream) < len(vm_nameservers):
            upstream = upstream + upstream
        for vm_nameserver, dest in zip(vm_nameservers, upstream):
            dns_ = str(dest)
            rules.extend((
                f"ip daddr {vm_nameserver} udp dport 53 dnat to {dns_}",
                f"ip daddr {vm_nameserver} tcp dport 53 dnat to {dns_}",
            ))
    else:
        # User has no IPv4 DNS set in sys-net. Maybe IPv6 only environment.
        # Or maybe user wants to enforce DNS-Over-HTTPS.
        # Drop IPv4 DNS requests to qubesdb_dns addresses.
        for vm_nameserver in vm_nameservers:
            rules.extend((
                f"ip daddr {vm_nameserver} udp dport 53 drop",
                f"ip daddr {vm_nameserver} tcp dport 53 drop",
            ))

    rules.extend(("}", "}"))

    # check if new rules are the same as the old ones - if so, don't reload
    # and return that info via exit code
    try:
        listing = subprocess.check_output(
            ["nft", "list", "chain", "ip", "qubes", "dnat-dns"])
    except subprocess.CalledProcessError:
        current_rules = []
    else:
        current_rules = [line.strip() for line in listing.decode().splitlines()]
    if current_rules == rules:
        sys.exit(100)

    preamble = [
        'add table ip qubes',
        # Add the chain so that the subsequent delete will work. If the chain already
        # exists this is a harmless no-op.
        'add chain ip qubes dnat-dns',
        # Delete the chain so that if the chain already exists, it will be removed.
        # The removal of the old chain and addition of the new one happen as a single
        # atomic operation, so there is no period where neither chain is present or
        # where both are present.
        'delete chain ip qubes dnat-dns',
    ]
    nft_script = "\n".join(preamble + rules)
    os.execvp("nft", ("nft", "--", nft_script))
if __name__ == '__main__':
    # Look up the upstream servers first, then hand them to the firewall
    # setup, which either exits or replaces this process with nft.
    upstream_servers = DNSServers.get_servers()
    install_firewall_rules(upstream_servers)
Patch:
--- qubes-setup-dnat-to-ns.apparatus 2024-11-17 04:05:41.954525914 -0500
+++ qubes-setup-dnat-to-ns.user11 2024-11-17 05:37:30.191662494 -0500
@@ -24,12 +24,7 @@
import subprocess
import sys
-import dbus
-import qubesdb
-from typing import List
-from ipaddress import IPv4Address
import os
-
import re
import socket
from ipaddress import IPv4Address, ip_address
@@ -37,7 +32,9 @@
from gi.repository import GObject, Gio, GLib
-class DNSServersDefaultRoute(GObject.GObject):
+import qubesdb
+
+class DNSServers(GObject.GObject):
RESOLVER_PATH = "/etc/resolv.conf"
_RESOLVED_NAME = "org.freedesktop.resolve1"
_RESOLVED_MANAGER_INTERFACE = "org.freedesktop.resolve1.Manager"
@@ -47,22 +44,26 @@
@classmethod
def get_servers(cls) -> List[IPv4Address]:
- return list(set(cls._get_servers_from_systemd_resolved()) or cls._get_servers_from_resolver())
+ return list(cls._get_servers_from_systemd_resolved() or cls._get_servers_from_resolver())
@classmethod
def _get_servers_from_systemd_resolved(cls) -> Generator[IPv4Address, None, None]:
- bus = Gio.bus_get_sync(Gio.BusType.SYSTEM)
-
- manager_proxy = Gio.DBusProxy.new_for_bus_sync(
- Gio.BusType.SYSTEM,
- Gio.DBusProxyFlags.DO_NOT_AUTO_START,
- None,
- cls._RESOLVED_NAME,
- "/org/freedesktop/resolve1",
- cls._RESOLVED_MANAGER_INTERFACE,
- )
+ try:
+ bus = Gio.bus_get_sync(Gio.BusType.SYSTEM)
+ except GLib.Error as e:
+ if 'org.freedesktop.DBus.Error.NoReply' in e.message:
+ return
+ raise e
try:
+ manager_proxy = Gio.DBusProxy.new_for_bus_sync(
+ Gio.BusType.SYSTEM,
+ Gio.DBusProxyFlags.DO_NOT_AUTO_START,
+ None,
+ cls._RESOLVED_NAME,
+ "/org/freedesktop/resolve1",
+ cls._RESOLVED_MANAGER_INTERFACE,
+ )
data = manager_proxy.call_sync(
"org.freedesktop.DBus.Properties.Get",
GLib.Variant("(ss)", (cls._RESOLVED_MANAGER_INTERFACE, "DNS")),
@@ -70,94 +71,71 @@
-1,
None
).unpack()[0]
- except GLib.Error:
- return
+ except GLib.Error as e:
+ dbus_patterns = [
+ 'org.freedesktop.DBus.Error.ServiceUnknown',
+ 'org.freedesktop.DBus.Error.NameHasNoOwner',
+ 'org.freedesktop.DBus.Error.NoSuchUnit',
+ 'org.freedesktop.systemd1.',]
+ for dbus_pattern in dbus_patterns:
+ if dbus_pattern in e.message:
+ return
+ raise e
- for (interface, address_family, address) in data:
- if address_family != socket.AF_INET.value:
- continue
+ def get_is_not_global(dns_tuple):
+ return dns_tuple[0] != 0
- if interface != 0:
+ def get_is_not_defaultroute(dns_tuple):
+ try:
object_path = manager_proxy.call_sync(
"GetLink",
- GLib.Variant("(i)", (interface,)),
+ GLib.Variant("(i)", (dns_tuple[0],)),
Gio.DBusCallFlags.NONE,
-1,
None
).unpack()[0]
+ except GLib.GError as e:
+ if 'org.freedesktop.DBus.Error.InvalidArgs' in e.message and dns_tuple[0] == 0:
+ return True
+ raise e
+ return not bus.call_sync(
+ cls._RESOLVED_NAME,
+ object_path,
+ "org.freedesktop.DBus.Properties",
+ "Get",
+ GLib.Variant("(ss)", ("org.freedesktop.resolve1.Link", "DefaultRoute")),
+ None,
+ Gio.DBusCallFlags.NONE,
+ -1,
+ None
+ ).unpack()[0]
- if bus.call_sync(
- cls._RESOLVED_NAME,
- object_path,
- "org.freedesktop.DBus.Properties",
- "Get",
- GLib.Variant("(ss)", ("org.freedesktop.resolve1.Link", "DefaultRoute")),
- None,
- Gio.DBusCallFlags.NONE,
- -1,
- None
- ).unpack()[0]:
- addr = ip_address('.'.join(str(p) for p in address))
- assert isinstance(addr, IPv4Address)
- yield addr
+ # Place DefaultRoute first, Global second
+ data.sort(key=get_is_not_global)
+ data.sort(key=get_is_not_defaultroute)
+
+ # Only keep IPv4 entries. systemd-resolved is trusted to return valid
+ # addresses.
+ # ToDo: We only need abridged IPv4 DNS entries for ifindex == 0.
+ # to ensure static DNS of disconnected network interfaces are not added.
+ for (interface, address_family, address) in data:
+ if address_family == socket.AF_INET.value:
+ addr = ip_address('.'.join(str(p) for p in address))
+ assert isinstance(addr, IPv4Address)
+ yield addr
@classmethod
def _get_servers_from_resolver(cls) -> Generator[IPv4Address, None, None]:
- with open(cls.RESOLVER_PATH) as f:
- for line in f:
+ try:
+ resolv = open(cls.RESOLVER_PATH)
+ except IOError:
+ return
+ with resolv:
+ for line in resolv:
match = re.search(r"^nameserver\s+((?:\d{1,3}\.){3}\d{1,3}$)", line)
if match:
yield IPv4Address(match.group(1))
-
-def get_dns_resolv_conf():
- nameservers = []
- try:
- resolv = open("/etc/resolv.conf", "r", encoding="UTF-8")
- except IOError:
- return nameservers
- with resolv:
- for line in resolv:
- tokens = line.split(None, 2)
- if len(tokens) < 2 or tokens[0] != "nameserver":
- continue
- try:
- nameservers.append(IPv4Address(tokens[1]))
- except ValueError:
- pass
- return nameservers
-
-def get_dns_resolved():
- try:
- bus = dbus.SystemBus()
- except dbus.exceptions.DBusException as s:
- if s.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply':
- return get_dns_resolv_conf()
- raise
- try:
- resolve1 = bus.get_object('org.freedesktop.resolve1',
- '/org/freedesktop/resolve1')
- dns = resolve1.Get('org.freedesktop.resolve1.Manager',
- 'DNS',
- dbus_interface='org.freedesktop.DBus.Properties')
- except dbus.exceptions.DBusException as s:
- error = s.get_dbus_name()
- if error in (
- 'org.freedesktop.DBus.Error.ServiceUnknown',
- 'org.freedesktop.DBus.Error.NameHasNoOwner',
- 'org.freedesktop.DBus.Error.NoSuchUnit',
- ) or error.startswith('org.freedesktop.systemd1.'):
- return get_dns_resolv_conf()
- raise
- # Use global entries first
- dns.sort(key=lambda x: x[0] != 0)
- # Only keep IPv4 entries. systemd-resolved is trusted to return valid
- # addresses.
- # ToDo: We only need abridged IPv4 DNS entries for ifindex == 0.
- # to ensure static DNS of disconnected network interfaces are not added.
- return [IPv4Address(bytes(addr)) for ifindex, family, addr in dns
- if family == 2]
-
def install_firewall_rules(dns):
qdb = qubesdb.QubesDB()
qubesdb_dns = []
@@ -221,5 +199,4 @@
os.execvp("nft", ("nft", "--", "\n".join(preamble + rules)))
if __name__ == '__main__':
- install_firewall_rules(DNSServersDefaultRoute.get_servers())
-# install_firewall_rules(get_dns_resolved())
+ install_firewall_rules(DNSServers.get_servers())
- Instead of having
_get_servers_from_systemd_resolved()
return only the DefaultRoute DNS servers, I made it return all DNS servers, including duplicates, with the DefaultRoute ones placed first and the Global ones second. This should solve the ordering problem when the VPN is off.
- I added the error handling and the comments from the original functions to the corresponding methods in the class.