Compare commits

...

23 Commits

Author SHA1 Message Date
Petr Špaček
7294e2b4a9 Add test for dns_qp_lookup() chains
Reproduces a problem on BIND BIND 9.19.25-dev 9357019

Related: #4717
2024-05-09 10:07:30 +02:00
Petr Špaček
07a4f7c6cd reproduces an issue in lookup() - BIND 657ee2b997 2024-05-08 14:47:30 +02:00
Petr Špaček
f78ac9e879 lookup & iterator test seems to work 2024-05-08 14:47:30 +02:00
Petr Špaček
206002d37d resurrect QPIterator tests and keep full copy of data model in iterator 2024-05-08 14:47:30 +02:00
Petr Špaček
9121d1a98c qp_dns_loopup test works
In rouhly half of test executions pytest hangs indefinitely - tested
against BIND commit a863450 which contains bug in fix_iterator() QP
function. Repeated pstack confirms that execution is cycling in
fix_iterator().
2024-05-08 14:47:30 +02:00
Petr Špaček
ee65ee0ad0 Fix off by one and related type errors in DNS name generator 2024-05-08 14:47:30 +02:00
Petr Špaček
4dd0797316 WIP: faster turnaround for debug 2024-05-08 14:47:30 +02:00
Petr Špaček
854a9ad735 Ignore .hypothesis test metadata directory 2024-05-08 14:47:30 +02:00
Petr Špaček
8c62a779a2 WIP: single iterator test which covers next() and prev() into more depth 2024-05-08 14:47:30 +02:00
Petr Špaček
590e434700 WIP: it does not explore iterators in sufficient depth 2024-05-08 14:47:30 +02:00
Petr Špaček
decec5b53b WIP: generic single-step iteration checkers 2024-05-08 14:47:30 +02:00
Petr Špaček
8f908624b5 WIP: broken values_agree() 2024-05-08 14:47:30 +02:00
Petr Špaček
9a77309024 Add basic forward iteration & key check 2024-05-08 14:47:30 +02:00
Petr Špaček
c5376e49b2 Add basic add/delete name QP trie unit test with Python hypothesis 2024-05-08 14:47:30 +02:00
Petr Špaček
f6da2db1b2 WIP: qp tests based on python hypothesis 2024-05-08 14:47:30 +02:00
Petr Špaček
27e4001f77 Ignore autogenerated CFFI files in tests/dns 2024-05-08 14:47:30 +02:00
Petr Špaček
ed3834578f Sort tests/dns/.gitignore 2024-05-08 14:47:30 +02:00
Petr Špaček
91cb7306f7 WIP: rename and move CFFI test files into tests/dns 2024-05-08 14:47:30 +02:00
Petr Špaček
68c194a96e WIP: test for name_downcase and name_hash 2024-05-08 14:47:30 +02:00
Petr Špaček
60d4d8d21f WIP: unit test skeleton, no integration yet 2024-05-08 14:47:30 +02:00
Petr Špaček
d90c7f238d WIP: copy from stepan/hypothesis 6250d2306a3c4910d175c4e720c17f84f772ed75 2024-05-08 14:47:29 +02:00
Petr Špaček
4c36408f34 WIP: mctx + de/compression 2024-05-08 14:47:29 +02:00
Petr Špaček
fb23539fb8 WIP 2024-05-08 14:47:29 +02:00
7 changed files with 1016 additions and 2 deletions

View File

@@ -1,4 +1,6 @@
/zone.data
/.hypothesis
/_*_cffi.*
/badcache.out
/testdata/dnstap/dnstap.file
/testdata/master/master18.data
/badcache.out
/zone.data

