From 7944a91707f2803bc716a50d6674069f838d19c9 Mon Sep 17 00:00:00 2001
From: Přemysl Janouch
Date: Mon, 10 Oct 2016 07:45:17 +0200
Subject: Factor out an abstraction for cURL multi interface
---
nncmpp.c | 455 +++++++++++++++++++++++++++++++++++++--------------------------
1 file changed, 268 insertions(+), 187 deletions(-)
diff --git a/nncmpp.c b/nncmpp.c
index 37c14e4..f7546b4 100644
--- a/nncmpp.c
+++ b/nncmpp.c
@@ -142,6 +142,223 @@ clock_msec (clockid_t clock)
return (int64_t) tp.tv_sec * 1000 + (int64_t) tp.tv_nsec / 1000000;
}
+// --- cURL async wrapper ------------------------------------------------------
+
+// You are meant to subclass this structure, no user_data pointers needed
+struct poller_curl_task;
+
+/// Receives notification for finished transfers
+typedef void (*poller_curl_done_fn)
+ (CURLMsg *msg, struct poller_curl_task *task);
+
+struct poller_curl_task
+{
+ CURL *easy; ///< cURL easy interface handle
+ char curl_error[CURL_ERROR_SIZE]; ///< cURL error info buffer
+ poller_curl_done_fn on_done; ///< Done callback
+};
+
+struct poller_curl_fd
+{
+ LIST_HEADER (struct poller_curl_fd)
+ struct poller_fd fd; ///< Poller FD
+};
+
+struct poller_curl
+{
+ struct poller *poller; ///< Parent poller
+ struct poller_timer timer; ///< cURL timer
+ CURLM *multi; ///< cURL multi interface handle
+ struct poller_curl_fd *fds; ///< List of all FDs
+};
+
+static void
+poller_curl_collect (struct poller_curl *self, curl_socket_t s, int ev_bitmask)
+{
+ int running = 0;
+ CURLMcode res;
+ // XXX: ignoring errors, in particular CURLM_CALL_MULTI_PERFORM
+ if ((res = curl_multi_socket_action (self->multi, s, ev_bitmask, &running)))
+ print_debug ("cURL: %s", curl_multi_strerror (res));
+
+ CURLMsg *msg;
+ while ((msg = curl_multi_info_read (self->multi, &running)))
+ if (msg->msg == CURLMSG_DONE)
+ {
+ struct poller_curl_task *task = NULL;
+ hard_assert (!curl_easy_getinfo
+ (msg->easy_handle, CURLINFO_PRIVATE, &task));
+ task->on_done (msg, task);
+ }
+}
+
+static void
+poller_curl_on_socket (const struct pollfd *pfd, void *user_data)
+{
+ int mask = 0;
+ if (pfd->revents & POLLIN) mask |= CURL_CSELECT_IN;
+ if (pfd->revents & POLLOUT) mask |= CURL_CSELECT_OUT;
+ if (pfd->revents & POLLERR) mask |= CURL_CSELECT_ERR;
+ poller_curl_collect (user_data, pfd->fd, mask);
+}
+
+static int
+poller_curl_on_socket_action (CURL *easy, curl_socket_t s, int what,
+ void *user_data, void *socket_data)
+{
+ (void) easy;
+ struct poller_curl *self = user_data;
+
+ struct poller_curl_fd *fd;
+ if (!(fd = socket_data))
+ {
+ fd = xmalloc (sizeof *fd);
+ LIST_PREPEND (self->fds, fd);
+
+ poller_fd_init (&fd->fd, self->poller, s);
+ fd->fd.dispatcher = poller_curl_on_socket;
+ fd->fd.user_data = self;
+ curl_multi_assign (self->multi, s, fd);
+ }
+ if (what == CURL_POLL_REMOVE)
+ {
+ poller_fd_reset (&fd->fd);
+ LIST_UNLINK (self->fds, fd);
+ free (fd);
+ }
+ else
+ {
+ short events = 0;
+ if (what == CURL_POLL_IN) events = POLLIN;
+ if (what == CURL_POLL_OUT) events = POLLOUT;
+ if (what == CURL_POLL_INOUT) events = POLLIN | POLLOUT;
+ poller_fd_set (&fd->fd, events);
+ }
+ return 0;
+}
+
+static void
+poller_curl_on_timer (void *user_data)
+{
+ poller_curl_collect (user_data, CURL_SOCKET_TIMEOUT, 0);
+}
+
+static int
+poller_curl_on_timer_change (CURLM *multi, long timeout_ms, void *user_data)
+{
+ (void) multi;
+ struct poller_curl *self = user_data;
+
+ if (timeout_ms < 0)
+ poller_timer_reset (&self->timer);
+ else
+ poller_timer_set (&self->timer, timeout_ms);
+ return 0;
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+static bool
+poller_curl_init (struct poller_curl *self, struct poller *poller,
+ struct error **e)
+{
+ memset (self, 0, sizeof *self);
+ if (!(self->multi = curl_multi_init ()))
+ {
+ error_set (e, "cURL setup failed");
+ return false;
+ }
+
+ CURLMcode mres;
+ if ((mres = curl_multi_setopt (self->multi,
+ CURLMOPT_SOCKETFUNCTION, poller_curl_on_socket_action))
+ || (mres = curl_multi_setopt (self->multi,
+ CURLMOPT_TIMERFUNCTION, poller_curl_on_timer_change))
+ || (mres = curl_multi_setopt (self->multi, CURLMOPT_SOCKETDATA, self))
+ || (mres = curl_multi_setopt (self->multi, CURLMOPT_TIMERDATA, self)))
+ {
+ error_set (e, "%s: %s",
+ "cURL setup failed", curl_multi_strerror (mres));
+ curl_multi_cleanup (self->multi);
+ return false;
+ }
+
+ poller_timer_init (&self->timer, (self->poller = poller));
+ self->timer.dispatcher = poller_curl_on_timer;
+ self->timer.user_data = self;
+ return true;
+}
+
+static void
+poller_curl_free (struct poller_curl *self)
+{
+ curl_multi_cleanup (self->multi);
+ poller_timer_reset (&self->timer);
+
+ LIST_FOR_EACH (struct poller_curl_fd, iter, self->fds)
+ {
+ poller_fd_reset (&iter->fd);
+ free (iter);
+ }
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+/// Initialize a task with a new easy instance that can be used with the poller
+static bool
+poller_curl_spawn (struct poller_curl_task *task, struct error **e)
+{
+ CURL *easy;
+ if (!(easy = curl_easy_init ()))
+ {
+ error_set (e, "cURL setup failed");
+ return false;
+ }
+
+ // We already take care of SIGPIPE, and native DNS timeouts are only
+ // a problem for people without the AsynchDNS feature.
+ //
+ // Unfortunately, cURL doesn't allow custom callbacks for DNS.
+ // The most we could try is parse out the hostname and provide an address
+ // override for it using CURLOPT_RESOLVE. Or be our own SOCKS4A/5 proxy.
+
+ CURLcode res;
+ if ((res = curl_easy_setopt (easy, CURLOPT_NOSIGNAL, 1L))
+ || (res = curl_easy_setopt (easy, CURLOPT_ERRORBUFFER, task->curl_error))
+ || (res = curl_easy_setopt (easy, CURLOPT_PRIVATE, task)))
+ {
+ error_set (e, "%s", curl_easy_strerror (res));
+ curl_easy_cleanup (easy);
+ return false;
+ }
+
+ task->easy = easy;
+ return true;
+}
+
+static bool
+poller_curl_add (struct poller_curl *self, CURL *easy, struct error **e)
+{
+ CURLMcode mres;
+ // "CURLMOPT_TIMERFUNCTION [...] will be called from within this function"
+ if (!(mres = curl_multi_add_handle (self->multi, easy)))
+ return true;
+
+ error_set (e, "%s", curl_multi_strerror (mres));
+ return false;
+}
+
+static bool
+poller_curl_remove (struct poller_curl *self, CURL *easy, struct error **e)
+{
+ CURLMcode mres;
+ if (!(mres = curl_multi_remove_handle (self->multi, easy)))
+ return true;
+
+ error_set (e, "%s", curl_multi_strerror (mres));
+ return false;
+}
+
// --- Application -------------------------------------------------------------
// Function names are prefixed mostly because of curses which clutters the
@@ -1540,145 +1757,12 @@ app_process_termo_event (termo_key_t *event)
// --- Streams -----------------------------------------------------------------
-// TODO: either move to app_context or write a poller abstraction for cURL
-static struct
-{
- CURLM *curl; ///< cURL multi handle
- struct poller_timer timer; ///< cURL timer
-
- struct poller poller; ///< Poller
- bool polling; ///< Polling
-
- char curl_error[CURL_ERROR_SIZE]; ///< cURL error info buffer
- CURLcode result; ///< Transfer result
-}
-g_curl;
-
-static void
-app_curl_collect (curl_socket_t s, int ev_bitmask)
-{
- int running = 0;
- CURLMcode res;
- // XXX: ignoring errors, in particular CURLM_CALL_MULTI_PERFORM
- if ((res = curl_multi_socket_action (g_curl.curl, s, ev_bitmask, &running)))
- print_debug ("cURL: %s", curl_multi_strerror (res));
-
- CURLMsg *msg;
- while ((msg = curl_multi_info_read (g_curl.curl, &running)))
- {
- // TODO: notify about completion
- if (msg->msg == CURLMSG_DONE)
- {
- (void) msg->easy_handle;
- g_curl.result = msg->data.result;
- g_curl.polling = false;
- }
- }
-}
-
-static void
-app_curl_on_socket (const struct pollfd *pfd, void *user_data)
-{
- (void) pfd;
- (void) user_data;
-
- int mask = 0;
- if (pfd->revents & POLLIN) mask |= CURL_CSELECT_IN;
- if (pfd->revents & POLLOUT) mask |= CURL_CSELECT_OUT;
- if (pfd->revents & POLLERR) mask |= CURL_CSELECT_ERR;
- app_curl_collect (pfd->fd, mask);
-}
-
-static int
-app_curl_on_socket_action (CURL *easy, curl_socket_t s, int what,
- void *user_data, void *socket_data)
-{
- (void) easy;
- (void) user_data;
-
- // TODO: when we move to the main poller, this should be a linked list
- // so that we can be sure to free it all
- struct poller_fd *fd;
- if (!(fd = socket_data))
- {
- poller_fd_init ((fd = xmalloc (sizeof *fd)), &g_curl.poller, s);
- fd->dispatcher = app_curl_on_socket;
- curl_multi_assign (g_curl.curl, s, fd);
- }
- if (what == CURL_POLL_REMOVE)
- {
- poller_fd_reset (fd);
- free (fd);
- }
- else
- {
- short events = 0;
- if (what == CURL_POLL_IN) events = POLLIN;
- if (what == CURL_POLL_OUT) events = POLLOUT;
- if (what == CURL_POLL_INOUT) events = POLLIN | POLLOUT;
- poller_fd_set (fd, events);
- }
- return 0;
-}
-
-static void
-app_curl_on_timer (void *user_data)
-{
- (void) user_data;
- app_curl_collect (CURL_SOCKET_TIMEOUT, 0);
-}
-
-static int
-app_curl_on_timer_change (CURLM *multi, long timeout_ms, void *user_data)
-{
- (void) multi;
- (void) user_data;
-
- if (timeout_ms < 0)
- poller_timer_reset (&g_curl.timer);
- else
- poller_timer_set (&g_curl.timer, timeout_ms);
- return 0;
-}
-
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-
-static CURL *
-app_curl_start (const char *uri, struct error **e)
+struct stream_tab_task
{
- CURL *easy;
- if (!(easy = curl_easy_init ()))
- {
- error_set (e, "cURL setup failed");
- return NULL;
- }
-
- // We already take care of SIGPIPE, and native DNS timeouts are only
- // a problem for people without the AsynchDNS feature.
- //
- // Unfortunately, cURL doesn't allow custom callbacks for DNS.
- // The most we could try is parse out the hostname and provide an address
- // override for it using CURLOPT_RESOLVE. Or be our own SOCKS4A/5 proxy.
- CURLcode res;
- if ((res = curl_easy_setopt (easy, CURLOPT_NOSIGNAL, 1L))
- || (res = curl_easy_setopt (easy, CURLOPT_FOLLOWLOCATION, 1L))
- || (res = curl_easy_setopt (easy, CURLOPT_NOPROGRESS, 1L))
- // TODO: make the timeout a bit larger once we're asynchronous
- || (res = curl_easy_setopt (easy, CURLOPT_TIMEOUT, 5L))
- // TODO: the error needs to be one per "CURL *"
- || (res = curl_easy_setopt (easy, CURLOPT_ERRORBUFFER, g_curl.curl_error))
- // Not checking anything, we just want some data, any data
- || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYPEER, 0L))
- || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYHOST, 0L))
- || (res = curl_easy_setopt (easy, CURLOPT_URL, uri)))
- {
- error_set (e, "%s", curl_easy_strerror (res));
- curl_easy_cleanup (easy);
- return NULL;
- }
-
- return easy;
-}
+ struct poller_curl_task curl; ///< Superclass
+ bool polling; ///< Still downloading
+ CURLcode result; ///< Operation result
+};
static size_t
write_callback (char *ptr, size_t size, size_t nmemb, void *user_data)
@@ -1693,62 +1777,60 @@ write_callback (char *ptr, size_t size, size_t nmemb, void *user_data)
return size * nmemb;
}
-// TODO: don't block on this, move this somehow to the main event loop
+static void
+app_download_on_done (CURLMsg *msg, struct poller_curl_task *task)
+{
+ struct stream_tab_task *self =
+ CONTAINER_OF (task, struct stream_tab_task, curl);
+ self->polling = false;
+ self->result = msg->data.result;
+}
+
static bool
app_download (const char *uri, struct str *buf, char **content_type,
struct error **e)
{
- bool result = false;
- poller_init (&g_curl.poller);
- poller_timer_init (&g_curl.timer, &g_curl.poller);
- g_curl.timer.dispatcher = app_curl_on_timer;
- g_curl.polling = true;
-
- if (!(g_curl.curl = curl_multi_init ()))
- {
- error_set (e, "cURL setup failed");
- goto error_1;
- }
+ struct poller poller;
+ poller_init (&poller);
- CURLMcode mres;
- if ((mres = curl_multi_setopt (g_curl.curl,
- CURLMOPT_SOCKETFUNCTION, app_curl_on_socket_action))
- || (mres = curl_multi_setopt (g_curl.curl,
- CURLMOPT_TIMERFUNCTION, app_curl_on_timer_change)))
- {
- error_set (e, "%s: %s",
- "cURL setup failed", curl_multi_strerror (mres));
- goto error_2;
- }
+ struct poller_curl pc;
+ hard_assert (poller_curl_init (&pc, &poller, NULL));
+ struct stream_tab_task task;
+ hard_assert (poller_curl_spawn (&task.curl, NULL));
- CURL *easy;
- if (!(easy = app_curl_start (uri, e)))
- goto error_2;
+ CURL *easy = task.curl.easy;
+ bool result = false;
CURLcode res;
- if ((res = curl_easy_setopt (easy, CURLOPT_WRITEDATA, buf))
- || (res = curl_easy_setopt (easy, CURLOPT_WRITEFUNCTION, write_callback)))
+ if ((res = curl_easy_setopt (easy, CURLOPT_FOLLOWLOCATION, 1L))
+ || (res = curl_easy_setopt (easy, CURLOPT_NOPROGRESS, 1L))
+ // TODO: make the timeout a bit larger once we're asynchronous
+ || (res = curl_easy_setopt (easy, CURLOPT_TIMEOUT, 5L))
+ // Not checking anything, we just want some data, any data
+ || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYPEER, 0L))
+ || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYHOST, 0L))
+ || (res = curl_easy_setopt (easy, CURLOPT_URL, uri))
+
+ || (res = curl_easy_setopt (easy, CURLOPT_WRITEDATA, buf))
+ || (res = curl_easy_setopt (easy, CURLOPT_WRITEFUNCTION, write_callback)))
{
error_set (e, "%s: %s", "cURL setup failed", curl_easy_strerror (res));
- goto error_3;
+ goto error_1;
}
- if ((mres = curl_multi_add_handle (g_curl.curl, easy)))
- {
- error_set (e, "%s: %s",
- "cURL setup failed", curl_multi_strerror (mres));
- goto error_3;
- }
+ task.curl.on_done = app_download_on_done;
+ hard_assert (poller_curl_add (&pc, task.curl.easy, NULL));
- poller_timer_set (&g_curl.timer, 0);
- while (g_curl.polling)
- poller_run (&g_curl.poller);
+ // TODO: don't run a subloop, run the task fully asynchronously
+ task.polling = true;
+ while (task.polling)
+ poller_run (&poller);
- if (g_curl.result
- && g_curl.result != CURLE_WRITE_ERROR)
+ if (task.result
+ && task.result != CURLE_WRITE_ERROR)
{
- error_set (e, "%s: %s", "download failed", g_curl.curl_error);
- goto error_4;
+ error_set (e, "%s: %s", "download failed", task.curl.curl_error);
+ goto error_2;
}
long code;
@@ -1758,27 +1840,26 @@ app_download (const char *uri, struct str *buf, char **content_type,
{
error_set (e, "%s: %s",
"cURL info retrieval failed", curl_easy_strerror (res));
- goto error_4;
+ goto error_2;
}
if (code != 200)
{
error_set (e, "%s: %ld", "unexpected HTTP response code", code);
- goto error_4;
+ goto error_2;
}
if (type && content_type)
*content_type = xstrdup (type);
result = true;
-error_4:
- curl_multi_remove_handle (g_curl.curl, easy);
-error_3:
- curl_easy_cleanup (easy);
error_2:
- curl_multi_cleanup (g_curl.curl);
+ hard_assert (poller_curl_remove (&pc, task.curl.easy, NULL));
error_1:
- poller_free (&g_curl.poller);
+ curl_easy_cleanup (task.curl.easy);
+ poller_curl_free (&pc);
+
+ poller_free (&poller);
return result;
}
--
cgit v1.2.3-70-g09d2