aboutsummaryrefslogtreecommitdiff
path: root/json-rpc-shell.c
diff options
context:
space:
mode:
Diffstat (limited to 'json-rpc-shell.c')
-rw-r--r--json-rpc-shell.c442
1 files changed, 432 insertions, 10 deletions
diff --git a/json-rpc-shell.c b/json-rpc-shell.c
index b65c666..1333346 100644
--- a/json-rpc-shell.c
+++ b/json-rpc-shell.c
@@ -1087,6 +1087,10 @@ static struct app_context
char *editor_filename; ///< File for input line editor
struct str_map methods; ///< Methods detected via OpenRPC
+ bool awaiting; ///< Running a separate loop to wait?
+ struct error *await_error; ///< Error while waiting for event
+ struct str *await_response; ///< Buffer for a response to a message
+
struct config config; ///< Program configuration
enum color_mode color_mode; ///< Colour output mode
bool compact; ///< Whether to not pretty print
@@ -1125,6 +1129,9 @@ struct backend_vtable
const char *request, bool expect_content,
struct str *buf, struct error **e);
+ /// See if the child belongs to the backend and process the signal
+ bool (*on_child) (struct backend *backend, pid_t pid, int status);
+
/// Do everything necessary to deal with ev_break(EVBREAK_ALL)
void (*on_quit) (struct backend *backend);
@@ -1132,6 +1139,73 @@ struct backend_vtable
void (*destroy) (struct backend *backend);
};
+// --- Asynchronous results ----------------------------------------------------
+
+static bool
+await (struct app_context *ctx, struct str *buf, struct error **e)
+{
+ hard_assert (!ctx->awaiting);
+
+ // Run an event loop to retrieve the response
+ ctx->await_response = buf;
+ ctx->awaiting = true;
+
+ ev_run (EV_DEFAULT_ 0);
+
+ ctx->awaiting = false;
+ ctx->await_response = NULL;
+
+ if (ctx->await_error)
+ {
+ error_propagate (e, ctx->await_error);
+ ctx->await_error = NULL;
+ return false;
+ }
+ return true;
+}
+
+static int normalize_whitespace (int c) { return isspace_ascii (c) ? ' ' : c; }
+
+/// Caller guarantees that data[len] is a NUL byte (because of iconv_xstrdup())
+static void
+await_try_finish (struct app_context *ctx, const char *data, size_t len)
+{
+ // There is no buffer while connecting and after we obtain our result
+ if (data && !ctx->await_response)
+ {
+ char *s = iconv_xstrdup (ctx->term_from_utf8,
+ (char *) data, len + 1 /* null byte */, NULL);
+ // Does not affect JSON and ensures the message is printed out okay
+ cstr_transform (s, normalize_whitespace);
+ print_warning ("unexpected message received: %s", s);
+ free (s);
+ return;
+ }
+
+ if (data && ctx->await_response)
+ {
+ str_append_data (ctx->await_response, data, len);
+ ctx->await_response = NULL;
+ }
+
+ // Here we need to be very careful to not return from too many levels
+ if (ctx->awaiting)
+ ev_break (EV_DEFAULT_ EVBREAK_ONE);
+}
+
+static void
+await_try_cancel (struct app_context *ctx)
+{
+ if (!ctx->awaiting)
+ return;
+
+ ctx->await_response = NULL;
+ if (!ctx->await_error)
+ error_set (&ctx->await_error, "unexpected connection close");
+
+ ev_break (EV_DEFAULT_ EVBREAK_ONE);
+}
+
// --- Configuration -----------------------------------------------------------
static void on_config_attribute_change (struct config_item *item);
@@ -2114,8 +2188,6 @@ backend_ws_on_control_frame
return true;
}
-static int normalize_whitespace (int c) { return isspace_ascii (c) ? ' ' : c; }
-
/// Caller guarantees that data[len] is a NUL byte (because of iconv_xstrdup())
static bool
backend_ws_on_message (struct ws_context *self,
@@ -2577,6 +2649,328 @@ backend_curl_new (struct app_context *ctx, const char *endpoint)
return &self->super;
}
+// --- Co-process backend ------------------------------------------------------
+
+struct co_context
+{
+ struct backend super; ///< Parent class
+ struct app_context *ctx; ///< Application context
+
+ pid_t child; ///< The co-process or -1
+ int socket; ///< Our end of the socketpair
+ int stderr_fd; ///< stderr read end
+ struct str stderr_buffer; ///< stderr buffer
+
+ ev_io stderr_watcher; ///< stderr watcher
+ ev_io socket_watcher; ///< Socketpair watcher
+
+ struct http_parserpp http; ///< HTTP parser
+ bool pending_fake_starter; ///< Start of message?
+};
+
+static int
+backend_co_on_message_complete (http_parser *parser)
+{
+ struct http_parserpp *self = parser->data;
+ http_parser_pause (&self->parser, true);
+ return 0;
+}
+
+// The LSP incorporates a very thin subset of RFC 822, and it so happens
+// that we may simply reuse the full HTTP parser here, with a small hack.
+static const http_parser_settings backend_co_http_settings =
+{
+ .on_header_field = http_parserpp_on_header_field,
+ .on_header_value = http_parserpp_on_header_value,
+ .on_headers_complete = http_parserpp_on_headers_complete,
+
+ // TODO: check Content-Type early?
+ .on_message_begin = http_parserpp_on_message_begin,
+ .on_body = http_parserpp_on_body,
+ .on_message_complete = backend_co_on_message_complete,
+};
+
+static bool
+backend_co_write_starter (struct co_context *self, struct error **e)
+{
+ // The default "Connection: keep-alive" maps well here.
+ // We cannot feed this line into the parser from within callbacks.
+ static const char starter[] = "POST / HTTP/1.1\r\n";
+ http_parser_pause (&self->http.parser, false);
+
+ size_t n_parsed = http_parser_execute (&self->http.parser,
+ &backend_co_http_settings, starter, sizeof starter - 1);
+ enum http_errno err = HTTP_PARSER_ERRNO (&self->http.parser);
+ if (n_parsed != sizeof starter - 1 || err != HPE_OK)
+ FAIL ("protocol failure: %s", http_errno_description (err));
+ return true;
+}
+
+static void
+backend_co_process (struct co_context *self)
+{
+ // TODO: verify Content-Type in the headers, though tricky and optional
+ struct str *message = &self->http.message;
+ await_try_finish (self->ctx, message->str, message->len);
+}
+
+static bool
+backend_co_parse (struct co_context *self, const char *data, size_t len,
+ size_t *n_parsed, struct error **e)
+{
+ if (self->pending_fake_starter)
+ {
+ self->pending_fake_starter = false;
+ if (!backend_co_write_starter (self, e))
+ return false;
+ }
+
+ *n_parsed = http_parser_execute
+ (&self->http.parser, &backend_co_http_settings, data, len);
+ if (self->http.parser.upgrade)
+ FAIL ("protocol failure: %s", "unsupported upgrade attempt");
+
+ enum http_errno err = HTTP_PARSER_ERRNO (&self->http.parser);
+ if (err == HPE_PAUSED)
+ {
+ self->pending_fake_starter = true;
+ backend_co_process (self);
+ }
+ else if (err != HPE_OK)
+ FAIL ("protocol failure: %s", http_errno_description (err));
+ return true;
+}
+
+static bool
+backend_co_on_data (struct co_context *self, const char *data, size_t len,
+ struct error **e)
+{
+ size_t n_parsed = 0;
+ while (backend_co_parse (self, data, len, &n_parsed, e))
+ {
+ data += n_parsed;
+ if (!(len -= n_parsed))
+ return true;
+ }
+ return false;
+}
+
+static void
+backend_co_on_socket_ready (EV_P_ ev_io *handle, int revents)
+{
+ (void) loop;
+ (void) revents;
+
+ struct co_context *self = handle->data;
+ char buf[BUFSIZ];
+
+restart:
+ // Try to read some data in a non-blocking manner
+ (void) set_blocking (handle->fd, false);
+ ssize_t n_read = read (handle->fd, buf, sizeof buf);
+ int errno_saved = errno;
+ (void) set_blocking (handle->fd, true);
+ errno = errno_saved;
+
+ struct error *e = NULL;
+ if (n_read < 0)
+ {
+ if (errno == EAGAIN)
+ return;
+ if (errno == EINTR)
+ goto restart;
+
+ print_error ("reading from the command failed: %s", strerror (errno));
+ }
+ else if (!backend_co_on_data (self, buf, n_read, &e))
+ {
+ print_error ("%s", e->message);
+ error_free (e);
+ }
+ else if (!n_read)
+ print_status ("the command has closed the connection");
+ else
+ goto restart;
+
+ ev_io_stop (EV_A_ handle);
+ // That would have no way of succeeding
+ await_try_cancel (self->ctx);
+}
+
+static void
+backend_co_on_err_ready (EV_P_ ev_io *handle, int revents)
+{
+ (void) revents;
+
+ struct co_context *self = handle->data;
+ struct str *buf = &self->stderr_buffer;
+ enum socket_io_result result = socket_io_try_read (handle->fd, buf);
+
+ char *p;
+ while ((p = strchr (buf->str, '\n')))
+ {
+ *p = 0;
+ print_status ("stderr: %s", buf->str);
+ str_remove_slice (buf, 0, p - buf->str + 1);
+ }
+
+ switch (result)
+ {
+ case SOCKET_IO_EOF:
+ print_debug ("the command has closed its stderr");
+ break;
+ case SOCKET_IO_OK:
+ return;
+ case SOCKET_IO_ERROR:
+ print_warning ("cannot read stderr: %s", strerror (errno));
+ }
+
+ ev_io_stop (EV_A_ handle);
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+// TODO: do we want to go for synchronous writes, as with the WebSocket backend?
+// We can postpone it for later.
+
+static bool
+backend_co_make_call (struct backend *backend,
+ const char *request, bool expect_content, struct str *buf, struct error **e)
+{
+ struct co_context *self = (struct co_context *) backend;
+ struct str wrapped = str_make();
+ str_append_printf (&wrapped,
+ "Content-Length: %zu\r\n"
+ "Content-Type: application/json; charset=utf-8\r\n"
+ "\r\n%s", strlen (request), request);
+
+ enum socket_io_result result = socket_io_try_write (self->socket, &wrapped);
+ if (result == SOCKET_IO_ERROR)
+ {
+ str_free (&wrapped);
+ ev_io_stop (EV_DEFAULT_ &self->socket_watcher);
+ FAIL ("writing to the command failed: %s", strerror (errno));
+ }
+ else if (wrapped.len)
+ print_error ("internal error, partial write to command");
+
+ str_free (&wrapped);
+ return !expect_content || await (self->ctx, buf, e);
+}
+
+static bool
+backend_co_on_child (struct backend *backend, pid_t pid, int status)
+{
+ struct co_context *self = (struct co_context *) backend;
+ if (pid != self->child)
+ return false;
+
+ if (WIFSTOPPED (status))
+ print_warning ("the command has been stopped");
+ else if (WIFCONTINUED (status))
+ print_warning ("the command has been resumed");
+ else
+ {
+ if (WIFEXITED (status))
+ print_error ("the command has exited with status %d",
+ WEXITSTATUS (status));
+ else
+ print_error ("the command has died from signal %d",
+ WTERMSIG (status));
+
+ self->child = -1;
+ // Wait for the file descriptor to close, it may still contain data
+ }
+ return true;
+}
+
+static void
+backend_co_destroy (struct backend *backend)
+{
+ struct co_context *self = (struct co_context *) backend;
+ str_free (&self->stderr_buffer);
+ http_parserpp_free (&self->http);
+
+ if (self->socket != -1)
+ xclose (self->socket);
+ if (self->stderr_fd != -1)
+ xclose (self->stderr_fd);
+ if (self->child != -1)
+ (void) kill (self->child, SIGKILL);
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+static struct backend_vtable backend_co_vtable =
+{
+ .add_header = NULL,
+ .make_call = backend_co_make_call,
+ .on_child = backend_co_on_child,
+ .destroy = backend_co_destroy,
+};
+
+static struct backend *
+backend_co_new (struct app_context *ctx, char **argv)
+{
+ struct co_context *self = xcalloc (1, sizeof *self);
+ self->super.vtable = &backend_co_vtable;
+ self->ctx = ctx;
+
+ enum { OURS, THEIRS };
+
+ int pair[2] = { -1, -1 }, err[2] = { -1, -1 };
+ if (socketpair (AF_UNIX, SOCK_STREAM, 0, pair))
+ exit_fatal ("socketpair: %s", strerror (errno));
+ if (pipe (err))
+ exit_fatal ("pipe: %s", strerror (errno));
+
+ set_cloexec ((self->socket = pair[OURS]));
+ set_cloexec (pair[THEIRS]);
+ set_cloexec ((self->stderr_fd = err[OURS]));
+ set_cloexec (err[THEIRS]);
+
+ // It runs in our own progress group, so it gets SIGINTed with us
+ switch ((self->child = fork ()))
+ {
+ case -1:
+ exit_fatal ("fork: %s", strerror (errno));
+ case 0:
+ dup2 (pair[THEIRS], STDIN_FILENO);
+ dup2 (pair[THEIRS], STDOUT_FILENO);
+ dup2 (err[THEIRS], STDERR_FILENO);
+
+ // Undo what we've done in init_watchers()
+ signal (SIGPIPE, SIG_DFL);
+ signal (SIGTTOU, SIG_DFL);
+
+ execvp (argv[0], argv);
+
+ // stderr has been redirected, this won't cause a SIGTTOU
+ print_error ("execv: %s", strerror (errno));
+ _exit (EXIT_FAILURE);
+ default:
+ xclose (pair[THEIRS]);
+ xclose (err[THEIRS]);
+ }
+
+ ev_io_init (&self->socket_watcher,
+ backend_co_on_socket_ready, self->socket, EV_READ);
+ self->socket_watcher.data = self;
+ ev_io_start (EV_DEFAULT_ &self->socket_watcher);
+
+ set_blocking (self->stderr_fd, false);
+ self->stderr_buffer = str_make ();
+
+ ev_io_init (&self->stderr_watcher,
+ backend_co_on_err_ready, self->stderr_fd, EV_READ);
+ self->stderr_watcher.data = self;
+ ev_io_start (EV_DEFAULT_ &self->stderr_watcher);
+
+ http_parserpp_init (&self->http, HTTP_REQUEST);
+ self->pending_fake_starter = true;
+ return &self->super;
+}
+
// --- JSON tokenizer ----------------------------------------------------------
// A dumb JSON tokenizer intended strictly just for syntax highlighting
@@ -2877,6 +3271,8 @@ quit (struct app_context *ctx)
{
if (ctx->backend->vtable->on_quit)
ctx->backend->vtable->on_quit (ctx->backend);
+ if (ctx->awaiting && !ctx->await_error)
+ error_set (&ctx->await_error, "aborted by user");
ev_break (EV_DEFAULT_ EVBREAK_ALL);
ctx->input->vtable->hide (ctx->input);
@@ -3076,6 +3472,14 @@ static struct error *
json_rpc_call_raw (struct app_context *ctx,
const char *method, json_t *id, json_t *params, struct str *buf)
{
+ struct error *error = NULL;
+ if (ctx->awaiting)
+ {
+ // Only allow recursing once below, awaiting is not re-entrant
+ error_set (&error, "busy");
+ return error;
+ }
+
json_t *request = json_object ();
json_object_set_new (request, "jsonrpc", json_string ("2.0"));
json_object_set_new (request, "method", json_string (method));
@@ -3088,11 +3492,9 @@ json_rpc_call_raw (struct app_context *ctx,
maybe_print_verbose (ctx, ATTR_OUTGOING, req_utf8, -1);
- struct error *error = NULL;
ctx->backend->vtable->make_call (ctx->backend, req_utf8,
id != NULL /* expect_content */, buf, &error);
free (req_utf8);
-
if (error)
return error;
@@ -3460,6 +3862,11 @@ on_child (EV_P_ ev_child *handle, int revents)
(void) revents;
struct app_context *ctx = ev_userdata (loop);
+ if (ctx->backend->vtable->on_child
+ && ctx->backend->vtable->on_child (ctx->backend,
+ handle->rpid, handle->rstatus))
+ return;
+
// I am not a shell, stopping not allowed
int status = handle->rstatus;
if (WIFSTOPPED (status)
@@ -3570,6 +3977,7 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
{ 'c', "compact-output", NULL, 0, "do not pretty-print responses" },
{ 'C', "color", "WHEN", OPT_LONG_ONLY,
"colorize output: never, always, or auto" },
+ { 'e', "execute", NULL, 0, "launch a command to act as a server" },
{ 'n', "null-as-id", NULL, 0, "JSON null is used as an `id'" },
{ 'o', "origin", "O", 0, "set the HTTP Origin header" },
// So far you have to explicitly enable this rather than disable
@@ -3587,9 +3995,10 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
};
struct opt_handler oh = opt_handler_make (argc, argv, opts,
- "ENDPOINT", "A shell for JSON-RPC 2.0.");
+ "{ ENDPOINT | COMMAND [ARG]... }", "A shell for JSON-RPC 2.0.");
int c;
+ bool run_command = false;
while ((c = opt_handler_get (&oh)) != -1)
switch (c)
{
@@ -3606,6 +4015,7 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
case 'o': *origin = optarg; break;
case 'O': *openrpc = optarg ? optarg : ""; break;
+ case 'e': run_command = true; break;
case 'n': ctx->null_as_id = true; break;
case 'c': ctx->compact = true; break;
case 't': ctx->trust_all = true; break;
@@ -3637,19 +4047,29 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
argc -= optind;
argv += optind;
- if (argc != 1)
+ if (run_command && argc >= 1)
+ *endpoint = NULL;
+ else if (argc == 1)
+ *endpoint = argv[0];
+ else
{
opt_handler_usage (&oh, stderr);
exit (EXIT_FAILURE);
}
-
- *endpoint = argv[0];
opt_handler_free (&oh);
}
static void
-init_backend (struct app_context *ctx, const char *origin, const char *endpoint)
+init_backend (struct app_context *ctx, const char *origin, const char *endpoint,
+ char **argv)
{
+ if (!endpoint)
+ {
+ // There is no point in passing the Origin to a co-process
+ ctx->backend = backend_co_new (ctx, argv);
+ return;
+ }
+
struct http_parser_url url;
if (http_parser_parse_url (endpoint, strlen (endpoint), false, &url))
exit_fatal ("invalid endpoint address");
@@ -3688,6 +4108,8 @@ main (int argc, char *argv[])
const char *origin = NULL, *endpoint = NULL, *openrpc = NULL;
parse_program_arguments (&g_ctx, argc, argv, &origin, &endpoint, &openrpc);
+ argc -= optind;
+ argv += optind;
g_ctx.input = input_new ();
g_ctx.input->user_data = &g_ctx;
@@ -3698,7 +4120,7 @@ main (int argc, char *argv[])
g_ctx.methods = str_map_make (NULL);
init_colors (&g_ctx);
load_configuration (&g_ctx);
- init_backend (&g_ctx, origin, endpoint);
+ init_backend (&g_ctx, origin, endpoint, argv);
// We only need to convert to and from the terminal encoding
setlocale (LC_CTYPE, "");