aboutsummaryrefslogtreecommitdiff
path: root/liberty.c
diff options
context:
space:
mode:
Diffstat (limited to 'liberty.c')
-rw-r--r--liberty.c64
1 files changed, 59 insertions, 5 deletions
diff --git a/liberty.c b/liberty.c
index e004464..fd9ccc7 100644
--- a/liberty.c
+++ b/liberty.c
@@ -1073,6 +1073,7 @@ struct async
// "cancelled" may not be accesed or modified by the worker thread
pthread_t worker; ///< Worker thread ID
+ bool started; ///< Worker thread ID is valid
bool cancelled; ///< Task has been cancelled
async_fn execute; ///< Worker main function
@@ -1087,6 +1088,8 @@ async_init (struct async *self, struct async_manager *manager)
self->manager = manager;
}
+static bool async_run (struct async *self);
+
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct async_manager
@@ -1095,6 +1098,12 @@ struct async_manager
struct async *running; ///< Queue of running jobs
struct async *finished; ///< Queue of completed/cancelled jobs
+ // It's upon the user to call async_manager_dispatch() to retry the delayed.
+ // It's somewhat questionable if this feature is of any use. Possibly if we
+ // provide a means of actively limiting the amount of running async jobs.
+
+ struct async *delayed; ///< Resource exhaustion queue
+
// We need the pipe in order to abort polling (instead of using EINTR)
pthread_cond_t finished_cond; ///< Signals that a task has finished
@@ -1114,6 +1123,18 @@ async_manager_init (struct async_manager *self)
set_cloexec (self->finished_pipe[1]);
}
+static bool
+async_manager_retry (struct async_manager *self, struct async *async)
+{
+ if (async->cancelled)
+ {
+ if (async->destroy)
+ async->destroy (async);
+ return true;
+ }
+ return async_run (async);
+}
+
static struct async *
async_manager_dispatch_fetch (struct async_manager *self)
{
@@ -1145,6 +1166,13 @@ async_manager_dispatch (struct async_manager *self)
if (iter->destroy)
iter->destroy (iter);
}
+
+ LIST_FOR_EACH (struct async, iter, self->delayed)
+ {
+ LIST_UNLINK (self->delayed, iter);
+ if (!async_manager_retry (self, iter))
+ break;
+ }
}
static void
@@ -1164,6 +1192,8 @@ async_manager_cancel_all (struct async_manager *self)
// Mark everything cancelled so that it's not actually dispatched
LIST_FOR_EACH (struct async, iter, self->finished)
iter->cancelled = true;
+ LIST_FOR_EACH (struct async, iter, self->delayed)
+ iter->cancelled = true;
hard_assert (!pthread_mutex_unlock (&self->lock));
async_manager_dispatch (self);
@@ -1187,7 +1217,8 @@ async_manager_free (struct async_manager *self)
static void
async_cancel (struct async *self)
{
- soft_assert (!pthread_cancel (self->worker));
+ if (self->started)
+ soft_assert (!pthread_cancel (self->worker));
self->cancelled = true;
}
@@ -1219,7 +1250,7 @@ async_routine (void *user_data)
return NULL;
}
-static void
+static bool
async_run (struct async *self)
{
hard_assert (!pthread_mutex_lock (&self->manager->lock));
@@ -1231,11 +1262,24 @@ async_run (struct async *self)
hard_assert (!sigfillset (&all_blocked));
hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
- hard_assert (!pthread_create (&self->worker, NULL,
- async_routine, self));
+ int error = pthread_create (&self->worker, NULL, async_routine, self);
// Now that we've created the thread, resume signal processing as usual
hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
+
+ if (error)
+ {
+ hard_assert (error == EAGAIN);
+
+ hard_assert (!pthread_mutex_lock (&self->manager->lock));
+ LIST_UNLINK (self->manager->running, self);
+ hard_assert (!pthread_mutex_unlock (&self->manager->lock));
+
+ // FIXME: we probably want to have some kind of a limit on the queue
+ LIST_PREPEND (self->manager->delayed, self);
+ return false;
+ }
+ return (self->started = true);
}
#endif // LIBERTY_WANT_ASYNC
@@ -2138,7 +2182,17 @@ poller_common_free (struct poller_common *self)
static int
poller_common_get_timeout (struct poller_common *self)
{
- return self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers);
+ if (self->idle)
+ return 0;
+
+ int timeout = poller_timers_get_poll_timeout (&self->timers);
+#ifdef LIBERTY_WANT_ASYNC
+ // This is completely arbitrary, in general we have no idea when to retry,
+ // however one second doesn't sound like a particularly bad number
+ if (self->async.delayed)
+ timeout = MIN (timeout, 1000);
+#endif // LIBERTY_WANT_ASYNC
+ return timeout;
}
static void