Retry dnssec-verify in kasp test code
It is possible that the zone is not yet fully signed because it is signed in batches. Retry the AXFR and verify command a couple of times.
This commit is contained in:
@@ -13,6 +13,7 @@ from functools import total_ordering
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Optional, Union
|
||||
|
||||
@@ -21,7 +22,7 @@ from datetime import timedelta
|
||||
|
||||
import dns
|
||||
import isctest.log
|
||||
|
||||
import isctest.query
|
||||
|
||||
DEFAULT_TTL = 300
|
||||
|
||||
@@ -29,7 +30,7 @@ DEFAULT_TTL = 300
|
||||
def _query(server, qname, qtype):
|
||||
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
|
||||
try:
|
||||
response = dns.query.tcp(query, server.ip, port=server.ports.dns, timeout=3)
|
||||
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
|
||||
except dns.exception.Timeout:
|
||||
isctest.log.debug(f"query timeout for query {qname} {qtype} to {server.ip}")
|
||||
return None
|
||||
@@ -278,21 +279,34 @@ def check_zone_is_signed(server, zone):
|
||||
def check_dnssec_verify(server, zone):
|
||||
# Check if zone if DNSSEC valid with dnssec-verify.
|
||||
fqdn = f"{zone}."
|
||||
transfer = _query(server, fqdn, dns.rdatatype.AXFR)
|
||||
if not isinstance(transfer, dns.message.Message):
|
||||
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
|
||||
elif transfer.rcode() != dns.rcode.NOERROR:
|
||||
rcode = dns.rcode.to_text(transfer.rcode())
|
||||
isctest.log.debug(f"{rcode} response for {fqdn} AXFR from {server.ip}")
|
||||
else:
|
||||
zonefile = f"{zone}.axfr"
|
||||
with open(zonefile, "w", encoding="utf-8") as file:
|
||||
for rr in transfer.answer:
|
||||
file.write(rr.to_text())
|
||||
file.write("\n")
|
||||
|
||||
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
|
||||
isctest.run.cmd(verify_command)
|
||||
verified = False
|
||||
for _ in range(10):
|
||||
transfer = _query(server, fqdn, dns.rdatatype.AXFR)
|
||||
if not isinstance(transfer, dns.message.Message):
|
||||
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
|
||||
elif transfer.rcode() != dns.rcode.NOERROR:
|
||||
rcode = dns.rcode.to_text(transfer.rcode())
|
||||
isctest.log.debug(f"{rcode} response for {fqdn} AXFR from {server.ip}")
|
||||
else:
|
||||
zonefile = f"{zone}.axfr"
|
||||
with open(zonefile, "w", encoding="utf-8") as file:
|
||||
for rr in transfer.answer:
|
||||
file.write(rr.to_text())
|
||||
file.write("\n")
|
||||
|
||||
try:
|
||||
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
|
||||
verified = isctest.run.cmd(verify_command)
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
|
||||
if verified:
|
||||
break
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
assert verified
|
||||
|
||||
|
||||
def check_dnssecstatus(server, zone, keys, policy=None, view=None):
|
||||
|
||||
Reference in New Issue
Block a user