aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.adoc4
-rw-r--r--json-rpc-shell.c4
-rw-r--r--json-rpc-test-server.c178
3 files changed, 179 insertions, 7 deletions
diff --git a/README.adoc b/README.adoc
index 67d009f..2d390d5 100644
--- a/README.adoc
+++ b/README.adoc
@@ -65,8 +65,8 @@ Test server
-----------
If you install development packages for libmagic, an included test server will
be built but not installed which provides a trivial JSON-RPC 2.0 service with
-FastCGI, SCGI, and WebSocket interfaces. It responds to `ping` and `date`
-methods and it can serve static files.
+FastCGI, SCGI, WebSocket and LSP-like co-process interfaces. It responds to
+`ping` and `date`, supports OpenRPC discovery and it can serve static files.
Contributing and Support
------------------------
diff --git a/json-rpc-shell.c b/json-rpc-shell.c
index 1da11b0..38ff022 100644
--- a/json-rpc-shell.c
+++ b/json-rpc-shell.c
@@ -2623,7 +2623,7 @@ static const http_parser_settings backend_co_http_settings =
};
static bool
-backend_co_write_starter (struct co_context *self, struct error **e)
+backend_co_inject_starter (struct co_context *self, struct error **e)
{
// The default "Connection: keep-alive" maps well here.
// We cannot feed this line into the parser from within callbacks.
@@ -2653,7 +2653,7 @@ backend_co_parse (struct co_context *self, const char *data, size_t len,
if (self->pending_fake_starter)
{
self->pending_fake_starter = false;
- if (!backend_co_write_starter (self, e))
+ if (!backend_co_inject_starter (self, e))
return false;
}
diff --git a/json-rpc-test-server.c b/json-rpc-test-server.c
index 0cfd161..36950d8 100644
--- a/json-rpc-test-server.c
+++ b/json-rpc-test-server.c
@@ -1581,7 +1581,6 @@ static void
process_json_rpc (struct server_context *ctx,
const void *data, size_t len, struct str *output)
{
-
json_error_t e;
json_t *request;
if (!(request = json_loadb (data, len, JSON_DECODE_ANY, &e)))
@@ -2551,6 +2550,165 @@ client_ws_create (EV_P_ int sock_fd)
return &self->client;
}
+// --- Co-process client -------------------------------------------------------
+
+// This is mostly copied over from json-rpc-shell.c, only a bit simplified.
+// We're giving up on header parsing in order to keep this small.
+struct co_context
+{
+ struct server_context *ctx; ///< Server context
+ struct str message; ///< Message data
+ struct http_parser parser; ///< HTTP parser
+ bool pending_fake_starter; ///< Start of message?
+};
+
+static int
+client_co_on_message_begin (http_parser *parser)
+{
+ struct co_context *self = parser->data;
+ str_reset (&self->message);
+ return 0;
+}
+
+static int
+client_co_on_body (http_parser *parser, const char *at, size_t len)
+{
+ struct co_context *self = parser->data;
+ str_append_data (&self->message, at, len);
+ return 0;
+}
+
+static int
+client_co_on_message_complete (http_parser *parser)
+{
+ struct co_context *self = parser->data;
+ http_parser_pause (&self->parser, true);
+ return 0;
+}
+
+// The LSP incorporates a very thin subset of RFC 822, and it so happens
+// that we may simply reuse the full HTTP parser here, with a small hack.
+static const http_parser_settings client_co_http_settings =
+{
+ .on_message_begin = client_co_on_message_begin,
+ .on_body = client_co_on_body,
+ .on_message_complete = client_co_on_message_complete,
+};
+
+static void
+client_co_respond (const struct str *buf)
+{
+ struct str wrapped = str_make();
+ str_append_printf (&wrapped,
+ "Content-Length: %zu\r\n"
+ "Content-Type: application/json; charset=utf-8\r\n"
+ "\r\n", buf->len);
+ str_append_data (&wrapped, buf->str, buf->len);
+
+ if (write (STDOUT_FILENO, wrapped.str, wrapped.len)
+ != (ssize_t) wrapped.len)
+ exit_fatal ("write: %s", strerror (errno));
+ str_free (&wrapped);
+}
+
+static void
+client_co_inject_starter (struct co_context *self)
+{
+ // The default "Connection: keep-alive" maps well here.
+ // We cannot feed this line into the parser from within callbacks.
+ static const char starter[] = "POST / HTTP/1.1\r\n";
+ http_parser_pause (&self->parser, false);
+
+ size_t n_parsed = http_parser_execute (&self->parser,
+ &client_co_http_settings, starter, sizeof starter - 1);
+ enum http_errno err = HTTP_PARSER_ERRNO (&self->parser);
+ if (n_parsed != sizeof starter - 1 || err != HPE_OK)
+ exit_fatal ("protocol failure: %s", http_errno_description (err));
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+static void
+client_co_process (struct co_context *self)
+{
+ struct str *message = &self->message;
+ struct str response = str_make ();
+ process_json_rpc (self->ctx, message->str, message->len, &response);
+ if (response.len)
+ client_co_respond (&response);
+ str_free (&response);
+}
+
+static void
+backend_co_parse (struct co_context *self, const char *data, size_t len,
+ size_t *n_parsed)
+{
+ if (self->pending_fake_starter)
+ {
+ self->pending_fake_starter = false;
+ client_co_inject_starter (self);
+ }
+
+ *n_parsed = http_parser_execute
+ (&self->parser, &client_co_http_settings, data, len);
+ if (self->parser.upgrade)
+ exit_fatal ("protocol failure: %s", "unsupported upgrade attempt");
+
+ enum http_errno err = HTTP_PARSER_ERRNO (&self->parser);
+ if (err == HPE_PAUSED)
+ {
+ self->pending_fake_starter = true;
+ client_co_process (self);
+ }
+ else if (err != HPE_OK)
+ exit_fatal ("protocol failure: %s", http_errno_description (err));
+}
+
+static void
+backend_co_on_data (struct co_context *self, const char *data, size_t len)
+{
+ size_t n_parsed = 0;
+ do
+ {
+ backend_co_parse (self, data, len, &n_parsed);
+ data += n_parsed;
+ }
+ while ((len -= n_parsed));
+}
+
+static void
+client_co_run (struct server_context *ctx)
+{
+ struct co_context self = {};
+ self.ctx = ctx;
+ self.message = str_make ();
+ http_parser_init (&self.parser, HTTP_REQUEST);
+ self.parser.data = &self;
+ self.pending_fake_starter = true;
+
+ hard_assert (set_blocking (STDIN_FILENO, false));
+ struct str buf = str_make ();
+ struct pollfd pfd = { .fd = STDIN_FILENO, .events = POLLIN };
+ while (true)
+ {
+ if (poll (&pfd, 1, -1) <= 0)
+ exit_fatal ("poll: %s", strerror (errno));
+
+ str_remove_slice (&buf, 0, buf.len);
+ enum socket_io_result result = socket_io_try_read (pfd.fd, &buf);
+ int errno_saved = errno;
+
+ if (buf.len)
+ backend_co_on_data (&self, buf.str, buf.len);
+ if (result == SOCKET_IO_ERROR)
+ exit_fatal ("read: %s", strerror (errno_saved));
+ if (result == SOCKET_IO_EOF)
+ break;
+ }
+ str_free (&buf);
+ str_free (&self.message);
+}
+
// --- Basic server stuff ------------------------------------------------------
typedef struct client *(*client_create_fn) (EV_P_ int sock_fd);
@@ -2914,11 +3072,12 @@ daemonize (struct server_context *ctx)
}
static void
-parse_program_arguments (int argc, char **argv)
+parse_program_arguments (int argc, char **argv, bool *running_as_slave)
{
static const struct opt opts[] =
{
{ 't', "test", NULL, 0, "self-test" },
+ { 's', "slave", NULL, 0, "co-process mode" },
{ 'd', "debug", NULL, 0, "run in debug mode" },
{ 'h', "help", NULL, 0, "display this help and exit" },
{ 'V', "version", NULL, 0, "output version information and exit" },
@@ -2938,6 +3097,9 @@ parse_program_arguments (int argc, char **argv)
case 't':
test_main (argc, argv);
exit (EXIT_SUCCESS);
+ case 's':
+ *running_as_slave = true;
+ break;
case 'd':
g_debug_mode = true;
break;
@@ -2970,7 +3132,8 @@ parse_program_arguments (int argc, char **argv)
int
main (int argc, char *argv[])
{
- parse_program_arguments (argc, argv);
+ bool running_as_a_slave = false;
+ parse_program_arguments (argc, argv, &running_as_a_slave);
print_status (PROGRAM_NAME " " PROGRAM_VERSION " starting");
@@ -2985,6 +3148,15 @@ main (int argc, char *argv[])
exit (EXIT_FAILURE);
}
+ // There's a lot of unnecessary left-over scaffolding in this program,
+ // for testing purposes assume that everything is synchronous
+ if (running_as_a_slave)
+ {
+ client_co_run (&ctx);
+ server_context_free (&ctx);
+ return EXIT_SUCCESS;
+ }
+
struct ev_loop *loop;
if (!(loop = EV_DEFAULT))
exit_fatal ("libev initialization failed");