aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Janouch <p.janouch@gmail.com>2015-03-02 23:11:29 +0100
committerPřemysl Janouch <p.janouch@gmail.com>2015-03-02 23:11:29 +0100
commita54230bddba09f68f4d7fda23f44962050df7a07 (patch)
tree3a633ddde5a0480ba334242a092faa4dc2d56755
parent8a3241d5c46e8b8fac382f606147285e314f1f6b (diff)
downloadjson-rpc-shell-a54230bddba09f68f4d7fda23f44962050df7a07.tar.gz
json-rpc-shell-a54230bddba09f68f4d7fda23f44962050df7a07.tar.xz
json-rpc-shell-a54230bddba09f68f4d7fda23f44962050df7a07.zip
Steady progress
I'm trying to figure out everything at once, i.e. the entire structure of the application from top to bottom, trying to converge on a workable design while refactoring still doesn't hurt as much as it would once it's established.
-rw-r--r--demo-json-rpc-server.c404
1 files changed, 284 insertions, 120 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c
index cbce78e..8645ae3 100644
--- a/demo-json-rpc-server.c
+++ b/demo-json-rpc-server.c
@@ -18,8 +18,6 @@
*
*/
-#define LIBERTY_WANT_SSL
-
#define print_fatal_data ((void *) LOG_ERR)
#define print_error_data ((void *) LOG_ERR)
#define print_warning_data ((void *) LOG_WARNING)
@@ -68,6 +66,61 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value)
#undef UNPACKER_INT_BEGIN
+// --- libev helpers -----------------------------------------------------------
+
+static bool
+read_loop (EV_P_ ev_io *watcher,
+ bool (*cb) (EV_P_ ev_io *, const void *, ssize_t))
+{
+ char buf[8192];
+ while (true)
+ {
+ ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
+ if (n_read < 0)
+ {
+ if (errno == EAGAIN)
+ break;
+ if (errno == EINTR)
+ continue;
+ }
+ // The callback is called on EOF as well
+ if (n_read < 0 || !cb (EV_A_ watcher, buf, n_read))
+ return false;
+ if (!n_read)
+ return false;
+ }
+ return true;
+}
+
+static bool
+flush_queue (write_queue_t *queue, ev_io *watcher)
+{
+ struct iovec vec[queue->len], *vec_iter = vec;
+ for (write_req_t *iter = queue->head; iter; iter = iter->next)
+ *vec_iter++ = iter->data;
+
+ ssize_t written;
+again:
+ written = writev (watcher->fd, vec, N_ELEMENTS (vec));
+ if (written < 0)
+ {
+ if (errno == EAGAIN)
+ goto skip;
+ if (errno == EINTR)
+ goto again;
+ return false;
+ }
+
+ write_queue_processed (queue, written);
+
+skip:
+ if (write_queue_is_empty (queue))
+ ev_io_stop (EV_DEFAULT_ watcher);
+ else
+ ev_io_start (EV_DEFAULT_ watcher);
+ return true;
+}
+
// --- Logging -----------------------------------------------------------------
static void
@@ -88,16 +141,6 @@ log_message_syslog (void *user_data, const char *quote, const char *fmt,
syslog (prio, "%s%s", quote, buf);
}
-// --- Configuration (application-specific) ------------------------------------
-
-static struct config_item g_config_table[] =
-{
- { "bind_host", NULL, "Address of the server" },
- { "port_fastcgi", "9000", "Port to bind for FastCGI" },
- { "port_scgi", NULL, "Port to bind for SCGI" },
- { NULL, NULL, NULL }
-};
-
// --- FastCGI -----------------------------------------------------------------
// Constants from the FastCGI specification document
@@ -472,9 +515,21 @@ scgi_parser_free (struct scgi_parser *self)
static bool
scgi_parser_push (struct scgi_parser *self,
- void *data, size_t len, struct error **e)
+ const void *data, size_t len, struct error **e)
{
- // This retarded netstring madness is even more complicated than FastCGI;
+ if (!len)
+ {
+ if (self->state != SCGI_READING_CONTENT)
+ {
+ error_set (e, "premature EOF");
+ return false;
+ }
+
+ // TODO: a "on_eof" callback?
+ return true;
+ }
+
+ // Notice that this madness is significantly harder to parse than FastCGI;
// this procedure could also be optimized significantly
str_append_data (&self->input, data, len);
@@ -526,6 +581,7 @@ scgi_parser_push (struct scgi_parser *self,
return false;
}
self->state = SCGI_READING_CONTENT;
+ // TODO: a "on_headers_read" callback?
}
else if (c != '\0')
str_append_c (&self->name, c);
@@ -565,16 +621,95 @@ scgi_parser_push (struct scgi_parser *self,
break;
}
case SCGI_READING_CONTENT:
- // TODO: I have no idea what to do with the contents
+ // TODO: a "on_content" callback?
return true;
break;
}
}
-// --- ? -----------------------------------------------------------------------
+// --- Server ------------------------------------------------------------------
+
+static struct config_item g_config_table[] =
+{
+ { "bind_host", NULL, "Address of the server" },
+ { "port_fastcgi", "9000", "Port to bind for FastCGI" },
+ { "port_scgi", NULL, "Port to bind for SCGI" },
+ { NULL, NULL, NULL }
+};
+
+struct server_context
+{
+ ev_signal sigterm_watcher; ///< Got SIGTERM
+ ev_signal sigint_watcher; ///< Got SIGINT
+ bool quitting; ///< User requested quitting
+
+ struct listener *listeners; ///< Listeners
+ size_t n_listeners; ///< Number of listening sockets
+
+ struct client *clients; ///< Clients
+ unsigned n_clients; ///< Current number of connections
+
+ struct str_map config; ///< Server configuration
+};
+
+static void
+server_context_init (struct server_context *self)
+{
+ memset (self, 0, sizeof *self);
+
+ str_map_init (&self->config);
+ load_config_defaults (&self->config, g_config_table);
+}
+
+static void
+server_context_free (struct server_context *self)
+{
+ // TODO: free the clients (?)
+ // TODO: close the listeners (?)
+
+ str_map_free (&self->config);
+}
+
+// --- JSON-RPC ----------------------------------------------------------------
+
+// TODO: this is where we're actually supposed to do JSON-RPC 2.0 processing
+
+// There's probably no reason to create an object for this.
+//
+// We probably just want a handler function that takes a JSON string, parses it,
+// and returns back another JSON string.
+//
+// Then there should be another function that takes a parsed JSON request and
+// returns back a JSON reply. This function may get called multiple times if
+// the user sends a batch request.
+
+// --- Requests ----------------------------------------------------------------
+
+// TODO: something to read in the headers and decide what to do with the request
+// e.g. whether to reject it with a 404, or do JSON-RPC, or ignore it with 200
+
+#if 0
+// This doesn't necessarily have to be an object by itself either; we can have
+// a function that does/returns something based on the headers
+
+struct request
+{
+};
+
+static void
+request_init (struct request *self)
+{
+}
+
+static void
+request_free (struct request *self)
+{
+}
+#endif
+
+// --- Client communication handlers -------------------------------------------
-// TODO
struct client
{
LIST_HEADER (struct client)
@@ -582,137 +717,172 @@ struct client
struct server_context *ctx; ///< Server context
int socket_fd; ///< The TCP socket
- struct str read_buffer; ///< Unprocessed input
write_queue_t write_queue; ///< Write queue
ev_io read_watcher; ///< The socket can be read from
ev_io write_watcher; ///< The socket can be written to
+
+ struct client_impl *impl; ///< Client behaviour
+ void *impl_data; ///< Client behaviour data
+};
+
+struct client_impl
+{
+ /// Initialize the client as needed
+ void (*init) (struct client *client);
+
+ /// Do any additional cleanup
+ void (*destroy) (struct client *client);
+
+ /// Process incoming data; "len == 0" means EOF
+ bool (*on_data) (struct client *client, const void *data, size_t len);
};
static void
client_init (struct client *self)
{
memset (self, 0, sizeof *self);
- str_init (&self->read_buffer);
write_queue_init (&self->write_queue);
}
static void
client_free (struct client *self)
{
- str_free (&self->read_buffer);
write_queue_free (&self->write_queue);
}
-// --- ? -----------------------------------------------------------------------
+static void
+client_write (struct client *client, const void *data, size_t len)
+{
+ write_req_t *req = xcalloc (1, sizeof *req);
+ req->data.iov_base = memcpy (xmalloc (len), data, len);
+ req->data.iov_len = len;
+
+ write_queue_add (&client->write_queue, req);
+ ev_io_start (EV_DEFAULT_ &client->write_watcher);
+}
-enum listener_type
+// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+struct client_fcgi
{
- LISTENER_FCGI, ///< FastCGI
- LISTENER_SCGI ///< SCGI
+ struct fcgi_parser parser; ///< FastCGI stream parser
};
-struct listener
+static void
+client_fcgi_init (struct client *client)
{
- int fd; ///< Listening socket FD
- ev_io watcher; ///< New connection available
- enum listener_type type; ///< The protocol
-};
+ struct client_fcgi *self = xcalloc (1, sizeof *self);
+ client->impl_data = self;
-struct server_context
+ fcgi_parser_init (&self->parser);
+ // TODO: configure the parser
+}
+
+static void
+client_fcgi_destroy (struct client *client)
{
- ev_signal sigterm_watcher; ///< Got SIGTERM
- ev_signal sigint_watcher; ///< Got SIGINT
- bool quitting; ///< User requested quitting
+ struct client_fcgi *self = client->impl_data;
+ client->impl_data = NULL;
- struct listener *listeners; ///< Listeners
- size_t n_listeners; ///< Number of listening sockets
+ fcgi_parser_free (&self->parser);
+ free (self);
+}
- struct client *clients; ///< Clients
- unsigned n_clients; ///< Current number of connections
+static bool
+client_fcgi_on_data (struct client *client, const void *data, size_t len)
+{
+ struct client_fcgi *self = client->impl_data;
+ fcgi_parser_push (&self->parser, data, len);
+ return true;
+}
- struct str_map config; ///< Server configuration
+static struct client_impl g_client_fcgi =
+{
+ .init = client_fcgi_init,
+ .destroy = client_fcgi_destroy,
+ .on_data = client_fcgi_on_data,
+};
+
+// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+struct client_scgi
+{
+ struct scgi_parser parser; ///< SCGI stream parser
};
static void
-server_context_init (struct server_context *self)
+client_scgi_init (struct client *client)
{
- memset (self, 0, sizeof *self);
+ struct client_scgi *self = xcalloc (1, sizeof *self);
+ client->impl_data = self;
- str_map_init (&self->config);
- load_config_defaults (&self->config, g_config_table);
+ scgi_parser_init (&self->parser);
+ // TODO: configure the parser
}
static void
-server_context_free (struct server_context *self)
+client_scgi_destroy (struct client *client)
{
- // TODO: free the clients (?)
- // TODO: close the listeners (?)
+ struct client_scgi *self = client->impl_data;
+ client->impl_data = NULL;
- str_map_free (&self->config);
+ scgi_parser_free (&self->parser);
+ free (self);
}
-// --- ? -----------------------------------------------------------------------
+static bool
+client_scgi_on_data (struct client *client, const void *data, size_t len)
+{
+ struct client_scgi *self = client->impl_data;
+ struct error *e = NULL;
+ if (scgi_parser_push (&self->parser, data, len, &e))
+ return true;
+
+ print_debug ("SCGI parser failed: %s", e->message);
+ error_free (e);
+ return false;
+}
+
+static struct client_impl g_client_scgi =
+{
+ .init = client_scgi_init,
+ .destroy = client_scgi_destroy,
+ .on_data = client_scgi_on_data,
+};
+
+// --- Basic server stuff ------------------------------------------------------
+
+struct listener
+{
+ int fd; ///< Listening socket FD
+ ev_io watcher; ///< New connection available
+ struct client_impl *impl; ///< Client behaviour
+};
static void
remove_client (struct server_context *ctx, struct client *client)
{
+ LIST_UNLINK (ctx->clients, client);
+ ctx->n_clients--;
+
+ // First uninitialize the higher-level implementation
+ client->impl->destroy (client);
+
ev_io_stop (EV_DEFAULT_ &client->read_watcher);
ev_io_stop (EV_DEFAULT_ &client->write_watcher);
xclose (client->socket_fd);
- LIST_UNLINK (ctx->clients, client);
client_free (client);
free (client);
}
static bool
-read_loop (EV_P_ ev_io *watcher,
- bool (*cb) (EV_P_ ev_io *, const void *, ssize_t))
-{
- char buf[8192];
- while (true)
- {
- ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
- if (n_read < 0)
- {
- if (errno == EAGAIN)
- break;
- if (errno == EINTR)
- continue;
- }
- if (n_read <= 0 || !cb (EV_A_ watcher, buf, n_read))
- return false;
- }
- return true;
-}
-
-static bool
-flush_queue (write_queue_t *queue, ev_io *watcher)
+on_client_data (EV_P_ ev_io *watcher, const void *buf, ssize_t n_read)
{
- struct iovec vec[queue->len], *vec_iter = vec;
- for (write_req_t *iter = queue->head; iter; iter = iter->next)
- *vec_iter++ = iter->data;
+ (void) loop;
- ssize_t written;
-again:
- written = writev (watcher->fd, vec, N_ELEMENTS (vec));
- if (written < 0)
- {
- if (errno == EAGAIN)
- goto skip;
- if (errno == EINTR)
- goto again;
- return false;
- }
-
- write_queue_processed (queue, written);
-
-skip:
- if (write_queue_is_empty (queue))
- ev_io_stop (EV_DEFAULT_ watcher);
- else
- ev_io_start (EV_DEFAULT_ watcher);
- return true;
+ struct client *client = watcher->data;
+ return client->impl->on_data (client, buf, n_read);
}
static void
@@ -734,12 +904,12 @@ error:
}
static void
-on_fcgi_client_available (EV_P_ ev_io *watcher, int revents)
+on_client_available (EV_P_ ev_io *watcher, int revents)
{
struct server_context *ctx = ev_userdata (loop);
+ struct listener *listener = watcher->data;
(void) revents;
- // TODO
while (true)
{
int sock_fd = accept (watcher->fd, NULL, NULL);
@@ -759,11 +929,13 @@ on_fcgi_client_available (EV_P_ ev_io *watcher, int revents)
break;
}
+ set_blocking (sock_fd, false);
+
struct client *client = xmalloc (sizeof *client);
client_init (client);
client->socket_fd = sock_fd;
+ client->impl = listener->impl;
- set_blocking (sock_fd, false);
ev_io_init (&client->read_watcher, on_client_ready, sock_fd, EV_READ);
ev_io_init (&client->write_watcher, on_client_ready, sock_fd, EV_WRITE);
client->read_watcher.data = client;
@@ -772,6 +944,9 @@ on_fcgi_client_available (EV_P_ ev_io *watcher, int revents)
// We're only interested in reading as the write queue is empty now
ev_io_start (EV_A_ &client->read_watcher);
+ // Initialize the higher-level implementation
+ client->impl->init (client);
+
LIST_PREPEND (ctx->clients, client);
ctx->n_clients++;
}
@@ -792,7 +967,7 @@ parse_config (struct server_context *ctx, struct error **e)
}
static int
-listen_finish (struct addrinfo *gai_iter)
+listener_finish (struct addrinfo *gai_iter)
{
int fd = socket (gai_iter->ai_family,
gai_iter->ai_socktype, gai_iter->ai_protocol);
@@ -832,8 +1007,8 @@ listen_finish (struct addrinfo *gai_iter)
}
static void
-listen_resolve (struct server_context *ctx, const char *host, const char *port,
- struct addrinfo *gai_hints, enum listener_type type)
+listener_add (struct server_context *ctx, const char *host, const char *port,
+ struct addrinfo *gai_hints, struct client_impl *impl)
{
struct addrinfo *gai_result, *gai_iter;
int err = getaddrinfo (host, port, gai_hints, &gai_result);
@@ -849,24 +1024,15 @@ listen_resolve (struct server_context *ctx, const char *host, const char *port,
int fd;
for (gai_iter = gai_result; gai_iter; gai_iter = gai_iter->ai_next)
{
- if ((fd = listen_finish (gai_iter)) == -1)
+ if ((fd = listener_finish (gai_iter)) == -1)
continue;
set_blocking (fd, false);
struct listener *listener = &ctx->listeners[ctx->n_listeners++];
- switch ((listener->type = type))
- {
- case LISTENER_FCGI:
- ev_io_init (&listener->watcher,
- on_fcgi_client_available, fd, EV_READ);
- break;
- case LISTENER_SCGI:
- ev_io_init (&listener->watcher,
- on_scgi_client_available, fd, EV_READ);
- break;
- }
-
+ ev_io_init (&listener->watcher, on_client_available, fd, EV_READ);
ev_io_start (EV_DEFAULT_ &listener->watcher);
+ listener->watcher.data = listener;
+ listener->impl = impl;
break;
}
freeaddrinfo (gai_result);
@@ -885,10 +1051,8 @@ setup_listen_fds (struct server_context *ctx, struct error **e)
gai_hints.ai_socktype = SOCK_STREAM;
gai_hints.ai_flags = AI_PASSIVE;
- struct str_vector ports_fcgi;
- struct str_vector ports_scgi;
- str_vector_init (&ports_fcgi);
- str_vector_init (&ports_scgi);
+ struct str_vector ports_fcgi; str_vector_init (&ports_fcgi);
+ struct str_vector ports_scgi; str_vector_init (&ports_scgi);
if (port_fcgi)
split_str_ignore_empty (port_fcgi, ',', &ports_fcgi);
@@ -899,11 +1063,11 @@ setup_listen_fds (struct server_context *ctx, struct error **e)
ctx->listeners = xcalloc (n_ports, sizeof *ctx->listeners);
for (size_t i = 0; i < ports_fcgi.len; i++)
- listen_resolve (ctx, bind_host, ports_fcgi.vector[i],
- &gai_hints, LISTENER_FCGI);
+ listener_add (ctx, bind_host, ports_fcgi.vector[i],
+ &gai_hints, &g_client_fcgi);
for (size_t i = 0; i < ports_scgi.len; i++)
- listen_resolve (ctx, bind_host, ports_scgi.vector[i],
- &gai_hints, LISTENER_SCGI);
+ listener_add (ctx, bind_host, ports_scgi.vector[i],
+ &gai_hints, &g_client_scgi);
str_vector_free (&ports_fcgi);
str_vector_free (&ports_scgi);