diff options
author | premysl <p.janouch@gmail.com> | 2015-08-06 09:11:58 +0200 |
---|---|---|
committer | premysl <p.janouch@gmail.com> | 2015-08-06 19:00:13 +0200 |
commit | 1a305a1c6b1608219334d7512fc09081c9066c9e (patch) | |
tree | 4be120e30a99c2846fa10cef2870ce6b8c3c1e5d | |
parent | 02708608a97d021576fd08b39c400a27a590c999 (diff) | |
download | liberty-1a305a1c6b1608219334d7512fc09081c9066c9e.tar.gz liberty-1a305a1c6b1608219334d7512fc09081c9066c9e.tar.xz liberty-1a305a1c6b1608219334d7512fc09081c9066c9e.zip |
Add kqueue support
Successfully tested on OpenBSD with ponymap.
-rw-r--r-- | liberty.c | 247 |
1 files changed, 245 insertions, 2 deletions
@@ -53,6 +53,14 @@ #include <netinet/in.h> #include <netdb.h> +#ifdef __unix__ +// This file may define the "BSD" macro... +#include <sys/param.h> +// ...as well as these conflicting ones +#undef MIN +#undef MAX +#endif // __unix__ + #ifndef NI_MAXHOST #define NI_MAXHOST 1025 #endif // ! NI_MAXHOST @@ -1440,7 +1448,242 @@ poller_run (struct poller *self) self->revents_len = 0; } -#else // ! __linux__ +#elif defined (BSD) + +// Mac OS X's kqueue is fatally broken, or so I've been told; leaving it out. +// Otherwise this is sort of similar to the epoll version. + +#include <sys/types.h> +#include <sys/event.h> +#include <sys/time.h> + +struct poller +{ + int kqueue_fd; ///< The kqueue FD + struct poller_fd **fds; ///< Information associated with each FD + struct kevent *revents; ///< Output array for kevent() + size_t len; ///< Number of polled descriptors + size_t alloc; ///< Number of entries allocated + + struct poller_timers timers; ///< Timeouts + struct poller_idle *idle; ///< Idle events + + int revents_len; ///< Number of entries in `revents' +}; + +static void +poller_init (struct poller *self) +{ + self->kqueue_fd = kqueue (); + hard_assert (self->kqueue_fd != -1); + set_cloexec (self->kqueue_fd); + + self->len = 0; + self->alloc = POLLER_MIN_ALLOC; + self->fds = xcalloc (self->alloc, sizeof *self->fds); + self->revents = xcalloc (self->alloc, sizeof *self->revents); + self->revents_len = 0; + + poller_timers_init (&self->timers); + self->idle = NULL; +} + +static void +poller_free (struct poller *self) +{ + poller_timers_free (&self->timers); + + xclose (self->kqueue_fd); + free (self->fds); + free (self->revents); +} + +static void +poller_ensure_space (struct poller *self) +{ + if (self->len < self->alloc) + return; + + self->alloc <<= 1; + hard_assert (self->alloc != 0); + + self->revents = xreallocarray + (self->revents, sizeof *self->revents, self->alloc); + self->fds = xreallocarray + (self->fds, sizeof *self->fds, self->alloc); +} + +static void +poller_set (struct poller *self, struct poller_fd *fd) +{ + hard_assert (fd->poller == self); + bool modifying = true; + if (fd->index == -1) + { + poller_ensure_space (self); + self->fds[fd->index = self->len++] = fd; + modifying = false; + } + + // We have to watch for readability and writeability separately; + // to simplify matters, we can just disable what we don't desire to receive + struct kevent changes[2]; + EV_SET (&changes[0], fd->fd, EVFILT_READ, EV_ADD, 0, 0, fd); + EV_SET (&changes[1], fd->fd, EVFILT_WRITE, EV_ADD, 0, 0, fd); + changes[0].flags |= (fd->events & POLLIN) ? EV_ENABLE : EV_DISABLE; + changes[1].flags |= (fd->events & POLLOUT) ? EV_ENABLE : EV_DISABLE; + + if (kevent (self->kqueue_fd, + changes, N_ELEMENTS (changes), NULL, 0, NULL) == -1) + exit_fatal ("%s: %s", "kevent", strerror (errno)); +} + +static int +poller_compare_fds (const void *ax, const void *bx) +{ + const struct kevent *ay = ax, *by = bx; + return (int) ay->ident - (int) by->ident; +} + +static void +poller_dummify (struct kevent *fd_event) +{ + fd_event->flags = USHRT_MAX; +} + +static void +poller_remove_from_dispatch (struct poller *self, const struct poller_fd *fd) +{ + if (!self->revents_len) + return; + + struct kevent key = { .ident = fd->fd }, *fd_event; + if (!(fd_event = bsearch (&key, self->revents, + self->revents_len, sizeof *self->revents, poller_compare_fds))) + return; + + // The FD may appear twice -- both for reading and writing + int index = fd_event - self->revents; + + if (index > 0 + && !poller_compare_fds (&key, fd_event - 1)) + poller_dummify (fd_event - 1); + + poller_dummify (fd_event); + + if (index < self->revents_len - 1 + && !poller_compare_fds (&key, fd_event + 1)) + poller_dummify (fd_event + 1); +} + +static void +poller_remove_at_index (struct poller *self, size_t index) +{ + hard_assert (index < self->len); + struct poller_fd *fd = self->fds[index]; + fd->index = -1; + + poller_remove_from_dispatch (self, fd); + + if (index != --self->len) + { + self->fds[index] = self->fds[self->len]; + self->fds[index]->index = index; + } + + if (fd->closed) + return; + + struct kevent changes[2]; + EV_SET (&changes[0], fd->fd, EVFILT_READ, EV_DELETE, 0, 0, fd); + EV_SET (&changes[1], fd->fd, EVFILT_WRITE, EV_DELETE, 0, 0, fd); + + if (kevent (self->kqueue_fd, + changes, N_ELEMENTS (changes), NULL, 0, NULL) == -1) + exit_fatal ("%s: %s", "kevent", strerror (errno)); +} + +static struct timespec +poller_timeout_to_timespec (int ms) +{ + return (struct timespec) + { + .tv_sec = ms / 1000, + .tv_nsec = (ms % 1000) * 1000 * 1000 + }; +} + +static short +poller_kqueue_to_poll_events (struct kevent *event) +{ + short result = 0; + if (event->filter == EVFILT_READ) + { + result |= POLLIN; + if ((event->flags & EV_EOF) && event->fflags) + result |= POLLERR; + } + if (event->filter == EVFILT_WRITE) result |= POLLOUT; + if (event->flags & EV_EOF) result |= POLLHUP; + return result; +} + +static void +poller_run (struct poller *self) +{ + // Not reentrant + hard_assert (!self->revents_len); + + int n_fds; + do + { + struct timespec ts = poller_timeout_to_timespec + (self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers)); + n_fds = kevent (self->kqueue_fd, + NULL, 0, self->revents, self->len, &ts); + } + while (n_fds == -1 && errno == EINTR); + + if (n_fds == -1) + exit_fatal ("%s: %s", "kevent", strerror (errno)); + + // Sort them by file descriptor number for binary search + qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds); + self->revents_len = n_fds; + + poller_timers_dispatch (&self->timers); + poller_idle_dispatch (self->idle); + + for (int i = 0; i < n_fds; i++) + { + struct kevent *event = self->revents + i; + if (event->flags == USHRT_MAX) + continue; + + struct poller_fd *fd = event->udata; + hard_assert (fd->index != -1); + + struct pollfd pfd; + pfd.fd = fd->fd; + pfd.revents = poller_kqueue_to_poll_events (event); + pfd.events = fd->events; + + // Read and write events are separate in the kqueue API -- merge them + int sibling = 1; + while (i + sibling < n_fds + && !poller_compare_fds (event, event + sibling)) + pfd.revents |= poller_kqueue_to_poll_events (event + sibling++); + if ((pfd.revents & POLLHUP) && (pfd.revents & POLLOUT)) + pfd.revents &= ~POLLOUT; + i += --sibling; + + fd->dispatcher (&pfd, fd->user_data); + } + + self->revents_len = 0; +} + +#else // ! BSD struct poller { @@ -1562,7 +1805,7 @@ poller_run (struct poller *self) self->dispatch_next = -1; } -#endif // ! __linux__ +#endif // ! BSD // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |