aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--demo-json-rpc-server.c234
1 files changed, 180 insertions, 54 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c
index 2196217..aa799dd 100644
--- a/demo-json-rpc-server.c
+++ b/demo-json-rpc-server.c
@@ -468,6 +468,7 @@ enum ws_handler_state
WS_HANDLER_CONNECTING, ///< Parsing HTTP
WS_HANDLER_OPEN, ///< Parsing WebSockets frames
WS_HANDLER_CLOSING, ///< Closing the connection
+ WS_HANDLER_ALMOST_DEAD, ///< Closing connection after failure
WS_HANDLER_CLOSED ///< Dead
};
@@ -475,12 +476,17 @@ struct ws_handler
{
enum ws_handler_state state; ///< State
+ // HTTP handshake:
+
http_parser hp; ///< HTTP parser
bool parsing_header_value; ///< Parsing header value or field?
struct str field; ///< Field part buffer
struct str value; ///< Value part buffer
struct str_map headers; ///< HTTP Headers
struct str url; ///< Request URL
+ ev_timer handshake_timeout_watcher; ///< Handshake timeout watcher
+
+ // WebSocket frame protocol:
struct ws_parser parser; ///< Protocol frame parser
bool expecting_continuation; ///< For non-control traffic
@@ -488,27 +494,32 @@ struct ws_handler
enum ws_opcode message_opcode; ///< Opcode for the current message
struct str message_data; ///< Concatenated message data
+ ev_timer ping_timer; ///< Ping timer
+ bool received_pong; ///< Received PONG since the last PING
+
+ ev_timer close_timeout_watcher; ///< Close timeout watcher
+
+ // Configuration:
+
+ unsigned handshake_timeout; ///< How long to wait for the handshake
+ unsigned close_timeout; ///< How long to wait for TCP close
unsigned ping_interval; ///< Ping interval in seconds
uint64_t max_payload_len; ///< Maximum length of any message
- // TODO: handshake_timeout
- // TODO: a close timer
-
- // TODO: a ping timer (when no pong is received by the second time the
- // timer triggers, it is a ping timeout)
- ev_timer ping_timer; ///< Ping timer
- bool received_pong; ///< Received PONG since the last PING
+ // Event callbacks:
// TODO: void (*on_handshake) (protocols) that will allow the user
// to choose any sub-protocol, if the client has provided any.
+ // This may render "on_connected" unnecessary.
- // TODO: "on_connected" after the handshake has finished?
+ /// Called after successfuly connecting (handshake complete)
+ bool (*on_connected) (void *user_data);
/// Called upon reception of a single full message
bool (*on_message) (void *user_data,
enum ws_opcode type, const void *data, size_t len);
- /// The connection has been closed. @a close_code may, or may not, be one
+ /// The connection is about to close. @a close_code may, or may not, be one
/// of enum ws_status. The @a reason is never NULL.
// TODO; also note that ideally, the handler should (be able to) first
// receive a notification about the connection being closed because of
@@ -516,10 +527,13 @@ struct ws_handler
// Actually, calling push() could work pretty fine for this.
void (*on_close) (void *user_data, int close_code, const char *reason);
+ // Method callbacks:
+
/// Write a chunk of data to the stream
void (*write_cb) (void *user_data, const void *data, size_t len);
- // TODO: "close_cb"; to be used from a ping timer e.g.
+ /// Close the connection
+ void (*close_cb) (void *user_data);
void *user_data; ///< User data for callbacks
};
@@ -541,30 +555,44 @@ ws_handler_send_control (struct ws_handler *self,
}
static void
-ws_handler_fail (struct ws_handler *self, enum ws_status reason)
+ws_handler_close (struct ws_handler *self,
+ enum ws_status close_code, const char *reason, size_t len)
+{
+ struct str payload;
+ str_init (&payload);
+ str_pack_u16 (&payload, close_code);
+ // XXX: maybe accept a null-terminated string on input? Has to be UTF-8 a/w
+ str_append_data (&payload, reason, len);
+ ws_handler_send_control (self, WS_OPCODE_CLOSE, payload.str, payload.len);
+
+ // Close initiated by us; the reason is null-terminated within `payload'
+ if (self->on_close)
+ self->on_close (self->user_data, close_code, payload.str + 2);
+
+ self->state = WS_HANDLER_CLOSING;
+ str_free (&payload);
+}
+
+static void
+ws_handler_fail (struct ws_handler *self, enum ws_status close_code)
{
- uint8_t payload[2] = { reason << 8, reason };
- ws_handler_send_control (self, WS_OPCODE_CLOSE, payload, sizeof payload);
+ ws_handler_close (self, close_code, NULL, 0);
+ self->state = WS_HANDLER_ALMOST_DEAD;
// TODO: set the close timer, ignore all further incoming input (either set
// some flag for the case that we're in the middle of ws_handler_push(),
// and/or add a mechanism to stop the caller from polling the socket for
// reads).
- // TODO: set the state to FAILED (not CLOSED as that means the TCP
- // connection is closed) and wait until all is sent?
// TODO: make sure we don't send pings after the close
}
-// TODO: ws_handler_close() that behaves like ws_handler_fail() but doesn't
-// ignore frames up to a corresponding close from the client.
-// Read the RFC once again to see if we can really process the frames.
-
// TODO: add support for fragmented responses
static void
ws_handler_send (struct ws_handler *self,
enum ws_opcode opcode, const void *data, size_t len)
{
- // TODO: make sure (just assert?) we're in the OPEN state
+ if (!soft_assert (self->state == WS_HANDLER_OPEN))
+ return;
struct str header;
str_init (&header);
@@ -612,17 +640,45 @@ ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser)
}
static bool
+ws_handler_on_protocol_close
+ (struct ws_handler *self, const struct ws_parser *parser)
+{
+ struct msg_unpacker unpacker;
+ msg_unpacker_init (&unpacker, parser->input.str, parser->payload_len);
+
+ char *reason = NULL;
+ uint16_t close_code = WS_STATUS_NO_STATUS_RECEIVED;
+
+ if (parser->payload_len >= 2)
+ {
+ (void) msg_unpacker_u16 (&unpacker, &close_code);
+ reason = xstrndup (parser->input.str + 2, parser->payload_len - 2);
+ }
+ else
+ reason = xstrdup ("");
+
+ if (self->state != WS_HANDLER_CLOSING)
+ {
+ // Close initiated by the client
+ ws_handler_send_control (self, WS_OPCODE_CLOSE,
+ parser->input.str, parser->payload_len);
+ if (self->on_close)
+ self->on_close (self->user_data, close_code, reason);
+ }
+
+ free (reason);
+ self->state = WS_HANDLER_ALMOST_DEAD;
+ return true;
+}
+
+static bool
ws_handler_on_control_frame
(struct ws_handler *self, const struct ws_parser *parser)
{
switch (parser->opcode)
{
case WS_OPCODE_CLOSE:
- // TODO: confirm the close
- // TODO: change the state to CLOSING
- // TODO: call "on_close"
- // NOTE: the reason is an empty string if omitted
- break;
+ return ws_handler_on_protocol_close (self, parser);
case WS_OPCODE_PING:
ws_handler_send_control (self, WS_OPCODE_PONG,
parser->input.str, parser->payload_len);
@@ -634,6 +690,8 @@ ws_handler_on_control_frame
default:
// Unknown control frame
ws_handler_fail (self, WS_STATUS_PROTOCOL_ERROR);
+ // FIXME: we shouldn't close the connection right away;
+ // also check other places
return false;
}
return true;
@@ -670,8 +728,10 @@ ws_handler_on_frame (void *user_data, const struct ws_parser *parser)
return false;
}
- bool result = self->on_message (self->user_data, self->message_opcode,
- self->message_data.str, self->message_data.len);
+ bool result = true;
+ if (self->on_message)
+ result = self->on_message (self->user_data, self->message_opcode,
+ self->message_data.str, self->message_data.len);
str_reset (&self->message_data);
return result;
}
@@ -686,10 +746,26 @@ ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents)
if (!self->received_pong)
{
// TODO: close/fail the connection?
- return;
}
+ else
+ {
+ ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0);
+ ev_timer_again (EV_A_ watcher);
+ }
+}
- ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0);
+static void
+ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents)
+{
+ struct ws_handler *self = watcher->data;
+ // TODO: call "close_cb"
+}
+
+static void
+ws_handler_on_handshake_timeout (EV_P_ ev_timer *watcher, int revents)
+{
+ struct ws_handler *self = watcher->data;
+ // TODO
}
static void
@@ -697,44 +773,61 @@ ws_handler_init (struct ws_handler *self)
{
memset (self, 0, sizeof *self);
+ self->state = WS_HANDLER_CONNECTING;
+
http_parser_init (&self->hp, HTTP_REQUEST);
self->hp.data = self;
-
str_init (&self->field);
str_init (&self->value);
str_map_init (&self->headers);
self->headers.free = free;
self->headers.key_xfrm = tolower_ascii_strxfrm;
str_init (&self->url);
+ ev_timer_init (&self->handshake_timeout_watcher,
+ ws_handler_on_handshake_timeout, 0., 0.);
+ self->handshake_timeout_watcher.data = self;
ws_parser_init (&self->parser);
self->parser.on_frame_header = ws_handler_on_frame_header;
self->parser.on_frame = ws_handler_on_frame;
-
str_init (&self->message_data);
- self->ping_interval = 60;
+ ev_timer_init (&self->ping_timer,
+ ws_handler_on_ping_timer, 0., 0.);
+ self->ping_timer.data = self;
+ ev_timer_init (&self->close_timeout_watcher,
+ ws_handler_on_close_timeout, 0., 0.);
+ self->ping_timer.data = self;
+ // So that the first ping timer doesn't timeout the connection
+ self->received_pong = true;
+
+ self->handshake_timeout = self->close_timeout = self->ping_interval = 60;
// This is still ridiculously high. Note that the most significant bit
// must always be zero, i.e. the protocol maximum is 0x7FFF FFFF FFFF FFFF.
self->max_payload_len = UINT32_MAX;
+}
- // Just so we can safely stop it
- ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, 0., 0.);
- self->ping_timer.data = self;
- // So that the first ping timer doesn't timeout the connection
- self->received_pong = true;
+/// Stop all timers, not going to use the handler anymore
+static void
+ws_handler_stop (struct ws_handler *self)
+{
+ ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
+ ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
+ ev_timer_stop (EV_DEFAULT_ &self->close_timeout_watcher);
}
static void
ws_handler_free (struct ws_handler *self)
{
+ ws_handler_stop (self);
+
str_free (&self->field);
str_free (&self->value);
str_map_free (&self->headers);
str_free (&self->url);
+
ws_parser_free (&self->parser);
str_free (&self->message_data);
- ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
}
static bool
@@ -863,10 +956,9 @@ ws_handler_http_response (struct ws_handler *self, const char *status, ...)
str_vector_free (&v);
}
-// TODO: also set the connection to some FAILED state or anything that's neither
-// CONNECTING nor OPEN
#define FAIL_HANDSHAKE(status, ...) \
BLOCK_START \
+ self->state = WS_HANDLER_ALMOST_DEAD; \
ws_handler_http_response (self, (status), __VA_ARGS__); \
return false; \
BLOCK_END
@@ -949,21 +1041,38 @@ ws_handler_finish_handshake (struct ws_handler *self)
str_vector_free (&fields);
- // XXX: maybe we should start it earlier so that the handshake can
- // timeout as well. ws_handler_connected()?
- //
- // But it should rather be named "connect_timer"
+ ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer,
+ self->ping_interval, 0);
ev_timer_start (EV_DEFAULT_ &self->ping_timer);
return true;
}
+/// Tells the handler that the TCP connection has been established so it can
+/// timeout when the client handshake doesn't arrive soon enough
+static void
+ws_handler_start (struct ws_handler *self)
+{
+ ev_timer_set (&self->handshake_timeout_watcher,
+ self->handshake_timeout, 0.);
+ ev_timer_start (EV_DEFAULT_ &self->handshake_timeout_watcher);
+}
+
+/// Push data to the WebSocket handler; "len == 0" means EOF
static bool
ws_handler_push (struct ws_handler *self, const void *data, size_t len)
{
+ // TODO: make sure all timers are stopped appropriately
+
if (!len)
{
+ ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
+
if (self->state == WS_HANDLER_OPEN)
- self->on_close (self->user_data, WS_STATUS_ABNORMAL_CLOSURE, "");
+ {
+ if (self->on_close)
+ self->on_close (self->user_data,
+ WS_STATUS_ABNORMAL_CLOSURE, "");
+ }
else
{
// TODO: anything to do besides just closing the connection?
@@ -973,6 +1082,9 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
return false;
}
+ if (self->state == WS_HANDLER_ALMOST_DEAD)
+ // We're waiting for an EOF from the client, must not process data
+ return true;
if (self->state != WS_HANDLER_CONNECTING)
return ws_parser_push (&self->parser, data, len);
@@ -990,6 +1102,8 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
if (self->hp.upgrade)
{
+ ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
+
// The handshake hasn't been finished, yet there is more data
// to be processed after the headers already
if (len - n_parsed)
@@ -999,12 +1113,16 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
return false;
self->state = WS_HANDLER_OPEN;
+ if (self->on_connected)
+ return self->on_connected (self->user_data);
return true;
}
enum http_errno err = HTTP_PARSER_ERRNO (&self->hp);
if (n_parsed != len || err != HPE_OK)
{
+ ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
+
if (err == HPE_CB_headers_complete)
print_debug ("WS handshake failed: %s", "missing `Upgrade' field");
else
@@ -1916,13 +2034,6 @@ struct client_ws
struct ws_handler handler; ///< WebSockets connection handler
};
-static void
-client_ws_write (void *user_data, const void *data, size_t len)
-{
- struct client *client = user_data;
- client_write (client, data, len);
-}
-
static bool
client_ws_on_message (void *user_data,
enum ws_opcode type, const void *data, size_t len)
@@ -1946,16 +2057,30 @@ client_ws_on_message (void *user_data,
}
static void
+client_ws_write (void *user_data, const void *data, size_t len)
+{
+ struct client *client = user_data;
+ client_write (client, data, len);
+}
+
+static void
+client_ws_close (void *user_data)
+{
+ struct client *client = user_data;
+ client_remove (client);
+}
+
+static void
client_ws_init (struct client *client)
{
struct client_ws *self = xmalloc (sizeof *self);
client->impl_data = self;
ws_handler_init (&self->handler);
- self->handler.write_cb = client_ws_write;
self->handler.on_message = client_ws_on_message;
+ self->handler.write_cb = client_ws_write;
+ self->handler.close_cb = client_ws_close;
self->handler.user_data = client;
- // TODO: configure the handler some more, e.g. regarding the protocol
// One mebibyte seems to be a reasonable value
self->handler.max_payload_len = 1 << 10;
@@ -2052,6 +2177,7 @@ on_client_ready (EV_P_ ev_io *watcher, int revents)
// finished flushing the write queue? This should probably even be
// the default behaviour, as it's fairly uncommon for clients to
// shutdown the socket for writes while leaving it open for reading.
+ // Actually, we should wait until the client closes the connection.
// TODO: some sort of "on_buffers_flushed" callback for streaming huge
// chunks of external (or generated) data.
if (!flush_queue (&client->write_queue, watcher))