aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2025-05-09 22:34:25 +0200
committerPřemysl Eric Janouch <p@janouch.name>2025-05-09 23:12:41 +0200
commitd647405a20932a2ee131b4287c8b99de38b95a8d (patch)
tree88f2753edf05c37405f54201f51c30b10a78b669
parentd572cfeb40b996a898f6d67d273fac2fab37c86e (diff)
downloadxK-d647405a20932a2ee131b4287c8b99de38b95a8d.tar.gz
xK-d647405a20932a2ee131b4287c8b99de38b95a8d.tar.xz
xK-d647405a20932a2ee131b4287c8b99de38b95a8d.zip
WIP: Make the relay acknowledge received commands
xP now slightly throttles activity notifications, and indicates when there are unacknowledged commands. Relay events have been reordered to improve forward compatibility. As can be observed, even the smallest protocol change has great consequences. WIP: - xC: fix connection killing
-rw-r--r--LICENSE2
-rw-r--r--NEWS12
m---------liberty0
-rw-r--r--xA/xA.go9
-rw-r--r--xC.c39
-rw-r--r--xC.lxdr53
-rw-r--r--xM/main.swift7
-rw-r--r--xP/public/xP.js46
-rw-r--r--xT/xT.cpp10
-rw-r--r--xW/xW.cpp10
10 files changed, 137 insertions, 51 deletions
diff --git a/LICENSE b/LICENSE
index d58be36..69c9c4c 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2014 - 2024, Přemysl Eric Janouch <p@janouch.name>
+Copyright (c) 2014 - 2025, Přemysl Eric Janouch <p@janouch.name>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted.
diff --git a/NEWS b/NEWS
index 63870bd..5015c33 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,15 @@
+Unreleased
+
+ * xC: added more characters as nickname delimiters,
+ so that @nick works as a highlight
+
+ * xC: prevented crashes in relay code
+
+ * xP: added a network lag indicator to the user interface
+
+ * Bumped relay protocol version
+
+
2.1.0 (2024-12-19) "Bunnyrific"
* xC: fixed a crash when the channel topic had too many formatting items
diff --git a/liberty b/liberty
-Subproject af889b733e81fa40d7a7ff652386585115e186f
+Subproject 31ae40085206dc365a15fd6e9d13978e392f8b3
diff --git a/xA/xA.go b/xA/xA.go
index 707a280..b4f796c 100644
--- a/xA/xA.go
+++ b/xA/xA.go
@@ -337,9 +337,14 @@ func relaySend(data RelayCommandData, callback callback) bool {
CommandSeq: commandSeq,
Data: data,
}
- if callback != nil {
- commandCallbacks[m.CommandSeq] = callback
+ if callback == nil {
+ callback = func(err string, response *RelayResponseData) {
+ if response == nil {
+ showErrorMessage(err)
+ }
+ }
}
+ commandCallbacks[m.CommandSeq] = callback
commandSeq++
// TODO(p): Handle errors better.
diff --git a/xC.c b/xC.c
index 73ddd12..60da463 100644
--- a/xC.c
+++ b/xC.c
@@ -1818,6 +1818,7 @@ struct client
uint32_t event_seq; ///< Outgoing message counter
bool initialized; ///< Initial sync took place
+ bool shutdown; ///< Shutting down
struct poller_fd socket_event; ///< The socket can be read/written to
};
@@ -4168,9 +4169,7 @@ relay_send (struct client *c)
{
struct relay_event_message *m = &c->ctx->relay_message;
m->event_seq = c->event_seq++;
-
- // TODO: Also don't try sending anything if half-closed.
- if (!c->initialized || c->socket_fd == -1)
+ if (!c->initialized || c->shutdown || c->socket_fd == -1)
return;
// liberty has msg_{reader,writer} already, but they use 8-byte lengths.
@@ -4180,7 +4179,10 @@ relay_send (struct client *c)
|| (frame_len = c->write_buffer.len - frame_len_pos - 4) > UINT32_MAX)
{
print_error ("serialization failed, killing client");
- client_kill (c);
+ // FIXME: This must not be done immediately!
+ //client_kill (c);
+ // TODO: Perhaps set an idle task that collects shutdown clients.
+ c->shutdown = true;
return;
}
@@ -15716,28 +15718,31 @@ client_process_message (struct client *c,
return true;
}
+ bool acknowledge = true;
switch (m->data.command)
{
case RELAY_COMMAND_HELLO:
+ c->initialized = true;
if (m->data.hello.version != RELAY_VERSION)
{
- // TODO: This should send back an error message and shut down.
log_global_error (c->ctx,
"Protocol version mismatch, killing client");
- return false;
+ relay_prepare_error (c->ctx,
+ m->command_seq, "Protocol version mismatch");
+ relay_send (c);
+
+ c->shutdown = true;
+ return true;
}
- c->initialized = true;
client_resync (c);
break;
case RELAY_COMMAND_PING:
- relay_prepare_response (c->ctx, m->command_seq)
- ->data.command = RELAY_COMMAND_PING;
- relay_send (c);
break;
case RELAY_COMMAND_ACTIVE:
reset_autoaway (c->ctx);
break;
case RELAY_COMMAND_BUFFER_COMPLETE:
+ acknowledge = false;
client_process_buffer_complete (c, m->command_seq, buffer,
&m->data.buffer_complete);
break;
@@ -15751,13 +15756,21 @@ client_process_message (struct client *c,
buffer_toggle_unimportant (c->ctx, buffer);
break;
case RELAY_COMMAND_BUFFER_LOG:
+ acknowledge = false;
client_process_buffer_log (c, m->command_seq, buffer);
break;
default:
+ acknowledge = false;
log_global_debug (c->ctx, "Unhandled client command");
relay_prepare_error (c->ctx, m->command_seq, "Unknown command");
relay_send (c);
}
+ if (acknowledge)
+ {
+ relay_prepare_response (c->ctx, m->command_seq)
+ ->data.command = m->data.command;
+ relay_send (c);
+ }
return true;
}
@@ -15851,7 +15864,13 @@ on_client_ready (const struct pollfd *pfd, void *user_data)
{
struct client *c = user_data;
if (client_try_read (c) && client_try_write (c))
+ {
client_update_poller (c, pfd);
+
+ // There must be something in the write buffer if you request shutdown.
+ if (c->shutdown && !c->write_buffer.len)
+ client_kill (c);
+ }
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
diff --git a/xC.lxdr b/xC.lxdr
index af0f170..eba914f 100644
--- a/xC.lxdr
+++ b/xC.lxdr
@@ -1,7 +1,8 @@
// Backwards-compatible protocol version.
-const VERSION = 1;
+const VERSION = 2;
// From the frontend to the relay.
+// All commands receive either an Event.RESPONSE, or an Event.ERROR.
struct CommandMessage {
// The command sequence number will be repeated in responses
// in the respective fields.
@@ -32,13 +33,10 @@ struct CommandMessage {
// XXX: Perhaps this should rather be handled through a /buffer command.
case BUFFER_TOGGLE_UNIMPORTANT:
string buffer_name;
- case PING_RESPONSE:
- u32 event_seq;
-
- // Only these commands may produce Event.RESPONSE, as below,
- // but any command may produce an error.
case PING:
void;
+ case PING_RESPONSE:
+ u32 event_seq;
case BUFFER_COMPLETE:
string buffer_name;
string text;
@@ -52,6 +50,9 @@ struct CommandMessage {
struct EventMessage {
u32 event_seq;
union EventData switch (enum Event {
+ ERROR,
+ RESPONSE,
+
PING,
BUFFER_LINE,
BUFFER_UPDATE,
@@ -64,12 +65,28 @@ struct EventMessage {
SERVER_UPDATE,
SERVER_RENAME,
SERVER_REMOVE,
- ERROR,
- RESPONSE,
} event) {
+ // Restriction: command_seq strictly follows the sequence received
+ // by the relay, across both of these replies.
+ case ERROR:
+ u32 command_seq;
+ string error;
+ case RESPONSE:
+ u32 command_seq;
+ union ResponseData switch (Command command) {
+ case BUFFER_COMPLETE:
+ u32 start;
+ string completions<>;
+ case BUFFER_LOG:
+ // UTF-8, but not guaranteed.
+ u8 log<>;
+ default:
+ // Reception acknowledged.
+ void;
+ } data;
+
case PING:
void;
-
case BUFFER_LINE:
string buffer_name;
// Whether the line should also be displayed in the active buffer.
@@ -188,23 +205,5 @@ struct EventMessage {
string new;
case SERVER_REMOVE:
string server_name;
-
- // Restriction: command_seq strictly follows the sequence received
- // by the relay, across both of these replies.
- case ERROR:
- u32 command_seq;
- string error;
- case RESPONSE:
- u32 command_seq;
- union ResponseData switch (Command command) {
- case PING:
- void;
- case BUFFER_COMPLETE:
- u32 start;
- string completions<>;
- case BUFFER_LOG:
- // UTF-8, but not guaranteed.
- u8 log<>;
- } data;
} data;
};
diff --git a/xM/main.swift b/xM/main.swift
index 48f26c4..39b66c5 100644
--- a/xM/main.swift
+++ b/xM/main.swift
@@ -173,8 +173,11 @@ class RelayRPC {
func send(data: RelayCommandData, callback: Callback? = nil) {
self.commandSeq += 1
let m = RelayCommandMessage(commandSeq: self.commandSeq, data: data)
- if let callback = callback {
- self.commandCallbacks[m.commandSeq] = callback
+ self.commandCallbacks[m.commandSeq] = callback ?? { error, data in
+ if data == nil {
+ NSSound.beep()
+ Logger().warning("\(error)")
+ }
}
var w = RelayWriter()
diff --git a/xP/public/xP.js b/xP/public/xP.js
index 6035db3..33d7d2a 100644
--- a/xP/public/xP.js
+++ b/xP/public/xP.js
@@ -1,4 +1,4 @@
-// Copyright (c) 2022 - 2024, Přemysl Eric Janouch <p@janouch.name>
+// Copyright (c) 2022 - 2025, Přemysl Eric Janouch <p@janouch.name>
// SPDX-License-Identifier: 0BSD
import * as Relay from './proto.js'
@@ -67,18 +67,19 @@ class RelayRPC extends EventTarget {
_processOne(message) {
let e = message.data
+ let p
switch (e.event) {
case Relay.Event.Error:
- if (this.promised[e.commandSeq] !== undefined)
- this.promised[e.commandSeq].reject(e.error)
- else
+ if ((p = this.promised[e.commandSeq]) === undefined)
console.error(`Unawaited error: ${e.error}`)
+ else if (p !== true)
+ p.reject(e.error)
break
case Relay.Event.Response:
- if (this.promised[e.commandSeq] !== undefined)
- this.promised[e.commandSeq].resolve(e.data)
- else
+ if ((p = this.promised[e.commandSeq]) === undefined)
console.error("Unawaited response")
+ else if (p !== true)
+ p.resolve(e.data)
break
default:
e.eventSeq = message.eventSeq
@@ -95,6 +96,13 @@ class RelayRPC extends EventTarget {
this.promised[seq].reject("No response")
delete this.promised[seq]
}
+ m.redraw()
+ }
+
+ get busy() {
+ for (const seq in this.promised)
+ return true
+ return false
}
send(params) {
@@ -110,6 +118,9 @@ class RelayRPC extends EventTarget {
this.ws.send(JSON.stringify({commandSeq: seq, data: params}))
+ this.promised[seq] = true
+ m.redraw()
+
// Automagically detect if we want a result.
let data = undefined
const promise = new Promise(
@@ -191,6 +202,17 @@ let bufferAutoscroll = true
let servers = new Map()
+let lastActive = undefined
+
+function notifyActive() {
+ // Reduce unnecessary traffic.
+ const now = Date.now()
+ if (lastActive === undefined || (now - lastActive >= 5000)) {
+ lastActive = now
+ rpc.send({command: 'Active'})
+ }
+}
+
function bufferResetStats(b) {
b.newMessages = 0
b.newUnimportantMessages = 0
@@ -998,7 +1020,7 @@ let Input = {
onKeyDown: event => {
// TODO: And perhaps on other actions, too.
- rpc.send({command: 'Active'})
+ notifyActive()
let b = buffers.get(bufferCurrent)
if (b === undefined || event.isComposing)
@@ -1103,7 +1125,13 @@ let Main = {
return m('.xP', {}, [
overlay,
- m('.title', {}, [m('b', {}, `xP`), m(Topic)]),
+ m('.title', {}, [
+ m('span', [
+ rpc.busy ? '⋯ ' : undefined,
+ m('b', {}, `xP`),
+ ]),
+ m(Topic),
+ ]),
m('.middle', {}, [m(BufferList), m(BufferContainer)]),
m(Status),
m('.input', {}, [m(Prompt), m(Input)]),
diff --git a/xT/xT.cpp b/xT/xT.cpp
index 72f5892..b708b95 100644
--- a/xT/xT.cpp
+++ b/xT/xT.cpp
@@ -180,6 +180,14 @@ beep()
// --- Networking --------------------------------------------------------------
static void
+on_relay_generic_response(
+ std::wstring error, const Relay::ResponseData *response)
+{
+ if (!response)
+ show_error_message(QString::fromStdWString(error));
+}
+
+static void
relay_send(Relay::CommandData *data, Callback callback = {})
{
Relay::CommandMessage m = {};
@@ -190,6 +198,8 @@ relay_send(Relay::CommandData *data, Callback callback = {})
if (callback)
g.command_callbacks[m.command_seq] = std::move(callback);
+ else
+ g.command_callbacks[m.command_seq] = on_relay_generic_response;
auto len = qToBigEndian<uint32_t>(w.data.size());
auto prefix = reinterpret_cast<const char *>(&len);
diff --git a/xW/xW.cpp b/xW/xW.cpp
index 7fd8950..0840c16 100644
--- a/xW/xW.cpp
+++ b/xW/xW.cpp
@@ -222,6 +222,14 @@ relay_try_write(std::wstring &error)
}
static void
+on_relay_generic_response(
+ std::wstring error, const Relay::ResponseData *response)
+{
+ if (!response)
+ show_error_message(error.c_str());
+}
+
+static void
relay_send(Relay::CommandData *data, Callback callback = {})
{
Relay::CommandMessage m = {};
@@ -232,6 +240,8 @@ relay_send(Relay::CommandData *data, Callback callback = {})
if (callback)
g.command_callbacks[m.command_seq] = std::move(callback);
+ else
+ g.command_callbacks[m.command_seq] = on_relay_generic_response;
uint32_t len = htonl(w.data.size());
uint8_t *prefix = reinterpret_cast<uint8_t *>(&len);