diff options
author | Přemysl Janouch <p.janouch@gmail.com> | 2015-04-10 01:40:32 +0200 |
---|---|---|
committer | Přemysl Janouch <p.janouch@gmail.com> | 2015-04-10 01:42:41 +0200 |
commit | 4dbdc849d9b08570ca0ccb4645da3fdc977d6681 (patch) | |
tree | 125da3a3bfcc81535e31aff081626e6775b0c744 | |
parent | db6dff421667e4e7ab5ea24440f28fb55baeee7b (diff) | |
download | json-rpc-shell-4dbdc849d9b08570ca0ccb4645da3fdc977d6681.tar.gz json-rpc-shell-4dbdc849d9b08570ca0ccb4645da3fdc977d6681.tar.xz json-rpc-shell-4dbdc849d9b08570ca0ccb4645da3fdc977d6681.zip |
Steady progress
On the WebSocket service.
It's not too far from being finished now. I just have to make some
sense of the code again and make sure it's correct.
Now that json-rpc-shell should be able to run against this, I can
also finally test if both of them work as they should.
-rw-r--r-- | demo-json-rpc-server.c | 234 |
1 files changed, 180 insertions, 54 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c index 2196217..aa799dd 100644 --- a/demo-json-rpc-server.c +++ b/demo-json-rpc-server.c @@ -468,6 +468,7 @@ enum ws_handler_state WS_HANDLER_CONNECTING, ///< Parsing HTTP WS_HANDLER_OPEN, ///< Parsing WebSockets frames WS_HANDLER_CLOSING, ///< Closing the connection + WS_HANDLER_ALMOST_DEAD, ///< Closing connection after failure WS_HANDLER_CLOSED ///< Dead }; @@ -475,12 +476,17 @@ struct ws_handler { enum ws_handler_state state; ///< State + // HTTP handshake: + http_parser hp; ///< HTTP parser bool parsing_header_value; ///< Parsing header value or field? struct str field; ///< Field part buffer struct str value; ///< Value part buffer struct str_map headers; ///< HTTP Headers struct str url; ///< Request URL + ev_timer handshake_timeout_watcher; ///< Handshake timeout watcher + + // WebSocket frame protocol: struct ws_parser parser; ///< Protocol frame parser bool expecting_continuation; ///< For non-control traffic @@ -488,27 +494,32 @@ struct ws_handler enum ws_opcode message_opcode; ///< Opcode for the current message struct str message_data; ///< Concatenated message data + ev_timer ping_timer; ///< Ping timer + bool received_pong; ///< Received PONG since the last PING + + ev_timer close_timeout_watcher; ///< Close timeout watcher + + // Configuration: + + unsigned handshake_timeout; ///< How long to wait for the handshake + unsigned close_timeout; ///< How long to wait for TCP close unsigned ping_interval; ///< Ping interval in seconds uint64_t max_payload_len; ///< Maximum length of any message - // TODO: handshake_timeout - // TODO: a close timer - - // TODO: a ping timer (when no pong is received by the second time the - // timer triggers, it is a ping timeout) - ev_timer ping_timer; ///< Ping timer - bool received_pong; ///< Received PONG since the last PING + // Event callbacks: // TODO: void (*on_handshake) (protocols) that will allow the user // to choose any sub-protocol, if the client has provided any. + // This may render "on_connected" unnecessary. - // TODO: "on_connected" after the handshake has finished? + /// Called after successfuly connecting (handshake complete) + bool (*on_connected) (void *user_data); /// Called upon reception of a single full message bool (*on_message) (void *user_data, enum ws_opcode type, const void *data, size_t len); - /// The connection has been closed. @a close_code may, or may not, be one + /// The connection is about to close. @a close_code may, or may not, be one /// of enum ws_status. The @a reason is never NULL. // TODO; also note that ideally, the handler should (be able to) first // receive a notification about the connection being closed because of @@ -516,10 +527,13 @@ struct ws_handler // Actually, calling push() could work pretty fine for this. void (*on_close) (void *user_data, int close_code, const char *reason); + // Method callbacks: + /// Write a chunk of data to the stream void (*write_cb) (void *user_data, const void *data, size_t len); - // TODO: "close_cb"; to be used from a ping timer e.g. + /// Close the connection + void (*close_cb) (void *user_data); void *user_data; ///< User data for callbacks }; @@ -541,30 +555,44 @@ ws_handler_send_control (struct ws_handler *self, } static void -ws_handler_fail (struct ws_handler *self, enum ws_status reason) +ws_handler_close (struct ws_handler *self, + enum ws_status close_code, const char *reason, size_t len) +{ + struct str payload; + str_init (&payload); + str_pack_u16 (&payload, close_code); + // XXX: maybe accept a null-terminated string on input? Has to be UTF-8 a/w + str_append_data (&payload, reason, len); + ws_handler_send_control (self, WS_OPCODE_CLOSE, payload.str, payload.len); + + // Close initiated by us; the reason is null-terminated within `payload' + if (self->on_close) + self->on_close (self->user_data, close_code, payload.str + 2); + + self->state = WS_HANDLER_CLOSING; + str_free (&payload); +} + +static void +ws_handler_fail (struct ws_handler *self, enum ws_status close_code) { - uint8_t payload[2] = { reason << 8, reason }; - ws_handler_send_control (self, WS_OPCODE_CLOSE, payload, sizeof payload); + ws_handler_close (self, close_code, NULL, 0); + self->state = WS_HANDLER_ALMOST_DEAD; // TODO: set the close timer, ignore all further incoming input (either set // some flag for the case that we're in the middle of ws_handler_push(), // and/or add a mechanism to stop the caller from polling the socket for // reads). - // TODO: set the state to FAILED (not CLOSED as that means the TCP - // connection is closed) and wait until all is sent? // TODO: make sure we don't send pings after the close } -// TODO: ws_handler_close() that behaves like ws_handler_fail() but doesn't -// ignore frames up to a corresponding close from the client. -// Read the RFC once again to see if we can really process the frames. - // TODO: add support for fragmented responses static void ws_handler_send (struct ws_handler *self, enum ws_opcode opcode, const void *data, size_t len) { - // TODO: make sure (just assert?) we're in the OPEN state + if (!soft_assert (self->state == WS_HANDLER_OPEN)) + return; struct str header; str_init (&header); @@ -612,17 +640,45 @@ ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser) } static bool +ws_handler_on_protocol_close + (struct ws_handler *self, const struct ws_parser *parser) +{ + struct msg_unpacker unpacker; + msg_unpacker_init (&unpacker, parser->input.str, parser->payload_len); + + char *reason = NULL; + uint16_t close_code = WS_STATUS_NO_STATUS_RECEIVED; + + if (parser->payload_len >= 2) + { + (void) msg_unpacker_u16 (&unpacker, &close_code); + reason = xstrndup (parser->input.str + 2, parser->payload_len - 2); + } + else + reason = xstrdup (""); + + if (self->state != WS_HANDLER_CLOSING) + { + // Close initiated by the client + ws_handler_send_control (self, WS_OPCODE_CLOSE, + parser->input.str, parser->payload_len); + if (self->on_close) + self->on_close (self->user_data, close_code, reason); + } + + free (reason); + self->state = WS_HANDLER_ALMOST_DEAD; + return true; +} + +static bool ws_handler_on_control_frame (struct ws_handler *self, const struct ws_parser *parser) { switch (parser->opcode) { case WS_OPCODE_CLOSE: - // TODO: confirm the close - // TODO: change the state to CLOSING - // TODO: call "on_close" - // NOTE: the reason is an empty string if omitted - break; + return ws_handler_on_protocol_close (self, parser); case WS_OPCODE_PING: ws_handler_send_control (self, WS_OPCODE_PONG, parser->input.str, parser->payload_len); @@ -634,6 +690,8 @@ ws_handler_on_control_frame default: // Unknown control frame ws_handler_fail (self, WS_STATUS_PROTOCOL_ERROR); + // FIXME: we shouldn't close the connection right away; + // also check other places return false; } return true; @@ -670,8 +728,10 @@ ws_handler_on_frame (void *user_data, const struct ws_parser *parser) return false; } - bool result = self->on_message (self->user_data, self->message_opcode, - self->message_data.str, self->message_data.len); + bool result = true; + if (self->on_message) + result = self->on_message (self->user_data, self->message_opcode, + self->message_data.str, self->message_data.len); str_reset (&self->message_data); return result; } @@ -686,10 +746,26 @@ ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents) if (!self->received_pong) { // TODO: close/fail the connection? - return; } + else + { + ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0); + ev_timer_again (EV_A_ watcher); + } +} - ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0); +static void +ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents) +{ + struct ws_handler *self = watcher->data; + // TODO: call "close_cb" +} + +static void +ws_handler_on_handshake_timeout (EV_P_ ev_timer *watcher, int revents) +{ + struct ws_handler *self = watcher->data; + // TODO } static void @@ -697,44 +773,61 @@ ws_handler_init (struct ws_handler *self) { memset (self, 0, sizeof *self); + self->state = WS_HANDLER_CONNECTING; + http_parser_init (&self->hp, HTTP_REQUEST); self->hp.data = self; - str_init (&self->field); str_init (&self->value); str_map_init (&self->headers); self->headers.free = free; self->headers.key_xfrm = tolower_ascii_strxfrm; str_init (&self->url); + ev_timer_init (&self->handshake_timeout_watcher, + ws_handler_on_handshake_timeout, 0., 0.); + self->handshake_timeout_watcher.data = self; ws_parser_init (&self->parser); self->parser.on_frame_header = ws_handler_on_frame_header; self->parser.on_frame = ws_handler_on_frame; - str_init (&self->message_data); - self->ping_interval = 60; + ev_timer_init (&self->ping_timer, + ws_handler_on_ping_timer, 0., 0.); + self->ping_timer.data = self; + ev_timer_init (&self->close_timeout_watcher, + ws_handler_on_close_timeout, 0., 0.); + self->ping_timer.data = self; + // So that the first ping timer doesn't timeout the connection + self->received_pong = true; + + self->handshake_timeout = self->close_timeout = self->ping_interval = 60; // This is still ridiculously high. Note that the most significant bit // must always be zero, i.e. the protocol maximum is 0x7FFF FFFF FFFF FFFF. self->max_payload_len = UINT32_MAX; +} - // Just so we can safely stop it - ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, 0., 0.); - self->ping_timer.data = self; - // So that the first ping timer doesn't timeout the connection - self->received_pong = true; +/// Stop all timers, not going to use the handler anymore +static void +ws_handler_stop (struct ws_handler *self) +{ + ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher); + ev_timer_stop (EV_DEFAULT_ &self->ping_timer); + ev_timer_stop (EV_DEFAULT_ &self->close_timeout_watcher); } static void ws_handler_free (struct ws_handler *self) { + ws_handler_stop (self); + str_free (&self->field); str_free (&self->value); str_map_free (&self->headers); str_free (&self->url); + ws_parser_free (&self->parser); str_free (&self->message_data); - ev_timer_stop (EV_DEFAULT_ &self->ping_timer); } static bool @@ -863,10 +956,9 @@ ws_handler_http_response (struct ws_handler *self, const char *status, ...) str_vector_free (&v); } -// TODO: also set the connection to some FAILED state or anything that's neither -// CONNECTING nor OPEN #define FAIL_HANDSHAKE(status, ...) \ BLOCK_START \ + self->state = WS_HANDLER_ALMOST_DEAD; \ ws_handler_http_response (self, (status), __VA_ARGS__); \ return false; \ BLOCK_END @@ -949,21 +1041,38 @@ ws_handler_finish_handshake (struct ws_handler *self) str_vector_free (&fields); - // XXX: maybe we should start it earlier so that the handshake can - // timeout as well. ws_handler_connected()? - // - // But it should rather be named "connect_timer" + ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, + self->ping_interval, 0); ev_timer_start (EV_DEFAULT_ &self->ping_timer); return true; } +/// Tells the handler that the TCP connection has been established so it can +/// timeout when the client handshake doesn't arrive soon enough +static void +ws_handler_start (struct ws_handler *self) +{ + ev_timer_set (&self->handshake_timeout_watcher, + self->handshake_timeout, 0.); + ev_timer_start (EV_DEFAULT_ &self->handshake_timeout_watcher); +} + +/// Push data to the WebSocket handler; "len == 0" means EOF static bool ws_handler_push (struct ws_handler *self, const void *data, size_t len) { + // TODO: make sure all timers are stopped appropriately + if (!len) { + ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher); + if (self->state == WS_HANDLER_OPEN) - self->on_close (self->user_data, WS_STATUS_ABNORMAL_CLOSURE, ""); + { + if (self->on_close) + self->on_close (self->user_data, + WS_STATUS_ABNORMAL_CLOSURE, ""); + } else { // TODO: anything to do besides just closing the connection? @@ -973,6 +1082,9 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len) return false; } + if (self->state == WS_HANDLER_ALMOST_DEAD) + // We're waiting for an EOF from the client, must not process data + return true; if (self->state != WS_HANDLER_CONNECTING) return ws_parser_push (&self->parser, data, len); @@ -990,6 +1102,8 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len) if (self->hp.upgrade) { + ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher); + // The handshake hasn't been finished, yet there is more data // to be processed after the headers already if (len - n_parsed) @@ -999,12 +1113,16 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len) return false; self->state = WS_HANDLER_OPEN; + if (self->on_connected) + return self->on_connected (self->user_data); return true; } enum http_errno err = HTTP_PARSER_ERRNO (&self->hp); if (n_parsed != len || err != HPE_OK) { + ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher); + if (err == HPE_CB_headers_complete) print_debug ("WS handshake failed: %s", "missing `Upgrade' field"); else @@ -1916,13 +2034,6 @@ struct client_ws struct ws_handler handler; ///< WebSockets connection handler }; -static void -client_ws_write (void *user_data, const void *data, size_t len) -{ - struct client *client = user_data; - client_write (client, data, len); -} - static bool client_ws_on_message (void *user_data, enum ws_opcode type, const void *data, size_t len) @@ -1946,16 +2057,30 @@ client_ws_on_message (void *user_data, } static void +client_ws_write (void *user_data, const void *data, size_t len) +{ + struct client *client = user_data; + client_write (client, data, len); +} + +static void +client_ws_close (void *user_data) +{ + struct client *client = user_data; + client_remove (client); +} + +static void client_ws_init (struct client *client) { struct client_ws *self = xmalloc (sizeof *self); client->impl_data = self; ws_handler_init (&self->handler); - self->handler.write_cb = client_ws_write; self->handler.on_message = client_ws_on_message; + self->handler.write_cb = client_ws_write; + self->handler.close_cb = client_ws_close; self->handler.user_data = client; - // TODO: configure the handler some more, e.g. regarding the protocol // One mebibyte seems to be a reasonable value self->handler.max_payload_len = 1 << 10; @@ -2052,6 +2177,7 @@ on_client_ready (EV_P_ ev_io *watcher, int revents) // finished flushing the write queue? This should probably even be // the default behaviour, as it's fairly uncommon for clients to // shutdown the socket for writes while leaving it open for reading. + // Actually, we should wait until the client closes the connection. // TODO: some sort of "on_buffers_flushed" callback for streaming huge // chunks of external (or generated) data. if (!flush_queue (&client->write_queue, watcher)) |