Compare commits
23 Commits
ondrej/sie
...
pspacek/hy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7294e2b4a9 | ||
|
|
07a4f7c6cd | ||
|
|
f78ac9e879 | ||
|
|
206002d37d | ||
|
|
9121d1a98c | ||
|
|
ee65ee0ad0 | ||
|
|
4dd0797316 | ||
|
|
854a9ad735 | ||
|
|
8c62a779a2 | ||
|
|
590e434700 | ||
|
|
decec5b53b | ||
|
|
8f908624b5 | ||
|
|
9a77309024 | ||
|
|
c5376e49b2 | ||
|
|
f6da2db1b2 | ||
|
|
27e4001f77 | ||
|
|
ed3834578f | ||
|
|
91cb7306f7 | ||
|
|
68c194a96e | ||
|
|
60d4d8d21f | ||
|
|
d90c7f238d | ||
|
|
4c36408f34 | ||
|
|
fb23539fb8 |
6
tests/dns/.gitignore
vendored
6
tests/dns/.gitignore
vendored
@@ -1,4 +1,6 @@
|
||||
/zone.data
|
||||
/.hypothesis
|
||||
/_*_cffi.*
|
||||
/badcache.out
|
||||
/testdata/dnstap/dnstap.file
|
||||
/testdata/master/master18.data
|
||||
/badcache.out
|
||||
/zone.data
|
||||
|
||||
110
tests/dns/name_test.py
Executable file
110
tests/dns/name_test.py
Executable file
@@ -0,0 +1,110 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
#
|
||||
# SPDX-License-Identifier: MPL-2.0
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
#
|
||||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
|
||||
"""
|
||||
Example property-based test for dns_name_ API.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
# in FIPs mode md5 fails so we need 4.41.2 or later which does not use md5
|
||||
try:
|
||||
import hashlib
|
||||
|
||||
hashlib.md5(b"1234")
|
||||
pytest.importorskip("hypothesis")
|
||||
except ValueError:
|
||||
pytest.importorskip("hypothesis", minversion="4.41.2")
|
||||
|
||||
from hypothesis import assume, example, given
|
||||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.name
|
||||
|
||||
from strategies import dns_names
|
||||
|
||||
from _name_test_cffi import ffi
|
||||
from _name_test_cffi import lib as isclibs
|
||||
|
||||
NULL = ffi.NULL
|
||||
|
||||
# MCTXP = ffi.new('isc_mem_t **')
|
||||
# isclibs.isc__mem_create(MCTXP)
|
||||
|
||||
|
||||
class ISCName:
|
||||
"""dns_name_t instance with a private fixed buffer"""
|
||||
|
||||
def __init__(self, from_text=None):
|
||||
self.fixedname = ffi.new("dns_fixedname_t *")
|
||||
self.name = isclibs.dns_fixedname_initname(self.fixedname)
|
||||
# self.cctx = ffi.new("dns_compress_t *")
|
||||
# self.dctx = ffi.new("dns_decompress_t *")
|
||||
self.formatbuf = ffi.new("char[1024]") # DNS_NAME_FORMATSIZE
|
||||
|
||||
if from_text is not None:
|
||||
assert (
|
||||
isclibs.dns_name_fromstring(
|
||||
self.name, from_text.encode("ascii"), NULL, 0, NULL
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
def format(self):
|
||||
isclibs.dns_name_format(self.name, self.formatbuf, len(self.formatbuf))
|
||||
return ffi.string(self.formatbuf).decode("ascii")
|
||||
|
||||
|
||||
@given(pyname=dns_names(suffix=dns.name.root))
|
||||
def test_totext_fromtext_roundtrip(pyname: dns.name.Name) -> None:
|
||||
"""
|
||||
text formatting and parsing roundtrip must not change the name
|
||||
|
||||
dnspython to_text -> ISC from_string -> ISC format -> dnspython from_text
|
||||
"""
|
||||
iscname = ISCName(from_text=str(pyname))
|
||||
assert pyname == dns.name.from_text(iscname.format())
|
||||
|
||||
|
||||
@given(pyname=dns_names(suffix=dns.name.root))
|
||||
def test_downcase(pyname: dns.name.Name) -> None:
|
||||
downcased = ISCName(from_text=str(pyname))
|
||||
assert isclibs.dns_name_hash(downcased.name)
|
||||
|
||||
isclibs.dns_name_downcase(downcased.name, downcased.name, NULL)
|
||||
assert not any(
|
||||
letter.isupper() for letter in downcased.format()
|
||||
), "downcasing removes all ASCII uppercase letters"
|
||||
|
||||
|
||||
@given(pyname=dns_names(suffix=dns.name.root))
|
||||
def test_hash_downcase(pyname: dns.name.Name) -> None:
|
||||
"""downcasing must not affect hash value"""
|
||||
orig = ISCName(from_text=str(pyname))
|
||||
orig_hash = isclibs.dns_name_hash(orig.name)
|
||||
|
||||
downcased = ISCName(from_text=str(pyname))
|
||||
assert isclibs.dns_name_hash(downcased.name) == orig_hash, "hash is stable"
|
||||
|
||||
isclibs.dns_name_downcase(downcased.name, downcased.name, NULL)
|
||||
assert not any(
|
||||
letter.isupper() for letter in downcased.format()
|
||||
), "downcasing actually works"
|
||||
|
||||
assert pyname == dns.name.from_text(
|
||||
downcased.format()
|
||||
), "downcasing must not change semantic value"
|
||||
|
||||
downcased_hash = isclibs.dns_name_hash(downcased.name)
|
||||
assert orig_hash == downcased_hash, "downcasing must not change hash value"
|
||||
69
tests/dns/name_test_pythonbuild.py
Normal file
69
tests/dns/name_test_pythonbuild.py
Normal file
@@ -0,0 +1,69 @@
|
||||
#!/usr/bin/python
|
||||
from cffi import FFI
|
||||
|
||||
ffibuilder = FFI()
|
||||
|
||||
# cdef() expects a single string declaring the C types, functions and
|
||||
# globals needed to use the shared object. It must be in valid C syntax.
|
||||
ffibuilder.cdef(
|
||||
"""
|
||||
typedef int... isc_result_t;
|
||||
typedef ... isc_mem_t;
|
||||
typedef ... isc_buffer_t;
|
||||
|
||||
typedef struct { ...; } dns_compress_t;
|
||||
typedef int... dns_decompress_t;
|
||||
|
||||
typedef struct { ...; } dns_name_t;
|
||||
typedef struct { ...; } dns_fixedname_t;
|
||||
|
||||
void
|
||||
isc__mem_create(isc_mem_t **);
|
||||
|
||||
void
|
||||
isc_mem_attach(isc_mem_t *, isc_mem_t **);
|
||||
|
||||
isc_result_t
|
||||
dns_name_fromstring(dns_name_t *target, const char *src,
|
||||
const dns_name_t *origin, unsigned int options,
|
||||
isc_mem_t *mctx);
|
||||
|
||||
void
|
||||
dns_name_format(const dns_name_t *name, char *cp, unsigned int size);
|
||||
|
||||
static inline void
|
||||
dns_name_init(dns_name_t *name, unsigned char *offsets);
|
||||
|
||||
dns_name_t *
|
||||
dns_fixedname_initname(dns_fixedname_t *fixed);
|
||||
|
||||
isc_result_t
|
||||
dns_name_downcase(const dns_name_t *source, dns_name_t *name,
|
||||
isc_buffer_t *target);
|
||||
|
||||
uint32_t
|
||||
dns_name_hash(const dns_name_t *name);
|
||||
"""
|
||||
)
|
||||
|
||||
# set_source() gives the name of the python extension module to
|
||||
# produce, and some C source code as a string. This C code needs
|
||||
# to make the declarated functions, types and globals available,
|
||||
# so it is often just the "#include".
|
||||
ffibuilder.set_source(
|
||||
"_name_test_cffi",
|
||||
"""
|
||||
#include "isc/buffer.h"
|
||||
#include "isc/mem.h"
|
||||
#include "dns/name.h"
|
||||
#include "dns/compress.h"
|
||||
#include "dns/fixedname.h"
|
||||
""",
|
||||
libraries=["dns"],
|
||||
include_dirs=["../../lib/isc/include", "../../lib/dns/include"],
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
ffibuilder.compile(
|
||||
verbose=True,
|
||||
)
|
||||
512
tests/dns/qp_test.py
Executable file
512
tests/dns/qp_test.py
Executable file
@@ -0,0 +1,512 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
#
|
||||
# SPDX-License-Identifier: MPL-2.0
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
#
|
||||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
|
||||
"""
|
||||
Example property-based test for dns_name_ API.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
# in FIPs mode md5 fails so we need 4.41.2 or later which does not use md5
|
||||
try:
|
||||
import hashlib
|
||||
|
||||
hashlib.md5(b"1234")
|
||||
pytest.importorskip("hypothesis")
|
||||
except ValueError:
|
||||
pytest.importorskip("hypothesis", minversion="4.41.2")
|
||||
|
||||
from hypothesis import assume, example, event, given
|
||||
from hypothesis.stateful import Bundle, RuleBasedStateMachine, rule, precondition
|
||||
import hypothesis
|
||||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.name
|
||||
|
||||
from strategies import dns_names, composite
|
||||
|
||||
from _qp_test_cffi import ffi
|
||||
from _qp_test_cffi import lib as isclibs
|
||||
|
||||
NULL = ffi.NULL
|
||||
|
||||
MCTXP = ffi.new("isc_mem_t **")
|
||||
isclibs.isc__mem_create(MCTXP)
|
||||
MCTX = MCTXP[0]
|
||||
|
||||
|
||||
def event(*args):
|
||||
pass
|
||||
|
||||
|
||||
#def print(*args):
|
||||
# pass
|
||||
|
||||
|
||||
@composite
|
||||
def subdomains(draw, named_bundle):
|
||||
parent = draw(named_bundle)
|
||||
# the parent name has less then two bytes left, no way to add a subdomain to it
|
||||
if len(parent) + sum(map(len, parent)) > 253:
|
||||
return parent
|
||||
subdomain = draw(dns_names(suffix=parent))
|
||||
return subdomain
|
||||
|
||||
|
||||
class ISCName:
|
||||
"""
|
||||
dns_name_t instance with a private fixed buffer
|
||||
|
||||
Make sure Python keeps reference to this object as long
|
||||
as it can be referenced from the C side.
|
||||
"""
|
||||
|
||||
def __init__(self, initval=None):
|
||||
self.fixedname = ffi.new("dns_fixedname_t *")
|
||||
self.cobj = isclibs.dns_fixedname_initname(self.fixedname)
|
||||
# self.cctx = ffi.new("dns_compress_t *")
|
||||
# self.dctx = ffi.new("dns_decompress_t *")
|
||||
self.formatbuf = ffi.new("char[1024]") # DNS_NAME_FORMATSIZE
|
||||
|
||||
if initval is None:
|
||||
return
|
||||
|
||||
if isinstance(initval, dns.name.Name):
|
||||
initval = str(initval)
|
||||
if isinstance(initval, str):
|
||||
assert (
|
||||
isclibs.dns_name_fromstring(
|
||||
self.cobj, initval.encode("ascii"), NULL, 0, NULL
|
||||
)
|
||||
== 0
|
||||
)
|
||||
return
|
||||
raise NotImplementedError(type(initval))
|
||||
|
||||
def cformat(self):
|
||||
isclibs.dns_name_format(self.cobj, self.formatbuf, len(self.formatbuf))
|
||||
return ffi.string(self.formatbuf).decode("ascii")
|
||||
|
||||
def pyname(self):
|
||||
return dns.name.from_text(self.cformat())
|
||||
|
||||
|
||||
@given(pyname_source=dns_names(suffix=dns.name.root))
|
||||
def test_fromname_toname_roundtrip(pyname_source: dns.name.Name) -> None:
|
||||
"""
|
||||
name to/from qpkey must not change the name
|
||||
"""
|
||||
iscname_source = ISCName(pyname_source)
|
||||
assert pyname_source == iscname_source.pyname()
|
||||
|
||||
qpkey = ffi.new("dns_qpkey_t *")
|
||||
qpkeysize = isclibs.dns_qpkey_fromname(qpkey[0], iscname_source.cobj)
|
||||
|
||||
iscname_target = ISCName()
|
||||
isclibs.dns_qpkey_toname(qpkey[0], qpkeysize, iscname_target.cobj)
|
||||
|
||||
pyname_target = iscname_target.pyname()
|
||||
assert pyname_source == pyname_target
|
||||
|
||||
|
||||
class QPChain:
|
||||
def __init__(self, testcase):
|
||||
self.testcase = testcase
|
||||
self.iter_generation = testcase.generation
|
||||
self.qp = testcase.qp
|
||||
|
||||
self.cchain = ffi.new("dns_qpchain_t *")
|
||||
# print(id(self), isclibs.dns_qpiter_init)
|
||||
isclibs.dns_qpchain_init(self.qp, self.cchain)
|
||||
|
||||
def length(self):
|
||||
ret = isclibs.dns_qpchain_length(self.cchain)
|
||||
print(self, 'length', ret)
|
||||
return ret
|
||||
|
||||
def print(self):
|
||||
print('C chain structure print out', self)
|
||||
for chainidx in range(self.length()):
|
||||
got_iscname, got_pval_r, _got_ival_r = self.node(chainidx)
|
||||
print('idx', chainidx, got_iscname.pyname())
|
||||
|
||||
def check(self, pylookupname: dns.name.Name):
|
||||
print('check chain for lookup', pylookupname)
|
||||
print('model', sorted(self.testcase.model))
|
||||
chainidx = 0
|
||||
for idx in range(1, len(pylookupname) + 1):
|
||||
parentname = pylookupname.split(idx)[1]
|
||||
assert self.length() >= chainidx
|
||||
if parentname not in self.testcase.model:
|
||||
print(parentname, 'NOT present in model')
|
||||
continue
|
||||
print(parentname, 'IS present in model')
|
||||
got_iscname, got_pval_r, _got_ival_r = self.node(chainidx)
|
||||
print(self, 'idx', chainidx, got_iscname.pyname())
|
||||
assert parentname == got_iscname.pyname(), (
|
||||
"chain points to unexpected name, idx",
|
||||
idx,
|
||||
)
|
||||
assert self.testcase.model[parentname].cobj == got_pval_r
|
||||
|
||||
chainidx += 1
|
||||
|
||||
assert (
|
||||
self.length() == chainidx
|
||||
), "chain length does not match"
|
||||
|
||||
def node(self, level):
|
||||
iscname = ISCName()
|
||||
got_pval_r = ffi.new("void **")
|
||||
got_ival_r = ffi.new("uint32_t *")
|
||||
isclibs.dns_qpchain_node(
|
||||
self.cchain, level, iscname.cobj, got_pval_r, got_ival_r
|
||||
)
|
||||
print(
|
||||
id(self),
|
||||
"qpchain_node",
|
||||
"\n-> returned: ",
|
||||
iscname.pyname(),
|
||||
got_pval_r[0],
|
||||
got_ival_r[0],
|
||||
)
|
||||
return iscname, got_pval_r[0], got_ival_r[0]
|
||||
|
||||
|
||||
class QPIterator:
|
||||
def __init__(self, testcase):
|
||||
self.testcase = testcase
|
||||
self.iter_generation = testcase.generation
|
||||
self.qp = testcase.qp
|
||||
|
||||
self.citer = ffi.new("dns_qpiter_t *")
|
||||
# print(id(self), isclibs.dns_qpiter_init)
|
||||
isclibs.dns_qpiter_init(self.qp, self.citer)
|
||||
|
||||
self.model = testcase.model.copy()
|
||||
self.sorted = sorted(self.model)
|
||||
self.position = None
|
||||
|
||||
def set_to_predecessor(self, name: dns.name.Name):
|
||||
self.position = self._find_predecesor(name)
|
||||
|
||||
def set_to_name(self, name: dns.name.Name):
|
||||
self.position = self.sorted.index(name)
|
||||
|
||||
def _find_predecesor(self, lookup: dns.name.Name):
|
||||
"""ridiculously ineffective method for finding closest predecesor of a given name"""
|
||||
print(self.sorted)
|
||||
for reversed_idx, present in enumerate(reversed(self.sorted)):
|
||||
print(present, "?? < ??", lookup)
|
||||
if present < lookup:
|
||||
print("yes!")
|
||||
idx = len(self.sorted) - 1 - reversed_idx
|
||||
print(
|
||||
"_find_predecesor regular, idx",
|
||||
len(self.sorted) - 1 - reversed_idx,
|
||||
self.sorted[idx],
|
||||
)
|
||||
return idx
|
||||
print("_find_predecesor wraparound", len(self.sorted) - 1)
|
||||
if len(self.sorted) > 0:
|
||||
# predecessor is BEFORE the first existing name, wrap around to the last name
|
||||
return len(self.sorted) - 1
|
||||
else:
|
||||
# predecessor does not exist at all - an empty QP
|
||||
return None
|
||||
|
||||
def _step(self, cfunc):
|
||||
iscname = ISCName()
|
||||
got_pval_r = ffi.new("void **")
|
||||
got_ival_r = ffi.new("uint32_t *")
|
||||
got_ret = cfunc(self.citer, iscname.cobj, got_pval_r, got_ival_r)
|
||||
print(
|
||||
id(self),
|
||||
"_step",
|
||||
cfunc,
|
||||
"\n-> returned: ",
|
||||
got_ret,
|
||||
iscname.pyname(),
|
||||
got_pval_r[0],
|
||||
got_ival_r[0],
|
||||
)
|
||||
return got_ret, iscname, got_pval_r[0], got_ival_r[0]
|
||||
|
||||
def _check_return_values(self, got_iscname, got_pval_r, _got_ival_r):
|
||||
assert self.position is not None, "usage error in test script"
|
||||
exp_pyname = self.sorted[self.position]
|
||||
exp_iscname = self.model[exp_pyname]
|
||||
assert exp_pyname == got_iscname.pyname()
|
||||
assert exp_iscname.cobj == got_pval_r
|
||||
|
||||
def is_valid(self):
|
||||
"""Check if QP this iterator referenced is supposed to be still valid"""
|
||||
return self.iter_generation == self.testcase.generation
|
||||
|
||||
def next_(self):
|
||||
got_ret, got_iscname, got_pval_r, got_ival_r = self._step(
|
||||
isclibs.dns_qpiter_next
|
||||
)
|
||||
if len(self.model) == 0 or self.position == len(self.model) - 1:
|
||||
assert got_ret == isclibs.ISC_R_NOMORE
|
||||
self.position = None
|
||||
else:
|
||||
assert got_ret == isclibs.ISC_R_SUCCESS
|
||||
if self.position is None:
|
||||
self.position = 0
|
||||
else:
|
||||
self.position += 1
|
||||
self._check_return_values(got_iscname, got_pval_r, got_ival_r)
|
||||
return got_ret, got_iscname, got_pval_r, got_ival_r
|
||||
|
||||
def prev(self):
|
||||
got_ret, got_iscname, got_pval_r, got_ival_r = self._step(
|
||||
isclibs.dns_qpiter_prev
|
||||
)
|
||||
if len(self.model) == 0 or self.position == 0:
|
||||
assert got_ret == isclibs.ISC_R_NOMORE
|
||||
self.position = None
|
||||
else:
|
||||
assert got_ret == isclibs.ISC_R_SUCCESS
|
||||
if self.position is None:
|
||||
self.position = len(self.model) - 1
|
||||
else:
|
||||
self.position -= 1
|
||||
self._check_return_values(got_iscname, got_pval_r, got_ival_r)
|
||||
return got_ret, got_iscname, got_pval_r, got_ival_r
|
||||
|
||||
def current(self):
|
||||
got_ret, got_iscname, got_pval_r, got_ival_r = self._step(
|
||||
isclibs.dns_qpiter_current
|
||||
)
|
||||
|
||||
if self.position is None:
|
||||
assert got_ret == isclibs.ISC_R_FAILURE
|
||||
return
|
||||
|
||||
assert got_ret == isclibs.ISC_R_SUCCESS
|
||||
self._check_return_values(got_iscname, got_pval_r, got_ival_r)
|
||||
return got_ret, got_iscname, got_pval_r, got_ival_r
|
||||
|
||||
|
||||
class BareQPTest(RuleBasedStateMachine):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.generation = 0
|
||||
print("\n\nTEST RESTART FROM SCRATCH, GENERATION", self.generation)
|
||||
|
||||
self.qpptr = ffi.new("dns_qp_t **")
|
||||
isclibs.dns_qp_create(MCTX, ffi.addressof(isclibs.qp_methods), NULL, self.qpptr)
|
||||
self.qp = self.qpptr[0]
|
||||
|
||||
self.model = {}
|
||||
self.iter_ = QPIterator(self)
|
||||
self.chain = QPChain(self)
|
||||
|
||||
names = Bundle("names")
|
||||
iterators = Bundle("iterators")
|
||||
|
||||
def invalidate_refs(self):
|
||||
"""Mark current QP as changed - iterators which depend on unchanged state are now invalid"""
|
||||
self.generation += 1
|
||||
return # TODO
|
||||
|
||||
self.iter_ = QPIterator(self)
|
||||
self.chain = QPChain(self)
|
||||
print("GENERATION ", self.generation)
|
||||
|
||||
@rule(target=names, pyname=dns_names())
|
||||
def add_random(self, pyname):
|
||||
event("ADD random")
|
||||
return self._add(pyname)
|
||||
|
||||
@precondition(lambda self: len(self.model) > 0)
|
||||
@rule(target=names, pyname=subdomains(names))
|
||||
def add_subdomain(self, pyname):
|
||||
event("ADD subdomain")
|
||||
return self._add(pyname)
|
||||
|
||||
def _add(self, pyname):
|
||||
iscname = ISCName(pyname)
|
||||
|
||||
ret = isclibs.dns_qp_insert(self.qp, iscname.cobj, 0)
|
||||
print("insert", pyname, ret)
|
||||
event("INSERT", ret)
|
||||
if pyname not in self.model:
|
||||
assert ret == isclibs.ISC_R_SUCCESS
|
||||
self.model[pyname] = iscname
|
||||
else:
|
||||
assert ret == isclibs.ISC_R_EXISTS
|
||||
|
||||
self.invalidate_refs()
|
||||
return pyname
|
||||
|
||||
@rule(pyname=names)
|
||||
def delete(self, pyname):
|
||||
print("DELETENAME", pyname)
|
||||
exists = pyname in self.model
|
||||
|
||||
iscname = ISCName(pyname)
|
||||
|
||||
pval = ffi.new("void **")
|
||||
ret = isclibs.dns_qp_deletename(self.qp, iscname.cobj, pval, NULL)
|
||||
event("DELETENAME", ret)
|
||||
if exists:
|
||||
assert ret == isclibs.ISC_R_SUCCESS
|
||||
assert pval[0] == self.model[pyname].cobj
|
||||
del self.model[pyname]
|
||||
else:
|
||||
assert ret == isclibs.ISC_R_NOTFOUND
|
||||
self.invalidate_refs()
|
||||
|
||||
def iter_init(self):
|
||||
event("init")
|
||||
self.iter_ = QPIterator(self)
|
||||
|
||||
@precondition(lambda self: self.iter_.is_valid())
|
||||
@rule()
|
||||
def iter_next(self):
|
||||
if not self.iter_.is_valid():
|
||||
event("iter invalid")
|
||||
return
|
||||
|
||||
event("next", self.iter_.position)
|
||||
self.iter_.next_()
|
||||
|
||||
@precondition(lambda self: self.iter_.is_valid())
|
||||
@rule()
|
||||
def iter_prev(self):
|
||||
if not self.iter_.is_valid():
|
||||
event("iter invalid")
|
||||
return
|
||||
|
||||
event("prev", self.iter_.position)
|
||||
self.iter_.prev()
|
||||
|
||||
@precondition(lambda self: self.iter_.is_valid())
|
||||
@rule()
|
||||
def iter_current(self):
|
||||
if not self.iter_.is_valid():
|
||||
event("iter invalid")
|
||||
return
|
||||
|
||||
event("current")
|
||||
self.iter_.current()
|
||||
|
||||
@rule(pylookupname=dns_names())
|
||||
def lookup_random(self, pylookupname):
|
||||
return self._lookup(pylookupname)
|
||||
|
||||
@rule(pylookupname=names)
|
||||
def lookup_known(self, pylookupname):
|
||||
return self._lookup(pylookupname)
|
||||
|
||||
@precondition(lambda self: len(self.model) > 0)
|
||||
@rule(pylookupname=subdomains(names))
|
||||
def lookup_subdomain(self, pylookupname):
|
||||
return self._lookup(pylookupname)
|
||||
|
||||
def _lookup(self, pylookupname):
|
||||
outiter = QPIterator(self)
|
||||
lookupname = ISCName(pylookupname)
|
||||
foundname = ISCName()
|
||||
ret = isclibs.dns_qp_lookup(
|
||||
self.qp,
|
||||
lookupname.cobj,
|
||||
foundname.cobj,
|
||||
outiter.citer,
|
||||
self.chain.cchain,
|
||||
NULL,
|
||||
NULL,
|
||||
)
|
||||
print("LOOKUP", ret, pylookupname)
|
||||
event("LOOKUP", ret)
|
||||
|
||||
# verify that no unepected parent name exists in our model
|
||||
if ret == isclibs.ISC_R_NOTFOUND:
|
||||
# no parent can be present, not even the root
|
||||
common_labels = 0
|
||||
outiter.set_to_predecessor(pylookupname)
|
||||
elif ret == isclibs.DNS_R_PARTIALMATCH:
|
||||
assert (
|
||||
foundname.pyname() < pylookupname
|
||||
), "foundname is not a subdomain of looked up name"
|
||||
common_labels = len(foundname.pyname())
|
||||
outiter.set_to_predecessor(pylookupname)
|
||||
elif ret == isclibs.ISC_R_SUCCESS:
|
||||
# exact match!
|
||||
assert pylookupname == foundname.pyname()
|
||||
common_labels = len(pylookupname)
|
||||
outiter.set_to_name(pylookupname)
|
||||
else:
|
||||
raise NotImplementedError(ret)
|
||||
|
||||
for splitidx in range(len(pylookupname), common_labels, -1):
|
||||
parentname = pylookupname.split(splitidx)[1]
|
||||
assert (
|
||||
parentname not in self.model
|
||||
), "found parent node which reportedly does not exist"
|
||||
|
||||
self.chain.print()
|
||||
# verify chain produced by lookup
|
||||
self.chain.check(pylookupname)
|
||||
|
||||
# iterator must point to the foundname or predecessor
|
||||
outiter.current()
|
||||
|
||||
# overwrite the previous iterator with the one produced by lookup()
|
||||
# this should allow the state machine to excercise iteration after lookup
|
||||
self.iter_ = outiter
|
||||
|
||||
@rule()
|
||||
def values_agree_forward(self):
|
||||
"""Iterate through all values and check ordering"""
|
||||
tmp_iter = QPIterator(self)
|
||||
event("values_agree_forward", len(tmp_iter.model))
|
||||
|
||||
qp_count = 0
|
||||
while (got_ret := tmp_iter.next_()[0]) == isclibs.ISC_R_SUCCESS:
|
||||
qp_count += 1
|
||||
|
||||
assert qp_count == len(tmp_iter.model)
|
||||
|
||||
@rule()
|
||||
def values_agree_backwards(self):
|
||||
"""Iterate through all values and check ordering"""
|
||||
tmp_iter = QPIterator(self)
|
||||
event("values_agree_backwards", len(tmp_iter.model))
|
||||
|
||||
qp_count = 0
|
||||
while (got_ret := tmp_iter.prev()[0]) == isclibs.ISC_R_SUCCESS:
|
||||
qp_count += 1
|
||||
|
||||
assert qp_count == len(tmp_iter.model)
|
||||
|
||||
|
||||
TestTrees = BareQPTest.TestCase
|
||||
TestTrees.settings = hypothesis.settings(
|
||||
max_examples=1000,
|
||||
deadline=None,
|
||||
# stateful_step_count=10000,
|
||||
# suppress_health_check=[hypothesis.HealthCheck.large_base_example, hypothesis.HealthCheck.too_slow]
|
||||
)
|
||||
|
||||
# Or just run with pytest's unittest support
|
||||
if __name__ == "__main__":
|
||||
state = BareQPTest()
|
||||
state.add_random(dns.name.from_text(r"a."))
|
||||
state.add_random(dns.name.from_text(r"d.b.a."))
|
||||
state.add_random(dns.name.from_text(r"z.d.b.a."))
|
||||
state.lookup_subdomain(dns.name.from_text(r"f.c.b.a."))
|
||||
# unittest.main()
|
||||
152
tests/dns/qp_test_pythonbuild.py
Normal file
152
tests/dns/qp_test_pythonbuild.py
Normal file
@@ -0,0 +1,152 @@
|
||||
#!/usr/bin/python
|
||||
from cffi import FFI
|
||||
|
||||
ffibuilder = FFI()
|
||||
|
||||
# cdef() expects a single string declaring the C types, functions and
|
||||
# globals needed to use the shared object. It must be in valid C syntax.
|
||||
ffibuilder.cdef(
|
||||
"""
|
||||
typedef enum {ISC_R_SUCCESS, ISC_R_EXISTS, ISC_R_NOTFOUND, ISC_R_NOMORE, ISC_R_FAILURE, DNS_R_PARTIALMATCH, ...} isc_result_t;
|
||||
typedef ... isc_mem_t;
|
||||
typedef ... isc_buffer_t;
|
||||
|
||||
typedef struct { ...; } dns_name_t;
|
||||
typedef struct { ...; } dns_fixedname_t;
|
||||
|
||||
typedef int... dns_qpshift_t;
|
||||
typedef dns_qpshift_t dns_qpkey_t[...];
|
||||
typedef ... dns_qp_t;
|
||||
typedef ... dns_qpmulti_t;
|
||||
typedef union { ...; } dns_qpreadable_t;
|
||||
typedef struct { ...; } dns_qpmethods_t;
|
||||
|
||||
typedef struct { ...; } dns_qpiter_t;
|
||||
typedef struct { ...; } dns_qpchain_t;
|
||||
|
||||
// FIXME: first argument's type is modified to make it work with CFFI
|
||||
void
|
||||
dns_qpiter_init(dns_qp_t *qpr, dns_qpiter_t *qpi);
|
||||
|
||||
isc_result_t
|
||||
dns_qpiter_next(dns_qpiter_t *qpi, dns_name_t *name, void **pval_r,
|
||||
uint32_t *ival_r);
|
||||
isc_result_t
|
||||
dns_qpiter_prev(dns_qpiter_t *qpi, dns_name_t *name, void **pval_r,
|
||||
uint32_t *ival_r);
|
||||
|
||||
isc_result_t
|
||||
dns_qpiter_current(dns_qpiter_t *qpi, dns_name_t *name, void **pval_r,
|
||||
uint32_t *ival_r);
|
||||
|
||||
void
|
||||
isc__mem_create(isc_mem_t **);
|
||||
|
||||
void
|
||||
isc_mem_attach(isc_mem_t *, isc_mem_t **);
|
||||
|
||||
isc_result_t
|
||||
dns_name_fromstring(dns_name_t *target, const char *src,
|
||||
const dns_name_t *origin, unsigned int options,
|
||||
isc_mem_t *mctx);
|
||||
|
||||
void
|
||||
dns_name_format(const dns_name_t *name, char *cp, unsigned int size);
|
||||
|
||||
static inline void
|
||||
dns_name_init(dns_name_t *name, unsigned char *offsets);
|
||||
|
||||
dns_name_t *
|
||||
dns_fixedname_initname(dns_fixedname_t *fixed);
|
||||
|
||||
isc_result_t
|
||||
dns_name_downcase(const dns_name_t *source, dns_name_t *name,
|
||||
isc_buffer_t *target);
|
||||
|
||||
void
|
||||
dns_qpkey_toname(const dns_qpkey_t key, size_t keylen, dns_name_t *name);
|
||||
|
||||
size_t
|
||||
dns_qpkey_fromname(dns_qpkey_t key, const dns_name_t *name);
|
||||
|
||||
void
|
||||
dns_qp_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
|
||||
dns_qp_t **qptp);
|
||||
|
||||
void
|
||||
dns_qpmulti_create(isc_mem_t *mctx, const dns_qpmethods_t *methods, void *uctx,
|
||||
dns_qpmulti_t **qpmp);
|
||||
|
||||
extern const dns_qpmethods_t qp_methods;
|
||||
|
||||
isc_result_t
|
||||
dns_qp_insert(dns_qp_t *qp, void *pval, uint32_t ival);
|
||||
|
||||
isc_result_t
|
||||
dns_qp_deletename(dns_qp_t *qp, const dns_name_t *name, void **pval_r,
|
||||
uint32_t *ival_r);
|
||||
|
||||
isc_result_t
|
||||
dns_qp_getname(dns_qpreadable_t qpr, const dns_name_t *name, void **pval_r,
|
||||
uint32_t *ival_r);
|
||||
|
||||
// FIXME: first argument's type is modified to make it work with CFFI
|
||||
isc_result_t
|
||||
dns_qp_lookup(dns_qp_t *qpr, const dns_name_t *name,
|
||||
dns_name_t *foundname, dns_qpiter_t *iter, dns_qpchain_t *chain,
|
||||
void **pval_r, uint32_t *ival_r);
|
||||
|
||||
// FIXME: first argument's type is modified to make it work with CFFI
|
||||
void
|
||||
dns_qpchain_init(dns_qp_t *qpr, dns_qpchain_t *chain);
|
||||
|
||||
unsigned int
|
||||
dns_qpchain_length(dns_qpchain_t *chain);
|
||||
|
||||
void
|
||||
dns_qpchain_node(dns_qpchain_t *chain, unsigned int level, dns_name_t *name,
|
||||
void **pval_r, uint32_t *ival_r);
|
||||
"""
|
||||
)
|
||||
|
||||
# set_source() gives the name of the python extension module to
|
||||
# produce, and some C source code as a string. This C code needs
|
||||
# to make the declarated functions, types and globals available,
|
||||
# so it is often just the "#include".
|
||||
ffibuilder.set_source(
|
||||
"_qp_test_cffi",
|
||||
"""
|
||||
#include "isc/buffer.h"
|
||||
#include "isc/mem.h"
|
||||
#include "dns/name.h"
|
||||
#include "dns/fixedname.h"
|
||||
#include "dns/qp.h"
|
||||
|
||||
static void
|
||||
noopref(void *uctx, void *pval, uint32_t ival) {}
|
||||
|
||||
static void
|
||||
noopgetname(void *uctx, char *buf, size_t size) {}
|
||||
|
||||
size_t
|
||||
qp_makekey(dns_qpkey_t key, void *uctx, void *pval,
|
||||
uint32_t ival) {
|
||||
dns_name_t *name = pval;
|
||||
return dns_qpkey_fromname(key, name);
|
||||
}
|
||||
|
||||
const dns_qpmethods_t qp_methods = {
|
||||
noopref,
|
||||
noopref,
|
||||
qp_makekey,
|
||||
noopgetname,
|
||||
};
|
||||
""",
|
||||
libraries=["dns"],
|
||||
include_dirs=["../../lib/isc/include", "../../lib/dns/include"],
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
ffibuilder.compile(
|
||||
verbose=True,
|
||||
)
|
||||
4
tests/dns/run.sh
Normal file
4
tests/dns/run.sh
Normal file
@@ -0,0 +1,4 @@
|
||||
#!/usr/bin/bash
|
||||
set -o nounset -o errexit -o xtrace
|
||||
|
||||
python qp_test_pythonbuild.py && LD_PRELOAD=/usr/lib/libjemalloc.so.2 pytest qp_test.py --hypothesis-show-statistics -k TestTrees
|
||||
165
tests/dns/strategies.py
Normal file
165
tests/dns/strategies.py
Normal file
@@ -0,0 +1,165 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
#
|
||||
# SPDX-License-Identifier: MPL-2.0
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
#
|
||||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
from typing import List
|
||||
from warnings import warn
|
||||
|
||||
from hypothesis.strategies import (
|
||||
binary,
|
||||
builds,
|
||||
composite,
|
||||
integers,
|
||||
just,
|
||||
nothing,
|
||||
permutations,
|
||||
)
|
||||
|
||||
import dns.name
|
||||
import dns.message
|
||||
import dns.rdataclass
|
||||
import dns.rdatatype
|
||||
|
||||
# LATER: Move this file so it can be easily reused.
|
||||
|
||||
|
||||
@composite
|
||||
def dns_names(
|
||||
draw,
|
||||
*,
|
||||
prefix: dns.name.Name = dns.name.empty,
|
||||
suffix: dns.name.Name = dns.name.root,
|
||||
min_labels: int = 1,
|
||||
max_labels: int = 128,
|
||||
) -> dns.name.Name:
|
||||
"""
|
||||
This is a hypothesis strategy to be used for generating DNS names with given `prefix`, `suffix`
|
||||
and with total number of labels specified by `min_labels` and `max labels`.
|
||||
|
||||
For example, calling
|
||||
```
|
||||
dns_names(
|
||||
prefix=dns.name.from_text("test"),
|
||||
suffix=dns.name.from_text("isc.org"),
|
||||
max_labels=6
|
||||
).example()
|
||||
```
|
||||
will result in names like `test.abc.isc.org.` or `test.abc.def.isc.org`.
|
||||
|
||||
There is no attempt to make the distribution of the generated names uniform in any way.
|
||||
The strategy however minimizes towards shorter names with shorter labels.
|
||||
|
||||
It can be used with to build compound strategies, like this one which generates random DNS queries.
|
||||
|
||||
```
|
||||
dns_queries = builds(
|
||||
dns.message.make_query,
|
||||
qname=dns_names(),
|
||||
rdtype=dns_rdatatypes,
|
||||
rdclass=dns_rdataclasses,
|
||||
)
|
||||
```
|
||||
"""
|
||||
|
||||
prefix = prefix.relativize(dns.name.root)
|
||||
suffix = suffix.derelativize(dns.name.root)
|
||||
|
||||
try:
|
||||
outer_name = prefix + suffix
|
||||
remaining_bytes = 255 - len(outer_name.to_wire())
|
||||
assert remaining_bytes >= 0
|
||||
except dns.name.NameTooLong:
|
||||
warn(
|
||||
"Maximal length name of name execeeded by prefix and suffix. Strategy won't generate any names.",
|
||||
RuntimeWarning,
|
||||
)
|
||||
return draw(nothing())
|
||||
|
||||
minimum_number_of_labels_to_generate = max(0, min_labels - len(outer_name.labels))
|
||||
maximum_number_of_labels_to_generate = max_labels - len(outer_name.labels)
|
||||
if maximum_number_of_labels_to_generate < 0:
|
||||
warn(
|
||||
"Maximal number of labels execeeded by prefix and suffix. Strategy won't generate any names.",
|
||||
RuntimeWarning,
|
||||
)
|
||||
return draw(nothing())
|
||||
|
||||
maximum_number_of_labels_to_generate = min(
|
||||
maximum_number_of_labels_to_generate, remaining_bytes // 2
|
||||
)
|
||||
if maximum_number_of_labels_to_generate < minimum_number_of_labels_to_generate:
|
||||
warn(
|
||||
f"Minimal number set to {minimum_number_of_labels_to_generate}, but in {remaining_bytes} bytes there is only space for maximum of {maximum_number_of_labels_to_generate} labels.",
|
||||
RuntimeWarning,
|
||||
)
|
||||
return draw(nothing())
|
||||
|
||||
if remaining_bytes == 0 or maximum_number_of_labels_to_generate == 0:
|
||||
warn(
|
||||
f"Strategy will return only one name ({outer_name}) as it exactly matches byte or label length limit.",
|
||||
RuntimeWarning,
|
||||
)
|
||||
return draw(just(outer_name))
|
||||
|
||||
chosen_number_of_labels_to_generate = draw(
|
||||
integers(
|
||||
minimum_number_of_labels_to_generate, maximum_number_of_labels_to_generate
|
||||
)
|
||||
)
|
||||
chosen_number_of_bytes_to_partion = draw(
|
||||
integers(2 * chosen_number_of_labels_to_generate, remaining_bytes)
|
||||
)
|
||||
chosen_lengths_of_labels = draw(
|
||||
_partition_bytes_to_labels(
|
||||
chosen_number_of_bytes_to_partion, chosen_number_of_labels_to_generate
|
||||
)
|
||||
)
|
||||
generated_labels = tuple(
|
||||
draw(binary(min_size=l - 1, max_size=l - 1)) for l in chosen_lengths_of_labels
|
||||
)
|
||||
|
||||
return dns.name.Name(prefix.labels + generated_labels + suffix.labels)
|
||||
|
||||
|
||||
RDATACLASS_MAX = RDATATYPE_MAX = 65535
|
||||
dns_rdataclasses = builds(dns.rdataclass.RdataClass, integers(0, RDATACLASS_MAX))
|
||||
dns_rdataclasses_without_meta = dns_rdataclasses.filter(dns.rdataclass.is_metaclass)
|
||||
dns_rdatatypes = builds(dns.rdatatype.RdataType, integers(0, RDATATYPE_MAX))
|
||||
|
||||
# NOTE: This should really be `dns_rdatatypes_without_meta = dns_rdatatypes_without_meta.filter(dns.rdatatype.is_metatype()`,
|
||||
# but hypothesis then complains about the filter being too strict, so it is done in a “constructive” way.
|
||||
dns_rdatatypes_without_meta = integers(0, dns.rdatatype.OPT - 1) | integers(dns.rdatatype.OPT + 1, 127) | integers(256, RDATATYPE_MAX) # type: ignore
|
||||
|
||||
|
||||
@composite
|
||||
def _partition_bytes_to_labels(
|
||||
draw, remaining_bytes: int, number_of_labels: int
|
||||
) -> List[int]:
|
||||
two_bytes_reserved_for_label = 2
|
||||
|
||||
# Reserve two bytes for each label
|
||||
partition = [two_bytes_reserved_for_label] * number_of_labels
|
||||
remaining_bytes -= two_bytes_reserved_for_label * number_of_labels
|
||||
|
||||
assert remaining_bytes >= 0
|
||||
|
||||
# Add a random number between 0 and the remainder to each partition
|
||||
for i in range(number_of_labels):
|
||||
added = draw(
|
||||
integers(0, min(remaining_bytes, 64 - two_bytes_reserved_for_label))
|
||||
)
|
||||
partition[i] += added
|
||||
remaining_bytes -= added
|
||||
|
||||
# NOTE: Some of the remaining bytes will usually not be assigned to any label, but we don't care.
|
||||
|
||||
return draw(permutations(partition))
|
||||
Reference in New Issue
Block a user