Convert default named test cases to pytest
This commit mostly deals with converting the test cases related to the default dnssec-policy. For this we need a new utility that can retrieve a list of Keys from a given directory, belonging to a specific zone, 'keydir_to_keylist'. One more test case is converted, dealing with an error case related to a too high maximum TTL. Other error test cases will go into 'test_kasp_errors' too. Remove the counterparts for the newly added test from the kasp shell tests script.
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
# information regarding copyright ownership.
|
||||
|
||||
from functools import total_ordering
|
||||
import glob
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
@@ -828,5 +829,25 @@ def check_subdomain(server, zone, ksks, zsks):
|
||||
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
|
||||
|
||||
|
||||
def keydir_to_keylist(zone: str, keydir: Optional[str] = None) -> List[Key]:
|
||||
# Retrieve all keys from the key files in a directory. If 'zone' is None,
|
||||
# retrieve all keys in the directory, otherwise only those matching the
|
||||
# zone name. If 'keydir' is None, search the current directory.
|
||||
if zone is None:
|
||||
zone = ""
|
||||
|
||||
lsdir = keydir
|
||||
if lsdir is None:
|
||||
lsdir = ""
|
||||
|
||||
regex = rf"{lsdir}/(K{zone}\.\+.*\+.*)\.key"
|
||||
|
||||
return [
|
||||
Key(re.match(regex, filename).group(1), keydir)
|
||||
for filename in glob.glob(f"{lsdir}/K{zone}.+*+*.key")
|
||||
if re.match(regex, filename) is not None
|
||||
]
|
||||
|
||||
|
||||
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
|
||||
return [Key(name, keydir) for name in keystr.split()]
|
||||
|
||||
@@ -84,18 +84,6 @@ retry_quiet 30 _wait_for_done_apexnsec || ret=1
|
||||
test "$ret" -eq 0 || echo_i "failed"
|
||||
status=$((status + ret))
|
||||
|
||||
# Test max-zone-ttl rejects zones with too high TTL.
|
||||
n=$((n + 1))
|
||||
echo_i "check that max-zone-ttl rejects zones with too high TTL ($n)"
|
||||
ret=0
|
||||
set_zone "max-zone-ttl.kasp"
|
||||
grep "loading from master file ${ZONE}.db failed: out of range" "ns3/named.run" >/dev/null || ret=1
|
||||
test "$ret" -eq 0 || echo_i "failed"
|
||||
status=$((status + ret))
|
||||
|
||||
#
|
||||
# Zone: default.kasp.
|
||||
#
|
||||
set_keytimes_csk_policy() {
|
||||
# The first key is immediately published and activated.
|
||||
created=$(key_get KEY1 CREATED)
|
||||
@@ -109,124 +97,6 @@ set_keytimes_csk_policy() {
|
||||
# Key lifetime is unlimited, so not setting RETIRED and REMOVED.
|
||||
}
|
||||
|
||||
# Check the zone with default kasp policy has loaded and is signed.
|
||||
set_zone "default.kasp"
|
||||
set_policy "default" "1" "3600"
|
||||
set_server "ns3" "10.53.0.3"
|
||||
# Key properties.
|
||||
set_keyrole "KEY1" "csk"
|
||||
set_keylifetime "KEY1" "0"
|
||||
set_keyalgorithm "KEY1" "13" "ECDSAP256SHA256" "256"
|
||||
set_keysigning "KEY1" "yes"
|
||||
set_zonesigning "KEY1" "yes"
|
||||
# DNSKEY, RRSIG (ksk), RRSIG (zsk) are published. DS needs to wait.
|
||||
set_keystate "KEY1" "GOAL" "omnipresent"
|
||||
set_keystate "KEY1" "STATE_DNSKEY" "rumoured"
|
||||
set_keystate "KEY1" "STATE_KRRSIG" "rumoured"
|
||||
set_keystate "KEY1" "STATE_ZRRSIG" "rumoured"
|
||||
set_keystate "KEY1" "STATE_DS" "hidden"
|
||||
|
||||
check_keys
|
||||
check_dnssecstatus "$SERVER" "$POLICY" "$ZONE"
|
||||
set_keytimes_csk_policy
|
||||
check_keytimes
|
||||
check_apex
|
||||
check_subdomain
|
||||
dnssec_verify
|
||||
|
||||
# Trigger a keymgr run. Make sure the key files are not touched if there are
|
||||
# no modifications to the key metadata.
|
||||
n=$((n + 1))
|
||||
echo_i "make sure key files are untouched if metadata does not change ($n)"
|
||||
ret=0
|
||||
basefile=$(key_get KEY1 BASEFILE)
|
||||
privkey_stat=$(key_get KEY1 PRIVKEY_STAT)
|
||||
pubkey_stat=$(key_get KEY1 PUBKEY_STAT)
|
||||
state_stat=$(key_get KEY1 STATE_STAT)
|
||||
|
||||
nextpart $DIR/named.run >/dev/null
|
||||
rndccmd 10.53.0.3 loadkeys "$ZONE" >/dev/null || log_error "rndc loadkeys zone ${ZONE} failed"
|
||||
wait_for_log 3 "keymgr: $ZONE done" $DIR/named.run || ret=1
|
||||
privkey_stat2=$(key_stat "${basefile}.private")
|
||||
pubkey_stat2=$(key_stat "${basefile}.key")
|
||||
state_stat2=$(key_stat "${basefile}.state")
|
||||
test "$privkey_stat" = "$privkey_stat2" || log_error "wrong private key file stat (expected $privkey_stat got $privkey_stat2)"
|
||||
test "$pubkey_stat" = "$pubkey_stat2" || log_error "wrong public key file stat (expected $pubkey_stat got $pubkey_stat2)"
|
||||
test "$state_stat" = "$state_stat2" || log_error "wrong state file stat (expected $state_stat got $state_stat2)"
|
||||
test "$ret" -eq 0 || echo_i "failed"
|
||||
status=$((status + ret))
|
||||
|
||||
n=$((n + 1))
|
||||
echo_i "again ($n)"
|
||||
ret=0
|
||||
|
||||
nextpart $DIR/named.run >/dev/null
|
||||
rndccmd 10.53.0.3 loadkeys "$ZONE" >/dev/null || log_error "rndc loadkeys zone ${ZONE} failed"
|
||||
wait_for_log 3 "keymgr: $ZONE done" $DIR/named.run || ret=1
|
||||
privkey_stat2=$(key_stat "${basefile}.private")
|
||||
pubkey_stat2=$(key_stat "${basefile}.key")
|
||||
state_stat2=$(key_stat "${basefile}.state")
|
||||
test "$privkey_stat" = "$privkey_stat2" || log_error "wrong private key file stat (expected $privkey_stat got $privkey_stat2)"
|
||||
test "$pubkey_stat" = "$pubkey_stat2" || log_error "wrong public key file stat (expected $pubkey_stat got $pubkey_stat2)"
|
||||
test "$state_stat" = "$state_stat2" || log_error "wrong state file stat (expected $state_stat got $state_stat2)"
|
||||
test "$ret" -eq 0 || echo_i "failed"
|
||||
status=$((status + ret))
|
||||
|
||||
# Update zone.
|
||||
n=$((n + 1))
|
||||
echo_i "modify unsigned zone file and check that new record is signed for zone ${ZONE} ($n)"
|
||||
ret=0
|
||||
cp "${DIR}/template2.db.in" "${DIR}/${ZONE}.db"
|
||||
rndccmd 10.53.0.3 reload "$ZONE" >/dev/null || log_error "rndc reload zone ${ZONE} failed"
|
||||
|
||||
update_is_signed() {
|
||||
ip_a=$1
|
||||
ip_d=$2
|
||||
|
||||
if [ "$ip_a" != "-" ]; then
|
||||
dig_with_opts "a.${ZONE}" "@${SERVER}" A >"dig.out.$DIR.test$n.a" || return 1
|
||||
grep "status: NOERROR" "dig.out.$DIR.test$n.a" >/dev/null || return 1
|
||||
grep "a.${ZONE}\..*${DEFAULT_TTL}.*IN.*A.*${ip_a}" "dig.out.$DIR.test$n.a" >/dev/null || return 1
|
||||
lines=$(get_keys_which_signed A 0 "dig.out.$DIR.test$n.a" | wc -l)
|
||||
test "$lines" -eq 1 || return 1
|
||||
get_keys_which_signed A 0 "dig.out.$DIR.test$n.a" | grep "^${KEY_ID}$" >/dev/null || return 1
|
||||
fi
|
||||
|
||||
if [ "$ip_d" != "-" ]; then
|
||||
dig_with_opts "d.${ZONE}" "@${SERVER}" A >"dig.out.$DIR.test$n".d || return 1
|
||||
grep "status: NOERROR" "dig.out.$DIR.test$n".d >/dev/null || return 1
|
||||
grep "d.${ZONE}\..*${DEFAULT_TTL}.*IN.*A.*${ip_d}" "dig.out.$DIR.test$n".d >/dev/null || return 1
|
||||
lines=$(get_keys_which_signed A 0 "dig.out.$DIR.test$n".d | wc -l)
|
||||
test "$lines" -eq 1 || return 1
|
||||
get_keys_which_signed A 0 "dig.out.$DIR.test$n".d | grep "^${KEY_ID}$" >/dev/null || return 1
|
||||
fi
|
||||
}
|
||||
|
||||
retry_quiet 10 update_is_signed "10.0.0.11" "10.0.0.44" || ret=1
|
||||
test "$ret" -eq 0 || echo_i "failed"
|
||||
status=$((status + ret))
|
||||
|
||||
# Move the private key file, a rekey event should not introduce replacement
|
||||
# keys.
|
||||
ret=0
|
||||
echo_i "test that if private key files are inaccessible this doesn't trigger a rollover ($n)"
|
||||
basefile=$(key_get KEY1 BASEFILE)
|
||||
mv "${basefile}.private" "${basefile}.offline"
|
||||
rndccmd 10.53.0.3 loadkeys "$ZONE" >/dev/null || log_error "rndc loadkeys zone ${ZONE} failed"
|
||||
wait_for_log 3 "zone $ZONE/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing" $DIR/named.run || ret=1
|
||||
mv "${basefile}.offline" "${basefile}.private"
|
||||
test "$ret" -eq 0 || echo_i "failed"
|
||||
status=$((status + ret))
|
||||
|
||||
# Nothing has changed.
|
||||
check_keys
|
||||
check_dnssecstatus "$SERVER" "$POLICY" "$ZONE"
|
||||
set_keytimes_csk_policy
|
||||
check_keytimes
|
||||
check_apex
|
||||
check_subdomain
|
||||
dnssec_verify
|
||||
|
||||
#
|
||||
# A zone with special characters.
|
||||
#
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
||||
from datetime import timedelta
|
||||
@@ -29,6 +30,7 @@ pytestmark = pytest.mark.extra_artifacts(
|
||||
"K*.cmp",
|
||||
"K*.key",
|
||||
"K*.state",
|
||||
"*.axfr",
|
||||
"*.created",
|
||||
"dig.out*",
|
||||
"keyevent.out.*",
|
||||
@@ -44,8 +46,9 @@ pytestmark = pytest.mark.extra_artifacts(
|
||||
"unused.key-*",
|
||||
"verify.out.*",
|
||||
"zone.out.*",
|
||||
"ns*/K*.private",
|
||||
"ns*/K*.key",
|
||||
"ns*/K*.offline",
|
||||
"ns*/K*.private",
|
||||
"ns*/K*.state",
|
||||
"ns*/*.db",
|
||||
"ns*/*.db.infile",
|
||||
@@ -73,6 +76,159 @@ pytestmark = pytest.mark.extra_artifacts(
|
||||
)
|
||||
|
||||
|
||||
def check_all(server, zone, policy, ksks, zsks):
|
||||
isctest.kasp.check_dnssecstatus(server, zone, ksks + zsks, policy=policy)
|
||||
isctest.kasp.check_apex(server, zone, ksks, zsks)
|
||||
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
|
||||
isctest.kasp.check_dnssec_verify(server, zone)
|
||||
|
||||
|
||||
def test_kasp_default(servers):
|
||||
ns3 = servers["ns3"]
|
||||
emptydict = {}
|
||||
|
||||
dig_with_opts = isctest.run.Dig(
|
||||
f"+tcp +noadd +nosea +nostat +nocmd +dnssec -p {str(named_port)}"
|
||||
)
|
||||
|
||||
# check the zone with default kasp policy has loaded and is signed.
|
||||
isctest.log.info("check a zone with the default policy is signed")
|
||||
zone = "default.kasp"
|
||||
policy = "default"
|
||||
|
||||
# Key properties.
|
||||
prop_csk = {
|
||||
"expect": True,
|
||||
"private": True,
|
||||
"legacy": False,
|
||||
"role": "csk",
|
||||
"role_full": "key-signing",
|
||||
"dnskey_ttl": 3600,
|
||||
"flags": 257,
|
||||
}
|
||||
# DNSKEY, RRSIG (ksk), RRSIG (zsk) are published. DS needs to wait.
|
||||
meta1 = {
|
||||
"Algorithm": 13, # ECDSAP256SHA256
|
||||
"Length": 256,
|
||||
"Lifetime": 0,
|
||||
"KSK": "yes",
|
||||
"ZSK": "yes",
|
||||
"GoalState": "omnipresent",
|
||||
"DNSKEYState": "rumoured",
|
||||
"KRRSIGState": "rumoured",
|
||||
"ZRRSIGState": "rumoured",
|
||||
"DSState": "hidden",
|
||||
}
|
||||
key1 = KeyProperties("KEY1", prop_csk, meta1, emptydict)
|
||||
# Initial checks.
|
||||
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
|
||||
expected = [key1]
|
||||
isctest.kasp.check_zone_is_signed(ns3, zone)
|
||||
isctest.kasp.check_keys(zone, keys, expected)
|
||||
# The first key is immediately published and activated.
|
||||
key1.timing["Generated"] = key1.key.get_timing("Created")
|
||||
key1.timing["Published"] = key1.timing["Generated"]
|
||||
key1.timing["Active"] = key1.timing["Generated"]
|
||||
# The DS can be published if the DNSKEY and RRSIG records are
|
||||
# OMNIPRESENT. This happens after max-zone-ttl (1d) plus
|
||||
# publish-safety (1h) plus zone-propagation-delay (300s).
|
||||
key1.timing["PublishCDS"] = key1.timing["Published"] + timedelta(
|
||||
days=1, hours=1, seconds=300
|
||||
)
|
||||
# Key lifetime is unlimited, so not setting 'Retired' nor 'Removed'.
|
||||
key1.timing["DNSKEYChange"] = key1.timing["Published"]
|
||||
key1.timing["DSChange"] = key1.timing["Published"]
|
||||
key1.timing["KRRSIGChange"] = key1.timing["Active"]
|
||||
key1.timing["ZRRSIGChange"] = key1.timing["Active"]
|
||||
# Remainder checks.
|
||||
expected = [key1]
|
||||
isctest.kasp.check_keytimes(keys, expected)
|
||||
check_all(ns3, zone, policy, keys, [])
|
||||
|
||||
# Trigger a keymgr run. Make sure the key files are not touched if there are
|
||||
# no modifications to the key metadata.
|
||||
isctest.log.info(
|
||||
"check that key files are untouched if there are no metadata changes"
|
||||
)
|
||||
key = keys[0]
|
||||
privkey_stat = os.stat(key.privatefile)
|
||||
pubkey_stat = os.stat(key.keyfile)
|
||||
state_stat = os.stat(key.statefile)
|
||||
|
||||
with ns3.watch_log_from_here() as watcher:
|
||||
ns3.rndc(f"loadkeys {zone}", log=False)
|
||||
watcher.wait_for_line(f"keymgr: {zone} done")
|
||||
|
||||
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
|
||||
assert pubkey_stat.st_mtime == os.stat(key.keyfile).st_mtime
|
||||
assert state_stat.st_mtime == os.stat(key.statefile).st_mtime
|
||||
|
||||
# again
|
||||
with ns3.watch_log_from_here() as watcher:
|
||||
ns3.rndc(f"loadkeys {zone}", log=False)
|
||||
watcher.wait_for_line(f"keymgr: {zone} done")
|
||||
|
||||
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
|
||||
assert pubkey_stat.st_mtime == os.stat(key.keyfile).st_mtime
|
||||
assert state_stat.st_mtime == os.stat(key.statefile).st_mtime
|
||||
|
||||
# modify unsigned zone file and check that new record is signed.
|
||||
isctest.log.info("check that an updated zone signs the new record")
|
||||
shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
|
||||
ns3.rndc(f"reload {zone}", log=False)
|
||||
|
||||
def update_is_signed():
|
||||
out = dig_with_opts(f"{owner} {type} @10.53.0.3")
|
||||
if "status: NOERROR" not in out:
|
||||
return False
|
||||
|
||||
regex = rf"{owner}\s+{ttl}\s+IN\s+{type}\s+{rdata}"
|
||||
sregex = rf"{owner}\s+{ttl}\s+IN\s+RRSIG\s+{type}"
|
||||
if alg is not None:
|
||||
sregex = rf"{owner}\s+{ttl}\s+IN\s+RRSIG\s+{type}\s+{alg}"
|
||||
|
||||
matches = re.findall(regex, out)
|
||||
smatches = re.findall(sregex, out)
|
||||
|
||||
return len(matches) == 1 and len(smatches) == 1
|
||||
|
||||
owner = f"a.{zone}."
|
||||
type = "A"
|
||||
ttl = 300
|
||||
rdata = "10.0.0.11"
|
||||
alg = key1.metadata["Algorithm"]
|
||||
isctest.run.retry_with_timeout(update_is_signed, timeout=10)
|
||||
|
||||
owner = f"d.{zone}."
|
||||
rdata = "10.0.0.44"
|
||||
isctest.run.retry_with_timeout(update_is_signed, timeout=10)
|
||||
|
||||
# Move the private key file, a rekey event should not introduce replacement keys.
|
||||
isctest.log.info(
|
||||
"check that if private key is inaccessible this doesn't trigger a rollover"
|
||||
)
|
||||
shutil.move(f"{key.privatefile}", f"{key.path}.offline")
|
||||
with ns3.watch_log_from_here() as watcher:
|
||||
ns3.rndc(f"loadkeys {zone}", log=False)
|
||||
watcher.wait_for_line(
|
||||
f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
|
||||
)
|
||||
# Nothing has changed.
|
||||
key1.properties["private"] = False
|
||||
expected = [key1]
|
||||
isctest.kasp.check_keys(zone, keys, expected)
|
||||
isctest.kasp.check_keytimes(keys, expected)
|
||||
check_all(ns3, zone, policy, keys, [])
|
||||
|
||||
|
||||
def test_kasp_errors(servers):
|
||||
ns3 = servers["ns3"]
|
||||
|
||||
# check that max-zone-ttl rejects zones with too high TTL.
|
||||
zone = "max-zone-ttl.kasp"
|
||||
assert f"loading from master file {zone}.db failed: out of range" in ns3.log
|
||||
|
||||
|
||||
def test_kasp_dnssec_keygen():
|
||||
def keygen(zone, policy, keydir=None):
|
||||
if keydir is None:
|
||||
|
||||
Reference in New Issue
Block a user