From 6f39aa66156f53b27e3b9cfe8457fc2f64129e56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Eric=20Janouch?= Date: Thu, 15 Sep 2022 22:45:14 +0200 Subject: xP: use the binary protocol for incoming events And batch event messages together as much as possible. JSON has proven itself to be really slow (for example, encoding/json.Marshaler is a slow interface), and browsers have significant overhead per WS message. Commands are still sent as JSON, sending them in binary would be a laborious rewrite without measurable merits. The xP server now only prints debug output when requested, because that was another source of major slowdowns. --- xP/xP.go | 122 ++++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 82 insertions(+), 40 deletions(-) (limited to 'xP/xP.go') diff --git a/xP/xP.go b/xP/xP.go index 7e0285c..20117b2 100644 --- a/xP/xP.go +++ b/xP/xP.go @@ -8,6 +8,7 @@ import ( "context" "encoding/binary" "encoding/json" + "flag" "fmt" "html/template" "io" @@ -21,6 +22,8 @@ import ( ) var ( + debug = flag.Bool("debug", false, "enable debug output") + addressBind string addressConnect string addressWS string @@ -28,7 +31,7 @@ var ( // ----------------------------------------------------------------------------- -func relayReadJSON(r io.Reader) []byte { +func relayReadFrame(r io.Reader) []byte { var length uint32 if err := binary.Read(r, binary.BigEndian, &length); err != nil { log.Println("Event receive failed: " + err.Error()) @@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte { return nil } - log.Printf(" %v\n", b) + if *debug { + log.Printf("-> %v\n", b) + } return true } @@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte { "Command receive failed: " + "binary messages are not supported") return nil } - log.Printf("?> %s\n", j) + + if *debug { + log.Printf("?> %s\n", j) + } return j } -func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool { - if err := ws.Write(ctx, websocket.MessageText, j); err != nil { +func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool { + if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil { log.Println("Event send failed: " + err.Error()) return false } - log.Printf("<- %s\n", j) return true } func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { - j, err := (&RelayEventMessage{ + b, ok := (&RelayEventMessage{ EventSeq: 0, Data: RelayEventData{ Interface: RelayEventDataError{ @@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { Error: err.Error(), }, }, - }).MarshalJSON() - if err != nil { - log.Println("Event marshalling failed: " + err.Error()) + }).AppendTo(nil) + if ok { + log.Println("Event serialization failed") return false } - return clientWriteJSON(ctx, ws, j) + return clientWriteBinary(ctx, ws, b) } func handleWS(w http.ResponseWriter, r *http.Request) { @@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) { conn, err := net.Dial("tcp", addressConnect) if err != nil { + log.Println("Connection failed: " + err.Error()) clientWriteError(ctx, ws, err) return } defer conn.Close() + // To decrease latencies, events are received and decoded in parallel + // to their sending, and we try to batch them together. + relayFrames := relayMakeReceiver(ctx, conn) + batchFrames := func() []byte { + batch, ok := <-relayFrames + if !ok { + return nil + } + Batch: + for { + select { + case b, ok := <-relayFrames: + if !ok { + break Batch + } + batch = append(batch, b...) + default: + break Batch + } + } + return batch + } + // We don't need to intervene, so it's just two separate pipes so far. - // However, to decrease latencies, events are received and decoded - // in parallel to their sending. - relayJSON := relayMakeReceiver(ctx, conn) go func() { defer cancel() for { @@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) { go func() { defer cancel() for { - j, ok := <-relayJSON - if !ok { + b := batchFrames() + if b == nil { return } - clientWriteJSON(ctx, ws, j) + clientWriteBinary(ctx, ws, b) } }() <-ctx.Done() @@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(` - `)) @@ -235,13 +269,21 @@ func handleDefault(w http.ResponseWriter, r *http.Request) { } func main() { - if len(os.Args) < 3 || len(os.Args) > 4 { - log.Fatalf("usage: %s BIND CONNECT [WSURI]\n", os.Args[0]) + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), + "Usage: %s [OPTION...] BIND CONNECT [WSURI]\n\n", os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + if flag.NArg() < 2 || flag.NArg() > 3 { + flag.Usage() + os.Exit(1) } - addressBind, addressConnect = os.Args[1], os.Args[2] - if len(os.Args) > 3 { - addressWS = os.Args[3] + addressBind, addressConnect = flag.Arg(0), flag.Arg(1) + if flag.NArg() > 2 { + addressWS = flag.Arg(2) } http.Handle("/ws", http.HandlerFunc(handleWS)) -- cgit v1.2.3