aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2022-09-15 22:45:14 +0200
committerPřemysl Eric Janouch <p@janouch.name>2022-09-16 00:51:11 +0200
commit6f39aa66156f53b27e3b9cfe8457fc2f64129e56 (patch)
tree0119fd13384bdcdffacfb9d05e10616e6342ca15
parente87cc90b5e20ba90d4f5c9ea349d9cf41b5ae58c (diff)
downloadxK-6f39aa66156f53b27e3b9cfe8457fc2f64129e56.tar.gz
xK-6f39aa66156f53b27e3b9cfe8457fc2f64129e56.tar.xz
xK-6f39aa66156f53b27e3b9cfe8457fc2f64129e56.zip
xP: use the binary protocol for incoming events
And batch event messages together as much as possible. JSON has proven itself to be really slow (for example, encoding/json.Marshaler is a slow interface), and browsers have significant overhead per WS message. Commands are still sent as JSON, sending them in binary would be a laborious rewrite without measurable merits. The xP server now only prints debug output when requested, because that was another source of major slowdowns.
-rw-r--r--xC-gen-proto-js.awk223
-rw-r--r--xP/.gitignore1
-rw-r--r--xP/Makefile4
-rw-r--r--xP/public/xP.js200
-rw-r--r--xP/xP.go122
5 files changed, 412 insertions, 138 deletions
diff --git a/xC-gen-proto-js.awk b/xC-gen-proto-js.awk
new file mode 100644
index 0000000..752fd18
--- /dev/null
+++ b/xC-gen-proto-js.awk
@@ -0,0 +1,223 @@
+# xC-gen-proto-js.awk: Javascript backend for xC-gen-proto.awk.
+#
+# Copyright (c) 2022, Přemysl Eric Janouch <p@janouch.name>
+# SPDX-License-Identifier: 0BSD
+#
+# This backend is currently for decoding the binary format only.
+# (JSON is way too expensive to process and transfer.)
+#
+# Import the resulting script as a Javascript module.
+
+function define_internal(name) {
+ Types[name] = "internal"
+}
+
+function define_sint(size, shortname) {
+ shortname = "i" size
+ define_internal(shortname)
+ CodegenDeserialize[shortname] = "\t%s = r." shortname "()\n"
+
+ print ""
+ print "\t" shortname "() {"
+ if (size == "64") {
+ # XXX: 2^53 - 1 must be enough for anyone. BigInts are a PITA.
+ print "\t\tconst " shortname \
+ " = Number(this.getBigInt" size "(this.offset))"
+ } else {
+ print "\t\tconst " shortname " = this.getInt" size "(this.offset)"
+ }
+ print "\t\tthis.offset += " (size / 8)
+ print "\t\treturn " shortname
+ print "\t}"
+}
+
+function define_uint(size, shortname) {
+ shortname = "u" size
+ define_internal(shortname)
+ CodegenDeserialize[shortname] = "\t%s = r." shortname "()\n"
+
+ print ""
+ print "\t" shortname "() {"
+ if (size == "64") {
+ # XXX: 2^53 - 1 must be enough for anyone. BigInts are a PITA.
+ print "\t\tconst " shortname \
+ " = Number(this.getBigUint" size "(this.offset))"
+ } else {
+ print "\t\tconst " shortname " = this.getUint" size "(this.offset)"
+ }
+ print "\t\tthis.offset += " (size / 8)
+ print "\t\treturn " shortname
+ print "\t}"
+}
+
+function codegen_begin() {
+ print "export class Reader extends DataView {"
+ print "\tconstructor() {"
+ print "\t\tsuper(...arguments)"
+ print "\t\tthis.offset = 0"
+ print "\t\tthis.decoder = new TextDecoder('utf-8', {fatal: true})"
+ print "\t}"
+ print ""
+ print "\tget empty() {"
+ print "\t\treturn this.byteLength <= this.offset"
+ print "\t}"
+ print ""
+ print "\trequire(len) {"
+ print "\t\tif (this.byteLength - this.offset < len)"
+ print "\t\t\tthrow `Premature end of data`"
+ print "\t\treturn this.byteOffset + this.offset"
+ print "\t}"
+
+ define_internal("string")
+ CodegenDeserialize["string"] = "\t%s = r.string()\n"
+
+ print ""
+ print "\tstring() {"
+ print "\t\tconst len = this.getUint32(this.offset)"
+ print "\t\tthis.offset += 4"
+ print "\t\tconst array = new Uint8Array("
+ print "\t\t\tthis.buffer, this.require(len), len)"
+ print "\t\tthis.offset += len"
+ print "\t\treturn this.decoder.decode(array)"
+ print "\t}"
+
+ define_internal("bool")
+ CodegenDeserialize["bool"] = "\t%s = r.bool()\n"
+
+ print ""
+ print "\tbool() {"
+ print "\t\tconst u8 = this.getUint8(this.offset)"
+ print "\t\tthis.offset += 1"
+ print "\t\treturn u8 != 0"
+ print "\t}"
+
+ define_sint("8")
+ define_sint("16")
+ define_sint("32")
+ define_sint("64")
+ define_uint("8")
+ define_uint("16")
+ define_uint("32")
+ define_uint("64")
+
+ print "}"
+}
+
+function codegen_constant(name, value) {
+ print ""
+ print "export const " decapitalize(snaketocamel(name)) " = " value
+}
+
+function codegen_enum_value(name, subname, value, cg) {
+ append(cg, "fields", "\t" snaketocamel(subname) ": " value ",\n")
+}
+
+function codegen_enum(name, cg) {
+ print ""
+ print "export const " name " = Object.freeze({"
+ print cg["fields"] "})"
+
+ CodegenDeserialize[name] = "\t%s = r.i8()\n"
+ for (i in cg)
+ delete cg[i]
+}
+
+function codegen_struct_field(d, cg, camel, f, deserialize) {
+ camel = decapitalize(snaketocamel(d["name"]))
+ f = "s." camel
+ append(cg, "fields", "\t" camel "\n")
+
+ deserialize = CodegenDeserialize[d["type"]]
+ if (!d["isarray"]) {
+ append(cg, "deserialize", sprintf(deserialize, f))
+ return
+ }
+
+ append(cg, "deserialize",
+ "\t{\n" \
+ indent(sprintf(CodegenDeserialize["u32"], "const len")))
+ if (d["type"] == "u8") {
+ append(cg, "deserialize",
+ "\t\t" f " = new Uint8Array(\n" \
+ "\t\t\tr.buffer, r.require(len), len)\n" \
+ "\t\tr.offset += len\n" \
+ "\t}\n")
+ return
+ }
+ if (d["type"] == "i8") {
+ append(cg, "deserialize",
+ "\t\t" f " = new Int8Array(\n" \
+ "\t\t\tr.buffer, r.require(len), len)\n" \
+ "\t\tr.offset += len\n" \
+ "\t}\n")
+ return
+ }
+
+ append(cg, "deserialize",
+ "\t\t" f " = new Array(len)\n" \
+ "\t}\n" \
+ "\tfor (let i = 0; i < " f ".length; i++)\n" \
+ indent(sprintf(deserialize, f "[i]")))
+}
+
+function codegen_struct_tag(d, cg) {
+ append(cg, "fields", "\t" decapitalize(snaketocamel(d["name"])) "\n")
+ # Do not deserialize here, that is already done by the containing union.
+}
+
+function codegen_struct(name, cg) {
+ print ""
+ print "export class " name " {"
+ print cg["fields"] cg["methods"]
+ print "\tstatic deserialize(r) {"
+ print "\t\tconst s = new " name "()"
+ print indent(cg["deserialize"]) "\t\treturn s"
+ print "\t}"
+ print "}"
+
+ CodegenDeserialize[name] = "\t%s = " name ".deserialize(r)\n"
+ for (i in cg)
+ delete cg[i]
+}
+
+function codegen_union_tag(d, cg) {
+ cg["tagtype"] = d["type"]
+ cg["tagname"] = d["name"]
+}
+
+function codegen_union_struct(name, casename, cg, scg, structname) {
+ append(scg, "methods",
+ "\n" \
+ "\tconstructor() {\n" \
+ "\t\tthis." decapitalize(snaketocamel(cg["tagname"])) \
+ " = " cg["tagtype"] "." snaketocamel(casename) "\n" \
+ "\t}\n")
+
+ # And thus not all generated structs are present in Types.
+ structname = name snaketocamel(casename)
+ codegen_struct(structname, scg)
+
+ append(cg, "deserialize",
+ "\tcase " cg["tagtype"] "." snaketocamel(casename) ":\n" \
+ "\t{\n" \
+ indent(sprintf(CodegenDeserialize[structname], "const s")) \
+ "\t\treturn s\n" \
+ "\t}\n")
+}
+
+function codegen_union(name, cg, tagvar) {
+ tagvar = decapitalize(snaketocamel(cg["tagname"]))
+
+ print ""
+ print "export function deserialize" name "(r) {"
+ print sprintf(CodegenDeserialize[cg["tagtype"]], "const " tagvar) \
+ "\tswitch (" tagvar ") {"
+ print cg["deserialize"] "\tdefault:"
+ print "\t\tthrow `Unknown " cg["tagtype"] " (${tagvar})`"
+ print "\t}"
+ print "}"
+
+ CodegenDeserialize[name] = "\t%s = deserialize" name "(r)\n"
+ for (i in cg)
+ delete cg[i]
+}
diff --git a/xP/.gitignore b/xP/.gitignore
index 68c09f0..ba4d8c3 100644
--- a/xP/.gitignore
+++ b/xP/.gitignore
@@ -1,3 +1,4 @@
/xP
/proto.go
+/public/proto.js
/public/mithril.js
diff --git a/xP/Makefile b/xP/Makefile
index 3c52146..eb0c8f5 100644
--- a/xP/Makefile
+++ b/xP/Makefile
@@ -1,13 +1,15 @@
.POSIX:
.SUFFIXES:
-outputs = xP proto.go public/mithril.js
+outputs = xP proto.go public/proto.js public/mithril.js
all: $(outputs)
xP: xP.go proto.go
go build -o $@
proto.go: ../xC-gen-proto.awk ../xC-gen-proto-go.awk ../xC-proto
awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-go.awk ../xC-proto > $@
+public/proto.js: ../xC-gen-proto.awk ../xC-gen-proto-js.awk ../xC-proto
+ awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-js.awk ../xC-proto > $@
public/mithril.js:
curl -Lo $@ https://unpkg.com/mithril/mithril.js
clean:
diff --git a/xP/public/xP.js b/xP/public/xP.js
index d302a86..1362e22 100644
--- a/xP/public/xP.js
+++ b/xP/public/xP.js
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Přemysl Eric Janouch <p@janouch.name>
// SPDX-License-Identifier: 0BSD
-'use strict'
+import * as Relay from './proto.js'
// ---- RPC --------------------------------------------------------------------
@@ -31,6 +31,7 @@ class RelayRpc extends EventTarget {
}
_initialize() {
+ this.ws.binaryType = 'arraybuffer'
this.ws.onopen = undefined
this.ws.onmessage = event => {
this._process(event.data)
@@ -56,33 +57,30 @@ class RelayRpc extends EventTarget {
}
_process(data) {
- if (typeof data !== 'string')
- throw "Binary messages not supported"
+ if (typeof data === 'string')
+ throw "JSON messages not supported"
- let message = JSON.parse(data)
- if (typeof message !== 'object')
- throw "Invalid message"
- let e = message.data
- if (typeof e !== 'object')
- throw "Invalid message"
+ const r = new Relay.Reader(data)
+ while (!r.empty)
+ this._processOne(Relay.EventMessage.deserialize(r))
+ }
+ _processOne(message) {
+ let e = message.data
switch (e.event) {
- case 'Error':
+ case Relay.Event.Error:
if (this.promised[e.commandSeq] !== undefined)
this.promised[e.commandSeq].reject(e.error)
else
console.error("Unawaited error")
break
- case 'Response':
+ case Relay.Event.Response:
if (this.promised[e.commandSeq] !== undefined)
this.promised[e.commandSeq].resolve(e.data)
else
console.error("Unawaited response")
break
default:
- if (typeof e.event !== 'string')
- throw "Invalid event tag"
-
e.eventSeq = message.eventSeq
this.dispatchEvent(new CustomEvent('event', {detail: e}))
return
@@ -115,13 +113,6 @@ class RelayRpc extends EventTarget {
this.promised[seq] = {resolve, reject}
})
}
-
- base64decode(str) {
- const text = atob(str), bytes = new Uint8Array(text.length)
- for (let i = 0; i < text.length; i++)
- bytes[i] = text.charCodeAt(i)
- return new TextDecoder().decode(bytes)
- }
}
// ---- Utilities --------------------------------------------------------------
@@ -183,7 +174,7 @@ function updateIcon(highlighted) {
// ---- Event processing -------------------------------------------------------
let rpc = new RelayRpc(proxy)
-let rpcEventHandlers = {}
+let rpcEventHandlers = new Map()
let buffers = new Map()
let bufferLast = undefined
@@ -221,7 +212,7 @@ function bufferToggleLog() {
if (bufferCurrent !== name)
return
- bufferLog = rpc.base64decode(resp.log)
+ bufferLog = utf8Decode(resp.log)
m.redraw()
})
}
@@ -236,7 +227,7 @@ rpc.connect().then(result => {
servers.clear()
- rpc.send({command: 'Hello', version: 1})
+ rpc.send({command: 'Hello', version: Relay.version})
connecting = false
m.redraw()
}).catch(error => {
@@ -249,10 +240,11 @@ rpc.addEventListener('close', event => {
})
rpc.addEventListener('event', event => {
- const handler = rpcEventHandlers[event.detail.event]
+ const handler = rpcEventHandlers.get(event.detail.event)
if (handler !== undefined) {
handler(event.detail)
- if (bufferCurrent !== undefined || event.detail.event !== 'BufferLine')
+ if (bufferCurrent !== undefined ||
+ event.detail.event !== Relay.Event.BufferLine)
m.redraw()
}
})
@@ -263,7 +255,7 @@ rpcEventHandlers['Ping'] = e => {
// ~~~ Buffer events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-rpcEventHandlers['BufferUpdate'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferUpdate, e => {
let b = buffers.get(e.bufferName)
if (b === undefined) {
buffers.set(e.bufferName, (b = {
@@ -277,9 +269,9 @@ rpcEventHandlers['BufferUpdate'] = e => {
b.hideUnimportant = e.hideUnimportant
b.kind = e.context.kind
b.server = servers.get(e.context.serverName)
-}
+})
-rpcEventHandlers['BufferStats'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferStats, e => {
let b = buffers.get(e.bufferName)
if (b === undefined)
return
@@ -287,20 +279,20 @@ rpcEventHandlers['BufferStats'] = e => {
b.newMessages = e.newMessages,
b.newUnimportantMessages = e.newUnimportantMessages
b.highlighted = e.highlighted
-}
+})
-rpcEventHandlers['BufferRename'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferRename, e => {
buffers.set(e.new, buffers.get(e.bufferName))
buffers.delete(e.bufferName)
-}
+})
-rpcEventHandlers['BufferRemove'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferRemove, e => {
buffers.delete(e.bufferName)
if (e.bufferName === bufferLast)
bufferLast = undefined
-}
+})
-rpcEventHandlers['BufferActivate'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferActivate, e => {
let old = buffers.get(bufferCurrent)
if (old !== undefined)
bufferResetStats(old)
@@ -333,9 +325,9 @@ rpcEventHandlers['BufferActivate'] = e => {
textarea.value = b.input
textarea.setSelectionRange(b.inputStart, b.inputEnd, b.inputDirection)
}
-}
+})
-rpcEventHandlers['BufferLine'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferLine, e => {
let b = buffers.get(e.bufferName), line = {...e}
delete line.event
delete line.eventSeq
@@ -372,37 +364,37 @@ rpcEventHandlers['BufferLine'] = e => {
}
}
- if (line.isHighlight ||
- (!visible && b.kind === 'PrivateMessage' && !line.isUnimportant)) {
+ if (line.isHighlight || (!visible && !line.isUnimportant &&
+ b.kind === Relay.BufferKind.PrivateMessage)) {
beep()
if (!visible)
b.highlighted = true
}
-}
+})
-rpcEventHandlers['BufferClear'] = e => {
+rpcEventHandlers.set(Relay.Event.BufferClear, e => {
let b = buffers.get(e.bufferName)
if (b !== undefined)
b.lines.length = 0
-}
+})
// ~~~ Server events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-rpcEventHandlers['ServerUpdate'] = e => {
+rpcEventHandlers.set(Relay.Event.ServerUpdate, e => {
let s = servers.get(e.serverName)
if (s === undefined)
servers.set(e.serverName, (s = {}))
s.state = e.state
-}
+})
-rpcEventHandlers['ServerRename'] = e => {
+rpcEventHandlers.set(Relay.Event.ServerRename, e => {
servers.set(e.new, servers.get(e.serverName))
servers.delete(e.serverName)
-}
+})
-rpcEventHandlers['ServerRemove'] = e => {
+rpcEventHandlers.set(Relay.Event.ServerRemove, e => {
servers.delete(e.serverName)
-}
+})
// --- Colours -----------------------------------------------------------------
@@ -499,18 +491,19 @@ let Content = {
return a
},
- view: vnode => {
- let line = vnode.children[0]
- let mark = undefined
+ makeMark: line => {
switch (line.rendition) {
- case 'Indent': mark = m('span.mark', {}, ''); break
- case 'Status': mark = m('span.mark', {}, '–'); break
- case 'Error': mark = m('span.mark.error', {}, '⚠'); break
- case 'Join': mark = m('span.mark.join', {}, '→'); break
- case 'Part': mark = m('span.mark.part', {}, '←'); break
- case 'Action': mark = m('span.mark.action', {}, '✶'); break
+ case Relay.Rendition.Indent: return m('span.mark', {}, '')
+ case Relay.Rendition.Status: return m('span.mark', {}, '–')
+ case Relay.Rendition.Error: return m('span.mark.error', {}, '⚠')
+ case Relay.Rendition.Join: return m('span.mark.join', {}, '→')
+ case Relay.Rendition.Part: return m('span.mark.part', {}, '←')
+ case Relay.Rendition.Action: return m('span.mark.action', {}, '✶')
}
+ },
+ view: vnode => {
+ let line = vnode.children[0]
let classes = new Set()
let flip = c => {
if (classes.has(c))
@@ -518,45 +511,49 @@ let Content = {
else
classes.add(c)
}
+
let fg = -1, bg = -1, inverse = false
- return m('.content', vnode.attrs, [mark, line.items.flatMap(item => {
- switch (item.kind) {
- case 'Text':
- return Content.linkify(item.text, {
- class: Array.from(classes.keys()).join(' '),
- style: Content.applyColor(fg, bg, inverse),
- })
- case 'Reset':
- classes.clear()
- fg = bg = -1
- inverse = false
- break
- case 'FgColor':
- fg = item.color
- break
- case 'BgColor':
- bg = item.color
- break
- case 'FlipInverse':
- inverse = !inverse
- break
- case 'FlipBold':
- flip('b')
- break
- case 'FlipItalic':
- flip('i')
- break
- case 'FlipUnderline':
- flip('u')
- break
- case 'FlipCrossedOut':
- flip('s')
- break
- case 'FlipMonospace':
- flip('m')
- break
- }
- })])
+ return m('.content', vnode.attrs, [
+ Content.makeMark(line),
+ line.items.flatMap(item => {
+ switch (item.kind) {
+ case Relay.Item.Text:
+ return Content.linkify(item.text, {
+ class: Array.from(classes.keys()).join(' '),
+ style: Content.applyColor(fg, bg, inverse),
+ })
+ case Relay.Item.Reset:
+ classes.clear()
+ fg = bg = -1
+ inverse = false
+ break
+ case Relay.Item.FgColor:
+ fg = item.color
+ break
+ case Relay.Item.BgColor:
+ bg = item.color
+ break
+ case Relay.Item.FlipInverse:
+ inverse = !inverse
+ break
+ case Relay.Item.FlipBold:
+ flip('b')
+ break
+ case Relay.Item.FlipItalic:
+ flip('i')
+ break
+ case Relay.Item.FlipUnderline:
+ flip('u')
+ break
+ case Relay.Item.FlipCrossedOut:
+ flip('s')
+ break
+ case Relay.Item.FlipMonospace:
+ flip('m')
+ break
+ }
+ }),
+ ])
},
}
@@ -669,8 +666,17 @@ let Status = {
let status = `${bufferCurrent}`
if (b.hideUnimportant)
status += `<H>`
- if (b.server !== undefined)
- status += ` (${b.server.state})`
+
+ // This should be handled differently, so don't mind the lookup.
+ if (b.server !== undefined) {
+ let state = b.server.state
+ for (const s in Relay.ServerState)
+ if (Relay.ServerState[s] == b.server.state) {
+ state = s
+ break
+ }
+ status += ` (${state})`
+ }
return m('.status', {}, status)
},
}
diff --git a/xP/xP.go b/xP/xP.go
index 7e0285c..20117b2 100644
--- a/xP/xP.go
+++ b/xP/xP.go
@@ -8,6 +8,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
+ "flag"
"fmt"
"html/template"
"io"
@@ -21,6 +22,8 @@ import (
)
var (
+ debug = flag.Bool("debug", false, "enable debug output")
+
addressBind string
addressConnect string
addressWS string
@@ -28,7 +31,7 @@ var (
// -----------------------------------------------------------------------------
-func relayReadJSON(r io.Reader) []byte {
+func relayReadFrame(r io.Reader) []byte {
var length uint32
if err := binary.Read(r, binary.BigEndian, &length); err != nil {
log.Println("Event receive failed: " + err.Error())
@@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte {
return nil
}
- log.Printf("<? %v\n", b)
+ if *debug {
+ log.Printf("<? %v\n", b)
- var m RelayEventMessage
- if after, ok := m.ConsumeFrom(b); !ok {
- log.Println("Event deserialization failed")
- return nil
- } else if len(after) != 0 {
- log.Println("Event deserialization failed: trailing data")
- return nil
- }
+ var m RelayEventMessage
+ if after, ok := m.ConsumeFrom(b); !ok {
+ log.Println("Event deserialization failed")
+ return nil
+ } else if len(after) != 0 {
+ log.Println("Event deserialization failed: trailing data")
+ return nil
+ }
- j, err := m.MarshalJSON()
- if err != nil {
- log.Println("Event marshalling failed: " + err.Error())
- return nil
+ j, err := m.MarshalJSON()
+ if err != nil {
+ log.Println("Event marshalling failed: " + err.Error())
+ return nil
+ }
+
+ log.Printf("<- %s\n", j)
}
- return j
+ return b
}
func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte {
- p := make(chan []byte, 1)
- r := bufio.NewReader(conn)
+ // The usual event message rarely gets above 1 kilobyte,
+ // thus this is set to buffer up at most 1 megabyte or so.
+ p := make(chan []byte, 1000)
+ r := bufio.NewReaderSize(conn, 65536)
go func() {
defer close(p)
for {
- j := relayReadJSON(r)
+ j := relayReadFrame(r)
if j == nil {
return
}
@@ -97,7 +106,9 @@ func relayWriteJSON(conn net.Conn, j []byte) bool {
return false
}
- log.Printf("-> %v\n", b)
+ if *debug {
+ log.Printf("-> %v\n", b)
+ }
return true
}
@@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte {
"Command receive failed: " + "binary messages are not supported")
return nil
}
- log.Printf("?> %s\n", j)
+
+ if *debug {
+ log.Printf("?> %s\n", j)
+ }
return j
}
-func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool {
- if err := ws.Write(ctx, websocket.MessageText, j); err != nil {
+func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool {
+ if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil {
log.Println("Event send failed: " + err.Error())
return false
}
- log.Printf("<- %s\n", j)
return true
}
func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
- j, err := (&RelayEventMessage{
+ b, ok := (&RelayEventMessage{
EventSeq: 0,
Data: RelayEventData{
Interface: RelayEventDataError{
@@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
Error: err.Error(),
},
},
- }).MarshalJSON()
- if err != nil {
- log.Println("Event marshalling failed: " + err.Error())
+ }).AppendTo(nil)
+ if ok {
+ log.Println("Event serialization failed")
return false
}
- return clientWriteJSON(ctx, ws, j)
+ return clientWriteBinary(ctx, ws, b)
}
func handleWS(w http.ResponseWriter, r *http.Request) {
@@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := net.Dial("tcp", addressConnect)
if err != nil {
+ log.Println("Connection failed: " + err.Error())
clientWriteError(ctx, ws, err)
return
}
defer conn.Close()
+ // To decrease latencies, events are received and decoded in parallel
+ // to their sending, and we try to batch them together.
+ relayFrames := relayMakeReceiver(ctx, conn)
+ batchFrames := func() []byte {
+ batch, ok := <-relayFrames
+ if !ok {
+ return nil
+ }
+ Batch:
+ for {
+ select {
+ case b, ok := <-relayFrames:
+ if !ok {
+ break Batch
+ }
+ batch = append(batch, b...)
+ default:
+ break Batch
+ }
+ }
+ return batch
+ }
+
// We don't need to intervene, so it's just two separate pipes so far.
- // However, to decrease latencies, events are received and decoded
- // in parallel to their sending.
- relayJSON := relayMakeReceiver(ctx, conn)
go func() {
defer cancel()
for {
@@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
go func() {
defer cancel()
for {
- j, ok := <-relayJSON
- if !ok {
+ b := batchFrames()
+ if b == nil {
return
}
- clientWriteJSON(ctx, ws, j)
+ clientWriteBinary(ctx, ws, b)
}
}()
<-ctx.Done()
@@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(`<!DOCTYPE html>
<script>
let proxy = '{{ . }}'
</script>
- <script src="xP.js">
+ <script type="module" src="xP.js">
</script>
</body>
</html>`))
@@ -235,13 +269,21 @@ func handleDefault(w http.ResponseWriter, r *http.Request) {
}
func main() {
- if len(os.Args) < 3 || len(os.Args) > 4 {
- log.Fatalf("usage: %s BIND CONNECT [WSURI]\n", os.Args[0])
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(),
+ "Usage: %s [OPTION...] BIND CONNECT [WSURI]\n\n", os.Args[0])
+ flag.PrintDefaults()
+ }
+
+ flag.Parse()
+ if flag.NArg() < 2 || flag.NArg() > 3 {
+ flag.Usage()
+ os.Exit(1)
}
- addressBind, addressConnect = os.Args[1], os.Args[2]
- if len(os.Args) > 3 {
- addressWS = os.Args[3]
+ addressBind, addressConnect = flag.Arg(0), flag.Arg(1)
+ if flag.NArg() > 2 {
+ addressWS = flag.Arg(2)
}
http.Handle("/ws", http.HandlerFunc(handleWS))