diff options
author | Přemysl Janouch <p.janouch@gmail.com> | 2016-01-01 18:08:07 +0100 |
---|---|---|
committer | Přemysl Janouch <p.janouch@gmail.com> | 2016-01-02 04:36:17 +0100 |
commit | 455f2cec821e6ce4ed030be8bd6edde00ba894b9 (patch) | |
tree | a5f3f3ac4d74c1273ca4b8d240b495cb23cb6d6d | |
parent | ee40af003100c27f9a61b714b9e79e0c5a082252 (diff) | |
download | liberty-455f2cec821e6ce4ed030be8bd6edde00ba894b9.tar.gz liberty-455f2cec821e6ce4ed030be8bd6edde00ba894b9.tar.xz liberty-455f2cec821e6ce4ed030be8bd6edde00ba894b9.zip |
Add an async job manager to the poller
-rw-r--r-- | liberty.c | 120 |
1 files changed, 86 insertions, 34 deletions
@@ -1245,7 +1245,6 @@ async_run (struct async *self) // to instead use those tested and proven libraries but we don't need much // and it's interesting to implement. -// Actually it mustn't be totally shitty as scanning exercises it quite a bit. // We sacrifice some memory to allow for O(1) and O(log n) operations. typedef void (*poller_fd_fn) (const struct pollfd *, void *); @@ -1448,6 +1447,23 @@ poller_idle_dispatch (struct poller_idle *list) // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +struct poller_common +{ + struct poller_timers timers; ///< Timeouts + struct poller_idle *idle; ///< Idle events +#ifdef LIBERTY_WANT_ASYNC + struct async_manager async; ///< Asynchronous jobs + struct poller_fd async_event; ///< Asynchronous jobs have finished +#endif // LIBERTY_WANT_ASYNC +}; + +static void poller_common_init (struct poller_common *, struct poller *); +static void poller_common_free (struct poller_common *); +static int poller_common_get_timeout (struct poller_common *); +static void poller_common_dispatch (struct poller_common *); + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + #ifdef __linux__ #include <sys/epoll.h> @@ -1459,10 +1475,7 @@ struct poller struct epoll_event *revents; ///< Output array for epoll_wait() size_t len; ///< Number of polled descriptors size_t alloc; ///< Number of entries allocated - - struct poller_timers timers; ///< Timeouts - struct poller_idle *idle; ///< Idle events - + struct poller_common common; ///< Stuff common to all backends int revents_len; ///< Number of entries in `revents' }; @@ -1480,8 +1493,7 @@ poller_init (struct poller *self) self->revents = xcalloc (self->alloc, sizeof *self->revents); self->revents_len = 0; - poller_timers_init (&self->timers); - self->idle = NULL; + poller_common_init (&self->common, self); } static void @@ -1494,7 +1506,7 @@ poller_free (struct poller *self) EPOLL_CTL_DEL, fd->fd, (void *) "") != -1); } - poller_timers_free (&self->timers); + poller_common_free (&self->common); xclose (self->epoll_fd); free (self->fds); @@ -1618,7 +1630,7 @@ poller_run (struct poller *self) int n_fds; do n_fds = epoll_wait (self->epoll_fd, self->revents, self->alloc, - self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers)); + poller_common_get_timeout (&self->common)); while (n_fds == -1 && errno == EINTR); if (n_fds == -1) @@ -1628,8 +1640,7 @@ poller_run (struct poller *self) qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds); self->revents_len = n_fds; - poller_timers_dispatch (&self->timers); - poller_idle_dispatch (self->idle); + poller_common_dispatch (&self->common); for (int i = 0; i < n_fds; i++) { @@ -1667,10 +1678,7 @@ struct poller struct kevent *revents; ///< Output array for kevent() size_t len; ///< Number of polled descriptors size_t alloc; ///< Number of entries allocated - - struct poller_timers timers; ///< Timeouts - struct poller_idle *idle; ///< Idle events - + struct poller_common common; ///< Stuff common to all backends int revents_len; ///< Number of entries in `revents' }; @@ -1686,19 +1694,16 @@ poller_init (struct poller *self) self->fds = xcalloc (self->alloc, sizeof *self->fds); self->revents = xcalloc (self->alloc, sizeof *self->revents); self->revents_len = 0; - - poller_timers_init (&self->timers); - self->idle = NULL; + poller_common_init (&self->common, self); } static void poller_free (struct poller *self) { - poller_timers_free (&self->timers); - xclose (self->kqueue_fd); free (self->fds); free (self->revents); + poller_common_free (&self->common); } static void @@ -1841,7 +1846,7 @@ poller_run (struct poller *self) do { struct timespec ts = poller_timeout_to_timespec - (self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers)); + (poller_common_get_timeout (&self->common)); n_fds = kevent (self->kqueue_fd, NULL, 0, self->revents, self->len, &ts); } @@ -1854,8 +1859,7 @@ poller_run (struct poller *self) qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds); self->revents_len = n_fds; - poller_timers_dispatch (&self->timers); - poller_idle_dispatch (self->idle); + poller_common_dispatch (&self->common); for (int i = 0; i < n_fds; i++) { @@ -1894,9 +1898,7 @@ struct poller struct poller_fd **fds_data; ///< Additional information for each FD size_t len; ///< Number of polled descriptors size_t alloc; ///< Number of entries allocated - - struct poller_timers timers; ///< Timers - struct poller_idle *idle; ///< Idle events + struct poller_common common; ///< Stuff common to all backends int dispatch_next; ///< The next dispatched FD or -1 }; @@ -1907,7 +1909,7 @@ poller_init (struct poller *self) self->len = 0; self->fds = xcalloc (self->alloc, sizeof *self->fds); self->fds_data = xcalloc (self->alloc, sizeof *self->fds_data); - poller_timers_init (&self->timers); + poller_common_init (&self->common, self); self->dispatch_next = -1; } @@ -1916,7 +1918,7 @@ poller_free (struct poller *self) { free (self->fds); free (self->fds_data); - poller_timers_free (&self->timers); + poller_common_free (&self->common); } static void @@ -1986,14 +1988,13 @@ poller_run (struct poller *self) int result; do result = poll (self->fds, self->len, - self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers)); + poller_common_get_timeout (&self->common)); while (result == -1 && errno == EINTR); if (result == -1) exit_fatal ("%s: %s", "poll", strerror (errno)); - poller_timers_dispatch (&self->timers); - poller_idle_dispatch (self->idle); + poller_common_dispatch (&self->common); for (int i = 0; i < (int) self->len; ) { @@ -2016,7 +2017,7 @@ static void poller_timer_init (struct poller_timer *self, struct poller *poller) { memset (self, 0, sizeof *self); - self->timers = &poller->timers; + self->timers = &poller->common.timers; self->index = -1; } @@ -2055,7 +2056,7 @@ poller_idle_set (struct poller_idle *self) if (self->active) return; - LIST_PREPEND (self->poller->idle, self); + LIST_PREPEND (self->poller->common.idle, self); self->active = true; } @@ -2065,7 +2066,7 @@ poller_idle_reset (struct poller_idle *self) if (!self->active) return; - LIST_UNLINK (self->poller->idle, self); + LIST_UNLINK (self->poller->common.idle, self); self->prev = NULL; self->next = NULL; self->active = false; @@ -2096,6 +2097,57 @@ poller_fd_reset (struct poller_fd *self) poller_remove_at_index (self->poller, self->index); } +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static void +poller_common_dummy_dispatcher (const struct pollfd *pfd, void *user_data) +{ + (void) pfd; + (void) user_data; + + // The async manager will empty the pipe when we invoke dispatch +} + +static void +poller_common_init (struct poller_common *self, struct poller *poller) +{ + poller_timers_init (&self->timers); + self->idle = NULL; +#ifdef LIBERTY_WANT_ASYNC + async_manager_init (&self->async); + + poller_fd_init (&self->async_event, poller, self->async.finished_pipe[0]); + poller_fd_set (&self->async_event, POLLIN); + self->async_event.dispatcher = poller_common_dummy_dispatcher; + self->async_event.user_data = self; +#endif // LIBERTY_WANT_ASYNC +} + +static void +poller_common_free (struct poller_common *self) +{ + poller_timers_free (&self->timers); +#ifdef LIBERTY_WANT_ASYNC + async_manager_free (&self->async); +#endif // LIBERTY_WANT_ASYNC +} + +static int +poller_common_get_timeout (struct poller_common *self) +{ + return self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers); +} + +static void +poller_common_dispatch (struct poller_common *self) +{ + poller_timers_dispatch (&self->timers); + poller_idle_dispatch (self->idle); +#ifdef LIBERTY_WANT_ASYNC + async_manager_dispatch (&self->async); +#endif // LIBERTY_WANT_ASYNC +} + #endif // LIBERTY_WANT_POLLER // --- libuv-style write adaptor ----------------------------------------------- |