Compare commits

...

27 Commits

Author SHA1 Message Date
Witold Kręcicki
92eb269a7d xxx 2020-02-07 21:42:56 +01:00
Witold Kręcicki
9a4e4fde31 xxfix 2020-02-07 15:00:04 +01:00
Witold Kręcicki
b6c25c0f00 xxfix 2020-02-07 14:50:03 +01:00
Witold Kręcicki
3f19388210 xxfix 2020-02-07 14:16:37 +01:00
Witold Kręcicki
30bdd05727 1 ntask per cpu 2020-02-07 13:52:05 +01:00
Witold Kręcicki
b7aac20168 fixup! Make client resolver tasks pooled and CPU-bound 2020-02-07 13:18:30 +01:00
Witold Kręcicki
b1db9d5325 fixup! xxx processed counter 2020-02-07 13:02:20 +01:00
Witold Kręcicki
7f4a8bf853 xxx processed counter 2020-02-07 12:54:22 +01:00
Witold Kręcicki
2f9e77b2ee fixup! Make client resolver tasks pooled and CPU-bound 2020-02-07 12:06:03 +01:00
Witold Kręcicki
9fb6709515 fixup! Make client resolver tasks pooled and CPU-bound 2020-02-07 11:59:56 +01:00
Witold Kręcicki
a628172de2 Revert "Revert "Make ns_client mctxpool more thread-friendly by sharding it by netmgr threadid""
This reverts commit 650900f2e9.
2020-02-07 11:56:46 +01:00
Witold Kręcicki
650900f2e9 Revert "Make ns_client mctxpool more thread-friendly by sharding it by netmgr threadid"
This reverts commit 3d9763b968.
2020-02-07 11:56:40 +01:00
Witold Kręcicki
3fc5214464 Revert "Don't check if the client is on recursing list (requires locking) if it's not RECURSING"
This reverts commit 41cec3a619.
2020-02-07 11:00:58 +01:00
Witold Kręcicki
b2d93928af Revert "Increase inactivehandles and inactivereqs size for better reuse."
This reverts commit 39645122da.
2020-02-07 10:54:00 +01:00
Witold Kręcicki
e610325783 Use isc_rwlock for isc_result tables 2020-02-07 10:43:06 +01:00
Witold Kręcicki
fe24b48e9f Bucketed statistics.
Even though statistics are lockless they still use atomics which
might cause contention. Split stats counters into buckets, sharded
by an artificial thread identifier, to increase throughput.
2020-02-07 10:32:25 +01:00
Witold Kręcicki
ded06dc3c5 Make client resolver tasks pooled and CPU-bound 2020-02-07 10:26:41 +01:00
Witold Kręcicki
1d802a23a1 test: don't use DISPATCHATTR_EXCLUSIVE, less random but waaay less sockets used 2020-02-07 10:24:39 +01:00
Witold Kręcicki
39645122da Increase inactivehandles and inactivereqs size for better reuse. 2020-02-07 10:20:33 +01:00
Witold Kręcicki
3d9763b968 Make ns_client mctxpool more thread-friendly by sharding it by netmgr threadid 2020-02-07 10:13:53 +01:00
Witold Kręcicki
b6d9add750 Increase nodelock count for both cache and regular db. 2020-02-07 10:04:34 +01:00
Witold Kręcicki
f71e673751 We don't need to fill udp local address every time since we are bound to it. 2020-02-07 09:59:31 +01:00
Witold Kręcicki
2873ad7f46 Make nm->recvbuf larger and heap allocated, to allow uv_recvmmsg usage. 2020-02-07 09:41:48 +01:00
Witold Kręcicki
41cec3a619 Don't check if the client is on recursing list (requires locking) if it's not RECURSING 2020-02-07 09:29:07 +01:00
Witold Kręcicki
8a7bfc8850 Don't update LRU if the node was recently used.
Updating LRU requires write-locking the node, which causes contention.
Update LRU only if time difference is large enough.
2020-02-07 09:28:12 +01:00
Witold Kręcicki
ecb619b71c Make isc_task_pause/isc_task_unpause thread safe.
isc_task_pause/unpause were inherently thread-unsafe - a task
could be paused only once by one thread, if the task was running
while we paused it it led to races. Fix it by making sure that
the task will pause if requested to, and by using a 'pause reference
counter' to count task pause requests - a task will be unpaused
iff all threads unpause it.

Don't remove from queue when pausing task - we lock the queue lock
(expensive), while it's unlikely that the task will be running -
and we'll remove it anyway in dispatcher
2020-02-07 09:19:44 +01:00
Witold Kręcicki
c0bf9b9e26 xxxtest: use pthread_rwlock by default 2020-02-07 08:29:57 +01:00
15 changed files with 225 additions and 140 deletions

View File

