From 840b6467009222f743e3c5ac12c0eaf4090b766f Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch
Date: Sat, 17 Sep 2022 00:22:53 +0200 Subject: xC: reorganize relay code, improve logging Even with one forward function declaration down, it was possible to move most code to a more convenient location. Most logging has thus been fixed to go to buffers. --- xC.c | 452 ++++++++++++++++++++++++++++++++++--------------------------------- 1 file changed, 228 insertions(+), 224 deletions(-) diff --git a/xC.c b/xC.c index 3f64804..e6ece0d 100644 --- a/xC.c +++ b/xC.c @@ -1756,7 +1756,6 @@ client_destroy (struct client *self) } static void client_kill (struct client *c); -static bool client_process_buffer (struct client *c); // ~~~ Server ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -2808,7 +2807,7 @@ serialize_configuration (struct config_item *root, struct str *output) config_item_write (root, true, output); } -// --- Relay plumbing ---------------------------------------------------------- +// --- Relay output ------------------------------------------------------------ static void client_kill (struct client *c) @@ -2822,59 +2821,6 @@ client_kill (struct client *c) client_destroy (c); } -static bool -client_try_read (struct client *c) -{ - struct str *buf = &c->read_buffer; - ssize_t n_read; - - while ((n_read = read (c->socket_fd, buf->str + buf->len, - buf->alloc - buf->len - 1 /* null byte */)) > 0) - { - buf->len += n_read; - if (!client_process_buffer (c)) - break; - str_reserve (buf, 512); - } - - if (n_read < 0) - { - if (errno == EAGAIN || errno == EINTR) - return true; - - print_debug ("%s: %s: %s", __func__, "read", strerror (errno)); - } - - client_kill (c); - return false; -} - -static bool -client_try_write (struct client *c) -{ - struct str *buf = &c->write_buffer; - ssize_t n_written; - - while (buf->len) - { - n_written = write (c->socket_fd, buf->str, buf->len); - if (n_written >= 0) - { - str_remove_slice (buf, 0, n_written); - continue; - } - if (errno == EAGAIN || errno == EINTR) - return true; - - print_debug ("%s: %s: %s", __func__, "write", strerror (errno)); - client_kill (c); - return false; - } - return true; -} - -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - static void client_update_poller (struct client *c, const struct pollfd *pfd) { @@ -2887,177 +2833,8 @@ client_update_poller (struct client *c, const struct pollfd *pfd) poller_fd_set (&c->socket_event, new_events); } -static void -on_client_ready (const struct pollfd *pfd, void *user_data) -{ - struct client *c = user_data; - if (client_try_read (c) && client_try_write (c)) - client_update_poller (c, pfd); -} - -static bool -relay_try_fetch_client (struct app_context *ctx, int listen_fd) -{ - // XXX: `struct sockaddr_storage' is not the most portable thing - struct sockaddr_storage peer; - socklen_t peer_len = sizeof peer; - - int fd = accept (listen_fd, (struct sockaddr *) &peer, &peer_len); - if (fd == -1) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) - return false; - if (errno == EINTR) - return true; - - // TODO: Try to make sure these find their way to the global buffer. - if (accept_error_is_transient (errno)) - { - print_warning ("%s: %s", "accept", strerror (errno)); - return true; - } - - print_error ("%s: %s", "accept", strerror (errno)); - app_context_relay_stop (ctx); - return false; - } - - hard_assert (peer_len <= sizeof peer); - set_blocking (fd, false); - set_cloexec (fd); - - // We already buffer our output, so reduce latencies. - int yes = 1; - soft_assert (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, - &yes, sizeof yes) != -1); - - struct client *c = client_new (); - c->ctx = ctx; - c->socket_fd = fd; - LIST_PREPEND (ctx->clients, c); - - c->socket_event = poller_fd_make (&c->ctx->poller, c->socket_fd); - c->socket_event.dispatcher = (poller_fd_fn) on_client_ready; - c->socket_event.user_data = c; - - client_update_poller (c, NULL); - return true; -} - -static void -on_relay_client_available (const struct pollfd *pfd, void *user_data) -{ - struct app_context *ctx = user_data; - while (relay_try_fetch_client (ctx, pfd->fd)) - ; -} - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -static int -relay_listen (struct addrinfo *ai, struct error **e) -{ - int fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol); - if (fd == -1) - { - error_set (e, "socket: %s", strerror (errno)); - return -1; - } - - set_cloexec (fd); - - int yes = 1; - soft_assert (setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, - &yes, sizeof yes) != -1); - soft_assert (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, - &yes, sizeof yes) != -1); - - if (bind (fd, ai->ai_addr, ai->ai_addrlen)) - error_set (e, "bind: %s", strerror (errno)); - else if (listen (fd, 16 /* arbitrary number */)) - error_set (e, "listen: %s", strerror (errno)); - else - return fd; - - xclose (fd); - return -1; -} - -static int -relay_listen_with_context (struct addrinfo *ai, struct error **e) -{ - char *address = gai_reconstruct_address (ai); - print_debug ("binding to `%s'", address); - - struct error *error = NULL; - int fd = relay_listen (ai, &error); - if (fd == -1) - { - error_set (e, "binding to `%s' failed: %s", address, error->message); - error_free (error); - } - free (address); - return fd; -} - -static bool -relay_start (struct app_context *ctx, char *address, struct error **e) -{ - const char *port = NULL, *host = tokenize_host_port (address, &port); - if (!port || !*port) - return error_set (e, "missing port"); - - struct addrinfo hints = {}, *result = NULL; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - - int err = getaddrinfo (*host ? host : NULL, port, &hints, &result); - if (err) - { - return error_set (e, "failed to resolve `%s', port `%s': %s: %s", - host, port, "getaddrinfo", gai_strerror (err)); - } - - // Just try the first one, disregarding IPv4/IPv6 ordering. - int fd = relay_listen_with_context (result, e); - freeaddrinfo (result); - if (fd == -1) - return false; - - set_blocking (fd, false); - - struct poller_fd *event = &ctx->relay_event; - *event = poller_fd_make (&ctx->poller, fd); - event->dispatcher = (poller_fd_fn) on_relay_client_available; - event->user_data = ctx; - - ctx->relay_fd = fd; - poller_fd_set (event, POLLIN); - return true; -} - -static void -on_config_relay_bind_change (struct config_item *item) -{ - struct app_context *ctx = item->user_data; - char *value = item->value.string.str; - app_context_relay_stop (ctx); - if (!value) - return; - - struct error *e = NULL; - char *address = xstrdup (value); - if (!relay_start (ctx, address, &e)) - { - // TODO: Try to make sure this finds its way to the global buffer. - print_error ("%s: %s", item->schema->name, e->message); - error_free (e); - } - free (address); -} - -// --- Relay output ------------------------------------------------------------ - static void relay_send (struct client *c) { @@ -15637,6 +15414,233 @@ client_process_buffer (struct client *c) return true; } +// --- Relay plumbing ---------------------------------------------------------- + +static bool +client_try_read (struct client *c) +{ + struct str *buf = &c->read_buffer; + ssize_t n_read; + + while ((n_read = read (c->socket_fd, buf->str + buf->len, + buf->alloc - buf->len - 1 /* null byte */)) > 0) + { + buf->len += n_read; + if (!client_process_buffer (c)) + break; + str_reserve (buf, 512); + } + + if (n_read < 0) + { + if (errno == EAGAIN || errno == EINTR) + return true; + + log_global_debug (c->ctx, + "#s: #s: #l", __func__, "read", strerror (errno)); + } + + client_kill (c); + return false; +} + +static bool +client_try_write (struct client *c) +{ + struct str *buf = &c->write_buffer; + ssize_t n_written; + + while (buf->len) + { + n_written = write (c->socket_fd, buf->str, buf->len); + if (n_written >= 0) + { + str_remove_slice (buf, 0, n_written); + continue; + } + if (errno == EAGAIN || errno == EINTR) + return true; + + log_global_debug (c->ctx, + "#s: #s: #l", __func__, "write", strerror (errno)); + client_kill (c); + return false; + } + return true; +} + +static void +on_client_ready (const struct pollfd *pfd, void *user_data) +{ + struct client *c = user_data; + if (client_try_read (c) && client_try_write (c)) + client_update_poller (c, pfd); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static bool +relay_try_fetch_client (struct app_context *ctx, int listen_fd) +{ + // XXX: `struct sockaddr_storage' is not the most portable thing + struct sockaddr_storage peer; + socklen_t peer_len = sizeof peer; + + int fd = accept (listen_fd, (struct sockaddr *) &peer, &peer_len); + if (fd == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return false; + if (errno == EINTR) + return true; + + if (accept_error_is_transient (errno)) + { + log_global_debug (ctx, "#s: #l", "accept", strerror (errno)); + return true; + } + + log_global_error (ctx, "#s: #l", "accept", strerror (errno)); + app_context_relay_stop (ctx); + return false; + } + + hard_assert (peer_len <= sizeof peer); + set_blocking (fd, false); + set_cloexec (fd); + + // We already buffer our output, so reduce latencies. + int yes = 1; + soft_assert (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, + &yes, sizeof yes) != -1); + + struct client *c = client_new (); + c->ctx = ctx; + c->socket_fd = fd; + LIST_PREPEND (ctx->clients, c); + + c->socket_event = poller_fd_make (&c->ctx->poller, c->socket_fd); + c->socket_event.dispatcher = (poller_fd_fn) on_client_ready; + c->socket_event.user_data = c; + + client_update_poller (c, NULL); + return true; +} + +static void +on_relay_client_available (const struct pollfd *pfd, void *user_data) +{ + struct app_context *ctx = user_data; + while (relay_try_fetch_client (ctx, pfd->fd)) + ; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static int +relay_listen (struct addrinfo *ai, struct error **e) +{ + int fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (fd == -1) + { + error_set (e, "socket: %s", strerror (errno)); + return -1; + } + + set_cloexec (fd); + + int yes = 1; + soft_assert (setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, + &yes, sizeof yes) != -1); + soft_assert (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, + &yes, sizeof yes) != -1); + + if (bind (fd, ai->ai_addr, ai->ai_addrlen)) + error_set (e, "bind: %s", strerror (errno)); + else if (listen (fd, 16 /* arbitrary number */)) + error_set (e, "listen: %s", strerror (errno)); + else + return fd; + + xclose (fd); + return -1; +} + +static int +relay_listen_with_context (struct app_context *ctx, struct addrinfo *ai, + struct error **e) +{ + char *address = gai_reconstruct_address (ai); + log_global_debug (ctx, "binding to `#l'", address); + + struct error *error = NULL; + int fd = relay_listen (ai, &error); + if (fd == -1) + { + error_set (e, "binding to `%s' failed: %s", address, error->message); + error_free (error); + } + free (address); + return fd; +} + +static bool +relay_start (struct app_context *ctx, char *address, struct error **e) +{ + const char *port = NULL, *host = tokenize_host_port (address, &port); + if (!port || !*port) + return error_set (e, "missing port"); + + struct addrinfo hints = {}, *result = NULL; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + + int err = getaddrinfo (*host ? host : NULL, port, &hints, &result); + if (err) + { + return error_set (e, "failed to resolve `%s', port `%s': %s: %s", + host, port, "getaddrinfo", gai_strerror (err)); + } + + // Just try the first one, disregarding IPv4/IPv6 ordering. + int fd = relay_listen_with_context (ctx, result, e); + freeaddrinfo (result); + if (fd == -1) + return false; + + set_blocking (fd, false); + + struct poller_fd *event = &ctx->relay_event; + *event = poller_fd_make (&ctx->poller, fd); + event->dispatcher = (poller_fd_fn) on_relay_client_available; + event->user_data = ctx; + + ctx->relay_fd = fd; + poller_fd_set (event, POLLIN); + return true; +} + +static void +on_config_relay_bind_change (struct config_item *item) +{ + struct app_context *ctx = item->user_data; + char *value = item->value.string.str; + app_context_relay_stop (ctx); + if (!value) + return; + + // XXX: This should perhaps be reencoded as the locale encoding. + char *address = xstrdup (value); + + struct error *e = NULL; + if (!relay_start (ctx, address, &e)) + { + log_global_error (ctx, "#s: #l", item->schema->name, e->message); + error_free (e); + } + free (address); +} + // --- Tests ------------------------------------------------------------------- // The application is quite monolithic and can only be partially unit-tested. -- cgit v1.2.3-70-g09d2