Replace dns.resolver module in system tests

This commit is contained in:
Michal Nowak
2024-07-11 13:01:29 +02:00
parent 3808567de1
commit bfe338b965
3 changed files with 94 additions and 123 deletions

View File

@@ -15,10 +15,12 @@ import os
import re
import subprocess
import isctest
import pytest
import dns.message
pytest.importorskip("dns", minversion="2.0.0")
import dns.resolver
def run_rndc(server, rndc_command):
@@ -34,15 +36,13 @@ def run_rndc(server, rndc_command):
subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10)
def test_dnstap_dispatch_socket_addresses(named_port):
# Prepare for querying ns3.
resolver = dns.resolver.Resolver()
resolver.nameservers = ["10.53.0.3"]
resolver.port = named_port
def test_dnstap_dispatch_socket_addresses():
# Send some query to ns3 so that it records something in its dnstap file.
ans = resolver.resolve("mail.example.", "A")
assert ans[0].address == "10.0.0.2"
msg = dns.message.make_query("mail.example.", "A")
res = isctest.query.tcp(msg, "10.53.0.2")
assert res.answer == [
dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
]
# Before continuing, roll dnstap file to ensure it is flushed to disk.
run_rndc("10.53.0.3", ["dnstap", "-roll", "1"])

View File

