aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--xP/xP.go146
1 files changed, 92 insertions, 54 deletions
diff --git a/xP/xP.go b/xP/xP.go
index b858ace..dc5aa5e 100644
--- a/xP/xP.go
+++ b/xP/xP.go
@@ -25,23 +25,61 @@ var (
addressWS string
)
-func clientToRelay(
- ctx context.Context, ws *websocket.Conn, conn net.Conn) bool {
- t, b, err := ws.Read(ctx)
- if err != nil {
- log.Println("Command receive failed: " + err.Error())
- return false
+// -----------------------------------------------------------------------------
+
+func relayReadJSON(conn net.Conn) []byte {
+ var length uint32
+ if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
+ log.Println("Event receive failed: " + err.Error())
+ return nil
}
- if t != websocket.MessageText {
- log.Println("Command receive failed: " +
- "binary messages are not supported")
- return false
+ b := make([]byte, length)
+ if _, err := io.ReadFull(conn, b); err != nil {
+ log.Println("Event receive failed: " + err.Error())
+ return nil
+ }
+
+ log.Printf("<? %v\n", b)
+
+ var m RelayEventMessage
+ if after, ok := m.ConsumeFrom(b); !ok {
+ log.Println("Event deserialization failed")
+ return nil
+ } else if len(after) != 0 {
+ log.Println("Event deserialization failed: trailing data")
+ return nil
}
- log.Printf("?> %s\n", b)
+ j, err := json.Marshal(&m)
+ if err != nil {
+ log.Println("Event marshalling failed: " + err.Error())
+ return nil
+ }
+ return j
+}
+func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte {
+ p := make(chan []byte, 1)
+ go func() {
+ defer close(p)
+ for {
+ j := relayReadJSON(conn)
+ if j == nil {
+ return
+ }
+ select {
+ case p <- j:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return p
+}
+
+func relayWriteJSON(conn net.Conn, j []byte) bool {
var m RelayCommandMessage
- if err := json.Unmarshal(b, &m); err != nil {
+ if err := json.Unmarshal(j, &m); err != nil {
log.Println("Command unmarshalling failed: " + err.Error())
return false
}
@@ -61,45 +99,33 @@ func clientToRelay(
return true
}
-func relayToClient(
- ctx context.Context, ws *websocket.Conn, conn net.Conn) bool {
- var length uint32
- if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
- log.Println("Event receive failed: " + err.Error())
- return false
- }
- b := make([]byte, length)
- if _, err := io.ReadFull(conn, b); err != nil {
- log.Println("Event receive failed: " + err.Error())
- return false
- }
-
- log.Printf("<? %v\n", b)
-
- var m RelayEventMessage
- if after, ok := m.ConsumeFrom(b); !ok {
- log.Println("Event deserialization failed")
- return false
- } else if len(after) != 0 {
- log.Println("Event deserialization failed: trailing data")
- return false
- }
+// -----------------------------------------------------------------------------
- j, err := json.Marshal(&m)
+func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte {
+ t, j, err := ws.Read(ctx)
if err != nil {
- log.Println("Event marshalling failed: " + err.Error())
- return false
+ log.Println("Command receive failed: " + err.Error())
+ return nil
}
+ if t != websocket.MessageText {
+ log.Println(
+ "Command receive failed: " + "binary messages are not supported")
+ return nil
+ }
+ log.Printf("?> %s\n", j)
+ return j
+}
+
+func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool {
if err := ws.Write(ctx, websocket.MessageText, j); err != nil {
log.Println("Event send failed: " + err.Error())
return false
}
-
log.Printf("<- %s\n", j)
return true
}
-func errorToClient(ctx context.Context, ws *websocket.Conn, err error) bool {
+func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
j, err := json.Marshal(&RelayEventMessage{
EventSeq: 0,
Data: RelayEventData{
@@ -114,25 +140,21 @@ func errorToClient(ctx context.Context, ws *websocket.Conn, err error) bool {
log.Println("Event marshalling failed: " + err.Error())
return false
}
- if err := ws.Write(ctx, websocket.MessageText, j); err != nil {
- log.Println("Event send failed: " + err.Error())
- return false
- }
- return true
+ return clientWriteJSON(ctx, ws, j)
}
func handleWS(w http.ResponseWriter, r *http.Request) {
ws, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true,
- CompressionMode: websocket.CompressionContextTakeover,
- // This is for the payload, and happens to trigger on all messages.
- CompressionThreshold: 16,
+ // Note that Safari can be broken with compression.
+ CompressionMode: websocket.CompressionContextTakeover,
+ // This is for the payload; set higher to avoid overhead.
+ CompressionThreshold: 64 << 10,
})
if err != nil {
log.Println("Client rejected: " + err.Error())
return
}
-
defer ws.Close(websocket.StatusGoingAway, "Goodbye")
ctx, cancel := context.WithCancel(r.Context())
@@ -140,24 +162,40 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := net.Dial("tcp", addressConnect)
if err != nil {
- errorToClient(ctx, ws, err)
+ clientWriteError(ctx, ws, err)
return
}
+ defer conn.Close()
// We don't need to intervene, so it's just two separate pipes so far.
+ // However, to decrease latencies, events are received and decoded
+ // in parallel to their sending.
+ relayJSON := relayMakeReceiver(ctx, conn)
go func() {
- for clientToRelay(ctx, ws, conn) {
+ defer cancel()
+ for {
+ j := clientReadJSON(ctx, ws)
+ if j == nil {
+ return
+ }
+ relayWriteJSON(conn, j)
}
- cancel()
}()
go func() {
- for relayToClient(ctx, ws, conn) {
+ defer cancel()
+ for {
+ j, ok := <-relayJSON
+ if !ok {
+ return
+ }
+ clientWriteJSON(ctx, ws, j)
}
- cancel()
}()
<-ctx.Done()
}
+// -----------------------------------------------------------------------------
+
var staticHandler = http.FileServer(http.Dir("."))
var page = template.Must(template.New("/").Parse(`<!DOCTYPE html>