diff options
author | Přemysl Janouch <p.janouch@gmail.com> | 2015-03-06 19:49:33 +0100 |
---|---|---|
committer | Přemysl Janouch <p.janouch@gmail.com> | 2015-03-06 19:49:33 +0100 |
commit | 2733ead30f01c7391a66c492261d9bfb247a8823 (patch) | |
tree | 8c55a6e9bb11df3baf960aac7493bf57a70d2be2 | |
parent | 0b0d64124b391d03ffe2b535b5b23d4ae3655fa0 (diff) | |
download | json-rpc-shell-2733ead30f01c7391a66c492261d9bfb247a8823.tar.gz json-rpc-shell-2733ead30f01c7391a66c492261d9bfb247a8823.tar.xz json-rpc-shell-2733ead30f01c7391a66c492261d9bfb247a8823.zip |
Figuring out how to close the connection
-rw-r--r-- | demo-json-rpc-server.c | 200 |
1 files changed, 110 insertions, 90 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c index de8a542..09f6002 100644 --- a/demo-json-rpc-server.c +++ b/demo-json-rpc-server.c @@ -69,30 +69,6 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value) // --- libev helpers ----------------------------------------------------------- static bool -read_loop (EV_P_ ev_io *watcher, - bool (*cb) (EV_P_ ev_io *, const void *, ssize_t)) -{ - char buf[8192]; - while (true) - { - ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0); - if (n_read < 0) - { - if (errno == EAGAIN) - break; - if (errno == EINTR) - continue; - } - // The callback is called on EOF as well - if (n_read < 0 || !cb (EV_A_ watcher, buf, n_read)) - return false; - if (!n_read) - return false; - } - return true; -} - -static bool flush_queue (write_queue_t *queue, ev_io *watcher) { struct iovec vec[queue->len], *vec_iter = vec; @@ -504,11 +480,13 @@ struct scgi_parser struct str name; ///< Header name so far struct str value; ///< Header value so far - /// Finished parsing request headers - void (*on_headers_read) (void *user_data); + /// Finished parsing request headers. + /// Return false to abort further processing of input. + bool (*on_headers_read) (void *user_data); - /// Content available; len == 0 means end of file - void (*on_content) (void *user_data, const void *data, size_t len); + /// Content available; len == 0 means end of file. + /// Return false to abort further processing of input. + bool (*on_content) (void *user_data, const void *data, size_t len); void *user_data; ///< User data passed to callbacks }; @@ -545,15 +523,15 @@ scgi_parser_push (struct scgi_parser *self, } // Indicate end of file - self->on_content (self->user_data, NULL, 0); - return true; + return self->on_content (self->user_data, NULL, 0); } // Notice that this madness is significantly harder to parse than FastCGI; // this procedure could also be optimized significantly str_append_data (&self->input, data, len); - while (true) + bool keep_running = true; + while (keep_running) switch (self->state) { case SCGI_READING_NETSTRING_LENGTH: @@ -601,7 +579,7 @@ scgi_parser_push (struct scgi_parser *self, return false; } self->state = SCGI_READING_CONTENT; - // TODO: a "on_headers_read" callback? + keep_running = self->on_headers_read (self->user_data); } else if (c != '\0') str_append_c (&self->name, c); @@ -641,10 +619,12 @@ scgi_parser_push (struct scgi_parser *self, break; } case SCGI_READING_CONTENT: - self->on_content (self->user_data, self->input.str, self->input.len); + keep_running = self->on_content + (self->user_data, self->input.str, self->input.len); str_remove_slice (&self->input, 0, self->input.len); - return true; + return keep_running; } + return false; } // --- Server ------------------------------------------------------------------ @@ -703,6 +683,9 @@ server_context_free (struct server_context *self) // returns back a JSON reply. This function may get called multiple times if // the user sends a batch request. +// TODO: a function that queues up a ping over IRC: this has to be owned by the +// server context as a background job that removes itself upon completion. + static bool try_advance (const char **p, const char *text) { @@ -747,9 +730,6 @@ validate_json_rpc_content_type (const char *type) // --- Requests ---------------------------------------------------------------- -// TODO: something to read in the headers and decide what to do with the request -// e.g. whether to reject it with a 404, or do JSON-RPC, or ignore it with 200 - struct request { // TODO *ctx @@ -759,7 +739,8 @@ struct request /// Callback to write some CGI response data to the output void (*write_cb) (void *user_data, const void *data, size_t len); - /// Callback to close the connection + /// Callback to close the connection. + /// CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED. void (*close_cb) (void *user_data); struct request_handler *handler; ///< Current request handler @@ -771,8 +752,9 @@ struct request_handler /// Install ourselves as the handler for the request if applicable bool (*try_handle) (struct request *request, struct str_map *headers); - /// Handle incoming data - void (*push_cb) (struct request *request, const void *data, size_t len); + /// Handle incoming data. + /// Return false if further processing should be stopped. + bool (*push_cb) (struct request *request, const void *data, size_t len); /// Destroy the handler void (*destroy_cb) (struct request *request); @@ -787,16 +769,27 @@ request_init (struct request *self) static void request_free (struct request *self) { - // TODO: destroy the handler? + if (self->handler) + self->handler->destroy_cb (self); } +/// This function is only intended to be run from asynchronous event handlers +/// such as timers, not as a direct result of starting the request or receiving +/// request data. CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED. static void +request_finish (struct request *self) +{ + self->close_cb (self->user_data); +} + +static bool request_start (struct request *self, struct str_map *headers) { bool handled = false; // TODO: try request handlers registered in self->ctx if (handled) - return; + // TODO: can also be false + return true; // Unable to serve the request struct str response; @@ -804,16 +797,17 @@ request_start (struct request *self, struct str_map *headers) str_append (&response, "404 Not Found\r\n\r\n"); self->write_cb (self->user_data, response.str, response.len); str_free (&response); - - // XXX: how will the clients behave when this happens? - self->close_cb (self->user_data); + return false; } -static void +static bool request_push (struct request *self, const void *data, size_t len) { if (soft_assert (self->handler)) - self->handler->push_cb (self, data, len); + return self->handler->push_cb (self, data, len); + + // No handler, nothing to do with any data + return false; } // --- Requests handlers ------------------------------------------------------- @@ -833,12 +827,13 @@ request_handler_json_rpc_try_handle return true; } -static void +static bool request_handler_json_rpc_push (struct request *request, const void *data, size_t len) { // TODO: append to a buffer // TODO: len == 0: process the request + return true; } static void @@ -883,7 +878,7 @@ struct client_impl void (*destroy) (struct client *client); /// Process incoming data; "len == 0" means EOF - bool (*on_data) (struct client *client, const void *data, size_t len); + bool (*push) (struct client *client, const void *data, size_t len); }; static void @@ -910,6 +905,24 @@ client_write (struct client *client, const void *data, size_t len) ev_io_start (EV_DEFAULT_ &client->write_watcher); } +static void +client_remove (struct client *client) +{ + struct server_context *ctx = client->ctx; + + LIST_UNLINK (ctx->clients, client); + ctx->n_clients--; + + // First uninitialize the higher-level implementation + client->impl->destroy (client); + + ev_io_stop (EV_DEFAULT_ &client->read_watcher); + ev_io_stop (EV_DEFAULT_ &client->write_watcher); + xclose (client->socket_fd); + client_free (client); + free (client); +} + // - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - struct client_fcgi @@ -938,7 +951,7 @@ client_fcgi_destroy (struct client *client) } static bool -client_fcgi_on_data (struct client *client, const void *data, size_t len) +client_fcgi_push (struct client *client, const void *data, size_t len) { struct client_fcgi *self = client->impl_data; fcgi_parser_push (&self->parser, data, len); @@ -949,7 +962,7 @@ static struct client_impl g_client_fcgi = { .init = client_fcgi_init, .destroy = client_fcgi_destroy, - .on_data = client_fcgi_on_data, + .push = client_fcgi_push, }; // - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -957,7 +970,7 @@ static struct client_impl g_client_fcgi = struct client_scgi { struct scgi_parser parser; ///< SCGI stream parser - struct request request; ///< Request + struct request request; ///< Request (only one per connection) }; static void @@ -970,29 +983,29 @@ client_scgi_write (void *user_data, const void *data, size_t len) static void client_scgi_close (void *user_data) { + // XXX: this rather really means "close me [the request]" struct client *client = user_data; - struct client_scgi *self = client->impl_data; - - // TODO + client_remove (client); } -static void +static bool client_scgi_on_headers_read (void *user_data) { struct client *client = user_data; struct client_scgi *self = client->impl_data; - - request_start (&self->request, &self->parser.headers); + return request_start (&self->request, &self->parser.headers); } -static void +static bool client_scgi_on_content (void *user_data, const void *data, size_t len) { struct client *client = user_data; struct client_scgi *self = client->impl_data; - // XXX: make sure this is understood as EOF - request_push (&self->request, data, len); + // XXX: do we have to count CONTENT_LENGTH and supply our own EOF? + // If we do produce our own EOF, we should probably make sure we don't + // send it twice in a row. + return request_push (&self->request, data, len); } static void @@ -1018,23 +1031,24 @@ client_scgi_destroy (struct client *client) struct client_scgi *self = client->impl_data; client->impl_data = NULL; - // TODO: do something more to abort the request? request_free (&self->request); - scgi_parser_free (&self->parser); free (self); } static bool -client_scgi_on_data (struct client *client, const void *data, size_t len) +client_scgi_push (struct client *client, const void *data, size_t len) { struct client_scgi *self = client->impl_data; struct error *e = NULL; if (scgi_parser_push (&self->parser, data, len, &e)) return true; - print_debug ("SCGI parser failed: %s", e->message); - error_free (e); + if (e != NULL) + { + print_debug ("SCGI parser failed: %s", e->message); + error_free (e); + } return false; } @@ -1042,7 +1056,7 @@ static struct client_impl g_client_scgi = { .init = client_scgi_init, .destroy = client_scgi_destroy, - .on_data = client_scgi_on_data, + .push = client_scgi_push, }; // --- Basic server stuff ------------------------------------------------------ @@ -1054,41 +1068,45 @@ struct listener struct client_impl *impl; ///< Client behaviour }; -static void -remove_client (struct server_context *ctx, struct client *client) +static bool +client_read_loop (EV_P_ struct client *client, ev_io *watcher) { - LIST_UNLINK (ctx->clients, client); - ctx->n_clients--; + char buf[8192]; + while (true) + { + ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0); + if (n_read < 0) + { + if (errno == EAGAIN) + break; + if (errno == EINTR) + continue; - // First uninitialize the higher-level implementation - client->impl->destroy (client); + return false; + } - ev_io_stop (EV_DEFAULT_ &client->read_watcher); - ev_io_stop (EV_DEFAULT_ &client->write_watcher); - xclose (client->socket_fd); - client_free (client); - free (client); -} + if (!client->impl->push (client, buf, n_read)) + return false; -static bool -on_client_data (EV_P_ ev_io *watcher, const void *buf, ssize_t n_read) -{ - (void) loop; + if (!n_read) + { + // Don't receive the EOF condition repeatedly + ev_io_stop (EV_A_ watcher); - struct client *client = watcher->data; - return client->impl->on_data (client, buf, n_read); + // We can probably still write, so let's just return + return true; + } + } + return true; } static void on_client_ready (EV_P_ ev_io *watcher, int revents) { - struct server_context *ctx = ev_userdata (loop); struct client *client = watcher->data; - // FIXME: don't close the connection on EOF; we need to be able to keep - // the connection open and respond in an asynchronous manner if (revents & EV_READ) - if (!read_loop (EV_A_ watcher, on_client_data)) + if (!client_read_loop (EV_A_ client, watcher)) goto error; if (revents & EV_WRITE) if (!flush_queue (&client->write_queue, watcher)) @@ -1096,7 +1114,9 @@ on_client_ready (EV_P_ ev_io *watcher, int revents) return; error: - remove_client (ctx, client); + // The callback also could have just told us to stop reading, + // this is not necessarily an error condition + client_remove (client); } static void |