diff options
| -rw-r--r-- | nncmpp.c | 455 | 
1 files changed, 268 insertions, 187 deletions
| @@ -142,6 +142,223 @@ clock_msec (clockid_t clock)  	return (int64_t) tp.tv_sec * 1000 + (int64_t) tp.tv_nsec / 1000000;  } +// --- cURL async wrapper ------------------------------------------------------ + +// You are meant to subclass this structure, no user_data pointers needed +struct poller_curl_task; + +/// Receives notification for finished transfers +typedef void (*poller_curl_done_fn) +	(CURLMsg *msg, struct poller_curl_task *task); + +struct poller_curl_task +{ +	CURL *easy;                         ///< cURL easy interface handle +	char curl_error[CURL_ERROR_SIZE];   ///< cURL error info buffer +	poller_curl_done_fn on_done;        ///< Done callback +}; + +struct poller_curl_fd +{ +	LIST_HEADER (struct poller_curl_fd) +	struct poller_fd fd;                ///< Poller FD +}; + +struct poller_curl +{ +	struct poller *poller;              ///< Parent poller +	struct poller_timer timer;          ///< cURL timer +	CURLM *multi;                       ///< cURL multi interface handle +	struct poller_curl_fd *fds;         ///< List of all FDs +}; + +static void +poller_curl_collect (struct poller_curl *self, curl_socket_t s, int ev_bitmask) +{ +	int running = 0; +	CURLMcode res; +	// XXX: ignoring errors, in particular CURLM_CALL_MULTI_PERFORM +	if ((res = curl_multi_socket_action (self->multi, s, ev_bitmask, &running))) +		print_debug ("cURL: %s", curl_multi_strerror (res)); + +	CURLMsg *msg; +	while ((msg = curl_multi_info_read (self->multi, &running))) +		if (msg->msg == CURLMSG_DONE) +		{ +			struct poller_curl_task *task = NULL; +			hard_assert (!curl_easy_getinfo +				(msg->easy_handle, CURLINFO_PRIVATE, &task)); +			task->on_done (msg, task); +		} +} + +static void +poller_curl_on_socket (const struct pollfd *pfd, void *user_data) +{ +	int mask = 0; +	if (pfd->revents & POLLIN)  mask |= CURL_CSELECT_IN; +	if (pfd->revents & POLLOUT) mask |= CURL_CSELECT_OUT; +	if (pfd->revents & POLLERR) mask |= CURL_CSELECT_ERR; +	poller_curl_collect (user_data, pfd->fd, mask); +} + +static int +poller_curl_on_socket_action (CURL *easy, curl_socket_t s, int what, +	void *user_data, void *socket_data) +{ +	(void) easy; +	struct poller_curl *self = user_data; + +	struct poller_curl_fd *fd; +	if (!(fd = socket_data)) +	{ +		fd = xmalloc (sizeof *fd); +		LIST_PREPEND (self->fds, fd); + +		poller_fd_init (&fd->fd, self->poller, s); +		fd->fd.dispatcher = poller_curl_on_socket; +		fd->fd.user_data = self; +		curl_multi_assign (self->multi, s, fd); +	} +	if (what == CURL_POLL_REMOVE) +	{ +		poller_fd_reset (&fd->fd); +		LIST_UNLINK (self->fds, fd); +		free (fd); +	} +	else +	{ +		short events = 0; +		if (what == CURL_POLL_IN)    events = POLLIN; +		if (what == CURL_POLL_OUT)   events =          POLLOUT; +		if (what == CURL_POLL_INOUT) events = POLLIN | POLLOUT; +		poller_fd_set (&fd->fd, events); +	} +	return 0; +} + +static void +poller_curl_on_timer (void *user_data) +{ +	poller_curl_collect (user_data, CURL_SOCKET_TIMEOUT, 0); +} + +static int +poller_curl_on_timer_change (CURLM *multi, long timeout_ms, void *user_data) +{ +	(void) multi; +	struct poller_curl *self = user_data; + +	if (timeout_ms < 0) +		poller_timer_reset (&self->timer); +	else +		poller_timer_set (&self->timer, timeout_ms); +	return 0; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static bool +poller_curl_init (struct poller_curl *self, struct poller *poller, +	struct error **e) +{ +	memset (self, 0, sizeof *self); +	if (!(self->multi = curl_multi_init ())) +	{ +		error_set (e, "cURL setup failed"); +		return false; +	} + +	CURLMcode mres; +	if ((mres = curl_multi_setopt (self->multi, +			CURLMOPT_SOCKETFUNCTION, poller_curl_on_socket_action)) +	 || (mres = curl_multi_setopt (self->multi, +			CURLMOPT_TIMERFUNCTION, poller_curl_on_timer_change)) +	 || (mres = curl_multi_setopt (self->multi, CURLMOPT_SOCKETDATA, self)) +	 || (mres = curl_multi_setopt (self->multi, CURLMOPT_TIMERDATA, self))) +	{ +		error_set (e, "%s: %s", +			"cURL setup failed", curl_multi_strerror (mres)); +		curl_multi_cleanup (self->multi); +		return false; +	} + +	poller_timer_init (&self->timer, (self->poller = poller)); +	self->timer.dispatcher = poller_curl_on_timer; +	self->timer.user_data = self; +	return true; +} + +static void +poller_curl_free (struct poller_curl *self) +{ +	curl_multi_cleanup (self->multi); +	poller_timer_reset (&self->timer); + +	LIST_FOR_EACH (struct poller_curl_fd, iter, self->fds) +	{ +		poller_fd_reset (&iter->fd); +		free (iter); +	} +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +/// Initialize a task with a new easy instance that can be used with the poller +static bool +poller_curl_spawn (struct poller_curl_task *task, struct error **e) +{ +	CURL *easy; +	if (!(easy = curl_easy_init ())) +	{ +		error_set (e, "cURL setup failed"); +		return false; +	} + +	// We already take care of SIGPIPE, and native DNS timeouts are only +	// a problem for people without the AsynchDNS feature. +	// +	// Unfortunately, cURL doesn't allow custom callbacks for DNS. +	// The most we could try is parse out the hostname and provide an address +	// override for it using CURLOPT_RESOLVE.  Or be our own SOCKS4A/5 proxy. + +	CURLcode res; +	if ((res = curl_easy_setopt (easy, CURLOPT_NOSIGNAL,    1L)) +	 || (res = curl_easy_setopt (easy, CURLOPT_ERRORBUFFER, task->curl_error)) +	 || (res = curl_easy_setopt (easy, CURLOPT_PRIVATE,     task))) +	{ +		error_set (e, "%s", curl_easy_strerror (res)); +		curl_easy_cleanup (easy); +		return false; +	} + +	task->easy = easy; +	return true; +} + +static bool +poller_curl_add (struct poller_curl *self, CURL *easy, struct error **e) +{ +	CURLMcode mres; +	// "CURLMOPT_TIMERFUNCTION [...] will be called from within this function" +	if (!(mres = curl_multi_add_handle (self->multi, easy))) +		return true; + +	error_set (e, "%s", curl_multi_strerror (mres)); +	return false; +} + +static bool +poller_curl_remove (struct poller_curl *self, CURL *easy, struct error **e) +{ +	CURLMcode mres; +	if (!(mres = curl_multi_remove_handle (self->multi, easy))) +		return true; + +	error_set (e, "%s", curl_multi_strerror (mres)); +	return false; +} +  // --- Application -------------------------------------------------------------  // Function names are prefixed mostly because of curses which clutters the @@ -1540,145 +1757,12 @@ app_process_termo_event (termo_key_t *event)  // --- Streams ----------------------------------------------------------------- -// TODO: either move to app_context or write a poller abstraction for cURL -static struct -{ -	CURLM *curl;                        ///< cURL multi handle -	struct poller_timer timer;          ///< cURL timer - -	struct poller poller;               ///< Poller -	bool polling;                       ///< Polling - -	char curl_error[CURL_ERROR_SIZE];   ///< cURL error info buffer -	CURLcode result;                    ///< Transfer result -} -g_curl; - -static void -app_curl_collect (curl_socket_t s, int ev_bitmask) -{ -	int running = 0; -	CURLMcode res; -	// XXX: ignoring errors, in particular CURLM_CALL_MULTI_PERFORM -	if ((res = curl_multi_socket_action (g_curl.curl, s, ev_bitmask, &running))) -		print_debug ("cURL: %s", curl_multi_strerror (res)); - -	CURLMsg *msg; -	while ((msg = curl_multi_info_read (g_curl.curl, &running))) -	{ -		// TODO: notify about completion -		if (msg->msg == CURLMSG_DONE) -		{ -			(void) msg->easy_handle; -			g_curl.result = msg->data.result; -			g_curl.polling = false; -		} -	} -} - -static void -app_curl_on_socket (const struct pollfd *pfd, void *user_data) -{ -	(void) pfd; -	(void) user_data; - -	int mask = 0; -	if (pfd->revents & POLLIN)  mask |= CURL_CSELECT_IN; -	if (pfd->revents & POLLOUT) mask |= CURL_CSELECT_OUT; -	if (pfd->revents & POLLERR) mask |= CURL_CSELECT_ERR; -	app_curl_collect (pfd->fd, mask); -} - -static int -app_curl_on_socket_action (CURL *easy, curl_socket_t s, int what, -	void *user_data, void *socket_data) -{ -	(void) easy; -	(void) user_data; - -	// TODO: when we move to the main poller, this should be a linked list -	//   so that we can be sure to free it all -	struct poller_fd *fd; -	if (!(fd = socket_data)) -	{ -		poller_fd_init ((fd = xmalloc (sizeof *fd)), &g_curl.poller, s); -		fd->dispatcher = app_curl_on_socket; -		curl_multi_assign (g_curl.curl, s, fd); -	} -	if (what == CURL_POLL_REMOVE) -	{ -		poller_fd_reset (fd); -		free (fd); -	} -	else -	{ -		short events = 0; -		if (what == CURL_POLL_IN)    events = POLLIN; -		if (what == CURL_POLL_OUT)   events =          POLLOUT; -		if (what == CURL_POLL_INOUT) events = POLLIN | POLLOUT; -		poller_fd_set (fd, events); -	} -	return 0; -} - -static void -app_curl_on_timer (void *user_data) -{ -	(void) user_data; -	app_curl_collect (CURL_SOCKET_TIMEOUT, 0); -} - -static int -app_curl_on_timer_change (CURLM *multi, long timeout_ms, void *user_data) -{ -	(void) multi; -	(void) user_data; - -	if (timeout_ms < 0) -		poller_timer_reset (&g_curl.timer); -	else -		poller_timer_set (&g_curl.timer, timeout_ms); -	return 0; -} - -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -static CURL * -app_curl_start (const char *uri, struct error **e) +struct stream_tab_task  { -	CURL *easy; -	if (!(easy = curl_easy_init ())) -	{ -		error_set (e, "cURL setup failed"); -		return NULL; -	} - -	// We already take care of SIGPIPE, and native DNS timeouts are only -	// a problem for people without the AsynchDNS feature. -	// -	// Unfortunately, cURL doesn't allow custom callbacks for DNS. -	// The most we could try is parse out the hostname and provide an address -	// override for it using CURLOPT_RESOLVE.  Or be our own SOCKS4A/5 proxy. -	CURLcode res; -	if ((res = curl_easy_setopt (easy, CURLOPT_NOSIGNAL,       1L)) -	 || (res = curl_easy_setopt (easy, CURLOPT_FOLLOWLOCATION, 1L)) -	 || (res = curl_easy_setopt (easy, CURLOPT_NOPROGRESS,     1L)) -	// TODO: make the timeout a bit larger once we're asynchronous -	 || (res = curl_easy_setopt (easy, CURLOPT_TIMEOUT,        5L)) -	// TODO: the error needs to be one per "CURL *" -	 || (res = curl_easy_setopt (easy, CURLOPT_ERRORBUFFER, g_curl.curl_error)) -	// Not checking anything, we just want some data, any data -	 || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYPEER, 0L)) -	 || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYHOST, 0L)) -	 || (res = curl_easy_setopt (easy, CURLOPT_URL,            uri))) -	{ -		error_set (e, "%s", curl_easy_strerror (res)); -		curl_easy_cleanup (easy); -		return NULL; -	} - -	return easy; -} +	struct poller_curl_task curl;       ///< Superclass +	bool polling;                       ///< Still downloading +	CURLcode result;                    ///< Operation result +};  static size_t  write_callback (char *ptr, size_t size, size_t nmemb, void *user_data) @@ -1693,62 +1777,60 @@ write_callback (char *ptr, size_t size, size_t nmemb, void *user_data)  	return size * nmemb;  } -// TODO: don't block on this, move this somehow to the main event loop +static void +app_download_on_done (CURLMsg *msg, struct poller_curl_task *task) +{ +	struct stream_tab_task *self = +		CONTAINER_OF (task, struct stream_tab_task, curl); +	self->polling = false; +	self->result = msg->data.result; +} +  static bool  app_download (const char *uri, struct str *buf, char **content_type,  	struct error **e)  { -	bool result = false; -	poller_init (&g_curl.poller); -	poller_timer_init (&g_curl.timer, &g_curl.poller); -	g_curl.timer.dispatcher = app_curl_on_timer; -	g_curl.polling = true; - -	if (!(g_curl.curl = curl_multi_init ())) -	{ -		error_set (e, "cURL setup failed"); -		goto error_1; -	} +	struct poller poller; +	poller_init (&poller); -	CURLMcode mres; -	if ((mres = curl_multi_setopt (g_curl.curl, -			CURLMOPT_SOCKETFUNCTION, app_curl_on_socket_action)) -	 || (mres = curl_multi_setopt (g_curl.curl, -		CURLMOPT_TIMERFUNCTION, app_curl_on_timer_change))) -	{ -		error_set (e, "%s: %s", -			"cURL setup failed", curl_multi_strerror (mres)); -		goto error_2; -	} +	struct poller_curl pc; +	hard_assert (poller_curl_init (&pc, &poller, NULL)); +	struct stream_tab_task task; +	hard_assert (poller_curl_spawn (&task.curl, NULL)); -	CURL *easy; -	if (!(easy = app_curl_start (uri, e))) -		goto error_2; +	CURL *easy = task.curl.easy; +	bool result = false;  	CURLcode res; -	if ((res = curl_easy_setopt (easy, CURLOPT_WRITEDATA, buf)) -	 || (res = curl_easy_setopt (easy, CURLOPT_WRITEFUNCTION, write_callback))) +	if ((res = curl_easy_setopt (easy, CURLOPT_FOLLOWLOCATION, 1L)) +	 || (res = curl_easy_setopt (easy, CURLOPT_NOPROGRESS,     1L)) +	// TODO: make the timeout a bit larger once we're asynchronous +	 || (res = curl_easy_setopt (easy, CURLOPT_TIMEOUT,        5L)) +	// Not checking anything, we just want some data, any data +	 || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYPEER, 0L)) +	 || (res = curl_easy_setopt (easy, CURLOPT_SSL_VERIFYHOST, 0L)) +	 || (res = curl_easy_setopt (easy, CURLOPT_URL,            uri)) + +	 || (res = curl_easy_setopt (easy, CURLOPT_WRITEDATA,      buf)) +	 || (res = curl_easy_setopt (easy, CURLOPT_WRITEFUNCTION,  write_callback)))  	{  		error_set (e, "%s: %s", "cURL setup failed", curl_easy_strerror (res)); -		goto error_3; +		goto error_1;  	} -	if ((mres = curl_multi_add_handle (g_curl.curl, easy))) -	{ -		error_set (e, "%s: %s", -			"cURL setup failed", curl_multi_strerror (mres)); -		goto error_3; -	} +	task.curl.on_done = app_download_on_done; +	hard_assert (poller_curl_add (&pc, task.curl.easy, NULL)); -	poller_timer_set (&g_curl.timer, 0); -	while (g_curl.polling) -		poller_run (&g_curl.poller); +	// TODO: don't run a subloop, run the task fully asynchronously +	task.polling = true; +	while (task.polling) +		poller_run (&poller); -	if (g_curl.result -	 && g_curl.result != CURLE_WRITE_ERROR) +	if (task.result +	 && task.result != CURLE_WRITE_ERROR)  	{ -		error_set (e, "%s: %s", "download failed", g_curl.curl_error); -		goto error_4; +		error_set (e, "%s: %s", "download failed", task.curl.curl_error); +		goto error_2;  	}  	long code; @@ -1758,27 +1840,26 @@ app_download (const char *uri, struct str *buf, char **content_type,  	{  		error_set (e, "%s: %s",  			"cURL info retrieval failed", curl_easy_strerror (res)); -		goto error_4; +		goto error_2;  	}  	if (code != 200)  	{  		error_set (e, "%s: %ld", "unexpected HTTP response code", code); -		goto error_4; +		goto error_2;  	}  	if (type && content_type)  		*content_type = xstrdup (type);  	result = true; -error_4: -	curl_multi_remove_handle (g_curl.curl, easy); -error_3: -	curl_easy_cleanup (easy);  error_2: -	curl_multi_cleanup (g_curl.curl); +	hard_assert (poller_curl_remove (&pc, task.curl.easy, NULL));  error_1: -	poller_free (&g_curl.poller); +	curl_easy_cleanup (task.curl.easy); +	poller_curl_free (&pc); + +	poller_free (&poller);  	return result;  } | 
