diff options
-rw-r--r-- | demo-json-rpc-server.c | 404 |
1 files changed, 284 insertions, 120 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c index cbce78e..8645ae3 100644 --- a/demo-json-rpc-server.c +++ b/demo-json-rpc-server.c @@ -18,8 +18,6 @@ * */ -#define LIBERTY_WANT_SSL - #define print_fatal_data ((void *) LOG_ERR) #define print_error_data ((void *) LOG_ERR) #define print_warning_data ((void *) LOG_WARNING) @@ -68,6 +66,61 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value) #undef UNPACKER_INT_BEGIN +// --- libev helpers ----------------------------------------------------------- + +static bool +read_loop (EV_P_ ev_io *watcher, + bool (*cb) (EV_P_ ev_io *, const void *, ssize_t)) +{ + char buf[8192]; + while (true) + { + ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0); + if (n_read < 0) + { + if (errno == EAGAIN) + break; + if (errno == EINTR) + continue; + } + // The callback is called on EOF as well + if (n_read < 0 || !cb (EV_A_ watcher, buf, n_read)) + return false; + if (!n_read) + return false; + } + return true; +} + +static bool +flush_queue (write_queue_t *queue, ev_io *watcher) +{ + struct iovec vec[queue->len], *vec_iter = vec; + for (write_req_t *iter = queue->head; iter; iter = iter->next) + *vec_iter++ = iter->data; + + ssize_t written; +again: + written = writev (watcher->fd, vec, N_ELEMENTS (vec)); + if (written < 0) + { + if (errno == EAGAIN) + goto skip; + if (errno == EINTR) + goto again; + return false; + } + + write_queue_processed (queue, written); + +skip: + if (write_queue_is_empty (queue)) + ev_io_stop (EV_DEFAULT_ watcher); + else + ev_io_start (EV_DEFAULT_ watcher); + return true; +} + // --- Logging ----------------------------------------------------------------- static void @@ -88,16 +141,6 @@ log_message_syslog (void *user_data, const char *quote, const char *fmt, syslog (prio, "%s%s", quote, buf); } -// --- Configuration (application-specific) ------------------------------------ - -static struct config_item g_config_table[] = -{ - { "bind_host", NULL, "Address of the server" }, - { "port_fastcgi", "9000", "Port to bind for FastCGI" }, - { "port_scgi", NULL, "Port to bind for SCGI" }, - { NULL, NULL, NULL } -}; - // --- FastCGI ----------------------------------------------------------------- // Constants from the FastCGI specification document @@ -472,9 +515,21 @@ scgi_parser_free (struct scgi_parser *self) static bool scgi_parser_push (struct scgi_parser *self, - void *data, size_t len, struct error **e) + const void *data, size_t len, struct error **e) { - // This retarded netstring madness is even more complicated than FastCGI; + if (!len) + { + if (self->state != SCGI_READING_CONTENT) + { + error_set (e, "premature EOF"); + return false; + } + + // TODO: a "on_eof" callback? + return true; + } + + // Notice that this madness is significantly harder to parse than FastCGI; // this procedure could also be optimized significantly str_append_data (&self->input, data, len); @@ -526,6 +581,7 @@ scgi_parser_push (struct scgi_parser *self, return false; } self->state = SCGI_READING_CONTENT; + // TODO: a "on_headers_read" callback? } else if (c != '\0') str_append_c (&self->name, c); @@ -565,16 +621,95 @@ scgi_parser_push (struct scgi_parser *self, break; } case SCGI_READING_CONTENT: - // TODO: I have no idea what to do with the contents + // TODO: a "on_content" callback? return true; break; } } -// --- ? ----------------------------------------------------------------------- +// --- Server ------------------------------------------------------------------ + +static struct config_item g_config_table[] = +{ + { "bind_host", NULL, "Address of the server" }, + { "port_fastcgi", "9000", "Port to bind for FastCGI" }, + { "port_scgi", NULL, "Port to bind for SCGI" }, + { NULL, NULL, NULL } +}; + +struct server_context +{ + ev_signal sigterm_watcher; ///< Got SIGTERM + ev_signal sigint_watcher; ///< Got SIGINT + bool quitting; ///< User requested quitting + + struct listener *listeners; ///< Listeners + size_t n_listeners; ///< Number of listening sockets + + struct client *clients; ///< Clients + unsigned n_clients; ///< Current number of connections + + struct str_map config; ///< Server configuration +}; + +static void +server_context_init (struct server_context *self) +{ + memset (self, 0, sizeof *self); + + str_map_init (&self->config); + load_config_defaults (&self->config, g_config_table); +} + +static void +server_context_free (struct server_context *self) +{ + // TODO: free the clients (?) + // TODO: close the listeners (?) + + str_map_free (&self->config); +} + +// --- JSON-RPC ---------------------------------------------------------------- + +// TODO: this is where we're actually supposed to do JSON-RPC 2.0 processing + +// There's probably no reason to create an object for this. +// +// We probably just want a handler function that takes a JSON string, parses it, +// and returns back another JSON string. +// +// Then there should be another function that takes a parsed JSON request and +// returns back a JSON reply. This function may get called multiple times if +// the user sends a batch request. + +// --- Requests ---------------------------------------------------------------- + +// TODO: something to read in the headers and decide what to do with the request +// e.g. whether to reject it with a 404, or do JSON-RPC, or ignore it with 200 + +#if 0 +// This doesn't necessarily have to be an object by itself either; we can have +// a function that does/returns something based on the headers + +struct request +{ +}; + +static void +request_init (struct request *self) +{ +} + +static void +request_free (struct request *self) +{ +} +#endif + +// --- Client communication handlers ------------------------------------------- -// TODO struct client { LIST_HEADER (struct client) @@ -582,137 +717,172 @@ struct client struct server_context *ctx; ///< Server context int socket_fd; ///< The TCP socket - struct str read_buffer; ///< Unprocessed input write_queue_t write_queue; ///< Write queue ev_io read_watcher; ///< The socket can be read from ev_io write_watcher; ///< The socket can be written to + + struct client_impl *impl; ///< Client behaviour + void *impl_data; ///< Client behaviour data +}; + +struct client_impl +{ + /// Initialize the client as needed + void (*init) (struct client *client); + + /// Do any additional cleanup + void (*destroy) (struct client *client); + + /// Process incoming data; "len == 0" means EOF + bool (*on_data) (struct client *client, const void *data, size_t len); }; static void client_init (struct client *self) { memset (self, 0, sizeof *self); - str_init (&self->read_buffer); write_queue_init (&self->write_queue); } static void client_free (struct client *self) { - str_free (&self->read_buffer); write_queue_free (&self->write_queue); } -// --- ? ----------------------------------------------------------------------- +static void +client_write (struct client *client, const void *data, size_t len) +{ + write_req_t *req = xcalloc (1, sizeof *req); + req->data.iov_base = memcpy (xmalloc (len), data, len); + req->data.iov_len = len; + + write_queue_add (&client->write_queue, req); + ev_io_start (EV_DEFAULT_ &client->write_watcher); +} -enum listener_type +// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +struct client_fcgi { - LISTENER_FCGI, ///< FastCGI - LISTENER_SCGI ///< SCGI + struct fcgi_parser parser; ///< FastCGI stream parser }; -struct listener +static void +client_fcgi_init (struct client *client) { - int fd; ///< Listening socket FD - ev_io watcher; ///< New connection available - enum listener_type type; ///< The protocol -}; + struct client_fcgi *self = xcalloc (1, sizeof *self); + client->impl_data = self; -struct server_context + fcgi_parser_init (&self->parser); + // TODO: configure the parser +} + +static void +client_fcgi_destroy (struct client *client) { - ev_signal sigterm_watcher; ///< Got SIGTERM - ev_signal sigint_watcher; ///< Got SIGINT - bool quitting; ///< User requested quitting + struct client_fcgi *self = client->impl_data; + client->impl_data = NULL; - struct listener *listeners; ///< Listeners - size_t n_listeners; ///< Number of listening sockets + fcgi_parser_free (&self->parser); + free (self); +} - struct client *clients; ///< Clients - unsigned n_clients; ///< Current number of connections +static bool +client_fcgi_on_data (struct client *client, const void *data, size_t len) +{ + struct client_fcgi *self = client->impl_data; + fcgi_parser_push (&self->parser, data, len); + return true; +} - struct str_map config; ///< Server configuration +static struct client_impl g_client_fcgi = +{ + .init = client_fcgi_init, + .destroy = client_fcgi_destroy, + .on_data = client_fcgi_on_data, +}; + +// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +struct client_scgi +{ + struct scgi_parser parser; ///< SCGI stream parser }; static void -server_context_init (struct server_context *self) +client_scgi_init (struct client *client) { - memset (self, 0, sizeof *self); + struct client_scgi *self = xcalloc (1, sizeof *self); + client->impl_data = self; - str_map_init (&self->config); - load_config_defaults (&self->config, g_config_table); + scgi_parser_init (&self->parser); + // TODO: configure the parser } static void -server_context_free (struct server_context *self) +client_scgi_destroy (struct client *client) { - // TODO: free the clients (?) - // TODO: close the listeners (?) + struct client_scgi *self = client->impl_data; + client->impl_data = NULL; - str_map_free (&self->config); + scgi_parser_free (&self->parser); + free (self); } -// --- ? ----------------------------------------------------------------------- +static bool +client_scgi_on_data (struct client *client, const void *data, size_t len) +{ + struct client_scgi *self = client->impl_data; + struct error *e = NULL; + if (scgi_parser_push (&self->parser, data, len, &e)) + return true; + + print_debug ("SCGI parser failed: %s", e->message); + error_free (e); + return false; +} + +static struct client_impl g_client_scgi = +{ + .init = client_scgi_init, + .destroy = client_scgi_destroy, + .on_data = client_scgi_on_data, +}; + +// --- Basic server stuff ------------------------------------------------------ + +struct listener +{ + int fd; ///< Listening socket FD + ev_io watcher; ///< New connection available + struct client_impl *impl; ///< Client behaviour +}; static void remove_client (struct server_context *ctx, struct client *client) { + LIST_UNLINK (ctx->clients, client); + ctx->n_clients--; + + // First uninitialize the higher-level implementation + client->impl->destroy (client); + ev_io_stop (EV_DEFAULT_ &client->read_watcher); ev_io_stop (EV_DEFAULT_ &client->write_watcher); xclose (client->socket_fd); - LIST_UNLINK (ctx->clients, client); client_free (client); free (client); } static bool -read_loop (EV_P_ ev_io *watcher, - bool (*cb) (EV_P_ ev_io *, const void *, ssize_t)) -{ - char buf[8192]; - while (true) - { - ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0); - if (n_read < 0) - { - if (errno == EAGAIN) - break; - if (errno == EINTR) - continue; - } - if (n_read <= 0 || !cb (EV_A_ watcher, buf, n_read)) - return false; - } - return true; -} - -static bool -flush_queue (write_queue_t *queue, ev_io *watcher) +on_client_data (EV_P_ ev_io *watcher, const void *buf, ssize_t n_read) { - struct iovec vec[queue->len], *vec_iter = vec; - for (write_req_t *iter = queue->head; iter; iter = iter->next) - *vec_iter++ = iter->data; + (void) loop; - ssize_t written; -again: - written = writev (watcher->fd, vec, N_ELEMENTS (vec)); - if (written < 0) - { - if (errno == EAGAIN) - goto skip; - if (errno == EINTR) - goto again; - return false; - } - - write_queue_processed (queue, written); - -skip: - if (write_queue_is_empty (queue)) - ev_io_stop (EV_DEFAULT_ watcher); - else - ev_io_start (EV_DEFAULT_ watcher); - return true; + struct client *client = watcher->data; + return client->impl->on_data (client, buf, n_read); } static void @@ -734,12 +904,12 @@ error: } static void -on_fcgi_client_available (EV_P_ ev_io *watcher, int revents) +on_client_available (EV_P_ ev_io *watcher, int revents) { struct server_context *ctx = ev_userdata (loop); + struct listener *listener = watcher->data; (void) revents; - // TODO while (true) { int sock_fd = accept (watcher->fd, NULL, NULL); @@ -759,11 +929,13 @@ on_fcgi_client_available (EV_P_ ev_io *watcher, int revents) break; } + set_blocking (sock_fd, false); + struct client *client = xmalloc (sizeof *client); client_init (client); client->socket_fd = sock_fd; + client->impl = listener->impl; - set_blocking (sock_fd, false); ev_io_init (&client->read_watcher, on_client_ready, sock_fd, EV_READ); ev_io_init (&client->write_watcher, on_client_ready, sock_fd, EV_WRITE); client->read_watcher.data = client; @@ -772,6 +944,9 @@ on_fcgi_client_available (EV_P_ ev_io *watcher, int revents) // We're only interested in reading as the write queue is empty now ev_io_start (EV_A_ &client->read_watcher); + // Initialize the higher-level implementation + client->impl->init (client); + LIST_PREPEND (ctx->clients, client); ctx->n_clients++; } @@ -792,7 +967,7 @@ parse_config (struct server_context *ctx, struct error **e) } static int -listen_finish (struct addrinfo *gai_iter) +listener_finish (struct addrinfo *gai_iter) { int fd = socket (gai_iter->ai_family, gai_iter->ai_socktype, gai_iter->ai_protocol); @@ -832,8 +1007,8 @@ listen_finish (struct addrinfo *gai_iter) } static void -listen_resolve (struct server_context *ctx, const char *host, const char *port, - struct addrinfo *gai_hints, enum listener_type type) +listener_add (struct server_context *ctx, const char *host, const char *port, + struct addrinfo *gai_hints, struct client_impl *impl) { struct addrinfo *gai_result, *gai_iter; int err = getaddrinfo (host, port, gai_hints, &gai_result); @@ -849,24 +1024,15 @@ listen_resolve (struct server_context *ctx, const char *host, const char *port, int fd; for (gai_iter = gai_result; gai_iter; gai_iter = gai_iter->ai_next) { - if ((fd = listen_finish (gai_iter)) == -1) + if ((fd = listener_finish (gai_iter)) == -1) continue; set_blocking (fd, false); struct listener *listener = &ctx->listeners[ctx->n_listeners++]; - switch ((listener->type = type)) - { - case LISTENER_FCGI: - ev_io_init (&listener->watcher, - on_fcgi_client_available, fd, EV_READ); - break; - case LISTENER_SCGI: - ev_io_init (&listener->watcher, - on_scgi_client_available, fd, EV_READ); - break; - } - + ev_io_init (&listener->watcher, on_client_available, fd, EV_READ); ev_io_start (EV_DEFAULT_ &listener->watcher); + listener->watcher.data = listener; + listener->impl = impl; break; } freeaddrinfo (gai_result); @@ -885,10 +1051,8 @@ setup_listen_fds (struct server_context *ctx, struct error **e) gai_hints.ai_socktype = SOCK_STREAM; gai_hints.ai_flags = AI_PASSIVE; - struct str_vector ports_fcgi; - struct str_vector ports_scgi; - str_vector_init (&ports_fcgi); - str_vector_init (&ports_scgi); + struct str_vector ports_fcgi; str_vector_init (&ports_fcgi); + struct str_vector ports_scgi; str_vector_init (&ports_scgi); if (port_fcgi) split_str_ignore_empty (port_fcgi, ',', &ports_fcgi); @@ -899,11 +1063,11 @@ setup_listen_fds (struct server_context *ctx, struct error **e) ctx->listeners = xcalloc (n_ports, sizeof *ctx->listeners); for (size_t i = 0; i < ports_fcgi.len; i++) - listen_resolve (ctx, bind_host, ports_fcgi.vector[i], - &gai_hints, LISTENER_FCGI); + listener_add (ctx, bind_host, ports_fcgi.vector[i], + &gai_hints, &g_client_fcgi); for (size_t i = 0; i < ports_scgi.len; i++) - listen_resolve (ctx, bind_host, ports_scgi.vector[i], - &gai_hints, LISTENER_SCGI); + listener_add (ctx, bind_host, ports_scgi.vector[i], + &gai_hints, &g_client_scgi); str_vector_free (&ports_fcgi); str_vector_free (&ports_scgi); |