implement fetch-and-add array queue data structure
this is a lockless queue based on hazard pointers.
This commit is contained in:
committed by
Evan Hunt
parent
64e1a4a398
commit
402969bf95
@@ -55,9 +55,9 @@ OBJS = pk11.@O@ pk11_result.@O@ \
|
||||
lex.@O@ lfsr.@O@ lib.@O@ log.@O@ \
|
||||
md.@O@ mem.@O@ mutexblock.@O@ \
|
||||
netaddr.@O@ netscope.@O@ nonce.@O@ openssl_shim.@O@ pool.@O@ \
|
||||
parseint.@O@ portset.@O@ quota.@O@ radix.@O@ random.@O@ \
|
||||
ratelimiter.@O@ region.@O@ regex.@O@ result.@O@ \
|
||||
rwlock.@O@ \
|
||||
parseint.@O@ portset.@O@ queue.@O@ quota.@O@ \
|
||||
radix.@O@ random.@O@ ratelimiter.@O@ \
|
||||
region.@O@ regex.@O@ result.@O@ rwlock.@O@ \
|
||||
serial.@O@ siphash.@O@ sockaddr.@O@ stats.@O@ \
|
||||
string.@O@ symtab.@O@ task.@O@ taskpool.@O@ \
|
||||
tm.@O@ timer.@O@ version.@O@ \
|
||||
@@ -74,7 +74,7 @@ SRCS = pk11.c pk11_result.c \
|
||||
lex.c lfsr.c lib.c log.c \
|
||||
md.c mem.c mutexblock.c \
|
||||
netaddr.c netscope.c nonce.c openssl_shim.c pool.c \
|
||||
parseint.c portset.c quota.c radix.c random.c \
|
||||
parseint.c portset.c queue.c quota.c radix.c random.c \
|
||||
ratelimiter.c region.c regex.c result.c rwlock.c \
|
||||
serial.c siphash.c sockaddr.c stats.c string.c \
|
||||
symtab.c task.c taskpool.c timer.c \
|
||||
|
||||
@@ -9,7 +9,46 @@
|
||||
* information regarding copyright ownership.
|
||||
*/
|
||||
|
||||
#ifndef ISC_QUEUE_H
|
||||
#define ISC_QUEUE_H 1
|
||||
#pragma once
|
||||
#include <isc/mem.h>
|
||||
|
||||
#endif /* ISC_QUEUE_H */
|
||||
typedef struct isc_queue isc_queue_t;
|
||||
|
||||
isc_queue_t *
|
||||
isc_queue_new(isc_mem_t *mctx, int max_threads);
|
||||
/*%<
|
||||
* Create a new fetch-and-add array queue.
|
||||
*
|
||||
* 'max_threads' is currently unused. In the future it can be used
|
||||
* to pass a maximum threads parameter when creating hazard pointers,
|
||||
* but currently `isc_hp_t` uses a hard-coded value.
|
||||
*/
|
||||
|
||||
void
|
||||
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item);
|
||||
/*%<
|
||||
* Enqueue an object pointer 'item' at the tail of the queue.
|
||||
*
|
||||
* Requires:
|
||||
* \li 'item' is not null.
|
||||
*/
|
||||
|
||||
uintptr_t
|
||||
isc_queue_dequeue(isc_queue_t *queue);
|
||||
/*%<
|
||||
* Remove an object pointer from the head of the queue and return the
|
||||
* pointer. If the queue is empty, return `nulluintptr` (the uintptr_t
|
||||
* representation of NULL).
|
||||
*
|
||||
* Requires:
|
||||
* \li 'queue' is not null.
|
||||
*/
|
||||
|
||||
void
|
||||
isc_queue_destroy(isc_queue_t *queue);
|
||||
/*%<
|
||||
* Destroy a queue.
|
||||
*
|
||||
* Requires:
|
||||
* \li 'queue' is not null.
|
||||
*/
|
||||
|
||||
219
lib/isc/queue.c
Normal file
219
lib/isc/queue.c
Normal file
@@ -0,0 +1,219 @@
|
||||
/*
|
||||
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* See the COPYRIGHT file distributed with this work for additional
|
||||
* information regarding copyright ownership.
|
||||
*/
|
||||
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <isc/align.h>
|
||||
#include <isc/atomic.h>
|
||||
#include <isc/queue.h>
|
||||
#include <isc/string.h>
|
||||
#include <isc/mem.h>
|
||||
#include <isc/hp.h>
|
||||
|
||||
#define BUFFER_SIZE 1024
|
||||
|
||||
#define MAX_THREADS 128
|
||||
|
||||
static uintptr_t nulluintptr = (uintptr_t)NULL;
|
||||
|
||||
typedef struct node {
|
||||
atomic_uint_fast32_t deqidx;
|
||||
atomic_uintptr_t items[BUFFER_SIZE];
|
||||
atomic_uint_fast32_t enqidx;
|
||||
atomic_uintptr_t next;
|
||||
isc_mem_t *mctx;
|
||||
} node_t;
|
||||
|
||||
/* we just need one Hazard Pointer */
|
||||
#define HP_TAIL 0
|
||||
#define HP_HEAD 0
|
||||
|
||||
struct isc_queue {
|
||||
alignas(128) atomic_uintptr_t head;
|
||||
alignas(128) atomic_uintptr_t tail;
|
||||
isc_mem_t *mctx;
|
||||
int max_threads;
|
||||
int taken;
|
||||
isc_hp_t *hp;
|
||||
};
|
||||
|
||||
static node_t *
|
||||
node_new(isc_mem_t *mctx, uintptr_t item) {
|
||||
node_t *node = isc_mem_get(mctx, sizeof(*node));
|
||||
*node = (node_t){
|
||||
.mctx = NULL
|
||||
};
|
||||
|
||||
atomic_init(&node->deqidx, 0);
|
||||
atomic_init(&node->enqidx, 1);
|
||||
atomic_init(&node->next, 0);
|
||||
atomic_init(&node->items[0], item);
|
||||
|
||||
for (int i = 1; i < BUFFER_SIZE; i++) {
|
||||
atomic_init(&node->items[i], 0);
|
||||
}
|
||||
|
||||
isc_mem_attach(mctx, &node->mctx);
|
||||
|
||||
return (node);
|
||||
}
|
||||
|
||||
static void
|
||||
node_destroy(void *node0) {
|
||||
node_t *node = (node_t *)node0;
|
||||
|
||||
isc_mem_putanddetach(&node->mctx, node, sizeof(*node));
|
||||
}
|
||||
|
||||
static bool
|
||||
node_cas_next(node_t *node, node_t *cmp, const node_t *val) {
|
||||
return (atomic_compare_exchange_strong(&node->next,
|
||||
(uintptr_t *)&cmp,
|
||||
(uintptr_t)val));
|
||||
}
|
||||
|
||||
static bool
|
||||
queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) {
|
||||
return (atomic_compare_exchange_strong(&queue->tail,
|
||||
(uintptr_t *)&cmp,
|
||||
(uintptr_t)val));
|
||||
}
|
||||
|
||||
static bool
|
||||
queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) {
|
||||
return (atomic_compare_exchange_strong(&queue->head,
|
||||
(uintptr_t *)&cmp,
|
||||
(uintptr_t)val));
|
||||
}
|
||||
|
||||
isc_queue_t *
|
||||
isc_queue_new(isc_mem_t *mctx, int max_threads) {
|
||||
isc_queue_t *queue = isc_mem_get(mctx, sizeof(*queue));
|
||||
node_t *sentinel = node_new(mctx, nulluintptr);
|
||||
|
||||
if (max_threads == 0) {
|
||||
max_threads = MAX_THREADS;
|
||||
}
|
||||
|
||||
*queue = (isc_queue_t){
|
||||
.max_threads = max_threads,
|
||||
};
|
||||
|
||||
isc_mem_attach(mctx, &queue->mctx);
|
||||
|
||||
queue->hp = isc_hp_new(mctx, 1, node_destroy);
|
||||
|
||||
atomic_init(&sentinel->enqidx, 0);
|
||||
atomic_init(&queue->head, (uintptr_t)sentinel);
|
||||
atomic_init(&queue->tail, (uintptr_t)sentinel);
|
||||
|
||||
return (queue);
|
||||
}
|
||||
|
||||
void
|
||||
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) {
|
||||
REQUIRE(item != nulluintptr);
|
||||
|
||||
while (true) {
|
||||
node_t *lt = NULL;
|
||||
uint_fast32_t idx;
|
||||
uintptr_t n = nulluintptr;
|
||||
|
||||
lt = (node_t *)isc_hp_protect(queue->hp, 0, &queue->tail);
|
||||
idx = atomic_fetch_add(<->enqidx, 1);
|
||||
if (idx > BUFFER_SIZE-1) {
|
||||
node_t *lnext = NULL;
|
||||
|
||||
if (lt != (node_t *)atomic_load(&queue->tail)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
lnext = (node_t *)atomic_load(<->next);
|
||||
if (lnext == NULL) {
|
||||
node_t *newnode = node_new(queue->mctx, item);
|
||||
if (node_cas_next(lt, NULL, newnode)) {
|
||||
queue_cas_tail(queue, lt, newnode);
|
||||
isc_hp_clear(queue->hp);
|
||||
return;
|
||||
}
|
||||
node_destroy(newnode);
|
||||
} else {
|
||||
queue_cas_tail(queue, lt, lnext);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (atomic_compare_exchange_strong(<->items[idx], &n, item)) {
|
||||
isc_hp_clear(queue->hp);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uintptr_t
|
||||
isc_queue_dequeue(isc_queue_t *queue) {
|
||||
REQUIRE(queue != NULL);
|
||||
|
||||
while (true) {
|
||||
node_t *lh = NULL;
|
||||
uint_fast32_t idx;
|
||||
uintptr_t item;
|
||||
|
||||
lh = (node_t *)isc_hp_protect(queue->hp, 0, &queue->head);
|
||||
if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) &&
|
||||
atomic_load(&lh->next) == nulluintptr)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
idx = atomic_fetch_add(&lh->deqidx, 1);
|
||||
if (idx > BUFFER_SIZE-1) {
|
||||
node_t *lnext = (node_t *)atomic_load(&lh->next);
|
||||
if (lnext == NULL) {
|
||||
break;
|
||||
}
|
||||
if (queue_cas_head(queue, lh, lnext)) {
|
||||
isc_hp_retire(queue->hp, (uintptr_t)lh);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
item = atomic_exchange(&(lh->items[idx]),
|
||||
(uintptr_t)&queue->taken);
|
||||
if (item == nulluintptr) {
|
||||
continue;
|
||||
}
|
||||
|
||||
isc_hp_clear(queue->hp);
|
||||
return (item);
|
||||
}
|
||||
|
||||
isc_hp_clear(queue->hp);
|
||||
return (nulluintptr);
|
||||
}
|
||||
|
||||
void
|
||||
isc_queue_destroy(isc_queue_t *queue) {
|
||||
node_t *last = NULL;
|
||||
|
||||
REQUIRE(queue != NULL);
|
||||
|
||||
while (isc_queue_dequeue(queue) != nulluintptr) {
|
||||
/* do nothing */
|
||||
}
|
||||
|
||||
last = (node_t *)atomic_load_relaxed(&queue->head);
|
||||
node_destroy(last);
|
||||
isc_hp_destroy(queue->hp);
|
||||
isc_mem_putanddetach(&queue->mctx, queue, sizeof(*queue));
|
||||
}
|
||||
18
lib/isc/unix/include/isc/align.h
Normal file
18
lib/isc/unix/include/isc/align.h
Normal file
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* See the COPYRIGHT file distributed with this work for additional
|
||||
* information regarding copyright ownership.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef HAVE_STDALIGN_H
|
||||
#include <stdalign.h>
|
||||
#else
|
||||
#define alignas(x) __attribute__ ((__aligned__ (x)))
|
||||
#endif
|
||||
13
lib/isc/win32/include/isc/align.h
Normal file
13
lib/isc/win32/include/isc/align.h
Normal file
@@ -0,0 +1,13 @@
|
||||
/*
|
||||
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* See the COPYRIGHT file distributed with this work for additional
|
||||
* information regarding copyright ownership.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#define alignas(x) __declspec(align(x))
|
||||
@@ -452,6 +452,10 @@ isc_portset_isset
|
||||
isc_portset_nports
|
||||
isc_portset_remove
|
||||
isc_portset_removerange
|
||||
isc_queue_enqueue
|
||||
isc_queue_dequeue
|
||||
isc_queue_destroy
|
||||
isc_queue_new
|
||||
isc_quota_attach
|
||||
isc_quota_destroy
|
||||
isc_quota_detach
|
||||
|
||||
@@ -554,6 +554,9 @@
|
||||
<ClCompile Include="..\portset.c">
|
||||
<Filter>Library Source Files</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="..\queue.c">
|
||||
<Filter>Library Source Files</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="..\quota.c">
|
||||
<Filter>Library Source Files</Filter>
|
||||
</ClCompile>
|
||||
|
||||
@@ -440,6 +440,7 @@ copy InstallFiles ..\Build\Release\
|
||||
<ClCompile Include="..\parseint.c" />
|
||||
<ClCompile Include="..\pool.c" />
|
||||
<ClCompile Include="..\portset.c" />
|
||||
<ClCompile Include="..\queue.c" />
|
||||
<ClCompile Include="..\quota.c" />
|
||||
<ClCompile Include="..\radix.c" />
|
||||
<ClCompile Include="..\random.c" />
|
||||
|
||||
Reference in New Issue
Block a user