diff options
Diffstat (limited to 'xP/xP.go')
-rw-r--r-- | xP/xP.go | 122 |
1 files changed, 82 insertions, 40 deletions
@@ -8,6 +8,7 @@ import ( "context" "encoding/binary" "encoding/json" + "flag" "fmt" "html/template" "io" @@ -21,6 +22,8 @@ import ( ) var ( + debug = flag.Bool("debug", false, "enable debug output") + addressBind string addressConnect string addressWS string @@ -28,7 +31,7 @@ var ( // ----------------------------------------------------------------------------- -func relayReadJSON(r io.Reader) []byte { +func relayReadFrame(r io.Reader) []byte { var length uint32 if err := binary.Read(r, binary.BigEndian, &length); err != nil { log.Println("Event receive failed: " + err.Error()) @@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte { return nil } - log.Printf("<? %v\n", b) + if *debug { + log.Printf("<? %v\n", b) - var m RelayEventMessage - if after, ok := m.ConsumeFrom(b); !ok { - log.Println("Event deserialization failed") - return nil - } else if len(after) != 0 { - log.Println("Event deserialization failed: trailing data") - return nil - } + var m RelayEventMessage + if after, ok := m.ConsumeFrom(b); !ok { + log.Println("Event deserialization failed") + return nil + } else if len(after) != 0 { + log.Println("Event deserialization failed: trailing data") + return nil + } - j, err := m.MarshalJSON() - if err != nil { - log.Println("Event marshalling failed: " + err.Error()) - return nil + j, err := m.MarshalJSON() + if err != nil { + log.Println("Event marshalling failed: " + err.Error()) + return nil + } + + log.Printf("<- %s\n", j) } - return j + return b } func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte { - p := make(chan []byte, 1) - r := bufio.NewReader(conn) + // The usual event message rarely gets above 1 kilobyte, + // thus this is set to buffer up at most 1 megabyte or so. + p := make(chan []byte, 1000) + r := bufio.NewReaderSize(conn, 65536) go func() { defer close(p) for { - j := relayReadJSON(r) + j := relayReadFrame(r) if j == nil { return } @@ -97,7 +106,9 @@ func relayWriteJSON(conn net.Conn, j []byte) bool { return false } - log.Printf("-> %v\n", b) + if *debug { + log.Printf("-> %v\n", b) + } return true } @@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte { "Command receive failed: " + "binary messages are not supported") return nil } - log.Printf("?> %s\n", j) + + if *debug { + log.Printf("?> %s\n", j) + } return j } -func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool { - if err := ws.Write(ctx, websocket.MessageText, j); err != nil { +func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool { + if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil { log.Println("Event send failed: " + err.Error()) return false } - log.Printf("<- %s\n", j) return true } func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { - j, err := (&RelayEventMessage{ + b, ok := (&RelayEventMessage{ EventSeq: 0, Data: RelayEventData{ Interface: RelayEventDataError{ @@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { Error: err.Error(), }, }, - }).MarshalJSON() - if err != nil { - log.Println("Event marshalling failed: " + err.Error()) + }).AppendTo(nil) + if ok { + log.Println("Event serialization failed") return false } - return clientWriteJSON(ctx, ws, j) + return clientWriteBinary(ctx, ws, b) } func handleWS(w http.ResponseWriter, r *http.Request) { @@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) { conn, err := net.Dial("tcp", addressConnect) if err != nil { + log.Println("Connection failed: " + err.Error()) clientWriteError(ctx, ws, err) return } defer conn.Close() + // To decrease latencies, events are received and decoded in parallel + // to their sending, and we try to batch them together. + relayFrames := relayMakeReceiver(ctx, conn) + batchFrames := func() []byte { + batch, ok := <-relayFrames + if !ok { + return nil + } + Batch: + for { + select { + case b, ok := <-relayFrames: + if !ok { + break Batch + } + batch = append(batch, b...) + default: + break Batch + } + } + return batch + } + // We don't need to intervene, so it's just two separate pipes so far. - // However, to decrease latencies, events are received and decoded - // in parallel to their sending. - relayJSON := relayMakeReceiver(ctx, conn) go func() { defer cancel() for { @@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) { go func() { defer cancel() for { - j, ok := <-relayJSON - if !ok { + b := batchFrames() + if b == nil { return } - clientWriteJSON(ctx, ws, j) + clientWriteBinary(ctx, ws, b) } }() <-ctx.Done() @@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(`<!DOCTYPE html> <script> let proxy = '{{ . }}' </script> - <script src="xP.js"> + <script type="module" src="xP.js"> </script> </body> </html>`)) @@ -235,13 +269,21 @@ func handleDefault(w http.ResponseWriter, r *http.Request) { } func main() { - if len(os.Args) < 3 || len(os.Args) > 4 { - log.Fatalf("usage: %s BIND CONNECT [WSURI]\n", os.Args[0]) + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), + "Usage: %s [OPTION...] BIND CONNECT [WSURI]\n\n", os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + if flag.NArg() < 2 || flag.NArg() > 3 { + flag.Usage() + os.Exit(1) } - addressBind, addressConnect = os.Args[1], os.Args[2] - if len(os.Args) > 3 { - addressWS = os.Args[3] + addressBind, addressConnect = flag.Arg(0), flag.Arg(1) + if flag.NArg() > 2 { + addressWS = flag.Arg(2) } http.Handle("/ws", http.HandlerFunc(handleWS)) |