110
tests/dns/name_test.py Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
"""
Example property-based test for dns_name_ API.
"""
import pytest
# in FIPs mode md5 fails so we need 4.41.2 or later which does not use md5
try:
import hashlib
hashlib.md5(b"1234")
pytest.importorskip("hypothesis")
except ValueError:
pytest.importorskip("hypothesis", minversion="4.41.2")
from hypothesis import assume, example, given
pytest.importorskip("dns", minversion="2.0.0")
import dns.name
from strategies import dns_names
from _name_test_cffi import ffi
from _name_test_cffi import lib as isclibs
NULL = ffi.NULL
# MCTXP = ffi.new('isc_mem_t **')
# isclibs.isc__mem_create(MCTXP)
class ISCName:
"""dns_name_t instance with a private fixed buffer"""
def __init__(self, from_text=None):
self.fixedname = ffi.new("dns_fixedname_t *")
self.name = isclibs.dns_fixedname_initname(self.fixedname)
# self.cctx = ffi.new("dns_compress_t *")
# self.dctx = ffi.new("dns_decompress_t *")
self.formatbuf = ffi.new("char[1024]") # DNS_NAME_FORMATSIZE
if from_text is not None:
assert (
isclibs.dns_name_fromstring(
self.name, from_text.encode("ascii"), NULL, 0, NULL
)
== 0
)
def format(self):
isclibs.dns_name_format(self.name, self.formatbuf, len(self.formatbuf))
return ffi.string(self.formatbuf).decode("ascii")
@given(pyname=dns_names(suffix=dns.name.root))
def test_totext_fromtext_roundtrip(pyname: dns.name.Name) -> None:
"""
text formatting and parsing roundtrip must not change the name
dnspython to_text -> ISC from_string -> ISC format -> dnspython from_text
"""
iscname = ISCName(from_text=str(pyname))
assert pyname == dns.name.from_text(iscname.format())
@given(pyname=dns_names(suffix=dns.name.root))
def test_downcase(pyname: dns.name.Name) -> None:
downcased = ISCName(from_text=str(pyname))
assert isclibs.dns_name_hash(downcased.name)
isclibs.dns_name_downcase(downcased.name, downcased.name, NULL)
assert not any(
letter.isupper() for letter in downcased.format()
), "downcasing removes all ASCII uppercase letters"
@given(pyname=dns_names(suffix=dns.name.root))
def test_hash_downcase(pyname: dns.name.Name) -> None:
"""downcasing must not affect hash value"""
orig = ISCName(from_text=str(pyname))
orig_hash = isclibs.dns_name_hash(orig.name)
downcased = ISCName(from_text=str(pyname))
assert isclibs.dns_name_hash(downcased.name) == orig_hash, "hash is stable"
isclibs.dns_name_downcase(downcased.name, downcased.name, NULL)
assert not any(
letter.isupper() for letter in downcased.format()
), "downcasing actually works"
assert pyname == dns.name.from_text(
downcased.format()
), "downcasing must not change semantic value"
downcased_hash = isclibs.dns_name_hash(downcased.name)
assert orig_hash == downcased_hash, "downcasing must not change hash value"

View File

@@ -0,0 +1,69 @@
#!/usr/bin/python
from cffi import FFI
ffibuilder = FFI()
# cdef() expects a single string declaring the C types, functions and
# globals needed to use the shared object. It must be in valid C syntax.
ffibuilder.cdef(
"""
typedef int... isc_result_t;
typedef ... isc_mem_t;
typedef ... isc_buffer_t;
typedef struct { ...; } dns_compress_t;
typedef int... dns_decompress_t;
typedef struct { ...; } dns_name_t;
typedef struct { ...; } dns_fixedname_t;
void
isc__mem_create(isc_mem_t **);
void
isc_mem_attach(isc_mem_t *, isc_mem_t **);
isc_result_t
dns_name_fromstring(dns_name_t *target, const char *src,
const dns_name_t *origin, unsigned int options,
isc_mem_t *mctx);
void
dns_name_format(const dns_name_t *name, char *cp, unsigned int size);
static inline void
dns_name_init(dns_name_t *name, unsigned char *offsets);
dns_name_t *
dns_fixedname_initname(dns_fixedname_t *fixed);
isc_result_t
dns_name_downcase(const dns_name_t *source, dns_name_t *name,
isc_buffer_t *target);
uint32_t
dns_name_hash(const dns_name_t *name);
"""
)
# set_source() gives the name of the python extension module to
# produce, and some C source code as a string. This C code needs
# to make the declarated functions, types and globals available,
# so it is often just the "#include".
ffibuilder.set_source(
"_name_test_cffi",
"""
#include "isc/buffer.h"
#include "isc/mem.h"
#include "dns/name.h"
#include "dns/compress.h"
#include "dns/fixedname.h"
""",
libraries=["dns"],
include_dirs=["../../lib/isc/include", "../../lib/dns/include"],
)
if __name__ == "__main__":
ffibuilder.compile(
verbose=True,
)

