From 23c728e5355bb33e44df91a06d3c68488b85e30b Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch
Date: Wed, 14 Oct 2020 00:05:03 +0200 Subject: Add a backend for co-processes Targets language servers. In this first stage, we don't need to support bi-directionality, although it's a requirement for finishing this task. Updates #4 --- README.adoc | 1 + json-rpc-shell.adoc | 7 +- json-rpc-shell.c | 442 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 439 insertions(+), 11 deletions(-) diff --git a/README.adoc b/README.adoc index d0f6d7f..67d009f 100644 --- a/README.adoc +++ b/README.adoc @@ -19,6 +19,7 @@ the following niceties: results in your favourite editor or redirect them to a file - ability to edit the input line in your favourite editor as well with Alt+E - WebSocket (RFC 6455) can also be used as a transport rather than HTTP + - even Language Server Protocol servers may be launched as a slave command - support for method name tab completion using OpenRPC discovery or file input Documentation diff --git a/json-rpc-shell.adoc b/json-rpc-shell.adoc index 1806c3e..988a701 100644 --- a/json-rpc-shell.adoc +++ b/json-rpc-shell.adoc @@ -10,7 +10,7 @@ json-rpc-shell - a shell for JSON-RPC 2.0 Synopsis -------- -*json-rpc-shell* [_OPTION_]... _ENDPOINT_ +*json-rpc-shell* [_OPTION_]... { _ENDPOINT_ | _COMMAND_ [_ARG_]... } Description ----------- @@ -80,6 +80,11 @@ Protocol Call "rpc.discover" upon start-up in order to pull in OpenRPC data for tab completion of method names. If a path is given, it is read from a file. +*-e*, *--execute*:: + Rather than an _ENDPOINT_, accept a command line to execute and communicate + with using the JSON-RPC 2.0 protocol variation used in the Language Server + Protocol. + Program information ~~~~~~~~~~~~~~~~~~~ *-h*, *--help*:: diff --git a/json-rpc-shell.c b/json-rpc-shell.c index b65c666..1333346 100644 --- a/json-rpc-shell.c +++ b/json-rpc-shell.c @@ -1087,6 +1087,10 @@ static struct app_context char *editor_filename; ///< File for input line editor struct str_map methods; ///< Methods detected via OpenRPC + bool awaiting; ///< Running a separate loop to wait? + struct error *await_error; ///< Error while waiting for event + struct str *await_response; ///< Buffer for a response to a message + struct config config; ///< Program configuration enum color_mode color_mode; ///< Colour output mode bool compact; ///< Whether to not pretty print @@ -1125,6 +1129,9 @@ struct backend_vtable const char *request, bool expect_content, struct str *buf, struct error **e); + /// See if the child belongs to the backend and process the signal + bool (*on_child) (struct backend *backend, pid_t pid, int status); + /// Do everything necessary to deal with ev_break(EVBREAK_ALL) void (*on_quit) (struct backend *backend); @@ -1132,6 +1139,73 @@ struct backend_vtable void (*destroy) (struct backend *backend); }; +// --- Asynchronous results ---------------------------------------------------- + +static bool +await (struct app_context *ctx, struct str *buf, struct error **e) +{ + hard_assert (!ctx->awaiting); + + // Run an event loop to retrieve the response + ctx->await_response = buf; + ctx->awaiting = true; + + ev_run (EV_DEFAULT_ 0); + + ctx->awaiting = false; + ctx->await_response = NULL; + + if (ctx->await_error) + { + error_propagate (e, ctx->await_error); + ctx->await_error = NULL; + return false; + } + return true; +} + +static int normalize_whitespace (int c) { return isspace_ascii (c) ? ' ' : c; } + +/// Caller guarantees that data[len] is a NUL byte (because of iconv_xstrdup()) +static void +await_try_finish (struct app_context *ctx, const char *data, size_t len) +{ + // There is no buffer while connecting and after we obtain our result + if (data && !ctx->await_response) + { + char *s = iconv_xstrdup (ctx->term_from_utf8, + (char *) data, len + 1 /* null byte */, NULL); + // Does not affect JSON and ensures the message is printed out okay + cstr_transform (s, normalize_whitespace); + print_warning ("unexpected message received: %s", s); + free (s); + return; + } + + if (data && ctx->await_response) + { + str_append_data (ctx->await_response, data, len); + ctx->await_response = NULL; + } + + // Here we need to be very careful to not return from too many levels + if (ctx->awaiting) + ev_break (EV_DEFAULT_ EVBREAK_ONE); +} + +static void +await_try_cancel (struct app_context *ctx) +{ + if (!ctx->awaiting) + return; + + ctx->await_response = NULL; + if (!ctx->await_error) + error_set (&ctx->await_error, "unexpected connection close"); + + ev_break (EV_DEFAULT_ EVBREAK_ONE); +} + // --- Configuration ----------------------------------------------------------- static void on_config_attribute_change (struct config_item *item); @@ -2114,8 +2188,6 @@ backend_ws_on_control_frame return true; } -static int normalize_whitespace (int c) { return isspace_ascii (c) ? ' ' : c; } - /// Caller guarantees that data[len] is a NUL byte (because of iconv_xstrdup()) static bool backend_ws_on_message (struct ws_context *self, @@ -2577,6 +2649,328 @@ backend_curl_new (struct app_context *ctx, const char *endpoint) return &self->super; } +// --- Co-process backend ------------------------------------------------------ + +struct co_context +{ + struct backend super; ///< Parent class + struct app_context *ctx; ///< Application context + + pid_t child; ///< The co-process or -1 + int socket; ///< Our end of the socketpair + int stderr_fd; ///< stderr read end + struct str stderr_buffer; ///< stderr buffer + + ev_io stderr_watcher; ///< stderr watcher + ev_io socket_watcher; ///< Socketpair watcher + + struct http_parserpp http; ///< HTTP parser + bool pending_fake_starter; ///< Start of message? +}; + +static int +backend_co_on_message_complete (http_parser *parser) +{ + struct http_parserpp *self = parser->data; + http_parser_pause (&self->parser, true); + return 0; +} + +// The LSP incorporates a very thin subset of RFC 822, and it so happens +// that we may simply reuse the full HTTP parser here, with a small hack. +static const http_parser_settings backend_co_http_settings = +{ + .on_header_field = http_parserpp_on_header_field, + .on_header_value = http_parserpp_on_header_value, + .on_headers_complete = http_parserpp_on_headers_complete, + + // TODO: check Content-Type early? + .on_message_begin = http_parserpp_on_message_begin, + .on_body = http_parserpp_on_body, + .on_message_complete = backend_co_on_message_complete, +}; + +static bool +backend_co_write_starter (struct co_context *self, struct error **e) +{ + // The default "Connection: keep-alive" maps well here. + // We cannot feed this line into the parser from within callbacks. + static const char starter[] = "POST / HTTP/1.1\r\n"; + http_parser_pause (&self->http.parser, false); + + size_t n_parsed = http_parser_execute (&self->http.parser, + &backend_co_http_settings, starter, sizeof starter - 1); + enum http_errno err = HTTP_PARSER_ERRNO (&self->http.parser); + if (n_parsed != sizeof starter - 1 || err != HPE_OK) + FAIL ("protocol failure: %s", http_errno_description (err)); + return true; +} + +static void +backend_co_process (struct co_context *self) +{ + // TODO: verify Content-Type in the headers, though tricky and optional + struct str *message = &self->http.message; + await_try_finish (self->ctx, message->str, message->len); +} + +static bool +backend_co_parse (struct co_context *self, const char *data, size_t len, + size_t *n_parsed, struct error **e) +{ + if (self->pending_fake_starter) + { + self->pending_fake_starter = false; + if (!backend_co_write_starter (self, e)) + return false; + } + + *n_parsed = http_parser_execute + (&self->http.parser, &backend_co_http_settings, data, len); + if (self->http.parser.upgrade) + FAIL ("protocol failure: %s", "unsupported upgrade attempt"); + + enum http_errno err = HTTP_PARSER_ERRNO (&self->http.parser); + if (err == HPE_PAUSED) + { + self->pending_fake_starter = true; + backend_co_process (self); + } + else if (err != HPE_OK) + FAIL ("protocol failure: %s", http_errno_description (err)); + return true; +} + +static bool +backend_co_on_data (struct co_context *self, const char *data, size_t len, + struct error **e) +{ + size_t n_parsed = 0; + while (backend_co_parse (self, data, len, &n_parsed, e)) + { + data += n_parsed; + if (!(len -= n_parsed)) + return true; + } + return false; +} + +static void +backend_co_on_socket_ready (EV_P_ ev_io *handle, int revents) +{ + (void) loop; + (void) revents; + + struct co_context *self = handle->data; + char buf[BUFSIZ]; + +restart: + // Try to read some data in a non-blocking manner + (void) set_blocking (handle->fd, false); + ssize_t n_read = read (handle->fd, buf, sizeof buf); + int errno_saved = errno; + (void) set_blocking (handle->fd, true); + errno = errno_saved; + + struct error *e = NULL; + if (n_read < 0) + { + if (errno == EAGAIN) + return; + if (errno == EINTR) + goto restart; + + print_error ("reading from the command failed: %s", strerror (errno)); + } + else if (!backend_co_on_data (self, buf, n_read, &e)) + { + print_error ("%s", e->message); + error_free (e); + } + else if (!n_read) + print_status ("the command has closed the connection"); + else + goto restart; + + ev_io_stop (EV_A_ handle); + // That would have no way of succeeding + await_try_cancel (self->ctx); +} + +static void +backend_co_on_err_ready (EV_P_ ev_io *handle, int revents) +{ + (void) revents; + + struct co_context *self = handle->data; + struct str *buf = &self->stderr_buffer; + enum socket_io_result result = socket_io_try_read (handle->fd, buf); + + char *p; + while ((p = strchr (buf->str, '\n'))) + { + *p = 0; + print_status ("stderr: %s", buf->str); + str_remove_slice (buf, 0, p - buf->str + 1); + } + + switch (result) + { + case SOCKET_IO_EOF: + print_debug ("the command has closed its stderr"); + break; + case SOCKET_IO_OK: + return; + case SOCKET_IO_ERROR: + print_warning ("cannot read stderr: %s", strerror (errno)); + } + + ev_io_stop (EV_A_ handle); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// TODO: do we want to go for synchronous writes, as with the WebSocket backend? +// We can postpone it for later. + +static bool +backend_co_make_call (struct backend *backend, + const char *request, bool expect_content, struct str *buf, struct error **e) +{ + struct co_context *self = (struct co_context *) backend; + struct str wrapped = str_make(); + str_append_printf (&wrapped, + "Content-Length: %zu\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + "\r\n%s", strlen (request), request); + + enum socket_io_result result = socket_io_try_write (self->socket, &wrapped); + if (result == SOCKET_IO_ERROR) + { + str_free (&wrapped); + ev_io_stop (EV_DEFAULT_ &self->socket_watcher); + FAIL ("writing to the command failed: %s", strerror (errno)); + } + else if (wrapped.len) + print_error ("internal error, partial write to command"); + + str_free (&wrapped); + return !expect_content || await (self->ctx, buf, e); +} + +static bool +backend_co_on_child (struct backend *backend, pid_t pid, int status) +{ + struct co_context *self = (struct co_context *) backend; + if (pid != self->child) + return false; + + if (WIFSTOPPED (status)) + print_warning ("the command has been stopped"); + else if (WIFCONTINUED (status)) + print_warning ("the command has been resumed"); + else + { + if (WIFEXITED (status)) + print_error ("the command has exited with status %d", + WEXITSTATUS (status)); + else + print_error ("the command has died from signal %d", + WTERMSIG (status)); + + self->child = -1; + // Wait for the file descriptor to close, it may still contain data + } + return true; +} + +static void +backend_co_destroy (struct backend *backend) +{ + struct co_context *self = (struct co_context *) backend; + str_free (&self->stderr_buffer); + http_parserpp_free (&self->http); + + if (self->socket != -1) + xclose (self->socket); + if (self->stderr_fd != -1) + xclose (self->stderr_fd); + if (self->child != -1) + (void) kill (self->child, SIGKILL); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static struct backend_vtable backend_co_vtable = +{ + .add_header = NULL, + .make_call = backend_co_make_call, + .on_child = backend_co_on_child, + .destroy = backend_co_destroy, +}; + +static struct backend * +backend_co_new (struct app_context *ctx, char **argv) +{ + struct co_context *self = xcalloc (1, sizeof *self); + self->super.vtable = &backend_co_vtable; + self->ctx = ctx; + + enum { OURS, THEIRS }; + + int pair[2] = { -1, -1 }, err[2] = { -1, -1 }; + if (socketpair (AF_UNIX, SOCK_STREAM, 0, pair)) + exit_fatal ("socketpair: %s", strerror (errno)); + if (pipe (err)) + exit_fatal ("pipe: %s", strerror (errno)); + + set_cloexec ((self->socket = pair[OURS])); + set_cloexec (pair[THEIRS]); + set_cloexec ((self->stderr_fd = err[OURS])); + set_cloexec (err[THEIRS]); + + // It runs in our own progress group, so it gets SIGINTed with us + switch ((self->child = fork ())) + { + case -1: + exit_fatal ("fork: %s", strerror (errno)); + case 0: + dup2 (pair[THEIRS], STDIN_FILENO); + dup2 (pair[THEIRS], STDOUT_FILENO); + dup2 (err[THEIRS], STDERR_FILENO); + + // Undo what we've done in init_watchers() + signal (SIGPIPE, SIG_DFL); + signal (SIGTTOU, SIG_DFL); + + execvp (argv[0], argv); + + // stderr has been redirected, this won't cause a SIGTTOU + print_error ("execv: %s", strerror (errno)); + _exit (EXIT_FAILURE); + default: + xclose (pair[THEIRS]); + xclose (err[THEIRS]); + } + + ev_io_init (&self->socket_watcher, + backend_co_on_socket_ready, self->socket, EV_READ); + self->socket_watcher.data = self; + ev_io_start (EV_DEFAULT_ &self->socket_watcher); + + set_blocking (self->stderr_fd, false); + self->stderr_buffer = str_make (); + + ev_io_init (&self->stderr_watcher, + backend_co_on_err_ready, self->stderr_fd, EV_READ); + self->stderr_watcher.data = self; + ev_io_start (EV_DEFAULT_ &self->stderr_watcher); + + http_parserpp_init (&self->http, HTTP_REQUEST); + self->pending_fake_starter = true; + return &self->super; +} + // --- JSON tokenizer ---------------------------------------------------------- // A dumb JSON tokenizer intended strictly just for syntax highlighting @@ -2877,6 +3271,8 @@ quit (struct app_context *ctx) { if (ctx->backend->vtable->on_quit) ctx->backend->vtable->on_quit (ctx->backend); + if (ctx->awaiting && !ctx->await_error) + error_set (&ctx->await_error, "aborted by user"); ev_break (EV_DEFAULT_ EVBREAK_ALL); ctx->input->vtable->hide (ctx->input); @@ -3076,6 +3472,14 @@ static struct error * json_rpc_call_raw (struct app_context *ctx, const char *method, json_t *id, json_t *params, struct str *buf) { + struct error *error = NULL; + if (ctx->awaiting) + { + // Only allow recursing once below, awaiting is not re-entrant + error_set (&error, "busy"); + return error; + } + json_t *request = json_object (); json_object_set_new (request, "jsonrpc", json_string ("2.0")); json_object_set_new (request, "method", json_string (method)); @@ -3088,11 +3492,9 @@ json_rpc_call_raw (struct app_context *ctx, maybe_print_verbose (ctx, ATTR_OUTGOING, req_utf8, -1); - struct error *error = NULL; ctx->backend->vtable->make_call (ctx->backend, req_utf8, id != NULL /* expect_content */, buf, &error); free (req_utf8); - if (error) return error; @@ -3460,6 +3862,11 @@ on_child (EV_P_ ev_child *handle, int revents) (void) revents; struct app_context *ctx = ev_userdata (loop); + if (ctx->backend->vtable->on_child + && ctx->backend->vtable->on_child (ctx->backend, + handle->rpid, handle->rstatus)) + return; + // I am not a shell, stopping not allowed int status = handle->rstatus; if (WIFSTOPPED (status) @@ -3570,6 +3977,7 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv, { 'c', "compact-output", NULL, 0, "do not pretty-print responses" }, { 'C', "color", "WHEN", OPT_LONG_ONLY, "colorize output: never, always, or auto" }, + { 'e', "execute", NULL, 0, "launch a command to act as a server" }, { 'n', "null-as-id", NULL, 0, "JSON null is used as an `id'" }, { 'o', "origin", "O", 0, "set the HTTP Origin header" }, // So far you have to explicitly enable this rather than disable @@ -3587,9 +3995,10 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv, }; struct opt_handler oh = opt_handler_make (argc, argv, opts, - "ENDPOINT", "A shell for JSON-RPC 2.0."); + "{ ENDPOINT | COMMAND [ARG]... }", "A shell for JSON-RPC 2.0."); int c; + bool run_command = false; while ((c = opt_handler_get (&oh)) != -1) switch (c) { @@ -3606,6 +4015,7 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv, case 'o': *origin = optarg; break; case 'O': *openrpc = optarg ? optarg : ""; break; + case 'e': run_command = true; break; case 'n': ctx->null_as_id = true; break; case 'c': ctx->compact = true; break; case 't': ctx->trust_all = true; break; @@ -3637,19 +4047,29 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv, argc -= optind; argv += optind; - if (argc != 1) + if (run_command && argc >= 1) + *endpoint = NULL; + else if (argc == 1) + *endpoint = argv[0]; + else { opt_handler_usage (&oh, stderr); exit (EXIT_FAILURE); } - - *endpoint = argv[0]; opt_handler_free (&oh); } static void -init_backend (struct app_context *ctx, const char *origin, const char *endpoint) +init_backend (struct app_context *ctx, const char *origin, const char *endpoint, + char **argv) { + if (!endpoint) + { + // There is no point in passing the Origin to a co-process + ctx->backend = backend_co_new (ctx, argv); + return; + } + struct http_parser_url url; if (http_parser_parse_url (endpoint, strlen (endpoint), false, &url)) exit_fatal ("invalid endpoint address"); @@ -3688,6 +4108,8 @@ main (int argc, char *argv[]) const char *origin = NULL, *endpoint = NULL, *openrpc = NULL; parse_program_arguments (&g_ctx, argc, argv, &origin, &endpoint, &openrpc); + argc -= optind; + argv += optind; g_ctx.input = input_new (); g_ctx.input->user_data = &g_ctx; @@ -3698,7 +4120,7 @@ main (int argc, char *argv[]) g_ctx.methods = str_map_make (NULL); init_colors (&g_ctx); load_configuration (&g_ctx); - init_backend (&g_ctx, origin, endpoint); + init_backend (&g_ctx, origin, endpoint, argv); // We only need to convert to and from the terminal encoding setlocale (LC_CTYPE, ""); -- cgit v1.2.3-70-g09d2