From 23eb4cca38949152fb3f61dd9edd88e60cccf5a3 Mon Sep 17 00:00:00 2001 From: Přemysl Janouch Date: Sat, 14 Mar 2015 19:36:37 +0100 Subject: Steady progress Still in a state of total chaos, it appears. --- demo-json-rpc-server.c | 314 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 272 insertions(+), 42 deletions(-) diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c index 1b10e9d..77c9ca5 100644 --- a/demo-json-rpc-server.c +++ b/demo-json-rpc-server.c @@ -701,6 +701,12 @@ fcgi_request_write (struct fcgi_request *self, const void *data, size_t len) } } +static void +fcgi_request_finish (struct fcgi_request *self) +{ + // TODO: flush(), end_request(), delete self, muxer->request_destroy_cb()? +} + // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - typedef void (*fcgi_muxer_handler_fn) @@ -764,6 +770,7 @@ fcgi_muxer_on_begin_request return; } + // We can only act as a responder, reject everything else up front if (role != FCGI_RESPONDER) { fcgi_muxer_send_end_request (self, @@ -1063,6 +1070,8 @@ scgi_parser_push (struct scgi_parser *self, #define SEC_WS_PROTOCOL "Sec-WebSocket-Protocol" #define SEC_WS_VERSION "Sec-WebSocket-Version" +#define WS_MAX_CONTROL_PAYLOAD_LEN 125 + static char * ws_encode_response_key (const char *key) { @@ -1122,6 +1131,12 @@ enum ws_opcode WS_OPCODE_PONG = 10 }; +static bool +ws_is_control_frame (int opcode) +{ + return opcode >= WS_OPCODE_CLOSE; +} + struct ws_parser { struct str input; ///< External input buffer @@ -1136,8 +1151,7 @@ struct ws_parser uint32_t mask; ///< Frame mask uint64_t payload_len; ///< Payload length - // TODO: it wouldn't be half bad if there was a callback to just validate - // the frame header (such as the maximum payload length) + bool (*on_frame_header) (void *user_data, const struct ws_parser *self); /// Callback for when a message is successfully parsed. /// The actual payload is stored in "input", of length "payload_len". @@ -1248,17 +1262,17 @@ ws_parser_push (struct ws_parser *self, const void *data, size_t len) case WS_PARSER_MASK: if (!self->is_masked) - { - self->state = WS_PARSER_PAYLOAD; - break; - } + goto end_of_header; if (self->input.len < 4) return true; (void) msg_unpacker_u32 (&unpacker, &self->mask); + str_remove_slice (&self->input, 0, 4); + end_of_header: self->state = WS_PARSER_PAYLOAD; - str_remove_slice (&self->input, 0, 4); + if (!self->on_frame_header (self->user_data, self)) + return false; break; case WS_PARSER_PAYLOAD: @@ -1289,7 +1303,7 @@ ws_parser_push (struct ws_parser *self, const void *data, size_t len) enum ws_handler_state { - WS_HANDLER_HTTP, ///< Parsing HTTP + WS_HANDLER_HANDSHAKE, ///< Parsing HTTP WS_HANDLER_WEBSOCKETS ///< Parsing WebSockets frames }; @@ -1305,12 +1319,38 @@ struct ws_handler struct str url; ///< Request URL struct ws_parser parser; ///< Protocol frame parser + bool expecting_continuation; ///< For non-control traffic + + enum ws_opcode message_opcode; ///< Opcode for the current message + struct str message_data; ///< Concatenated message data + + unsigned ping_interval; ///< Ping interval in seconds + uint64_t max_payload_len; ///< Maximum length of any message + + // TODO: bool closing; // XXX: rather a { OPEN, CLOSING } state? + // TODO: a close timer - // TODO: bool closing; - // TODO: a configurable max_payload_len initialized by _init() + // TODO: a ping timer (when no pong is received by the second time the + // timer triggers, it is a ping timeout) + ev_timer ping_timer; ///< Ping timer + bool received_pong; ///< Received PONG since the last PING /// Called upon reception of a single full message - bool (*on_message) (void *user_data, const void *data, size_t len); + bool (*on_message) (void *user_data, + enum ws_opcode type, const void *data, size_t len); + + // TODO: void (*on_initialized) () that will allow the user to choose + // any sub-protocol, if the client has provided any. + + /// The connection has been closed. + /// @a close_code may, or may not, be one of enum ws_status. + // NOTE: the "close_code" is what we receive from the remote endpoint, + // or one of 1005/1006/1015 + // NOTE: the reason is an empty string if omitted + // TODO; also note that ideally, the handler should (be able to) first + // receive a notification about the connection being closed because of + // an error (recv()) returns -1, and call on_close() in reaction. + void (*on_close) (void *user_data, int close_code, const char *reason); /// Write a chunk of data to the stream void (*write_cb) (void *user_data, const void *data, size_t len); @@ -1320,15 +1360,126 @@ struct ws_handler void *user_data; ///< User data for callbacks }; +static void +ws_handler_send_control (struct ws_handler *self, enum ws_opcode opcode, + const void *data, size_t len) +{ + if (len > WS_MAX_CONTROL_PAYLOAD_LEN) + { + print_debug ("truncating output control frame payload" + " from %zu to %zu bytes", len, (size_t) WS_MAX_CONTROL_PAYLOAD_LEN); + len = WS_MAX_CONTROL_PAYLOAD_LEN; + } + + uint8_t header[2] = { 0x80 | (opcode & 0x0F), len }; + self->write_cb (self->user_data, header, sizeof header); + self->write_cb (self->user_data, data, len); +} + +static void +ws_handler_fail (struct ws_handler *self, enum ws_status reason) +{ + uint8_t payload[2] = { reason << 8, reason }; + ws_handler_send_control (self, WS_OPCODE_CLOSE, payload, sizeof payload); + + // TODO: set the close timer, ignore all further incoming input (either set + // some flag for the case that we're in the middle of ws_handler_push(), + // and/or add a mechanism to stop the caller from polling the socket for + // reads). +} + +// TODO: ws_handler_close() that behaves like ws_handler_fail() but doesn't +// ignore frames up to a corresponding close from the client. +// Read the RFC once again to see if we can really process the frames. + +static bool +ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser) +{ + struct ws_handler *self = user_data; + + if (parser->reserved_1 || parser->reserved_2 || parser->reserved_3 + || !parser->is_masked // client -> server payload must be masked + || (ws_is_control_frame (parser->opcode) && + (!parser->is_fin || parser->payload_len > WS_MAX_CONTROL_PAYLOAD_LEN)) + || (!ws_is_control_frame (parser->opcode) && + (self->expecting_continuation && parser->opcode != WS_OPCODE_CONT))) + ws_handler_fail (self, WS_STATUS_PROTOCOL); + else if (parser->payload_len > self->max_payload_len) + ws_handler_fail (self, WS_STATUS_TOO_BIG); + else + return true; + return false; +} + +static bool +ws_handler_on_control_frame + (struct ws_handler *self, const struct ws_parser *parser) +{ + switch (parser->opcode) + { + case WS_OPCODE_CLOSE: + // TODO: confirm the close + break; + case WS_OPCODE_PING: + ws_handler_send_control (self, WS_OPCODE_PONG, + parser->input.str, parser->payload_len); + break; + case WS_OPCODE_PONG: + // XXX: maybe we should check the payload + self->received_pong = true; + break; + default: + // TODO: shouldn't we rather fail on unknown control frames? + // But should we actually return false at any time? Yes? + break; + } + return true; +} + static bool ws_handler_on_frame (void *user_data, const struct ws_parser *parser) { struct ws_handler *self = user_data; - // TODO: handle pings and what not - // TODO: validate the message - // TODO: first concatenate all parts of the message - return self->on_message (self->user_data, + if (ws_is_control_frame (parser->opcode)) + return ws_handler_on_control_frame (self, parser); + + // TODO: do this rather in "on_frame_header" + if (self->message_data.len + parser->payload_len > self->max_payload_len) + { + ws_handler_fail (self, WS_STATUS_TOO_BIG); + return true; + } + + if (!self->expecting_continuation) + self->message_opcode = parser->opcode; + + str_append_data (&self->message_data, + parser->input.str, parser->payload_len); + self->expecting_continuation = !parser->is_fin; + + if (!parser->is_fin) + return true; + + bool result = self->on_message (self->user_data, self->message_opcode, self->parser.input.str, self->parser.payload_len); + str_reset (&self->message_data); + return result; +} + +static void +ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents) +{ + (void) loop; + (void) revents; + + struct ws_handler *self = watcher->data; + if (!self->received_pong) + { + // TODO: close/fail the connection? + return; + } + + ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0); } static void @@ -1347,7 +1498,20 @@ ws_handler_init (struct ws_handler *self) str_init (&self->url); ws_parser_init (&self->parser); + self->parser.on_frame_header = ws_handler_on_frame_header; self->parser.on_frame = ws_handler_on_frame; + + str_init (&self->message_data); + + self->ping_interval = 60; + // This is still ridiculously high + self->max_payload_len = UINT32_MAX; + + // Just so we can safely stop it + ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, 0., 0.); + self->ping_timer.data = self; + // So that the first ping timer doesn't timeout the connection + self->received_pong = true; } static void @@ -1358,6 +1522,8 @@ ws_handler_free (struct ws_handler *self) str_map_free (&self->headers); str_free (&self->url); ws_parser_free (&self->parser); + str_free (&self->message_data); + ev_timer_stop (EV_DEFAULT_ &self->ping_timer); } static void @@ -1395,10 +1561,11 @@ ws_handler_on_header_value (http_parser *parser, const char *at, size_t len) static int ws_handler_on_headers_complete (http_parser *parser) { - // Just return 1 to tell the parser we don't want to parse any body; - // the parser should have found an upgrade request for WebSockets - (void) parser; - return 1; + // We strictly require a protocol upgrade + if (!parser->upgrade) + return 2; + + return 0; } static int @@ -1418,6 +1585,7 @@ ws_handler_finish_handshake (struct ws_handler *self) || self->hp.http_major != 1 || self->hp.http_minor != 1) ; // TODO: error (maybe send a frame depending on conditions) + // ...mostly just 400 Bad Request const char *upgrade = str_map_find (&self->headers, "Upgrade"); @@ -1425,6 +1593,12 @@ ws_handler_finish_handshake (struct ws_handler *self) const char *version = str_map_find (&self->headers, SEC_WS_VERSION); const char *protocol = str_map_find (&self->headers, SEC_WS_PROTOCOL); + if (!upgrade || strcmp (upgrade, "websocket") + || !version || strcmp (version, "13")) + ; // TODO: error + // ... if the version doesn't match, we must send back a header indicating + // the version we do support + struct str response; str_init (&response); str_append (&response, "HTTP/1.1 101 Switching Protocols\r\n"); @@ -1433,8 +1607,7 @@ ws_handler_finish_handshake (struct ws_handler *self) // TODO: prepare the rest of the headers - // TODO: we should ideally check that this is a 16-byte base64-encoded - // value; do we also have to strip surrounding whitespace? + // TODO: we should ideally check that this is a 16-byte base64-encoded value char *response_key = ws_encode_response_key (key); str_append_printf (&response, SEC_WS_ACCEPT ": %s\r\n", response_key); free (response_key); @@ -1442,6 +1615,10 @@ ws_handler_finish_handshake (struct ws_handler *self) str_append (&response, "\r\n"); self->write_cb (self->user_data, response.str, response.len); str_free (&response); + + // XXX: maybe we should start it earlier so that the handshake can + // timeout as well. ws_handler_connected()? + ev_timer_start (EV_DEFAULT_ &self->ping_timer); return true; } @@ -1477,14 +1654,16 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len) self->state = WS_HANDLER_WEBSOCKETS; return true; } - else if (n_parsed != len || HTTP_PARSER_ERRNO (&self->hp) != HPE_OK) + + if (n_parsed != len || HTTP_PARSER_ERRNO (&self->hp) != HPE_OK) { // TODO: error // print_debug (..., http_errno_description // (HTTP_PARSER_ERRNO (&self->hp)); + // NOTE: if == HPE_CB_headers_complete, "Upgrade" is missing + return false; } - // TODO: make double sure to handle the case of !upgrade return true; } @@ -1497,6 +1676,7 @@ static struct config_item g_config_table[] = { "port_scgi", NULL, "Port to bind for SCGI" }, { "port_ws", NULL, "Port to bind for WebSockets" }, { "pid_file", NULL, "Full path for the PID file" }, + // XXX: here belongs something like a web SPA that interfaces with us { "static_root", NULL, "The root for static content" }, { NULL, NULL, NULL } }; @@ -1526,11 +1706,16 @@ server_context_init (struct server_context *self) load_config_defaults (&self->config, g_config_table); } +static void close_listeners (struct server_context *self); + static void server_context_free (struct server_context *self) { - // TODO: free the clients (?) - // TODO: close the listeners (?) + // We really shouldn't attempt a quit without closing the clients first + soft_assert (!self->clients); + + close_listeners (self); + free (self->listeners); str_map_free (&self->config); } @@ -1773,7 +1958,8 @@ struct request { struct server_context *ctx; ///< Server context - void *user_data; ///< User data argument for callbacks + struct request_handler *handler; ///< Current request handler + void *handler_data; ///< User data for the handler /// Callback to write some CGI response data to the output void (*write_cb) (void *user_data, const void *data, size_t len); @@ -1782,16 +1968,17 @@ struct request /// CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED. void (*close_cb) (void *user_data); - struct request_handler *handler; ///< Current request handler - void *handler_data; ///< User data for the handler + void *user_data; ///< User data argument for callbacks }; struct request_handler { LIST_HEADER (struct request_handler) - /// Install ourselves as the handler for the request if applicable - bool (*try_handle) (struct request *request, struct str_map *headers); + /// Install ourselves as the handler for the request if applicable. + /// Set @a continue_ to false if further processing should be stopped. + bool (*try_handle) (struct request *request, + struct str_map *headers, bool *continue_); /// Handle incoming data. /// Return false if further processing should be stopped. @@ -1826,16 +2013,22 @@ request_finish (struct request *self) static bool request_start (struct request *self, struct str_map *headers) { + // XXX: it feels like this should rather be two steps: + // bool (*can_handle) (request *, headers) + // ... install the handler ... + // bool (*handle) (request *) + // + // However that might cause some stuff to be done twice. + // + // Another way we could get rid off the continue_ argument is via adding + // some way of marking the request as finished from within the handler. + + bool continue_ = true; LIST_FOR_EACH (struct request_handler, handler, self->ctx->handlers) - if (handler->try_handle (self, headers)) + if (handler->try_handle (self, headers, &continue_)) { - // XXX: maybe we should isolate the handlers a bit more self->handler = handler; - - // TODO: we should also allow the "try_handle" function to - // return that it has already finished processing the request - // and we should abort it by returning false here. - return true; + return continue_; } // Unable to serve the request @@ -1862,7 +2055,7 @@ request_push (struct request *self, const void *data, size_t len) static bool request_handler_json_rpc_try_handle - (struct request *request, struct str_map *headers) + (struct request *request, struct str_map *headers, bool *continue_) { const char *content_type = str_map_find (headers, "CONTENT_TYPE"); const char *method = str_map_find (headers, "REQUEST_METHOD"); @@ -1875,6 +2068,7 @@ request_handler_json_rpc_try_handle str_init (buf); request->handler_data = buf; + *continue_ = true; return true; } @@ -1972,8 +2166,11 @@ detect_magic (const void *data, size_t len) static bool request_handler_static_try_handle - (struct request *request, struct str_map *headers) + (struct request *request, struct str_map *headers, bool *continue_) { + // Serving static files is actually quite complicated as it turns out; + // but this is only meant to serve a few tiny text files + struct server_context *ctx = request->ctx; const char *root = str_map_find (&ctx->config, "static_root"); if (!root) @@ -1999,6 +2196,7 @@ request_handler_static_try_handle char *suffix = canonicalize_url_path (path_info); char *path = xstrdup_printf ("%s%s", root, suffix); + // TODO: check that this is a regular file FILE *fp = fopen (path, "rb"); if (!fp) { @@ -2045,6 +2243,13 @@ request_handler_static_try_handle while ((len = fread (buf, 1, sizeof buf, fp))) request->write_cb (request->user_data, buf, len); fclose (fp); + + // TODO: this should rather not be returned all at once but in chunks; + // file read requests never return EAGAIN + // TODO: actual file data should really be returned by a callback when + // the socket is writable with nothing to be sent (pumping the entire + // file all at once won't really work if it's huge). + *continue_ = false; return true; } @@ -2178,7 +2383,8 @@ static void client_fcgi_request_close (void *user_data) { struct client_fcgi_request *request = user_data; - // TODO: tell the fcgi_request to what? + // TODO: fcgi_request_finish()? That will most probably end up with us + // receiving client_fcgi_request_destroy() } static void * @@ -2186,6 +2392,7 @@ client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request) { struct client *client = user_data; + // TODO: what if the request is aborted by ; struct client_fcgi_request *request = xmalloc (sizeof *request); request->fcgi_request = fcgi_request; request_init (&request->request); @@ -2375,12 +2582,17 @@ client_ws_write (void *user_data, const void *data, size_t len) } static bool -client_ws_on_message (void *user_data, const void *data, size_t len) +client_ws_on_message (void *user_data, + enum ws_opcode type, const void *data, size_t len) { struct client *client = user_data; struct client_ws *self = client->impl_data; - // TODO: do something about the message + struct str response; + str_init (&response); + process_json_rpc (client->ctx, data, len, &response); + // TODO: send the response + str_free (&response); return true; } @@ -2431,6 +2643,22 @@ struct listener struct client_impl *impl; ///< Client behaviour }; +static void +close_listeners (struct server_context *self) +{ + // TODO: factor out the closing act, to be used in initiate_quit() + for (size_t i = 0; i < self->n_listeners; i++) + { + struct listener *listener = &self->listeners[i]; + if (listener->fd == -1) + continue; + + ev_io_stop (EV_DEFAULT_ &listener->watcher); + xclose (listener->fd); + listener->fd = -1; + } +} + static bool client_read_loop (EV_P_ struct client *client, ev_io *watcher) { @@ -2472,6 +2700,8 @@ on_client_ready (EV_P_ ev_io *watcher, int revents) // finished flushing the write queue? This should probably even be // the default behaviour, as it's fairly uncommon for clients to // shutdown the socket for writes while leaving it open for reading. + // TODO: some sort of "on_buffers_flushed" callback for streaming huge + // chunks of external (or generated) data. if (!flush_queue (&client->write_queue, watcher)) goto close; return; -- cgit v1.2.3-70-g09d2