aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Janouch <p@janouch.name>2018-10-17 08:40:18 +0200
committerPřemysl Janouch <p@janouch.name>2018-10-17 22:59:40 +0200
commita3ec0942f8063509a00695d6e702a7f914caa7e9 (patch)
tree468a84c5e6b0423a32669b617418fcaf11f2ff2f
parentefd500ca3cf4dfdc635ccd71ea8baf5a0d3c5bbd (diff)
downloadjson-rpc-shell-a3ec0942f8063509a00695d6e702a7f914caa7e9.tar.gz
json-rpc-shell-a3ec0942f8063509a00695d6e702a7f914caa7e9.tar.xz
json-rpc-shell-a3ec0942f8063509a00695d6e702a7f914caa7e9.zip
Implement basic connection teardown
I finally understand the codebase again. It's rather complicated.
-rw-r--r--demo-json-rpc-server.c240
1 files changed, 147 insertions, 93 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c
index cde6ad0..563d11c 100644
--- a/demo-json-rpc-server.c
+++ b/demo-json-rpc-server.c
@@ -47,10 +47,10 @@ enum { PIPE_READ, PIPE_WRITE };
#define FIND_CONTAINER(name, pointer, type, member) \
type *name = CONTAINER_OF (pointer, type, member)
-// --- libev helpers -----------------------------------------------------------
+// --- Utilities ---------------------------------------------------------------
static bool
-flush_queue (struct write_queue *queue, ev_io *watcher)
+flush_queue (struct write_queue *queue, int fd)
{
struct iovec vec[queue->len], *vec_iter = vec;
LIST_FOR_EACH (struct write_req, iter, queue->head)
@@ -58,24 +58,17 @@ flush_queue (struct write_queue *queue, ev_io *watcher)
ssize_t written;
again:
- written = writev (watcher->fd, vec, N_ELEMENTS (vec));
- if (written < 0)
+ if ((written = writev (fd, vec, N_ELEMENTS (vec))) >= 0)
{
- if (errno == EAGAIN)
- goto skip;
- if (errno == EINTR)
- goto again;
- return false;
+ write_queue_processed (queue, written);
+ return true;
}
+ if (errno == EINTR)
+ goto again;
+ if (errno == EAGAIN)
+ return true;
- write_queue_processed (queue, written);
-
-skip:
- if (write_queue_is_empty (queue))
- ev_io_stop (EV_DEFAULT_ watcher);
- else
- ev_io_start (EV_DEFAULT_ watcher);
- return true;
+ return false;
}
// --- Logging -----------------------------------------------------------------
@@ -135,16 +128,17 @@ struct fcgi_muxer
/// Write data to the underlying transport
void (*write_cb) (struct fcgi_muxer *, const void *data, size_t len);
- /// Close the underlying transport
- // TODO: consider half-close and the subsequent handling
+ /// Close the underlying transport. You are allowed to destroy the muxer
+ /// directly from within the callback.
void (*close_cb) (struct fcgi_muxer *);
/// Start processing a request. Return false if no further action is
/// to be done and the request should be finished.
- bool (*request_start_cb) (struct fcgi_muxer *, struct fcgi_request *);
+ bool (*request_start_cb) (struct fcgi_request *);
- /// Handle incoming data. "len == 0" means EOF.
- void (*request_push_cb)
+ /// Handle incoming data. "len == 0" means EOF. Returns false if
+ /// the underlying transport should be closed, this being the last request.
+ bool (*request_push_cb)
(struct fcgi_request *, const void *data, size_t len);
/// Destroy the handler's data stored in the request object
@@ -230,7 +224,7 @@ fcgi_request_push_params
{
// TODO: probably check the state of the header parser
// TODO: request_start() can return false, end the request here?
- (void) self->muxer->request_start_cb (self->muxer, self);
+ (void) self->muxer->request_start_cb (self);
self->state = FCGI_REQUEST_STDIN;
}
}
@@ -282,7 +276,9 @@ fcgi_request_write (struct fcgi_request *self, const void *data, size_t len)
}
}
-static void
+/// Mark the request as done. Returns false if the underlying transport
+/// should be closed, this being the last request.
+static bool
fcgi_request_finish (struct fcgi_request *self)
{
fcgi_request_flush (self);
@@ -292,14 +288,26 @@ fcgi_request_finish (struct fcgi_request *self)
0 /* TODO app_status, although ignored */,
FCGI_REQUEST_COMPLETE /* TODO protocol_status, may be different */);
- if (!(self->flags & FCGI_KEEP_CONN))
- {
- // TODO: tear down (shut down) the connection
- }
+ bool should_close = !(self->flags & FCGI_KEEP_CONN);
self->muxer->active_requests--;
self->muxer->requests[self->request_id] = NULL;
fcgi_request_destroy (self);
+
+ // TODO: tear down (shut down) the connection. This is called from:
+ //
+ // 1. client_fcgi_request_push <- request_push_cb
+ // <- fcgi_request_push_stdin <- fcgi_muxer_on_stdin
+ // <- fcgi_muxer_on_message <- fcgi_parser_push <- fcgi_muxer_push
+ // <- client_fcgi_push <- client_read_loop
+ // => in this case no close_cb may be called
+ // -> need to pass a false boolean aaall the way up,
+ // then client_fcgi_finalize eventually cleans up the rest
+ //
+ // 2. client_fcgi_request_close_cb <- request_finish
+ // => our direct caller must call fcgi_muxer::close_cb
+ // -> not very nice to delegate it there
+ return !should_close;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -418,6 +426,7 @@ fcgi_muxer_on_abort_request
// TODO: abort the request: let it somehow produce FCGI_END_REQUEST,
// make sure to send an stdout EOF record
+ // TODO: and if that was not a FCGI_KEEP_CONN request, close the transport
}
static void
@@ -1841,23 +1850,24 @@ struct client
{
LIST_HEADER (struct client)
- // XXX: do we really need this here?
- struct server_context *ctx; ///< Server context
+ struct client_vtable *vtable; ///< Client behaviour
- int socket_fd; ///< The TCP socket
+ int socket_fd; ///< The network socket
+ bool received_eof; ///< Whether EOF has been received yet
+ bool closing; ///< Whether we're just flushing buffers
+ bool half_closed; ///< Transport half-closed while closing
struct write_queue write_queue; ///< Write queue
+ ev_timer flush_timeout_watcher; ///< Write queue flush timer
ev_io read_watcher; ///< The socket can be read from
ev_io write_watcher; ///< The socket can be written to
-
- struct client_vtable *vtable; ///< Client behaviour
};
/// The concrete behaviour to serve a particular client's requests
struct client_vtable
{
/// Process incoming data; "len == 0" means EOF.
- /// If the method returns false, the client is destroyed by caller.
+ /// If the method returns false, client_close() is called by the caller.
bool (*push) (struct client *client, const void *data, size_t len);
// TODO: optional push_error() to inform about network I/O errors
@@ -1885,8 +1895,8 @@ client_write (struct client *self, const void *data, size_t len)
static void
client_destroy (struct client *self)
{
- struct server_context *ctx = self->ctx;
-
+ // XXX: this codebase halfway pretends there could be other contexts
+ struct server_context *ctx = ev_userdata (EV_DEFAULT);
LIST_UNLINK (ctx->clients, self);
ctx->n_clients--;
@@ -1897,63 +1907,111 @@ client_destroy (struct client *self)
ev_io_stop (EV_DEFAULT_ &self->write_watcher);
xclose (self->socket_fd);
write_queue_free (&self->write_queue);
+ ev_timer_stop (EV_DEFAULT_ &self->flush_timeout_watcher);
free (self);
try_finish_quit (ctx);
}
+/// Try to cleanly close the connection, waiting for the remote client to close
+/// its own side of the connection as a sign that it has processed all the data
+/// it wanted to. The client implementation will not receive any further data.
+/// May directly call client_destroy().
+static void
+client_close (struct client *self)
+{
+ if (self->closing)
+ return;
+
+ self->closing = true;
+ ev_timer_start (EV_DEFAULT_ &self->flush_timeout_watcher);
+ ev_feed_event (EV_DEFAULT_ &self->write_watcher, EV_WRITE);
+
+ // We assume the remote client doesn't want our data if it half-closes
+ if (self->received_eof)
+ client_destroy (self);
+}
+
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static bool
client_read_loop (EV_P_ struct client *client, ev_io *watcher)
{
char buf[8192];
- while (true)
+ ssize_t n_read;
+again:
+ while ((n_read = recv (watcher->fd, buf, sizeof buf, 0)) >= 0)
{
- ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
- if (n_read >= 0)
+ if (!n_read)
{
- if (!client->vtable->push (client, buf, n_read))
- return false;
- if (!n_read)
- break;
+ // Don't deliver the EOF condition repeatedly
+ ev_io_stop (EV_A_ watcher);
+ client->received_eof = true;
}
- else if (errno == EAGAIN)
- return true;
- else if (errno != EINTR)
+ if (!client->closing
+ && !client->vtable->push (client, buf, n_read))
+ {
+ client_close (client);
return false;
+ }
+ if (!n_read)
+ return true;
}
+ if (errno == EINTR)
+ goto again;
+ if (errno == EAGAIN)
+ return true;
- // Don't receive the EOF condition repeatedly
- ev_io_stop (EV_A_ watcher);
+ client_destroy (client);
+ return false;
+}
- // We can probably still write, so let's just return
- // XXX: if there's nothing to be written, shouldn't we close the connection?
- return true;
+static void
+on_client_readable (EV_P_ ev_io *watcher, int revents)
+{
+ struct client *client = watcher->data;
+ (void) revents;
+
+ if (client_read_loop (EV_A_ client, watcher)
+ && client->closing && client->received_eof)
+ client_destroy (client);
}
static void
-on_client_ready (EV_P_ ev_io *watcher, int revents)
+on_client_writable (EV_P_ ev_io *watcher, int revents)
{
struct client *client = watcher->data;
- // XXX: although read and write are in a sequence, if we create response
- // data, we'll still likely need to go back to the event loop.
-
- if (revents & EV_READ)
- if (!client_read_loop (EV_A_ client, watcher))
- goto close;
- if (revents & EV_WRITE)
- // TODO: add "closing link" functionality -> automatic shutdown
- // (half-close) once we manage to flush the write buffer,
- // which is logically followed by waiting for an EOF from the client
- // TODO: some sort of "on_buffers_flushed" callback for streaming huge
- // chunks of external (or generated) data.
- if (!flush_queue (&client->write_queue, watcher))
- goto close;
- return;
-
-close:
- client_destroy (client);
+ (void) loop;
+ (void) revents;
+
+ // TODO: some sort of "on_buffers_flushed" callback for streaming huge
+ // chunks of external (or generated) data. That will need to be
+ // forwarded to "struct request_handler".
+ if (!flush_queue (&client->write_queue, watcher->fd))
+ {
+ client_destroy (client);
+ return;
+ }
+ if (!write_queue_is_empty (&client->write_queue))
+ return;
+
+ ev_io_stop (EV_A_ watcher);
+ if (client->closing && !client->half_closed)
+ {
+ if (!shutdown (client->socket_fd, SHUT_WR))
+ client->half_closed = true;
+ else
+ client_destroy (client);
+ }
+}
+
+static void
+on_client_timeout (EV_P_ ev_timer *watcher, int revents)
+{
+ (void) loop;
+ (void) revents;
+
+ client_destroy (watcher->data);
}
/// Create a new instance of a subclass with the given size.
@@ -1964,14 +2022,15 @@ client_new (EV_P_ size_t size, int sock_fd)
struct server_context *ctx = ev_userdata (loop);
struct client *self = xcalloc (1, size);
- self->ctx = ctx;
self->write_queue = write_queue_make ();
+ ev_timer_init (&self->flush_timeout_watcher, on_client_timeout, 5., 0.);
+ self->flush_timeout_watcher.data = self;
set_blocking (sock_fd, false);
self->socket_fd = sock_fd;
- ev_io_init (&self->read_watcher, on_client_ready, sock_fd, EV_READ);
- ev_io_init (&self->write_watcher, on_client_ready, sock_fd, EV_WRITE);
+ ev_io_init (&self->read_watcher, on_client_readable, sock_fd, EV_READ);
+ ev_io_init (&self->write_watcher, on_client_writable, sock_fd, EV_WRITE);
self->read_watcher.data = self;
self->write_watcher.data = self;
@@ -2010,37 +2069,36 @@ static void
client_fcgi_request_close_cb (struct request *req)
{
FIND_CONTAINER (self, req, struct client_fcgi_request, request);
- // No more data to send, terminate the substream/request
- // XXX: this will most probably end up with client_fcgi_request_destroy(),
- // we might or might not need to defer this action
- fcgi_request_finish (self->fcgi_request);
+ struct fcgi_muxer *muxer = self->fcgi_request->muxer;
+ // No more data to send, terminate the substream/request,
+ // and also the transport if the client didn't specifically ask to keep it
+ if (!fcgi_request_finish (self->fcgi_request))
+ muxer->close_cb (muxer);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static bool
-client_fcgi_request_start
- (struct fcgi_muxer *mux, struct fcgi_request *fcgi_request)
+client_fcgi_request_start (struct fcgi_request *fcgi_request)
{
- FIND_CONTAINER (self, mux, struct client_fcgi, muxer);
-
struct client_fcgi_request *request =
fcgi_request->handler_data = xcalloc (1, sizeof *request);
request->fcgi_request = fcgi_request;
request_init (&request->request);
- request->request.ctx = self->client.ctx;
+ request->request.ctx = ev_userdata (EV_DEFAULT);
request->request.write_cb = client_fcgi_request_write_cb;
request->request.close_cb = client_fcgi_request_close_cb;
return request_start (&request->request, &fcgi_request->headers);
}
-static void
+static bool
client_fcgi_request_push
(struct fcgi_request *req, const void *data, size_t len)
{
struct client_fcgi_request *request = req->handler_data;
- request_push (&request->request, data, len);
+ return request_push (&request->request, data, len)
+ || fcgi_request_finish (req);
}
static void
@@ -2064,9 +2122,7 @@ static void
client_fcgi_close_cb (struct fcgi_muxer *mux)
{
FIND_CONTAINER (self, mux, struct client_fcgi, muxer);
- // FIXME: we should probably call something like client_shutdown(),
- // which may have an argument whether we should really use close()
- client_destroy (&self->client);
+ client_close (&self->client);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -2137,11 +2193,9 @@ client_scgi_write_cb (struct request *req, const void *data, size_t len)
static void
client_scgi_close_cb (struct request *req)
{
- // NOTE: this rather really means "close me [the request]"
FIND_CONTAINER (self, req, struct client_scgi, request);
- // FIXME: we should probably call something like client_shutdown(),
- // which may have an argument whether we should really use close()
- client_destroy (&self->client);
+ // NOTE: this rather really means "close me [the request]"
+ client_close (&self->client);
}
static bool
@@ -2201,7 +2255,6 @@ client_scgi_create (EV_P_ int sock_fd)
self->client.vtable = &client_scgi_vtable;
request_init (&self->request);
- self->request.ctx = self->client.ctx;
self->request.write_cb = client_scgi_write_cb;
self->request.close_cb = client_scgi_close_cb;
@@ -2231,8 +2284,9 @@ client_ws_on_message (struct ws_handler *handler,
return false;
}
+ struct server_context *ctx = ev_userdata (EV_DEFAULT);
struct str response = str_make ();
- process_json_rpc (self->client.ctx, data, len, &response);
+ process_json_rpc (ctx, data, len, &response);
if (response.len)
ws_handler_send (&self->handler,
WS_OPCODE_TEXT, response.str, response.len);