diff options
author | Přemysl Janouch <p.janouch@gmail.com> | 2016-01-01 18:05:01 +0100 |
---|---|---|
committer | Přemysl Janouch <p.janouch@gmail.com> | 2016-01-02 04:36:17 +0100 |
commit | ee40af003100c27f9a61b714b9e79e0c5a082252 (patch) | |
tree | a8716fabc50bdc6d3edd2027fac7a6a0a19b2507 | |
parent | 80815519b35446989a7b369012bdfd1b979adb97 (diff) | |
download | liberty-ee40af003100c27f9a61b714b9e79e0c5a082252.tar.gz liberty-ee40af003100c27f9a61b714b9e79e0c5a082252.tar.xz liberty-ee40af003100c27f9a61b714b9e79e0c5a082252.zip |
Add a framework for asynchronous jobs
-rw-r--r-- | liberty.c | 191 |
1 files changed, 191 insertions, 0 deletions
@@ -49,6 +49,7 @@ #include <fnmatch.h> #include <iconv.h> #include <pwd.h> +#include <pthread.h> #include <sys/socket.h> #include <sys/types.h> @@ -1046,6 +1047,196 @@ str_map_unset_iter_free (struct str_map_unset_iter *self) str_map_shrink (map); } +// --- Asynchronous jobs ------------------------------------------------------- + +// For operations that can block execution but can be run independently on the +// rest of the program, such as getaddrinfo(), read(), write(), fsync(). +// +// The async structure is meant to be extended for the various usages with +// new fields and provide an appropriate callback for its destruction. +// +// This is designed so that it can be used in other event loops than poller. + +#ifdef LIBERTY_WANT_ASYNC + +struct async; +typedef void (*async_fn) (struct async *); + +struct async +{ + LIST_HEADER (struct async) + struct async_manager *manager; ///< Our manager object + + // "cancelled" may not be accesed or modified by the worker thread + + pthread_t worker; ///< Worker thread ID + bool cancelled; ///< Task has been cancelled + + async_fn execute; ///< Worker main function + async_fn dispatcher; ///< Main thread result dispatcher + async_fn destroy; ///< Destroys the whole object +}; + +static void +async_init (struct async *self, struct async_manager *manager) +{ + memset (self, 0, sizeof *self); + self->manager = manager; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +struct async_manager +{ + pthread_mutex_t lock; ///< Lock for the queues + struct async *running; ///< Queue of running jobs + struct async *finished; ///< Queue of completed/cancelled jobs + + // We need the pipe in order to abort polling (instead of using EINTR) + + pthread_cond_t finished_cond; ///< Signals that a task has finished + int finished_pipe[2]; ///< Signals that a task has finished +}; + +static void +async_manager_init (struct async_manager *self) +{ + memset (self, 0, sizeof *self); + hard_assert (!pthread_mutex_init (&self->lock, NULL)); + hard_assert (!pthread_cond_init (&self->finished_cond, NULL)); + + hard_assert (!pipe (self->finished_pipe)); + hard_assert (set_blocking (self->finished_pipe[0], false)); + set_cloexec (self->finished_pipe[0]); + set_cloexec (self->finished_pipe[1]); +} + +static struct async * +async_manager_dispatch_fetch (struct async_manager *self) +{ + // We don't want to hold the mutex for too long, mainly to prevent + // a deadlock when trying to add an async job while dispatching another + hard_assert (!pthread_mutex_lock (&self->lock)); + struct async *result; + if ((result = self->finished)) + LIST_UNLINK (self->finished, result); + hard_assert (!pthread_mutex_unlock (&self->lock)); + return result; +} + +static void +async_manager_dispatch (struct async_manager *self) +{ + char dummy; + while (read (self->finished_pipe[0], &dummy, 1) > 0) + ; // Just emptying the signalling pipe + + struct async *iter; + while ((iter = async_manager_dispatch_fetch (self))) + { + // The thread has finished execution already + soft_assert (!pthread_join (iter->worker, NULL)); + + if (iter->dispatcher && !iter->cancelled) + iter->dispatcher (iter); + if (iter->destroy) + iter->destroy (iter); + } +} + +static void +async_manager_cancel_all (struct async_manager *self) +{ + hard_assert (!pthread_mutex_lock (&self->lock)); + + // Cancel all running jobs + LIST_FOR_EACH (struct async, iter, self->running) + soft_assert (!pthread_cancel (iter->worker)); + + // Wait until no jobs are running anymore (we need to release the lock + // here so that worker threads can move their jobs to the finished queue) + while (self->running) + hard_assert (!pthread_cond_wait (&self->finished_cond, &self->lock)); + + // Mark everything cancelled so that it's not actually dispatched + LIST_FOR_EACH (struct async, iter, self->finished) + iter->cancelled = true; + + hard_assert (!pthread_mutex_unlock (&self->lock)); + async_manager_dispatch (self); +} + +static void +async_manager_free (struct async_manager *self) +{ + async_manager_cancel_all (self); + hard_assert (!pthread_cond_destroy (&self->finished_cond)); + hard_assert (!pthread_mutex_destroy (&self->lock)); + + xclose (self->finished_pipe[0]); + xclose (self->finished_pipe[1]); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +/// Only allowed from the main thread once the job has been started but before +/// the results have been dispatched +static void +async_cancel (struct async *self) +{ + soft_assert (!pthread_cancel (self->worker)); + self->cancelled = true; +} + +static void +async_cleanup (void *user_data) +{ + struct async *self = user_data; + + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_UNLINK (self->manager->running, self); + LIST_PREPEND (self->manager->finished, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond)); + hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0); +} + +static void * +async_routine (void *user_data) +{ + // Beware that we mustn't trigger any cancellation point before we set up + // the cleanup handler, otherwise we'd need to disable it first + struct async *self = user_data; + pthread_cleanup_push (async_cleanup, self); + + self->execute (self); + + pthread_cleanup_pop (true); + return NULL; +} + +static void +async_run (struct async *self) +{ + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_PREPEND (self->manager->running, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + // Block all signals so that the new thread doesn't receive any (inherited) + sigset_t all_blocked, old_blocked; + hard_assert (!sigfillset (&all_blocked)); + hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked)); + + hard_assert (!pthread_create (&self->worker, NULL, + async_routine, self)); + + // Now that we've created the thread, resume signal processing as usual + hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL)); +} + +#endif // LIBERTY_WANT_ASYNC + // --- Event loop -------------------------------------------------------------- #ifdef LIBERTY_WANT_POLLER |