diff options
-rw-r--r-- | demo-json-rpc-server.c | 684 |
1 files changed, 650 insertions, 34 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c index 09f6002..6be6b3e 100644 --- a/demo-json-rpc-server.c +++ b/demo-json-rpc-server.c @@ -66,6 +66,45 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value) #undef UNPACKER_INT_BEGIN +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// "msg_writer" should be rewritten on top of this + +static void +str_pack_u8 (struct str *self, uint8_t x) +{ + str_append_data (self, &x, 1); +} + +static void +str_pack_u16 (struct str *self, uint64_t x) +{ + uint8_t tmp[2] = { x >> 8, x }; + str_append_data (self, tmp, sizeof tmp); +} + +static void +str_pack_u32 (struct str *self, uint32_t x) +{ + uint32_t u = x; + uint8_t tmp[4] = { u >> 24, u >> 16, u >> 8, u }; + str_append_data (self, tmp, sizeof tmp); +} + +static void +str_pack_i32 (struct str *self, int32_t x) +{ + str_pack_u32 (self, (uint32_t) x); +} + +static void +str_pack_u64 (struct str *self, uint64_t x) +{ + uint8_t tmp[8] = + { x >> 56, x >> 48, x >> 40, x >> 32, x >> 24, x >> 16, x >> 8, x }; + str_append_data (self, tmp, sizeof tmp); +} + // --- libev helpers ----------------------------------------------------------- static bool @@ -312,7 +351,7 @@ fcgi_nv_parser_free (struct fcgi_nv_parser *self) } static void -fcgi_nv_parser_push (struct fcgi_nv_parser *self, void *data, size_t len) +fcgi_nv_parser_push (struct fcgi_nv_parser *self, const void *data, size_t len) { // This could be optimized significantly; I'm not even trying str_append_data (&self->input, data, len); @@ -402,7 +441,38 @@ fcgi_nv_parser_push (struct fcgi_nv_parser *self, void *data, size_t len) // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -// TODO +static void +fcgi_nv_convert_len (size_t len, struct str *output) +{ + if (len < 0x80) + str_pack_u8 (output, len); + else + { + len |= (uint32_t) 1 << 31; + str_pack_u32 (output, len); + } +} + +static void +fcgi_nv_convert (struct str_map *map, struct str *output) +{ + struct str_map_iter iter; + str_map_iter_init (&iter, map); + while (str_map_iter_next (&iter)) + { + const char *name = iter.link->key; + const char *value = iter.link->data; + size_t name_len = iter.link->key_length; + size_t value_len = strlen (value); + + fcgi_nv_convert_len (name_len, output); + fcgi_nv_convert_len (value_len, output); + str_append_data (output, name, name_len); + str_append_data (output, value, value_len); + } +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - enum fcgi_request_state { @@ -413,14 +483,17 @@ enum fcgi_request_state struct fcgi_request { struct fcgi_muxer *muxer; ///< The parent muxer - uint16_t request_id; ///< The ID of this request + uint8_t flags; ///< Request flags + enum fcgi_request_state state; ///< Parsing state struct str_map headers; ///< Headers struct fcgi_nv_parser hdr_parser; ///< Header parser -}; -// TODO + struct str output_buffer; ///< Output buffer + + void *handler_data; ///< Handler data +}; struct fcgi_muxer { @@ -429,14 +502,302 @@ struct fcgi_muxer /// Requests assigned to request IDs // TODO: allocate this dynamically struct fcgi_request *requests[1 << 16]; + + void (*write_cb) (void *user_data, const void *data, size_t len); + void (*close_cb) (void *user_data); + + void *(*request_start_cb) (void *user_data, struct fcgi_request *request); + void (*request_push_cb) (void *handler_data, const void *data, size_t len); + void (*request_destroy_cb) (void *handler_data); + + void *user_data; ///< User data for callbacks }; static void +fcgi_muxer_send (struct fcgi_muxer *self, + enum fcgi_type type, uint16_t request_id, const void *data, size_t len) +{ + hard_assert (len <= UINT16_MAX); + + struct str message; + str_init (&message); + + str_pack_u8 (&message, FCGI_VERSION_1); + str_pack_u8 (&message, type); + str_pack_u16 (&message, request_id); + str_pack_u16 (&message, len); // content length + str_pack_u8 (&message, 0); // padding length + + str_append_data (&message, data, len); + + // XXX: we should probably have another write_cb that assumes ownership + self->write_cb (self->user_data, message.str, message.len); + str_free (&message); +} + +static void +fcgi_muxer_send_end_request (struct fcgi_muxer *self, uint16_t request_id, + uint32_t app_status, enum fcgi_protocol_status protocol_status) +{ + uint8_t content[8] = { app_status >> 24, app_status >> 16, + app_status << 8, app_status, protocol_status }; + fcgi_muxer_send (self, FCGI_END_REQUEST, request_id, + content, sizeof content); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static void +fcgi_request_init (struct fcgi_request *self) +{ + memset (self, 0, sizeof *self); + + str_map_init (&self->headers); + self->headers.free = free; + + fcgi_nv_parser_init (&self->hdr_parser); + self->hdr_parser.output = &self->headers; +} + +static void +fcgi_request_free (struct fcgi_request *self) +{ + str_map_free (&self->headers); + fcgi_nv_parser_free (&self->hdr_parser); +} + +static void +fcgi_request_push_params + (struct fcgi_request *self, const void *data, size_t len) +{ + if (self->state != FCGI_REQUEST_PARAMS) + { + // TODO: probably reject the request + return; + } + + if (len) + fcgi_nv_parser_push (&self->hdr_parser, data, len); + else + { + // TODO: probably check the state of the header parser + // TODO: request_start() can return false, end the request here? + self->handler_data = self->muxer->request_start_cb + (self->muxer->user_data, self); + self->state = FCGI_REQUEST_STDIN; + } +} + +static void +fcgi_request_push_stdin + (struct fcgi_request *self, const void *data, size_t len) +{ + if (self->state != FCGI_REQUEST_STDIN) + { + // TODO: probably reject the request + return; + } + + self->muxer->request_push_cb (self->handler_data, data, len); +} + +static void +fcgi_request_flush (struct fcgi_request *self) +{ + if (!self->output_buffer.len) + return; + + fcgi_muxer_send (self->muxer, FCGI_STDOUT, self->request_id, + self->output_buffer.str, self->output_buffer.len); + str_reset (&self->output_buffer); +} + +static void +fcgi_request_write (struct fcgi_request *self, const void *data, size_t len) +{ + // We're buffering the output and splitting it into messages + bool need_flush = true; + while (len) + { + size_t to_write = UINT16_MAX - self->output_buffer.len; + if (to_write > len) + { + to_write = len; + need_flush = false; + } + + str_append_data (&self->output_buffer, data, to_write); + data = (uint8_t *) data + to_write; + len -= to_write; + + if (need_flush) + fcgi_request_flush (self); + } +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +typedef void (*fcgi_muxer_handler_fn) + (struct fcgi_muxer *, const struct fcgi_parser *); + +static void +fcgi_muxer_on_get_values + (struct fcgi_muxer *self, const struct fcgi_parser *parser) +{ + struct str_map values; str_map_init (&values); values.free = free; + struct str_map response; str_map_init (&response); response.free = free; + + struct fcgi_nv_parser nv_parser; + fcgi_nv_parser_init (&nv_parser); + nv_parser.output = &values; + + fcgi_nv_parser_push (&nv_parser, parser->content.str, parser->content.len); + + struct str_map_iter iter; + str_map_iter_init (&iter, &values); + while (str_map_iter_next (&iter)) + { + const char *key = iter.link->key; + + // TODO: if (!strcmp (key, FCGI_MAX_CONNS)) + // TODO: if (!strcmp (key, FCGI_MAX_REQS)) + + if (!strcmp (key, FCGI_MPXS_CONNS)) + str_map_set (&response, key, xstrdup ("1")); + } + + struct str content; + str_init (&content); + fcgi_nv_convert (&response, &content); + fcgi_muxer_send (self, FCGI_GET_VALUES_RESULT, parser->request_id, + content.str, content.len); + str_free (&content); + + str_map_free (&values); + str_map_free (&response); +} + +static void +fcgi_muxer_on_begin_request + (struct fcgi_muxer *self, const struct fcgi_parser *parser) +{ + struct msg_unpacker unpacker; + msg_unpacker_init (&unpacker, parser->content.str, parser->content.len); + + uint16_t role; + uint8_t flags; + bool success = true; + success &= msg_unpacker_u16 (&unpacker, &role); + success &= msg_unpacker_u8 (&unpacker, &flags); + // Ignoring 5 reserved bytes + + if (!success) + { + print_debug ("FastCGI: ignoring invalid %s message", + STRINGIFY (FCGI_BEGIN_REQUEST)); + return; + } + + if (role != FCGI_RESPONDER) + { + fcgi_muxer_send_end_request (self, + parser->request_id, 0, FCGI_UNKNOWN_ROLE); + return; + } + + struct fcgi_request *request = self->requests[parser->request_id]; + if (request) + { + // TODO: fail + return; + } + + request = xcalloc (1, sizeof *request); + fcgi_request_init (request); + request->muxer = self; + request->request_id = parser->request_id; + request->flags = flags; + + self->requests[parser->request_id] = request; +} + +static void +fcgi_muxer_on_abort_request + (struct fcgi_muxer *self, const struct fcgi_parser *parser) +{ + struct fcgi_request *request = self->requests[parser->request_id]; + if (!request) + { + print_debug ("FastCGI: received %s for an unknown request", + STRINGIFY (FCGI_ABORT_REQUEST)); + return; + } + + // TODO: abort the request +} + +static void +fcgi_muxer_on_params (struct fcgi_muxer *self, const struct fcgi_parser *parser) +{ + struct fcgi_request *request = self->requests[parser->request_id]; + if (!request) + { + print_debug ("FastCGI: received %s for an unknown request", + STRINGIFY (FCGI_PARAMS)); + return; + } + + fcgi_request_push_params (request, + parser->content.str, parser->content.len); +} + +static void +fcgi_muxer_on_stdin (struct fcgi_muxer *self, const struct fcgi_parser *parser) +{ + struct fcgi_request *request = self->requests[parser->request_id]; + if (!request) + { + print_debug ("FastCGI: received %s for an unknown request", + STRINGIFY (FCGI_STDIN)); + return; + } + + fcgi_request_push_stdin (request, + parser->content.str, parser->content.len); +} + +static void fcgi_muxer_on_message (const struct fcgi_parser *parser, void *user_data) { struct fcgi_muxer *self = user_data; - // TODO + if (parser->version != FCGI_VERSION_1) + { + print_debug ("FastCGI: unsupported version %d", parser->version); + // TODO: also return false to stop processing on protocol error? + return; + } + + static const fcgi_muxer_handler_fn handlers[] = + { + [FCGI_GET_VALUES] = fcgi_muxer_on_get_values, + [FCGI_BEGIN_REQUEST] = fcgi_muxer_on_begin_request, + [FCGI_ABORT_REQUEST] = fcgi_muxer_on_abort_request, + [FCGI_PARAMS] = fcgi_muxer_on_params, + [FCGI_STDIN] = fcgi_muxer_on_stdin, + }; + + fcgi_muxer_handler_fn handler; + if (parser->type >= N_ELEMENTS (handlers) + || !(handler = handlers[parser->type])) + { + uint8_t content[8] = { parser->type }; + fcgi_muxer_send (self, FCGI_UNKNOWN_TYPE, parser->request_id, + content, sizeof content); + return; + } + + handler (self, parser); } static void @@ -649,6 +1010,7 @@ struct server_context struct client *clients; ///< Clients unsigned n_clients; ///< Current number of connections + struct request_handler *handlers; ///< Request handlers struct str_map config; ///< Server configuration }; @@ -672,19 +1034,58 @@ server_context_free (struct server_context *self) // --- JSON-RPC ---------------------------------------------------------------- -// TODO: this is where we're actually supposed to do JSON-RPC 2.0 processing +#define JSON_RPC_ERROR_TABLE(XX) \ + XX (-32700, PARSE_ERROR, "Parse error") \ + XX (-32600, INVALID_REQUEST, "Invalid Request") \ + XX (-32601, METHOD_NOT_FOUND, "Method not found") \ + XX (-32602, INVALID_PARAMS, "Invalid params") \ + XX (-32603, INTERNAL_ERROR, "Internal error") + +enum json_rpc_error +{ +#define XX(code, name, message) JSON_RPC_ERROR_ ## name, + JSON_RPC_ERROR_TABLE (XX) +#undef XX + JSON_RPC_ERROR_COUNT +}; + +static json_t * +json_rpc_error (enum json_rpc_error id, json_t *data) +{ +#define XX(code, name, message) { code, message }, + static const struct json_rpc_error + { + int code; + const char *message; + } + errors[JSON_RPC_ERROR_COUNT] = + { + JSON_RPC_ERROR_TABLE (XX) + }; +#undef XX + + json_t *error = json_object (); + json_object_set_new (error, "code", json_integer (errors[id].code)); + json_object_set_new (error, "message", json_string (errors[id].message)); -// There's probably no reason to create an object for this. -// -// We probably just want a handler function that takes a JSON string, parses it, -// and returns back another JSON string. -// -// Then there should be another function that takes a parsed JSON request and -// returns back a JSON reply. This function may get called multiple times if -// the user sends a batch request. + if (data) + json_object_set_new (error, "data", data); -// TODO: a function that queues up a ping over IRC: this has to be owned by the -// server context as a background job that removes itself upon completion. + return error; +} + +static json_t * +json_rpc_response (json_t *id, json_t *result, json_t *error) +{ + json_t *x = json_object (); + json_object_set_new (x, "jsonrpc", json_string ("2.0")); + json_object_set_new (x, "id", id ? id : json_null ()); + if (result) json_object_set_new (x, "result", result); + if (error) json_object_set_new (x, "error", error); + return x; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - static bool try_advance (const char **p, const char *text) @@ -728,11 +1129,128 @@ validate_json_rpc_content_type (const char *type) return !*type; } +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// TODO: a method that queues up a ping over IRC: this has to be owned by the +// server context as a background job that removes itself upon completion. + +static json_t * +json_rpc_ping (struct server_context *ctx, json_t *params) +{ + (void) ctx; + (void) params; + + return json_rpc_response (NULL, json_string ("pong"), NULL); +} + +static json_t * +process_json_rpc_request (struct server_context *ctx, json_t *request) +{ + // TODO: takes a parsed JSON request and returns back a JSON reply. + // This function may get called multiple times for batch requests. + + if (!json_is_object (request)) + return json_rpc_response (NULL, NULL, + json_rpc_error (JSON_RPC_ERROR_INVALID_REQUEST, NULL)); + + json_t *v = json_object_get (request, "jsonrpc"); + json_t *m = json_object_get (request, "method"); + json_t *params = json_object_get (request, "params"); + json_t *id = json_object_get (request, "id"); + + const char *version; + const char *method; + + bool ok = true; + ok &= v && (version = json_string_value (v)) && !strcmp (version, "2.0"); + ok &= m && (method = json_string_value (m)); + ok &= !params || json_is_array (params) || json_is_object (params); + ok &= !id || json_is_null (id) || + json_is_string (id) || json_is_number (id); + if (!ok) + return json_rpc_response (id, NULL, + json_rpc_error (JSON_RPC_ERROR_INVALID_REQUEST, NULL)); + + // TODO: add a more extensible mechanism + json_t *response = NULL; + if (!strcmp (method, "ping")) + response = json_rpc_ping (ctx, params); + else + return json_rpc_response (id, NULL, + json_rpc_error (JSON_RPC_ERROR_METHOD_NOT_FOUND, NULL)); + + if (id) + return response; + + // Notifications don't get responses + // TODO: separate notifications from non-notifications? + json_decref (response); + return NULL; +} + +static void +flush_json (json_t *json, struct str *output) +{ + char *utf8 = json_dumps (json, JSON_ENCODE_ANY); + str_append (output, utf8); + free (utf8); + json_decref (json); +} + +static void +process_json_rpc (struct server_context *ctx, + const void *data, size_t len, struct str *output) +{ + + json_error_t e; + json_t *request; + if (!(request = json_loadb (data, len, JSON_DECODE_ANY, &e))) + { + flush_json (json_rpc_response (NULL, NULL, + json_rpc_error (JSON_RPC_ERROR_PARSE_ERROR, NULL)), + output); + return; + } + + if (json_is_array (request)) + { + if (!json_array_size (request)) + { + flush_json (json_rpc_response (NULL, NULL, + json_rpc_error (JSON_RPC_ERROR_INVALID_REQUEST, NULL)), + output); + return; + } + + json_t *response = json_array (); + json_t *iter; + size_t i; + + json_array_foreach (request, i, iter) + { + json_t *result = process_json_rpc_request (ctx, iter); + if (result) + json_array_append_new (response, result); + } + + if (json_array_size (response)) + flush_json (response, output); + else + json_decref (response); + } + else + { + json_t *result = process_json_rpc_request (ctx, request); + if (result) + flush_json (result, output); + } +} + // --- Requests ---------------------------------------------------------------- struct request { - // TODO *ctx + struct server_context *ctx; ///< Server context void *user_data; ///< User data argument for callbacks @@ -749,6 +1267,8 @@ struct request struct request_handler { + LIST_HEADER (struct request_handler) + /// Install ourselves as the handler for the request if applicable bool (*try_handle) (struct request *request, struct str_map *headers); @@ -785,11 +1305,17 @@ request_finish (struct request *self) static bool request_start (struct request *self, struct str_map *headers) { - bool handled = false; - // TODO: try request handlers registered in self->ctx - if (handled) - // TODO: can also be false - return true; + LIST_FOR_EACH (struct request_handler, handler, self->ctx->handlers) + if (handler->try_handle (self, headers)) + { + // XXX: maybe we should isolate the handlers a bit more + self->handler = handler; + + // TODO: we should also allow the "try_handle" function to + // return that it has already finished processing the request + // and we should abort it by returning false here. + return true; + } // Unable to serve the request struct str response; @@ -819,11 +1345,14 @@ request_handler_json_rpc_try_handle const char *content_type = str_map_find (headers, "CONTENT_TYPE"); const char *method = str_map_find (headers, "REQUEST_METHOD"); - if (strcmp (method, "POST") - || !validate_json_rpc_content_type (content_type)) + if (!method || strcmp (method, "POST") + || !content_type || !validate_json_rpc_content_type (content_type)) return false; - // TODO: install the handler, perhaps construct an object + struct str *buf = xcalloc (1, sizeof *buf); + str_init (buf); + + request->handler_data = buf; return true; } @@ -831,15 +1360,28 @@ static bool request_handler_json_rpc_push (struct request *request, const void *data, size_t len) { - // TODO: append to a buffer - // TODO: len == 0: process the request + struct str *buf = request->handler_data; + if (len) + str_append_data (buf, data, len); + else + { + struct str response; + str_init (&response); + process_json_rpc (request->ctx, buf->str, buf->len, &response); + request->write_cb (request->user_data, response.str, response.len); + str_free (&response); + } return true; } static void request_handler_json_rpc_destroy (struct request *request) { - // TODO + struct str *buf = request->handler_data; + str_free (buf); + free (buf); + + request->handler_data = NULL; } struct request_handler g_request_handler_json_rpc = @@ -849,6 +1391,8 @@ struct request_handler g_request_handler_json_rpc = .destroy_cb = request_handler_json_rpc_destroy, }; +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + // TODO: another request handler to respond to all GETs with a message // --- Client communication handlers ------------------------------------------- @@ -927,17 +1471,86 @@ client_remove (struct client *client) struct client_fcgi { - struct fcgi_parser parser; ///< FastCGI stream parser + struct fcgi_muxer muxer; ///< FastCGI de/multiplexer +}; + +struct client_fcgi_request +{ + struct fcgi_request *fcgi_request; ///< FastCGI request + struct request request; ///< Request }; static void +client_fcgi_request_write (void *user_data, const void *data, size_t len) +{ + struct client_fcgi_request *request = user_data; + fcgi_request_write (request->fcgi_request, data, len); +} + +static void +client_fcgi_request_close (void *user_data) +{ + struct client_fcgi_request *request = user_data; + // TODO: tell the fcgi_request to what? +} + +static void * +client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request) +{ + struct client *client = user_data; + + struct client_fcgi_request *request = xmalloc (sizeof *request); + request->fcgi_request = fcgi_request; + request_init (&request->request); + request->request.ctx = client->ctx; + request->request.write_cb = client_fcgi_request_write; + request->request.close_cb = client_fcgi_request_close; + request->request.user_data = request; + return request; +} + +static void +client_fcgi_request_push (void *handler_data, const void *data, size_t len) +{ + struct client_fcgi_request *request = handler_data; + request_push (&request->request, data, len); +} + +static void +client_fcgi_request_destroy (void *handler_data) +{ + struct client_fcgi_request *request = handler_data; + request_free (&request->request); + free (handler_data); +} + +static void +client_fcgi_write (void *user_data, const void *data, size_t len) +{ + struct client *client = user_data; + client_write (client, data, len); +} + +static void +client_fcgi_close (void *user_data) +{ + struct client *client = user_data; + client_remove (client); +} + +static void client_fcgi_init (struct client *client) { struct client_fcgi *self = xcalloc (1, sizeof *self); client->impl_data = self; - fcgi_parser_init (&self->parser); - // TODO: configure the parser + fcgi_muxer_init (&self->muxer); + self->muxer.write_cb = client_fcgi_write; + self->muxer.close_cb = client_fcgi_close; + self->muxer.request_start_cb = client_fcgi_request_start; + self->muxer.request_push_cb = client_fcgi_request_push; + self->muxer.request_destroy_cb = client_fcgi_request_destroy; + self->muxer.user_data = client; } static void @@ -946,7 +1559,7 @@ client_fcgi_destroy (struct client *client) struct client_fcgi *self = client->impl_data; client->impl_data = NULL; - fcgi_parser_free (&self->parser); + fcgi_muxer_free (&self->muxer); free (self); } @@ -954,7 +1567,7 @@ static bool client_fcgi_push (struct client *client, const void *data, size_t len) { struct client_fcgi *self = client->impl_data; - fcgi_parser_push (&self->parser, data, len); + fcgi_muxer_push (&self->muxer, data, len); return true; } @@ -1015,6 +1628,7 @@ client_scgi_init (struct client *client) client->impl_data = self; request_init (&self->request); + self->request.ctx = client->ctx; self->request.write_cb = client_scgi_write; self->request.close_cb = client_scgi_close; self->request.user_data = client; @@ -1426,6 +2040,8 @@ main (int argc, char *argv[]) (void) signal (SIGPIPE, SIG_IGN); + LIST_PREPEND (ctx.handlers, &g_request_handler_json_rpc); + if (!parse_config (&ctx, &e) || !setup_listen_fds (&ctx, &e)) { |