aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Janouch <p.janouch@gmail.com>2016-01-16 06:05:17 +0100
committerPřemysl Janouch <p.janouch@gmail.com>2016-01-16 06:12:19 +0100
commitff046ea596e7d53686ebb906820eef7fbe863f6a (patch)
treef3c98e81a084cb987fa975ba63635070fb9ace73
parent38d105dede5f423217462c94e71768f2811a39ce (diff)
downloadliberty-ff046ea596e7d53686ebb906820eef7fbe863f6a.tar.gz
liberty-ff046ea596e7d53686ebb906820eef7fbe863f6a.tar.xz
liberty-ff046ea596e7d53686ebb906820eef7fbe863f6a.zip
Shuffle code
-rw-r--r--liberty.c178
1 files changed, 88 insertions, 90 deletions
diff --git a/liberty.c b/liberty.c
index fd9ccc7..de57812 100644
--- a/liberty.c
+++ b/liberty.c
@@ -1081,17 +1081,6 @@ struct async
async_fn destroy; ///< Destroys the whole object
};
-static void
-async_init (struct async *self, struct async_manager *manager)
-{
- memset (self, 0, sizeof *self);
- self->manager = manager;
-}
-
-static bool async_run (struct async *self);
-
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-
struct async_manager
{
pthread_mutex_t lock; ///< Lock for the queues
@@ -1110,20 +1099,88 @@ struct async_manager
int finished_pipe[2]; ///< Signals that a task has finished
};
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
static void
-async_manager_init (struct async_manager *self)
+async_init (struct async *self, struct async_manager *manager)
{
memset (self, 0, sizeof *self);
- hard_assert (!pthread_mutex_init (&self->lock, NULL));
- hard_assert (!pthread_cond_init (&self->finished_cond, NULL));
+ self->manager = manager;
+}
- hard_assert (!pipe (self->finished_pipe));
- hard_assert (set_blocking (self->finished_pipe[0], false));
- set_cloexec (self->finished_pipe[0]);
- set_cloexec (self->finished_pipe[1]);
+/// Only allowed from the main thread once the job has been started but before
+/// the results have been dispatched
+static void
+async_cancel (struct async *self)
+{
+ if (self->started)
+ soft_assert (!pthread_cancel (self->worker));
+ self->cancelled = true;
+}
+
+static void
+async_cleanup (void *user_data)
+{
+ struct async *self = user_data;
+
+ hard_assert (!pthread_mutex_lock (&self->manager->lock));
+ LIST_UNLINK (self->manager->running, self);
+ LIST_PREPEND (self->manager->finished, self);
+ hard_assert (!pthread_mutex_unlock (&self->manager->lock));
+
+ hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond));
+ hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0);
+}
+
+static void *
+async_routine (void *user_data)
+{
+ // Beware that we mustn't trigger any cancellation point before we set up
+ // the cleanup handler, otherwise we'd need to disable it first
+ struct async *self = user_data;
+ pthread_cleanup_push (async_cleanup, self);
+
+ self->execute (self);
+
+ pthread_cleanup_pop (true);
+ return NULL;
}
static bool
+async_run (struct async *self)
+{
+ hard_assert (!pthread_mutex_lock (&self->manager->lock));
+ LIST_PREPEND (self->manager->running, self);
+ hard_assert (!pthread_mutex_unlock (&self->manager->lock));
+
+ // Block all signals so that the new thread doesn't receive any (inherited)
+ sigset_t all_blocked, old_blocked;
+ hard_assert (!sigfillset (&all_blocked));
+ hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
+
+ int error = pthread_create (&self->worker, NULL, async_routine, self);
+
+ // Now that we've created the thread, resume signal processing as usual
+ hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
+
+ if (error)
+ {
+ hard_assert (error == EAGAIN);
+
+ hard_assert (!pthread_mutex_lock (&self->manager->lock));
+ LIST_UNLINK (self->manager->running, self);
+ hard_assert (!pthread_mutex_unlock (&self->manager->lock));
+
+ // FIXME: we probably want to have some kind of a limit on the queue
+ LIST_PREPEND (self->manager->delayed, self);
+ return false;
+ }
+ return (self->started = true);
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+static bool
async_manager_retry (struct async_manager *self, struct async *async)
{
if (async->cancelled)
@@ -1200,6 +1257,19 @@ async_manager_cancel_all (struct async_manager *self)
}
static void
+async_manager_init (struct async_manager *self)
+{
+ memset (self, 0, sizeof *self);
+ hard_assert (!pthread_mutex_init (&self->lock, NULL));
+ hard_assert (!pthread_cond_init (&self->finished_cond, NULL));
+
+ hard_assert (!pipe (self->finished_pipe));
+ hard_assert (set_blocking (self->finished_pipe[0], false));
+ set_cloexec (self->finished_pipe[0]);
+ set_cloexec (self->finished_pipe[1]);
+}
+
+static void
async_manager_free (struct async_manager *self)
{
async_manager_cancel_all (self);
@@ -1210,78 +1280,6 @@ async_manager_free (struct async_manager *self)
xclose (self->finished_pipe[1]);
}
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-
-/// Only allowed from the main thread once the job has been started but before
-/// the results have been dispatched
-static void
-async_cancel (struct async *self)
-{
- if (self->started)
- soft_assert (!pthread_cancel (self->worker));
- self->cancelled = true;
-}
-
-static void
-async_cleanup (void *user_data)
-{
- struct async *self = user_data;
-
- hard_assert (!pthread_mutex_lock (&self->manager->lock));
- LIST_UNLINK (self->manager->running, self);
- LIST_PREPEND (self->manager->finished, self);
- hard_assert (!pthread_mutex_unlock (&self->manager->lock));
-
- hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond));
- hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0);
-}
-
-static void *
-async_routine (void *user_data)
-{
- // Beware that we mustn't trigger any cancellation point before we set up
- // the cleanup handler, otherwise we'd need to disable it first
- struct async *self = user_data;
- pthread_cleanup_push (async_cleanup, self);
-
- self->execute (self);
-
- pthread_cleanup_pop (true);
- return NULL;
-}
-
-static bool
-async_run (struct async *self)
-{
- hard_assert (!pthread_mutex_lock (&self->manager->lock));
- LIST_PREPEND (self->manager->running, self);
- hard_assert (!pthread_mutex_unlock (&self->manager->lock));
-
- // Block all signals so that the new thread doesn't receive any (inherited)
- sigset_t all_blocked, old_blocked;
- hard_assert (!sigfillset (&all_blocked));
- hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
-
- int error = pthread_create (&self->worker, NULL, async_routine, self);
-
- // Now that we've created the thread, resume signal processing as usual
- hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
-
- if (error)
- {
- hard_assert (error == EAGAIN);
-
- hard_assert (!pthread_mutex_lock (&self->manager->lock));
- LIST_UNLINK (self->manager->running, self);
- hard_assert (!pthread_mutex_unlock (&self->manager->lock));
-
- // FIXME: we probably want to have some kind of a limit on the queue
- LIST_PREPEND (self->manager->delayed, self);
- return false;
- }
- return (self->started = true);
-}
-
#endif // LIBERTY_WANT_ASYNC
// --- Event loop --------------------------------------------------------------