@@ -1344,7 +1344,7 @@ get_view_querysource_dispatch(const cfg_obj_t **maps, int af,
break;
}
if (isc_sockaddr_getport(&sa) == 0) {
attrs |= DNS_DISPATCHATTR_EXCLUSIVE;
// attrs |= DNS_DISPATCHATTR_EXCLUSIVE;
maxdispatchbuffers = EXCLBUFFERS;
} else {
INSIST(obj != NULL);
@@ -9544,7 +9544,7 @@ run_server(isc_task_t *task, isc_event_t *event) {
named_g_nm,
named_g_dispatchmgr,
server->task, named_g_udpdisp, geoip,
&server->interfacemgr),
named_g_cpus, &server->interfacemgr),
"creating interface manager");
CHECKFATAL(isc_timer_create(named_g_timermgr, isc_timertype_inactive,

30
configure vendored
View File

@@ -852,6 +852,7 @@ infodir
docdir
oldincludedir
includedir
runstatedir
localstatedir
sharedstatedir
sysconfdir
@@ -1025,6 +1026,7 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include'
oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@@ -1277,6 +1279,15 @@ do
| -silent | --silent | --silen | --sile | --sil)
silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@@ -1414,7 +1425,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir
libdir localedir mandir runstatedir
do
eval ac_val=\$$ac_var
# Remove trailing slashes.
@@ -1567,6 +1578,7 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include]
@@ -4011,7 +4023,7 @@ else
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -4057,7 +4069,7 @@ else
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -4081,7 +4093,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -4126,7 +4138,7 @@ else
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -4150,7 +4162,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -16046,7 +16058,7 @@ esac
if test "${enable_pthread_rwlock+set}" = set; then :
enableval=$enable_pthread_rwlock;
else
enable_pthread_rwlock=no
enable_pthread_rwlock=yes
fi
@@ -16070,10 +16082,6 @@ $as_echo "#define USE_PTHREAD_RWLOCK 1" >>confdefs.h
fi
if test "$enable_pthread_rwlock" = "yes" -a "$enable_developer" != "yes"; then :
as_fn_error $? "pthread rwlock is not meant used in production and the developer mode must be enabled" "$LINENO" 5
fi
CRYPTO=OpenSSL
#

View File

