From a3ec0942f8063509a00695d6e702a7f914caa7e9 Mon Sep 17 00:00:00 2001 From: Přemysl Janouch Date: Wed, 17 Oct 2018 08:40:18 +0200 Subject: Implement basic connection teardown I finally understand the codebase again. It's rather complicated. --- demo-json-rpc-server.c | 240 ++++++++++++++++++++++++++++++------------------- 1 file changed, 147 insertions(+), 93 deletions(-) diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c index cde6ad0..563d11c 100644 --- a/demo-json-rpc-server.c +++ b/demo-json-rpc-server.c @@ -47,10 +47,10 @@ enum { PIPE_READ, PIPE_WRITE }; #define FIND_CONTAINER(name, pointer, type, member) \ type *name = CONTAINER_OF (pointer, type, member) -// --- libev helpers ----------------------------------------------------------- +// --- Utilities --------------------------------------------------------------- static bool -flush_queue (struct write_queue *queue, ev_io *watcher) +flush_queue (struct write_queue *queue, int fd) { struct iovec vec[queue->len], *vec_iter = vec; LIST_FOR_EACH (struct write_req, iter, queue->head) @@ -58,24 +58,17 @@ flush_queue (struct write_queue *queue, ev_io *watcher) ssize_t written; again: - written = writev (watcher->fd, vec, N_ELEMENTS (vec)); - if (written < 0) + if ((written = writev (fd, vec, N_ELEMENTS (vec))) >= 0) { - if (errno == EAGAIN) - goto skip; - if (errno == EINTR) - goto again; - return false; + write_queue_processed (queue, written); + return true; } + if (errno == EINTR) + goto again; + if (errno == EAGAIN) + return true; - write_queue_processed (queue, written); - -skip: - if (write_queue_is_empty (queue)) - ev_io_stop (EV_DEFAULT_ watcher); - else - ev_io_start (EV_DEFAULT_ watcher); - return true; + return false; } // --- Logging ----------------------------------------------------------------- @@ -135,16 +128,17 @@ struct fcgi_muxer /// Write data to the underlying transport void (*write_cb) (struct fcgi_muxer *, const void *data, size_t len); - /// Close the underlying transport - // TODO: consider half-close and the subsequent handling + /// Close the underlying transport. You are allowed to destroy the muxer + /// directly from within the callback. void (*close_cb) (struct fcgi_muxer *); /// Start processing a request. Return false if no further action is /// to be done and the request should be finished. - bool (*request_start_cb) (struct fcgi_muxer *, struct fcgi_request *); + bool (*request_start_cb) (struct fcgi_request *); - /// Handle incoming data. "len == 0" means EOF. - void (*request_push_cb) + /// Handle incoming data. "len == 0" means EOF. Returns false if + /// the underlying transport should be closed, this being the last request. + bool (*request_push_cb) (struct fcgi_request *, const void *data, size_t len); /// Destroy the handler's data stored in the request object @@ -230,7 +224,7 @@ fcgi_request_push_params { // TODO: probably check the state of the header parser // TODO: request_start() can return false, end the request here? - (void) self->muxer->request_start_cb (self->muxer, self); + (void) self->muxer->request_start_cb (self); self->state = FCGI_REQUEST_STDIN; } } @@ -282,7 +276,9 @@ fcgi_request_write (struct fcgi_request *self, const void *data, size_t len) } } -static void +/// Mark the request as done. Returns false if the underlying transport +/// should be closed, this being the last request. +static bool fcgi_request_finish (struct fcgi_request *self) { fcgi_request_flush (self); @@ -292,14 +288,26 @@ fcgi_request_finish (struct fcgi_request *self) 0 /* TODO app_status, although ignored */, FCGI_REQUEST_COMPLETE /* TODO protocol_status, may be different */); - if (!(self->flags & FCGI_KEEP_CONN)) - { - // TODO: tear down (shut down) the connection - } + bool should_close = !(self->flags & FCGI_KEEP_CONN); self->muxer->active_requests--; self->muxer->requests[self->request_id] = NULL; fcgi_request_destroy (self); + + // TODO: tear down (shut down) the connection. This is called from: + // + // 1. client_fcgi_request_push <- request_push_cb + // <- fcgi_request_push_stdin <- fcgi_muxer_on_stdin + // <- fcgi_muxer_on_message <- fcgi_parser_push <- fcgi_muxer_push + // <- client_fcgi_push <- client_read_loop + // => in this case no close_cb may be called + // -> need to pass a false boolean aaall the way up, + // then client_fcgi_finalize eventually cleans up the rest + // + // 2. client_fcgi_request_close_cb <- request_finish + // => our direct caller must call fcgi_muxer::close_cb + // -> not very nice to delegate it there + return !should_close; } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -418,6 +426,7 @@ fcgi_muxer_on_abort_request // TODO: abort the request: let it somehow produce FCGI_END_REQUEST, // make sure to send an stdout EOF record + // TODO: and if that was not a FCGI_KEEP_CONN request, close the transport } static void @@ -1841,23 +1850,24 @@ struct client { LIST_HEADER (struct client) - // XXX: do we really need this here? - struct server_context *ctx; ///< Server context + struct client_vtable *vtable; ///< Client behaviour - int socket_fd; ///< The TCP socket + int socket_fd; ///< The network socket + bool received_eof; ///< Whether EOF has been received yet + bool closing; ///< Whether we're just flushing buffers + bool half_closed; ///< Transport half-closed while closing struct write_queue write_queue; ///< Write queue + ev_timer flush_timeout_watcher; ///< Write queue flush timer ev_io read_watcher; ///< The socket can be read from ev_io write_watcher; ///< The socket can be written to - - struct client_vtable *vtable; ///< Client behaviour }; /// The concrete behaviour to serve a particular client's requests struct client_vtable { /// Process incoming data; "len == 0" means EOF. - /// If the method returns false, the client is destroyed by caller. + /// If the method returns false, client_close() is called by the caller. bool (*push) (struct client *client, const void *data, size_t len); // TODO: optional push_error() to inform about network I/O errors @@ -1885,8 +1895,8 @@ client_write (struct client *self, const void *data, size_t len) static void client_destroy (struct client *self) { - struct server_context *ctx = self->ctx; - + // XXX: this codebase halfway pretends there could be other contexts + struct server_context *ctx = ev_userdata (EV_DEFAULT); LIST_UNLINK (ctx->clients, self); ctx->n_clients--; @@ -1897,63 +1907,111 @@ client_destroy (struct client *self) ev_io_stop (EV_DEFAULT_ &self->write_watcher); xclose (self->socket_fd); write_queue_free (&self->write_queue); + ev_timer_stop (EV_DEFAULT_ &self->flush_timeout_watcher); free (self); try_finish_quit (ctx); } +/// Try to cleanly close the connection, waiting for the remote client to close +/// its own side of the connection as a sign that it has processed all the data +/// it wanted to. The client implementation will not receive any further data. +/// May directly call client_destroy(). +static void +client_close (struct client *self) +{ + if (self->closing) + return; + + self->closing = true; + ev_timer_start (EV_DEFAULT_ &self->flush_timeout_watcher); + ev_feed_event (EV_DEFAULT_ &self->write_watcher, EV_WRITE); + + // We assume the remote client doesn't want our data if it half-closes + if (self->received_eof) + client_destroy (self); +} + // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - static bool client_read_loop (EV_P_ struct client *client, ev_io *watcher) { char buf[8192]; - while (true) + ssize_t n_read; +again: + while ((n_read = recv (watcher->fd, buf, sizeof buf, 0)) >= 0) { - ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0); - if (n_read >= 0) + if (!n_read) { - if (!client->vtable->push (client, buf, n_read)) - return false; - if (!n_read) - break; + // Don't deliver the EOF condition repeatedly + ev_io_stop (EV_A_ watcher); + client->received_eof = true; } - else if (errno == EAGAIN) - return true; - else if (errno != EINTR) + if (!client->closing + && !client->vtable->push (client, buf, n_read)) + { + client_close (client); return false; + } + if (!n_read) + return true; } + if (errno == EINTR) + goto again; + if (errno == EAGAIN) + return true; - // Don't receive the EOF condition repeatedly - ev_io_stop (EV_A_ watcher); + client_destroy (client); + return false; +} - // We can probably still write, so let's just return - // XXX: if there's nothing to be written, shouldn't we close the connection? - return true; +static void +on_client_readable (EV_P_ ev_io *watcher, int revents) +{ + struct client *client = watcher->data; + (void) revents; + + if (client_read_loop (EV_A_ client, watcher) + && client->closing && client->received_eof) + client_destroy (client); } static void -on_client_ready (EV_P_ ev_io *watcher, int revents) +on_client_writable (EV_P_ ev_io *watcher, int revents) { struct client *client = watcher->data; - // XXX: although read and write are in a sequence, if we create response - // data, we'll still likely need to go back to the event loop. - - if (revents & EV_READ) - if (!client_read_loop (EV_A_ client, watcher)) - goto close; - if (revents & EV_WRITE) - // TODO: add "closing link" functionality -> automatic shutdown - // (half-close) once we manage to flush the write buffer, - // which is logically followed by waiting for an EOF from the client - // TODO: some sort of "on_buffers_flushed" callback for streaming huge - // chunks of external (or generated) data. - if (!flush_queue (&client->write_queue, watcher)) - goto close; - return; - -close: - client_destroy (client); + (void) loop; + (void) revents; + + // TODO: some sort of "on_buffers_flushed" callback for streaming huge + // chunks of external (or generated) data. That will need to be + // forwarded to "struct request_handler". + if (!flush_queue (&client->write_queue, watcher->fd)) + { + client_destroy (client); + return; + } + if (!write_queue_is_empty (&client->write_queue)) + return; + + ev_io_stop (EV_A_ watcher); + if (client->closing && !client->half_closed) + { + if (!shutdown (client->socket_fd, SHUT_WR)) + client->half_closed = true; + else + client_destroy (client); + } +} + +static void +on_client_timeout (EV_P_ ev_timer *watcher, int revents) +{ + (void) loop; + (void) revents; + + client_destroy (watcher->data); } /// Create a new instance of a subclass with the given size. @@ -1964,14 +2022,15 @@ client_new (EV_P_ size_t size, int sock_fd) struct server_context *ctx = ev_userdata (loop); struct client *self = xcalloc (1, size); - self->ctx = ctx; self->write_queue = write_queue_make (); + ev_timer_init (&self->flush_timeout_watcher, on_client_timeout, 5., 0.); + self->flush_timeout_watcher.data = self; set_blocking (sock_fd, false); self->socket_fd = sock_fd; - ev_io_init (&self->read_watcher, on_client_ready, sock_fd, EV_READ); - ev_io_init (&self->write_watcher, on_client_ready, sock_fd, EV_WRITE); + ev_io_init (&self->read_watcher, on_client_readable, sock_fd, EV_READ); + ev_io_init (&self->write_watcher, on_client_writable, sock_fd, EV_WRITE); self->read_watcher.data = self; self->write_watcher.data = self; @@ -2010,37 +2069,36 @@ static void client_fcgi_request_close_cb (struct request *req) { FIND_CONTAINER (self, req, struct client_fcgi_request, request); - // No more data to send, terminate the substream/request - // XXX: this will most probably end up with client_fcgi_request_destroy(), - // we might or might not need to defer this action - fcgi_request_finish (self->fcgi_request); + struct fcgi_muxer *muxer = self->fcgi_request->muxer; + // No more data to send, terminate the substream/request, + // and also the transport if the client didn't specifically ask to keep it + if (!fcgi_request_finish (self->fcgi_request)) + muxer->close_cb (muxer); } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - static bool -client_fcgi_request_start - (struct fcgi_muxer *mux, struct fcgi_request *fcgi_request) +client_fcgi_request_start (struct fcgi_request *fcgi_request) { - FIND_CONTAINER (self, mux, struct client_fcgi, muxer); - struct client_fcgi_request *request = fcgi_request->handler_data = xcalloc (1, sizeof *request); request->fcgi_request = fcgi_request; request_init (&request->request); - request->request.ctx = self->client.ctx; + request->request.ctx = ev_userdata (EV_DEFAULT); request->request.write_cb = client_fcgi_request_write_cb; request->request.close_cb = client_fcgi_request_close_cb; return request_start (&request->request, &fcgi_request->headers); } -static void +static bool client_fcgi_request_push (struct fcgi_request *req, const void *data, size_t len) { struct client_fcgi_request *request = req->handler_data; - request_push (&request->request, data, len); + return request_push (&request->request, data, len) + || fcgi_request_finish (req); } static void @@ -2064,9 +2122,7 @@ static void client_fcgi_close_cb (struct fcgi_muxer *mux) { FIND_CONTAINER (self, mux, struct client_fcgi, muxer); - // FIXME: we should probably call something like client_shutdown(), - // which may have an argument whether we should really use close() - client_destroy (&self->client); + client_close (&self->client); } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -2137,11 +2193,9 @@ client_scgi_write_cb (struct request *req, const void *data, size_t len) static void client_scgi_close_cb (struct request *req) { - // NOTE: this rather really means "close me [the request]" FIND_CONTAINER (self, req, struct client_scgi, request); - // FIXME: we should probably call something like client_shutdown(), - // which may have an argument whether we should really use close() - client_destroy (&self->client); + // NOTE: this rather really means "close me [the request]" + client_close (&self->client); } static bool @@ -2201,7 +2255,6 @@ client_scgi_create (EV_P_ int sock_fd) self->client.vtable = &client_scgi_vtable; request_init (&self->request); - self->request.ctx = self->client.ctx; self->request.write_cb = client_scgi_write_cb; self->request.close_cb = client_scgi_close_cb; @@ -2231,8 +2284,9 @@ client_ws_on_message (struct ws_handler *handler, return false; } + struct server_context *ctx = ev_userdata (EV_DEFAULT); struct str response = str_make (); - process_json_rpc (self->client.ctx, data, len, &response); + process_json_rpc (ctx, data, len, &response); if (response.len) ws_handler_send (&self->handler, WS_OPCODE_TEXT, response.str, response.len); -- cgit v1.2.3-70-g09d2