aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2022-09-17 00:22:53 +0200
committerPřemysl Eric Janouch <p@janouch.name>2022-09-17 00:32:14 +0200
commit840b6467009222f743e3c5ac12c0eaf4090b766f (patch)
tree4dfc329c6816b8eef5eaf3a3b769be252f36b288
parent126105fa4f7fb8ea5d6b589ddf9f0a109366d638 (diff)
downloadxK-840b6467009222f743e3c5ac12c0eaf4090b766f.tar.gz
xK-840b6467009222f743e3c5ac12c0eaf4090b766f.tar.xz
xK-840b6467009222f743e3c5ac12c0eaf4090b766f.zip
xC: reorganize relay code, improve logging
Even with one forward function declaration down, it was possible to move most code to a more convenient location. Most logging has thus been fixed to go to buffers.
-rw-r--r--xC.c452
1 files changed, 228 insertions, 224 deletions
diff --git a/xC.c b/xC.c
index 3f64804..e6ece0d 100644
--- a/xC.c
+++ b/xC.c
@@ -1756,7 +1756,6 @@ client_destroy (struct client *self)
}
static void client_kill (struct client *c);
-static bool client_process_buffer (struct client *c);
// ~~~ Server ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2808,7 +2807,7 @@ serialize_configuration (struct config_item *root, struct str *output)
config_item_write (root, true, output);
}
-// --- Relay plumbing ----------------------------------------------------------
+// --- Relay output ------------------------------------------------------------
static void
client_kill (struct client *c)
@@ -2822,59 +2821,6 @@ client_kill (struct client *c)
client_destroy (c);
}
-static bool
-client_try_read (struct client *c)
-{
- struct str *buf = &c->read_buffer;
- ssize_t n_read;
-
- while ((n_read = read (c->socket_fd, buf->str + buf->len,
- buf->alloc - buf->len - 1 /* null byte */)) > 0)
- {
- buf->len += n_read;
- if (!client_process_buffer (c))
- break;
- str_reserve (buf, 512);
- }
-
- if (n_read < 0)
- {
- if (errno == EAGAIN || errno == EINTR)
- return true;
-
- print_debug ("%s: %s: %s", __func__, "read", strerror (errno));
- }
-
- client_kill (c);
- return false;
-}
-
-static bool
-client_try_write (struct client *c)
-{
- struct str *buf = &c->write_buffer;
- ssize_t n_written;
-
- while (buf->len)
- {
- n_written = write (c->socket_fd, buf->str, buf->len);
- if (n_written >= 0)
- {
- str_remove_slice (buf, 0, n_written);
- continue;
- }
- if (errno == EAGAIN || errno == EINTR)
- return true;
-
- print_debug ("%s: %s: %s", __func__, "write", strerror (errno));
- client_kill (c);
- return false;
- }
- return true;
-}
-
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-
static void
client_update_poller (struct client *c, const struct pollfd *pfd)
{
@@ -2887,177 +2833,8 @@ client_update_poller (struct client *c, const struct pollfd *pfd)
poller_fd_set (&c->socket_event, new_events);
}
-static void
-on_client_ready (const struct pollfd *pfd, void *user_data)
-{
- struct client *c = user_data;
- if (client_try_read (c) && client_try_write (c))
- client_update_poller (c, pfd);
-}
-
-static bool
-relay_try_fetch_client (struct app_context *ctx, int listen_fd)
-{
- // XXX: `struct sockaddr_storage' is not the most portable thing
- struct sockaddr_storage peer;
- socklen_t peer_len = sizeof peer;
-
- int fd = accept (listen_fd, (struct sockaddr *) &peer, &peer_len);
- if (fd == -1)
- {
- if (errno == EAGAIN || errno == EWOULDBLOCK)
- return false;
- if (errno == EINTR)
- return true;
-
- // TODO: Try to make sure these find their way to the global buffer.
- if (accept_error_is_transient (errno))
- {
- print_warning ("%s: %s", "accept", strerror (errno));
- return true;
- }
-
- print_error ("%s: %s", "accept", strerror (errno));
- app_context_relay_stop (ctx);
- return false;
- }
-
- hard_assert (peer_len <= sizeof peer);
- set_blocking (fd, false);
- set_cloexec (fd);
-
- // We already buffer our output, so reduce latencies.
- int yes = 1;
- soft_assert (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
- &yes, sizeof yes) != -1);
-
- struct client *c = client_new ();
- c->ctx = ctx;
- c->socket_fd = fd;
- LIST_PREPEND (ctx->clients, c);
-
- c->socket_event = poller_fd_make (&c->ctx->poller, c->socket_fd);
- c->socket_event.dispatcher = (poller_fd_fn) on_client_ready;
- c->socket_event.user_data = c;
-
- client_update_poller (c, NULL);
- return true;
-}
-
-static void
-on_relay_client_available (const struct pollfd *pfd, void *user_data)
-{
- struct app_context *ctx = user_data;
- while (relay_try_fetch_client (ctx, pfd->fd))
- ;
-}
-
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-static int
-relay_listen (struct addrinfo *ai, struct error **e)
-{
- int fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol);
- if (fd == -1)
- {
- error_set (e, "socket: %s", strerror (errno));
- return -1;
- }
-
- set_cloexec (fd);
-
- int yes = 1;
- soft_assert (setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE,
- &yes, sizeof yes) != -1);
- soft_assert (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR,
- &yes, sizeof yes) != -1);
-
- if (bind (fd, ai->ai_addr, ai->ai_addrlen))
- error_set (e, "bind: %s", strerror (errno));
- else if (listen (fd, 16 /* arbitrary number */))
- error_set (e, "listen: %s", strerror (errno));
- else
- return fd;
-
- xclose (fd);
- return -1;
-}
-
-static int
-relay_listen_with_context (struct addrinfo *ai, struct error **e)
-{
- char *address = gai_reconstruct_address (ai);
- print_debug ("binding to `%s'", address);
-
- struct error *error = NULL;
- int fd = relay_listen (ai, &error);
- if (fd == -1)
- {
- error_set (e, "binding to `%s' failed: %s", address, error->message);
- error_free (error);
- }
- free (address);
- return fd;
-}
-
-static bool
-relay_start (struct app_context *ctx, char *address, struct error **e)
-{
- const char *port = NULL, *host = tokenize_host_port (address, &port);
- if (!port || !*port)
- return error_set (e, "missing port");
-
- struct addrinfo hints = {}, *result = NULL;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
-
- int err = getaddrinfo (*host ? host : NULL, port, &hints, &result);
- if (err)
- {
- return error_set (e, "failed to resolve `%s', port `%s': %s: %s",
- host, port, "getaddrinfo", gai_strerror (err));
- }
-
- // Just try the first one, disregarding IPv4/IPv6 ordering.
- int fd = relay_listen_with_context (result, e);
- freeaddrinfo (result);
- if (fd == -1)
- return false;
-
- set_blocking (fd, false);
-
- struct poller_fd *event = &ctx->relay_event;
- *event = poller_fd_make (&ctx->poller, fd);
- event->dispatcher = (poller_fd_fn) on_relay_client_available;
- event->user_data = ctx;
-
- ctx->relay_fd = fd;
- poller_fd_set (event, POLLIN);
- return true;
-}
-
-static void
-on_config_relay_bind_change (struct config_item *item)
-{
- struct app_context *ctx = item->user_data;
- char *value = item->value.string.str;
- app_context_relay_stop (ctx);
- if (!value)
- return;
-
- struct error *e = NULL;
- char *address = xstrdup (value);
- if (!relay_start (ctx, address, &e))
- {
- // TODO: Try to make sure this finds its way to the global buffer.
- print_error ("%s: %s", item->schema->name, e->message);
- error_free (e);
- }
- free (address);
-}
-
-// --- Relay output ------------------------------------------------------------
-
static void
relay_send (struct client *c)
{
@@ -15637,6 +15414,233 @@ client_process_buffer (struct client *c)
return true;
}
+// --- Relay plumbing ----------------------------------------------------------
+
+static bool
+client_try_read (struct client *c)
+{
+ struct str *buf = &c->read_buffer;
+ ssize_t n_read;
+
+ while ((n_read = read (c->socket_fd, buf->str + buf->len,
+ buf->alloc - buf->len - 1 /* null byte */)) > 0)
+ {
+ buf->len += n_read;
+ if (!client_process_buffer (c))
+ break;
+ str_reserve (buf, 512);
+ }
+
+ if (n_read < 0)
+ {
+ if (errno == EAGAIN || errno == EINTR)
+ return true;
+
+ log_global_debug (c->ctx,
+ "#s: #s: #l", __func__, "read", strerror (errno));
+ }
+
+ client_kill (c);
+ return false;
+}
+
+static bool
+client_try_write (struct client *c)
+{
+ struct str *buf = &c->write_buffer;
+ ssize_t n_written;
+
+ while (buf->len)
+ {
+ n_written = write (c->socket_fd, buf->str, buf->len);
+ if (n_written >= 0)
+ {
+ str_remove_slice (buf, 0, n_written);
+ continue;
+ }
+ if (errno == EAGAIN || errno == EINTR)
+ return true;
+
+ log_global_debug (c->ctx,
+ "#s: #s: #l", __func__, "write", strerror (errno));
+ client_kill (c);
+ return false;
+ }
+ return true;
+}
+
+static void
+on_client_ready (const struct pollfd *pfd, void *user_data)
+{
+ struct client *c = user_data;
+ if (client_try_read (c) && client_try_write (c))
+ client_update_poller (c, pfd);
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+static bool
+relay_try_fetch_client (struct app_context *ctx, int listen_fd)
+{
+ // XXX: `struct sockaddr_storage' is not the most portable thing
+ struct sockaddr_storage peer;
+ socklen_t peer_len = sizeof peer;
+
+ int fd = accept (listen_fd, (struct sockaddr *) &peer, &peer_len);
+ if (fd == -1)
+ {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return false;
+ if (errno == EINTR)
+ return true;
+
+ if (accept_error_is_transient (errno))
+ {
+ log_global_debug (ctx, "#s: #l", "accept", strerror (errno));
+ return true;
+ }
+
+ log_global_error (ctx, "#s: #l", "accept", strerror (errno));
+ app_context_relay_stop (ctx);
+ return false;
+ }
+
+ hard_assert (peer_len <= sizeof peer);
+ set_blocking (fd, false);
+ set_cloexec (fd);
+
+ // We already buffer our output, so reduce latencies.
+ int yes = 1;
+ soft_assert (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
+ &yes, sizeof yes) != -1);
+
+ struct client *c = client_new ();
+ c->ctx = ctx;
+ c->socket_fd = fd;
+ LIST_PREPEND (ctx->clients, c);
+
+ c->socket_event = poller_fd_make (&c->ctx->poller, c->socket_fd);
+ c->socket_event.dispatcher = (poller_fd_fn) on_client_ready;
+ c->socket_event.user_data = c;
+
+ client_update_poller (c, NULL);
+ return true;
+}
+
+static void
+on_relay_client_available (const struct pollfd *pfd, void *user_data)
+{
+ struct app_context *ctx = user_data;
+ while (relay_try_fetch_client (ctx, pfd->fd))
+ ;
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+static int
+relay_listen (struct addrinfo *ai, struct error **e)
+{
+ int fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+ if (fd == -1)
+ {
+ error_set (e, "socket: %s", strerror (errno));
+ return -1;
+ }
+
+ set_cloexec (fd);
+
+ int yes = 1;
+ soft_assert (setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE,
+ &yes, sizeof yes) != -1);
+ soft_assert (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR,
+ &yes, sizeof yes) != -1);
+
+ if (bind (fd, ai->ai_addr, ai->ai_addrlen))
+ error_set (e, "bind: %s", strerror (errno));
+ else if (listen (fd, 16 /* arbitrary number */))
+ error_set (e, "listen: %s", strerror (errno));
+ else
+ return fd;
+
+ xclose (fd);
+ return -1;
+}
+
+static int
+relay_listen_with_context (struct app_context *ctx, struct addrinfo *ai,
+ struct error **e)
+{
+ char *address = gai_reconstruct_address (ai);
+ log_global_debug (ctx, "binding to `#l'", address);
+
+ struct error *error = NULL;
+ int fd = relay_listen (ai, &error);
+ if (fd == -1)
+ {
+ error_set (e, "binding to `%s' failed: %s", address, error->message);
+ error_free (error);
+ }
+ free (address);
+ return fd;
+}
+
+static bool
+relay_start (struct app_context *ctx, char *address, struct error **e)
+{
+ const char *port = NULL, *host = tokenize_host_port (address, &port);
+ if (!port || !*port)
+ return error_set (e, "missing port");
+
+ struct addrinfo hints = {}, *result = NULL;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+
+ int err = getaddrinfo (*host ? host : NULL, port, &hints, &result);
+ if (err)
+ {
+ return error_set (e, "failed to resolve `%s', port `%s': %s: %s",
+ host, port, "getaddrinfo", gai_strerror (err));
+ }
+
+ // Just try the first one, disregarding IPv4/IPv6 ordering.
+ int fd = relay_listen_with_context (ctx, result, e);
+ freeaddrinfo (result);
+ if (fd == -1)
+ return false;
+
+ set_blocking (fd, false);
+
+ struct poller_fd *event = &ctx->relay_event;
+ *event = poller_fd_make (&ctx->poller, fd);
+ event->dispatcher = (poller_fd_fn) on_relay_client_available;
+ event->user_data = ctx;
+
+ ctx->relay_fd = fd;
+ poller_fd_set (event, POLLIN);
+ return true;
+}
+
+static void
+on_config_relay_bind_change (struct config_item *item)
+{
+ struct app_context *ctx = item->user_data;
+ char *value = item->value.string.str;
+ app_context_relay_stop (ctx);
+ if (!value)
+ return;
+
+ // XXX: This should perhaps be reencoded as the locale encoding.
+ char *address = xstrdup (value);
+
+ struct error *e = NULL;
+ if (!relay_start (ctx, address, &e))
+ {
+ log_global_error (ctx, "#s: #l", item->schema->name, e->message);
+ error_free (e);
+ }
+ free (address);
+}
+
// --- Tests -------------------------------------------------------------------
// The application is quite monolithic and can only be partially unit-tested.