diff options
Diffstat (limited to 'main.go')
-rw-r--r-- | main.go | 230 |
1 files changed, 156 insertions, 74 deletions
@@ -20,14 +20,19 @@ import ( "unicode/utf8" ) +const ( + targetURI = "http://a.files.bbci.co.uk/media/live/manifesto/" + + "audio/simulcast/hls/%s/%s/ak/%s.m3u8" + metaBaseURI = "http://polling.bbc.co.uk/radio/nhppolling/" +) + type meta struct { - title string // What's playing right now - timeout uint // Timeout for the next poll in ms + title string // what's playing right now + timeout uint // timeout for the next poll in ms } -// Retrieve and decode metadata information from an independent webservice +// getMeta retrieves and decodes metadata info from an independent webservice. func getMeta(name string) (*meta, error) { - const metaBaseURI = "http://polling.bbc.co.uk/radio/nhppolling/" resp, err := http.Get(metaBaseURI + name) if resp != nil { defer resp.Body.Close() @@ -41,10 +46,9 @@ func getMeta(name string) (*meta, error) { return nil, errors.New("invalid metadata response") } - // TODO: also retrieve richtracks/is_now_playing, see example file type broadcast struct { - Title string // Title of the broadcast - Percentage int // How far we're in + Title string // title of the broadcast + Percentage int // how far we're in } var v struct { Packages struct { @@ -52,6 +56,11 @@ func getMeta(name string) (*meta, error) { Broadcasts []broadcast BroadcastNowIndex uint } `json:"on-air"` + Richtracks []struct { + Artist string + Title string + IsNowPlaying bool `json:"is_now_playing"` + } } Timeouts struct { PollingTimeout uint `json:"polling_timeout"` @@ -65,13 +74,17 @@ func getMeta(name string) (*meta, error) { if onAir.BroadcastNowIndex >= uint(len(onAir.Broadcasts)) { return nil, errors.New("no active broadcast") } - return &meta{ - timeout: v.Timeouts.PollingTimeout, - title: onAir.Broadcasts[onAir.BroadcastNowIndex].Title, - }, nil + title := onAir.Broadcasts[onAir.BroadcastNowIndex].Title + for _, rt := range v.Packages.Richtracks { + if rt.IsNowPlaying { + title = rt.Artist + " - " + rt.Title + " / " + title + } + } + return &meta{timeout: v.Timeouts.PollingTimeout, title: title}, nil } -// Resolve an M3U8 playlist to the first link that seems to be playable +// resolveM3U8 resolves an M3U8 playlist to the first link that seems to +// be playable, possibly recursing. func resolveM3U8(target string) (out []string, err error) { resp, err := http.Get(target) if resp != nil { @@ -90,12 +103,13 @@ func resolveM3U8(target string) (out []string, err error) { continue } if !strings.Contains(line, "/") { - // Seems to be a relative link, let's make it absolute + // Seems to be a relative link, let's make it absolute. dir, _ := path.Split(target) line = dir + line } if strings.HasSuffix(line, "m3u8") { - // The playlist seems to recurse, and so do we + // The playlist seems to recurse, and so will we. + // XXX: This should be bounded, not just by the stack. return resolveM3U8(line) } out = append(out, line) @@ -103,6 +117,8 @@ func resolveM3U8(target string) (out []string, err error) { return out, nil } +// metaProc periodically polls the sub-URL given by name for titles and sends +// them out the given channel. Never returns prematurely. func metaProc(ctx context.Context, name string, out chan<- string) { defer close(out) @@ -116,10 +132,14 @@ func metaProc(ctx context.Context, name string, out chan<- string) { } else { current = meta.title interval = time.Duration(meta.timeout) + + // It seems to normally use 25 seconds which is a lot, + // especially considering all the possible additional buffering. + if interval > 5000 { + interval = 5000 + } } if current != last { - // TODO: see https://blog.golang.org/pipelines - // find out if we can do this better select { case out <- current: case <-ctx.Done(): @@ -136,12 +156,77 @@ func metaProc(ctx context.Context, name string, out chan<- string) { } } +// urlProc periodically checks the playlist for yet unseen URLs and sends them +// over the channel. Assumes that URLs are incremental for simplicity, although +// there doesn't seem to be any such gaurantee by the HLS protocol. +func urlProc(ctx context.Context, playlistURL string, out chan<- string) { + defer close(out) + + highest := "" + for { + target, err := resolveM3U8(playlistURL) + if err != nil { + return + } + for _, url := range target { + if url <= highest { + continue + } + select { + case out <- url: + highest = url + case <-ctx.Done(): + return + } + } + // I expect this to be mainly driven by the buffered channel but + // a small (less than target duration) additional pause will not hurt. + time.Sleep(1 * time.Second) + } +} + +// https://tools.ietf.org/html/rfc8216 +// http://www.gpac-licensing.com/2014/12/08/apple-hls-technical-depth/ +func dataProc(ctx context.Context, playlistURL string, maxChunk int, + out chan<- []byte) { + defer close(out) + + // The channel is buffered so that the urlProc can fetch in advance. + urls := make(chan string, 3) + go urlProc(ctx, playlistURL, urls) + + for url := range urls { + resp, err := http.Get(url) + if resp != nil { + defer resp.Body.Close() + } + if err != nil { + return + } + + for { + chunk := make([]byte, maxChunk) + n, err := resp.Body.Read(chunk) + + select { + case out <- chunk[:n]: + case <-ctx.Done(): + return + } + if err == io.EOF { + break + } + if err != nil { + return + } + } + } +} + var pathRE = regexp.MustCompile(`^/(.*?)/(.*?)/(.*?)$`) func proxy(w http.ResponseWriter, req *http.Request) { - const targetURI = "http://a.files.bbci.co.uk/media/live/manifesto/" + - "audio/simulcast/hls/%s/%s/ak/%s.m3u8" - const metaint = 1 << 16 + const metaint = 1 << 15 m := pathRE.FindStringSubmatch(req.URL.Path) if m == nil { http.NotFound(w, req) @@ -149,14 +234,18 @@ func proxy(w http.ResponseWriter, req *http.Request) { } hijacker, ok := w.(http.Hijacker) if !ok { - // We're not using TLS where HTTP/2 could have caused this + // We're not using TLS where HTTP/2 could have caused this. panic("cannot hijack connection") } // E.g. `nonuk`, `sbr_low` `bbc_radio_one`, or `uk`, `sbr_high`, `bbc_1xtra` region, quality, name := m[1], m[2], m[3] - // This validates the params as a side-effect - target, err := resolveM3U8(fmt.Sprintf(targetURI, region, quality, name)) + + // TODO: We probably shouldn't poll the top level playlist. + mainPlaylistURL := fmt.Sprintf(targetURI, region, quality, name) + + // This validates the parameters as a side-effect. + target, err := resolveM3U8(mainPlaylistURL) if err == nil && len(target) == 0 { err = errors.New("cannot resolve playlist") } @@ -165,11 +254,8 @@ func proxy(w http.ResponseWriter, req *http.Request) { return } - wantMeta := false - if icyMeta, ok := req.Header["Icy-MetaData"]; ok { - wantMeta = len(icyMeta) == 1 && icyMeta[0] == "1" - } - resp, err := http.Get(target[0]) + wantMeta := req.Header.Get("Icy-MetaData") == "1" + resp, err := http.Head(target[0]) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -182,11 +268,12 @@ func proxy(w http.ResponseWriter, req *http.Request) { } defer conn.Close() - // TODO: retrieve some general information from somewhere? - // There's nothing interesting in the playlist files. + // TODO: Retrieve some general information from somewhere? + // There's nothing interesting in the playlist files. + fmt.Fprintf(bufrw, "ICY 200 OK\r\n") fmt.Fprintf(bufrw, "icy-name:%s\r\n", name) - // BBC marks this as a video type, maybe just force audio/mpeg + // BBC marks this as a video type, maybe just force audio/mpeg. fmt.Fprintf(bufrw, "content-type:%s\r\n", resp.Header["Content-Type"][0]) fmt.Fprintf(bufrw, "icy-pub:%d\r\n", 0) if wantMeta { @@ -197,50 +284,44 @@ func proxy(w http.ResponseWriter, req *http.Request) { metaChan := make(chan string) go metaProc(req.Context(), name, metaChan) - // TODO: move to a normal function - // FIXME: this will load a few seconds (one URL) and die - // - we can either try to implement this and hope for the best - // https://tools.ietf.org/html/draft-pantos-http-live-streaming-20 - // then like https://github.com/kz26/gohls/blob/master/main.go - // - or we can become more of a proxy, which complicates ICY chunkChan := make(chan []byte) - go func() { - defer resp.Body.Close() - defer close(chunkChan) - for { - chunk := make([]byte, metaint) - n, err := io.ReadFull(resp.Body, chunk) - chunkChan <- chunk[:n] - if err != nil { - return - } - - select { - default: - case <-req.Context().Done(): - return - } - } - }() + go dataProc(req.Context(), mainPlaylistURL, metaint, chunkChan) - var queuedMeta []byte + // dataProc may return less data near the end of a subfile, so we give it + // a maximum count of bytes to return at once and do our own buffering. + var queuedMetaUpdate, queuedChunk []byte for { select { case title := <-metaChan: - queuedMeta = []byte(fmt.Sprintf("StreamTitle='%s'", title)) + queuedMetaUpdate = []byte(fmt.Sprintf("StreamTitle='%s'", title)) case chunk, ok := <-chunkChan: if !ok { return } - if wantMeta { - var meta [1 + 16*255]byte - meta[0] = byte((copy(meta[1:], queuedMeta) + 15) / 16) - chunk = append(chunk, meta[:1+int(meta[0])*16]...) - queuedMeta = nil + + space := metaint - len(queuedChunk) + if space > len(chunk) { + space = len(chunk) + } + + queuedChunk = append(queuedChunk, chunk[:space]...) + if len(queuedChunk) < metaint { + break } - if _, err := bufrw.Write(chunk); err != nil { + if _, err := bufrw.Write(queuedChunk); err != nil { return } + + queuedChunk = chunk[space:] + if wantMeta { + var meta [1 + 16*255]byte + meta[0] = byte((copy(meta[1:], queuedMetaUpdate) + 15) / 16) + queuedMetaUpdate = nil + + if _, err := bufrw.Write(meta[:1+int(meta[0])*16]); err != nil { + return + } + } if err := bufrw.Flush(); err != nil { return } @@ -257,6 +338,7 @@ func socketActivationListener() net.Listener { nfds, err := strconv.Atoi(os.Getenv("LISTEN_FDS")) if err != nil || nfds == 0 { + log.Println("LISTEN_FDS unworkable") return nil } else if nfds > 1 { log.Fatalln("not supporting more than one listening socket") @@ -271,7 +353,7 @@ func socketActivationListener() net.Listener { return ln } -// Had to copy this from Server.ListenAndServe() +// Had to copy this from Server.ListenAndServe. type tcpKeepAliveListener struct{ *net.TCPListener } func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { @@ -285,22 +367,22 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { } func main() { - listenAddr := ":8000" - if len(os.Args) == 2 { - listenAddr = os.Args[1] - } - var listener net.Listener - if ln := socketActivationListener(); listener != nil { - // Keepalives can be set in the systemd unit, see systemd.socket(5) + if ln := socketActivationListener(); ln != nil { + // Keepalives can be set in the systemd unit, see systemd.socket(5). listener = ln - } else if ln, err := net.Listen("tcp", listenAddr); err != nil { - log.Fatalln(err) } else { - listener = tcpKeepAliveListener{ln.(*net.TCPListener)} + if len(os.Args) < 2 { + log.Fatalf("usage: %s LISTEN-ADDR\n", os.Args[0]) + } + if ln, err := net.Listen("tcp", os.Args[1]); err != nil { + log.Fatalln(err) + } else { + listener = tcpKeepAliveListener{ln.(*net.TCPListener)} + } } http.HandleFunc("/", proxy) - // We don't need to clean up properly since we store no data + // We don't need to clean up properly since we store no data. log.Fatalln(http.Serve(listener, nil)) } |