summaryrefslogtreecommitdiff
path: root/xP/xP.go
diff options
context:
space:
mode:
Diffstat (limited to 'xP/xP.go')
-rw-r--r--xP/xP.go122
1 files changed, 82 insertions, 40 deletions
diff --git a/xP/xP.go b/xP/xP.go
index 7e0285c..20117b2 100644
--- a/xP/xP.go
+++ b/xP/xP.go
@@ -8,6 +8,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
+ "flag"
"fmt"
"html/template"
"io"
@@ -21,6 +22,8 @@ import (
)
var (
+ debug = flag.Bool("debug", false, "enable debug output")
+
addressBind string
addressConnect string
addressWS string
@@ -28,7 +31,7 @@ var (
// -----------------------------------------------------------------------------
-func relayReadJSON(r io.Reader) []byte {
+func relayReadFrame(r io.Reader) []byte {
var length uint32
if err := binary.Read(r, binary.BigEndian, &length); err != nil {
log.Println("Event receive failed: " + err.Error())
@@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte {
return nil
}
- log.Printf("<? %v\n", b)
+ if *debug {
+ log.Printf("<? %v\n", b)
- var m RelayEventMessage
- if after, ok := m.ConsumeFrom(b); !ok {
- log.Println("Event deserialization failed")
- return nil
- } else if len(after) != 0 {
- log.Println("Event deserialization failed: trailing data")
- return nil
- }
+ var m RelayEventMessage
+ if after, ok := m.ConsumeFrom(b); !ok {
+ log.Println("Event deserialization failed")
+ return nil
+ } else if len(after) != 0 {
+ log.Println("Event deserialization failed: trailing data")
+ return nil
+ }
- j, err := m.MarshalJSON()
- if err != nil {
- log.Println("Event marshalling failed: " + err.Error())
- return nil
+ j, err := m.MarshalJSON()
+ if err != nil {
+ log.Println("Event marshalling failed: " + err.Error())
+ return nil
+ }
+
+ log.Printf("<- %s\n", j)
}
- return j
+ return b
}
func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte {
- p := make(chan []byte, 1)
- r := bufio.NewReader(conn)
+ // The usual event message rarely gets above 1 kilobyte,
+ // thus this is set to buffer up at most 1 megabyte or so.
+ p := make(chan []byte, 1000)
+ r := bufio.NewReaderSize(conn, 65536)
go func() {
defer close(p)
for {
- j := relayReadJSON(r)
+ j := relayReadFrame(r)
if j == nil {
return
}
@@ -97,7 +106,9 @@ func relayWriteJSON(conn net.Conn, j []byte) bool {
return false
}
- log.Printf("-> %v\n", b)
+ if *debug {
+ log.Printf("-> %v\n", b)
+ }
return true
}
@@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte {
"Command receive failed: " + "binary messages are not supported")
return nil
}
- log.Printf("?> %s\n", j)
+
+ if *debug {
+ log.Printf("?> %s\n", j)
+ }
return j
}
-func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool {
- if err := ws.Write(ctx, websocket.MessageText, j); err != nil {
+func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool {
+ if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil {
log.Println("Event send failed: " + err.Error())
return false
}
- log.Printf("<- %s\n", j)
return true
}
func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
- j, err := (&RelayEventMessage{
+ b, ok := (&RelayEventMessage{
EventSeq: 0,
Data: RelayEventData{
Interface: RelayEventDataError{
@@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
Error: err.Error(),
},
},
- }).MarshalJSON()
- if err != nil {
- log.Println("Event marshalling failed: " + err.Error())
+ }).AppendTo(nil)
+ if ok {
+ log.Println("Event serialization failed")
return false
}
- return clientWriteJSON(ctx, ws, j)
+ return clientWriteBinary(ctx, ws, b)
}
func handleWS(w http.ResponseWriter, r *http.Request) {
@@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := net.Dial("tcp", addressConnect)
if err != nil {
+ log.Println("Connection failed: " + err.Error())
clientWriteError(ctx, ws, err)
return
}
defer conn.Close()
+ // To decrease latencies, events are received and decoded in parallel
+ // to their sending, and we try to batch them together.
+ relayFrames := relayMakeReceiver(ctx, conn)
+ batchFrames := func() []byte {
+ batch, ok := <-relayFrames
+ if !ok {
+ return nil
+ }
+ Batch:
+ for {
+ select {
+ case b, ok := <-relayFrames:
+ if !ok {
+ break Batch
+ }
+ batch = append(batch, b...)
+ default:
+ break Batch
+ }
+ }
+ return batch
+ }
+
// We don't need to intervene, so it's just two separate pipes so far.
- // However, to decrease latencies, events are received and decoded
- // in parallel to their sending.
- relayJSON := relayMakeReceiver(ctx, conn)
go func() {
defer cancel()
for {
@@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
go func() {
defer cancel()
for {
- j, ok := <-relayJSON
- if !ok {
+ b := batchFrames()
+ if b == nil {
return
}
- clientWriteJSON(ctx, ws, j)
+ clientWriteBinary(ctx, ws, b)
}
}()
<-ctx.Done()
@@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(`<!DOCTYPE html>
<script>
let proxy = '{{ . }}'
</script>
- <script src="xP.js">
+ <script type="module" src="xP.js">
</script>
</body>
</html>`))
@@ -235,13 +269,21 @@ func handleDefault(w http.ResponseWriter, r *http.Request) {
}
func main() {
- if len(os.Args) < 3 || len(os.Args) > 4 {
- log.Fatalf("usage: %s BIND CONNECT [WSURI]\n", os.Args[0])
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(),
+ "Usage: %s [OPTION...] BIND CONNECT [WSURI]\n\n", os.Args[0])
+ flag.PrintDefaults()
+ }
+
+ flag.Parse()
+ if flag.NArg() < 2 || flag.NArg() > 3 {
+ flag.Usage()
+ os.Exit(1)
}
- addressBind, addressConnect = os.Args[1], os.Args[2]
- if len(os.Args) > 3 {
- addressWS = os.Args[3]
+ addressBind, addressConnect = flag.Arg(0), flag.Arg(1)
+ if flag.NArg() > 2 {
+ addressWS = flag.Arg(2)
}
http.Handle("/ws", http.HandlerFunc(handleWS))