diff options
author | Přemysl Eric Janouch <p@janouch.name> | 2025-05-09 22:34:25 +0200 |
---|---|---|
committer | Přemysl Eric Janouch <p@janouch.name> | 2025-05-09 23:12:41 +0200 |
commit | d647405a20932a2ee131b4287c8b99de38b95a8d (patch) | |
tree | 88f2753edf05c37405f54201f51c30b10a78b669 | |
parent | d572cfeb40b996a898f6d67d273fac2fab37c86e (diff) | |
download | xK-d647405a20932a2ee131b4287c8b99de38b95a8d.tar.gz xK-d647405a20932a2ee131b4287c8b99de38b95a8d.tar.xz xK-d647405a20932a2ee131b4287c8b99de38b95a8d.zip |
WIP: Make the relay acknowledge received commands
xP now slightly throttles activity notifications,
and indicates when there are unacknowledged commands.
Relay events have been reordered to improve forward compatibility.
As can be observed, even the smallest protocol change has
great consequences.
WIP:
- xC: fix connection killing
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | NEWS | 12 | ||||
m--------- | liberty | 0 | ||||
-rw-r--r-- | xA/xA.go | 9 | ||||
-rw-r--r-- | xC.c | 39 | ||||
-rw-r--r-- | xC.lxdr | 53 | ||||
-rw-r--r-- | xM/main.swift | 7 | ||||
-rw-r--r-- | xP/public/xP.js | 46 | ||||
-rw-r--r-- | xT/xT.cpp | 10 | ||||
-rw-r--r-- | xW/xW.cpp | 10 |
10 files changed, 137 insertions, 51 deletions
@@ -1,4 +1,4 @@ -Copyright (c) 2014 - 2024, Přemysl Eric Janouch <p@janouch.name> +Copyright (c) 2014 - 2025, Přemysl Eric Janouch <p@janouch.name> Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted. @@ -1,3 +1,15 @@ +Unreleased + + * xC: added more characters as nickname delimiters, + so that @nick works as a highlight + + * xC: prevented crashes in relay code + + * xP: added a network lag indicator to the user interface + + * Bumped relay protocol version + + 2.1.0 (2024-12-19) "Bunnyrific" * xC: fixed a crash when the channel topic had too many formatting items diff --git a/liberty b/liberty -Subproject af889b733e81fa40d7a7ff652386585115e186f +Subproject 31ae40085206dc365a15fd6e9d13978e392f8b3 @@ -337,9 +337,14 @@ func relaySend(data RelayCommandData, callback callback) bool { CommandSeq: commandSeq, Data: data, } - if callback != nil { - commandCallbacks[m.CommandSeq] = callback + if callback == nil { + callback = func(err string, response *RelayResponseData) { + if response == nil { + showErrorMessage(err) + } + } } + commandCallbacks[m.CommandSeq] = callback commandSeq++ // TODO(p): Handle errors better. @@ -1818,6 +1818,7 @@ struct client uint32_t event_seq; ///< Outgoing message counter bool initialized; ///< Initial sync took place + bool shutdown; ///< Shutting down struct poller_fd socket_event; ///< The socket can be read/written to }; @@ -4168,9 +4169,7 @@ relay_send (struct client *c) { struct relay_event_message *m = &c->ctx->relay_message; m->event_seq = c->event_seq++; - - // TODO: Also don't try sending anything if half-closed. - if (!c->initialized || c->socket_fd == -1) + if (!c->initialized || c->shutdown || c->socket_fd == -1) return; // liberty has msg_{reader,writer} already, but they use 8-byte lengths. @@ -4180,7 +4179,10 @@ relay_send (struct client *c) || (frame_len = c->write_buffer.len - frame_len_pos - 4) > UINT32_MAX) { print_error ("serialization failed, killing client"); - client_kill (c); + // FIXME: This must not be done immediately! + //client_kill (c); + // TODO: Perhaps set an idle task that collects shutdown clients. + c->shutdown = true; return; } @@ -15716,28 +15718,31 @@ client_process_message (struct client *c, return true; } + bool acknowledge = true; switch (m->data.command) { case RELAY_COMMAND_HELLO: + c->initialized = true; if (m->data.hello.version != RELAY_VERSION) { - // TODO: This should send back an error message and shut down. log_global_error (c->ctx, "Protocol version mismatch, killing client"); - return false; + relay_prepare_error (c->ctx, + m->command_seq, "Protocol version mismatch"); + relay_send (c); + + c->shutdown = true; + return true; } - c->initialized = true; client_resync (c); break; case RELAY_COMMAND_PING: - relay_prepare_response (c->ctx, m->command_seq) - ->data.command = RELAY_COMMAND_PING; - relay_send (c); break; case RELAY_COMMAND_ACTIVE: reset_autoaway (c->ctx); break; case RELAY_COMMAND_BUFFER_COMPLETE: + acknowledge = false; client_process_buffer_complete (c, m->command_seq, buffer, &m->data.buffer_complete); break; @@ -15751,13 +15756,21 @@ client_process_message (struct client *c, buffer_toggle_unimportant (c->ctx, buffer); break; case RELAY_COMMAND_BUFFER_LOG: + acknowledge = false; client_process_buffer_log (c, m->command_seq, buffer); break; default: + acknowledge = false; log_global_debug (c->ctx, "Unhandled client command"); relay_prepare_error (c->ctx, m->command_seq, "Unknown command"); relay_send (c); } + if (acknowledge) + { + relay_prepare_response (c->ctx, m->command_seq) + ->data.command = m->data.command; + relay_send (c); + } return true; } @@ -15851,7 +15864,13 @@ on_client_ready (const struct pollfd *pfd, void *user_data) { struct client *c = user_data; if (client_try_read (c) && client_try_write (c)) + { client_update_poller (c, pfd); + + // There must be something in the write buffer if you request shutdown. + if (c->shutdown && !c->write_buffer.len) + client_kill (c); + } } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1,7 +1,8 @@ // Backwards-compatible protocol version. -const VERSION = 1; +const VERSION = 2; // From the frontend to the relay. +// All commands receive either an Event.RESPONSE, or an Event.ERROR. struct CommandMessage { // The command sequence number will be repeated in responses // in the respective fields. @@ -32,13 +33,10 @@ struct CommandMessage { // XXX: Perhaps this should rather be handled through a /buffer command. case BUFFER_TOGGLE_UNIMPORTANT: string buffer_name; - case PING_RESPONSE: - u32 event_seq; - - // Only these commands may produce Event.RESPONSE, as below, - // but any command may produce an error. case PING: void; + case PING_RESPONSE: + u32 event_seq; case BUFFER_COMPLETE: string buffer_name; string text; @@ -52,6 +50,9 @@ struct CommandMessage { struct EventMessage { u32 event_seq; union EventData switch (enum Event { + ERROR, + RESPONSE, + PING, BUFFER_LINE, BUFFER_UPDATE, @@ -64,12 +65,28 @@ struct EventMessage { SERVER_UPDATE, SERVER_RENAME, SERVER_REMOVE, - ERROR, - RESPONSE, } event) { + // Restriction: command_seq strictly follows the sequence received + // by the relay, across both of these replies. + case ERROR: + u32 command_seq; + string error; + case RESPONSE: + u32 command_seq; + union ResponseData switch (Command command) { + case BUFFER_COMPLETE: + u32 start; + string completions<>; + case BUFFER_LOG: + // UTF-8, but not guaranteed. + u8 log<>; + default: + // Reception acknowledged. + void; + } data; + case PING: void; - case BUFFER_LINE: string buffer_name; // Whether the line should also be displayed in the active buffer. @@ -188,23 +205,5 @@ struct EventMessage { string new; case SERVER_REMOVE: string server_name; - - // Restriction: command_seq strictly follows the sequence received - // by the relay, across both of these replies. - case ERROR: - u32 command_seq; - string error; - case RESPONSE: - u32 command_seq; - union ResponseData switch (Command command) { - case PING: - void; - case BUFFER_COMPLETE: - u32 start; - string completions<>; - case BUFFER_LOG: - // UTF-8, but not guaranteed. - u8 log<>; - } data; } data; }; diff --git a/xM/main.swift b/xM/main.swift index 48f26c4..39b66c5 100644 --- a/xM/main.swift +++ b/xM/main.swift @@ -173,8 +173,11 @@ class RelayRPC { func send(data: RelayCommandData, callback: Callback? = nil) { self.commandSeq += 1 let m = RelayCommandMessage(commandSeq: self.commandSeq, data: data) - if let callback = callback { - self.commandCallbacks[m.commandSeq] = callback + self.commandCallbacks[m.commandSeq] = callback ?? { error, data in + if data == nil { + NSSound.beep() + Logger().warning("\(error)") + } } var w = RelayWriter() diff --git a/xP/public/xP.js b/xP/public/xP.js index 6035db3..33d7d2a 100644 --- a/xP/public/xP.js +++ b/xP/public/xP.js @@ -1,4 +1,4 @@ -// Copyright (c) 2022 - 2024, Přemysl Eric Janouch <p@janouch.name> +// Copyright (c) 2022 - 2025, Přemysl Eric Janouch <p@janouch.name> // SPDX-License-Identifier: 0BSD import * as Relay from './proto.js' @@ -67,18 +67,19 @@ class RelayRPC extends EventTarget { _processOne(message) { let e = message.data + let p switch (e.event) { case Relay.Event.Error: - if (this.promised[e.commandSeq] !== undefined) - this.promised[e.commandSeq].reject(e.error) - else + if ((p = this.promised[e.commandSeq]) === undefined) console.error(`Unawaited error: ${e.error}`) + else if (p !== true) + p.reject(e.error) break case Relay.Event.Response: - if (this.promised[e.commandSeq] !== undefined) - this.promised[e.commandSeq].resolve(e.data) - else + if ((p = this.promised[e.commandSeq]) === undefined) console.error("Unawaited response") + else if (p !== true) + p.resolve(e.data) break default: e.eventSeq = message.eventSeq @@ -95,6 +96,13 @@ class RelayRPC extends EventTarget { this.promised[seq].reject("No response") delete this.promised[seq] } + m.redraw() + } + + get busy() { + for (const seq in this.promised) + return true + return false } send(params) { @@ -110,6 +118,9 @@ class RelayRPC extends EventTarget { this.ws.send(JSON.stringify({commandSeq: seq, data: params})) + this.promised[seq] = true + m.redraw() + // Automagically detect if we want a result. let data = undefined const promise = new Promise( @@ -191,6 +202,17 @@ let bufferAutoscroll = true let servers = new Map() +let lastActive = undefined + +function notifyActive() { + // Reduce unnecessary traffic. + const now = Date.now() + if (lastActive === undefined || (now - lastActive >= 5000)) { + lastActive = now + rpc.send({command: 'Active'}) + } +} + function bufferResetStats(b) { b.newMessages = 0 b.newUnimportantMessages = 0 @@ -998,7 +1020,7 @@ let Input = { onKeyDown: event => { // TODO: And perhaps on other actions, too. - rpc.send({command: 'Active'}) + notifyActive() let b = buffers.get(bufferCurrent) if (b === undefined || event.isComposing) @@ -1103,7 +1125,13 @@ let Main = { return m('.xP', {}, [ overlay, - m('.title', {}, [m('b', {}, `xP`), m(Topic)]), + m('.title', {}, [ + m('span', [ + rpc.busy ? '⋯ ' : undefined, + m('b', {}, `xP`), + ]), + m(Topic), + ]), m('.middle', {}, [m(BufferList), m(BufferContainer)]), m(Status), m('.input', {}, [m(Prompt), m(Input)]), @@ -180,6 +180,14 @@ beep() // --- Networking -------------------------------------------------------------- static void +on_relay_generic_response( + std::wstring error, const Relay::ResponseData *response) +{ + if (!response) + show_error_message(QString::fromStdWString(error)); +} + +static void relay_send(Relay::CommandData *data, Callback callback = {}) { Relay::CommandMessage m = {}; @@ -190,6 +198,8 @@ relay_send(Relay::CommandData *data, Callback callback = {}) if (callback) g.command_callbacks[m.command_seq] = std::move(callback); + else + g.command_callbacks[m.command_seq] = on_relay_generic_response; auto len = qToBigEndian<uint32_t>(w.data.size()); auto prefix = reinterpret_cast<const char *>(&len); @@ -222,6 +222,14 @@ relay_try_write(std::wstring &error) } static void +on_relay_generic_response( + std::wstring error, const Relay::ResponseData *response) +{ + if (!response) + show_error_message(error.c_str()); +} + +static void relay_send(Relay::CommandData *data, Callback callback = {}) { Relay::CommandMessage m = {}; @@ -232,6 +240,8 @@ relay_send(Relay::CommandData *data, Callback callback = {}) if (callback) g.command_callbacks[m.command_seq] = std::move(callback); + else + g.command_callbacks[m.command_seq] = on_relay_generic_response; uint32_t len = htonl(w.data.size()); uint8_t *prefix = reinterpret_cast<uint8_t *>(&len); |