From 23eb4cca38949152fb3f61dd9edd88e60cccf5a3 Mon Sep 17 00:00:00 2001
From: Přemysl Janouch
Date: Sat, 14 Mar 2015 19:36:37 +0100
Subject: Steady progress
Still in a state of total chaos, it appears.
---
demo-json-rpc-server.c | 314 ++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 272 insertions(+), 42 deletions(-)
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c
index 1b10e9d..77c9ca5 100644
--- a/demo-json-rpc-server.c
+++ b/demo-json-rpc-server.c
@@ -701,6 +701,12 @@ fcgi_request_write (struct fcgi_request *self, const void *data, size_t len)
}
}
+static void
+fcgi_request_finish (struct fcgi_request *self)
+{
+ // TODO: flush(), end_request(), delete self, muxer->request_destroy_cb()?
+}
+
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
typedef void (*fcgi_muxer_handler_fn)
@@ -764,6 +770,7 @@ fcgi_muxer_on_begin_request
return;
}
+ // We can only act as a responder, reject everything else up front
if (role != FCGI_RESPONDER)
{
fcgi_muxer_send_end_request (self,
@@ -1063,6 +1070,8 @@ scgi_parser_push (struct scgi_parser *self,
#define SEC_WS_PROTOCOL "Sec-WebSocket-Protocol"
#define SEC_WS_VERSION "Sec-WebSocket-Version"
+#define WS_MAX_CONTROL_PAYLOAD_LEN 125
+
static char *
ws_encode_response_key (const char *key)
{
@@ -1122,6 +1131,12 @@ enum ws_opcode
WS_OPCODE_PONG = 10
};
+static bool
+ws_is_control_frame (int opcode)
+{
+ return opcode >= WS_OPCODE_CLOSE;
+}
+
struct ws_parser
{
struct str input; ///< External input buffer
@@ -1136,8 +1151,7 @@ struct ws_parser
uint32_t mask; ///< Frame mask
uint64_t payload_len; ///< Payload length
- // TODO: it wouldn't be half bad if there was a callback to just validate
- // the frame header (such as the maximum payload length)
+ bool (*on_frame_header) (void *user_data, const struct ws_parser *self);
/// Callback for when a message is successfully parsed.
/// The actual payload is stored in "input", of length "payload_len".
@@ -1248,17 +1262,17 @@ ws_parser_push (struct ws_parser *self, const void *data, size_t len)
case WS_PARSER_MASK:
if (!self->is_masked)
- {
- self->state = WS_PARSER_PAYLOAD;
- break;
- }
+ goto end_of_header;
if (self->input.len < 4)
return true;
(void) msg_unpacker_u32 (&unpacker, &self->mask);
+ str_remove_slice (&self->input, 0, 4);
+ end_of_header:
self->state = WS_PARSER_PAYLOAD;
- str_remove_slice (&self->input, 0, 4);
+ if (!self->on_frame_header (self->user_data, self))
+ return false;
break;
case WS_PARSER_PAYLOAD:
@@ -1289,7 +1303,7 @@ ws_parser_push (struct ws_parser *self, const void *data, size_t len)
enum ws_handler_state
{
- WS_HANDLER_HTTP, ///< Parsing HTTP
+ WS_HANDLER_HANDSHAKE, ///< Parsing HTTP
WS_HANDLER_WEBSOCKETS ///< Parsing WebSockets frames
};
@@ -1305,12 +1319,38 @@ struct ws_handler
struct str url; ///< Request URL
struct ws_parser parser; ///< Protocol frame parser
+ bool expecting_continuation; ///< For non-control traffic
+
+ enum ws_opcode message_opcode; ///< Opcode for the current message
+ struct str message_data; ///< Concatenated message data
+
+ unsigned ping_interval; ///< Ping interval in seconds
+ uint64_t max_payload_len; ///< Maximum length of any message
+
+ // TODO: bool closing; // XXX: rather a { OPEN, CLOSING } state?
+ // TODO: a close timer
- // TODO: bool closing;
- // TODO: a configurable max_payload_len initialized by _init()
+ // TODO: a ping timer (when no pong is received by the second time the
+ // timer triggers, it is a ping timeout)
+ ev_timer ping_timer; ///< Ping timer
+ bool received_pong; ///< Received PONG since the last PING
/// Called upon reception of a single full message
- bool (*on_message) (void *user_data, const void *data, size_t len);
+ bool (*on_message) (void *user_data,
+ enum ws_opcode type, const void *data, size_t len);
+
+ // TODO: void (*on_initialized) () that will allow the user to choose
+ // any sub-protocol, if the client has provided any.
+
+ /// The connection has been closed.
+ /// @a close_code may, or may not, be one of enum ws_status.
+ // NOTE: the "close_code" is what we receive from the remote endpoint,
+ // or one of 1005/1006/1015
+ // NOTE: the reason is an empty string if omitted
+ // TODO; also note that ideally, the handler should (be able to) first
+ // receive a notification about the connection being closed because of
+ // an error (recv()) returns -1, and call on_close() in reaction.
+ void (*on_close) (void *user_data, int close_code, const char *reason);
/// Write a chunk of data to the stream
void (*write_cb) (void *user_data, const void *data, size_t len);
@@ -1320,15 +1360,126 @@ struct ws_handler
void *user_data; ///< User data for callbacks
};
+static void
+ws_handler_send_control (struct ws_handler *self, enum ws_opcode opcode,
+ const void *data, size_t len)
+{
+ if (len > WS_MAX_CONTROL_PAYLOAD_LEN)
+ {
+ print_debug ("truncating output control frame payload"
+ " from %zu to %zu bytes", len, (size_t) WS_MAX_CONTROL_PAYLOAD_LEN);
+ len = WS_MAX_CONTROL_PAYLOAD_LEN;
+ }
+
+ uint8_t header[2] = { 0x80 | (opcode & 0x0F), len };
+ self->write_cb (self->user_data, header, sizeof header);
+ self->write_cb (self->user_data, data, len);
+}
+
+static void
+ws_handler_fail (struct ws_handler *self, enum ws_status reason)
+{
+ uint8_t payload[2] = { reason << 8, reason };
+ ws_handler_send_control (self, WS_OPCODE_CLOSE, payload, sizeof payload);
+
+ // TODO: set the close timer, ignore all further incoming input (either set
+ // some flag for the case that we're in the middle of ws_handler_push(),
+ // and/or add a mechanism to stop the caller from polling the socket for
+ // reads).
+}
+
+// TODO: ws_handler_close() that behaves like ws_handler_fail() but doesn't
+// ignore frames up to a corresponding close from the client.
+// Read the RFC once again to see if we can really process the frames.
+
+static bool
+ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser)
+{
+ struct ws_handler *self = user_data;
+
+ if (parser->reserved_1 || parser->reserved_2 || parser->reserved_3
+ || !parser->is_masked // client -> server payload must be masked
+ || (ws_is_control_frame (parser->opcode) &&
+ (!parser->is_fin || parser->payload_len > WS_MAX_CONTROL_PAYLOAD_LEN))
+ || (!ws_is_control_frame (parser->opcode) &&
+ (self->expecting_continuation && parser->opcode != WS_OPCODE_CONT)))
+ ws_handler_fail (self, WS_STATUS_PROTOCOL);
+ else if (parser->payload_len > self->max_payload_len)
+ ws_handler_fail (self, WS_STATUS_TOO_BIG);
+ else
+ return true;
+ return false;
+}
+
+static bool
+ws_handler_on_control_frame
+ (struct ws_handler *self, const struct ws_parser *parser)
+{
+ switch (parser->opcode)
+ {
+ case WS_OPCODE_CLOSE:
+ // TODO: confirm the close
+ break;
+ case WS_OPCODE_PING:
+ ws_handler_send_control (self, WS_OPCODE_PONG,
+ parser->input.str, parser->payload_len);
+ break;
+ case WS_OPCODE_PONG:
+ // XXX: maybe we should check the payload
+ self->received_pong = true;
+ break;
+ default:
+ // TODO: shouldn't we rather fail on unknown control frames?
+ // But should we actually return false at any time? Yes?
+ break;
+ }
+ return true;
+}
+
static bool
ws_handler_on_frame (void *user_data, const struct ws_parser *parser)
{
struct ws_handler *self = user_data;
- // TODO: handle pings and what not
- // TODO: validate the message
- // TODO: first concatenate all parts of the message
- return self->on_message (self->user_data,
+ if (ws_is_control_frame (parser->opcode))
+ return ws_handler_on_control_frame (self, parser);
+
+ // TODO: do this rather in "on_frame_header"
+ if (self->message_data.len + parser->payload_len > self->max_payload_len)
+ {
+ ws_handler_fail (self, WS_STATUS_TOO_BIG);
+ return true;
+ }
+
+ if (!self->expecting_continuation)
+ self->message_opcode = parser->opcode;
+
+ str_append_data (&self->message_data,
+ parser->input.str, parser->payload_len);
+ self->expecting_continuation = !parser->is_fin;
+
+ if (!parser->is_fin)
+ return true;
+
+ bool result = self->on_message (self->user_data, self->message_opcode,
self->parser.input.str, self->parser.payload_len);
+ str_reset (&self->message_data);
+ return result;
+}
+
+static void
+ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents)
+{
+ (void) loop;
+ (void) revents;
+
+ struct ws_handler *self = watcher->data;
+ if (!self->received_pong)
+ {
+ // TODO: close/fail the connection?
+ return;
+ }
+
+ ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0);
}
static void
@@ -1347,7 +1498,20 @@ ws_handler_init (struct ws_handler *self)
str_init (&self->url);
ws_parser_init (&self->parser);
+ self->parser.on_frame_header = ws_handler_on_frame_header;
self->parser.on_frame = ws_handler_on_frame;
+
+ str_init (&self->message_data);
+
+ self->ping_interval = 60;
+ // This is still ridiculously high
+ self->max_payload_len = UINT32_MAX;
+
+ // Just so we can safely stop it
+ ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, 0., 0.);
+ self->ping_timer.data = self;
+ // So that the first ping timer doesn't timeout the connection
+ self->received_pong = true;
}
static void
@@ -1358,6 +1522,8 @@ ws_handler_free (struct ws_handler *self)
str_map_free (&self->headers);
str_free (&self->url);
ws_parser_free (&self->parser);
+ str_free (&self->message_data);
+ ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
}
static void
@@ -1395,10 +1561,11 @@ ws_handler_on_header_value (http_parser *parser, const char *at, size_t len)
static int
ws_handler_on_headers_complete (http_parser *parser)
{
- // Just return 1 to tell the parser we don't want to parse any body;
- // the parser should have found an upgrade request for WebSockets
- (void) parser;
- return 1;
+ // We strictly require a protocol upgrade
+ if (!parser->upgrade)
+ return 2;
+
+ return 0;
}
static int
@@ -1418,6 +1585,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
|| self->hp.http_major != 1
|| self->hp.http_minor != 1)
; // TODO: error (maybe send a frame depending on conditions)
+ // ...mostly just 400 Bad Request
const char *upgrade = str_map_find (&self->headers, "Upgrade");
@@ -1425,6 +1593,12 @@ ws_handler_finish_handshake (struct ws_handler *self)
const char *version = str_map_find (&self->headers, SEC_WS_VERSION);
const char *protocol = str_map_find (&self->headers, SEC_WS_PROTOCOL);
+ if (!upgrade || strcmp (upgrade, "websocket")
+ || !version || strcmp (version, "13"))
+ ; // TODO: error
+ // ... if the version doesn't match, we must send back a header indicating
+ // the version we do support
+
struct str response;
str_init (&response);
str_append (&response, "HTTP/1.1 101 Switching Protocols\r\n");
@@ -1433,8 +1607,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
// TODO: prepare the rest of the headers
- // TODO: we should ideally check that this is a 16-byte base64-encoded
- // value; do we also have to strip surrounding whitespace?
+ // TODO: we should ideally check that this is a 16-byte base64-encoded value
char *response_key = ws_encode_response_key (key);
str_append_printf (&response, SEC_WS_ACCEPT ": %s\r\n", response_key);
free (response_key);
@@ -1442,6 +1615,10 @@ ws_handler_finish_handshake (struct ws_handler *self)
str_append (&response, "\r\n");
self->write_cb (self->user_data, response.str, response.len);
str_free (&response);
+
+ // XXX: maybe we should start it earlier so that the handshake can
+ // timeout as well. ws_handler_connected()?
+ ev_timer_start (EV_DEFAULT_ &self->ping_timer);
return true;
}
@@ -1477,14 +1654,16 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
self->state = WS_HANDLER_WEBSOCKETS;
return true;
}
- else if (n_parsed != len || HTTP_PARSER_ERRNO (&self->hp) != HPE_OK)
+
+ if (n_parsed != len || HTTP_PARSER_ERRNO (&self->hp) != HPE_OK)
{
// TODO: error
// print_debug (..., http_errno_description
// (HTTP_PARSER_ERRNO (&self->hp));
+ // NOTE: if == HPE_CB_headers_complete, "Upgrade" is missing
+ return false;
}
- // TODO: make double sure to handle the case of !upgrade
return true;
}
@@ -1497,6 +1676,7 @@ static struct config_item g_config_table[] =
{ "port_scgi", NULL, "Port to bind for SCGI" },
{ "port_ws", NULL, "Port to bind for WebSockets" },
{ "pid_file", NULL, "Full path for the PID file" },
+ // XXX: here belongs something like a web SPA that interfaces with us
{ "static_root", NULL, "The root for static content" },
{ NULL, NULL, NULL }
};
@@ -1526,11 +1706,16 @@ server_context_init (struct server_context *self)
load_config_defaults (&self->config, g_config_table);
}
+static void close_listeners (struct server_context *self);
+
static void
server_context_free (struct server_context *self)
{
- // TODO: free the clients (?)
- // TODO: close the listeners (?)
+ // We really shouldn't attempt a quit without closing the clients first
+ soft_assert (!self->clients);
+
+ close_listeners (self);
+ free (self->listeners);
str_map_free (&self->config);
}
@@ -1773,7 +1958,8 @@ struct request
{
struct server_context *ctx; ///< Server context
- void *user_data; ///< User data argument for callbacks
+ struct request_handler *handler; ///< Current request handler
+ void *handler_data; ///< User data for the handler
/// Callback to write some CGI response data to the output
void (*write_cb) (void *user_data, const void *data, size_t len);
@@ -1782,16 +1968,17 @@ struct request
/// CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED.
void (*close_cb) (void *user_data);
- struct request_handler *handler; ///< Current request handler
- void *handler_data; ///< User data for the handler
+ void *user_data; ///< User data argument for callbacks
};
struct request_handler
{
LIST_HEADER (struct request_handler)
- /// Install ourselves as the handler for the request if applicable
- bool (*try_handle) (struct request *request, struct str_map *headers);
+ /// Install ourselves as the handler for the request if applicable.
+ /// Set @a continue_ to false if further processing should be stopped.
+ bool (*try_handle) (struct request *request,
+ struct str_map *headers, bool *continue_);
/// Handle incoming data.
/// Return false if further processing should be stopped.
@@ -1826,16 +2013,22 @@ request_finish (struct request *self)
static bool
request_start (struct request *self, struct str_map *headers)
{
+ // XXX: it feels like this should rather be two steps:
+ // bool (*can_handle) (request *, headers)
+ // ... install the handler ...
+ // bool (*handle) (request *)
+ //
+ // However that might cause some stuff to be done twice.
+ //
+ // Another way we could get rid off the continue_ argument is via adding
+ // some way of marking the request as finished from within the handler.
+
+ bool continue_ = true;
LIST_FOR_EACH (struct request_handler, handler, self->ctx->handlers)
- if (handler->try_handle (self, headers))
+ if (handler->try_handle (self, headers, &continue_))
{
- // XXX: maybe we should isolate the handlers a bit more
self->handler = handler;
-
- // TODO: we should also allow the "try_handle" function to
- // return that it has already finished processing the request
- // and we should abort it by returning false here.
- return true;
+ return continue_;
}
// Unable to serve the request
@@ -1862,7 +2055,7 @@ request_push (struct request *self, const void *data, size_t len)
static bool
request_handler_json_rpc_try_handle
- (struct request *request, struct str_map *headers)
+ (struct request *request, struct str_map *headers, bool *continue_)
{
const char *content_type = str_map_find (headers, "CONTENT_TYPE");
const char *method = str_map_find (headers, "REQUEST_METHOD");
@@ -1875,6 +2068,7 @@ request_handler_json_rpc_try_handle
str_init (buf);
request->handler_data = buf;
+ *continue_ = true;
return true;
}
@@ -1972,8 +2166,11 @@ detect_magic (const void *data, size_t len)
static bool
request_handler_static_try_handle
- (struct request *request, struct str_map *headers)
+ (struct request *request, struct str_map *headers, bool *continue_)
{
+ // Serving static files is actually quite complicated as it turns out;
+ // but this is only meant to serve a few tiny text files
+
struct server_context *ctx = request->ctx;
const char *root = str_map_find (&ctx->config, "static_root");
if (!root)
@@ -1999,6 +2196,7 @@ request_handler_static_try_handle
char *suffix = canonicalize_url_path (path_info);
char *path = xstrdup_printf ("%s%s", root, suffix);
+ // TODO: check that this is a regular file
FILE *fp = fopen (path, "rb");
if (!fp)
{
@@ -2045,6 +2243,13 @@ request_handler_static_try_handle
while ((len = fread (buf, 1, sizeof buf, fp)))
request->write_cb (request->user_data, buf, len);
fclose (fp);
+
+ // TODO: this should rather not be returned all at once but in chunks;
+ // file read requests never return EAGAIN
+ // TODO: actual file data should really be returned by a callback when
+ // the socket is writable with nothing to be sent (pumping the entire
+ // file all at once won't really work if it's huge).
+ *continue_ = false;
return true;
}
@@ -2178,7 +2383,8 @@ static void
client_fcgi_request_close (void *user_data)
{
struct client_fcgi_request *request = user_data;
- // TODO: tell the fcgi_request to what?
+ // TODO: fcgi_request_finish()? That will most probably end up with us
+ // receiving client_fcgi_request_destroy()
}
static void *
@@ -2186,6 +2392,7 @@ client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request)
{
struct client *client = user_data;
+ // TODO: what if the request is aborted by ;
struct client_fcgi_request *request = xmalloc (sizeof *request);
request->fcgi_request = fcgi_request;
request_init (&request->request);
@@ -2375,12 +2582,17 @@ client_ws_write (void *user_data, const void *data, size_t len)
}
static bool
-client_ws_on_message (void *user_data, const void *data, size_t len)
+client_ws_on_message (void *user_data,
+ enum ws_opcode type, const void *data, size_t len)
{
struct client *client = user_data;
struct client_ws *self = client->impl_data;
- // TODO: do something about the message
+ struct str response;
+ str_init (&response);
+ process_json_rpc (client->ctx, data, len, &response);
+ // TODO: send the response
+ str_free (&response);
return true;
}
@@ -2431,6 +2643,22 @@ struct listener
struct client_impl *impl; ///< Client behaviour
};
+static void
+close_listeners (struct server_context *self)
+{
+ // TODO: factor out the closing act, to be used in initiate_quit()
+ for (size_t i = 0; i < self->n_listeners; i++)
+ {
+ struct listener *listener = &self->listeners[i];
+ if (listener->fd == -1)
+ continue;
+
+ ev_io_stop (EV_DEFAULT_ &listener->watcher);
+ xclose (listener->fd);
+ listener->fd = -1;
+ }
+}
+
static bool
client_read_loop (EV_P_ struct client *client, ev_io *watcher)
{
@@ -2472,6 +2700,8 @@ on_client_ready (EV_P_ ev_io *watcher, int revents)
// finished flushing the write queue? This should probably even be
// the default behaviour, as it's fairly uncommon for clients to
// shutdown the socket for writes while leaving it open for reading.
+ // TODO: some sort of "on_buffers_flushed" callback for streaming huge
+ // chunks of external (or generated) data.
if (!flush_queue (&client->write_queue, watcher))
goto close;
return;
--
cgit v1.2.3-70-g09d2