Pass additional tests as a test case attribute
Rather than executing the additional test(s) at the end of the test function, pass it as a test case attribute so that it can reuse variables already set (zone, policy, ksks, zsks).
This commit is contained in:
@@ -860,7 +860,7 @@ def check_subdomain(server, zone, ksks, zsks):
|
||||
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
|
||||
|
||||
|
||||
def update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks):
|
||||
def check_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks):
|
||||
# Test an RRset below the apex and verify it is updated and signed correctly.
|
||||
response = _query(server, qname, qtype)
|
||||
|
||||
|
||||
@@ -208,7 +208,33 @@ def test_kasp_cases(servers):
|
||||
f"zsk 31536000 {alg} {size[2]} omnipresent rumoured none rumoured none",
|
||||
]
|
||||
|
||||
# Test function.
|
||||
# Additional test functions.
|
||||
def test_ixfr_is_signed(
|
||||
expected_updates, zone=None, policy=None, ksks=None, zsks=None
|
||||
):
|
||||
isctest.log.info(f"check that the zone {zone} is correctly signed after ixfr")
|
||||
isctest.log.debug(
|
||||
f"expected updates {expected_updates} policy {policy} ksks {ksks} zsks {zsks}"
|
||||
)
|
||||
|
||||
shutil.copyfile(f"ns2/{zone}.db.in2", f"ns2/{zone}.db")
|
||||
servers["ns2"].rndc(f"reload {zone}", log=False)
|
||||
|
||||
def update_is_signed():
|
||||
parts = update.split()
|
||||
qname = parts[0]
|
||||
qtype = dns.rdatatype.from_text(parts[1])
|
||||
rdata = parts[2]
|
||||
return isctest.kasp.check_update_is_signed(
|
||||
server, zone, qname, qtype, rdata, ksks, zsks
|
||||
)
|
||||
|
||||
for update in expected_updates:
|
||||
isctest.run.retry_with_timeout(update_is_signed, timeout=5)
|
||||
|
||||
isctest.kasp.check_dnssec_verify(server, zone)
|
||||
|
||||
# Test case function.
|
||||
def test_case():
|
||||
zone = test["zone"]
|
||||
policy = test["policy"]
|
||||
@@ -244,6 +270,12 @@ def test_kasp_cases(servers):
|
||||
isctest.kasp.check_keytimes(keys, expected)
|
||||
check_all(server, zone, policy, ksks, zsks)
|
||||
|
||||
if "additional-tests" in test:
|
||||
for additional_test in test["additional-tests"]:
|
||||
callback = additional_test["callback"]
|
||||
arguments = additional_test["arguments"]
|
||||
callback(*arguments, zone=zone, policy=policy, ksks=ksks, zsks=zsks)
|
||||
|
||||
# Test cases.
|
||||
rsa_cases = []
|
||||
if os.environ["RSASHA1_SUPPORTED"] == 1:
|
||||
@@ -333,6 +365,17 @@ def test_kasp_cases(servers):
|
||||
"policy": "rsasha256",
|
||||
"config": kasp_config,
|
||||
"key-properties": fips_properties(8),
|
||||
"additional-tests": [
|
||||
{
|
||||
"callback": test_ixfr_is_signed,
|
||||
"arguments": [
|
||||
[
|
||||
"a.secondary.kasp. A 10.0.0.11",
|
||||
"d.secondary.kasp. A 10.0.0.4",
|
||||
],
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"zone": "some-keys.kasp",
|
||||
@@ -376,33 +419,6 @@ def test_kasp_cases(servers):
|
||||
for test in test_cases:
|
||||
test_case()
|
||||
|
||||
# Additional test case for "bump-in-the-wire" secondary zone.
|
||||
isctest.log.info("check that the zone is correctly signed after ixfr")
|
||||
zone = "secondary.kasp"
|
||||
expected = isctest.kasp.policy_to_properties(ttl=1234, keys=fips_properties(8))
|
||||
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
|
||||
ksks = [k for k in keys if k.is_ksk()]
|
||||
zsks = [k for k in keys if k.is_zsk()]
|
||||
isctest.kasp.check_keys(zone, keys, expected)
|
||||
|
||||
shutil.copyfile(f"ns2/{zone}.db.in2", f"ns2/{zone}.db")
|
||||
servers["ns2"].rndc(f"reload {zone}", log=False)
|
||||
|
||||
def update_is_signed():
|
||||
parts = update.split()
|
||||
qname = parts[0]
|
||||
qtype = dns.rdatatype.from_text(parts[1])
|
||||
rdata = parts[2]
|
||||
return isctest.kasp.update_is_signed(
|
||||
server, zone, qname, qtype, rdata, ksks, zsks
|
||||
)
|
||||
|
||||
expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.4"]
|
||||
for update in expected_updates:
|
||||
isctest.run.retry_with_timeout(update_is_signed, timeout=5)
|
||||
|
||||
isctest.kasp.check_dnssec_verify(server, zone)
|
||||
|
||||
|
||||
def test_kasp_default(servers):
|
||||
server = servers["ns3"]
|
||||
@@ -464,7 +480,7 @@ def test_kasp_default(servers):
|
||||
qname = parts[0]
|
||||
qtype = dns.rdatatype.from_text(parts[1])
|
||||
rdata = parts[2]
|
||||
return isctest.kasp.update_is_signed(
|
||||
return isctest.kasp.check_update_is_signed(
|
||||
server, zone, qname, qtype, rdata, ksks, zsks
|
||||
)
|
||||
|
||||
@@ -554,7 +570,7 @@ def test_kasp_dynamic(servers):
|
||||
qname = parts[0]
|
||||
qtype = dns.rdatatype.from_text(parts[1])
|
||||
rdata = parts[2]
|
||||
return isctest.kasp.update_is_signed(
|
||||
return isctest.kasp.check_update_is_signed(
|
||||
server, zone, qname, qtype, rdata, ksks, zsks
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user