From e8fe0dad81378920c6c537013f8759dfe9144bac Mon Sep 17 00:00:00 2001 From: Přemysl Janouch Date: Thu, 12 Feb 2015 01:53:03 +0100 Subject: Import optimized event loop from ponymap --- common.c | 452 +++++++++++++++++++++++++++++++++++++------------------------- kike.c | 83 ++++++++---- zyklonb.c | 101 ++++++++------ 3 files changed, 391 insertions(+), 245 deletions(-) diff --git a/common.c b/common.c index e67f57a..3bbdd78 100644 --- a/common.c +++ b/common.c @@ -813,42 +813,61 @@ xclose (int fd) break; } -// --- Polling ----------------------------------------------------------------- +// --- Event loop -------------------------------------------------------------- // Basically the poor man's GMainLoop/libev/libuv. It might make some sense // to instead use those tested and proven libraries but we don't need much // and it's interesting to implement. -// At the moment the FD's are stored in an unsorted array. This is not ideal -// complexity-wise but I don't think I have much of a choice with poll(), -// and neither with epoll for that matter. -// -// unsorted array sorted array -// search O(n) O(log n) [O(log log n)] -// insert by fd O(n) O(n) -// delete by fd O(n) O(n) -// -// Insertion in the unsorted array can be reduced to O(1) if I maintain a -// bitmap of present FD's but that's still not a huge win. -// -// I don't expect this to be much of an issue, as there are typically not going -// to be that many FD's to watch, and the linear approach is cache-friendly. +// Actually it mustn't be totally shitty as scanning exercises it quite a bit. +// We sacrifice some memory to allow for O(1) and O(log n) operations. -typedef void (*poller_dispatcher_fn) (const struct pollfd *, void *); +typedef void (*poller_fd_fn) (const struct pollfd *, void *); typedef void (*poller_timer_fn) (void *); +typedef void (*poller_idle_fn) (void *); #define POLLER_MIN_ALLOC 16 -struct poller_timer_info +struct poller_timer { + struct poller_timers *timers; ///< The timers part of our poller + ssize_t index; ///< Where we are in the array, or -1 + int64_t when; ///< When is the timer to expire + poller_timer_fn dispatcher; ///< Event dispatcher void *user_data; ///< User data }; +struct poller_fd +{ + struct poller *poller; ///< Our poller + ssize_t index; ///< Where we are in the array, or -1 + + int fd; ///< Our file descriptor + short events; ///< The poll() events we registered for + bool closed; ///< Whether fd has been closed already + + poller_fd_fn dispatcher; ///< Event dispatcher + void *user_data; ///< User data +}; + +struct poller_idle +{ + LIST_HEADER (poller_idle) + struct poller *poller; ///< Our poller + + bool active; ///< Whether we're on the list + + poller_idle_fn dispatcher; ///< Event dispatcher + void *user_data; ///< User data +}; + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + struct poller_timers { - struct poller_timer_info *info; ///< Min-heap of timers + struct poller_timer **heap; ///< Min-heap of timers size_t len; ///< Number of scheduled timers size_t alloc; ///< Number of timers allocated }; @@ -858,13 +877,13 @@ poller_timers_init (struct poller_timers *self) { self->alloc = POLLER_MIN_ALLOC; self->len = 0; - self->info = xmalloc (self->alloc * sizeof *self->info); + self->heap = xmalloc (self->alloc * sizeof *self->heap); } static void poller_timers_free (struct poller_timers *self) { - free (self->info); + free (self->heap); } static int64_t @@ -884,28 +903,30 @@ poller_timers_get_current_time (void) static void poller_timers_heapify_down (struct poller_timers *self, size_t index) { - typedef struct poller_timer_info info_t; - info_t *end = self->info + self->len; + typedef struct poller_timer *timer_t; + timer_t *end = self->heap + self->len; while (true) { - info_t *parent = self->info + index; - info_t *left = self->info + 2 * index + 1; - info_t *right = self->info + 2 * index + 2; + timer_t *parent = self->heap + index; + timer_t *left = self->heap + 2 * index + 1; + timer_t *right = self->heap + 2 * index + 2; - info_t *lowest = parent; - if (left < end && left->when < lowest->when) + timer_t *lowest = parent; + if (left < end && (*left) ->when < (*lowest)->when) lowest = left; - if (right < end && right->when < lowest->when) + if (right < end && (*right)->when < (*lowest)->when) lowest = right; if (parent == lowest) break; - info_t tmp = *parent; + timer_t tmp = *parent; *parent = *lowest; *lowest = tmp; - index = lowest - self->info; + (*parent)->index = parent - self->heap; + (*lowest)->index = lowest - self->heap; + index = lowest - self->heap; } } @@ -913,10 +934,12 @@ static void poller_timers_remove_at_index (struct poller_timers *self, size_t index) { hard_assert (index < self->len); + self->heap[index]->index = -1; if (index == --self->len) return; - self->info[index] = self->info[self->len]; + self->heap[index] = self->heap[self->len]; + self->heap[index]->index = index; poller_timers_heapify_down (self, index); } @@ -924,11 +947,11 @@ static void poller_timers_dispatch (struct poller_timers *self) { int64_t now = poller_timers_get_current_time (); - while (self->len && self->info->when <= now) + while (self->len && self->heap[0]->when <= now) { - struct poller_timer_info info = *self->info; + struct poller_timer *timer = self->heap[0]; poller_timers_remove_at_index (self, 0); - info.dispatcher (info.user_data); + timer->dispatcher (timer->user_data); } } @@ -938,49 +961,35 @@ poller_timers_heapify_up (struct poller_timers *self, size_t index) while (index != 0) { size_t parent = (index - 1) / 2; - if (self->info[parent].when <= self->info[index].when) + if (self->heap[parent]->when <= self->heap[index]->when) break; - struct poller_timer_info tmp = self->info[parent]; - self->info[parent] = self->info[index]; - self->info[index] = tmp; + struct poller_timer *tmp = self->heap[parent]; + self->heap[parent] = self->heap[index]; + self->heap[index] = tmp; + self->heap[parent]->index = parent; + self->heap[index] ->index = index; index = parent; } } -static ssize_t -poller_timers_find (struct poller_timers *self, - poller_timer_fn dispatcher, void *data) -{ - // NOTE: there may be duplicates. - for (size_t i = 0; i < self->len; i++) - if (self->info[i].dispatcher == dispatcher - && self->info[i].user_data == data) - return i; - return -1; -} - -static ssize_t -poller_timers_find_by_data (struct poller_timers *self, void *data) -{ - for (size_t i = 0; i < self->len; i++) - if (self->info[i].user_data == data) - return i; - return -1; -} - static void -poller_timers_add (struct poller_timers *self, - poller_timer_fn dispatcher, void *data, int timeout_ms) +poller_timers_set (struct poller_timers *self, struct poller_timer *timer) { - if (self->len == self->alloc) - self->info = xreallocarray (self->info, - self->alloc <<= 1, sizeof *self->info); + hard_assert (timer->timers == self); + if (timer->index != -1) + { + poller_timers_heapify_down (self, timer->index); + poller_timers_heapify_up (self, timer->index); + return; + } - self->info[self->len] = (struct poller_timer_info) { - .when = poller_timers_get_current_time () + timeout_ms, - .dispatcher = dispatcher, .user_data = data }; + if (self->len == self->alloc) + self->heap = xreallocarray (self->heap, + self->alloc <<= 1, sizeof *self->heap); + self->heap[self->len] = timer; + timer->index = self->len; poller_timers_heapify_up (self, self->len++); } @@ -990,7 +999,7 @@ poller_timers_get_poll_timeout (struct poller_timers *self) if (!self->len) return -1; - int64_t timeout = self->info->when - poller_timers_get_current_time (); + int64_t timeout = self->heap[0]->when - poller_timers_get_current_time (); if (timeout <= 0) return 0; if (timeout > INT_MAX) @@ -998,35 +1007,37 @@ poller_timers_get_poll_timeout (struct poller_timers *self) return timeout; } -#ifdef __linux__ +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -// I don't really need this, I've basically implemented this just because I can. +static void +poller_idle_dispatch (struct poller_idle *list) +{ + struct poller_idle *iter, *next; + for (iter = list; iter; iter = next) + { + next = iter->next; + iter->dispatcher (iter->user_data); + } +} -#include +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -struct poller_info -{ - int fd; ///< Our file descriptor - short events; ///< The poll() events we registered for - poller_dispatcher_fn dispatcher; ///< Event dispatcher - void *user_data; ///< User data -}; +#ifdef __linux__ +#include struct poller { int epoll_fd; ///< The epoll FD - struct poller_info **info; ///< Information associated with each FD + struct poller_fd **fds; ///< Information associated with each FD + int *dummy; ///< For poller_remove_from_dispatch() struct epoll_event *revents; ///< Output array for epoll_wait() size_t len; ///< Number of polled descriptors size_t alloc; ///< Number of entries allocated struct poller_timers timers; ///< Timeouts + struct poller_idle *idle; ///< Idle events - /// Index of the element in `revents' that's about to be dispatched next. - int dispatch_next; - - /// The total number of entries stored in `revents' by epoll_wait(). - int dispatch_total; + int revents_len; ///< Number of entries in `revents' }; static void @@ -1038,13 +1049,13 @@ poller_init (struct poller *self) self->len = 0; self->alloc = POLLER_MIN_ALLOC; - self->info = xcalloc (self->alloc, sizeof *self->info); + self->fds = xcalloc (self->alloc, sizeof *self->fds); + self->dummy = xcalloc (self->alloc, sizeof *self->dummy); self->revents = xcalloc (self->alloc, sizeof *self->revents); + self->revents_len = 0; poller_timers_init (&self->timers); - - self->dispatch_next = 0; - self->dispatch_total = 0; + self->idle = NULL; } static void @@ -1052,28 +1063,19 @@ poller_free (struct poller *self) { for (size_t i = 0; i < self->len; i++) { - struct poller_info *info = self->info[i]; + struct poller_fd *fd = self->fds[i]; hard_assert (epoll_ctl (self->epoll_fd, - EPOLL_CTL_DEL, info->fd, (void *) "") != -1); - free (info); + EPOLL_CTL_DEL, fd->fd, (void *) "") != -1); } poller_timers_free (&self->timers); xclose (self->epoll_fd); - free (self->info); + free (self->fds); + free (self->dummy); free (self->revents); } -static ssize_t -poller_find_by_fd (struct poller *self, int fd) -{ - for (size_t i = 0; i < self->len; i++) - if (self->info[i]->fd == fd) - return i; - return -1; -} - static void poller_ensure_space (struct poller *self) { @@ -1085,8 +1087,10 @@ poller_ensure_space (struct poller *self) self->revents = xreallocarray (self->revents, sizeof *self->revents, self->alloc); - self->info = xreallocarray - (self->info, sizeof *self->info, self->alloc); + self->fds = xreallocarray + (self->fds, sizeof *self->fds, self->alloc); + self->dummy = xreallocarray + (self->dummy, sizeof *self->dummy, self->alloc); } static short @@ -1114,118 +1118,122 @@ poller_poll_to_epoll_events (short events) } static void -poller_set (struct poller *self, int fd, short events, - poller_dispatcher_fn dispatcher, void *data) +poller_set (struct poller *self, struct poller_fd *fd) { - ssize_t index = poller_find_by_fd (self, fd); + hard_assert (fd->poller == self); bool modifying = true; - if (index == -1) + if (fd->index == -1) { poller_ensure_space (self); - self->info[index = self->len++] = xcalloc (1, sizeof **self->info); + self->fds[fd->index = self->len++] = fd; modifying = false; } - struct poller_info *info = self->info[index]; - info->fd = fd; - info->events = events; - info->dispatcher = dispatcher; - info->user_data = data; - struct epoll_event event; - event.events = poller_poll_to_epoll_events (events); - event.data.ptr = info; + event.events = poller_poll_to_epoll_events (fd->events); + event.data.ptr = fd; hard_assert (epoll_ctl (self->epoll_fd, - modifying ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &event) != -1); + modifying ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd->fd, &event) != -1); +} + +static int +poller_compare_fds (const void *ax, const void *bx) +{ + const struct epoll_event *ay = ax, *by = bx; + struct poller_fd *a = ay->data.ptr, *b = by->data.ptr; + return a->fd - b->fd; } static void -poller_remove_from_dispatch (struct poller *self, - const struct poller_info *info) +poller_remove_from_dispatch (struct poller *self, const struct poller_fd *fd) { - if (!self->dispatch_total) + if (!self->revents_len) return; - int i; - for (i = self->dispatch_next; i < self->dispatch_total; i++) - if (self->revents[i].data.ptr == info) - break; - if (i == self->dispatch_total) - return; + struct epoll_event key = { .data.ptr = (void *) fd }, *fd_event; + if ((fd_event = bsearch (&key, self->revents, + self->revents_len, sizeof *self->revents, poller_compare_fds))) + { + fd_event->events = -1; - if (i != --self->dispatch_total) - self->revents[i] = self->revents[self->dispatch_total]; + // Don't let any further bsearch()'s touch possibly freed memory + int *dummy = self->dummy + (fd_event - self->revents); + *dummy = fd->fd; + fd_event->data.ptr = + (uint8_t *) dummy - offsetof (struct poller_fd, fd); + } } static void poller_remove_at_index (struct poller *self, size_t index) { hard_assert (index < self->len); - struct poller_info *info = self->info[index]; + struct poller_fd *fd = self->fds[index]; + fd->index = -1; - poller_remove_from_dispatch (self, info); - hard_assert (epoll_ctl (self->epoll_fd, - EPOLL_CTL_DEL, info->fd, (void *) "") != -1); + poller_remove_from_dispatch (self, fd); + if (!fd->closed) + hard_assert (epoll_ctl (self->epoll_fd, + EPOLL_CTL_DEL, fd->fd, (void *) "") != -1); - free (info); if (index != --self->len) - self->info[index] = self->info[self->len]; + { + self->fds[index] = self->fds[self->len]; + self->fds[index]->index = index; + } } static void poller_run (struct poller *self) { // Not reentrant - hard_assert (!self->dispatch_total); + hard_assert (!self->revents_len); int n_fds; do n_fds = epoll_wait (self->epoll_fd, self->revents, self->len, - poller_timers_get_poll_timeout (&self->timers)); + self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers)); while (n_fds == -1 && errno == EINTR); if (n_fds == -1) exit_fatal ("%s: %s", "epoll", strerror (errno)); - self->dispatch_next = 0; - self->dispatch_total = n_fds; + // Sort them by file descriptor number for binary search + qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds); + self->revents_len = n_fds; poller_timers_dispatch (&self->timers); + poller_idle_dispatch (self->idle); - while (self->dispatch_next < self->dispatch_total) + for (int i = 0; i < n_fds; i++) { - struct epoll_event *revents = self->revents + self->dispatch_next; - struct poller_info *info = revents->data.ptr; + struct epoll_event *revents = self->revents + i; + if (revents->events == (uint32_t) -1) + continue; + struct poller_fd *fd = revents->data.ptr; struct pollfd pfd; - pfd.fd = info->fd; + pfd.fd = fd->fd; pfd.revents = poller_epoll_to_poll_events (revents->events); - pfd.events = info->events; + pfd.events = fd->events; - self->dispatch_next++; - info->dispatcher (&pfd, info->user_data); + fd->dispatcher (&pfd, fd->user_data); } - self->dispatch_next = 0; - self->dispatch_total = 0; + self->revents_len = 0; } #else // !__linux__ -struct poller_info -{ - poller_dispatcher_func dispatcher; ///< Event dispatcher - void *user_data; ///< User data -}; - struct poller { struct pollfd *fds; ///< Polled descriptors - struct poller_info *fds_info; ///< Additional information for each FD + struct poller_fd **fds_data; ///< Additional information for each FD size_t len; ///< Number of polled descriptors size_t alloc; ///< Number of entries allocated struct poller_timers timers; ///< Timers + struct poller_idle *idle; ///< Idle events int dispatch_next; ///< The next dispatched FD or -1 }; @@ -1235,7 +1243,7 @@ poller_init (struct poller *self) self->alloc = POLLER_MIN_ALLOC; self->len = 0; self->fds = xcalloc (self->alloc, sizeof *self->fds); - self->fds_info = xcalloc (self->alloc, sizeof *self->fds_info); + self->fds_data = xcalloc (self->alloc, sizeof *self->fds_data); poller_timers_init (&self->timers); self->dispatch_next = -1; } @@ -1244,19 +1252,10 @@ static void poller_free (struct poller *self) { free (self->fds); - free (self->fds_info); + free (self->fds_data); poller_timers_free (&self->timers); } -static ssize_t -poller_find_by_fd (struct poller *self, int fd) -{ - for (size_t i = 0; i < self->len; i++) - if (self->fds[i].fd == fd) - return i; - return -1; -} - static void poller_ensure_space (struct poller *self) { @@ -1265,33 +1264,33 @@ poller_ensure_space (struct poller *self) self->alloc <<= 1; self->fds = xreallocarray (self->fds, sizeof *self->fds, self->alloc); - self->fds_info = xreallocarray - (self->fds_info, sizeof *self->fds_info, self->alloc); + self->fds_data = xreallocarray + (self->fds_data, sizeof *self->fds_data, self->alloc); } static void -poller_set (struct poller *self, int fd, short events, - poller_dispatcher_func dispatcher, void *data) +poller_set (struct poller *self, struct poller_fd *fd) { - ssize_t index = poller_find_by_fd (self, fd); - if (index == -1) + hard_assert (fd->poller == self); + if (fd->index == -1) { poller_ensure_space (self); - index = self->len++; + self->fds_data[fd->index = self->len++] = fd; } - struct pollfd *new_entry = self->fds + index; + struct pollfd *new_entry = self->fds + fd->index; memset (new_entry, 0, sizeof *new_entry); - new_entry->fd = fd; - new_entry->events = events; - - self->fds_info[index] = (struct poller_info) { dispatcher, data }; + new_entry->fd = fd->fd; + new_entry->events = fd->events; } static void poller_remove_at_index (struct poller *self, size_t index) { hard_assert (index < self->len); + struct poller_fd *fd = self->fds_data[index]; + fd->index = -1; + if (index == --self->len) return; @@ -1300,14 +1299,18 @@ poller_remove_at_index (struct poller *self, size_t index) { memmove (self->fds + index, self->fds + index + 1, (self->len - index) * sizeof *self->fds); - memmove (self->fds_info + index, self->fds_info + index + 1, - (self->len - index) * sizeof *self->fds_info); + memmove (self->fds_data + index, self->fds_data + index + 1, + (self->len - index) * sizeof *self->fds_data); + for (size_t i = index; i < self->len; i++) + self->fds_data[i]->index = i; + self->dispatch_next--; } else { - self->fds[index] = self->fds[self->len]; - self->fds_info[index] = self->fds_info[self->len]; + self->fds[index] = self->fds [self->len]; + self->fds_data[index] = self->fds_data[self->len]; + self->fds_data[index]->index = index; } } @@ -1320,21 +1323,22 @@ poller_run (struct poller *self) int result; do result = poll (self->fds, self->len, - poller_timers_get_poll_timeout (&self->timers)); + self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers)); while (result == -1 && errno == EINTR); if (result == -1) exit_fatal ("%s: %s", "poll", strerror (errno)); poller_timers_dispatch (&self->timers); + poller_idle_dispatch (self->idle); for (int i = 0; i < (int) self->len; ) { struct pollfd pfd = self->fds[i]; - struct poller_info *info = self->fds_info + i; + struct poller_fd *fd = self->fds_data[i]; self->dispatch_next = ++i; if (pfd.revents) - info->dispatcher (&pfd, info->user_data); + fd->dispatcher (&pfd, fd->user_data); i = self->dispatch_next; } @@ -1343,6 +1347,86 @@ poller_run (struct poller *self) #endif // !__linux__ +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static void +poller_timer_init (struct poller_timer *self, struct poller *poller) +{ + memset (self, 0, sizeof *self); + self->timers = &poller->timers; + self->index = -1; +} + +static void +poller_timer_set (struct poller_timer *self, int timeout_ms) +{ + self->when = poller_timers_get_current_time () + timeout_ms; + poller_timers_set (self->timers, self); +} + +static void +poller_timer_reset (struct poller_timer *self) +{ + if (self->index != -1) + poller_timers_remove_at_index (self->timers, self->index); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static void +poller_idle_init (struct poller_idle *self, struct poller *poller) +{ + memset (self, 0, sizeof *self); + self->poller = poller; +} + +static void +poller_idle_set (struct poller_idle *self) +{ + if (self->active) + return; + + LIST_PREPEND (self->poller->idle, self); + self->active = true; +} + +static void +poller_idle_reset (struct poller_idle *self) +{ + if (!self->active) + return; + + LIST_UNLINK (self->poller->idle, self); + self->prev = NULL; + self->next = NULL; + self->active = false; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static void +poller_fd_init (struct poller_fd *self, struct poller *poller, int fd) +{ + memset (self, 0, sizeof *self); + self->poller = poller; + self->index = -1; + self->fd = fd; +} + +static void +poller_fd_set (struct poller_fd *self, short events) +{ + self->events = events; + poller_set (self->poller, self); +} + +static void +poller_fd_reset (struct poller_fd *self) +{ + if (self->index != -1) + poller_remove_at_index (self->poller, self->index); +} + // --- Utilities --------------------------------------------------------------- static void diff --git a/kike.c b/kike.c index 422d732..9810e25 100644 --- a/kike.c +++ b/kike.c @@ -303,6 +303,11 @@ struct client struct str read_buffer; ///< Unprocessed input struct str write_buffer; ///< Output yet to be sent out + struct poller_fd socket_event; ///< The socket can be read/written to + struct poller_timer ping_timer; ///< We should send a ping + struct poller_timer timeout_timer; ///< Connection seems to be dead + struct poller_timer kill_timer; ///< Hard kill timeout + bool initialized; ///< Has any data been received yet? bool registered; ///< The user has registered bool closing_link; ///< Closing link @@ -512,6 +517,7 @@ channel_user_count (const struct channel *chan) struct server_context { int *listen_fds; ///< Listening socket FD's + struct poller_fd *listen_events; ///< New connections available size_t n_listen_fds; ///< Number of listening sockets SSL_CTX *ssl_ctx; ///< SSL context @@ -526,6 +532,8 @@ struct server_context bool quitting; ///< User requested quitting bool polling; ///< The event loop is running + struct poller_fd signal_event; ///< Got a signal + struct str_map config; ///< Server configuration char *server_name; ///< Our server name unsigned ping_interval; ///< Ping interval in seconds @@ -539,6 +547,7 @@ static void server_context_init (struct server_context *self) { self->listen_fds = NULL; + self->listen_events = NULL; self->n_listen_fds = 0; self->clients = NULL; self->n_clients = 0; @@ -555,6 +564,8 @@ server_context_init (struct server_context *self) self->quitting = false; self->polling = false; + memset (&self->signal_event, 0, sizeof self->signal_event); + str_map_init (&self->config); self->config.free = free; load_config_defaults (&self->config, g_config_table); @@ -575,8 +586,13 @@ server_context_free (struct server_context *self) str_map_free (&self->config); for (size_t i = 0; i < self->n_listen_fds; i++) + { xclose (self->listen_fds[i]); + self->listen_events[i].closed = true; + poller_fd_reset (&self->listen_events[i]); + } free (self->listen_fds); + free (self->listen_events); if (self->ssl_ctx) SSL_CTX_free (self->ssl_ctx); @@ -628,10 +644,9 @@ irc_initiate_quit (struct server_context *ctx) for (size_t i = 0; i < ctx->n_listen_fds; i++) { - ssize_t index = poller_find_by_fd (&ctx->poller, ctx->listen_fds[i]); - if (soft_assert (index != -1)) - poller_remove_at_index (&ctx->poller, index); xclose (ctx->listen_fds[i]); + ctx->listen_events[i].closed = true; + poller_fd_reset (&ctx->listen_events[i]); } ctx->n_listen_fds = 0; @@ -770,15 +785,15 @@ client_kill (struct client *c, const char *reason) client_unregister (c, reason ? reason : "Client exited"); struct server_context *ctx = c->ctx; - ssize_t i = poller_find_by_fd (&ctx->poller, c->socket_fd); - if (i != -1) - poller_remove_at_index (&ctx->poller, i); - client_cancel_timers (c); if (c->ssl) (void) SSL_shutdown (c->ssl); xclose (c->socket_fd); + c->socket_event.closed = true; + poller_fd_reset (&c->socket_event); + client_cancel_timers (c); + print_debug ("closed connection to %s (%s)", c->address, reason ? reason : "Reason omitted"); @@ -858,17 +873,17 @@ client_get_ssl_cert_fingerprint (struct client *c) static void client_cancel_timers (struct client *c) { - ssize_t i; - struct poller_timers *timers = &c->ctx->poller.timers; - while ((i = poller_timers_find_by_data (timers, c)) != -1) - poller_timers_remove_at_index (timers, i); + poller_timer_reset (&c->kill_timer); + poller_timer_reset (&c->timeout_timer); + poller_timer_reset (&c->ping_timer); } static void -client_set_timer (struct client *c, poller_timer_fn fn, unsigned interval) +client_set_timer (struct client *c, + struct poller_timer *timer, unsigned interval) { client_cancel_timers (c); - poller_timers_add (&c->ctx->poller.timers, fn, c, interval * 1000); + poller_timer_set (timer, interval * 1000); } static void @@ -882,7 +897,7 @@ on_client_kill_timer (void *user_data) static void client_set_kill_timer (struct client *c) { - client_set_timer (c, on_client_kill_timer, c->ctx->ping_interval); + client_set_timer (c, &c->kill_timer, c->ctx->ping_interval); } static void @@ -901,13 +916,13 @@ on_client_ping_timer (void *user_data) struct client *c = user_data; hard_assert (!c->closing_link); client_send (c, "PING :%s", c->ctx->server_name); - client_set_timer (c, on_client_timeout_timer, c->ctx->ping_interval); + client_set_timer (c, &c->timeout_timer, c->ctx->ping_interval); } static void client_set_ping_timer (struct client *c) { - client_set_timer (c, on_client_ping_timer, c->ctx->ping_interval); + client_set_timer (c, &c->ping_timer, c->ctx->ping_interval); } // --- IRC command handling ---------------------------------------------------- @@ -2627,8 +2642,7 @@ client_update_poller (struct client *c, const struct pollfd *pfd) hard_assert (new_events != 0); if (!pfd || pfd->events != new_events) - poller_set (&c->ctx->poller, c->socket_fd, new_events, - (poller_dispatcher_fn) on_client_ready, c); + poller_fd_set (&c->socket_event, new_events); } static void @@ -2686,6 +2700,22 @@ on_irc_client_available (const struct pollfd *pfd, void *user_data) LIST_PREPEND (ctx->clients, c); ctx->n_clients++; + poller_fd_init (&c->socket_event, &c->ctx->poller, c->socket_fd); + c->socket_event.dispatcher = (poller_fd_fn) on_client_ready; + c->socket_event.user_data = c; + + poller_timer_init (&c->kill_timer, &c->ctx->poller); + c->kill_timer.dispatcher = on_client_kill_timer; + c->kill_timer.user_data = c; + + poller_timer_init (&c->timeout_timer, &c->ctx->poller); + c->timeout_timer.dispatcher = on_client_timeout_timer; + c->timeout_timer.user_data = c; + + poller_timer_init (&c->ping_timer, &c->ctx->poller); + c->ping_timer.dispatcher = on_client_ping_timer; + c->ping_timer.user_data = c; + set_blocking (fd, false); client_update_poller (c, NULL); client_set_kill_timer (c); @@ -2985,11 +3015,15 @@ irc_listen_resolve (struct server_context *ctx, { if ((fd = irc_listen (gai_iter)) == -1) continue; + set_blocking (fd, false); + + struct poller_fd *event = &ctx->listen_events[ctx->n_listen_fds]; + poller_fd_init (event, &ctx->poller, fd); + event->dispatcher = (poller_fd_fn) on_irc_client_available; + event->user_data = ctx; ctx->listen_fds[ctx->n_listen_fds++] = fd; - set_blocking (fd, false); - poller_set (&ctx->poller, fd, POLLIN, - (poller_dispatcher_fn) on_irc_client_available, ctx); + poller_fd_set (event, POLLIN); break; } freeaddrinfo (gai_result); @@ -3012,6 +3046,7 @@ irc_setup_listen_fds (struct server_context *ctx, struct error **e) str_vector_init (&ports); split_str_ignore_empty (bind_port, ',', &ports); ctx->listen_fds = xcalloc (ports.len, sizeof *ctx->listen_fds); + ctx->listen_events = xcalloc (ports.len, sizeof *ctx->listen_events); for (size_t i = 0; i < ports.len; i++) irc_listen_resolve (ctx, bind_host, ports.vector[i], &gai_hints); str_vector_free (&ports); @@ -3134,8 +3169,10 @@ main (int argc, char *argv[]) exit (EXIT_FAILURE); } - poller_set (&ctx.poller, g_signal_pipe[0], POLLIN, - (poller_dispatcher_fn) on_signal_pipe_readable, &ctx); + poller_fd_init (&ctx.signal_event, &ctx.poller, g_signal_pipe[0]); + ctx.signal_event.dispatcher = (poller_fd_fn) on_signal_pipe_readable; + ctx.signal_event.user_data = &ctx; + poller_fd_set (&ctx.signal_event, POLLIN); if (!irc_initialize_ssl (&ctx, &e) || !irc_initialize_server_name (&ctx, &e) diff --git a/zyklonb.c b/zyklonb.c index f9624b9..e3289d9 100644 --- a/zyklonb.c +++ b/zyklonb.c @@ -73,6 +73,9 @@ struct plugin_data int read_fd; ///< The read end of the comm. pipe int write_fd; ///< The write end of the comm. pipe + struct poller_fd read_event; ///< Read FD event + struct poller_fd write_event; ///< Write FD event + struct str read_buffer; ///< Unprocessed input struct str write_buffer; ///< Output yet to be sent out }; @@ -118,8 +121,14 @@ struct bot_context int irc_fd; ///< Socket FD of the server struct str read_buffer; ///< Input yet to be processed + struct poller_fd irc_event; ///< IRC FD event bool irc_ready; ///< Whether we may send messages now + struct poller_fd signal_event; ///< Signal FD event + struct poller_timer ping_tmr; ///< We should send a ping + struct poller_timer timeout_tmr; ///< Connection seems to be dead + struct poller_timer reconnect_tmr; ///< We should reconnect now + SSL_CTX *ssl_ctx; ///< SSL context SSL *ssl; ///< SSL connection @@ -131,6 +140,10 @@ struct bot_context bool polling; ///< The event loop is running }; +static void on_irc_ping_timeout (void *user_data); +static void on_irc_timeout (void *user_data); +static void on_irc_reconnect_timeout (void *user_data); + static void bot_context_init (struct bot_context *self) { @@ -152,6 +165,18 @@ bot_context_init (struct bot_context *self) poller_init (&self->poller); self->quitting = false; self->polling = false; + + poller_timer_init (&self->timeout_tmr, &self->poller); + self->timeout_tmr.dispatcher = on_irc_timeout; + self->timeout_tmr.user_data = self; + + poller_timer_init (&self->ping_tmr, &self->poller); + self->ping_tmr.dispatcher = on_irc_ping_timeout; + self->ping_tmr.user_data = self; + + poller_timer_init (&self->reconnect_tmr, &self->poller); + self->reconnect_tmr.dispatcher = on_irc_reconnect_timeout; + self->reconnect_tmr.user_data = self; } static void @@ -172,7 +197,10 @@ bot_context_free (struct bot_context *self) } if (self->irc_fd != -1) + { xclose (self->irc_fd); + poller_fd_reset (&self->irc_event); + } if (self->ssl) SSL_free (self->ssl); if (self->ssl_ctx) @@ -1060,10 +1088,7 @@ plugin_zombify (struct plugin_data *plugin) // empty before closing it... and then on EOF check if `pid == -1' and // only then dispose of it (it'd be best to simulate that both of these // cases may happen). - ssize_t poller_idx = - poller_find_by_fd (&plugin->ctx->poller, plugin->write_fd); - if (poller_idx != -1) - poller_remove_at_index (&plugin->ctx->poller, poller_idx); + poller_fd_reset (&plugin->write_event); // TODO: try to flush the write buffer (non-blocking)? @@ -1083,7 +1108,6 @@ plugin_zombify (struct plugin_data *plugin) static void on_plugin_writable (const struct pollfd *fd, struct plugin_data *plugin) { - struct bot_context *ctx = plugin->ctx; struct str *buf = &plugin->write_buffer; size_t written_total = 0; @@ -1124,12 +1148,8 @@ on_plugin_writable (const struct pollfd *fd, struct plugin_data *plugin) str_remove_slice (buf, 0, written_total); if (buf->len == 0) - { // Everything has been written, there's no need to end up in here again - ssize_t index = poller_find_by_fd (&ctx->poller, fd->fd); - if (index != -1) - poller_remove_at_index (&ctx->poller, index); - } + poller_fd_reset (&plugin->write_event); } static void @@ -1148,9 +1168,7 @@ plugin_queue_write (struct plugin_data *plugin) plugin_zombify (plugin); return; } - - poller_set (&plugin->ctx->poller, plugin->write_fd, POLLOUT, - (poller_dispatcher_fn) on_plugin_writable, plugin); + poller_fd_set (&plugin->write_event, POLLOUT); } static void @@ -1412,11 +1430,18 @@ plugin_load (struct bot_context *ctx, const char *name, struct error **e) plugin->read_fd = stdout_pipe[0]; plugin->write_fd = stdin_pipe[1]; + poller_fd_init (&plugin->read_event, &ctx->poller, plugin->read_fd); + plugin->read_event.dispatcher = (poller_fd_fn) on_plugin_readable; + plugin->read_event.user_data = plugin; + + poller_fd_init (&plugin->write_event, &ctx->poller, plugin->write_fd); + plugin->write_event.dispatcher = (poller_fd_fn) on_plugin_writable; + plugin->write_event.user_data = plugin; + LIST_PREPEND (ctx->plugins, plugin); str_map_set (&ctx->plugins_by_name, name, plugin); - poller_set (&ctx->poller, stdout_pipe[0], POLLIN, - (poller_dispatcher_fn) on_plugin_readable, plugin); + poller_fd_set (&plugin->read_event, POLLIN); return true; fail_3: @@ -1818,14 +1843,13 @@ static void irc_queue_reconnect (struct bot_context *); static void irc_cancel_timers (struct bot_context *ctx) { - ssize_t i; - struct poller_timers *timers = &ctx->poller.timers; - while ((i = poller_timers_find_by_data (timers, ctx)) != -1) - poller_timers_remove_at_index (timers, i); + poller_timer_reset (&ctx->timeout_tmr); + poller_timer_reset (&ctx->ping_tmr); + poller_timer_reset (&ctx->reconnect_tmr); } static void -irc_on_reconnect_timeout (void *user_data) +on_irc_reconnect_timeout (void *user_data) { struct bot_context *ctx = user_data; @@ -1847,8 +1871,7 @@ irc_queue_reconnect (struct bot_context *ctx) hard_assert (ctx->irc_fd == -1); print_status ("trying to reconnect in %ld seconds...", ctx->reconnect_delay); - poller_timers_add (&ctx->poller.timers, - irc_on_reconnect_timeout, ctx, ctx->reconnect_delay * 1000); + poller_timer_set (&ctx->reconnect_tmr, ctx->reconnect_delay * 1000); } static void @@ -1863,14 +1886,13 @@ on_irc_disconnected (struct bot_context *ctx) ctx->ssl_ctx = NULL; } - ssize_t i = poller_find_by_fd (&ctx->poller, ctx->irc_fd); - if (i != -1) - poller_remove_at_index (&ctx->poller, i); - xclose (ctx->irc_fd); ctx->irc_fd = -1; ctx->irc_ready = false; + ctx->irc_event.closed = true; + poller_fd_reset (&ctx->irc_event); + // TODO: inform plugins about the disconnect event // All of our timers have lost their meaning now @@ -1905,10 +1927,8 @@ static void irc_reset_connection_timeouts (struct bot_context *ctx) { irc_cancel_timers (ctx); - poller_timers_add (&ctx->poller.timers, - on_irc_timeout, ctx, 3 * 60 * 1000); - poller_timers_add (&ctx->poller.timers, - on_irc_ping_timeout, ctx, (3 * 60 + 30) * 1000); + poller_timer_set (&ctx->timeout_tmr, 3 * 60 * 1000); + poller_timer_set (&ctx->ping_tmr, (3 * 60 + 30) * 1000); } static void @@ -2026,11 +2046,14 @@ irc_connect (struct bot_context *ctx, struct error **e) } print_status ("connection established"); + poller_fd_init (&ctx->irc_event, &ctx->poller, ctx->irc_fd); + ctx->irc_event.dispatcher = (poller_fd_fn) on_irc_readable; + ctx->irc_event.user_data = ctx; + // TODO: in exec try: 1/ set blocking, 2/ setsockopt() SO_LINGER, // (struct linger) { .l_onoff = true; .l_linger = 1 /* 1s should do */; } // 3/ /* O_CLOEXEC */ But only if the QUIT message proves unreliable. - poller_set (&ctx->poller, ctx->irc_fd, POLLIN, - (poller_dispatcher_fn) on_irc_readable, ctx); + poller_fd_set (&ctx->irc_event, POLLIN); irc_reset_connection_timeouts (ctx); irc_send (ctx, "NICK %s", nickname); @@ -2134,13 +2157,12 @@ on_signal_pipe_readable (const struct pollfd *fd, struct bot_context *ctx) plugin->pid = -1; - ssize_t poller_idx = poller_find_by_fd (&ctx->poller, plugin->read_fd); - if (poller_idx != -1) - poller_remove_at_index (&ctx->poller, poller_idx); - xclose (plugin->read_fd); plugin->read_fd = -1; + plugin->read_event.closed = true; + poller_fd_reset (&plugin->read_event); + LIST_UNLINK (ctx->plugins, plugin); plugin_data_free (plugin); free (plugin); @@ -2215,8 +2237,11 @@ main (int argc, char *argv[]) } setup_recovery_handler (&ctx); - poller_set (&ctx.poller, g_signal_pipe[0], POLLIN, - (poller_dispatcher_fn) on_signal_pipe_readable, &ctx); + + poller_fd_init (&ctx.signal_event, &ctx.poller, g_signal_pipe[0]); + ctx.signal_event.dispatcher = (poller_fd_fn) on_signal_pipe_readable; + ctx.signal_event.user_data = &ctx; + poller_fd_set (&ctx.signal_event, POLLIN); plugin_load_all_from_config (&ctx); if (!parse_config (&ctx, &e) -- cgit v1.2.3-70-g09d2