512
tests/dns/qp_test.py Executable file
View File

@@ -0,0 +1,512 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
"""
Example property-based test for dns_name_ API.
"""
import pytest
# in FIPs mode md5 fails so we need 4.41.2 or later which does not use md5
try:
import hashlib
hashlib.md5(b"1234")
pytest.importorskip("hypothesis")
except ValueError:
pytest.importorskip("hypothesis", minversion="4.41.2")
from hypothesis import assume, example, event, given
from hypothesis.stateful import Bundle, RuleBasedStateMachine, rule, precondition
import hypothesis
pytest.importorskip("dns", minversion="2.0.0")
import dns.name
from strategies import dns_names, composite
from _qp_test_cffi import ffi
from _qp_test_cffi import lib as isclibs
NULL = ffi.NULL
MCTXP = ffi.new("isc_mem_t **")
isclibs.isc__mem_create(MCTXP)
MCTX = MCTXP[0]
def event(*args):
pass
#def print(*args):
# pass
@composite
def subdomains(draw, named_bundle):
parent = draw(named_bundle)
# the parent name has less then two bytes left, no way to add a subdomain to it
if len(parent) + sum(map(len, parent)) > 253:
return parent
subdomain = draw(dns_names(suffix=parent))
return subdomain
class ISCName:
"""
dns_name_t instance with a private fixed buffer
Make sure Python keeps reference to this object as long
as it can be referenced from the C side.
"""
def __init__(self, initval=None):
self.fixedname = ffi.new("dns_fixedname_t *")
self.cobj = isclibs.dns_fixedname_initname(self.fixedname)
# self.cctx = ffi.new("dns_compress_t *")
# self.dctx = ffi.new("dns_decompress_t *")
self.formatbuf = ffi.new("char[1024]") # DNS_NAME_FORMATSIZE
if initval is None:
return
if isinstance(initval, dns.name.Name):
initval = str(initval)
if isinstance(initval, str):
assert (
isclibs.dns_name_fromstring(
self.cobj, initval.encode("ascii"), NULL, 0, NULL
)
== 0
)
return
raise NotImplementedError(type(initval))
def cformat(self):
isclibs.dns_name_format(self.cobj, self.formatbuf, len(self.formatbuf))
return ffi.string(self.formatbuf).decode("ascii")
def pyname(self):
return dns.name.from_text(self.cformat())
@given(pyname_source=dns_names(suffix=dns.name.root))
def test_fromname_toname_roundtrip(pyname_source: dns.name.Name) -> None:
"""
name to/from qpkey must not change the name
"""
iscname_source = ISCName(pyname_source)
assert pyname_source == iscname_source.pyname()
qpkey = ffi.new("dns_qpkey_t *")
qpkeysize = isclibs.dns_qpkey_fromname(qpkey[0], iscname_source.cobj)
iscname_target = ISCName()
isclibs.dns_qpkey_toname(qpkey[0], qpkeysize, iscname_target.cobj)
pyname_target = iscname_target.pyname()
assert pyname_source == pyname_target
class QPChain:
def __init__(self, testcase):
self.testcase = testcase
self.iter_generation = testcase.generation
self.qp = testcase.qp
self.cchain = ffi.new("dns_qpchain_t *")
# print(id(self), isclibs.dns_qpiter_init)
isclibs.dns_qpchain_init(self.qp, self.cchain)
def length(self):
ret = isclibs.dns_qpchain_length(self.cchain)
print(self, 'length', ret)
return ret
def print(self):
print('C chain structure print out', self)
for chainidx in range(self.length()):
got_iscname, got_pval_r, _got_ival_r = self.node(chainidx)
print('idx', chainidx, got_iscname.pyname())
def check(self, pylookupname: dns.name.Name):
print('check chain for lookup', pylookupname)
print('model', sorted(self.testcase.model))
chainidx = 0
for idx in range(1, len(pylookupname) + 1):
parentname = pylookupname.split(idx)[1]
assert self.length() >= chainidx
if parentname not in self.testcase.model:
print(parentname, 'NOT present in model')
continue
print(parentname, 'IS present in model')
got_iscname, got_pval_r, _got_ival_r = self.node(chainidx)
print(self, 'idx', chainidx, got_iscname.pyname())
assert parentname == got_iscname.pyname(), (
"chain points to unexpected name, idx",
idx,
)
assert self.testcase.model[parentname].cobj == got_pval_r
chainidx += 1
assert (
self.length() == chainidx
), "chain length does not match"
def node(self, level):
iscname = ISCName()
got_pval_r = ffi.new("void **")
got_ival_r = ffi.new("uint32_t *")
isclibs.dns_qpchain_node(
self.cchain, level, iscname.cobj, got_pval_r, got_ival_r
)
print(
id(self),
"qpchain_node",
"\n-> returned: ",
iscname.pyname(),
got_pval_r[0],
got_ival_r[0],
)
return iscname, got_pval_r[0], got_ival_r[0]
class QPIterator:
def __init__(self, testcase):
self.testcase = testcase
self.iter_generation = testcase.generation
self.qp = testcase.qp
self.citer = ffi.new("dns_qpiter_t *")
# print(id(self), isclibs.dns_qpiter_init)
isclibs.dns_qpiter_init(self.qp, self.citer)
self.model = testcase.model.copy()
self.sorted = sorted(self.model)
self.position = None
def set_to_predecessor(self, name: dns.name.Name):
self.position = self._find_predecesor(name)
def set_to_name(self, name: dns.name.Name):
self.position = self.sorted.index(name)
def _find_predecesor(self, lookup: dns.name.Name):
"""ridiculously ineffective method for finding closest predecesor of a given name"""
print(self.sorted)
for reversed_idx, present in enumerate(reversed(self.sorted)):
print(present, "?? < ??", lookup)
if present < lookup:
print("yes!")
idx = len(self.sorted) - 1 - reversed_idx
print(
"_find_predecesor regular, idx",
len(self.sorted) - 1 - reversed_idx,
self.sorted[idx],
)
return idx
print("_find_predecesor wraparound", len(self.sorted) - 1)
if len(self.sorted) > 0:
# predecessor is BEFORE the first existing name, wrap around to the last name
return len(self.sorted) - 1
else:
# predecessor does not exist at all - an empty QP
return None
def _step(self, cfunc):
iscname = ISCName()
got_pval_r = ffi.new("void **")
got_ival_r = ffi.new("uint32_t *")
got_ret = cfunc(self.citer, iscname.cobj, got_pval_r, got_ival_r)
print(
id(self),
"_step",
cfunc,
"\n-> returned: ",
got_ret,
iscname.pyname(),
got_pval_r[0],
got_ival_r[0],
)
return got_ret, iscname, got_pval_r[0], got_ival_r[0]
def _check_return_values(self, got_iscname, got_pval_r, _got_ival_r):
assert self.position is not None, "usage error in test script"
exp_pyname = self.sorted[self.position]
exp_iscname = self.model[exp_pyname]
assert exp_pyname == got_iscname.pyname()
assert exp_iscname.cobj == got_pval_r
def is_valid(self):
"""Check if QP this iterator referenced is supposed to be still valid"""
return self.iter_generation == self.testcase.generation
def next_(self):
got_ret, got_iscname, got_pval_r, got_ival_r = self._step(
isclibs.dns_qpiter_next
)
if len(self.model) == 0 or self.position == len(self.model) - 1:
assert got_ret == isclibs.ISC_R_NOMORE
self.position = None
else:
assert got_ret == isclibs.ISC_R_SUCCESS
if self.position is None:
self.position = 0
else:
self.position += 1
self._check_return_values(got_iscname, got_pval_r, got_ival_r)
return got_ret, got_iscname, got_pval_r, got_ival_r
def prev(self):
got_ret, got_iscname, got_pval_r, got_ival_r = self._step(
isclibs.dns_qpiter_prev
)
if len(self.model) == 0 or self.position == 0:
assert got_ret == isclibs.ISC_R_NOMORE
self.position = None
else:
assert got_ret == isclibs.ISC_R_SUCCESS
if self.position is None:
self.position = len(self.model) - 1
else:
self.position -= 1
self._check_return_values(got_iscname, got_pval_r, got_ival_r)
return got_ret, got_iscname, got_pval_r, got_ival_r
def current(self):
got_ret, got_iscname, got_pval_r, got_ival_r = self._step(
isclibs.dns_qpiter_current
)
if self.position is None:
assert got_ret == isclibs.ISC_R_FAILURE
return
assert got_ret == isclibs.ISC_R_SUCCESS
self._check_return_values(got_iscname, got_pval_r, got_ival_r)
return got_ret, got_iscname, got_pval_r, got_ival_r
class BareQPTest(RuleBasedStateMachine):
def __init__(self):
super().__init__()
self.generation = 0
print("\n\nTEST RESTART FROM SCRATCH, GENERATION", self.generation)
self.qpptr = ffi.new("dns_qp_t **")
isclibs.dns_qp_create(MCTX, ffi.addressof(isclibs.qp_methods), NULL, self.qpptr)
self.qp = self.qpptr[0]
self.model = {}
self.iter_ = QPIterator(self)
self.chain = QPChain(self)
names = Bundle("names")
iterators = Bundle("iterators")
def invalidate_refs(self):
"""Mark current QP as changed - iterators which depend on unchanged state are now invalid"""
self.generation += 1
return # TODO
self.iter_ = QPIterator(self)
self.chain = QPChain(self)
print("GENERATION ", self.generation)
@rule(target=names, pyname=dns_names())
def add_random(self, pyname):
event("ADD random")
return self._add(pyname)
@precondition(lambda self: len(self.model) > 0)
@rule(target=names, pyname=subdomains(names))
def add_subdomain(self, pyname):
event("ADD subdomain")
return self._add(pyname)
def _add(self, pyname):
iscname = ISCName(pyname)
ret = isclibs.dns_qp_insert(self.qp, iscname.cobj, 0)
print("insert", pyname, ret)
event("INSERT", ret)
if pyname not in self.model:
assert ret == isclibs.ISC_R_SUCCESS
self.model[pyname] = iscname
else:
assert ret == isclibs.ISC_R_EXISTS
self.invalidate_refs()
return pyname
@rule(pyname=names)
def delete(self, pyname):
print("DELETENAME", pyname)
exists = pyname in self.model
iscname = ISCName(pyname)
pval = ffi.new("void **")
ret = isclibs.dns_qp_deletename(self.qp, iscname.cobj, pval, NULL)
event("DELETENAME", ret)
if exists:
assert ret == isclibs.ISC_R_SUCCESS
assert pval[0] == self.model[pyname].cobj
del self.model[pyname]
else:
assert ret == isclibs.ISC_R_NOTFOUND
self.invalidate_refs()
def iter_init(self):
event("init")
self.iter_ = QPIterator(self)
@precondition(lambda self: self.iter_.is_valid())
@rule()
def iter_next(self):
if not self.iter_.is_valid():
event("iter invalid")
return
event("next", self.iter_.position)
self.iter_.next_()
@precondition(lambda self: self.iter_.is_valid())
@rule()
def iter_prev(self):
if not self.iter_.is_valid():
event("iter invalid")
return
event("prev", self.iter_.position)
self.iter_.prev()
@precondition(lambda self: self.iter_.is_valid())
@rule()
def iter_current(self):
if not self.iter_.is_valid():
event("iter invalid")
return
event("current")
self.iter_.current()
@rule(pylookupname=dns_names())
def lookup_random(self, pylookupname):
return self._lookup(pylookupname)
@rule(pylookupname=names)
def lookup_known(self, pylookupname):
return self._lookup(pylookupname)
@precondition(lambda self: len(self.model) > 0)
@rule(pylookupname=subdomains(names))
def lookup_subdomain(self, pylookupname):
return self._lookup(pylookupname)
def _lookup(self, pylookupname):
outiter = QPIterator(self)
lookupname = ISCName(pylookupname)
foundname = ISCName()
ret = isclibs.dns_qp_lookup(
self.qp,
lookupname.cobj,
foundname.cobj,
outiter.citer,
self.chain.cchain,
NULL,
NULL,
)
print("LOOKUP", ret, pylookupname)
event("LOOKUP", ret)
# verify that no unepected parent name exists in our model
if ret == isclibs.ISC_R_NOTFOUND:
# no parent can be present, not even the root
common_labels = 0
outiter.set_to_predecessor(pylookupname)
elif ret == isclibs.DNS_R_PARTIALMATCH:
assert (
foundname.pyname() < pylookupname
), "foundname is not a subdomain of looked up name"
common_labels = len(foundname.pyname())
outiter.set_to_predecessor(pylookupname)
elif ret == isclibs.ISC_R_SUCCESS:
# exact match!
assert pylookupname == foundname.pyname()
common_labels = len(pylookupname)
outiter.set_to_name(pylookupname)
else:
raise NotImplementedError(ret)
for splitidx in range(len(pylookupname), common_labels, -1):
parentname = pylookupname.split(splitidx)[1]
assert (
parentname not in self.model
), "found parent node which reportedly does not exist"
self.chain.print()
# verify chain produced by lookup
self.chain.check(pylookupname)
# iterator must point to the foundname or predecessor
outiter.current()
# overwrite the previous iterator with the one produced by lookup()
# this should allow the state machine to excercise iteration after lookup
self.iter_ = outiter
@rule()
def values_agree_forward(self):
"""Iterate through all values and check ordering"""
tmp_iter = QPIterator(self)
event("values_agree_forward", len(tmp_iter.model))
qp_count = 0
while (got_ret := tmp_iter.next_()[0]) == isclibs.ISC_R_SUCCESS:
qp_count += 1
assert qp_count == len(tmp_iter.model)
@rule()
def values_agree_backwards(self):
"""Iterate through all values and check ordering"""
tmp_iter = QPIterator(self)
event("values_agree_backwards", len(tmp_iter.model))
qp_count = 0
while (got_ret := tmp_iter.prev()[0]) == isclibs.ISC_R_SUCCESS:
qp_count += 1
assert qp_count == len(tmp_iter.model)
TestTrees = BareQPTest.TestCase
TestTrees.settings = hypothesis.settings(
max_examples=1000,
deadline=None,
# stateful_step_count=10000,
# suppress_health_check=[hypothesis.HealthCheck.large_base_example, hypothesis.HealthCheck.too_slow]
)
# Or just run with pytest's unittest support
if __name__ == "__main__":
state = BareQPTest()
state.add_random(dns.name.from_text(r"a."))
state.add_random(dns.name.from_text(r"d.b.a."))
state.add_random(dns.name.from_text(r"z.d.b.a."))
state.lookup_subdomain(dns.name.from_text(r"f.c.b.a."))
# unittest.main()

View File

@@ -0,0 +1,152 @@
#!/usr/bin/python
from cffi import FFI
ffibuilder = FFI()
# cdef() expects a single string declaring the C types, functions and
# globals needed to use the shared object. It must be in valid C syntax.
ffibuilder.cdef(
"""
typedef enum {ISC_R_SUCCESS, ISC_R_EXISTS, ISC_R_NOTFOUND, ISC_R_NOMORE, ISC_R_FAILURE, DNS_R_PARTIALMATCH, ...} isc_result_t;
typedef ... isc_mem_t;
typedef ... isc_buffer_t;
typedef struct { ...; } dns_name_t;
typedef struct { ...; } dns_fixedname_t;
typedef int... dns_qpshift_t;
typedef dns_qpshift_t dns_qpkey_t[...];
typedef ... dns_qp_t;
typedef ... dns_qpmulti_t;
typedef union { ...; } dns_qpreadable_t;
typedef struct { ...; } dns_qpmethods_t;
typedef struct { ...; } dns_qpiter_t;
typedef struct { ...; } dns_qpchain_t;
// FIXME: first argument's type is modified to make it work with CFFI
void
dns_qpiter_init(dns_qp_t *qpr, dns_qpiter_t *qpi);
isc_result_t
dns_qpiter_next(dns_qpiter_t *qpi, dns_name_t *name, void **pval_r,
uint32_t *ival_r);
isc_result_t
dns_qpiter_prev(dns_qpiter_t *qpi, dns_name_t *name, void **pval_r,
uint32_t *ival_r);
isc_result_t
dns_qpiter_current(dns_qpiter_t *qpi, dns_name_t *name, void **pval_r,
uint32_t *ival_r);
void
isc__mem_create(isc_mem_t **);
void
isc_mem_attach(isc_mem_t *, isc_mem_t **);
isc_result_t
dns_name_fromstring(dns_name_t *target, const char *src,
const dns_name_t *origin, unsigned int options,
isc_mem_t *mctx);
void
dns_name_format(const dns_name_t *name, char *cp, unsigned int size);
static inline void
dns_name_init(dns_name_t *name, unsigned char *offsets);
dns_name_t *
dns_fixedname_initname(dns_fixedname_t *fixed);
isc_result_t
dns_name_downcase(const dns_name_t *source, dns_name_t *name,
isc_buffer_t *target);
void
dns_qpkey_toname(const dns_qpkey_t key, size_t keylen, dns_name_t *name);
size_t
dns_qpkey_fromname(dns_qpkey_t key, const dns_name_t *name);
void
dns_qp_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
dns_qp_t **qptp);
void
dns_qpmulti_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
dns_qpmulti_t **qpmp);
extern const dns_qpmethods_t qp_methods;
isc_result_t
dns_qp_insert(dns_qp_t *qp, void *pval, uint32_t ival);
isc_result_t
dns_qp_deletename(dns_qp_t *qp, const dns_name_t *name, void **pval_r,
uint32_t *ival_r);
isc_result_t
dns_qp_getname(dns_qpreadable_t qpr, const dns_name_t *name, void **pval_r,
uint32_t *ival_r);
// FIXME: first argument's type is modified to make it work with CFFI
isc_result_t
dns_qp_lookup(dns_qp_t *qpr, const dns_name_t *name,
dns_name_t *foundname, dns_qpiter_t *iter, dns_qpchain_t *chain,
void **pval_r, uint32_t *ival_r);
// FIXME: first argument's type is modified to make it work with CFFI
void
dns_qpchain_init(dns_qp_t *qpr, dns_qpchain_t *chain);
unsigned int
dns_qpchain_length(dns_qpchain_t *chain);
void
dns_qpchain_node(dns_qpchain_t *chain, unsigned int level, dns_name_t *name,
void **pval_r, uint32_t *ival_r);
"""
)
# set_source() gives the name of the python extension module to
# produce, and some C source code as a string. This C code needs
# to make the declarated functions, types and globals available,
# so it is often just the "#include".
ffibuilder.set_source(
"_qp_test_cffi",
"""
#include "isc/buffer.h"
#include "isc/mem.h"
#include "dns/name.h"
#include "dns/fixedname.h"
#include "dns/qp.h"
static void
noopref(void *uctx, void *pval, uint32_t ival) {}
static void
noopgetname(void *uctx, char *buf, size_t size) {}
size_t
qp_makekey(dns_qpkey_t key, void *uctx, void *pval,
uint32_t ival) {
dns_name_t *name = pval;
return dns_qpkey_fromname(key, name);
}
const dns_qpmethods_t qp_methods = {
noopref,
noopref,
qp_makekey,
noopgetname,
};
""",
libraries=["dns"],
include_dirs=["../../lib/isc/include", "../../lib/dns/include"],
)
if __name__ == "__main__":
ffibuilder.compile(
verbose=True,
)

