aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Janouch <p.janouch@gmail.com>2016-01-01 18:05:01 +0100
committerPřemysl Janouch <p.janouch@gmail.com>2016-01-02 04:36:17 +0100
commitee40af003100c27f9a61b714b9e79e0c5a082252 (patch)
treea8716fabc50bdc6d3edd2027fac7a6a0a19b2507
parent80815519b35446989a7b369012bdfd1b979adb97 (diff)
downloadliberty-ee40af003100c27f9a61b714b9e79e0c5a082252.tar.gz
liberty-ee40af003100c27f9a61b714b9e79e0c5a082252.tar.xz
liberty-ee40af003100c27f9a61b714b9e79e0c5a082252.zip
Add a framework for asynchronous jobs
-rw-r--r--liberty.c191
1 files changed, 191 insertions, 0 deletions
diff --git a/liberty.c b/liberty.c
index 60d6667..c4b5cbd 100644
--- a/liberty.c
+++ b/liberty.c
@@ -49,6 +49,7 @@
#include <fnmatch.h>
#include <iconv.h>
#include <pwd.h>
+#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
@@ -1046,6 +1047,196 @@ str_map_unset_iter_free (struct str_map_unset_iter *self)
str_map_shrink (map);
}
+// --- Asynchronous jobs -------------------------------------------------------
+
+// For operations that can block execution but can be run independently on the
+// rest of the program, such as getaddrinfo(), read(), write(), fsync().
+//
+// The async structure is meant to be extended for the various usages with
+// new fields and provide an appropriate callback for its destruction.
+//
+// This is designed so that it can be used in other event loops than poller.
+
+#ifdef LIBERTY_WANT_ASYNC
+
+struct async;
+typedef void (*async_fn) (struct async *);
+
+struct async
+{
+ LIST_HEADER (struct async)
+ struct async_manager *manager; ///< Our manager object
+
+ // "cancelled" may not be accesed or modified by the worker thread
+
+ pthread_t worker; ///< Worker thread ID
+ bool cancelled; ///< Task has been cancelled
+
+ async_fn execute; ///< Worker main function
+ async_fn dispatcher; ///< Main thread result dispatcher
+ async_fn destroy; ///< Destroys the whole object
+};
+
+static void
+async_init (struct async *self, struct async_manager *manager)
+{
+ memset (self, 0, sizeof *self);
+ self->manager = manager;
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+struct async_manager
+{
+ pthread_mutex_t lock; ///< Lock for the queues
+ struct async *running; ///< Queue of running jobs
+ struct async *finished; ///< Queue of completed/cancelled jobs
+
+ // We need the pipe in order to abort polling (instead of using EINTR)
+
+ pthread_cond_t finished_cond; ///< Signals that a task has finished
+ int finished_pipe[2]; ///< Signals that a task has finished
+};
+
+static void
+async_manager_init (struct async_manager *self)
+{
+ memset (self, 0, sizeof *self);
+ hard_assert (!pthread_mutex_init (&self->lock, NULL));
+ hard_assert (!pthread_cond_init (&self->finished_cond, NULL));
+
+ hard_assert (!pipe (self->finished_pipe));
+ hard_assert (set_blocking (self->finished_pipe[0], false));
+ set_cloexec (self->finished_pipe[0]);
+ set_cloexec (self->finished_pipe[1]);
+}
+
+static struct async *
+async_manager_dispatch_fetch (struct async_manager *self)
+{
+ // We don't want to hold the mutex for too long, mainly to prevent
+ // a deadlock when trying to add an async job while dispatching another
+ hard_assert (!pthread_mutex_lock (&self->lock));
+ struct async *result;
+ if ((result = self->finished))
+ LIST_UNLINK (self->finished, result);
+ hard_assert (!pthread_mutex_unlock (&self->lock));
+ return result;
+}
+
+static void
+async_manager_dispatch (struct async_manager *self)
+{
+ char dummy;
+ while (read (self->finished_pipe[0], &dummy, 1) > 0)
+ ; // Just emptying the signalling pipe
+
+ struct async *iter;
+ while ((iter = async_manager_dispatch_fetch (self)))
+ {
+ // The thread has finished execution already
+ soft_assert (!pthread_join (iter->worker, NULL));
+
+ if (iter->dispatcher && !iter->cancelled)
+ iter->dispatcher (iter);
+ if (iter->destroy)
+ iter->destroy (iter);
+ }
+}
+
+static void
+async_manager_cancel_all (struct async_manager *self)
+{
+ hard_assert (!pthread_mutex_lock (&self->lock));
+
+ // Cancel all running jobs
+ LIST_FOR_EACH (struct async, iter, self->running)
+ soft_assert (!pthread_cancel (iter->worker));
+
+ // Wait until no jobs are running anymore (we need to release the lock
+ // here so that worker threads can move their jobs to the finished queue)
+ while (self->running)
+ hard_assert (!pthread_cond_wait (&self->finished_cond, &self->lock));
+
+ // Mark everything cancelled so that it's not actually dispatched
+ LIST_FOR_EACH (struct async, iter, self->finished)
+ iter->cancelled = true;
+
+ hard_assert (!pthread_mutex_unlock (&self->lock));
+ async_manager_dispatch (self);
+}
+
+static void
+async_manager_free (struct async_manager *self)
+{
+ async_manager_cancel_all (self);
+ hard_assert (!pthread_cond_destroy (&self->finished_cond));
+ hard_assert (!pthread_mutex_destroy (&self->lock));
+
+ xclose (self->finished_pipe[0]);
+ xclose (self->finished_pipe[1]);
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+/// Only allowed from the main thread once the job has been started but before
+/// the results have been dispatched
+static void
+async_cancel (struct async *self)
+{
+ soft_assert (!pthread_cancel (self->worker));
+ self->cancelled = true;
+}
+
+static void
+async_cleanup (void *user_data)
+{
+ struct async *self = user_data;
+
+ hard_assert (!pthread_mutex_lock (&self->manager->lock));
+ LIST_UNLINK (self->manager->running, self);
+ LIST_PREPEND (self->manager->finished, self);
+ hard_assert (!pthread_mutex_unlock (&self->manager->lock));
+
+ hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond));
+ hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0);
+}
+
+static void *
+async_routine (void *user_data)
+{
+ // Beware that we mustn't trigger any cancellation point before we set up
+ // the cleanup handler, otherwise we'd need to disable it first
+ struct async *self = user_data;
+ pthread_cleanup_push (async_cleanup, self);
+
+ self->execute (self);
+
+ pthread_cleanup_pop (true);
+ return NULL;
+}
+
+static void
+async_run (struct async *self)
+{
+ hard_assert (!pthread_mutex_lock (&self->manager->lock));
+ LIST_PREPEND (self->manager->running, self);
+ hard_assert (!pthread_mutex_unlock (&self->manager->lock));
+
+ // Block all signals so that the new thread doesn't receive any (inherited)
+ sigset_t all_blocked, old_blocked;
+ hard_assert (!sigfillset (&all_blocked));
+ hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
+
+ hard_assert (!pthread_create (&self->worker, NULL,
+ async_routine, self));
+
+ // Now that we've created the thread, resume signal processing as usual
+ hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
+}
+
+#endif // LIBERTY_WANT_ASYNC
+
// --- Event loop --------------------------------------------------------------
#ifdef LIBERTY_WANT_POLLER