aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Janouch <p@janouch.name>2018-10-18 02:55:55 +0200
committerPřemysl Janouch <p@janouch.name>2018-10-18 04:44:40 +0200
commit62945cceb3b2f9fae0fd39842448ebfeba378fa9 (patch)
tree40ca1a16a73e212221d88a5f2f764cd3e80fb307
parent580f0a0c5960d830f49c4cddbfdf869c1d262a0c (diff)
downloadjson-rpc-shell-62945cceb3b2f9fae0fd39842448ebfeba378fa9.tar.gz
json-rpc-shell-62945cceb3b2f9fae0fd39842448ebfeba378fa9.tar.xz
json-rpc-shell-62945cceb3b2f9fae0fd39842448ebfeba378fa9.zip
Finish the WebSocket backend
Of course, everything so far hasn't been tested much.
-rw-r--r--demo-json-rpc-server.c271
1 files changed, 162 insertions, 109 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c
index 1ae1bad..60a7173 100644
--- a/demo-json-rpc-server.c
+++ b/demo-json-rpc-server.c
@@ -540,9 +540,9 @@ enum ws_handler_state
{
WS_HANDLER_CONNECTING, ///< Parsing HTTP
WS_HANDLER_OPEN, ///< Parsing WebSockets frames
- WS_HANDLER_CLOSING, ///< Closing the connection
- WS_HANDLER_ALMOST_DEAD, ///< Closing connection after failure
- WS_HANDLER_CLOSED ///< Dead
+ WS_HANDLER_CLOSING, ///< Partial closure by us
+ WS_HANDLER_FLUSHING, ///< Just waiting for client EOF
+ WS_HANDLER_CLOSED ///< Dead, both sides closed
};
struct ws_handler
@@ -584,6 +584,7 @@ struct ws_handler
// TODO: void (*on_handshake) (protocols) that will allow the user
// to choose any sub-protocol, if the client has provided any.
// This may render "on_connected" unnecessary.
+ // Should also enable failing the handshake.
/// Called after successfuly connecting (handshake complete)
bool (*on_connected) (struct ws_handler *);
@@ -626,36 +627,41 @@ static void
ws_handler_close (struct ws_handler *self,
enum ws_status close_code, const char *reason, size_t len)
{
+ hard_assert (self->state == WS_HANDLER_OPEN);
+
struct str payload = str_make ();
str_pack_u16 (&payload, close_code);
// XXX: maybe accept a null-terminated string on input? Has to be UTF-8 a/w
str_append_data (&payload, reason, len);
ws_handler_send_control (self, WS_OPCODE_CLOSE, payload.str, payload.len);
-
- // Close initiated by us; the reason is null-terminated within `payload'
- if (self->on_close)
- self->on_close (self, close_code, payload.str + 2);
+ self->close_cb (self, true /* half_close */);
self->state = WS_HANDLER_CLOSING;
str_free (&payload);
}
-static void
-ws_handler_fail (struct ws_handler *self, enum ws_status close_code)
+static bool
+ws_handler_fail_connection (struct ws_handler *self, enum ws_status close_code)
{
- ws_handler_close (self, close_code, NULL, 0);
- self->state = WS_HANDLER_ALMOST_DEAD;
+ hard_assert (self->state == WS_HANDLER_OPEN
+ || self->state == WS_HANDLER_CLOSING);
+
+ if (self->state == WS_HANDLER_OPEN)
+ ws_handler_close (self, close_code, NULL, 0);
- // TODO: set the close timer, ignore all further incoming input (either set
- // some flag for the case that we're in the middle of ws_handler_push(),
- // and/or add a mechanism to stop the caller from polling the socket for
- // reads).
- // TODO: make sure we don't send pings after the close
+ self->state = WS_HANDLER_FLUSHING;
+ if (self->on_close)
+ self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, "");
+
+ ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
+ ev_timer_set (&self->close_timeout_watcher, self->close_timeout, 0.);
+ ev_timer_start (EV_DEFAULT_ &self->close_timeout_watcher);
+ return false;
}
// TODO: add support for fragmented responses
static void
-ws_handler_send (struct ws_handler *self,
+ws_handler_send_frame (struct ws_handler *self,
enum ws_opcode opcode, const void *data, size_t len)
{
if (!soft_assert (self->state == WS_HANDLER_OPEN))
@@ -697,24 +703,26 @@ ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser)
|| (!ws_is_control_frame (parser->opcode) &&
(self->expecting_continuation && parser->opcode != WS_OPCODE_CONT))
|| parser->payload_len >= 0x8000000000000000ULL)
- ws_handler_fail (self, WS_STATUS_PROTOCOL_ERROR);
- else if (parser->payload_len > self->max_payload_len)
- ws_handler_fail (self, WS_STATUS_MESSAGE_TOO_BIG);
- else
- return true;
- return false;
+ return ws_handler_fail_connection (self, WS_STATUS_PROTOCOL_ERROR);
+
+ if (parser->payload_len > self->max_payload_len
+ || (self->expecting_continuation &&
+ self->message_data.len + parser->payload_len > self->max_payload_len))
+ return ws_handler_fail_connection (self, WS_STATUS_MESSAGE_TOO_BIG);
+ return true;
}
static bool
-ws_handler_on_protocol_close
+ws_handler_on_control_close
(struct ws_handler *self, const struct ws_parser *parser)
{
+ hard_assert (self->state == WS_HANDLER_OPEN
+ || self->state == WS_HANDLER_CLOSING);
struct msg_unpacker unpacker =
msg_unpacker_make (parser->input.str, parser->payload_len);
char *reason = NULL;
uint16_t close_code = WS_STATUS_NO_STATUS_RECEIVED;
-
if (parser->payload_len >= 2)
{
(void) msg_unpacker_u16 (&unpacker, &close_code);
@@ -723,17 +731,29 @@ ws_handler_on_protocol_close
else
reason = xstrdup ("");
- if (self->state != WS_HANDLER_CLOSING)
+ if (close_code < 1000 || close_code > 4999)
+ // XXX: invalid close code: maybe we should fail the connection instead
+ close_code = WS_STATUS_PROTOCOL_ERROR;
+
+ if (self->state == WS_HANDLER_OPEN)
{
// Close initiated by the client
+ // FIXME: not sending the potentially different close_code
ws_handler_send_control (self, WS_OPCODE_CLOSE,
parser->input.str, parser->payload_len);
+
+ self->state = WS_HANDLER_FLUSHING;
if (self->on_close)
self->on_close (self, close_code, reason);
}
+ else
+ self->state = WS_HANDLER_FLUSHING;
free (reason);
- self->state = WS_HANDLER_ALMOST_DEAD;
+
+ ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
+ ev_timer_set (&self->close_timeout_watcher, self->close_timeout, 0.);
+ ev_timer_start (EV_DEFAULT_ &self->close_timeout_watcher);
return true;
}
@@ -744,21 +764,18 @@ ws_handler_on_control_frame
switch (parser->opcode)
{
case WS_OPCODE_CLOSE:
- return ws_handler_on_protocol_close (self, parser);
+ return ws_handler_on_control_close (self, parser);
case WS_OPCODE_PING:
ws_handler_send_control (self, WS_OPCODE_PONG,
parser->input.str, parser->payload_len);
break;
case WS_OPCODE_PONG:
- // XXX: maybe we should check the payload
+ // TODO: check the payload
self->received_pong = true;
break;
default:
// Unknown control frame
- ws_handler_fail (self, WS_STATUS_PROTOCOL_ERROR);
- // FIXME: we shouldn't close the connection right away;
- // also check other places
- return false;
+ return ws_handler_fail_connection (self, WS_STATUS_PROTOCOL_ERROR);
}
return true;
}
@@ -769,29 +786,19 @@ ws_handler_on_frame (void *user_data, const struct ws_parser *parser)
struct ws_handler *self = user_data;
if (ws_is_control_frame (parser->opcode))
return ws_handler_on_control_frame (self, parser);
-
- // TODO: do this rather in "on_frame_header"
- if (self->message_data.len + parser->payload_len > self->max_payload_len)
- {
- ws_handler_fail (self, WS_STATUS_MESSAGE_TOO_BIG);
- return false;
- }
-
if (!self->expecting_continuation)
self->message_opcode = parser->opcode;
str_append_data (&self->message_data,
parser->input.str, parser->payload_len);
- self->expecting_continuation = !parser->is_fin;
-
- if (!parser->is_fin)
+ if ((self->expecting_continuation = !parser->is_fin))
return true;
if (self->message_opcode == WS_OPCODE_TEXT
&& !utf8_validate (self->message_data.str, self->message_data.len))
{
- ws_handler_fail (self, WS_STATUS_INVALID_PAYLOAD_DATA);
- return false;
+ return ws_handler_fail_connection
+ (self, WS_STATUS_INVALID_PAYLOAD_DATA);
}
bool result = true;
@@ -799,6 +806,8 @@ ws_handler_on_frame (void *user_data, const struct ws_parser *parser)
result = self->on_message (self, self->message_opcode,
self->message_data.str, self->message_data.len);
str_reset (&self->message_data);
+ // TODO: if (!result), either replace this with a state check,
+ // or make sure to change the state
return result;
}
@@ -810,11 +819,10 @@ ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents)
struct ws_handler *self = watcher->data;
if (!self->received_pong)
- {
- // TODO: close/fail the connection?
- }
+ ws_handler_fail_connection (self, 4000);
else
{
+ // TODO: be an annoying server and send a nonce in the data
ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0);
ev_timer_again (EV_A_ watcher);
}
@@ -823,20 +831,38 @@ ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents)
static void
ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents)
{
+ (void) loop;
(void) revents;
struct ws_handler *self = watcher->data;
- // TODO: anything else to do here? Invalidate our state?
- if (self->close_cb)
- self->close_cb (self, false /* half_close */);
+ hard_assert (self->state == WS_HANDLER_OPEN
+ || self->state == WS_HANDLER_CLOSING);
+
+ if (self->state == WS_HANDLER_CLOSING
+ && self->on_close)
+ self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, "close timeout");
+
+ self->state = WS_HANDLER_CLOSED;
+ self->close_cb (self, false /* half_close */);
}
static void
ws_handler_on_handshake_timeout (EV_P_ ev_timer *watcher, int revents)
{
+ (void) loop;
(void) revents;
struct ws_handler *self = watcher->data;
- // TODO
+
+ // XXX: this is a no-op, since this currently doesn't even call shutdown
+ // immediately but postpones it until later
+ self->close_cb (self, true /* half_close */);
+ self->state = WS_HANDLER_FLUSHING;
+
+ if (self->on_close)
+ self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, "handshake timeout");
+
+ self->state = WS_HANDLER_CLOSED;
+ self->close_cb (self, false /* half_close */);
}
static void
@@ -991,6 +1017,7 @@ ws_handler_on_url (http_parser *parser, const char *at, size_t len)
#define HTTP_400_BAD_REQUEST "400 Bad Request"
#define HTTP_405_METHOD_NOT_ALLOWED "405 Method Not Allowed"
#define HTTP_417_EXPECTATION_FAILED "407 Expectation Failed"
+#define HTTP_426_UPGRADE_REQUIRED "426 Upgrade Required"
#define HTTP_505_VERSION_NOT_SUPPORTED "505 HTTP Version Not Supported"
static void
@@ -1024,43 +1051,47 @@ ws_handler_http_responsev (struct ws_handler *self,
str_free (&response);
}
-static void
-ws_handler_http_response (struct ws_handler *self, const char *status, ...)
+static bool
+ws_handler_fail_handshake (struct ws_handler *self, const char *status, ...)
{
- struct strv v = strv_make ();
-
va_list ap;
va_start (ap, status);
const char *s;
+ struct strv v = strv_make ();
while ((s = va_arg (ap, const char *)))
strv_append (&v, s);
va_end (ap);
-
ws_handler_http_responsev (self, status, v.vector);
strv_free (&v);
+
+ self->close_cb (self, true /* half_close */);
+ self->state = WS_HANDLER_FLUSHING;
+
+ if (self->on_close)
+ self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, status);
+ return false;
}
-#define FAIL_HANDSHAKE(status, ...) \
- BLOCK_START \
- self->state = WS_HANDLER_ALMOST_DEAD; \
- ws_handler_http_response (self, (status), __VA_ARGS__); \
- return false; \
- BLOCK_END
+#define FAIL_HANDSHAKE(...) \
+ return ws_handler_fail_handshake (self, __VA_ARGS__, NULL)
static bool
ws_handler_finish_handshake (struct ws_handler *self)
{
- // XXX: we probably shouldn't use 505 to reject the minor version but w/e
- if (self->hp.http_major != 1 || self->hp.http_minor < 1)
- FAIL_HANDSHAKE (HTTP_505_VERSION_NOT_SUPPORTED, NULL);
if (self->hp.method != HTTP_GET)
- FAIL_HANDSHAKE (HTTP_405_METHOD_NOT_ALLOWED, "Allow: GET", NULL);
+ FAIL_HANDSHAKE (HTTP_405_METHOD_NOT_ALLOWED, "Allow: GET");
+
+ // Technically, it must be /at least/ 1.1 but no other 1.x version of HTTP
+ // is going to happen and 2.x is entirely incompatible
+ // XXX: we probably shouldn't use 505 to reject the minor version but w/e
+ if (self->hp.http_major != 1 || self->hp.http_minor != 1)
+ FAIL_HANDSHAKE (HTTP_505_VERSION_NOT_SUPPORTED);
// Your expectations are way too high
if (str_map_find (&self->headers, "Expect"))
- FAIL_HANDSHAKE (HTTP_417_EXPECTATION_FAILED, NULL);
+ FAIL_HANDSHAKE (HTTP_417_EXPECTATION_FAILED);
// Reject URLs specifying the schema and host; we're not parsing that
// TODO: actually do parse this and let our user decide if it matches
@@ -1068,11 +1099,11 @@ ws_handler_finish_handshake (struct ws_handler *self)
if (http_parser_parse_url (self->url.str, self->url.len, false, &url)
|| (url.field_set & (1 << UF_SCHEMA | 1 << UF_HOST | 1 << UF_PORT))
|| !str_map_find (&self->headers, "Host"))
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST);
const char *connection = str_map_find (&self->headers, "Connection");
if (!connection || strcasecmp_ascii (connection, "Upgrade"))
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST);
// Check if we can actually upgrade the protocol to WebSockets
const char *upgrade = str_map_find (&self->headers, "Upgrade");
@@ -1088,7 +1119,8 @@ ws_handler_finish_handshake (struct ws_handler *self)
http_protocol_destroy (iter);
}
if (!can_upgrade)
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
+ FAIL_HANDSHAKE (HTTP_426_UPGRADE_REQUIRED,
+ "Upgrade: websocket", SEC_WS_VERSION ": 13");
// Okay, we're finally past the basic HTTP/1.1 stuff
const char *key = str_map_find (&self->headers, SEC_WS_KEY);
@@ -1098,19 +1130,17 @@ ws_handler_finish_handshake (struct ws_handler *self)
const char *extensions = str_map_find (&self->headers, SEC_WS_EXTENSIONS);
*/
- if (!key)
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
+ if (!version)
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST);
+ if (strcmp (version, "13"))
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, SEC_WS_VERSION ": 13");
struct str tmp = str_make ();
- bool key_is_valid = base64_decode (key, false, &tmp) && tmp.len == 16;
+ bool key_is_valid = key
+ && base64_decode (key, false, &tmp) && tmp.len == 16;
str_free (&tmp);
if (!key_is_valid)
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
-
- if (!version)
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
- if (strcmp (version, "13"))
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, SEC_WS_VERSION ": 13", NULL);
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST);
struct strv fields = strv_make ();
strv_append_args (&fields,
@@ -1130,6 +1160,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
strv_free (&fields);
+ self->state = WS_HANDLER_OPEN;
ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer,
self->ping_interval, 0);
ev_timer_start (EV_DEFAULT_ &self->ping_timer);
@@ -1141,40 +1172,62 @@ ws_handler_finish_handshake (struct ws_handler *self)
static void
ws_handler_start (struct ws_handler *self)
{
+ hard_assert (self->state == WS_HANDLER_CONNECTING);
+
ev_timer_set (&self->handshake_timeout_watcher,
self->handshake_timeout, 0.);
ev_timer_start (EV_DEFAULT_ &self->handshake_timeout_watcher);
}
-/// Push data to the WebSocket handler. "len == 0" means EOF.
+// The client should normally never close the connection, assume that it's
+// either received an EOF from our side, or that it doesn't care about our data
+// anymore, having called close() already
static bool
-ws_handler_push (struct ws_handler *self, const void *data, size_t len)
+ws_handler_push_eof (struct ws_handler *self)
{
- // TODO: make sure all timers are stopped appropriately
-
- if (!len)
+ switch (self->state)
{
+ case WS_HANDLER_CONNECTING:
ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
- if (self->state == WS_HANDLER_OPEN)
- {
- if (self->on_close)
- self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, "");
- }
- else
- {
- // TODO: anything to do besides just closing the connection?
- }
-
+ self->state = WS_HANDLER_FLUSHING;
+ if (self->on_close)
+ self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, "unexpected EOF");
+ break;
+ case WS_HANDLER_OPEN:
+ ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
+ // Fall-through
+ case WS_HANDLER_CLOSING:
self->state = WS_HANDLER_CLOSED;
- return false;
+ if (self->on_close)
+ self->on_close (self, WS_STATUS_ABNORMAL_CLOSURE, "");
+ // Fall-through
+ case WS_HANDLER_FLUSHING:
+ ev_timer_stop (EV_DEFAULT_ &self->close_timeout_watcher);
+ break;
+ default:
+ soft_assert(self->state != WS_HANDLER_CLOSED);
}
+ self->state = WS_HANDLER_CLOSED;
+ return false;
+}
+
+/// Push data to the WebSocket handler. "len == 0" means EOF.
+/// You are expected to close the connection and dispose of the handler
+/// when the function returns false.
+static bool
+ws_handler_push (struct ws_handler *self, const void *data, size_t len)
+{
+ if (!len)
+ return ws_handler_push_eof (self);
- if (self->state == WS_HANDLER_ALMOST_DEAD)
+ if (self->state == WS_HANDLER_FLUSHING)
// We're waiting for an EOF from the client, must not process data
return true;
+
if (self->state != WS_HANDLER_CONNECTING)
- return ws_parser_push (&self->parser, data, len);
+ return soft_assert (self->state != WS_HANDLER_CLOSED)
+ && ws_parser_push (&self->parser, data, len);
// The handshake hasn't been done yet, process HTTP headers
static const http_parser_settings http_settings =
@@ -1185,8 +1238,8 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
.on_url = ws_handler_on_url,
};
- size_t n_parsed = http_parser_execute (&self->hp,
- &http_settings, data, len);
+ size_t n_parsed =
+ http_parser_execute (&self->hp, &http_settings, data, len);
if (self->hp.upgrade)
{
@@ -1195,12 +1248,10 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
// The handshake hasn't been finished, yet there is more data
// to be processed after the headers already
if (len - n_parsed)
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST);
if (!ws_handler_finish_handshake (self))
return false;
-
- self->state = WS_HANDLER_OPEN;
if (self->on_connected)
return self->on_connected (self);
return true;
@@ -1217,7 +1268,7 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
print_debug ("WS handshake failed: %s",
http_errno_description (err));
- FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
+ FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST);
}
return true;
}
@@ -2319,15 +2370,15 @@ client_ws_on_message (struct ws_handler *handler,
FIND_CONTAINER (self, handler, struct client_ws, handler);
if (type != WS_OPCODE_TEXT)
{
- ws_handler_fail (&self->handler, WS_STATUS_UNSUPPORTED_DATA);
- return false;
+ return ws_handler_fail_connection
+ (&self->handler, WS_STATUS_UNSUPPORTED_DATA);
}
struct server_context *ctx = ev_userdata (EV_DEFAULT);
struct str response = str_make ();
process_json_rpc (ctx, data, len, &response);
if (response.len)
- ws_handler_send (&self->handler,
+ ws_handler_send_frame (&self->handler,
WS_OPCODE_TEXT, response.str, response.len);
str_free (&response);
return true;
@@ -2353,6 +2404,7 @@ static bool
client_ws_push (struct client *client, const void *data, size_t len)
{
FIND_CONTAINER (self, client, struct client_ws, client);
+ // client_close() will correctly destroy the client on EOF
return ws_handler_push (&self->handler, data, len);
}
@@ -2361,7 +2413,8 @@ client_ws_shutdown (struct client *client)
{
FIND_CONTAINER (self, client, struct client_ws, client);
if (self->handler.state == WS_HANDLER_CONNECTING)
- ; // TODO: abort the connection immediately
+ // No on_close, no problem
+ client_destroy (&self->client);
else if (self->handler.state == WS_HANDLER_OPEN)
ws_handler_close (&self->handler, WS_STATUS_GOING_AWAY, NULL, 0);
}