From efafe4fa7f495283af3f9b6699a5a748088bc51e Mon Sep 17 00:00:00 2001 From: Brian Wellington Date: Tue, 29 Aug 2000 23:58:17 +0000 Subject: [PATCH] Non-threaded socket manager. --- lib/isc/unix/socket.c | 353 +++++++++++++++++++++++++--------------- lib/isc/unix/socket_p.h | 29 ++++ 2 files changed, 255 insertions(+), 127 deletions(-) create mode 100644 lib/isc/unix/socket_p.h diff --git a/lib/isc/unix/socket.c b/lib/isc/unix/socket.c index e8ed1e2f65..8e79d8ffbf 100644 --- a/lib/isc/unix/socket.c +++ b/lib/isc/unix/socket.c @@ -15,7 +15,7 @@ * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -/* $Id: socket.c,v 1.160 2000/08/26 01:31:56 bwelling Exp $ */ +/* $Id: socket.c,v 1.161 2000/08/29 23:58:15 bwelling Exp $ */ #include @@ -38,7 +38,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -46,6 +48,10 @@ #include #include +#ifndef ISC_PLATFORM_USETHREADS +#include "socket_p.h" +#endif + /* * Some systems define the socket length argument as an int, some as size_t, * some as socklen_t. This is here so it can be easily changed if needed. @@ -180,16 +186,24 @@ struct isc_socketmgr { isc_mutex_t lock; /* Locked by manager lock. */ ISC_LIST(isc_socket_t) socklist; - isc_thread_t watcher; - isc_condition_t shutdown_ok; fd_set read_fds; fd_set write_fds; isc_socket_t *fds[FD_SETSIZE]; int fdstate[FD_SETSIZE]; int maxfd; +#ifdef ISC_PLATFORM_USETHREADS + isc_thread_t watcher; + isc_condition_t shutdown_ok; int pipe_fds[2]; +#else + unsigned int refs; +#endif }; +#ifndef ISC_PLATFORM_USETHREADS +static isc_socketmgr_t *socketmgr = NULL; +#endif + #define CLOSED 0 /* this one must be zero */ #define MANAGED 1 #define CLOSE_PENDING 2 @@ -272,6 +286,51 @@ socket_log(isc_socket_t *sock, isc_sockaddr_t *address, } } +static void +wakeup_socket(isc_socketmgr_t *manager, int fd) { + isc_event_t *ev2; + isc_socketevent_t *rev; + isc_socket_t *sock; + + /* + * This is a wakeup on a socket. Look at the event queue for both + * read and write, and decide if we need to watch on it now or not. + */ + INSIST(fd < FD_SETSIZE); + + if (manager->fdstate[fd] == CLOSE_PENDING) { + manager->fdstate[fd] = CLOSED; + FD_CLR(fd, &manager->read_fds); + FD_CLR(fd, &manager->write_fds); + close(fd); + return; + } + if (manager->fdstate[fd] != MANAGED) + return; + + sock = manager->fds[fd]; + + /* + * If there are no events, or there is an event but we + * have already queued up the internal event on a task's + * queue, clear the bit. Otherwise, set it. + */ + rev = ISC_LIST_HEAD(sock->recv_list); + ev2 = (isc_event_t *) ISC_LIST_HEAD(sock->accept_list); + if ((rev == NULL && ev2 == NULL) + || sock->pending_recv || sock->pending_accept) + FD_CLR(sock->fd, &manager->read_fds); + else + FD_SET(sock->fd, &manager->read_fds); + + rev = ISC_LIST_HEAD(sock->send_list); + if ((rev == NULL || sock->pending_send) && !sock->connecting) + FD_CLR(sock->fd, &manager->write_fds); + else + FD_SET(sock->fd, &manager->write_fds); +} + +#ifdef ISC_PLATFORM_USETHREADS /* * Poke the select loop when there is something for us to do. * We assume that if a write completes here, it will be inserted into the @@ -284,7 +343,7 @@ select_poke(isc_socketmgr_t *mgr, int msg) { do { cc = write(mgr->pipe_fds[1], &msg, sizeof(int)); } while (cc < 0 && SOFT_ERROR(errno)); - + if (cc < 0) FATAL_ERROR(__FILE__, __LINE__, "write() failed during watcher poke: %s", @@ -309,12 +368,25 @@ select_readmsg(isc_socketmgr_t *mgr) { FATAL_ERROR(__FILE__, __LINE__, "read() failed during watcher poke: %s", strerror(errno)); - + return (SELECT_POKE_NOTHING); } return (msg); } +#else +/* + * Update the state of the socketmgr when something changes. + */ +static void +select_poke(isc_socketmgr_t *manager, int msg) { + if (msg == SELECT_POKE_SHUTDOWN) + return; + else if (msg >= 0) + wakeup_socket(manager, msg); + return; +} +#endif /* * Make a fd non-blocking @@ -986,8 +1058,10 @@ destroy(isc_socket_t **sockp) { select_poke(manager, sock->fd); ISC_LIST_UNLINK(manager->socklist, sock, link); +#ifdef ISC_PLATFORM_USETHREADS if (ISC_LIST_EMPTY(manager->socklist)) SIGNAL(&manager->shutdown_ok); +#endif /* * XXX should reset manager->maxfd here @@ -1782,6 +1856,75 @@ internal_send(isc_task_t *me, isc_event_t *ev) { UNLOCK(&sock->lock); } +static void +process_fds(isc_socketmgr_t *manager, int maxfd, + fd_set *readfds, fd_set *writefds) +{ + int i; + isc_socket_t *sock; + isc_boolean_t unlock_sock; + + /* + * Process read/writes on other fds here. Avoid locking + * and unlocking twice if both reads and writes are possible. + */ + for (i = 0 ; i < maxfd ; i++) { +#ifdef ISC_PLATFORM_USETHREADS + if (i == manager->pipe_fds[0] || i == manager->pipe_fds[1]) + continue; +#endif + + if (manager->fdstate[i] == CLOSE_PENDING) { + manager->fdstate[i] = CLOSED; + FD_CLR(i, &manager->read_fds); + FD_CLR(i, &manager->write_fds); + + close(i); + + continue; + } + + sock = manager->fds[i]; + unlock_sock = ISC_FALSE; + if (FD_ISSET(i, readfds)) { + if (sock == NULL) { + FD_CLR(i, &manager->read_fds); + goto check_write; + } + unlock_sock = ISC_TRUE; + LOCK(&sock->lock); + if (!SOCK_DEAD(sock)) { + if (sock->listener) + dispatch_accept(sock); + else + dispatch_recv(sock); + } + FD_CLR(i, &manager->read_fds); + } + check_write: + if (FD_ISSET(i, writefds)) { + if (sock == NULL) { + FD_CLR(i, &manager->write_fds); + continue; + } + if (!unlock_sock) { + unlock_sock = ISC_TRUE; + LOCK(&sock->lock); + } + if (!SOCK_DEAD(sock)) { + if (sock->connecting) + dispatch_connect(sock); + else + dispatch_send(sock); + } + FD_CLR(i, &manager->write_fds); + } + if (unlock_sock) + UNLOCK(&sock->lock); + } +} + +#ifdef ISC_PLATFORM_USETHREADS /* * This is the thread that will loop forever, always in a select or poll * call. @@ -1792,17 +1935,12 @@ internal_send(isc_task_t *me, isc_event_t *ev) { static isc_threadresult_t watcher(void *uap) { isc_socketmgr_t *manager = uap; - isc_socket_t *sock; isc_boolean_t done; int ctlfd; int cc; fd_set readfds; fd_set writefds; int msg; - isc_boolean_t unlock_sock; - int i; - isc_socketevent_t *rev; - isc_event_t *ev2; int maxfd; /* @@ -1866,122 +2004,12 @@ watcher(void *uap) { * and decide if we need to watch on it now * or not. */ - if (msg >= 0) { - INSIST(msg < FD_SETSIZE); - - if (manager->fdstate[msg] == - CLOSE_PENDING) { - manager->fdstate[msg] = CLOSED; - FD_CLR(msg, - &manager->read_fds); - FD_CLR(msg, - &manager->write_fds); - - close(msg); - - continue; - } - - if (manager->fdstate[msg] != MANAGED) - continue; - - sock = manager->fds[msg]; - - LOCK(&sock->lock); - - /* - * If there are no events, or there - * is an event but we have already - * queued up the internal event on a - * task's queue, clear the bit. - * Otherwise, set it. - */ - rev = ISC_LIST_HEAD(sock->recv_list); - ev2 = (isc_event_t *) - ISC_LIST_HEAD(sock->accept_list); - if ((rev == NULL && ev2 == NULL) - || sock->pending_recv - || sock->pending_accept) { - FD_CLR(sock->fd, - &manager->read_fds); - } else { - FD_SET(sock->fd, - &manager->read_fds); - } - - rev = ISC_LIST_HEAD(sock->send_list); - if ((rev == NULL - || sock->pending_send) - && !sock->connecting) { - FD_CLR(sock->fd, - &manager->write_fds); - } else { - FD_SET(sock->fd, - &manager->write_fds); - } - - UNLOCK(&sock->lock); - } + if (msg >= 0) + wakeup_socket(manager, msg); } } - /* - * Process read/writes on other fds here. Avoid locking - * and unlocking twice if both reads and writes are possible. - */ - for (i = 0 ; i < maxfd ; i++) { - if (i == manager->pipe_fds[0] - || i == manager->pipe_fds[1]) - continue; - - if (manager->fdstate[i] == CLOSE_PENDING) { - manager->fdstate[i] = CLOSED; - FD_CLR(i, &manager->read_fds); - FD_CLR(i, &manager->write_fds); - - close(i); - - continue; - } - - sock = manager->fds[i]; - unlock_sock = ISC_FALSE; - if (FD_ISSET(i, &readfds)) { - if (sock == NULL) { - FD_CLR(i, &manager->read_fds); - goto check_write; - } - unlock_sock = ISC_TRUE; - LOCK(&sock->lock); - if (!SOCK_DEAD(sock)) { - if (sock->listener) - dispatch_accept(sock); - else - dispatch_recv(sock); - } - FD_CLR(i, &manager->read_fds); - } - check_write: - if (FD_ISSET(i, &writefds)) { - if (sock == NULL) { - FD_CLR(i, &manager->write_fds); - continue; - } - if (!unlock_sock) { - unlock_sock = ISC_TRUE; - LOCK(&sock->lock); - } - if (!SOCK_DEAD(sock)) { - if (sock->connecting) - dispatch_connect(sock); - else - dispatch_send(sock); - } - FD_CLR(i, &manager->write_fds); - } - if (unlock_sock) - UNLOCK(&sock->lock); - } + process_fds(manager, maxfd, &readfds, &writefds); } manager_log(manager, TRACE, "watcher exiting"); @@ -1989,6 +2017,7 @@ watcher(void *uap) { UNLOCK(&manager->lock); return ((isc_threadresult_t)0); } +#endif /* * Create a new socket manager. @@ -1999,6 +2028,14 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { REQUIRE(managerp != NULL && *managerp == NULL); +#ifndef ISC_PLATFORM_USETHREADS + if (socketmgr != NULL) { + socketmgr->refs++; + *managerp = socketmgr; + return (ISC_R_SUCCESS); + } +#endif + manager = isc_mem_get(mctx, sizeof *manager); if (manager == NULL) return (ISC_R_NOMEMORY); @@ -2013,7 +2050,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { "isc_mutex_init() failed"); return (ISC_R_UNEXPECTED); } - +#ifdef ISC_PLATFORM_USETHREADS if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) { DESTROYLOCK(&manager->lock); isc_mem_put(mctx, manager, sizeof *manager); @@ -2040,16 +2077,24 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { #if 0 RUNTIME_CHECK(make_nonblock(manager->pipe_fds[1]) == ISC_R_SUCCESS); #endif +#else + manager->refs = 1; +#endif /* * Set up initial state for the select loop */ FD_ZERO(&manager->read_fds); FD_ZERO(&manager->write_fds); +#ifdef ISC_PLATFORM_USETHREADS FD_SET(manager->pipe_fds[0], &manager->read_fds); manager->maxfd = manager->pipe_fds[0]; +#else + manager->maxfd = 0; +#endif memset(manager->fdstate, 0, sizeof(manager->fdstate)); +#ifdef ISC_PLATFORM_USETHREADS /* * Start up the select/poll thread. */ @@ -2063,9 +2108,12 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { close(manager->pipe_fds[1]); return (ISC_R_UNEXPECTED); } - +#endif isc_mem_attach(mctx, &manager->mctx); +#ifndef ISC_PLATFORM_USETHREADS + socketmgr = manager; +#endif *managerp = manager; return (ISC_R_SUCCESS); @@ -2085,8 +2133,17 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) { manager = *managerp; REQUIRE(VALID_MANAGER(manager)); +#ifndef ISC_PLATFORM_USETHREADS + if (manager->refs > 1) { + manager->refs--; + *managerp = NULL; + return; + } +#endif + LOCK(&manager->lock); +#ifdef ISC_PLATFORM_USETHREADS /* * Wait for all sockets to be destroyed. */ @@ -2094,33 +2151,47 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) { manager_log(manager, CREATION, "sockets exist"); WAIT(&manager->shutdown_ok, &manager->lock); } +#else + /* + * Hope all sockets have been destroyed. + */ + if (!ISC_LIST_EMPTY(manager->socklist)) { + manager_log(manager, CREATION, "sockets exist"); + INSIST(0); + } +#endif UNLOCK(&manager->lock); /* * Here, poke our select/poll thread. Do this by closing the write * half of the pipe, which will send EOF to the read half. + * This is currently a no-op in the non-threaded case. */ select_poke(manager, SELECT_POKE_SHUTDOWN); +#ifdef ISC_PLATFORM_USETHREADS /* * Wait for thread to exit. */ if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS) UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_thread_join() failed"); +#endif /* * Clean up. */ +#ifdef ISC_PLATFORM_USETHREADS close(manager->pipe_fds[0]); close(manager->pipe_fds[1]); + (void)isc_condition_destroy(&manager->shutdown_ok); +#endif for (i = 0 ; i < FD_SETSIZE ; i++) if (manager->fdstate[i] == CLOSE_PENDING) close(i); - (void)isc_condition_destroy(&manager->shutdown_ok); DESTROYLOCK(&manager->lock); manager->magic = 0; mctx= manager->mctx; @@ -2652,6 +2723,7 @@ isc_socket_accept(isc_socket_t *sock, isc_task_t *ntask = NULL; isc_socket_t *nsock; isc_result_t ret; + isc_boolean_t do_poke = ISC_FALSE; REQUIRE(VALID_SOCKET(sock)); manager = sock->manager; @@ -2697,10 +2769,13 @@ isc_socket_accept(isc_socket_t *sock, * bit of time waking it up now or later won't matter all that much. */ if (ISC_LIST_EMPTY(sock->accept_list)) - select_poke(manager, sock->fd); + do_poke = ISC_TRUE; ISC_LIST_ENQUEUE(sock->accept_list, dev, ev_link); + if (do_poke) + select_poke(manager, sock->fd); + UNLOCK(&sock->lock); return (ISC_R_SUCCESS); } @@ -3225,3 +3300,27 @@ isc_socket_isbound(isc_socket_t *sock) { return (val); } + +#ifndef ISC_PLATFORM_USETHREADS +void +isc__socketmgr_getfdsets(fd_set *readset, fd_set *writeset, int *maxfd) { + if (socketmgr == NULL) + *maxfd = 0; + else { + *readset = socketmgr->read_fds; + *writeset = socketmgr->write_fds; + *maxfd = socketmgr->maxfd + 1; + } +} + +isc_result_t +isc__socketmgr_dispatch(fd_set *readset, fd_set *writeset, int maxfd) { + isc_socketmgr_t *manager = socketmgr; + + if (manager == NULL) + return (ISC_R_NOTFOUND); + + process_fds(manager, maxfd, readset, writeset); + return (ISC_R_SUCCESS); +} +#endif diff --git a/lib/isc/unix/socket_p.h b/lib/isc/unix/socket_p.h new file mode 100644 index 0000000000..c3842c7876 --- /dev/null +++ b/lib/isc/unix/socket_p.h @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2000 Internet Software Consortium. + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SOFTWARE CONSORTIUM + * DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL + * INTERNET SOFTWARE CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, + * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING + * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, + * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION + * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +/* $Id: socket_p.h,v 1.1 2000/08/29 23:58:17 bwelling Exp $ */ + +#ifndef ISC_SOCKET_P_H +#define ISC_SOCKET_P_H + +void +isc__socketmgr_getfdsets(fd_set *readset, fd_set *writeset, int *maxfd); + +isc_result_t +isc__socketmgr_dispatch(fd_set *readset, fd_set *writeset, int maxfd); + +#endif /* ISC_SOCKET_P_H */