Clean up the "checkds" system test
The "checkds" system test contains a lot of duplicated code despite
carrying out the same set of actions for every tested scenario
(zone_check() → wait for logs to appear → keystate_check()). Extract
the parts of the code shared between all tests into a new function,
test_checkds(), and use pytest's test parametrization capabilities to
pass distinct sets of test parameters to this new function, in an
attempt to cleanly separate the fixed parts of this system test from the
variable ones. Replace format() calls with f-strings.
(cherry picked from commit aa31a872d0)
This commit is contained in:
committed by
Štěpán Balážik
parent
ab058db54c
commit
c56b8136a0
@@ -11,6 +11,8 @@
|
||||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
from typing import NamedTuple, Tuple
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -40,8 +42,8 @@ def has_signed_apex_nsec(zone, response):
|
||||
ttl = 300
|
||||
nextname = "a."
|
||||
types = "NS SOA RRSIG NSEC DNSKEY"
|
||||
match = "{0} {1} IN NSEC {2}{0} {3}".format(zone, ttl, nextname, types)
|
||||
sig = "{0} {1} IN RRSIG NSEC 13 2 300".format(zone, ttl)
|
||||
match = f"{zone}. {ttl} IN NSEC {nextname}{zone}. {types}"
|
||||
sig = f"{zone}. {ttl} IN RRSIG NSEC 13 2 300"
|
||||
|
||||
for rr in response.answer:
|
||||
if match in rr.to_text():
|
||||
@@ -75,7 +77,7 @@ def verify_zone(zone, transfer):
|
||||
verify = os.getenv("VERIFY")
|
||||
assert verify is not None
|
||||
|
||||
filename = "{}out".format(zone)
|
||||
filename = f"{zone}.out"
|
||||
with open(filename, "w", encoding="utf-8") as file:
|
||||
for rr in transfer.answer:
|
||||
file.write(rr.to_text())
|
||||
@@ -87,7 +89,7 @@ def verify_zone(zone, transfer):
|
||||
verifier = subprocess.run(verify_cmd, capture_output=True, check=True)
|
||||
|
||||
if verifier.returncode != 0:
|
||||
print("error: dnssec-verify {} failed".format(zone))
|
||||
print(f"error: dnssec-verify {zone}. failed")
|
||||
sys.stderr.buffer.write(verifier.stderr)
|
||||
|
||||
return verifier.returncode == 0
|
||||
@@ -101,7 +103,7 @@ def read_statefile(server, zone):
|
||||
|
||||
response = do_query(server, zone, "DS", tcp=True)
|
||||
if not isinstance(response, dns.message.Message):
|
||||
print("error: no response for {} DS from {}".format(zone, addr))
|
||||
print(f"error: no response for {zone}. DS from {addr}")
|
||||
return {}
|
||||
|
||||
if response.rcode() == dns.rcode.NOERROR:
|
||||
@@ -119,20 +121,19 @@ def read_statefile(server, zone):
|
||||
|
||||
if count != 1:
|
||||
print(
|
||||
"error: expected a single DS in response for {} from {},"
|
||||
"got {}".format(zone, addr, count)
|
||||
f"error: expected a single DS in response for {zone}. "
|
||||
"from {addr}, got {count}"
|
||||
)
|
||||
return {}
|
||||
else:
|
||||
print(
|
||||
"error: {} response for {} DNSKEY from {}".format(
|
||||
dns.rcode.to_text(response.rcode()), zone, addr
|
||||
)
|
||||
f"error: {dns.rcode.to_text(response.rcode())} response for {zone}. "
|
||||
"DNSKEY from {addr}"
|
||||
)
|
||||
return {}
|
||||
|
||||
filename = "ns9/K{}+013+{:05d}.state".format(zone, keyid)
|
||||
print("read state file {}".format(filename))
|
||||
filename = f"ns9/K{zone}.+013+{keyid:05d}.state"
|
||||
print(f"read state file {filename}")
|
||||
|
||||
try:
|
||||
with open(filename, "r", encoding="utf-8") as file:
|
||||
@@ -157,14 +158,13 @@ def zone_check(server, zone):
|
||||
for _ in range(10):
|
||||
response = do_query(server, zone, "NSEC")
|
||||
if not isinstance(response, dns.message.Message):
|
||||
print("error: no response for {} NSEC from {}".format(zone, addr))
|
||||
print(f"error: no response for {zone}. NSEC from {addr}")
|
||||
elif response.rcode() == dns.rcode.NOERROR:
|
||||
signed = has_signed_apex_nsec(zone, response)
|
||||
else:
|
||||
print(
|
||||
"error: {} response for {} NSEC from {}".format(
|
||||
dns.rcode.to_text(response.rcode()), zone, addr
|
||||
)
|
||||
f"error: {dns.rcode.to_text(response.rcode())} response for {zone}. "
|
||||
"NSEC from {addr}"
|
||||
)
|
||||
|
||||
if signed:
|
||||
@@ -178,14 +178,13 @@ def zone_check(server, zone):
|
||||
verified = False
|
||||
transfer = do_query(server, zone, "AXFR", tcp=True)
|
||||
if not isinstance(transfer, dns.message.Message):
|
||||
print("error: no response for {} AXFR from {}".format(zone, addr))
|
||||
print(f"error: no response for {zone}. AXFR from {addr}")
|
||||
elif transfer.rcode() == dns.rcode.NOERROR:
|
||||
verified = verify_zone(zone, transfer)
|
||||
else:
|
||||
print(
|
||||
"error: {} response for {} AXFR from {}".format(
|
||||
dns.rcode.to_text(transfer.rcode()), zone, addr
|
||||
)
|
||||
f"error: {dns.rcode.to_text(transfer.rcode())} response for {zone}. "
|
||||
"AXFR from {addr}"
|
||||
)
|
||||
|
||||
assert verified
|
||||
@@ -220,223 +219,150 @@ def keystate_check(server, zone, key):
|
||||
assert val != 0
|
||||
|
||||
|
||||
def test_checkds_dspublished(named_port, servers):
|
||||
class CheckDSTest(NamedTuple):
|
||||
zone: str
|
||||
logs_to_wait_for: Tuple[str]
|
||||
expected_parent_state: str
|
||||
|
||||
|
||||
dspublished_tests = [
|
||||
# DS correctly published in parent.
|
||||
zone_check(servers["ns9"], "dspublished.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone dspublished.checkds/IN (signed): checkds: DS response from 10.53.0.2"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "dspublished.checkds.", "DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="dspublished.checkds",
|
||||
logs_to_wait_for=("DS response from 10.53.0.2",),
|
||||
expected_parent_state="DSPublish",
|
||||
),
|
||||
# DS correctly published in parent (reference to parental-agent).
|
||||
zone_check(servers["ns9"], "reference.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = "zone reference.checkds/IN (signed): checkds: DS response from 10.53.0.2"
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "reference.checkds.", "DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="reference.checkds",
|
||||
logs_to_wait_for=("DS response from 10.53.0.2",),
|
||||
expected_parent_state="DSPublish",
|
||||
),
|
||||
# DS not published in parent.
|
||||
zone_check(servers["ns9"], "missing-dspublished.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone missing-dspublished.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.5"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "missing-dspublished.checkds.", "!DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="missing-dspublished.checkds",
|
||||
logs_to_wait_for=("empty DS response from 10.53.0.5",),
|
||||
expected_parent_state="!DSPublish",
|
||||
),
|
||||
# Badly configured parent.
|
||||
zone_check(servers["ns9"], "bad-dspublished.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad-dspublished.checkds/IN (signed): checkds: "
|
||||
"bad DS response from 10.53.0.6"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "bad-dspublished.checkds.", "!DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="bad-dspublished.checkds",
|
||||
logs_to_wait_for=("bad DS response from 10.53.0.6",),
|
||||
expected_parent_state="!DSPublish",
|
||||
),
|
||||
# TBD: DS published in parent, but bogus signature.
|
||||
|
||||
# DS correctly published in all parents.
|
||||
zone_check(servers["ns9"], "multiple-dspublished.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone multiple-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.2"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone multiple-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.4"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "multiple-dspublished.checkds.", "DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="multiple-dspublished.checkds",
|
||||
logs_to_wait_for=("DS response from 10.53.0.2", "DS response from 10.53.0.4"),
|
||||
expected_parent_state="DSPublish",
|
||||
),
|
||||
# DS published in only one of multiple parents.
|
||||
zone_check(servers["ns9"], "incomplete-dspublished.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone incomplete-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.2"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone incomplete-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.4"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone incomplete-dspublished.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.5"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "incomplete-dspublished.checkds.", "!DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="incomplete-dspublished.checkds",
|
||||
logs_to_wait_for=(
|
||||
"DS response from 10.53.0.2",
|
||||
"DS response from 10.53.0.4",
|
||||
"empty DS response from 10.53.0.5",
|
||||
),
|
||||
expected_parent_state="!DSPublish",
|
||||
),
|
||||
# One of the parents is badly configured.
|
||||
zone_check(servers["ns9"], "bad2-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad2-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.2"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad2-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.4"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad2-dspublished.checkds/IN (signed): checkds: "
|
||||
"bad DS response from 10.53.0.6"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "bad2-dspublished.checkds.", "!DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="bad2-dspublished.checkds",
|
||||
logs_to_wait_for=(
|
||||
"DS response from 10.53.0.2",
|
||||
"DS response from 10.53.0.4",
|
||||
"bad DS response from 10.53.0.6",
|
||||
),
|
||||
expected_parent_state="!DSPublish",
|
||||
),
|
||||
# Check with resolver parental-agent.
|
||||
zone_check(servers["ns9"], "resolver-dspublished.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone resolver-dspublished.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.3"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "resolver-dspublished.checkds.", "DSPublish")
|
||||
|
||||
CheckDSTest(
|
||||
zone="resolver-dspublished.checkds",
|
||||
logs_to_wait_for=("DS response from 10.53.0.3",),
|
||||
expected_parent_state="DSPublish",
|
||||
),
|
||||
# TBD: DS published in all parents, but one has bogus signature.
|
||||
|
||||
# TBD: Check with TSIG
|
||||
|
||||
# TBD: Check with TLS
|
||||
]
|
||||
|
||||
|
||||
def test_checkds_dswithdrawn(named_port, servers):
|
||||
dswithdrawn_tests = [
|
||||
# DS correctly published in single parent.
|
||||
zone_check(servers["ns9"], "dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.5"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "dswithdrawn.checkds.", "DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="dswithdrawn.checkds",
|
||||
logs_to_wait_for=("empty DS response from 10.53.0.5",),
|
||||
expected_parent_state="DSRemoved",
|
||||
),
|
||||
# DS not withdrawn from parent.
|
||||
zone_check(servers["ns9"], "missing-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone missing-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.2"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "missing-dswithdrawn.checkds.", "!DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="missing-dswithdrawn.checkds",
|
||||
logs_to_wait_for=("DS response from 10.53.0.2",),
|
||||
expected_parent_state="!DSRemoved",
|
||||
),
|
||||
# Badly configured parent.
|
||||
zone_check(servers["ns9"], "bad-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"bad DS response from 10.53.0.6"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "bad-dswithdrawn.checkds.", "!DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="bad-dswithdrawn.checkds",
|
||||
logs_to_wait_for=("bad DS response from 10.53.0.6",),
|
||||
expected_parent_state="!DSRemoved",
|
||||
),
|
||||
# TBD: DS published in parent, but bogus signature.
|
||||
|
||||
# DS correctly withdrawn from all parents.
|
||||
zone_check(servers["ns9"], "multiple-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone multiple-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.5"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone multiple-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.7"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "multiple-dswithdrawn.checkds.", "DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="multiple-dswithdrawn.checkds",
|
||||
logs_to_wait_for=(
|
||||
"empty DS response from 10.53.0.5",
|
||||
"empty DS response from 10.53.0.7",
|
||||
),
|
||||
expected_parent_state="DSRemoved",
|
||||
),
|
||||
# DS withdrawn from only one of multiple parents.
|
||||
zone_check(servers["ns9"], "incomplete-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone incomplete-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"DS response from 10.53.0.2"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone incomplete-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.5"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone incomplete-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.7"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "incomplete-dswithdrawn.checkds.", "!DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="incomplete-dswithdrawn.checkds",
|
||||
logs_to_wait_for=(
|
||||
"DS response from 10.53.0.2",
|
||||
"empty DS response from 10.53.0.5",
|
||||
"empty DS response from 10.53.0.7",
|
||||
),
|
||||
expected_parent_state="!DSRemoved",
|
||||
),
|
||||
# One of the parents is badly configured.
|
||||
zone_check(servers["ns9"], "bad2-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad2-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.5"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad2-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.7"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone bad2-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"bad DS response from 10.53.0.6"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "bad2-dswithdrawn.checkds.", "!DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="bad2-dswithdrawn.checkds",
|
||||
logs_to_wait_for=(
|
||||
"empty DS response from 10.53.0.5",
|
||||
"empty DS response from 10.53.0.7",
|
||||
"bad DS response from 10.53.0.6",
|
||||
),
|
||||
expected_parent_state="!DSRemoved",
|
||||
),
|
||||
# Check with resolver parental-agent.
|
||||
zone_check(servers["ns9"], "resolver-dswithdrawn.checkds.")
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = (
|
||||
"zone resolver-dswithdrawn.checkds/IN (signed): checkds: "
|
||||
"empty DS response from 10.53.0.8"
|
||||
)
|
||||
watcher.wait_for_line(line)
|
||||
keystate_check(servers["ns2"], "resolver-dswithdrawn.checkds.", "DSRemoved")
|
||||
|
||||
CheckDSTest(
|
||||
zone="resolver-dswithdrawn.checkds",
|
||||
logs_to_wait_for=("empty DS response from 10.53.0.8",),
|
||||
expected_parent_state="DSRemoved",
|
||||
)
|
||||
# TBD: DS withdrawn from all parents, but one has bogus signature.
|
||||
]
|
||||
|
||||
|
||||
checkds_tests = dspublished_tests + dswithdrawn_tests
|
||||
|
||||
|
||||
@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
|
||||
def test_checkds(servers, params):
|
||||
# Wait until the provided zone is signed and then verify its DNSSEC data.
|
||||
zone_check(servers["ns9"], params.zone)
|
||||
|
||||
# Wait until all the expected log lines are found in the log file for the
|
||||
# provided server.
|
||||
for log_string in params.logs_to_wait_for:
|
||||
with servers["ns9"].watch_log_from_start() as watcher:
|
||||
line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
|
||||
watcher.wait_for_line(line)
|
||||
|
||||
# Check whether key states on the parent server provided match
|
||||
# expectations.
|
||||
keystate_check(servers["ns2"], params.zone, params.expected_parent_state)
|
||||
|
||||
Reference in New Issue
Block a user