diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index 26302fb6b4..01c34de132 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -11,6 +11,8 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +from typing import NamedTuple, Tuple + import os import subprocess import sys @@ -40,8 +42,8 @@ def has_signed_apex_nsec(zone, response): ttl = 300 nextname = "a." types = "NS SOA RRSIG NSEC DNSKEY" - match = "{0} {1} IN NSEC {2}{0} {3}".format(zone, ttl, nextname, types) - sig = "{0} {1} IN RRSIG NSEC 13 2 300".format(zone, ttl) + match = f"{zone}. {ttl} IN NSEC {nextname}{zone}. {types}" + sig = f"{zone}. {ttl} IN RRSIG NSEC 13 2 300" for rr in response.answer: if match in rr.to_text(): @@ -75,7 +77,7 @@ def verify_zone(zone, transfer): verify = os.getenv("VERIFY") assert verify is not None - filename = "{}out".format(zone) + filename = f"{zone}.out" with open(filename, "w", encoding="utf-8") as file: for rr in transfer.answer: file.write(rr.to_text()) @@ -87,7 +89,7 @@ def verify_zone(zone, transfer): verifier = subprocess.run(verify_cmd, capture_output=True, check=True) if verifier.returncode != 0: - print("error: dnssec-verify {} failed".format(zone)) + print(f"error: dnssec-verify {zone}. failed") sys.stderr.buffer.write(verifier.stderr) return verifier.returncode == 0 @@ -101,7 +103,7 @@ def read_statefile(server, zone): response = do_query(server, zone, "DS", tcp=True) if not isinstance(response, dns.message.Message): - print("error: no response for {} DS from {}".format(zone, addr)) + print(f"error: no response for {zone}. DS from {addr}") return {} if response.rcode() == dns.rcode.NOERROR: @@ -119,20 +121,19 @@ def read_statefile(server, zone): if count != 1: print( - "error: expected a single DS in response for {} from {}," - "got {}".format(zone, addr, count) + f"error: expected a single DS in response for {zone}. " + "from {addr}, got {count}" ) return {} else: print( - "error: {} response for {} DNSKEY from {}".format( - dns.rcode.to_text(response.rcode()), zone, addr - ) + f"error: {dns.rcode.to_text(response.rcode())} response for {zone}. " + "DNSKEY from {addr}" ) return {} - filename = "ns9/K{}+013+{:05d}.state".format(zone, keyid) - print("read state file {}".format(filename)) + filename = f"ns9/K{zone}.+013+{keyid:05d}.state" + print(f"read state file {filename}") try: with open(filename, "r", encoding="utf-8") as file: @@ -157,14 +158,13 @@ def zone_check(server, zone): for _ in range(10): response = do_query(server, zone, "NSEC") if not isinstance(response, dns.message.Message): - print("error: no response for {} NSEC from {}".format(zone, addr)) + print(f"error: no response for {zone}. NSEC from {addr}") elif response.rcode() == dns.rcode.NOERROR: signed = has_signed_apex_nsec(zone, response) else: print( - "error: {} response for {} NSEC from {}".format( - dns.rcode.to_text(response.rcode()), zone, addr - ) + f"error: {dns.rcode.to_text(response.rcode())} response for {zone}. " + "NSEC from {addr}" ) if signed: @@ -178,14 +178,13 @@ def zone_check(server, zone): verified = False transfer = do_query(server, zone, "AXFR", tcp=True) if not isinstance(transfer, dns.message.Message): - print("error: no response for {} AXFR from {}".format(zone, addr)) + print(f"error: no response for {zone}. AXFR from {addr}") elif transfer.rcode() == dns.rcode.NOERROR: verified = verify_zone(zone, transfer) else: print( - "error: {} response for {} AXFR from {}".format( - dns.rcode.to_text(transfer.rcode()), zone, addr - ) + f"error: {dns.rcode.to_text(transfer.rcode())} response for {zone}. " + "AXFR from {addr}" ) assert verified @@ -220,223 +219,150 @@ def keystate_check(server, zone, key): assert val != 0 -def test_checkds_dspublished(named_port, servers): +class CheckDSTest(NamedTuple): + zone: str + logs_to_wait_for: Tuple[str] + expected_parent_state: str + + +dspublished_tests = [ # DS correctly published in parent. - zone_check(servers["ns9"], "dspublished.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone dspublished.checkds/IN (signed): checkds: DS response from 10.53.0.2" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "dspublished.checkds.", "DSPublish") - + CheckDSTest( + zone="dspublished.checkds", + logs_to_wait_for=("DS response from 10.53.0.2",), + expected_parent_state="DSPublish", + ), # DS correctly published in parent (reference to parental-agent). - zone_check(servers["ns9"], "reference.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = "zone reference.checkds/IN (signed): checkds: DS response from 10.53.0.2" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "reference.checkds.", "DSPublish") - + CheckDSTest( + zone="reference.checkds", + logs_to_wait_for=("DS response from 10.53.0.2",), + expected_parent_state="DSPublish", + ), # DS not published in parent. - zone_check(servers["ns9"], "missing-dspublished.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone missing-dspublished.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.5" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "missing-dspublished.checkds.", "!DSPublish") - + CheckDSTest( + zone="missing-dspublished.checkds", + logs_to_wait_for=("empty DS response from 10.53.0.5",), + expected_parent_state="!DSPublish", + ), # Badly configured parent. - zone_check(servers["ns9"], "bad-dspublished.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad-dspublished.checkds/IN (signed): checkds: " - "bad DS response from 10.53.0.6" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "bad-dspublished.checkds.", "!DSPublish") - + CheckDSTest( + zone="bad-dspublished.checkds", + logs_to_wait_for=("bad DS response from 10.53.0.6",), + expected_parent_state="!DSPublish", + ), # TBD: DS published in parent, but bogus signature. - # DS correctly published in all parents. - zone_check(servers["ns9"], "multiple-dspublished.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone multiple-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.2" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone multiple-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.4" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "multiple-dspublished.checkds.", "DSPublish") - + CheckDSTest( + zone="multiple-dspublished.checkds", + logs_to_wait_for=("DS response from 10.53.0.2", "DS response from 10.53.0.4"), + expected_parent_state="DSPublish", + ), # DS published in only one of multiple parents. - zone_check(servers["ns9"], "incomplete-dspublished.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone incomplete-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.2" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone incomplete-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.4" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone incomplete-dspublished.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.5" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "incomplete-dspublished.checkds.", "!DSPublish") - + CheckDSTest( + zone="incomplete-dspublished.checkds", + logs_to_wait_for=( + "DS response from 10.53.0.2", + "DS response from 10.53.0.4", + "empty DS response from 10.53.0.5", + ), + expected_parent_state="!DSPublish", + ), # One of the parents is badly configured. - zone_check(servers["ns9"], "bad2-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad2-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.2" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad2-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.4" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad2-dspublished.checkds/IN (signed): checkds: " - "bad DS response from 10.53.0.6" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "bad2-dspublished.checkds.", "!DSPublish") - + CheckDSTest( + zone="bad2-dspublished.checkds", + logs_to_wait_for=( + "DS response from 10.53.0.2", + "DS response from 10.53.0.4", + "bad DS response from 10.53.0.6", + ), + expected_parent_state="!DSPublish", + ), # Check with resolver parental-agent. - zone_check(servers["ns9"], "resolver-dspublished.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone resolver-dspublished.checkds/IN (signed): checkds: " - "DS response from 10.53.0.3" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "resolver-dspublished.checkds.", "DSPublish") - + CheckDSTest( + zone="resolver-dspublished.checkds", + logs_to_wait_for=("DS response from 10.53.0.3",), + expected_parent_state="DSPublish", + ), # TBD: DS published in all parents, but one has bogus signature. - # TBD: Check with TSIG - # TBD: Check with TLS +] -def test_checkds_dswithdrawn(named_port, servers): +dswithdrawn_tests = [ # DS correctly published in single parent. - zone_check(servers["ns9"], "dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.5" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "dswithdrawn.checkds.", "DSRemoved") - + CheckDSTest( + zone="dswithdrawn.checkds", + logs_to_wait_for=("empty DS response from 10.53.0.5",), + expected_parent_state="DSRemoved", + ), # DS not withdrawn from parent. - zone_check(servers["ns9"], "missing-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone missing-dswithdrawn.checkds/IN (signed): checkds: " - "DS response from 10.53.0.2" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "missing-dswithdrawn.checkds.", "!DSRemoved") - + CheckDSTest( + zone="missing-dswithdrawn.checkds", + logs_to_wait_for=("DS response from 10.53.0.2",), + expected_parent_state="!DSRemoved", + ), # Badly configured parent. - zone_check(servers["ns9"], "bad-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad-dswithdrawn.checkds/IN (signed): checkds: " - "bad DS response from 10.53.0.6" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "bad-dswithdrawn.checkds.", "!DSRemoved") - + CheckDSTest( + zone="bad-dswithdrawn.checkds", + logs_to_wait_for=("bad DS response from 10.53.0.6",), + expected_parent_state="!DSRemoved", + ), # TBD: DS published in parent, but bogus signature. - # DS correctly withdrawn from all parents. - zone_check(servers["ns9"], "multiple-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone multiple-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.5" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone multiple-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.7" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "multiple-dswithdrawn.checkds.", "DSRemoved") - + CheckDSTest( + zone="multiple-dswithdrawn.checkds", + logs_to_wait_for=( + "empty DS response from 10.53.0.5", + "empty DS response from 10.53.0.7", + ), + expected_parent_state="DSRemoved", + ), # DS withdrawn from only one of multiple parents. - zone_check(servers["ns9"], "incomplete-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone incomplete-dswithdrawn.checkds/IN (signed): checkds: " - "DS response from 10.53.0.2" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone incomplete-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.5" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone incomplete-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.7" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "incomplete-dswithdrawn.checkds.", "!DSRemoved") - + CheckDSTest( + zone="incomplete-dswithdrawn.checkds", + logs_to_wait_for=( + "DS response from 10.53.0.2", + "empty DS response from 10.53.0.5", + "empty DS response from 10.53.0.7", + ), + expected_parent_state="!DSRemoved", + ), # One of the parents is badly configured. - zone_check(servers["ns9"], "bad2-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad2-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.5" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad2-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.7" - ) - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone bad2-dswithdrawn.checkds/IN (signed): checkds: " - "bad DS response from 10.53.0.6" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "bad2-dswithdrawn.checkds.", "!DSRemoved") - + CheckDSTest( + zone="bad2-dswithdrawn.checkds", + logs_to_wait_for=( + "empty DS response from 10.53.0.5", + "empty DS response from 10.53.0.7", + "bad DS response from 10.53.0.6", + ), + expected_parent_state="!DSRemoved", + ), # Check with resolver parental-agent. - zone_check(servers["ns9"], "resolver-dswithdrawn.checkds.") - with servers["ns9"].watch_log_from_start() as watcher: - line = ( - "zone resolver-dswithdrawn.checkds/IN (signed): checkds: " - "empty DS response from 10.53.0.8" - ) - watcher.wait_for_line(line) - keystate_check(servers["ns2"], "resolver-dswithdrawn.checkds.", "DSRemoved") - + CheckDSTest( + zone="resolver-dswithdrawn.checkds", + logs_to_wait_for=("empty DS response from 10.53.0.8",), + expected_parent_state="DSRemoved", + ) # TBD: DS withdrawn from all parents, but one has bogus signature. +] + + +checkds_tests = dspublished_tests + dswithdrawn_tests + + +@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone) +def test_checkds(servers, params): + # Wait until the provided zone is signed and then verify its DNSSEC data. + zone_check(servers["ns9"], params.zone) + + # Wait until all the expected log lines are found in the log file for the + # provided server. + for log_string in params.logs_to_wait_for: + with servers["ns9"].watch_log_from_start() as watcher: + line = f"zone {params.zone}/IN (signed): checkds: {log_string}" + watcher.wait_for_line(line) + + # Check whether key states on the parent server provided match + # expectations. + keystate_check(servers["ns2"], params.zone, params.expected_parent_state)