From 6f39aa66156f53b27e3b9cfe8457fc2f64129e56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Eric=20Janouch?=
Date: Thu, 15 Sep 2022 22:45:14 +0200 Subject: xP: use the binary protocol for incoming events And batch event messages together as much as possible. JSON has proven itself to be really slow (for example, encoding/json.Marshaler is a slow interface), and browsers have significant overhead per WS message. Commands are still sent as JSON, sending them in binary would be a laborious rewrite without measurable merits. The xP server now only prints debug output when requested, because that was another source of major slowdowns. --- xP/xP.go | 122 ++++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 82 insertions(+), 40 deletions(-) (limited to 'xP/xP.go') diff --git a/xP/xP.go b/xP/xP.go index 7e0285c..20117b2 100644 --- a/xP/xP.go +++ b/xP/xP.go @@ -8,6 +8,7 @@ import ( "context" "encoding/binary" "encoding/json" + "flag" "fmt" "html/template" "io" @@ -21,6 +22,8 @@ import ( ) var ( + debug = flag.Bool("debug", false, "enable debug output") + addressBind string addressConnect string addressWS string @@ -28,7 +31,7 @@ var ( // ----------------------------------------------------------------------------- -func relayReadJSON(r io.Reader) []byte { +func relayReadFrame(r io.Reader) []byte { var length uint32 if err := binary.Read(r, binary.BigEndian, &length); err != nil { log.Println("Event receive failed: " + err.Error()) @@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte { return nil } - log.Printf(" %v\n", b) + if *debug { + log.Printf(" %v\n", b) - var m RelayEventMessage - if after, ok := m.ConsumeFrom(b); !ok { - log.Println("Event deserialization failed") - return nil - } else if len(after) != 0 { - log.Println("Event deserialization failed: trailing data") - return nil - } + var m RelayEventMessage + if after, ok := m.ConsumeFrom(b); !ok { + log.Println("Event deserialization failed") + return nil + } else if len(after) != 0 { + log.Println("Event deserialization failed: trailing data") + return nil + } - j, err := m.MarshalJSON() - if err != nil { - log.Println("Event marshalling failed: " + err.Error()) - return nil + j, err := m.MarshalJSON() + if err != nil { + log.Println("Event marshalling failed: " + err.Error()) + return nil + } + + log.Printf("<- %s\n", j) } - return j + return b } func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte { - p := make(chan []byte, 1) - r := bufio.NewReader(conn) + // The usual event message rarely gets above 1 kilobyte, + // thus this is set to buffer up at most 1 megabyte or so. + p := make(chan []byte, 1000) + r := bufio.NewReaderSize(conn, 65536) go func() { defer close(p) for { - j := relayReadJSON(r) + j := relayReadFrame(r) if j == nil { return } @@ -97,7 +106,9 @@ func relayWriteJSON(conn net.Conn, j []byte) bool { return false } - log.Printf("-> %v\n", b) + if *debug { + log.Printf("-> %v\n", b) + } return true } @@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte { "Command receive failed: " + "binary messages are not supported") return nil } - log.Printf("?> %s\n", j) + + if *debug { + log.Printf("?> %s\n", j) + } return j } -func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool { - if err := ws.Write(ctx, websocket.MessageText, j); err != nil { +func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool { + if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil { log.Println("Event send failed: " + err.Error()) return false } - log.Printf("<- %s\n", j) return true } func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { - j, err := (&RelayEventMessage{ + b, ok := (&RelayEventMessage{ EventSeq: 0, Data: RelayEventData{ Interface: RelayEventDataError{ @@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { Error: err.Error(), }, }, - }).MarshalJSON() - if err != nil { - log.Println("Event marshalling failed: " + err.Error()) + }).AppendTo(nil) + if ok { + log.Println("Event serialization failed") return false } - return clientWriteJSON(ctx, ws, j) + return clientWriteBinary(ctx, ws, b) } func handleWS(w http.ResponseWriter, r *http.Request) { @@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) { conn, err := net.Dial("tcp", addressConnect) if err != nil { + log.Println("Connection failed: " + err.Error()) clientWriteError(ctx, ws, err) return } defer conn.Close() + // To decrease latencies, events are received and decoded in parallel + // to their sending, and we try to batch them together. + relayFrames := relayMakeReceiver(ctx, conn) + batchFrames := func() []byte { + batch, ok := <-relayFrames + if !ok { + return nil + } + Batch: + for { + select { + case b, ok := <-relayFrames: + if !ok { + break Batch + } + batch = append(batch, b...) + default: + break Batch + } + } + return batch + } + // We don't need to intervene, so it's just two separate pipes so far. - // However, to decrease latencies, events are received and decoded - // in parallel to their sending. - relayJSON := relayMakeReceiver(ctx, conn) go func() { defer cancel() for { @@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) { go func() { defer cancel() for { - j, ok := <-relayJSON - if !ok { + b := batchFrames() + if b == nil { return } - clientWriteJSON(ctx, ws, j) + clientWriteBinary(ctx, ws, b) } }() <-ctx.Done() @@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(` -