diff options
-rw-r--r-- | README.adoc | 4 | ||||
-rw-r--r-- | json-rpc-shell.c | 4 | ||||
-rw-r--r-- | json-rpc-test-server.c | 178 |
3 files changed, 179 insertions, 7 deletions
diff --git a/README.adoc b/README.adoc index 67d009f..2d390d5 100644 --- a/README.adoc +++ b/README.adoc @@ -65,8 +65,8 @@ Test server ----------- If you install development packages for libmagic, an included test server will be built but not installed which provides a trivial JSON-RPC 2.0 service with -FastCGI, SCGI, and WebSocket interfaces. It responds to `ping` and `date` -methods and it can serve static files. +FastCGI, SCGI, WebSocket and LSP-like co-process interfaces. It responds to +`ping` and `date`, supports OpenRPC discovery and it can serve static files. Contributing and Support ------------------------ diff --git a/json-rpc-shell.c b/json-rpc-shell.c index 1da11b0..38ff022 100644 --- a/json-rpc-shell.c +++ b/json-rpc-shell.c @@ -2623,7 +2623,7 @@ static const http_parser_settings backend_co_http_settings = }; static bool -backend_co_write_starter (struct co_context *self, struct error **e) +backend_co_inject_starter (struct co_context *self, struct error **e) { // The default "Connection: keep-alive" maps well here. // We cannot feed this line into the parser from within callbacks. @@ -2653,7 +2653,7 @@ backend_co_parse (struct co_context *self, const char *data, size_t len, if (self->pending_fake_starter) { self->pending_fake_starter = false; - if (!backend_co_write_starter (self, e)) + if (!backend_co_inject_starter (self, e)) return false; } diff --git a/json-rpc-test-server.c b/json-rpc-test-server.c index 0cfd161..36950d8 100644 --- a/json-rpc-test-server.c +++ b/json-rpc-test-server.c @@ -1581,7 +1581,6 @@ static void process_json_rpc (struct server_context *ctx, const void *data, size_t len, struct str *output) { - json_error_t e; json_t *request; if (!(request = json_loadb (data, len, JSON_DECODE_ANY, &e))) @@ -2551,6 +2550,165 @@ client_ws_create (EV_P_ int sock_fd) return &self->client; } +// --- Co-process client ------------------------------------------------------- + +// This is mostly copied over from json-rpc-shell.c, only a bit simplified. +// We're giving up on header parsing in order to keep this small. +struct co_context +{ + struct server_context *ctx; ///< Server context + struct str message; ///< Message data + struct http_parser parser; ///< HTTP parser + bool pending_fake_starter; ///< Start of message? +}; + +static int +client_co_on_message_begin (http_parser *parser) +{ + struct co_context *self = parser->data; + str_reset (&self->message); + return 0; +} + +static int +client_co_on_body (http_parser *parser, const char *at, size_t len) +{ + struct co_context *self = parser->data; + str_append_data (&self->message, at, len); + return 0; +} + +static int +client_co_on_message_complete (http_parser *parser) +{ + struct co_context *self = parser->data; + http_parser_pause (&self->parser, true); + return 0; +} + +// The LSP incorporates a very thin subset of RFC 822, and it so happens +// that we may simply reuse the full HTTP parser here, with a small hack. +static const http_parser_settings client_co_http_settings = +{ + .on_message_begin = client_co_on_message_begin, + .on_body = client_co_on_body, + .on_message_complete = client_co_on_message_complete, +}; + +static void +client_co_respond (const struct str *buf) +{ + struct str wrapped = str_make(); + str_append_printf (&wrapped, + "Content-Length: %zu\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + "\r\n", buf->len); + str_append_data (&wrapped, buf->str, buf->len); + + if (write (STDOUT_FILENO, wrapped.str, wrapped.len) + != (ssize_t) wrapped.len) + exit_fatal ("write: %s", strerror (errno)); + str_free (&wrapped); +} + +static void +client_co_inject_starter (struct co_context *self) +{ + // The default "Connection: keep-alive" maps well here. + // We cannot feed this line into the parser from within callbacks. + static const char starter[] = "POST / HTTP/1.1\r\n"; + http_parser_pause (&self->parser, false); + + size_t n_parsed = http_parser_execute (&self->parser, + &client_co_http_settings, starter, sizeof starter - 1); + enum http_errno err = HTTP_PARSER_ERRNO (&self->parser); + if (n_parsed != sizeof starter - 1 || err != HPE_OK) + exit_fatal ("protocol failure: %s", http_errno_description (err)); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static void +client_co_process (struct co_context *self) +{ + struct str *message = &self->message; + struct str response = str_make (); + process_json_rpc (self->ctx, message->str, message->len, &response); + if (response.len) + client_co_respond (&response); + str_free (&response); +} + +static void +backend_co_parse (struct co_context *self, const char *data, size_t len, + size_t *n_parsed) +{ + if (self->pending_fake_starter) + { + self->pending_fake_starter = false; + client_co_inject_starter (self); + } + + *n_parsed = http_parser_execute + (&self->parser, &client_co_http_settings, data, len); + if (self->parser.upgrade) + exit_fatal ("protocol failure: %s", "unsupported upgrade attempt"); + + enum http_errno err = HTTP_PARSER_ERRNO (&self->parser); + if (err == HPE_PAUSED) + { + self->pending_fake_starter = true; + client_co_process (self); + } + else if (err != HPE_OK) + exit_fatal ("protocol failure: %s", http_errno_description (err)); +} + +static void +backend_co_on_data (struct co_context *self, const char *data, size_t len) +{ + size_t n_parsed = 0; + do + { + backend_co_parse (self, data, len, &n_parsed); + data += n_parsed; + } + while ((len -= n_parsed)); +} + +static void +client_co_run (struct server_context *ctx) +{ + struct co_context self = {}; + self.ctx = ctx; + self.message = str_make (); + http_parser_init (&self.parser, HTTP_REQUEST); + self.parser.data = &self; + self.pending_fake_starter = true; + + hard_assert (set_blocking (STDIN_FILENO, false)); + struct str buf = str_make (); + struct pollfd pfd = { .fd = STDIN_FILENO, .events = POLLIN }; + while (true) + { + if (poll (&pfd, 1, -1) <= 0) + exit_fatal ("poll: %s", strerror (errno)); + + str_remove_slice (&buf, 0, buf.len); + enum socket_io_result result = socket_io_try_read (pfd.fd, &buf); + int errno_saved = errno; + + if (buf.len) + backend_co_on_data (&self, buf.str, buf.len); + if (result == SOCKET_IO_ERROR) + exit_fatal ("read: %s", strerror (errno_saved)); + if (result == SOCKET_IO_EOF) + break; + } + str_free (&buf); + str_free (&self.message); +} + // --- Basic server stuff ------------------------------------------------------ typedef struct client *(*client_create_fn) (EV_P_ int sock_fd); @@ -2914,11 +3072,12 @@ daemonize (struct server_context *ctx) } static void -parse_program_arguments (int argc, char **argv) +parse_program_arguments (int argc, char **argv, bool *running_as_slave) { static const struct opt opts[] = { { 't', "test", NULL, 0, "self-test" }, + { 's', "slave", NULL, 0, "co-process mode" }, { 'd', "debug", NULL, 0, "run in debug mode" }, { 'h', "help", NULL, 0, "display this help and exit" }, { 'V', "version", NULL, 0, "output version information and exit" }, @@ -2938,6 +3097,9 @@ parse_program_arguments (int argc, char **argv) case 't': test_main (argc, argv); exit (EXIT_SUCCESS); + case 's': + *running_as_slave = true; + break; case 'd': g_debug_mode = true; break; @@ -2970,7 +3132,8 @@ parse_program_arguments (int argc, char **argv) int main (int argc, char *argv[]) { - parse_program_arguments (argc, argv); + bool running_as_a_slave = false; + parse_program_arguments (argc, argv, &running_as_a_slave); print_status (PROGRAM_NAME " " PROGRAM_VERSION " starting"); @@ -2985,6 +3148,15 @@ main (int argc, char *argv[]) exit (EXIT_FAILURE); } + // There's a lot of unnecessary left-over scaffolding in this program, + // for testing purposes assume that everything is synchronous + if (running_as_a_slave) + { + client_co_run (&ctx); + server_context_free (&ctx); + return EXIT_SUCCESS; + } + struct ev_loop *loop; if (!(loop = EV_DEFAULT)) exit_fatal ("libev initialization failed"); |