4
tests/dns/run.sh Normal file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/bash
set -o nounset -o errexit -o xtrace
python qp_test_pythonbuild.py && LD_PRELOAD=/usr/lib/libjemalloc.so.2 pytest qp_test.py --hypothesis-show-statistics -k TestTrees

165
tests/dns/strategies.py Normal file
View File

@@ -0,0 +1,165 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import List
from warnings import warn
from hypothesis.strategies import (
binary,
builds,
composite,
integers,
just,
nothing,
permutations,
)
import dns.name
import dns.message
import dns.rdataclass
import dns.rdatatype
# LATER: Move this file so it can be easily reused.
@composite
def dns_names(
draw,
*,
prefix: dns.name.Name = dns.name.empty,
suffix: dns.name.Name = dns.name.root,
min_labels: int = 1,
max_labels: int = 128,
) -> dns.name.Name:
"""
This is a hypothesis strategy to be used for generating DNS names with given `prefix`, `suffix`
and with total number of labels specified by `min_labels` and `max labels`.
For example, calling
```
dns_names(
prefix=dns.name.from_text("test"),
suffix=dns.name.from_text("isc.org"),
max_labels=6
).example()
```
will result in names like `test.abc.isc.org.` or `test.abc.def.isc.org`.
There is no attempt to make the distribution of the generated names uniform in any way.
The strategy however minimizes towards shorter names with shorter labels.
It can be used with to build compound strategies, like this one which generates random DNS queries.
```
dns_queries = builds(
dns.message.make_query,
qname=dns_names(),
rdtype=dns_rdatatypes,
rdclass=dns_rdataclasses,
)
```
"""
prefix = prefix.relativize(dns.name.root)
suffix = suffix.derelativize(dns.name.root)
try:
outer_name = prefix + suffix
remaining_bytes = 255 - len(outer_name.to_wire())
assert remaining_bytes >= 0
except dns.name.NameTooLong:
warn(
"Maximal length name of name execeeded by prefix and suffix. Strategy won't generate any names.",
RuntimeWarning,
)
return draw(nothing())
minimum_number_of_labels_to_generate = max(0, min_labels - len(outer_name.labels))
maximum_number_of_labels_to_generate = max_labels - len(outer_name.labels)
if maximum_number_of_labels_to_generate < 0:
warn(
"Maximal number of labels execeeded by prefix and suffix. Strategy won't generate any names.",
RuntimeWarning,
)
return draw(nothing())
maximum_number_of_labels_to_generate = min(
maximum_number_of_labels_to_generate, remaining_bytes // 2
)
if maximum_number_of_labels_to_generate < minimum_number_of_labels_to_generate:
warn(
f"Minimal number set to {minimum_number_of_labels_to_generate}, but in {remaining_bytes} bytes there is only space for maximum of {maximum_number_of_labels_to_generate} labels.",
RuntimeWarning,
)
return draw(nothing())
if remaining_bytes == 0 or maximum_number_of_labels_to_generate == 0:
warn(
f"Strategy will return only one name ({outer_name}) as it exactly matches byte or label length limit.",
RuntimeWarning,
)
return draw(just(outer_name))
chosen_number_of_labels_to_generate = draw(
integers(
minimum_number_of_labels_to_generate, maximum_number_of_labels_to_generate
)
)
chosen_number_of_bytes_to_partion = draw(
integers(2 * chosen_number_of_labels_to_generate, remaining_bytes)
)
chosen_lengths_of_labels = draw(
_partition_bytes_to_labels(
chosen_number_of_bytes_to_partion, chosen_number_of_labels_to_generate
)
)
generated_labels = tuple(
draw(binary(min_size=l - 1, max_size=l - 1)) for l in chosen_lengths_of_labels
)
return dns.name.Name(prefix.labels + generated_labels + suffix.labels)
RDATACLASS_MAX = RDATATYPE_MAX = 65535
dns_rdataclasses = builds(dns.rdataclass.RdataClass, integers(0, RDATACLASS_MAX))
dns_rdataclasses_without_meta = dns_rdataclasses.filter(dns.rdataclass.is_metaclass)
dns_rdatatypes = builds(dns.rdatatype.RdataType, integers(0, RDATATYPE_MAX))
# NOTE: This should really be `dns_rdatatypes_without_meta = dns_rdatatypes_without_meta.filter(dns.rdatatype.is_metatype()`,
# but hypothesis then complains about the filter being too strict, so it is done in a “constructive” way.
dns_rdatatypes_without_meta = integers(0, dns.rdatatype.OPT - 1) | integers(dns.rdatatype.OPT + 1, 127) | integers(256, RDATATYPE_MAX) # type: ignore
@composite
def _partition_bytes_to_labels(
draw, remaining_bytes: int, number_of_labels: int
) -> List[int]:
two_bytes_reserved_for_label = 2
# Reserve two bytes for each label
partition = [two_bytes_reserved_for_label] * number_of_labels
remaining_bytes -= two_bytes_reserved_for_label * number_of_labels
assert remaining_bytes >= 0
# Add a random number between 0 and the remainder to each partition
for i in range(number_of_labels):
added = draw(
integers(0, min(remaining_bytes, 64 - two_bytes_reserved_for_label))
)
partition[i] += added
remaining_bytes -= added
# NOTE: Some of the remaining bytes will usually not be assigned to any label, but we don't care.
return draw(permutations(partition))