From 73ec9134c80f9bbca5cdb0d7980fcc03dd3abeb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Eric=20Janouch?= Date: Sat, 17 Oct 2020 00:46:09 +0200 Subject: WIP: json-rpc-test-server: add HTTP support Stage 1: make just the HTTP part work. WIP: - Ensure requests aren't started in client_http_on_headers_complete() when the request demands an upgrade - Figure out keep-alive - Make sure request_free() is called exactly once after request start and between requests - Triple-check the code makes some resembling of sense Updates #5 Later stages: - clean up (#7) - ensure WS upgrade works here (it has a completely separate parser, which could be passed as an http_parserpp pointer to ws_handler_finish_handshake()) - move in ws_handler's handshake code --- json-rpc-test-server.c | 569 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 565 insertions(+), 4 deletions(-) diff --git a/json-rpc-test-server.c b/json-rpc-test-server.c index 0ce8a10..45fdc1a 100644 --- a/json-rpc-test-server.c +++ b/json-rpc-test-server.c @@ -1024,9 +1024,8 @@ ws_handler_on_url (http_parser *parser, const char *at, size_t len) return 0; } -static void -ws_handler_http_responsev (struct ws_handler *self, - const char *status, char *const *fields) +static struct str +ws_handler_build_http_responsev (const char *status, char *const *fields) { hard_assert (status != NULL); @@ -1051,6 +1050,14 @@ ws_handler_http_responsev (struct ws_handler *self, str_append (&response, "Server: " PROGRAM_NAME "/" PROGRAM_VERSION "\r\n\r\n"); + return response; +} + +static void +ws_handler_http_responsev (struct ws_handler *self, + const char *status, char *const *fields) +{ + struct str response = ws_handler_build_http_responsev (status, fields); self->write_cb (self, response.str, response.len); str_free (&response); } @@ -1289,6 +1296,7 @@ static struct simple_config_item g_config_table[] = { "port_fastcgi", "9000", "Port to bind for FastCGI" }, { "port_scgi", NULL, "Port to bind for SCGI" }, { "port_ws", NULL, "Port to bind for WebSocket" }, + { "port_http", NULL, "Port to bind for HTTP" }, { "pid_file", NULL, "Full path for the PID file" }, // XXX: here belongs something like a web SPA that interfaces with us { "static_root", NULL, "The root for static content" }, @@ -2111,7 +2119,9 @@ static void client_shutdown (struct client *self) { self->flushing = true; - // In case this shutdown is immediately followed by a close, try our best + // In case this shutdown is immediately followed by a destroy (client_ws + // calls client_destroy() rather than client_close() in case of a request + // timeout), try our best (void) flush_queue (&self->write_queue, self->socket_fd); ev_feed_event (EV_DEFAULT_ &self->write_watcher, EV_WRITE); } @@ -2584,6 +2594,551 @@ client_ws_create (EV_P_ const char *host, int sock_fd) return &self->client; } +// --- HTTP client handler ----------------------------------------------------- + +struct client_http +{ + struct client client; ///< Parent class + struct ws_handler handler; ///< WebSocket connection handler + struct request request; ///< Request (only one per connection) + + char *listening_host; ///< Listening hostname or IP + int listening_port; ///< Listening port + + http_parser hp; ///< HTTP parser + bool have_header_value; ///< Parsing header value or field? + struct str field; ///< Field part buffer + struct str value; ///< Value part buffer + struct str_map headers; ///< HTTP Headers + struct str url; ///< Request URL + struct str message; ///< Message data + + struct str response; ///< CGI response + + ev_timer request_timeout_watcher; ///< Request timeout watcher + unsigned request_timeout; ///< How long to wait for the handshake +}; + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// TODO: refactor this part as much as possible, see also http_parserpp + +static bool +client_http_header_field_is_a_list (const char *name) +{ + // This must contain all header fields we use for anything + static const char *concatenable[] = + { SEC_WS_PROTOCOL, SEC_WS_EXTENSIONS, "Connection", "Upgrade" }; + + for (size_t i = 0; i < N_ELEMENTS (concatenable); i++) + if (!strcasecmp_ascii (name, concatenable[i])) + return true; + return false; +} + +static void +client_http_on_header_read (struct client_http *self) +{ + // The HTTP parser unfolds values and removes preceding whitespace, but + // otherwise doesn't touch the values or the following whitespace. + + // RFC 7230 states that trailing whitespace is not part of a field value + char *value = self->field.str; + size_t len = self->field.len; + while (len--) + if (value[len] == '\t' || value[len] == ' ') + value[len] = '\0'; + else + break; + self->field.len = len; + + const char *field = self->field.str; + const char *current = str_map_find (&self->headers, field); + if (client_http_header_field_is_a_list (field) && current) + str_map_set (&self->headers, field, + xstrdup_printf ("%s, %s", current, self->value.str)); + else + // If the field cannot be concatenated, just overwrite the last value. + // Maybe we should issue a warning or something. + str_map_set (&self->headers, field, xstrdup (self->value.str)); +} + +static int +client_http_on_header_field (http_parser *parser, const char *at, size_t len) +{ + struct client_http *self = parser->data; + if (self->have_header_value) + { + client_http_on_header_read (self); + str_reset (&self->field); + str_reset (&self->value); + } + str_append_data (&self->field, at, len); + self->have_header_value = false; + return 0; +} + +static int +client_http_on_header_value (http_parser *parser, const char *at, size_t len) +{ + struct client_http *self = parser->data; + str_append_data (&self->value, at, len); + self->have_header_value = true; + return 0; +} + +static bool +client_http_request_start (struct client_http *self) +{ + struct str_map cgi_headers = str_map_make (free); + cgi_headers.key_xfrm = tolower_ascii_strxfrm; + + struct str_map_iter iter = str_map_iter_make (&self->headers); + const char *value; + while ((value = str_map_iter_next (&iter))) + { + char *protocol_key = xstrdup_printf ("HTTP_%s", iter.link->key); + for (char *p = protocol_key; *p; p++) + *p = (*p == '-') ? '_' : toupper_ascii (*p); + str_map_set (&cgi_headers, protocol_key, xstrdup (value)); + free (protocol_key); + } + + // This so far is enough for request_handler_json_rpc_try_handle() + // TODO: set all the required ones https://tools.ietf.org/html/rfc3875 +#define HEADER(name, value) str_map_set (&cgi_headers, (name), (value)) + // TODO: AUTH_TYPE: parse from any Authentication header + + // Trust the client and http-parser that it is set when it should be + const char *content_length = + str_map_find (&self->headers, "Content-Length"); + if (content_length) + HEADER ("CONTENT_LENGTH", xstrdup (content_length)); + + const char *content_type = + str_map_find (&self->headers, "Content-Type"); + if (content_type) + HEADER ("CONTENT_TYPE", xstrdup (content_type)); + + HEADER ("GATEWAY_INTERFACE", xstrdup ("CGI/1.1")); + // TODO: ?PATH_INFO: probably empty or even ${UF_PATH} + // TODO: ?PATH_TRANSLATED: can be relative to "static_root", or just empty + // TODO: QUERY_STRING: ${UF_QUERY} + // TODO: ?REMOTE_ADDR: we get this from accept(), save it somewhere + // or even re-query it, although it's getpeername() getnameinfo(); + // on_client_available() can store it in the returned object (unused) + // TODO: ?REMOTE_HOST: we'd have to be passed this, can be empty + // TODO: ?REMOTE_IDENT: skip + // TODO: ?REMOTE_USER: skip + HEADER ("REQUEST_METHOD", xstrdup (http_method_str (self->hp.method))); + // TODO: SCRIPT_NAME: empty, especially if PATH_INFO is ${UF_PATH} + // TODO: SERVER_NAME: parse Host, if any, fall back to self->listening_host + HEADER ("SERVER_PORT", xstrdup_printf ("%d", self->listening_port)); + HEADER ("SERVER_PROTOCOL", xstrdup ("HTTP/1.1")); + HEADER ("SERVER_SOFTWARE", + xstrdup_printf ("%s/%s", PROGRAM_NAME, PROGRAM_VERSION)); + +#undef HEADER + + // NOTE: this doesn't necessarily affect HTTP keep-alive, + // so long as the output buffer isn't reprocessed + str_reset (&self->response); + bool keep_alive = request_start (&self->request, &cgi_headers); + str_map_free (&cgi_headers); + return keep_alive; +} + +static size_t +parse_cgi_response_headers (const struct str *response, struct str_map *out) +{ + // This is simplified because we're only parsing local, sane responses + char *p = response->str, *end = p + response->len, *eol = NULL; + while ((eol = memchr (p, '\n', end - p))) + { + char *name = p, *value = memchr (p, ':', eol - p); + p = eol + 1; + + // "UNIX: [...] servers should also accept CR LF as a newline" + if (name < eol && eol[-1] == '\r') + eol--; + if (name == eol) + return p - response->str; + if (!value) + return 0; // missing colon + + *value++ = '\0'; // terminate the name to avoid having to copy it + value += strspn (value, " \t"); // skip OWS at the beginning + str_map_set (out, name, xstrndup (value, eol - value)); + } + return 0; // missing or unterminated headers +} + +static int +client_http_request_parse_cgi (struct client_http *self, + struct str_map *headers) +{ + size_t advance = parse_cgi_response_headers (&self->response, headers); + if (!advance) + return 0; + + int code = 200; + // We do not support local redirects, so use an appropriate status code + if (str_map_find (headers, "Location")) + code = 302; + + const char *status = str_map_find (headers, "Status"); + if (status && (code = atoi (status)) < 100) + return 0; + + str_map_set (headers, "Status", NULL); + if (!str_map_find (headers, "Content-Type")) + print_warning ("missing Content-Type header"); + + // Enforce correctness; we have no use for Transfer-Encoding chunked now + str_map_set (headers, "Content-Length", + xstrdup_printf ("%zu", self->response.len - advance)); + + // In the name of simplicity and despite wastefulness + str_remove_slice (&self->response, 0, advance); + return code; +} + +static void +client_http_request_finish (struct client_http *self) +{ + struct str_map headers = str_map_make (free); + headers.key_xfrm = tolower_ascii_strxfrm; + + request_free (&self->request); + int code = client_http_request_parse_cgi (self, &headers); + if (!code) + { + str_reset (&self->response); + str_append (&self->response, "Invalid CGI response."); + code = 500; + } + + str_map_set (&headers, "Connection", + xstrdup (http_should_keep_alive (&self->hp) ? "keep-alive" : "close")); + + struct strv fields = strv_make (); + struct str_map_iter iter = str_map_iter_make (&headers); + const char *value = NULL; + while ((value = str_map_iter_next (&iter))) + strv_append_owned (&fields, + xstrdup_printf ("%s: %s", iter.link->key, value)); + + // XXX: this is still for from optimal (possible conflicts, too complex) + char *status = xstrdup_printf ("%03d %s", code, http_status_str (code)); + struct str response = + ws_handler_build_http_responsev (status, fields.vector); + free (status); + strv_free (&fields); + str_map_free (&headers); + + client_write (&self->client, response.str, response.len); + client_write (&self->client, self->response.str, self->response.len); + str_free (&response); +} + +static bool +client_http_request_fail (struct client_http *self, const char *status, ...) +{ + va_list ap; + va_start (ap, status); + + const char *s; + struct strv fields = strv_make (); + while ((s = va_arg (ap, const char *))) + strv_append (&fields, s); + strv_append (&fields, "Connection: close"); + + va_end (ap); + struct str response = + ws_handler_build_http_responsev (status, fields.vector); + strv_free (&fields); + client_write (&self->client, response.str, response.len); + str_free (&response); + + // TODO: consider client_shutdown() to put this to a flushing state + // - that would mean no other request can be answered + return false; +} + +static void +client_http_write_cb (struct request *req, const void *data, size_t len) +{ + FIND_CONTAINER (self, req, struct client_http, request); + str_append_data (&self->response, data, len); +} + +static int +client_http_on_headers_complete (http_parser *parser) +{ + struct client_http *self = parser->data; + if (self->have_header_value) + client_http_on_header_read (self); + + if (str_map_find (&self->headers, "Transfer-Encoding")) + { + // TODO: see what this would mean to implement (should support chunked) + // - probably that we'd have to pre-buffer the whole message + // before starting the request (for chunked) + client_http_request_fail (self, "501 Not Implemented", NULL); + return 3; + } + + // TODO: it should normally abort when it sees an upgrade but when + // the Content-Length is non-zero or the message is chunked, + // it reads the body first... see if it allows that for GET, + // and if it lets non-GET requests make an upgrade. + // If we return 1 from here on "upgrade", we can ensure an abort. + + // TODO: parse the URI (self->url) + // - seems like this should go in client_http_request_start() + // TODO: reject URIs not containing UF_PATH that aren't "*" + + if (!client_http_request_start (self)) + { + // FIXME: we want to enable keeping the connection alive in this case, + // though we (might) have a body to skip now + client_http_request_finish (self); + return 3; + } + return 0; +} + +static int +client_http_on_url (http_parser *parser, const char *at, size_t len) +{ + struct client_http *self = parser->data; + str_append_data (&self->url, at, len); + return 0; +} + +static int +client_http_on_message_begin (http_parser *parser) +{ + struct client_http *self = parser->data; + ev_timer_start (EV_DEFAULT_ &self->request_timeout_watcher); + str_reset (&self->message); + return 0; +} + +static int +client_http_on_body (http_parser *parser, const char *at, size_t len) +{ + struct client_http *self = parser->data; + str_append_data (&self->message, at, len); + // TODO: consider pushing data to the request in here. + // We may not be able to do that when there's a Content-Encoding + // or a Transfer-Encoding other than chunked. For the chunked encoding, + // we'll probably want to keep filling the message and process it in + // on_message_complete. There may also be other Content/Transfer-Encoding + // filters, in particular gzip. And beware that this means we need to + // delay starting the request as well. + if (!request_push (&self->request, at, len)) + { + // FIXME: we want to enable keeping the connection alive in this case, + // though we (might) have a body to skip now + client_http_request_finish (self); + return 1; + } + return 0; +} + +static int +client_http_on_message_complete (http_parser *parser) +{ + struct client_http *self = parser->data; + str_reset (&self->message); + + // TODO: here we could start a keep-alive timeout, so far we let + // the current timer run and only reset it on new messages + + // XXX: if it decides to stay alive, we forcibly interpret the result, + // we might want to throw it away instead + bool continue_ = request_push (&self->request, NULL, 0); + client_http_request_finish (self); + return continue_; +} + +static void +client_http_on_request_timeout (EV_P_ ev_timer *watcher, int revents) +{ + (void) loop; + (void) revents; + struct client_http *self = watcher->data; + + client_http_request_fail (self, HTTP_408_REQUEST_TIMEOUT, NULL); + client_destroy (&self->client); +} + +#define FAIL_REQUEST(...) \ + return client_http_request_fail (self, __VA_ARGS__, NULL) + +static bool +client_http_push (struct client *client, const void *data, size_t len) +{ + FIND_CONTAINER (self, client, struct client_http, client); + // client_close() will correctly destroy the client on EOF + if (self->handler.state != WS_HANDLER_CONNECTING) + return ws_handler_push (&self->handler, data, len); + + static const http_parser_settings http_settings = + { + .on_header_field = client_http_on_header_field, + .on_header_value = client_http_on_header_value, + .on_headers_complete = client_http_on_headers_complete, + .on_url = client_http_on_url, + .on_message_begin = client_http_on_message_begin, + .on_body = client_http_on_body, + .on_message_complete = client_http_on_message_complete, + }; + + size_t n_parsed = + http_parser_execute (&self->hp, &http_settings, data, len); + + // TODO: in all code, we can use http_status_str() and numerics from + // the respective constants in `enum http_status` + if (self->hp.upgrade) + { + ev_timer_stop (EV_DEFAULT_ &self->request_timeout_watcher); + + // The handshake hasn't been finished, yet there is more data + // to be processed after the headers already + if (len - n_parsed) + FAIL_REQUEST (HTTP_400_BAD_REQUEST); + + // TODO: make it use our url and headers (http_parserpp *?) + // TODO: move the HTTP version check in here + return ws_handler_finish_handshake (&self->handler); + } + + enum http_errno err = HTTP_PARSER_ERRNO (&self->hp); + if (n_parsed != len || err != HPE_OK) + { + ev_timer_stop (EV_DEFAULT_ &self->request_timeout_watcher); + + if (err == HPE_CB_headers_complete + || err == HPE_CB_body + || err == HPE_CB_message_complete) + return false; + + print_debug ("HTTP handshake failed: %s", http_errno_description (err)); + FAIL_REQUEST (HTTP_400_BAD_REQUEST); + } + + // TODO: investigate this: + // - if len == 0, we /certainly/ want to call it quits here + // - but double check check flushing logic + // - we enforce some "Connection: close" cases above + // - maybe an internal bool: + // - start with keep_alive = true (nothing received yet) + // - http_parser will error out on inappropriate EOF + // - then in on_headers_complete reset it to http_should_keep_alive() + // - use this in client_http_request_finish() + // - still lost... + // - maybe client_http_request_finish() should always abort the parser + // when the connection has been closed? + // - go have a look at connection closure in the HTTP specification + return len != 0; +} + +static void +client_http_shutdown (struct client *client) +{ + FIND_CONTAINER (self, client, struct client_http, client); + if (self->handler.state == WS_HANDLER_CONNECTING) + // No on_close, no problem + client_destroy (&self->client); + else if (self->handler.state == WS_HANDLER_OPEN) + ws_handler_close (&self->handler, WS_STATUS_GOING_AWAY, NULL, 0); +} + +static void +client_http_finalize (struct client *client) +{ + FIND_CONTAINER (self, client, struct client_http, client); + ws_handler_free (&self->handler); + request_free (&self->request); + cstr_set (&self->listening_host, NULL); + + str_free (&self->field); + str_free (&self->value); + str_map_free (&self->headers); + str_free (&self->url); + str_free (&self->message); + str_free (&self->response); + ev_timer_stop (EV_DEFAULT_ &self->request_timeout_watcher); +} + +static struct client_vtable client_http_vtable = +{ + .push = client_http_push, + .shutdown = client_http_shutdown, + .finalize = client_http_finalize, +}; + +static struct client * +client_http_create (EV_P_ const char *local_host, int sock_fd) +{ + struct client_http *self = client_new (EV_A_ sizeof *self, sock_fd); + self->client.vtable = &client_http_vtable; + + // XXX: `struct sockaddr_storage' is not the most portable thing + struct sockaddr_storage sock; + socklen_t sock_len = sizeof sock; + + // When we're bound to the any address, at least try to fetch an IP string + char num_host[NI_MAXHOST] = "", num_serv[NI_MAXSERV] = ""; + if (!getsockname (sock_fd, (struct sockaddr *) &sock, &sock_len) + && !getnameinfo ((struct sockaddr *) &sock, sock_len, + num_host, sizeof num_host, num_serv, sizeof num_serv, + NI_NUMERICHOST | NI_NUMERICSERV) + && !*local_host) + local_host = num_host; + + self->listening_host = xstrdup (local_host); + self->listening_port = atoi (num_serv); + + // We may or may not upgrade to WebSocket (keep WS_HANDLER_CONNECTING) + ws_handler_init (&self->handler); + self->handler.on_message = client_ws_on_message; + self->handler.write_cb = client_ws_write_cb; + self->handler.close_cb = client_ws_close_cb; + + // One mebibyte seems to be a reasonable value + self->handler.max_payload_len = 1 << 10; + + request_init (&self->request); + self->request.ctx = ev_userdata (EV_DEFAULT); + self->request.write_cb = client_http_write_cb; + // TODO: finish_cb: call client_http_request_finish(); + // make sure we don't free ourselves and crash + // - this is actually never called because nothing is asynchronous + + http_parser_init (&self->hp, HTTP_REQUEST); + self->hp.data = self; + self->field = str_make (); + self->value = str_make (); + self->headers = str_map_make (free); + self->headers.key_xfrm = tolower_ascii_strxfrm; + self->url = str_make (); + self->message = str_make (); + + self->response = str_make (); + + ev_timer_init (&self->request_timeout_watcher, + client_http_on_request_timeout, 0., 0.); + self->request_timeout_watcher.data = self; + + ev_timer_set (&self->request_timeout_watcher, + (self->request_timeout = 60), 0.); + ev_timer_start (EV_DEFAULT_ &self->request_timeout_watcher); + return &self->client; +} + // --- Co-process client ------------------------------------------------------- // This is mostly copied over from json-rpc-shell.c, only a bit simplified. @@ -2949,10 +3504,12 @@ setup_listen_fds (struct server_context *ctx, struct error **e) struct strv ports_fcgi = strv_make (); struct strv ports_scgi = strv_make (); struct strv ports_ws = strv_make (); + struct strv ports_http = strv_make (); get_ports_from_config (ctx, "port_fastcgi", &ports_fcgi); get_ports_from_config (ctx, "port_scgi", &ports_scgi); get_ports_from_config (ctx, "port_ws", &ports_ws); + get_ports_from_config (ctx, "port_http", &ports_http); const char *bind_host = str_map_find (&ctx->config, "bind_host"); size_t n_ports = ports_fcgi.len + ports_scgi.len + ports_ws.len; @@ -2970,10 +3527,14 @@ setup_listen_fds (struct server_context *ctx, struct error **e) for (size_t i = 0; i < ports_ws.len; i++) listener_add (ctx, bind_host, ports_ws.vector[i], &gai_hints, client_ws_create); + for (size_t i = 0; i < ports_http.len; i++) + listener_add (ctx, bind_host, ports_http.vector[i], + &gai_hints, client_http_create); strv_free (&ports_fcgi); strv_free (&ports_scgi); strv_free (&ports_ws); + strv_free (&ports_http); if (!ctx->n_listeners) { -- cgit v1.2.3