diff options
-rw-r--r-- | liberty.c | 178 |
1 files changed, 88 insertions, 90 deletions
@@ -1081,17 +1081,6 @@ struct async async_fn destroy; ///< Destroys the whole object }; -static void -async_init (struct async *self, struct async_manager *manager) -{ - memset (self, 0, sizeof *self); - self->manager = manager; -} - -static bool async_run (struct async *self); - -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - struct async_manager { pthread_mutex_t lock; ///< Lock for the queues @@ -1110,20 +1099,88 @@ struct async_manager int finished_pipe[2]; ///< Signals that a task has finished }; +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + static void -async_manager_init (struct async_manager *self) +async_init (struct async *self, struct async_manager *manager) { memset (self, 0, sizeof *self); - hard_assert (!pthread_mutex_init (&self->lock, NULL)); - hard_assert (!pthread_cond_init (&self->finished_cond, NULL)); + self->manager = manager; +} - hard_assert (!pipe (self->finished_pipe)); - hard_assert (set_blocking (self->finished_pipe[0], false)); - set_cloexec (self->finished_pipe[0]); - set_cloexec (self->finished_pipe[1]); +/// Only allowed from the main thread once the job has been started but before +/// the results have been dispatched +static void +async_cancel (struct async *self) +{ + if (self->started) + soft_assert (!pthread_cancel (self->worker)); + self->cancelled = true; +} + +static void +async_cleanup (void *user_data) +{ + struct async *self = user_data; + + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_UNLINK (self->manager->running, self); + LIST_PREPEND (self->manager->finished, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond)); + hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0); +} + +static void * +async_routine (void *user_data) +{ + // Beware that we mustn't trigger any cancellation point before we set up + // the cleanup handler, otherwise we'd need to disable it first + struct async *self = user_data; + pthread_cleanup_push (async_cleanup, self); + + self->execute (self); + + pthread_cleanup_pop (true); + return NULL; } static bool +async_run (struct async *self) +{ + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_PREPEND (self->manager->running, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + // Block all signals so that the new thread doesn't receive any (inherited) + sigset_t all_blocked, old_blocked; + hard_assert (!sigfillset (&all_blocked)); + hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked)); + + int error = pthread_create (&self->worker, NULL, async_routine, self); + + // Now that we've created the thread, resume signal processing as usual + hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL)); + + if (error) + { + hard_assert (error == EAGAIN); + + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_UNLINK (self->manager->running, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + // FIXME: we probably want to have some kind of a limit on the queue + LIST_PREPEND (self->manager->delayed, self); + return false; + } + return (self->started = true); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static bool async_manager_retry (struct async_manager *self, struct async *async) { if (async->cancelled) @@ -1200,6 +1257,19 @@ async_manager_cancel_all (struct async_manager *self) } static void +async_manager_init (struct async_manager *self) +{ + memset (self, 0, sizeof *self); + hard_assert (!pthread_mutex_init (&self->lock, NULL)); + hard_assert (!pthread_cond_init (&self->finished_cond, NULL)); + + hard_assert (!pipe (self->finished_pipe)); + hard_assert (set_blocking (self->finished_pipe[0], false)); + set_cloexec (self->finished_pipe[0]); + set_cloexec (self->finished_pipe[1]); +} + +static void async_manager_free (struct async_manager *self) { async_manager_cancel_all (self); @@ -1210,78 +1280,6 @@ async_manager_free (struct async_manager *self) xclose (self->finished_pipe[1]); } -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -/// Only allowed from the main thread once the job has been started but before -/// the results have been dispatched -static void -async_cancel (struct async *self) -{ - if (self->started) - soft_assert (!pthread_cancel (self->worker)); - self->cancelled = true; -} - -static void -async_cleanup (void *user_data) -{ - struct async *self = user_data; - - hard_assert (!pthread_mutex_lock (&self->manager->lock)); - LIST_UNLINK (self->manager->running, self); - LIST_PREPEND (self->manager->finished, self); - hard_assert (!pthread_mutex_unlock (&self->manager->lock)); - - hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond)); - hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0); -} - -static void * -async_routine (void *user_data) -{ - // Beware that we mustn't trigger any cancellation point before we set up - // the cleanup handler, otherwise we'd need to disable it first - struct async *self = user_data; - pthread_cleanup_push (async_cleanup, self); - - self->execute (self); - - pthread_cleanup_pop (true); - return NULL; -} - -static bool -async_run (struct async *self) -{ - hard_assert (!pthread_mutex_lock (&self->manager->lock)); - LIST_PREPEND (self->manager->running, self); - hard_assert (!pthread_mutex_unlock (&self->manager->lock)); - - // Block all signals so that the new thread doesn't receive any (inherited) - sigset_t all_blocked, old_blocked; - hard_assert (!sigfillset (&all_blocked)); - hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked)); - - int error = pthread_create (&self->worker, NULL, async_routine, self); - - // Now that we've created the thread, resume signal processing as usual - hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL)); - - if (error) - { - hard_assert (error == EAGAIN); - - hard_assert (!pthread_mutex_lock (&self->manager->lock)); - LIST_UNLINK (self->manager->running, self); - hard_assert (!pthread_mutex_unlock (&self->manager->lock)); - - // FIXME: we probably want to have some kind of a limit on the queue - LIST_PREPEND (self->manager->delayed, self); - return false; - } - return (self->started = true); -} - #endif // LIBERTY_WANT_ASYNC // --- Event loop -------------------------------------------------------------- |