@@ -11,121 +11,99 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import time
import os
import pytest
pytest.importorskip("dns", minversion="2.0.0")
import dns.resolver
import isctest
import dns.message
def wait_for_transfer(ip, port, client_ip, name, rrtype):
resolver = dns.resolver.Resolver()
resolver.nameservers = [ip]
resolver.port = port
msg = dns.message.make_query(name, rrtype)
for _ in range(10):
try:
resolver.resolve(name, rrtype, source=client_ip)
except dns.resolver.NoNameservers:
time.sleep(1)
else:
break
res = isctest.query.udp(msg, ip, source=client_ip)
if res.rcode() == dns.rcode.NOERROR:
break
except dns.exception.Timeout:
pass
else:
raise RuntimeError(
"zone transfer failed: "
f"client {client_ip} got NXDOMAIN for {name} {rrtype} from @{ip}:{port}"
f"client: {client_ip}, name: {name}, rrtype: {rrtype} from @{ip}:{port}"
)
def test_rpz_multiple_views(named_port):
resolver = dns.resolver.Resolver()
resolver.nameservers = ["10.53.0.3"]
resolver.port = named_port
@pytest.mark.parametrize(
"qname,source,rcode",
[
# For 10.53.0.1 source IP:
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - gooddomain.com is allowed
# - allowed. is allowed
("baddomain.", "10.53.0.1", dns.rcode.NXDOMAIN),
("gooddomain.", "10.53.0.1", dns.rcode.NOERROR),
("allowed.", "10.53.0.1", dns.rcode.NOERROR),
# For 10.53.0.2 source IP:
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
# - baddomain.com is allowed
# - gooddomain.com is allowed
("baddomain.", "10.53.0.2", dns.rcode.NOERROR),
("gooddomain.", "10.53.0.2", dns.rcode.NOERROR),
("allowed.", "10.53.0.2", dns.rcode.NXDOMAIN),
# For 10.53.0.3 source IP:
# - gooddomain.com is allowed
# - baddomain.com is allowed
# - allowed. is allowed
("baddomain.", "10.53.0.3", dns.rcode.NOERROR),
("gooddomain.", "10.53.0.3", dns.rcode.NOERROR),
("allowed.", "10.53.0.3", dns.rcode.NOERROR),
# For 10.53.0.4 source IP:
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - allowed. is allowed
("baddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
("gooddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
("allowed.", "10.53.0.4", dns.rcode.NOERROR),
# For 10.53.0.5 (any) source IP:
# - baddomain.com is allowed
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
("baddomain.", "10.53.0.5", dns.rcode.NOERROR),
("gooddomain.", "10.53.0.5", dns.rcode.NXDOMAIN),
("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN),
],
)
def test_rpz_multiple_views(qname, source, rcode, named_port):
wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA")
wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA")
# For 10.53.0.1 source IP:
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - gooddomain.com is allowed
# - allowed. is allowed
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("baddomain.", "A", source="10.53.0.1")
ans = resolver.resolve("gooddomain.", "A", source="10.53.0.1")
assert ans[0].address == "10.53.0.2"
ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
assert ans[0].address == "10.53.0.2"
# For 10.53.0.2 source IP:
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
# - baddomain.com is allowed
# - gooddomain.com is allowed
ans = resolver.resolve("baddomain.", "A", source="10.53.0.2")
assert ans[0].address == "10.53.0.2"
ans = resolver.resolve("gooddomain.", "A", source="10.53.0.2")
assert ans[0].address == "10.53.0.2"
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("allowed.", "A", source="10.53.0.2")
# For 10.53.0.3 source IP:
# - gooddomain.com is allowed
# - baddomain.com is allowed
# - allowed. is allowed
ans = resolver.resolve("baddomain.", "A", source="10.53.0.3")
assert ans[0].address == "10.53.0.2"
ans = resolver.resolve("gooddomain.", "A", source="10.53.0.3")
assert ans[0].address == "10.53.0.2"
ans = resolver.resolve("allowed.", "A", source="10.53.0.3")
assert ans[0].address == "10.53.0.2"
# For 10.53.0.4 source IP:
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - allowed. is allowed
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("baddomain.", "A", source="10.53.0.4")
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("gooddomain.", "A", source="10.53.0.4")
ans = resolver.resolve("allowed.", "A", source="10.53.0.4")
assert ans[0].address == "10.53.0.2"
# For 10.53.0.5 (any) source IP:
# - baddomain.com is allowed
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
ans = resolver.resolve("baddomain.", "A", source="10.53.0.5")
assert ans[0].address == "10.53.0.2"
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("gooddomain.", "A", source="10.53.0.5")
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("allowed.", "A", source="10.53.0.5")
msg = dns.message.make_query(qname, "A")
res = isctest.query.udp(msg, "10.53.0.3", source=source)
assert res.rcode() == rcode
if rcode == dns.rcode.NOERROR:
assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
def test_rpz_passthru_logging(named_port):
resolver = dns.resolver.Resolver()
resolver.nameservers = ["10.53.0.3"]
resolver.port = named_port
def test_rpz_passthru_logging():
resolver_ip = "10.53.0.3"
# Should generate a log entry into rpz_passthru.txt
ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
assert ans[0].address == "10.53.0.2"
msg_allowed = dns.message.make_query("allowed.", "A")
res_allowed = isctest.query.udp(msg_allowed, resolver_ip, source="10.53.0.1")
assert res_allowed.answer == [
dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2")
]
# baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# Should generate a log entry into rpz.txt
with pytest.raises(dns.resolver.NXDOMAIN):
resolver.resolve("baddomain.", "A", source="10.53.0.1")
msg_not_allowed = dns.message.make_query("baddomain.", "A")
res_not_allowed = isctest.query.udp(
msg_not_allowed, resolver_ip, source="10.53.0.1"
)
isctest.check.nxdomain(res_not_allowed)
rpz_passthru_logfile = os.path.join("ns3", "rpz_passthru.txt")
rpz_logfile = os.path.join("ns3", "rpz.txt")

View File

@@ -23,12 +23,11 @@ import pytest
pytest.importorskip("dns", minversion="2.0.0")
import dns.exception
import dns.resolver
import isctest
def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries):
"""Creates a number of A queries to run in parallel
in order simulate a slightly more realistic test scenario.
@@ -47,8 +46,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
:param named_proc: named process instance
:type named_proc: subprocess.Popen
:param resolver: target resolver
:type resolver: dns.resolver.Resolver
:param resolver_ip: target resolver's IP address
:type resolver_ip: str
:param instance: the named instance to send RNDC commands to
:type instance: isctest.instance.NamedInstance
@@ -74,7 +73,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
return -1
# We're going to execute queries in parallel by means of a thread pool.
# dnspython functions block, so we need to circunvent that.
# dnspython functions block, so we need to circumvent that.
with ThreadPoolExecutor(n_workers + 1) as executor:
# Helper dict, where keys=Future objects and values are tags used
# to process results later.
@@ -83,7 +82,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
# 50% of work will be A queries.
# 1 work will be rndc stop.
# Remaining work will be rndc status (so we test parallel control
# connections that were crashing named).
# connections that were crashing named).
shutdown = True
for i in range(n_queries):
if i < (n_queries // 2):
@@ -101,7 +100,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
)
qname = relname + ".test"
futures[executor.submit(resolver.resolve, qname, "A")] = tag
msg = dns.message.make_query(qname, "A")
futures[executor.submit(isctest.query.udp, msg, resolver_ip)] = tag
elif shutdown: # We attempt to stop named in the middle
shutdown = False
if kill_method == "rndc":
@@ -125,24 +125,21 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
# named process exited gracefully after SIGTERM signal.
if futures[future] == "stop":
ret_code = result
except (
dns.resolver.NXDOMAIN,
dns.resolver.NoNameservers,
dns.exception.Timeout,
):
except dns.exception.Timeout:
pass
if kill_method == "rndc":
assert ret_code == 0
def wait_for_named_loaded(resolver, retries=10):
def wait_for_named_loaded(resolver_ip, retries=10):
msg = dns.message.make_query("version.bind", "TXT", "CH")
for _ in range(retries):
try:
resolver.resolve("version.bind", "TXT", "CH")
return True
except (dns.resolver.NoNameservers, dns.exception.Timeout):
res = isctest.query.udp(msg, resolver_ip)
if res.rcode() == dns.rcode.NOERROR:
return True
except dns.exception.Timeout:
time.sleep(1)
return False
@@ -189,11 +186,7 @@ def test_named_shutdown(kill_method):
named_ports = isctest.instance.NamedPorts.from_env()
instance = isctest.instance.NamedInstance("ns3", named_ports)
# We create a resolver instance that will be used to send queries.
resolver = dns.resolver.Resolver()
resolver.nameservers = ["10.53.0.3"]
resolver.port = named_ports.dns
resolver_ip = "10.53.0.3"
named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"]
with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log:
with subprocess.Popen(
@@ -201,10 +194,10 @@ def test_named_shutdown(kill_method):
) as named_proc:
try:
assert named_proc.poll() is None, "named isn't running"
assert wait_for_named_loaded(resolver)
assert wait_for_named_loaded(resolver_ip)
do_work(
named_proc,
resolver,
resolver_ip,
instance,
kill_method,
n_workers=12,