From bfe338b965b2d1d982c3f7435603cc748bc20816 Mon Sep 17 00:00:00 2001 From: Michal Nowak Date: Thu, 11 Jul 2024 13:01:29 +0200 Subject: [PATCH] Replace dns.resolver module in system tests --- bin/tests/system/dnstap/tests_dnstap.py | 18 +-- bin/tests/system/rpzextra/tests_rpzextra.py | 158 +++++++++----------- bin/tests/system/shutdown/tests_shutdown.py | 41 +++-- 3 files changed, 94 insertions(+), 123 deletions(-) diff --git a/bin/tests/system/dnstap/tests_dnstap.py b/bin/tests/system/dnstap/tests_dnstap.py index 0dbf2aa3b8..555435d810 100644 --- a/bin/tests/system/dnstap/tests_dnstap.py +++ b/bin/tests/system/dnstap/tests_dnstap.py @@ -15,10 +15,12 @@ import os import re import subprocess +import isctest import pytest +import dns.message + pytest.importorskip("dns", minversion="2.0.0") -import dns.resolver def run_rndc(server, rndc_command): @@ -34,15 +36,13 @@ def run_rndc(server, rndc_command): subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10) -def test_dnstap_dispatch_socket_addresses(named_port): - # Prepare for querying ns3. - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_port - +def test_dnstap_dispatch_socket_addresses(): # Send some query to ns3 so that it records something in its dnstap file. - ans = resolver.resolve("mail.example.", "A") - assert ans[0].address == "10.0.0.2" + msg = dns.message.make_query("mail.example.", "A") + res = isctest.query.tcp(msg, "10.53.0.2") + assert res.answer == [ + dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2") + ] # Before continuing, roll dnstap file to ensure it is flushed to disk. run_rndc("10.53.0.3", ["dnstap", "-roll", "1"]) diff --git a/bin/tests/system/rpzextra/tests_rpzextra.py b/bin/tests/system/rpzextra/tests_rpzextra.py index ab5da45973..25cd1f44f8 100644 --- a/bin/tests/system/rpzextra/tests_rpzextra.py +++ b/bin/tests/system/rpzextra/tests_rpzextra.py @@ -11,121 +11,99 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import time import os - import pytest pytest.importorskip("dns", minversion="2.0.0") -import dns.resolver +import isctest + +import dns.message def wait_for_transfer(ip, port, client_ip, name, rrtype): - resolver = dns.resolver.Resolver() - resolver.nameservers = [ip] - resolver.port = port - + msg = dns.message.make_query(name, rrtype) for _ in range(10): try: - resolver.resolve(name, rrtype, source=client_ip) - except dns.resolver.NoNameservers: - time.sleep(1) - else: - break + res = isctest.query.udp(msg, ip, source=client_ip) + if res.rcode() == dns.rcode.NOERROR: + break + except dns.exception.Timeout: + pass else: raise RuntimeError( "zone transfer failed: " - f"client {client_ip} got NXDOMAIN for {name} {rrtype} from @{ip}:{port}" + f"client: {client_ip}, name: {name}, rrtype: {rrtype} from @{ip}:{port}" ) -def test_rpz_multiple_views(named_port): - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_port - +@pytest.mark.parametrize( + "qname,source,rcode", + [ + # For 10.53.0.1 source IP: + # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - gooddomain.com is allowed + # - allowed. is allowed + ("baddomain.", "10.53.0.1", dns.rcode.NXDOMAIN), + ("gooddomain.", "10.53.0.1", dns.rcode.NOERROR), + ("allowed.", "10.53.0.1", dns.rcode.NOERROR), + # For 10.53.0.2 source IP: + # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN + # - baddomain.com is allowed + # - gooddomain.com is allowed + ("baddomain.", "10.53.0.2", dns.rcode.NOERROR), + ("gooddomain.", "10.53.0.2", dns.rcode.NOERROR), + ("allowed.", "10.53.0.2", dns.rcode.NXDOMAIN), + # For 10.53.0.3 source IP: + # - gooddomain.com is allowed + # - baddomain.com is allowed + # - allowed. is allowed + ("baddomain.", "10.53.0.3", dns.rcode.NOERROR), + ("gooddomain.", "10.53.0.3", dns.rcode.NOERROR), + ("allowed.", "10.53.0.3", dns.rcode.NOERROR), + # For 10.53.0.4 source IP: + # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - allowed. is allowed + ("baddomain.", "10.53.0.4", dns.rcode.NXDOMAIN), + ("gooddomain.", "10.53.0.4", dns.rcode.NXDOMAIN), + ("allowed.", "10.53.0.4", dns.rcode.NOERROR), + # For 10.53.0.5 (any) source IP: + # - baddomain.com is allowed + # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN + ("baddomain.", "10.53.0.5", dns.rcode.NOERROR), + ("gooddomain.", "10.53.0.5", dns.rcode.NXDOMAIN), + ("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN), + ], +) +def test_rpz_multiple_views(qname, source, rcode, named_port): wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA") wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA") - # For 10.53.0.1 source IP: - # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - gooddomain.com is allowed - # - allowed. is allowed - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("baddomain.", "A", source="10.53.0.1") - - ans = resolver.resolve("gooddomain.", "A", source="10.53.0.1") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("allowed.", "A", source="10.53.0.1") - assert ans[0].address == "10.53.0.2" - - # For 10.53.0.2 source IP: - # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN - # - baddomain.com is allowed - # - gooddomain.com is allowed - ans = resolver.resolve("baddomain.", "A", source="10.53.0.2") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("gooddomain.", "A", source="10.53.0.2") - assert ans[0].address == "10.53.0.2" - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("allowed.", "A", source="10.53.0.2") - - # For 10.53.0.3 source IP: - # - gooddomain.com is allowed - # - baddomain.com is allowed - # - allowed. is allowed - ans = resolver.resolve("baddomain.", "A", source="10.53.0.3") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("gooddomain.", "A", source="10.53.0.3") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("allowed.", "A", source="10.53.0.3") - assert ans[0].address == "10.53.0.2" - - # For 10.53.0.4 source IP: - # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - allowed. is allowed - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("baddomain.", "A", source="10.53.0.4") - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("gooddomain.", "A", source="10.53.0.4") - - ans = resolver.resolve("allowed.", "A", source="10.53.0.4") - assert ans[0].address == "10.53.0.2" - - # For 10.53.0.5 (any) source IP: - # - baddomain.com is allowed - # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN - ans = resolver.resolve("baddomain.", "A", source="10.53.0.5") - assert ans[0].address == "10.53.0.2" - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("gooddomain.", "A", source="10.53.0.5") - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("allowed.", "A", source="10.53.0.5") + msg = dns.message.make_query(qname, "A") + res = isctest.query.udp(msg, "10.53.0.3", source=source) + assert res.rcode() == rcode + if rcode == dns.rcode.NOERROR: + assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")] -def test_rpz_passthru_logging(named_port): - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_port +def test_rpz_passthru_logging(): + resolver_ip = "10.53.0.3" # Should generate a log entry into rpz_passthru.txt - ans = resolver.resolve("allowed.", "A", source="10.53.0.1") - assert ans[0].address == "10.53.0.2" + msg_allowed = dns.message.make_query("allowed.", "A") + res_allowed = isctest.query.udp(msg_allowed, resolver_ip, source="10.53.0.1") + assert res_allowed.answer == [ + dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2") + ] # baddomain.com isn't allowed (CNAME .), should return NXDOMAIN # Should generate a log entry into rpz.txt - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("baddomain.", "A", source="10.53.0.1") + msg_not_allowed = dns.message.make_query("baddomain.", "A") + res_not_allowed = isctest.query.udp( + msg_not_allowed, resolver_ip, source="10.53.0.1" + ) + isctest.check.nxdomain(res_not_allowed) rpz_passthru_logfile = os.path.join("ns3", "rpz_passthru.txt") rpz_logfile = os.path.join("ns3", "rpz.txt") diff --git a/bin/tests/system/shutdown/tests_shutdown.py b/bin/tests/system/shutdown/tests_shutdown.py index a993cc9ffa..3b64389a62 100755 --- a/bin/tests/system/shutdown/tests_shutdown.py +++ b/bin/tests/system/shutdown/tests_shutdown.py @@ -23,12 +23,11 @@ import pytest pytest.importorskip("dns", minversion="2.0.0") import dns.exception -import dns.resolver import isctest -def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): +def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries): """Creates a number of A queries to run in parallel in order simulate a slightly more realistic test scenario. @@ -47,8 +46,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): :param named_proc: named process instance :type named_proc: subprocess.Popen - :param resolver: target resolver - :type resolver: dns.resolver.Resolver + :param resolver_ip: target resolver's IP address + :type resolver_ip: str :param instance: the named instance to send RNDC commands to :type instance: isctest.instance.NamedInstance @@ -74,7 +73,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): return -1 # We're going to execute queries in parallel by means of a thread pool. - # dnspython functions block, so we need to circunvent that. + # dnspython functions block, so we need to circumvent that. with ThreadPoolExecutor(n_workers + 1) as executor: # Helper dict, where keys=Future objects and values are tags used # to process results later. @@ -83,7 +82,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): # 50% of work will be A queries. # 1 work will be rndc stop. # Remaining work will be rndc status (so we test parallel control - # connections that were crashing named). + # connections that were crashing named). shutdown = True for i in range(n_queries): if i < (n_queries // 2): @@ -101,7 +100,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): ) qname = relname + ".test" - futures[executor.submit(resolver.resolve, qname, "A")] = tag + msg = dns.message.make_query(qname, "A") + futures[executor.submit(isctest.query.udp, msg, resolver_ip)] = tag elif shutdown: # We attempt to stop named in the middle shutdown = False if kill_method == "rndc": @@ -125,24 +125,21 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): # named process exited gracefully after SIGTERM signal. if futures[future] == "stop": ret_code = result - - except ( - dns.resolver.NXDOMAIN, - dns.resolver.NoNameservers, - dns.exception.Timeout, - ): + except dns.exception.Timeout: pass if kill_method == "rndc": assert ret_code == 0 -def wait_for_named_loaded(resolver, retries=10): +def wait_for_named_loaded(resolver_ip, retries=10): + msg = dns.message.make_query("version.bind", "TXT", "CH") for _ in range(retries): try: - resolver.resolve("version.bind", "TXT", "CH") - return True - except (dns.resolver.NoNameservers, dns.exception.Timeout): + res = isctest.query.udp(msg, resolver_ip) + if res.rcode() == dns.rcode.NOERROR: + return True + except dns.exception.Timeout: time.sleep(1) return False @@ -189,11 +186,7 @@ def test_named_shutdown(kill_method): named_ports = isctest.instance.NamedPorts.from_env() instance = isctest.instance.NamedInstance("ns3", named_ports) - # We create a resolver instance that will be used to send queries. - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_ports.dns - + resolver_ip = "10.53.0.3" named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"] with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log: with subprocess.Popen( @@ -201,10 +194,10 @@ def test_named_shutdown(kill_method): ) as named_proc: try: assert named_proc.poll() is None, "named isn't running" - assert wait_for_named_loaded(resolver) + assert wait_for_named_loaded(resolver_ip) do_work( named_proc, - resolver, + resolver_ip, instance, kill_method, n_workers=12,