From fa85ea820832bfb5aa03a16e54b32b979cae6edf Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch
Date: Wed, 14 Sep 2022 00:47:46 +0200 Subject: xP: parallelize event reception and sending Still trying to make the frontend load tolerably fast, still unsuccessfully. --- xP/xP.go | 146 ++++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 92 insertions(+), 54 deletions(-) diff --git a/xP/xP.go b/xP/xP.go index b858ace..dc5aa5e 100644 --- a/xP/xP.go +++ b/xP/xP.go @@ -25,23 +25,61 @@ var ( addressWS string ) -func clientToRelay( - ctx context.Context, ws *websocket.Conn, conn net.Conn) bool { - t, b, err := ws.Read(ctx) - if err != nil { - log.Println("Command receive failed: " + err.Error()) - return false +// ----------------------------------------------------------------------------- + +func relayReadJSON(conn net.Conn) []byte { + var length uint32 + if err := binary.Read(conn, binary.BigEndian, &length); err != nil { + log.Println("Event receive failed: " + err.Error()) + return nil } - if t != websocket.MessageText { - log.Println("Command receive failed: " + - "binary messages are not supported") - return false + b := make([]byte, length) + if _, err := io.ReadFull(conn, b); err != nil { + log.Println("Event receive failed: " + err.Error()) + return nil + } + + log.Printf(" %v\n", b) + + var m RelayEventMessage + if after, ok := m.ConsumeFrom(b); !ok { + log.Println("Event deserialization failed") + return nil + } else if len(after) != 0 { + log.Println("Event deserialization failed: trailing data") + return nil } - log.Printf("?> %s\n", b) + j, err := json.Marshal(&m) + if err != nil { + log.Println("Event marshalling failed: " + err.Error()) + return nil + } + return j +} +func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte { + p := make(chan []byte, 1) + go func() { + defer close(p) + for { + j := relayReadJSON(conn) + if j == nil { + return + } + select { + case p <- j: + case <-ctx.Done(): + return + } + } + }() + return p +} + +func relayWriteJSON(conn net.Conn, j []byte) bool { var m RelayCommandMessage - if err := json.Unmarshal(b, &m); err != nil { + if err := json.Unmarshal(j, &m); err != nil { log.Println("Command unmarshalling failed: " + err.Error()) return false } @@ -61,45 +99,33 @@ func clientToRelay( return true } -func relayToClient( - ctx context.Context, ws *websocket.Conn, conn net.Conn) bool { - var length uint32 - if err := binary.Read(conn, binary.BigEndian, &length); err != nil { - log.Println("Event receive failed: " + err.Error()) - return false - } - b := make([]byte, length) - if _, err := io.ReadFull(conn, b); err != nil { - log.Println("Event receive failed: " + err.Error()) - return false - } - - log.Printf(" %v\n", b) - - var m RelayEventMessage - if after, ok := m.ConsumeFrom(b); !ok { - log.Println("Event deserialization failed") - return false - } else if len(after) != 0 { - log.Println("Event deserialization failed: trailing data") - return false - } +// ----------------------------------------------------------------------------- - j, err := json.Marshal(&m) +func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte { + t, j, err := ws.Read(ctx) if err != nil { - log.Println("Event marshalling failed: " + err.Error()) - return false + log.Println("Command receive failed: " + err.Error()) + return nil } + if t != websocket.MessageText { + log.Println( + "Command receive failed: " + "binary messages are not supported") + return nil + } + log.Printf("?> %s\n", j) + return j +} + +func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool { if err := ws.Write(ctx, websocket.MessageText, j); err != nil { log.Println("Event send failed: " + err.Error()) return false } - log.Printf("<- %s\n", j) return true } -func errorToClient(ctx context.Context, ws *websocket.Conn, err error) bool { +func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { j, err := json.Marshal(&RelayEventMessage{ EventSeq: 0, Data: RelayEventData{ @@ -114,25 +140,21 @@ func errorToClient(ctx context.Context, ws *websocket.Conn, err error) bool { log.Println("Event marshalling failed: " + err.Error()) return false } - if err := ws.Write(ctx, websocket.MessageText, j); err != nil { - log.Println("Event send failed: " + err.Error()) - return false - } - return true + return clientWriteJSON(ctx, ws, j) } func handleWS(w http.ResponseWriter, r *http.Request) { ws, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, - CompressionMode: websocket.CompressionContextTakeover, - // This is for the payload, and happens to trigger on all messages. - CompressionThreshold: 16, + // Note that Safari can be broken with compression. + CompressionMode: websocket.CompressionContextTakeover, + // This is for the payload; set higher to avoid overhead. + CompressionThreshold: 64 << 10, }) if err != nil { log.Println("Client rejected: " + err.Error()) return } - defer ws.Close(websocket.StatusGoingAway, "Goodbye") ctx, cancel := context.WithCancel(r.Context()) @@ -140,24 +162,40 @@ func handleWS(w http.ResponseWriter, r *http.Request) { conn, err := net.Dial("tcp", addressConnect) if err != nil { - errorToClient(ctx, ws, err) + clientWriteError(ctx, ws, err) return } + defer conn.Close() // We don't need to intervene, so it's just two separate pipes so far. + // However, to decrease latencies, events are received and decoded + // in parallel to their sending. + relayJSON := relayMakeReceiver(ctx, conn) go func() { - for clientToRelay(ctx, ws, conn) { + defer cancel() + for { + j := clientReadJSON(ctx, ws) + if j == nil { + return + } + relayWriteJSON(conn, j) } - cancel() }() go func() { - for relayToClient(ctx, ws, conn) { + defer cancel() + for { + j, ok := <-relayJSON + if !ok { + return + } + clientWriteJSON(ctx, ws, j) } - cancel() }() <-ctx.Done() } +// ----------------------------------------------------------------------------- + var staticHandler = http.FileServer(http.Dir(".")) var page = template.Must(template.New("/").Parse(` -- cgit v1.2.3-70-g09d2