diff options
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | NEWS | 12 | ||||
m--------- | liberty | 0 | ||||
-rw-r--r-- | xA/xA.go | 9 | ||||
-rw-r--r-- | xC.c | 75 | ||||
-rw-r--r-- | xC.lxdr | 53 | ||||
-rw-r--r-- | xM/main.swift | 7 | ||||
-rw-r--r-- | xN/xN.go | 10 | ||||
-rw-r--r-- | xP/public/xP.js | 46 | ||||
-rw-r--r-- | xR/.gitignore | 2 | ||||
-rw-r--r-- | xR/Makefile | 17 | ||||
-rw-r--r-- | xR/go.mod | 5 | ||||
-rw-r--r-- | xR/xR.adoc | 41 | ||||
-rw-r--r-- | xR/xR.go | 134 | ||||
-rw-r--r-- | xT/xT.cpp | 10 | ||||
-rw-r--r-- | xW/xW.cpp | 10 |
16 files changed, 361 insertions, 72 deletions
@@ -1,4 +1,4 @@ -Copyright (c) 2014 - 2024, Přemysl Eric Janouch <p@janouch.name> +Copyright (c) 2014 - 2025, Přemysl Eric Janouch <p@janouch.name> Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted. @@ -1,3 +1,15 @@ +Unreleased + + * xC: added more characters as nickname delimiters, + so that @nick works as a highlight + + * xC: prevented rare crashes in relay code + + * xP: added a network lag indicator to the user interface + + * Bumped relay protocol version + + 2.1.0 (2024-12-19) "Bunnyrific" * xC: fixed a crash when the channel topic had too many formatting items diff --git a/liberty b/liberty -Subproject af889b733e81fa40d7a7ff652386585115e186f +Subproject 31ae40085206dc365a15fd6e9d13978e392f8b3 @@ -337,9 +337,14 @@ func relaySend(data RelayCommandData, callback callback) bool { CommandSeq: commandSeq, Data: data, } - if callback != nil { - commandCallbacks[m.CommandSeq] = callback + if callback == nil { + callback = func(err string, response *RelayResponseData) { + if response == nil { + showErrorMessage(err) + } + } } + commandCallbacks[m.CommandSeq] = callback commandSeq++ // TODO(p): Handle errors better. @@ -1818,6 +1818,7 @@ struct client uint32_t event_seq; ///< Outgoing message counter bool initialized; ///< Initial sync took place + bool closing; ///< We're closing the connection struct poller_fd socket_event; ///< The socket can be read/written to }; @@ -1875,7 +1876,7 @@ enum server_state IRC_CONNECTED, ///< Trying to register IRC_REGISTERED, ///< We can chat now IRC_CLOSING, ///< Flushing output before shutdown - IRC_HALF_CLOSED ///< Connection shutdown from our side + IRC_HALF_CLOSED ///< Connection shut down from our side }; /// Convert an IRC identifier character to lower-case @@ -2263,14 +2264,6 @@ struct app_context struct str_map servers; ///< Our servers - // Relay: - - int relay_fd; ///< Listening socket FD - struct client *clients; ///< Our relay clients - - /// A single message buffer to prepare all outcoming messages within - struct relay_event_message relay_message; - // Events: struct poller_fd tty_event; ///< Terminal input event @@ -2322,6 +2315,14 @@ struct app_context char *editor_filename; ///< The file being edited by user int terminal_suspended; ///< Terminal suspension level + // Relay: + + int relay_fd; ///< Listening socket FD + struct client *clients; ///< Our relay clients + + /// A single message buffer to prepare all outcoming messages within + struct relay_event_message relay_message; + // Plugins: struct plugin *plugins; ///< Loaded plugins @@ -2392,8 +2393,6 @@ app_context_init (struct app_context *self) self->config = config_make (); poller_init (&self->poller); - self->relay_fd = -1; - self->servers = str_map_make ((str_map_free_fn) server_unref); self->servers.key_xfrm = tolower_ascii_strxfrm; @@ -2417,6 +2416,8 @@ app_context_init (struct app_context *self) self->nick_palette = filter_color_cube_for_acceptable_nick_colors (&self->nick_palette_len); + + self->relay_fd = -1; } static void @@ -4152,8 +4153,11 @@ client_kill (struct client *c) static void client_update_poller (struct client *c, const struct pollfd *pfd) { + // In case of closing without any data in the write buffer, + // we don't actually need to be able to write to the socket, + // but the condition should be quick to satisfy. int new_events = POLLIN; - if (c->write_buffer.len) + if (c->write_buffer.len || c->closing) new_events |= POLLOUT; hard_assert (new_events != 0); @@ -4168,9 +4172,7 @@ relay_send (struct client *c) { struct relay_event_message *m = &c->ctx->relay_message; m->event_seq = c->event_seq++; - - // TODO: Also don't try sending anything if half-closed. - if (!c->initialized || c->socket_fd == -1) + if (!c->initialized || c->closing || c->socket_fd == -1) return; // liberty has msg_{reader,writer} already, but they use 8-byte lengths. @@ -4180,12 +4182,18 @@ relay_send (struct client *c) || (frame_len = c->write_buffer.len - frame_len_pos - 4) > UINT32_MAX) { print_error ("serialization failed, killing client"); - client_kill (c); - return; + + // We can't kill the client immediately, + // because more relay_send() calls may follow. + c->write_buffer.len = frame_len_pos; + c->closing = true; + } + else + { + uint32_t len = htonl (frame_len); + memcpy (c->write_buffer.str + frame_len_pos, &len, sizeof len); } - uint32_t len = htonl (frame_len); - memcpy (c->write_buffer.str + frame_len_pos, &len, sizeof len); client_update_poller (c, NULL); } @@ -15716,28 +15724,31 @@ client_process_message (struct client *c, return true; } + bool acknowledge = true; switch (m->data.command) { case RELAY_COMMAND_HELLO: + c->initialized = true; if (m->data.hello.version != RELAY_VERSION) { - // TODO: This should send back an error message and shut down. log_global_error (c->ctx, "Protocol version mismatch, killing client"); - return false; + relay_prepare_error (c->ctx, + m->command_seq, "Protocol version mismatch"); + relay_send (c); + + c->closing = true; + return true; } - c->initialized = true; client_resync (c); break; case RELAY_COMMAND_PING: - relay_prepare_response (c->ctx, m->command_seq) - ->data.command = RELAY_COMMAND_PING; - relay_send (c); break; case RELAY_COMMAND_ACTIVE: reset_autoaway (c->ctx); break; case RELAY_COMMAND_BUFFER_COMPLETE: + acknowledge = false; client_process_buffer_complete (c, m->command_seq, buffer, &m->data.buffer_complete); break; @@ -15751,13 +15762,21 @@ client_process_message (struct client *c, buffer_toggle_unimportant (c->ctx, buffer); break; case RELAY_COMMAND_BUFFER_LOG: + acknowledge = false; client_process_buffer_log (c, m->command_seq, buffer); break; default: + acknowledge = false; log_global_debug (c->ctx, "Unhandled client command"); relay_prepare_error (c->ctx, m->command_seq, "Unknown command"); relay_send (c); } + if (acknowledge) + { + relay_prepare_response (c->ctx, m->command_seq) + ->data.command = m->data.command; + relay_send (c); + } return true; } @@ -15779,7 +15798,7 @@ client_process_buffer (struct client *c) break; struct relay_command_message m = {}; - bool ok = client_process_message (c, &r, &m); + bool ok = c->closing || client_process_message (c, &r, &m); relay_command_message_free (&m); if (!ok) return false; @@ -15851,7 +15870,11 @@ on_client_ready (const struct pollfd *pfd, void *user_data) { struct client *c = user_data; if (client_try_read (c) && client_try_write (c)) + { client_update_poller (c, pfd); + if (c->closing && !c->write_buffer.len) + client_kill (c); + } } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1,7 +1,8 @@ // Backwards-compatible protocol version. -const VERSION = 1; +const VERSION = 2; // From the frontend to the relay. +// All commands receive either an Event.RESPONSE, or an Event.ERROR. struct CommandMessage { // The command sequence number will be repeated in responses // in the respective fields. @@ -32,13 +33,10 @@ struct CommandMessage { // XXX: Perhaps this should rather be handled through a /buffer command. case BUFFER_TOGGLE_UNIMPORTANT: string buffer_name; - case PING_RESPONSE: - u32 event_seq; - - // Only these commands may produce Event.RESPONSE, as below, - // but any command may produce an error. case PING: void; + case PING_RESPONSE: + u32 event_seq; case BUFFER_COMPLETE: string buffer_name; string text; @@ -52,6 +50,9 @@ struct CommandMessage { struct EventMessage { u32 event_seq; union EventData switch (enum Event { + ERROR, + RESPONSE, + PING, BUFFER_LINE, BUFFER_UPDATE, @@ -64,12 +65,28 @@ struct EventMessage { SERVER_UPDATE, SERVER_RENAME, SERVER_REMOVE, - ERROR, - RESPONSE, } event) { + // Restriction: command_seq strictly follows the sequence received + // by the relay, across both of these replies. + case ERROR: + u32 command_seq; + string error; + case RESPONSE: + u32 command_seq; + union ResponseData switch (Command command) { + case BUFFER_COMPLETE: + u32 start; + string completions<>; + case BUFFER_LOG: + // UTF-8, but not guaranteed. + u8 log<>; + default: + // Reception acknowledged. + void; + } data; + case PING: void; - case BUFFER_LINE: string buffer_name; // Whether the line should also be displayed in the active buffer. @@ -188,23 +205,5 @@ struct EventMessage { string new; case SERVER_REMOVE: string server_name; - - // Restriction: command_seq strictly follows the sequence received - // by the relay, across both of these replies. - case ERROR: - u32 command_seq; - string error; - case RESPONSE: - u32 command_seq; - union ResponseData switch (Command command) { - case PING: - void; - case BUFFER_COMPLETE: - u32 start; - string completions<>; - case BUFFER_LOG: - // UTF-8, but not guaranteed. - u8 log<>; - } data; } data; }; diff --git a/xM/main.swift b/xM/main.swift index 48f26c4..39b66c5 100644 --- a/xM/main.swift +++ b/xM/main.swift @@ -173,8 +173,11 @@ class RelayRPC { func send(data: RelayCommandData, callback: Callback? = nil) { self.commandSeq += 1 let m = RelayCommandMessage(commandSeq: self.commandSeq, data: data) - if let callback = callback { - self.commandCallbacks[m.commandSeq] = callback + self.commandCallbacks[m.commandSeq] = callback ?? { error, data in + if data == nil { + NSSound.beep() + Logger().warning("\(error)") + } } var w = RelayWriter() @@ -247,16 +247,16 @@ func main() { flag.PrintDefaults() } flag.Parse() - if flag.NArg() < 1 { - flag.Usage() - os.Exit(2) - } - if *version { fmt.Printf("%s %s\n", projectName, projectVersion) return } + if flag.NArg() < 1 { + flag.Usage() + os.Exit(2) + } + text, err := io.ReadAll(os.Stdin) if err != nil { log.Fatalln(err) diff --git a/xP/public/xP.js b/xP/public/xP.js index 6035db3..33d7d2a 100644 --- a/xP/public/xP.js +++ b/xP/public/xP.js @@ -1,4 +1,4 @@ -// Copyright (c) 2022 - 2024, Přemysl Eric Janouch <p@janouch.name> +// Copyright (c) 2022 - 2025, Přemysl Eric Janouch <p@janouch.name> // SPDX-License-Identifier: 0BSD import * as Relay from './proto.js' @@ -67,18 +67,19 @@ class RelayRPC extends EventTarget { _processOne(message) { let e = message.data + let p switch (e.event) { case Relay.Event.Error: - if (this.promised[e.commandSeq] !== undefined) - this.promised[e.commandSeq].reject(e.error) - else + if ((p = this.promised[e.commandSeq]) === undefined) console.error(`Unawaited error: ${e.error}`) + else if (p !== true) + p.reject(e.error) break case Relay.Event.Response: - if (this.promised[e.commandSeq] !== undefined) - this.promised[e.commandSeq].resolve(e.data) - else + if ((p = this.promised[e.commandSeq]) === undefined) console.error("Unawaited response") + else if (p !== true) + p.resolve(e.data) break default: e.eventSeq = message.eventSeq @@ -95,6 +96,13 @@ class RelayRPC extends EventTarget { this.promised[seq].reject("No response") delete this.promised[seq] } + m.redraw() + } + + get busy() { + for (const seq in this.promised) + return true + return false } send(params) { @@ -110,6 +118,9 @@ class RelayRPC extends EventTarget { this.ws.send(JSON.stringify({commandSeq: seq, data: params})) + this.promised[seq] = true + m.redraw() + // Automagically detect if we want a result. let data = undefined const promise = new Promise( @@ -191,6 +202,17 @@ let bufferAutoscroll = true let servers = new Map() +let lastActive = undefined + +function notifyActive() { + // Reduce unnecessary traffic. + const now = Date.now() + if (lastActive === undefined || (now - lastActive >= 5000)) { + lastActive = now + rpc.send({command: 'Active'}) + } +} + function bufferResetStats(b) { b.newMessages = 0 b.newUnimportantMessages = 0 @@ -998,7 +1020,7 @@ let Input = { onKeyDown: event => { // TODO: And perhaps on other actions, too. - rpc.send({command: 'Active'}) + notifyActive() let b = buffers.get(bufferCurrent) if (b === undefined || event.isComposing) @@ -1103,7 +1125,13 @@ let Main = { return m('.xP', {}, [ overlay, - m('.title', {}, [m('b', {}, `xP`), m(Topic)]), + m('.title', {}, [ + m('span', [ + rpc.busy ? '⋯ ' : undefined, + m('b', {}, `xP`), + ]), + m(Topic), + ]), m('.middle', {}, [m(BufferList), m(BufferContainer)]), m(Status), m('.input', {}, [m(Prompt), m(Input)]), diff --git a/xR/.gitignore b/xR/.gitignore new file mode 100644 index 0000000..a9766d8 --- /dev/null +++ b/xR/.gitignore @@ -0,0 +1,2 @@ +/xR +/proto.go diff --git a/xR/Makefile b/xR/Makefile new file mode 100644 index 0000000..7fb55c5 --- /dev/null +++ b/xR/Makefile @@ -0,0 +1,17 @@ +.POSIX: +AWK = env LC_ALL=C awk + +tools = ../liberty/tools +generated = proto.go +outputs = xR $(generated) +all: $(outputs) +generate: $(generated) + +proto.go: $(tools)/lxdrgen.awk $(tools)/lxdrgen-go.awk ../xC.lxdr + $(AWK) -f $(tools)/lxdrgen.awk -f $(tools)/lxdrgen-go.awk \ + -v PrefixCamel=Relay ../xC.lxdr > $@ +xR: xR.go ../xK-version $(generated) + go build -ldflags "-X 'main.projectVersion=$$(cat ../xK-version)'" -o $@ \ + -gcflags=all="-N -l" +clean: + rm -f $(outputs) diff --git a/xR/go.mod b/xR/go.mod new file mode 100644 index 0000000..998a18f --- /dev/null +++ b/xR/go.mod @@ -0,0 +1,5 @@ +module janouch.name/xK/xR + +go 1.23.0 + +toolchain go1.24.0 diff --git a/xR/xR.adoc b/xR/xR.adoc new file mode 100644 index 0000000..c3215bd --- /dev/null +++ b/xR/xR.adoc @@ -0,0 +1,41 @@ +xR(1) +===== +:doctype: manpage +:manmanual: xK Manual +:mansource: xK {release-version} + +Name +---- +xR - xC relay protocol analyzer + +Synopsis +-------- +*xR* [_OPTION_]... RELAY-ADDRESS... + +Description +----------- +*xR* connects to an *xC* relay and prints all incoming events one per line +in JSON format. The JSON objects have two additional fields: + +when:: + The time of reception (or sending) as a nanosecond precision + RFC 3339 UTC timestamp. +raw:: + The incoming event (or outgoing command) in raw binary form. + +Options +------- +*-debug*:: + Print any outgoing commands as well, which may help in debugging any issues. + +*-version*:: + Output version information and exit. + +Reporting bugs +-------------- +Use https://git.janouch.name/p/xK to report bugs, request features, +or submit pull requests. + +See also +-------- +*xC*(1) diff --git a/xR/xR.go b/xR/xR.go new file mode 100644 index 0000000..a26832d --- /dev/null +++ b/xR/xR.go @@ -0,0 +1,134 @@ +// Copyright (c) 2025, Přemysl Eric Janouch <p@janouch.name> +// SPDX-License-Identifier: 0BSD + +package main + +import ( + "encoding/binary" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "log" + "net" + "os" + "time" +) + +var ( + debug = flag.Bool("debug", false, "enable debug output") + version = flag.Bool("version", false, "show version and exit") + projectName = "xR" + projectVersion = "?" +) + +func now() string { + return time.Now().UTC().Format(time.RFC3339Nano) +} + +func relayReadFrame(r io.Reader) bool { + var length uint32 + if err := binary.Read( + r, binary.BigEndian, &length); errors.Is(err, io.EOF) { + return false + } else if err != nil { + log.Fatalln("Event receive failed: " + err.Error()) + } + b := make([]byte, length) + if _, err := io.ReadFull(r, b); errors.Is(err, io.EOF) { + return false + } else if err != nil { + log.Fatalln("Event receive failed: " + err.Error()) + } + + m := struct { + When string `json:"when"` + Binary []byte `json:"raw"` + RelayEventMessage + }{ + When: now(), + Binary: b, + } + + if after, ok := m.RelayEventMessage.ConsumeFrom(b); !ok { + log.Println("Event deserialization failed") + } else if len(after) != 0 { + log.Println("Event deserialization failed: trailing data") + return true + } + + j, err := json.Marshal(m) + if err != nil { + log.Fatalln("Event marshalling failed: " + err.Error()) + } + fmt.Printf("%s\n", j) + return true +} + +func run(addressConnect string) { + conn, err := net.Dial("tcp", addressConnect) + if err != nil { + log.Println("Connection failed: " + err.Error()) + return + } + defer conn.Close() + + // We can only support this one protocol version + // that proto.go has been generated for. + m := RelayCommandMessage{CommandSeq: 0, Data: RelayCommandData{ + Variant: &RelayCommandDataHello{Version: RelayVersion}, + }} + + b, ok := m.AppendTo(make([]byte, 4)) + if !ok { + log.Fatalln("Command serialization failed") + } + binary.BigEndian.PutUint32(b[:4], uint32(len(b)-4)) + if _, err := conn.Write(b); err != nil { + log.Fatalln("Command send failed: " + err.Error()) + } + + // You can differentiate the direction by the presence + // of .data.command or .data.event. + if *debug { + j, err := json.Marshal(struct { + When string `json:"when"` + Binary []byte `json:"raw"` + RelayCommandMessage + }{ + When: now(), + Binary: b, + RelayCommandMessage: m, + }) + if err != nil { + log.Fatalln("Command marshalling failed: " + err.Error()) + } + fmt.Printf("%s\n", j) + } + + for relayReadFrame(conn) { + } +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), + "Usage: %s [OPTION...] CONNECT\n\n", os.Args[0]) + flag.PrintDefaults() + } + flag.Parse() + if *version { + fmt.Printf("%s %s (relay protocol version %d)\n", + projectName, projectVersion, RelayVersion) + return + } + + if flag.NArg() != 1 { + flag.Usage() + os.Exit(1) + } + + // TODO(p): This program should be able to run as a filter as well. + run(flag.Arg(0)) +} @@ -180,6 +180,14 @@ beep() // --- Networking -------------------------------------------------------------- static void +on_relay_generic_response( + std::wstring error, const Relay::ResponseData *response) +{ + if (!response) + show_error_message(QString::fromStdWString(error)); +} + +static void relay_send(Relay::CommandData *data, Callback callback = {}) { Relay::CommandMessage m = {}; @@ -190,6 +198,8 @@ relay_send(Relay::CommandData *data, Callback callback = {}) if (callback) g.command_callbacks[m.command_seq] = std::move(callback); + else + g.command_callbacks[m.command_seq] = on_relay_generic_response; auto len = qToBigEndian<uint32_t>(w.data.size()); auto prefix = reinterpret_cast<const char *>(&len); @@ -222,6 +222,14 @@ relay_try_write(std::wstring &error) } static void +on_relay_generic_response( + std::wstring error, const Relay::ResponseData *response) +{ + if (!response) + show_error_message(error.c_str()); +} + +static void relay_send(Relay::CommandData *data, Callback callback = {}) { Relay::CommandMessage m = {}; @@ -232,6 +240,8 @@ relay_send(Relay::CommandData *data, Callback callback = {}) if (callback) g.command_callbacks[m.command_seq] = std::move(callback); + else + g.command_callbacks[m.command_seq] = on_relay_generic_response; uint32_t len = htonl(w.data.size()); uint8_t *prefix = reinterpret_cast<uint8_t *>(&len); |