From 7944a91707f2803bc716a50d6674069f838d19c9 Mon Sep 17 00:00:00 2001 From: Přemysl Janouch Date: Mon, 10 Oct 2016 07:45:17 +0200 Subject: Factor out an abstraction for cURL multi interface --- nncmpp.c | 455 +++++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 268 insertions(+), 187 deletions(-) diff --git a/nncmpp.c b/nncmpp.c index 37c14e4..f7546b4 100644 --- a/nncmpp.c +++ b/nncmpp.c @@ -142,6 +142,223 @@ clock_msec (clockid_t clock) return (int64_t) tp.tv_sec * 1000 + (int64_t) tp.tv_nsec / 1000000; } +// --- cURL async wrapper ------------------------------------------------------ + +// You are meant to subclass this structure, no user_data pointers needed +struct poller_curl_task; + +/// Receives notification for finished transfers +typedef void (*poller_curl_done_fn) + (CURLMsg *msg, struct poller_curl_task *task); + +struct poller_curl_task +{ + CURL *easy; ///< cURL easy interface handle + char curl_error[CURL_ERROR_SIZE]; ///< cURL error info buffer + poller_curl_done_fn on_done; ///< Done callback +}; + +struct poller_curl_fd +{ + LIST_HEADER (struct poller_curl_fd) + struct poller_fd fd; ///< Poller FD +}; + +struct poller_curl +{ + struct poller *poller; ///< Parent poller + struct poller_timer timer; ///< cURL timer + CURLM *multi; ///< cURL multi interface handle + struct poller_curl_fd *fds; ///< List of all FDs +}; + +static void +poller_curl_collect (struct poller_curl *self, curl_socket_t s, int ev_bitmask) +{ + int running = 0; + CURLMcode res; + // XXX: ignoring errors, in particular CURLM_CALL_MULTI_PERFORM + if ((res = curl_multi_socket_action (self->multi, s, ev_bitmask, &running))) + print_debug ("cURL: %s", curl_multi_strerror (res)); + + CURLMsg *msg; + while ((msg = curl_multi_info_read (self->multi, &running))) + if (msg->msg == CURLMSG_DONE) + { + struct poller_curl_task *task = NULL; + hard_assert (!curl_easy_getinfo + (msg->easy_handle, CURLINFO_PRIVATE, &task)); + task->on_done (msg, task); + } +} + +static void +poller_curl_on_socket (const struct pollfd *pfd, void *user_data) +{ + int mask = 0; + if (pfd->revents & POLLIN) mask |= CURL_CSELECT_IN; + if (pfd->revents & POLLOUT) mask |= CURL_CSELECT_OUT; + if (pfd->revents & POLLERR) mask |= CURL_CSELECT_ERR; + poller_curl_collect (user_data, pfd->fd, mask); +} + +static int +poller_curl_on_socket_action (CURL *easy, curl_socket_t s, int what, + void *user_data, void *socket_data) +{ + (void) easy; + struct poller_curl *self = user_data; + + struct poller_curl_fd *fd; + if (!(fd = socket_data)) + { + fd = xmalloc (sizeof *fd); + LIST_PREPEND (self->fds, fd); + + poller_fd_init (&fd->fd, self->poller, s); + fd->fd.dispatcher = poller_curl_on_socket; + fd->fd.user_data = self; + curl_multi_assign (self->multi, s, fd); + } + if (what == CURL_POLL_REMOVE) + { + poller_fd_reset (&fd->fd); + LIST_UNLINK (self->fds, fd); + free (fd); + } + else + { + short events = 0; + if (what == CURL_POLL_IN) events = POLLIN; + if (what == CURL_POLL_OUT) events = POLLOUT; + if (what == CURL_POLL_INOUT) events = POLLIN | POLLOUT; + poller_fd_set (&fd->fd, events); + } + return 0; +} + +static void +poller_curl_on_timer (void *user_data) +{ + poller_curl_collect (user_data, CURL_SOCKET_TIMEOUT, 0); +} + +static int +poller_curl_on_timer_change (CURLM *multi, long timeout_ms, void *user_data) +{ + (void) multi; + struct poller_curl *self = user_data; + + if (timeout_ms < 0) + poller_timer_reset (&self->timer); + else + poller_timer_set (&self->timer, timeout_ms); + return 0; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static bool +poller_curl_init (struct poller_curl *self, struct poller *poller, + struct error **e) +{ + memset (self, 0, sizeof *self); + if (!(self->multi = curl_multi_init ())) + { + error_set (e, "cURL setup failed"); + return false; + } + + CURLMcode mres; + if ((mres = curl_multi_setopt (self->multi, + CURLMOPT_SOCKETFUNCTION, poller_curl_on_socket_action)) + || (mres = curl_multi_setopt (self->multi, + CURLMOPT_TIMERFUNCTION, poller_curl_on_timer_change)) + || (mres = curl_multi_setopt (self->multi, CURLMOPT_SOCKETDATA, self)) + || (mres = curl_multi_setopt (self->multi, CURLMOPT_TIMERDATA, self))) + { + error_set (e, "%s: %s", + "cURL setup failed", curl_multi_strerror (mres)); + curl_multi_cleanup (self->multi); + return false; + } + + poller_timer_init (&self->timer, (self->poller = poller)); + self->timer.dispatcher = poller_curl_on_timer; + self->timer.user_data = self; + return true; +} + +static void +poller_curl_free (struct poller_curl *self) +{ + curl_multi_cleanup (self->multi); + poller_timer_reset (&self->timer); + + LIST_FOR_EACH (struct poller_curl_fd, iter, self->fds) + { + poller_fd_reset (&iter->fd); + free (iter); + } +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +/// Initialize a task with a new easy instance that can be used with the poller +static bool +poller_curl_spawn (struct poller_curl_task *task, struct error **e) +{ + CURL *easy; + if (!(easy = curl_easy_init ())) + { + error_set (e, "cURL setup failed"); + return false; + } + + // We already take care of SIGPIPE, and native DNS timeouts are only + // a problem for people without the AsynchDNS feature. + // + // Unfortunately, cURL doesn't allow custom callbacks for DNS. + // The most we could try is parse out the hostname and provide an address + // override for it using CURLOPT_RESOLVE. Or be our own SOCKS4A/5 proxy. + + CURLcode res; + if ((res = curl_easy_setopt (easy, CURLOPT_NOSIGNAL, 1L)) + || (res = curl_easy_setopt (easy, CURLOPT_ERRORBUFFER, task->curl_error)) + || (res = curl_easy_setopt (easy, CURLOPT_PRIVATE, task))) + { + error_set (e, "%s", curl_easy_strerror (res)); + curl_easy_cleanup (easy); + return false; + } + + task->easy = easy; + return true; +} + +static bool +poller_curl_add (struct poller_curl *self, CURL *easy, struct error **e) +{ + CURLMcode mres; + // "CURLMOPT_TIMERFUNCTION [...] will be called from within this function" + if (!(mres = curl_multi_add_handle (self->multi, easy))) + return true; + + error_set (e, "%s", curl_multi_strerror (mres)); + return false; +} + +static bool +poller_curl_remove (struct poller_curl *self, CURL *easy, struct error **e) +{ + CURLMcode mres; + if (!(mres = curl_multi_remove_handle (self->multi, easy))) + return true; + + error_set (e, "%s", curl_multi_strerror (mres)); + return false; +} + // --- Application ------------------------------------------------------------- // Function names are prefixed mostly because of curses which clutters the @@ -1540,145 +1757,12 @@ app_process_termo_event (termo_key_t *event) // --- Streams ----------------------------------------------------------------- -// TODO: either move to app_context or write a poller abstraction for cURL -static struct -{ - CURLM *curl; ///< cURL multi handle - struct poller_timer timer; ///< cURL timer - - struct poller poller; ///< Poller - bool polling; ///< Polling - - char curl_error[CURL_ERROR_SIZE]; ///< cURL error info buffer - CURLcode result; ///< Transfer result -} -g_curl; - -static void -app_curl_collect (curl_socket_t s, int ev_bitmask) -{ - int running = 0; - CURLMcode res; - // XXX: ignoring errors, in particular CURLM_CALL_MULTI_PERFORM - if ((res = curl_multi_socket_action (g_curl.curl, s, ev_bitmask, &running))) - print_debug ("cURL: %s", curl_multi_strerror (res)); - - CURLMsg *msg; - while ((msg = curl_multi_info_read (g_curl.curl, &running))) - { - // TODO: notify about completion - if (msg->msg == CURLMSG_DONE) - { - (void) msg->easy_handle; - g_curl.result = msg->data.result; - g_curl.polling = false; - } - } -} - -static void -app_curl_on_socket (const struct pollfd *pfd, void *user_data) -{ - (void) pfd; - (void) user_data; - - int mask = 0; - if (pfd->revents & POLLIN) mask |= CURL_CSELECT_IN; - if (pfd->revents & POLLOUT) mask |= CURL_CSELECT_OUT; - if (pfd->revents & POLLERR) mask |= CURL_CSELECT_ERR; - app_curl_collect (pfd->fd, mask); -} - -static int -app_curl_on_socket_action (CURL *easy, curl_socket_t s, int what, - void *user_data, void *socket_data) -{ - (void) easy; - (void) user_data; - - // TODO: when we move to the main poller, this should be a linked list - // so that we can be sure to free it all - struct poller_fd *fd; - if (!(fd = socket_data)) - { - poller_fd_init ((fd = xmalloc (sizeof *fd)), &g_curl.poller, s); - fd->dispatcher = app_curl_on_socket; - curl_multi_assign (g_curl.curl, s, fd); - } - if (what == CURL_POLL_REMOVE) - { - poller_fd_reset (fd); - free (fd); - } - else - { - short events = 0; - if (what == CURL_POLL_IN) events = POLLIN; - if (what == CURL_POLL_OUT) events = POLLOUT; - if (what == CURL_POLL_INOUT) events = POLLIN | POLLOUT; - poller_fd_set (fd, events); - } - return 0; -} - -static void -app_curl_on_timer (void *user_data) -{ - (void) user_data; - app_curl_collect (CURL_SOCKET_TIMEOUT, 0); -} - -static int -app_curl_on_timer_change (CURLM *multi, long timeout_ms, void *user_data) -{ - (void) multi; - (void) user_data; - - if (timeout_ms < 0) - poller_timer_reset (&g_curl.timer); - else - poller_timer_set (&g_curl.timer, timeout_ms); - return 0; -} - -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -static CURL * -app_curl_start (const char *uri, struct error **e) +struct stream_tab_task { - CURL *easy; - if (!(easy = curl_easy_init ())) - { - error_set (e, "cURL setup failed"); - return NULL; - } - - // We already take care of SIGPIPE, and native DNS timeouts are only - // a problem for people without the AsynchDNS feature. - // - // Unfortunately, cURL doesn't allow custom callbacks for DNS. - // The most we could try is parse out the hostname and provide an address - // override for it using CURLOPT_RESOLVE. Or be our own SOCKS4A/5 proxy. - CURLcode res; - if ((res = curl_easy_setopt (easy, CURLOPT_NOSIGNAL, 1L)) - || (res = curl_easy_setopt (easy, CURLOPT_FOLLOWLOCATION, 1L)) - || (res = curl_easy_setopt (easy, CURLOPT_NOPROGRESS, 1L)) - // TODO: make the timeout a bit larger once we're asynchronous - || (res = curl_easy_setopt (easy, CURLOPT_TIMEOUT, 5L)) - // TODO: the error needs to be one per "CURL *" - || (res = curl_easy_setopt (easy, CURLOPT_ERRORBUFFER, g_curl.curl_error)) - // Not checking anything, we just want some data, any data - || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYPEER, 0L)) - || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYHOST, 0L)) - || (res = curl_easy_setopt (easy, CURLOPT_URL, uri))) - { - error_set (e, "%s", curl_easy_strerror (res)); - curl_easy_cleanup (easy); - return NULL; - } - - return easy; -} + struct poller_curl_task curl; ///< Superclass + bool polling; ///< Still downloading + CURLcode result; ///< Operation result +}; static size_t write_callback (char *ptr, size_t size, size_t nmemb, void *user_data) @@ -1693,62 +1777,60 @@ write_callback (char *ptr, size_t size, size_t nmemb, void *user_data) return size * nmemb; } -// TODO: don't block on this, move this somehow to the main event loop +static void +app_download_on_done (CURLMsg *msg, struct poller_curl_task *task) +{ + struct stream_tab_task *self = + CONTAINER_OF (task, struct stream_tab_task, curl); + self->polling = false; + self->result = msg->data.result; +} + static bool app_download (const char *uri, struct str *buf, char **content_type, struct error **e) { - bool result = false; - poller_init (&g_curl.poller); - poller_timer_init (&g_curl.timer, &g_curl.poller); - g_curl.timer.dispatcher = app_curl_on_timer; - g_curl.polling = true; - - if (!(g_curl.curl = curl_multi_init ())) - { - error_set (e, "cURL setup failed"); - goto error_1; - } + struct poller poller; + poller_init (&poller); - CURLMcode mres; - if ((mres = curl_multi_setopt (g_curl.curl, - CURLMOPT_SOCKETFUNCTION, app_curl_on_socket_action)) - || (mres = curl_multi_setopt (g_curl.curl, - CURLMOPT_TIMERFUNCTION, app_curl_on_timer_change))) - { - error_set (e, "%s: %s", - "cURL setup failed", curl_multi_strerror (mres)); - goto error_2; - } + struct poller_curl pc; + hard_assert (poller_curl_init (&pc, &poller, NULL)); + struct stream_tab_task task; + hard_assert (poller_curl_spawn (&task.curl, NULL)); - CURL *easy; - if (!(easy = app_curl_start (uri, e))) - goto error_2; + CURL *easy = task.curl.easy; + bool result = false; CURLcode res; - if ((res = curl_easy_setopt (easy, CURLOPT_WRITEDATA, buf)) - || (res = curl_easy_setopt (easy, CURLOPT_WRITEFUNCTION, write_callback))) + if ((res = curl_easy_setopt (easy, CURLOPT_FOLLOWLOCATION, 1L)) + || (res = curl_easy_setopt (easy, CURLOPT_NOPROGRESS, 1L)) + // TODO: make the timeout a bit larger once we're asynchronous + || (res = curl_easy_setopt (easy, CURLOPT_TIMEOUT, 5L)) + // Not checking anything, we just want some data, any data + || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYPEER, 0L)) + || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYHOST, 0L)) + || (res = curl_easy_setopt (easy, CURLOPT_URL, uri)) + + || (res = curl_easy_setopt (easy, CURLOPT_WRITEDATA, buf)) + || (res = curl_easy_setopt (easy, CURLOPT_WRITEFUNCTION, write_callback))) { error_set (e, "%s: %s", "cURL setup failed", curl_easy_strerror (res)); - goto error_3; + goto error_1; } - if ((mres = curl_multi_add_handle (g_curl.curl, easy))) - { - error_set (e, "%s: %s", - "cURL setup failed", curl_multi_strerror (mres)); - goto error_3; - } + task.curl.on_done = app_download_on_done; + hard_assert (poller_curl_add (&pc, task.curl.easy, NULL)); - poller_timer_set (&g_curl.timer, 0); - while (g_curl.polling) - poller_run (&g_curl.poller); + // TODO: don't run a subloop, run the task fully asynchronously + task.polling = true; + while (task.polling) + poller_run (&poller); - if (g_curl.result - && g_curl.result != CURLE_WRITE_ERROR) + if (task.result + && task.result != CURLE_WRITE_ERROR) { - error_set (e, "%s: %s", "download failed", g_curl.curl_error); - goto error_4; + error_set (e, "%s: %s", "download failed", task.curl.curl_error); + goto error_2; } long code; @@ -1758,27 +1840,26 @@ app_download (const char *uri, struct str *buf, char **content_type, { error_set (e, "%s: %s", "cURL info retrieval failed", curl_easy_strerror (res)); - goto error_4; + goto error_2; } if (code != 200) { error_set (e, "%s: %ld", "unexpected HTTP response code", code); - goto error_4; + goto error_2; } if (type && content_type) *content_type = xstrdup (type); result = true; -error_4: - curl_multi_remove_handle (g_curl.curl, easy); -error_3: - curl_easy_cleanup (easy); error_2: - curl_multi_cleanup (g_curl.curl); + hard_assert (poller_curl_remove (&pc, task.curl.easy, NULL)); error_1: - poller_free (&g_curl.poller); + curl_easy_cleanup (task.curl.easy); + poller_curl_free (&pc); + + poller_free (&poller); return result; } -- cgit v1.2.3-70-g09d2