Multiple network event loop threads support for Solarises /dev/poll.

This commit is contained in:
Witold Krecicki
2018-10-04 16:11:24 +02:00
committed by Witold Kręcicki
parent 9c926a5d9b
commit 7223790380

View File

@@ -569,6 +569,14 @@ static const isc_statscounter_t rawstatsindex[] = {
isc_sockstatscounter_rawactive
};
static int
gen_threadid(isc__socket_t *sock);
static int
gen_threadid(isc__socket_t *sock) {
return sock->fd % sock->manager->nthreads;
}
static void
manager_log(isc__socketmgr_t *sockmgr,
isc_logcategory_t *category, isc_logmodule_t *module, int level,
@@ -718,7 +726,6 @@ watch_fd(isc__socketthread_t *thread, int fd, int msg) {
return (result);
#elif defined(USE_DEVPOLL)
struct pollfd pfd;
INSIST(threadid == 0);
int lockid = FDLOCK_ID(fd);
memset(&pfd, 0, sizeof(pfd));
@@ -1782,9 +1789,8 @@ doio_send(isc__socket_t *sock, isc_socketevent_t *dev) {
* references exist.
*/
static void
socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
socketclose(isc__socketthread_t *thread, isc__socket_t *sock, int fd) {
int lockid = FDLOCK_ID(fd);
isc__socketthread_t *thread = &manager->threads[sock->threadid];
/*
* No one has this socket open, so the watcher doesn't have to be
* poked, and the socket doesn't have to be locked.
@@ -1793,11 +1799,11 @@ socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
thread->fds[fd] = NULL;
thread->fdstate[fd] = CLOSE_PENDING;
UNLOCK(&thread->fdlock[lockid]);
select_poke(manager, sock->threadid, fd, SELECT_POKE_CLOSE);
select_poke(thread->manager, thread->threadid, fd, SELECT_POKE_CLOSE);
inc_stats(manager->stats, sock->statsindex[STATID_CLOSE]);
inc_stats(thread->manager->stats, sock->statsindex[STATID_CLOSE]);
if (sock->active == 1) {
dec_stats(manager->stats, sock->statsindex[STATID_ACTIVE]);
dec_stats(thread->manager->stats, sock->statsindex[STATID_ACTIVE]);
sock->active = 0;
}
@@ -1806,7 +1812,7 @@ socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
* efficiently)
*/
#ifdef USE_SELECT
LOCK(&manager->lock);
LOCK(&thread->manager->lock);
if (thread->maxfd == fd) {
int i;
@@ -1826,7 +1832,7 @@ socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
thread->maxfd = thread->pipe_fds[0];
}
UNLOCK(&manager->lock);
UNLOCK(&thread->manager->lock);
#endif /* USE_SELECT */
}
@@ -1835,6 +1841,7 @@ destroy(isc__socket_t **sockp) {
int fd;
isc__socket_t *sock = *sockp;
isc__socketmgr_t *manager = sock->manager;
isc__socketthread_t *thread;
socket_log(sock, NULL, CREATION, isc_msgcat, ISC_MSGSET_SOCKET,
ISC_MSG_DESTROYING, "destroying");
@@ -1847,8 +1854,10 @@ destroy(isc__socket_t **sockp) {
if (sock->fd >= 0) {
fd = sock->fd;
thread = &manager->threads[sock->threadid];
sock->fd = -1;
socketclose(manager, sock, fd);
sock->threadid = -1;
socketclose(thread, sock, fd);
}
LOCK(&manager->lock);
@@ -1883,6 +1892,7 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type,
sock->manager = manager;
sock->type = type;
sock->fd = -1;
sock->threadid = -1;
sock->dscp = 0; /* TOS/TCLASS is zero until set. */
sock->dupped = 0;
sock->statsindex = NULL;
@@ -2579,7 +2589,10 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type,
return (result);
}
sock->threadid = sock->fd % manager->nthreads; // TODO?
if (sock->fd == -1) {
abort();
}
sock->threadid = gen_threadid(sock);
sock->references = 1;
thread = &manager->threads[sock->threadid];
*socketp = (isc_socket_t *)sock;
@@ -2668,7 +2681,7 @@ isc_socket_open(isc_socket_t *sock0) {
if (result != ISC_R_SUCCESS) {
sock->fd = -1;
} else {
sock->threadid = sock->fd % sock->manager->nthreads; // TODO?
sock->threadid = gen_threadid(sock);
thread = &sock->manager->threads[sock->threadid];
int lockid = FDLOCK_ID(sock->fd);
@@ -2745,7 +2758,7 @@ isc_socket_close(isc_socket_t *sock0) {
isc__socket_t *sock = (isc__socket_t *)sock0;
int fd;
isc__socketmgr_t *manager;
isc__socketthread_t *thread;
fflush(stdout);
REQUIRE(VALID_SOCKET(sock));
@@ -2760,8 +2773,11 @@ isc_socket_close(isc_socket_t *sock0) {
INSIST(ISC_LIST_EMPTY(sock->connect_list));
manager = sock->manager;
thread = &manager->threads[sock->threadid];
fd = sock->fd;
sock->fd = -1;
sock->threadid = -1;
sock->dupped = 0;
memset(sock->name, 0, sizeof(sock->name));
sock->tag = NULL;
@@ -2773,7 +2789,7 @@ isc_socket_close(isc_socket_t *sock0) {
UNLOCK(&sock->lock);
socketclose(manager, sock, fd);
socketclose(thread, sock, fd);
return (ISC_R_SUCCESS);
}
@@ -3038,7 +3054,7 @@ internal_accept(isc__socket_t *sock) {
int lockid = FDLOCK_ID(fd);
NEWCONNSOCK(dev)->fd = fd;
NEWCONNSOCK(dev)->threadid = fd % manager->nthreads; // TODO
NEWCONNSOCK(dev)->threadid = gen_threadid(NEWCONNSOCK(dev));
NEWCONNSOCK(dev)->bound = 1;
NEWCONNSOCK(dev)->connected = 1;
nthread = &manager->threads[NEWCONNSOCK(dev)->threadid];
@@ -3371,17 +3387,16 @@ process_fds(isc__socketthread_t *thread, struct pollfd *events,
int i;
bool done = false;
bool have_ctlevent = false;
INSIST(threadid == 0);
if (nevents == thread->nevents) {
thread_log(manager, ISC_LOGCATEGORY_GENERAL,
thread_log(thread, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
"maximum number of FD events (%d) received",
nevents);
}
for (i = 0; i < nevents; i++) {
REQUIRE(events[i].fd < (int)manager->maxsocks);
REQUIRE(events[i].fd < (int)thread->manager->maxsocks);
if (events[i].fd == thread->pipe_fds[0]) {
have_ctlevent = true;
continue;
@@ -3476,7 +3491,6 @@ netthread(void *uap) {
#elif defined (USE_EPOLL)
const char *fnname = "epoll_wait()";
#elif defined(USE_DEVPOLL)
INSIST(threadid == 0);
isc_result_t result;
const char *fnname = "ioctl(DP_POLL)";
struct dvpoll dvp;
@@ -3511,16 +3525,16 @@ netthread(void *uap) {
/*
* Re-probe every thousand calls.
*/
if (manager->calls++ > 1000U) {
if (thread->calls++ > 1000U) {
result = isc_resource_getcurlimit(
isc_resource_openfiles,
&manager->open_max);
&thread->open_max);
if (result != ISC_R_SUCCESS)
manager->open_max = 64;
manager->calls = 0;
thread->open_max = 64;
thread->calls = 0;
}
for (pass = 0; pass < 2; pass++) {
dvp.dp_fds = tgread->events;
dvp.dp_fds = thread->events;
dvp.dp_nfds = thread->nevents;
if (dvp.dp_nfds >= thread->open_max)
dvp.dp_nfds = thread->open_max - 1;
@@ -3541,9 +3555,9 @@ netthread(void *uap) {
*/
result = isc_resource_getcurlimit(
isc_resource_openfiles,
&manager->open_max);
&thread->open_max);
if (result != ISC_R_SUCCESS)
manager->open_max = 64;
thread->open_max = 64;
} else
break;
}
@@ -3754,29 +3768,31 @@ setup_thread(isc__socketthread_t *thread) {
#elif defined(USE_DEVPOLL)
thread->nevents = ISC_SOCKET_MAXEVENTS;
result = isc_resource_getcurlimit(isc_resource_openfiles,
&manager->open_max);
&thread->open_max);
if (result != ISC_R_SUCCESS)
manager->open_max = 64;
manager->calls = 0;
manager->events = isc_mem_get(thread->manager->mctx,
sizeof(struct pollfd) *
manager->nevents);
if (manager->events == NULL)
thread->open_max = 64;
thread->calls = 0;
thread->events = isc_mem_get(thread->manager->mctx,
sizeof(struct pollfd) *
thread->nevents);
if (thread->events == NULL)
return (ISC_R_NOMEMORY);
/*
* Note: fdpollinfo should be able to support all possible FDs, so
* it must have maxsocks entries (not nevents).
*/
manager->fdpollinfo = isc_mem_get(mctx, sizeof(pollinfo_t) *
manager->maxsocks);
if (manager->fdpollinfo == NULL) {
isc_mem_put(mctx, manager->events,
sizeof(struct pollfd) * manager->nevents);
thread->fdpollinfo = isc_mem_get(thread->manager->mctx,
sizeof(pollinfo_t) *
thread->manager->maxsocks);
if (thread->fdpollinfo == NULL) {
isc_mem_put(thread->manager->mctx, thread->events,
sizeof(struct pollfd) * thread->nevents);
return (ISC_R_NOMEMORY);
}
memset(manager->fdpollinfo, 0, sizeof(pollinfo_t) * manager->maxsocks);
manager->devpoll_fd = open("/dev/poll", O_RDWR);
if (manager->devpoll_fd == -1) {
memset(thread->fdpollinfo, 0, sizeof(pollinfo_t) *
thread->manager->maxsocks);
thread->devpoll_fd = open("/dev/poll", O_RDWR);
if (thread->devpoll_fd == -1) {
result = isc__errno2result(errno);
strerror_r(errno, strbuf, sizeof(strbuf));
UNEXPECTED_ERROR(__FILE__, __LINE__,
@@ -3784,19 +3800,19 @@ setup_thread(isc__socketthread_t *thread) {
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_FAILED, "failed"),
strbuf);
isc_mem_put(mctx, manager->events,
sizeof(struct pollfd) * manager->nevents);
isc_mem_put(mctx, manager->fdpollinfo,
sizeof(pollinfo_t) * manager->maxsocks);
isc_mem_put(thread->manager->mctx, thread->events,
sizeof(struct pollfd) * thread->nevents);
isc_mem_put(thread->manager->mctx, thread->fdpollinfo,
sizeof(pollinfo_t) * thread->manager->maxsocks);
return (result);
}
result = watch_fd(manager, 0, manager->pipe_fds[0], SELECT_POKE_READ);
result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ);
if (result != ISC_R_SUCCESS) {
close(manager->devpoll_fd);
isc_mem_put(mctx, manager->events,
sizeof(struct pollfd) * manager->nevents);
isc_mem_put(mctx, manager->fdpollinfo,
sizeof(pollinfo_t) * manager->maxsocks);
close(thread->devpoll_fd);
isc_mem_put(thread->manager->mctx, thread->events,
sizeof(struct pollfd) * thread->nevents);
isc_mem_put(thread->manager->mctx, thread->fdpollinfo,
sizeof(pollinfo_t) * thread->manager->maxsocks);
return (result);
}
@@ -3884,7 +3900,7 @@ cleanup_thread(isc_mem_t *mctx, isc__socketthread_t *thread) {
isc_mem_put(mctx, thread->events,
sizeof(struct pollfd) * thread->nevents);
isc_mem_put(mctx, thread->fdpollinfo,
sizeof(pollinfo_t) * thread->maxsocks);
sizeof(pollinfo_t) * thread->manager->maxsocks);
#elif defined(USE_SELECT)
if (thread->read_fds != NULL)
isc_mem_put(mctx, thread->read_fds, thread->fd_bufsize);