@@ -721,7 +721,7 @@ AC_SUBST(INSTALL_LIBRARY)
AC_ARG_ENABLE([pthread_rwlock],
[AS_HELP_STRING([--enable-pthread-rwlock],
[use pthread rwlock instead of internal rwlock implementation (EXPERIMENTAL)])],
[], [enable_pthread_rwlock=no])
[], [enable_pthread_rwlock=yes])
AS_IF([test "$enable_pthread_rwlock" = "yes"],
[AC_CHECK_FUNCS([pthread_rwlock_rdlock], [],
@@ -729,9 +729,6 @@ AS_IF([test "$enable_pthread_rwlock" = "yes"],
AC_DEFINE([USE_PTHREAD_RWLOCK],[1],[Define if you want to use pthread rwlock implementation])
])
AS_IF([test "$enable_pthread_rwlock" = "yes" -a "$enable_developer" != "yes"],
[AC_MSG_ERROR([pthread rwlock is not meant used in production and the developer mode must be enabled])])
CRYPTO=OpenSSL
#

View File

@@ -172,9 +172,12 @@ typedef isc_rwlock_t nodelock_t;
* to be 0 by default either with or without threads.
*/
#ifndef DNS_RBTDB_LIMITLRUUPDATE
#define DNS_RBTDB_LIMITLRUUPDATE 0
#define DNS_RBTDB_LIMITLRUUPDATE 1
#endif
#define DNS_RBTDB_LRUUPDATE_GLUE 300
#define DNS_RBTDB_LRUUPDATE_REGULAR 600
/*
* Allow clients with a virtual time of up to 5 minutes in the past to see
* records that would have otherwise have expired.
@@ -309,7 +312,7 @@ typedef ISC_LIST(dns_rbtnode_t) rbtnodelist_t;
(((header)->rdh_ttl > (now)) || \
((header)->rdh_ttl == (now) && ZEROTTL(header)))
#define DEFAULT_NODE_LOCK_COUNT 7 /*%< Should be prime. */
#define DEFAULT_NODE_LOCK_COUNT 53 /*%< Should be prime. */
#define RBTDB_GLUE_TABLE_INIT_SIZE 2U
/*%
@@ -329,7 +332,7 @@ typedef ISC_LIST(dns_rbtnode_t) rbtnodelist_t;
#define DEFAULT_CACHE_NODE_LOCK_COUNT DNS_RBTDB_CACHE_NODE_LOCK_COUNT
#endif
#else
#define DEFAULT_CACHE_NODE_LOCK_COUNT 16
#define DEFAULT_CACHE_NODE_LOCK_COUNT 97
#endif /* DNS_RBTDB_CACHE_NODE_LOCK_COUNT */
typedef struct {
@@ -10088,11 +10091,11 @@ need_headerupdate(rdatasetheader_t *header, isc_stdtime_t now) {
* Glue records are updated if at least DNS_RBTDB_LRUUPDATE_GLUE
* seconds have passed since the previous update time.
*/
return (header->last_used + 60 <= now);
return (header->last_used + DNS_RBTDB_LRUUPDATE_GLUE <= now);
}
/* Other records are updated if DNS_RBTDB_LRUUPDATE_REGULAR seconds have passed. */
return (header->last_used + 300 <= now);
return (header->last_used + DNS_RBTDB_LRUUPDATE_REGULAR <= now);
#else
UNUSED(now);

View File

@@ -33,7 +33,7 @@
#include "uv-compat.h"
#define ISC_NETMGR_TID_UNKNOWN -1
#define ISC_NETMGR_RECVBUF_SIZE (20*65536)
/*
* Single network event loop worker.
*/
@@ -55,7 +55,7 @@ typedef struct isc__networker {
* worker is paused */
isc_refcount_t references;
atomic_int_fast64_t pktcount;
char recvbuf[65536];
char * recvbuf;
bool recvbuf_inuse;
} isc__networker_t;

View File

@@ -210,6 +210,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
worker->ievents = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
/*
* We need to do this here and not in nm_thread to avoid a
@@ -281,6 +282,7 @@ nm_destroy(isc_nm_t **mgr0) {
isc_queue_destroy(worker->ievents);
isc_queue_destroy(worker->ievents_prio);
isc_mem_put(mgr->mctx, worker->recvbuf, ISC_NETMGR_RECVBUF_SIZE);
isc_thread_join(worker->thread, NULL);
}
@@ -982,14 +984,14 @@ isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(isc__nm_in_netthread());
REQUIRE(size <= 65536);
REQUIRE(size <= ISC_NETMGR_RECVBUF_SIZE);
worker = &sock->mgr->workers[sock->tid];
INSIST(!worker->recvbuf_inuse);
buf->base = worker->recvbuf;
worker->recvbuf_inuse = true;
buf->len = size;
buf->len = ISC_NETMGR_RECVBUF_SIZE;
}
void
@@ -1004,8 +1006,11 @@ isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
worker = &sock->mgr->workers[sock->tid];
REQUIRE(worker->recvbuf_inuse);
if (buf->base > worker->recvbuf && buf->base <= worker->recvbuf + ISC_NETMGR_RECVBUF_SIZE) {
/* Can happen in case of recvmmsg */
return;
}
REQUIRE(buf->base == worker->recvbuf);
worker->recvbuf_inuse = false;
}

View File

@@ -291,8 +291,8 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
isc_result_t result;
isc_nmhandle_t *nmhandle = NULL;
isc_sockaddr_t sockaddr;
isc_sockaddr_t localaddr;
struct sockaddr_storage laddr;
/* isc_sockaddr_t localaddr;
struct sockaddr_storage laddr; */
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
isc_region_t region;
uint32_t maxudp;
@@ -326,13 +326,13 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
uv_udp_getsockname(handle, (struct sockaddr *) &laddr,
/* uv_udp_getsockname(handle, (struct sockaddr *) &laddr,
&(int){sizeof(struct sockaddr_storage)});
result = isc_sockaddr_fromsockaddr(&localaddr,
(struct sockaddr *) &laddr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
RUNTIME_CHECK(result == ISC_R_SUCCESS); */
nmhandle = isc__nmhandle_get(sock, &sockaddr, &localaddr);
nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
region.base = (unsigned char *) buf->base;
region.length = nrecv;

View File

@@ -15,9 +15,9 @@
#include <stdlib.h>
#include <isc/lib.h>
#include <isc/mutex.h>
#include <isc/once.h>
#include <isc/resultclass.h>
#include <isc/rwlock.h>
#include <isc/util.h>
typedef struct resulttable {
@@ -182,7 +182,7 @@ static const char *identifier[ISC_R_NRESULTS] = {
static isc_once_t once = ISC_ONCE_INIT;
static resulttable_list_t description_tables;
static resulttable_list_t identifier_tables;
static isc_mutex_t lock;
static isc_rwlock_t lock;
static isc_result_t
register_table(resulttable_list_t *tables, unsigned int base,
@@ -208,11 +208,11 @@ register_table(resulttable_list_t *tables, unsigned int base,
table->set = set;
ISC_LINK_INIT(table, link);
LOCK(&lock);
RWLOCK(&lock, isc_rwlocktype_write);
ISC_LIST_APPEND(*tables, table, link);
UNLOCK(&lock);
RWUNLOCK(&lock, isc_rwlocktype_write);
return (ISC_R_SUCCESS);
}
@@ -221,7 +221,7 @@ static void
initialize_action(void) {
isc_result_t result;
isc_mutex_init(&lock);
isc_rwlock_init(&lock, 0, 0);
ISC_LIST_INIT(description_tables);
ISC_LIST_INIT(identifier_tables);
@@ -257,7 +257,7 @@ isc_result_tomany_helper(resulttable_list_t *tables, isc_result_t result) {
initialize();
LOCK(&lock);
RWLOCK(&lock, isc_rwlocktype_read);
text = NULL;
for (table = ISC_LIST_HEAD(*tables);
@@ -273,7 +273,7 @@ isc_result_tomany_helper(resulttable_list_t *tables, isc_result_t result) {
text = "(result code text not available)";
}
UNLOCK(&lock);
RWUNLOCK(&lock, isc_rwlocktype_read);
return (text);
}

View File

@@ -23,11 +23,14 @@
#include <isc/print.h>
#include <isc/refcount.h>
#include <isc/stats.h>
#include <isc/thread.h>
#include <isc/util.h>
#define ISC_STATS_MAGIC ISC_MAGIC('S', 't', 'a', 't')
#define ISC_STATS_VALID(x) ISC_MAGIC_VALID(x, ISC_STATS_MAGIC)
#define STATS_BUCKETS 64
#if defined(_WIN32) && !defined(_WIN64)
typedef atomic_int_fast32_t isc__atomic_statcounter_t;
#else
@@ -42,6 +45,17 @@ struct isc_stats {
isc__atomic_statcounter_t *counters;
};
ISC_THREAD_LOCAL int isc__stats_thread_v = -1;
static atomic_uint_fast32_t isc__stats_thread_n = 0;
/*
 * Return the statistics bucket index used by the calling thread.
 *
 * isc__stats_thread_v (declared ISC_THREAD_LOCAL) starts out negative.
 * On the first call it is set to the next value of the shared counter
 * isc__stats_thread_n, reduced modulo STATS_BUCKETS; subsequent calls
 * return that cached value.  The result is always in the range
 * [0, STATS_BUCKETS).
 */
static int
threadhash(void) {
	if (isc__stats_thread_v < 0) {
		/* First use on this thread: claim the next bucket number. */
		isc__stats_thread_v =
			atomic_fetch_add_relaxed(&isc__stats_thread_n, 1) %
			STATS_BUCKETS;
	}
	return (isc__stats_thread_v);
}
static isc_result_t
create_stats(isc_mem_t *mctx, int ncounters, isc_stats_t **statsp) {
isc_stats_t *stats;
@@ -50,7 +64,7 @@ create_stats(isc_mem_t *mctx, int ncounters, isc_stats_t **statsp) {
REQUIRE(statsp != NULL && *statsp == NULL);
stats = isc_mem_get(mctx, sizeof(*stats));
counters_alloc_size = sizeof(isc__atomic_statcounter_t) * ncounters;
counters_alloc_size = sizeof(isc__atomic_statcounter_t) * ncounters * STATS_BUCKETS;
stats->counters = isc_mem_get(mctx, counters_alloc_size);
isc_refcount_init(&stats->references, 1);
memset(stats->counters, 0, counters_alloc_size);
@@ -85,7 +99,8 @@ isc_stats_detach(isc_stats_t **statsp) {
isc_refcount_destroy(&stats->references);
isc_mem_put(stats->mctx, stats->counters,
sizeof(isc__atomic_statcounter_t) *
stats->ncounters);
stats->ncounters *
STATS_BUCKETS);
isc_mem_putanddetach(&stats->mctx, stats, sizeof(*stats));
}
}
@@ -108,8 +123,8 @@ void
isc_stats_increment(isc_stats_t *stats, isc_statscounter_t counter) {
REQUIRE(ISC_STATS_VALID(stats));
REQUIRE(counter < stats->ncounters);
atomic_fetch_add_explicit(&stats->counters[counter], 1,
int idx = threadhash() * stats->ncounters + counter;
atomic_fetch_add_explicit(&stats->counters[idx], 1,
memory_order_relaxed);
}
@@ -118,7 +133,8 @@ isc_stats_decrement(isc_stats_t *stats, isc_statscounter_t counter) {
REQUIRE(ISC_STATS_VALID(stats));
REQUIRE(counter < stats->ncounters);
atomic_fetch_sub_explicit(&stats->counters[counter], 1,
int idx = threadhash() * stats->ncounters + counter;
atomic_fetch_sub_explicit(&stats->counters[idx], 1,
memory_order_relaxed);
}
@@ -131,8 +147,13 @@ isc_stats_dump(isc_stats_t *stats, isc_stats_dumper_t dump_fn,
REQUIRE(ISC_STATS_VALID(stats));
for (i = 0; i < stats->ncounters; i++) {
uint32_t counter = atomic_load_explicit(&stats->counters[i],
uint32_t counter = 0;
int b;
for (b = 0; b < STATS_BUCKETS; b++) {
int idx = stats->ncounters * b + i;
counter += atomic_load_explicit(&stats->counters[idx],
memory_order_relaxed);
}
if ((options & ISC_STATSDUMP_VERBOSE) == 0 && counter == 0) {
continue;
}
@@ -144,11 +165,17 @@ void
isc_stats_set(isc_stats_t *stats, uint64_t val,
isc_statscounter_t counter)
{
int i;
REQUIRE(ISC_STATS_VALID(stats));
REQUIRE(counter < stats->ncounters);
atomic_store_explicit(&stats->counters[counter], val,
memory_order_relaxed);
for (i = 1; i < STATS_BUCKETS; i++) {
int idx = stats->ncounters * i + counter;
atomic_store_explicit(&stats->counters[idx], val,
memory_order_relaxed);
}
}
void isc_stats_update_if_greater(isc_stats_t *stats,
@@ -158,9 +185,9 @@ void isc_stats_update_if_greater(isc_stats_t *stats,
REQUIRE(ISC_STATS_VALID(stats));
REQUIRE(counter < stats->ncounters);
isc_statscounter_t curr_value =
atomic_load_relaxed(&stats->counters[counter]);
isc_statscounter_t curr_value;
do {
curr_value = atomic_load_relaxed(&stats->counters[counter]);
if (curr_value >= value) {
break;
}
@@ -172,9 +199,15 @@ void isc_stats_update_if_greater(isc_stats_t *stats,
isc_statscounter_t
isc_stats_get_counter(isc_stats_t *stats, isc_statscounter_t counter)
{
uint32_t value = 0;
int i;
REQUIRE(ISC_STATS_VALID(stats));
REQUIRE(counter < stats->ncounters);
return (atomic_load_explicit(&stats->counters[counter],
memory_order_relaxed));
for (i = 0; i < STATS_BUCKETS; i++) {
int idx = i * stats->ncounters + counter;
value += atomic_load_explicit(&stats->counters[idx],
memory_order_relaxed);
}
return (value);
}

View File

@@ -106,6 +106,7 @@ struct isc__task {
isc_mutex_t lock;
/* Locked by task lock. */
task_state_t state;
int pause_cnt;
isc_refcount_t references;
isc_eventlist_t events;
isc_eventlist_t on_shutdown;
@@ -152,6 +153,7 @@ struct isc__taskqueue {
isc_thread_t thread;
unsigned int threadid;
isc__taskmgr_t *manager;
uint64_t eprocessed;
};
struct isc__taskmgr {
@@ -308,6 +310,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
isc_mutex_init(&task->lock);
task->state = task_state_idle;
task->pause_cnt = 0;
isc_refcount_init(&task->references, 1);
INIT_LIST(task->events);
@@ -407,7 +410,7 @@ task_shutdown(isc__task_t *task) {
/*
* Moves a task onto the appropriate run queue.
*
* Caller must NOT hold manager lock.
* Caller must NOT hold queue lock.
*/
static inline void
task_ready(isc__task_t *task) {
@@ -415,7 +418,6 @@ task_ready(isc__task_t *task) {
bool has_privilege = isc_task_privilege((isc_task_t *) task);
REQUIRE(VALID_MANAGER(manager));
REQUIRE(task->state == task_state_ready);
XTRACE("task_ready");
LOCK(&manager->queues[task->threadid].lock);
@@ -967,10 +969,13 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
* Push 'task' onto the ready_tasks queue. If 'task' has the privilege
* flag set, then also push it onto the ready_priority_tasks queue.
*
* Caller must hold the task manager lock.
* Caller must hold the task queue lock.
*/
static inline void
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
if (ISC_LINK_LINKED(task, ready_link)) {
return;
}
ENQUEUE(manager->queues[c].ready_tasks, task, ready_link);
if (TASK_PRIVILEGED(task)) {
ENQUEUE(manager->queues[c].ready_priority_tasks, task,
@@ -1128,6 +1133,17 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
memory_order_acquire);
LOCK(&task->lock);
/*
* It is possible that we have a paused task in the queue:
* it might have been paused in the meantime, and we never
* hold both the queue lock and the task lock at the same
* time (to avoid deadlocks). Just bail in that case.
*/
if (task->state != task_state_ready) {
UNLOCK(&task->lock);
LOCK(&manager->queues[threadid].lock);
continue;
}
INSIST(task->state == task_state_ready);
task->state = task_state_running;
XTRACE("running");
@@ -1145,6 +1161,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
*/
XTRACE("execute action");
XTRACE(task->name);
manager->queues[threadid].eprocessed++;
if (event->ev_action != NULL) {
UNLOCK(&task->lock);
(event->ev_action)(
@@ -1215,6 +1232,15 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
}
}
done = true;
} else if (task->state == task_state_pausing) {
/*
* We got a pause request on this task,
* stop working on it and switch the state
* to paused.
*/
XTRACE("pausing");
task->state = task_state_paused;
done = true;
} else if (dispatch_count >= task->quantum) {
/*
* Our quantum has expired, but
@@ -1227,17 +1253,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
* so the minimum quantum is one.
*/
XTRACE("quantum");
if (task->state == task_state_running) {
/*
* We requeue only if it's
* not paused.
*/
task->state = task_state_ready;
requeue = true;
} else if (task->state ==
task_state_pausing) {
task->state = task_state_paused;
}
task->state = task_state_ready;
requeue = true;
done = true;
}
} while (!done);
@@ -1428,6 +1445,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
manager->queues[i].manager = manager;
manager->queues[i].threadid = i;
manager->queues[i].eprocessed = 0;
isc_thread_create(run, &manager->queues[i],
&manager->queues[i].thread);
char name[21];
@@ -1523,7 +1541,11 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
*/
wake_all_queues(manager);
UNLOCK(&manager->lock);
FILE * f = fopen("/tmp/foobar","w");
for (i=0; i< manager->workers; i++) {
fprintf(f, "%d: %lu\n", i, manager->queues[i].eprocessed);
}
fclose(f);
/*
* Wait for all the worker threads to exit.
*/
@@ -1686,31 +1708,27 @@ void
isc_task_pause(isc_task_t *task0) {
REQUIRE(ISCAPI_TASK_VALID(task0));
isc__task_t *task = (isc__task_t *)task0;
isc__taskmgr_t *manager = task->manager;
bool running = false;
LOCK(&task->lock);
task->pause_cnt++;
if (task->pause_cnt > 1) {
/*
* Someone already paused this thread, just increase
* the number of pausing clients.
*/
UNLOCK(&task->lock);
return;
}
INSIST(task->state == task_state_idle ||
task->state == task_state_ready ||
task->state == task_state_running);
if (task->state == task_state_running) {
running = true;
task->state = task_state_pausing;
} else {
task->state = task_state_paused;
}
UNLOCK(&task->lock);
if (running) {
return;
}
LOCK(&manager->queues[task->threadid].lock);
if (ISC_LINK_LINKED(task, ready_link)) {
DEQUEUE(manager->queues[task->threadid].ready_tasks,
task, ready_link);
}
UNLOCK(&manager->queues[task->threadid].lock);
}
void
@@ -1721,6 +1739,13 @@ isc_task_unpause(isc_task_t *task0) {
REQUIRE(ISCAPI_TASK_VALID(task0));
LOCK(&task->lock);
task->pause_cnt--;
INSIST(task->pause_cnt >= 0);
if (task->pause_cnt > 0) {
UNLOCK(&task->lock);
return;
}
INSIST(task->state == task_state_paused ||
task->state == task_state_pausing);
/* If the task was pausing we can't reschedule it */

View File

@@ -132,6 +132,8 @@ static void compute_cookie(ns_client_t *client, uint32_t when,
isc_buffer_t *buf);
static void
get_clientmctx(ns_clientmgr_t *manager, isc_mem_t **mctxp);
static void
get_clienttask(ns_clientmgr_t *manager, isc_task_t **taskp);
void
ns_client_recursing(ns_client_t *client) {
@@ -1660,6 +1662,7 @@ ns__client_request(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
return;
}
}
atomic_fetch_add_relaxed(&mgr->tbuckets[isc_nm_tid()], 1);
client->state = NS_CLIENTSTATE_READY;
client->dscp = ifp->dscp;
@@ -2212,38 +2215,34 @@ ns__client_tcpconn(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
static void
get_clientmctx(ns_clientmgr_t *manager, isc_mem_t **mctxp) {
isc_mem_t *clientmctx;
#if CLIENT_NMCTXS > 0
unsigned int nextmctx;
#endif
MTRACE("clientmctx");
#if CLIENT_NMCTXS > 0
LOCK(&manager->lock);
if (isc_nm_tid() >= 0) {
nextmctx = isc_nm_tid();
} else {
nextmctx = manager->nextmctx++;
if (manager->nextmctx == CLIENT_NMCTXS)
manager->nextmctx = 0;
INSIST(nextmctx < CLIENT_NMCTXS);
int tid = isc_nm_tid();
if (tid < 0) {
tid = isc_random_uniform(manager->ncpus);
}
int rand = isc_random_uniform(CLIENT_NMCTXS_PERCPU);
int nextmctx = (rand * manager->ncpus) + tid;
clientmctx = manager->mctxpool[nextmctx];
if (clientmctx == NULL) {
isc_mem_create(&clientmctx);
isc_mem_setname(clientmctx, "client", NULL);
manager->mctxpool[nextmctx] = clientmctx;
}
UNLOCK(&manager->lock);
#else
clientmctx = manager->mctx;
#endif
isc_mem_attach(clientmctx, mctxp);
}
/*
 * Attach '*taskp' to one task from the manager's task pool.
 *
 * The pool is indexed as (slot * ncpus) + tid, where 'tid' is the value
 * returned by isc_nm_tid() or, when that is negative, a random value
 * below manager->ncpus, and 'slot' is a random value below
 * CLIENT_NTASKS_PERCPU.
 *
 * The per-task usage counter manager->xbuckets[] is a fixed-size array,
 * while the pool size scales with manager->ncpus, so the counter is only
 * updated when the index actually fits in the array.
 */
static void
get_clienttask(ns_clientmgr_t *manager, isc_task_t **taskp) {
	MTRACE("clienttask");

	const int nbuckets = (int)(sizeof(manager->xbuckets) /
				   sizeof(manager->xbuckets[0]));

	int tid = isc_nm_tid();
	if (tid < 0) {
		/* Caller is not in a known worker; pick a column at random. */
		tid = isc_random_uniform(manager->ncpus);
	}

	/* 'slot' rather than 'rand' to avoid shadowing the C library rand(). */
	int slot = isc_random_uniform(CLIENT_NTASKS_PERCPU);
	int nexttask = (slot * manager->ncpus) + tid;

	/* Usage accounting only; never write past the end of xbuckets[]. */
	if (nexttask >= 0 && nexttask < nbuckets) {
		atomic_fetch_add_relaxed(&manager->xbuckets[nexttask], 1);
	}

	isc_task_attach(manager->taskpool[nexttask], taskp);
}
isc_result_t
ns__client_setup(ns_client_t *client, ns_clientmgr_t *mgr, bool new) {
isc_result_t result;
@@ -2267,10 +2266,8 @@ ns__client_setup(ns_client_t *client, ns_clientmgr_t *mgr, bool new) {
get_clientmctx(mgr, &client->mctx);
clientmgr_attach(mgr, &client->manager);
ns_server_attach(mgr->sctx, &client->sctx);
result = isc_task_create(mgr->taskmgr, 20, &client->task);
if (result != ISC_R_SUCCESS) {
goto cleanup;
}
get_clienttask(mgr, &client->task);
result = dns_message_create(client->mctx,
DNS_MESSAGE_INTENTPARSE,
&client->message);
@@ -2397,21 +2394,18 @@ clientmgr_detach(ns_clientmgr_t **mp) {
static void
clientmgr_destroy(ns_clientmgr_t *manager) {
#if CLIENT_NMCTXS > 0
int i;
#endif
MTRACE("clientmgr_destroy");
isc_refcount_destroy(&manager->references);
manager->magic = 0;
#if CLIENT_NMCTXS > 0
for (i = 0; i < CLIENT_NMCTXS; i++) {
if (manager->mctxpool[i] != NULL)
isc_mem_detach(&manager->mctxpool[i]);
for (i = 0; i < manager->ncpus * CLIENT_NMCTXS_PERCPU; i++) {
isc_mem_detach(&manager->mctxpool[i]);
}
#endif
isc_mem_put(manager->mctx, manager->mctxpool,
manager->ncpus * CLIENT_NMCTXS_PERCPU * sizeof(isc_mem_t*));
if (manager->interface != NULL) {
ns_interface_detach(&manager->interface);
@@ -2422,14 +2416,22 @@ clientmgr_destroy(ns_clientmgr_t *manager) {
if (manager->excl != NULL)
isc_task_detach(&manager->excl);
for (i = 0; i < CLIENT_NTASKS; i++) {
char x[1024];
snprintf(x, 1024, "/tmp/foo.%p", manager);
FILE * f = fopen(x,"w");
for (i = 0; i < manager->ncpus * CLIENT_NTASKS_PERCPU; i++) {
fprintf(f, "%d: %ld\n", i, manager->xbuckets[i]);
if (manager->taskpool[i] != NULL) {
isc_task_detach(&manager->taskpool[i]);
}
}
for (i = 0; i < manager->ncpus; i++) {
fprintf(f, "T%d: %ld\n", i, manager->tbuckets[i]);
}
fclose(f);
isc_mem_put(manager->mctx, manager->taskpool,
CLIENT_NTASKS * sizeof(isc_task_t *));
manager->ncpus * CLIENT_NTASKS_PERCPU *
sizeof(isc_task_t *));
ns_server_detach(&manager->sctx);
isc_mem_put(manager->mctx, manager, sizeof(*manager));
@@ -2438,13 +2440,12 @@ clientmgr_destroy(ns_clientmgr_t *manager) {
isc_result_t
ns_clientmgr_create(isc_mem_t *mctx, ns_server_t *sctx, isc_taskmgr_t *taskmgr,
isc_timermgr_t *timermgr, ns_interface_t *interface,
ns_clientmgr_t **managerp)
int ncpus, ns_clientmgr_t **managerp)
{
ns_clientmgr_t *manager;
isc_result_t result;
#if CLIENT_NMCTXS > 0
int i;
#endif
int npools;
manager = isc_mem_get(mctx, sizeof(*manager));
*manager = (ns_clientmgr_t) { .magic = 0 };
@@ -2461,26 +2462,36 @@ ns_clientmgr_create(isc_mem_t *mctx, ns_server_t *sctx, isc_taskmgr_t *taskmgr,
manager->mctx = mctx;
manager->taskmgr = taskmgr;
manager->timermgr = timermgr;
manager->ncpus = ncpus;
ns_interface_attach(interface, &manager->interface);
manager->exiting = false;
manager->taskpool =
isc_mem_get(mctx, CLIENT_NTASKS*sizeof(isc_task_t *));
for (i = 0; i < CLIENT_NTASKS; i++) {
int ntasks = CLIENT_NTASKS_PERCPU * manager->ncpus;
manager->taskpool = isc_mem_get(mctx, ntasks * sizeof(isc_task_t *));
for (i = 0; i < ntasks; i++) {
atomic_init(&manager->xbuckets[i], 0);
manager->taskpool[i] = NULL;
isc_task_create(manager->taskmgr, 20, &manager->taskpool[i]);
result = isc_task_create_bound(manager->taskmgr, 20,
&manager->taskpool[i],
i % manager->ncpus);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
}
isc_refcount_init(&manager->references, 1);
manager->sctx = NULL;
ns_server_attach(sctx, &manager->sctx);
ISC_LIST_INIT(manager->recursing);
#if CLIENT_NMCTXS > 0
manager->nextmctx = 0;
for (i = 0; i < CLIENT_NMCTXS; i++)
manager->mctxpool[i] = NULL; /* will be created on-demand */
#endif
npools = CLIENT_NMCTXS_PERCPU * manager->ncpus;
manager->mctxpool = isc_mem_get(manager->mctx,
npools * sizeof(isc_mem_t*));
for (i = 0; i < npools; i++) {
manager->mctxpool[i] = NULL;
isc_mem_create(&manager->mctxpool[i]);
isc_mem_setname(manager->mctxpool[i], "client", NULL);
}
manager->magic = MANAGER_MAGIC;
MTRACE("create");

View File

@@ -85,7 +85,7 @@
#define NS_CLIENT_SEND_BUFFER_SIZE 4096
#define NS_CLIENT_RECV_BUFFER_SIZE 4096
#define CLIENT_NMCTXS 100
#define CLIENT_NMCTXS_PERCPU 8
/*%<
* Number of 'mctx pools' for clients. (Should this be configurable?)
* When enabling threads, we use a pool of memory contexts shared by
@@ -95,7 +95,7 @@
* server.
*/
#define CLIENT_NTASKS 100
#define CLIENT_NTASKS_PERCPU 16
/*%<
* Number of tasks to be used by clients - those are used only when recursing
*/
@@ -166,10 +166,13 @@ struct ns_clientmgr {
isc_timermgr_t * timermgr;
isc_task_t * excl;
isc_refcount_t references;
int ncpus;
/* Attached by clients, needed for e.g. recursion */
isc_task_t ** taskpool;
atomic_uint_fast32_t xbuckets[1000];
atomic_uint_fast32_t tbuckets[1000];
ns_interface_t *interface;
/* Lock covers manager state. */
@@ -180,11 +183,8 @@ struct ns_clientmgr {
isc_mutex_t reclock;
client_list_t recursing; /*%< Recursing clients */
#if CLIENT_NMCTXS > 0
/*%< mctx pool for clients. */
unsigned int nextmctx;
isc_mem_t * mctxpool[CLIENT_NMCTXS];
#endif
isc_mem_t ** mctxpool;
};
/*% nameserver client structure */
@@ -364,7 +364,7 @@ ns_client_settimeout(ns_client_t *client, unsigned int seconds);
isc_result_t
ns_clientmgr_create(isc_mem_t *mctx, ns_server_t *sctx, isc_taskmgr_t *taskmgr,
isc_timermgr_t *timermgr, ns_interface_t *ifp,
isc_timermgr_t *timermgr, ns_interface_t *ifp, int ncpus,
ns_clientmgr_t **managerp);
/*%<
* Create a client manager.

View File

@@ -103,7 +103,7 @@ ns_interfacemgr_create(isc_mem_t *mctx, ns_server_t *sctx,
isc_socketmgr_t *socketmgr, isc_nm_t *nm,
dns_dispatchmgr_t *dispatchmgr, isc_task_t *task,
unsigned int udpdisp, dns_geoip_databases_t *geoip,
ns_interfacemgr_t **mgrp);
int ncpus, ns_interfacemgr_t **mgrp);
/*%<
* Create a new interface manager.
*

View File

@@ -75,6 +75,7 @@ struct ns_interfacemgr {
isc_timermgr_t * timermgr; /*%< Timer manager. */
isc_socketmgr_t * socketmgr; /*%< Socket manager. */
isc_nm_t * nm; /*%< Net manager. */
int ncpus; /*%< Number of workers . */
dns_dispatchmgr_t * dispatchmgr;
unsigned int generation; /*%< Current generation no. */
ns_listenlist_t * listenon4;
@@ -181,6 +182,7 @@ ns_interfacemgr_create(isc_mem_t *mctx,
isc_task_t *task,
unsigned int udpdisp,
dns_geoip_databases_t *geoip,
int ncpus,
ns_interfacemgr_t **mgrp)
{
isc_result_t result;
@@ -219,6 +221,7 @@ ns_interfacemgr_create(isc_mem_t *mctx,
mgr->listenon4 = NULL;
mgr->listenon6 = NULL;
mgr->udpdisp = udpdisp;
mgr->ncpus = ncpus;
atomic_init(&mgr->shuttingdown, false);
ISC_LIST_INIT(mgr->interfaces);
@@ -425,7 +428,7 @@ ns_interface_create(ns_interfacemgr_t *mgr, isc_sockaddr_t *addr,
result = ns_clientmgr_create(mgr->mctx, mgr->sctx,
mgr->taskmgr, mgr->timermgr, ifp,
&ifp->clientmgr);
mgr->ncpus, &ifp->clientmgr);
if (result != ISC_R_SUCCESS) {
isc_log_write(IFMGR_COMMON_LOGARGS, ISC_LOG_ERROR,
"ns_clientmgr_create() failed: %s",

View File

@@ -237,7 +237,7 @@ create_managers(void) {
CHECK(ns_interfacemgr_create(mctx, sctx, taskmgr, timermgr,
socketmgr, nm, dispatchmgr, maintask,
ncpus, NULL, &interfacemgr));
ncpus, NULL, 1, &interfacemgr));
CHECK(ns_listenlist_default(mctx, port, -1, true, &listenon));
ns_interfacemgr_setlistenon4(interfacemgr, listenon);