aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Janouch <p.janouch@gmail.com>2015-03-06 19:49:33 +0100
committerPřemysl Janouch <p.janouch@gmail.com>2015-03-06 19:49:33 +0100
commit2733ead30f01c7391a66c492261d9bfb247a8823 (patch)
tree8c55a6e9bb11df3baf960aac7493bf57a70d2be2
parent0b0d64124b391d03ffe2b535b5b23d4ae3655fa0 (diff)
downloadjson-rpc-shell-2733ead30f01c7391a66c492261d9bfb247a8823.tar.gz
json-rpc-shell-2733ead30f01c7391a66c492261d9bfb247a8823.tar.xz
json-rpc-shell-2733ead30f01c7391a66c492261d9bfb247a8823.zip
Figuring out how to close the connection
-rw-r--r--demo-json-rpc-server.c200
1 files changed, 110 insertions, 90 deletions
diff --git a/demo-json-rpc-server.c b/demo-json-rpc-server.c
index de8a542..09f6002 100644
--- a/demo-json-rpc-server.c
+++ b/demo-json-rpc-server.c
@@ -69,30 +69,6 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value)
// --- libev helpers -----------------------------------------------------------
static bool
-read_loop (EV_P_ ev_io *watcher,
- bool (*cb) (EV_P_ ev_io *, const void *, ssize_t))
-{
- char buf[8192];
- while (true)
- {
- ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
- if (n_read < 0)
- {
- if (errno == EAGAIN)
- break;
- if (errno == EINTR)
- continue;
- }
- // The callback is called on EOF as well
- if (n_read < 0 || !cb (EV_A_ watcher, buf, n_read))
- return false;
- if (!n_read)
- return false;
- }
- return true;
-}
-
-static bool
flush_queue (write_queue_t *queue, ev_io *watcher)
{
struct iovec vec[queue->len], *vec_iter = vec;
@@ -504,11 +480,13 @@ struct scgi_parser
struct str name; ///< Header name so far
struct str value; ///< Header value so far
- /// Finished parsing request headers
- void (*on_headers_read) (void *user_data);
+ /// Finished parsing request headers.
+ /// Return false to abort further processing of input.
+ bool (*on_headers_read) (void *user_data);
- /// Content available; len == 0 means end of file
- void (*on_content) (void *user_data, const void *data, size_t len);
+ /// Content available; len == 0 means end of file.
+ /// Return false to abort further processing of input.
+ bool (*on_content) (void *user_data, const void *data, size_t len);
void *user_data; ///< User data passed to callbacks
};
@@ -545,15 +523,15 @@ scgi_parser_push (struct scgi_parser *self,
}
// Indicate end of file
- self->on_content (self->user_data, NULL, 0);
- return true;
+ return self->on_content (self->user_data, NULL, 0);
}
// Notice that this madness is significantly harder to parse than FastCGI;
// this procedure could also be optimized significantly
str_append_data (&self->input, data, len);
- while (true)
+ bool keep_running = true;
+ while (keep_running)
switch (self->state)
{
case SCGI_READING_NETSTRING_LENGTH:
@@ -601,7 +579,7 @@ scgi_parser_push (struct scgi_parser *self,
return false;
}
self->state = SCGI_READING_CONTENT;
- // TODO: a "on_headers_read" callback?
+ keep_running = self->on_headers_read (self->user_data);
}
else if (c != '\0')
str_append_c (&self->name, c);
@@ -641,10 +619,12 @@ scgi_parser_push (struct scgi_parser *self,
break;
}
case SCGI_READING_CONTENT:
- self->on_content (self->user_data, self->input.str, self->input.len);
+ keep_running = self->on_content
+ (self->user_data, self->input.str, self->input.len);
str_remove_slice (&self->input, 0, self->input.len);
- return true;
+ return keep_running;
}
+ return false;
}
// --- Server ------------------------------------------------------------------
@@ -703,6 +683,9 @@ server_context_free (struct server_context *self)
// returns back a JSON reply. This function may get called multiple times if
// the user sends a batch request.
+// TODO: a function that queues up a ping over IRC: this has to be owned by the
+// server context as a background job that removes itself upon completion.
+
static bool
try_advance (const char **p, const char *text)
{
@@ -747,9 +730,6 @@ validate_json_rpc_content_type (const char *type)
// --- Requests ----------------------------------------------------------------
-// TODO: something to read in the headers and decide what to do with the request
-// e.g. whether to reject it with a 404, or do JSON-RPC, or ignore it with 200
-
struct request
{
// TODO *ctx
@@ -759,7 +739,8 @@ struct request
/// Callback to write some CGI response data to the output
void (*write_cb) (void *user_data, const void *data, size_t len);
- /// Callback to close the connection
+ /// Callback to close the connection.
+ /// CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED.
void (*close_cb) (void *user_data);
struct request_handler *handler; ///< Current request handler
@@ -771,8 +752,9 @@ struct request_handler
/// Install ourselves as the handler for the request if applicable
bool (*try_handle) (struct request *request, struct str_map *headers);
- /// Handle incoming data
- void (*push_cb) (struct request *request, const void *data, size_t len);
+ /// Handle incoming data.
+ /// Return false if further processing should be stopped.
+ bool (*push_cb) (struct request *request, const void *data, size_t len);
/// Destroy the handler
void (*destroy_cb) (struct request *request);
@@ -787,16 +769,27 @@ request_init (struct request *self)
static void
request_free (struct request *self)
{
- // TODO: destroy the handler?
+ if (self->handler)
+ self->handler->destroy_cb (self);
}
+/// This function is only intended to be run from asynchronous event handlers
+/// such as timers, not as a direct result of starting the request or receiving
+/// request data. CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED.
static void
+request_finish (struct request *self)
+{
+ self->close_cb (self->user_data);
+}
+
+static bool
request_start (struct request *self, struct str_map *headers)
{
bool handled = false;
// TODO: try request handlers registered in self->ctx
if (handled)
- return;
+ // TODO: can also be false
+ return true;
// Unable to serve the request
struct str response;
@@ -804,16 +797,17 @@ request_start (struct request *self, struct str_map *headers)
str_append (&response, "404 Not Found\r\n\r\n");
self->write_cb (self->user_data, response.str, response.len);
str_free (&response);
-
- // XXX: how will the clients behave when this happens?
- self->close_cb (self->user_data);
+ return false;
}
-static void
+static bool
request_push (struct request *self, const void *data, size_t len)
{
if (soft_assert (self->handler))
- self->handler->push_cb (self, data, len);
+ return self->handler->push_cb (self, data, len);
+
+ // No handler, nothing to do with any data
+ return false;
}
// --- Requests handlers -------------------------------------------------------
@@ -833,12 +827,13 @@ request_handler_json_rpc_try_handle
return true;
}
-static void
+static bool
request_handler_json_rpc_push
(struct request *request, const void *data, size_t len)
{
// TODO: append to a buffer
// TODO: len == 0: process the request
+ return true;
}
static void
@@ -883,7 +878,7 @@ struct client_impl
void (*destroy) (struct client *client);
/// Process incoming data; "len == 0" means EOF
- bool (*on_data) (struct client *client, const void *data, size_t len);
+ bool (*push) (struct client *client, const void *data, size_t len);
};
static void
@@ -910,6 +905,24 @@ client_write (struct client *client, const void *data, size_t len)
ev_io_start (EV_DEFAULT_ &client->write_watcher);
}
+static void
+client_remove (struct client *client)
+{
+ struct server_context *ctx = client->ctx;
+
+ LIST_UNLINK (ctx->clients, client);
+ ctx->n_clients--;
+
+ // First uninitialize the higher-level implementation
+ client->impl->destroy (client);
+
+ ev_io_stop (EV_DEFAULT_ &client->read_watcher);
+ ev_io_stop (EV_DEFAULT_ &client->write_watcher);
+ xclose (client->socket_fd);
+ client_free (client);
+ free (client);
+}
+
// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct client_fcgi
@@ -938,7 +951,7 @@ client_fcgi_destroy (struct client *client)
}
static bool
-client_fcgi_on_data (struct client *client, const void *data, size_t len)
+client_fcgi_push (struct client *client, const void *data, size_t len)
{
struct client_fcgi *self = client->impl_data;
fcgi_parser_push (&self->parser, data, len);
@@ -949,7 +962,7 @@ static struct client_impl g_client_fcgi =
{
.init = client_fcgi_init,
.destroy = client_fcgi_destroy,
- .on_data = client_fcgi_on_data,
+ .push = client_fcgi_push,
};
// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -957,7 +970,7 @@ static struct client_impl g_client_fcgi =
struct client_scgi
{
struct scgi_parser parser; ///< SCGI stream parser
- struct request request; ///< Request
+ struct request request; ///< Request (only one per connection)
};
static void
@@ -970,29 +983,29 @@ client_scgi_write (void *user_data, const void *data, size_t len)
static void
client_scgi_close (void *user_data)
{
+ // XXX: this rather really means "close me [the request]"
struct client *client = user_data;
- struct client_scgi *self = client->impl_data;
-
- // TODO
+ client_remove (client);
}
-static void
+static bool
client_scgi_on_headers_read (void *user_data)
{
struct client *client = user_data;
struct client_scgi *self = client->impl_data;
-
- request_start (&self->request, &self->parser.headers);
+ return request_start (&self->request, &self->parser.headers);
}
-static void
+static bool
client_scgi_on_content (void *user_data, const void *data, size_t len)
{
struct client *client = user_data;
struct client_scgi *self = client->impl_data;
- // XXX: make sure this is understood as EOF
- request_push (&self->request, data, len);
+ // XXX: do we have to count CONTENT_LENGTH and supply our own EOF?
+ // If we do produce our own EOF, we should probably make sure we don't
+ // send it twice in a row.
+ return request_push (&self->request, data, len);
}
static void
@@ -1018,23 +1031,24 @@ client_scgi_destroy (struct client *client)
struct client_scgi *self = client->impl_data;
client->impl_data = NULL;
- // TODO: do something more to abort the request?
request_free (&self->request);
-
scgi_parser_free (&self->parser);
free (self);
}
static bool
-client_scgi_on_data (struct client *client, const void *data, size_t len)
+client_scgi_push (struct client *client, const void *data, size_t len)
{
struct client_scgi *self = client->impl_data;
struct error *e = NULL;
if (scgi_parser_push (&self->parser, data, len, &e))
return true;
- print_debug ("SCGI parser failed: %s", e->message);
- error_free (e);
+ if (e != NULL)
+ {
+ print_debug ("SCGI parser failed: %s", e->message);
+ error_free (e);
+ }
return false;
}
@@ -1042,7 +1056,7 @@ static struct client_impl g_client_scgi =
{
.init = client_scgi_init,
.destroy = client_scgi_destroy,
- .on_data = client_scgi_on_data,
+ .push = client_scgi_push,
};
// --- Basic server stuff ------------------------------------------------------
@@ -1054,41 +1068,45 @@ struct listener
struct client_impl *impl; ///< Client behaviour
};
-static void
-remove_client (struct server_context *ctx, struct client *client)
+static bool
+client_read_loop (EV_P_ struct client *client, ev_io *watcher)
{
- LIST_UNLINK (ctx->clients, client);
- ctx->n_clients--;
+ char buf[8192];
+ while (true)
+ {
+ ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
+ if (n_read < 0)
+ {
+ if (errno == EAGAIN)
+ break;
+ if (errno == EINTR)
+ continue;
- // First uninitialize the higher-level implementation
- client->impl->destroy (client);
+ return false;
+ }
- ev_io_stop (EV_DEFAULT_ &client->read_watcher);
- ev_io_stop (EV_DEFAULT_ &client->write_watcher);
- xclose (client->socket_fd);
- client_free (client);
- free (client);
-}
+ if (!client->impl->push (client, buf, n_read))
+ return false;
-static bool
-on_client_data (EV_P_ ev_io *watcher, const void *buf, ssize_t n_read)
-{
- (void) loop;
+ if (!n_read)
+ {
+ // Don't receive the EOF condition repeatedly
+ ev_io_stop (EV_A_ watcher);
- struct client *client = watcher->data;
- return client->impl->on_data (client, buf, n_read);
+ // We can probably still write, so let's just return
+ return true;
+ }
+ }
+ return true;
}
static void
on_client_ready (EV_P_ ev_io *watcher, int revents)
{
- struct server_context *ctx = ev_userdata (loop);
struct client *client = watcher->data;
- // FIXME: don't close the connection on EOF; we need to be able to keep
- // the connection open and respond in an asynchronous manner
if (revents & EV_READ)
- if (!read_loop (EV_A_ watcher, on_client_data))
+ if (!client_read_loop (EV_A_ client, watcher))
goto error;
if (revents & EV_WRITE)
if (!flush_queue (&client->write_queue, watcher))
@@ -1096,7 +1114,9 @@ on_client_ready (EV_P_ ev_io *watcher, int revents)
return;
error:
- remove_client (ctx, client);
+ // The callback also could have just told us to stop reading,
+ // this is not necessarily an error condition
+ client_remove (client);
}
static void