aboutsummaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go230
1 files changed, 156 insertions, 74 deletions
diff --git a/main.go b/main.go
index ca80f42..a1a7357 100644
--- a/main.go
+++ b/main.go
@@ -20,14 +20,19 @@ import (
"unicode/utf8"
)
+const (
+ targetURI = "http://a.files.bbci.co.uk/media/live/manifesto/" +
+ "audio/simulcast/hls/%s/%s/ak/%s.m3u8"
+ metaBaseURI = "http://polling.bbc.co.uk/radio/nhppolling/"
+)
+
type meta struct {
- title string // What's playing right now
- timeout uint // Timeout for the next poll in ms
+ title string // what's playing right now
+ timeout uint // timeout for the next poll in ms
}
-// Retrieve and decode metadata information from an independent webservice
+// getMeta retrieves and decodes metadata info from an independent webservice.
func getMeta(name string) (*meta, error) {
- const metaBaseURI = "http://polling.bbc.co.uk/radio/nhppolling/"
resp, err := http.Get(metaBaseURI + name)
if resp != nil {
defer resp.Body.Close()
@@ -41,10 +46,9 @@ func getMeta(name string) (*meta, error) {
return nil, errors.New("invalid metadata response")
}
- // TODO: also retrieve richtracks/is_now_playing, see example file
type broadcast struct {
- Title string // Title of the broadcast
- Percentage int // How far we're in
+ Title string // title of the broadcast
+ Percentage int // how far we're in
}
var v struct {
Packages struct {
@@ -52,6 +56,11 @@ func getMeta(name string) (*meta, error) {
Broadcasts []broadcast
BroadcastNowIndex uint
} `json:"on-air"`
+ Richtracks []struct {
+ Artist string
+ Title string
+ IsNowPlaying bool `json:"is_now_playing"`
+ }
}
Timeouts struct {
PollingTimeout uint `json:"polling_timeout"`
@@ -65,13 +74,17 @@ func getMeta(name string) (*meta, error) {
if onAir.BroadcastNowIndex >= uint(len(onAir.Broadcasts)) {
return nil, errors.New("no active broadcast")
}
- return &meta{
- timeout: v.Timeouts.PollingTimeout,
- title: onAir.Broadcasts[onAir.BroadcastNowIndex].Title,
- }, nil
+ title := onAir.Broadcasts[onAir.BroadcastNowIndex].Title
+ for _, rt := range v.Packages.Richtracks {
+ if rt.IsNowPlaying {
+ title = rt.Artist + " - " + rt.Title + " / " + title
+ }
+ }
+ return &meta{timeout: v.Timeouts.PollingTimeout, title: title}, nil
}
-// Resolve an M3U8 playlist to the first link that seems to be playable
+// resolveM3U8 resolves an M3U8 playlist to the first link that seems to
+// be playable, possibly recursing.
func resolveM3U8(target string) (out []string, err error) {
resp, err := http.Get(target)
if resp != nil {
@@ -90,12 +103,13 @@ func resolveM3U8(target string) (out []string, err error) {
continue
}
if !strings.Contains(line, "/") {
- // Seems to be a relative link, let's make it absolute
+ // Seems to be a relative link, let's make it absolute.
dir, _ := path.Split(target)
line = dir + line
}
if strings.HasSuffix(line, "m3u8") {
- // The playlist seems to recurse, and so do we
+ // The playlist seems to recurse, and so will we.
+ // XXX: This should be bounded, not just by the stack.
return resolveM3U8(line)
}
out = append(out, line)
@@ -103,6 +117,8 @@ func resolveM3U8(target string) (out []string, err error) {
return out, nil
}
+// metaProc periodically polls the sub-URL given by name for titles and sends
+// them out the given channel. Never returns prematurely.
func metaProc(ctx context.Context, name string, out chan<- string) {
defer close(out)
@@ -116,10 +132,14 @@ func metaProc(ctx context.Context, name string, out chan<- string) {
} else {
current = meta.title
interval = time.Duration(meta.timeout)
+
+ // It seems to normally use 25 seconds which is a lot,
+ // especially considering all the possible additional buffering.
+ if interval > 5000 {
+ interval = 5000
+ }
}
if current != last {
- // TODO: see https://blog.golang.org/pipelines
- // find out if we can do this better
select {
case out <- current:
case <-ctx.Done():
@@ -136,12 +156,77 @@ func metaProc(ctx context.Context, name string, out chan<- string) {
}
}
+// urlProc periodically checks the playlist for yet unseen URLs and sends them
+// over the channel. Assumes that URLs are incremental for simplicity, although
+// there doesn't seem to be any such gaurantee by the HLS protocol.
+func urlProc(ctx context.Context, playlistURL string, out chan<- string) {
+ defer close(out)
+
+ highest := ""
+ for {
+ target, err := resolveM3U8(playlistURL)
+ if err != nil {
+ return
+ }
+ for _, url := range target {
+ if url <= highest {
+ continue
+ }
+ select {
+ case out <- url:
+ highest = url
+ case <-ctx.Done():
+ return
+ }
+ }
+ // I expect this to be mainly driven by the buffered channel but
+ // a small (less than target duration) additional pause will not hurt.
+ time.Sleep(1 * time.Second)
+ }
+}
+
+// https://tools.ietf.org/html/rfc8216
+// http://www.gpac-licensing.com/2014/12/08/apple-hls-technical-depth/
+func dataProc(ctx context.Context, playlistURL string, maxChunk int,
+ out chan<- []byte) {
+ defer close(out)
+
+ // The channel is buffered so that the urlProc can fetch in advance.
+ urls := make(chan string, 3)
+ go urlProc(ctx, playlistURL, urls)
+
+ for url := range urls {
+ resp, err := http.Get(url)
+ if resp != nil {
+ defer resp.Body.Close()
+ }
+ if err != nil {
+ return
+ }
+
+ for {
+ chunk := make([]byte, maxChunk)
+ n, err := resp.Body.Read(chunk)
+
+ select {
+ case out <- chunk[:n]:
+ case <-ctx.Done():
+ return
+ }
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return
+ }
+ }
+ }
+}
+
var pathRE = regexp.MustCompile(`^/(.*?)/(.*?)/(.*?)$`)
func proxy(w http.ResponseWriter, req *http.Request) {
- const targetURI = "http://a.files.bbci.co.uk/media/live/manifesto/" +
- "audio/simulcast/hls/%s/%s/ak/%s.m3u8"
- const metaint = 1 << 16
+ const metaint = 1 << 15
m := pathRE.FindStringSubmatch(req.URL.Path)
if m == nil {
http.NotFound(w, req)
@@ -149,14 +234,18 @@ func proxy(w http.ResponseWriter, req *http.Request) {
}
hijacker, ok := w.(http.Hijacker)
if !ok {
- // We're not using TLS where HTTP/2 could have caused this
+ // We're not using TLS where HTTP/2 could have caused this.
panic("cannot hijack connection")
}
// E.g. `nonuk`, `sbr_low` `bbc_radio_one`, or `uk`, `sbr_high`, `bbc_1xtra`
region, quality, name := m[1], m[2], m[3]
- // This validates the params as a side-effect
- target, err := resolveM3U8(fmt.Sprintf(targetURI, region, quality, name))
+
+ // TODO: We probably shouldn't poll the top level playlist.
+ mainPlaylistURL := fmt.Sprintf(targetURI, region, quality, name)
+
+ // This validates the parameters as a side-effect.
+ target, err := resolveM3U8(mainPlaylistURL)
if err == nil && len(target) == 0 {
err = errors.New("cannot resolve playlist")
}
@@ -165,11 +254,8 @@ func proxy(w http.ResponseWriter, req *http.Request) {
return
}
- wantMeta := false
- if icyMeta, ok := req.Header["Icy-MetaData"]; ok {
- wantMeta = len(icyMeta) == 1 && icyMeta[0] == "1"
- }
- resp, err := http.Get(target[0])
+ wantMeta := req.Header.Get("Icy-MetaData") == "1"
+ resp, err := http.Head(target[0])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -182,11 +268,12 @@ func proxy(w http.ResponseWriter, req *http.Request) {
}
defer conn.Close()
- // TODO: retrieve some general information from somewhere?
- // There's nothing interesting in the playlist files.
+ // TODO: Retrieve some general information from somewhere?
+ // There's nothing interesting in the playlist files.
+
fmt.Fprintf(bufrw, "ICY 200 OK\r\n")
fmt.Fprintf(bufrw, "icy-name:%s\r\n", name)
- // BBC marks this as a video type, maybe just force audio/mpeg
+ // BBC marks this as a video type, maybe just force audio/mpeg.
fmt.Fprintf(bufrw, "content-type:%s\r\n", resp.Header["Content-Type"][0])
fmt.Fprintf(bufrw, "icy-pub:%d\r\n", 0)
if wantMeta {
@@ -197,50 +284,44 @@ func proxy(w http.ResponseWriter, req *http.Request) {
metaChan := make(chan string)
go metaProc(req.Context(), name, metaChan)
- // TODO: move to a normal function
- // FIXME: this will load a few seconds (one URL) and die
- // - we can either try to implement this and hope for the best
- // https://tools.ietf.org/html/draft-pantos-http-live-streaming-20
- // then like https://github.com/kz26/gohls/blob/master/main.go
- // - or we can become more of a proxy, which complicates ICY
chunkChan := make(chan []byte)
- go func() {
- defer resp.Body.Close()
- defer close(chunkChan)
- for {
- chunk := make([]byte, metaint)
- n, err := io.ReadFull(resp.Body, chunk)
- chunkChan <- chunk[:n]
- if err != nil {
- return
- }
-
- select {
- default:
- case <-req.Context().Done():
- return
- }
- }
- }()
+ go dataProc(req.Context(), mainPlaylistURL, metaint, chunkChan)
- var queuedMeta []byte
+ // dataProc may return less data near the end of a subfile, so we give it
+ // a maximum count of bytes to return at once and do our own buffering.
+ var queuedMetaUpdate, queuedChunk []byte
for {
select {
case title := <-metaChan:
- queuedMeta = []byte(fmt.Sprintf("StreamTitle='%s'", title))
+ queuedMetaUpdate = []byte(fmt.Sprintf("StreamTitle='%s'", title))
case chunk, ok := <-chunkChan:
if !ok {
return
}
- if wantMeta {
- var meta [1 + 16*255]byte
- meta[0] = byte((copy(meta[1:], queuedMeta) + 15) / 16)
- chunk = append(chunk, meta[:1+int(meta[0])*16]...)
- queuedMeta = nil
+
+ space := metaint - len(queuedChunk)
+ if space > len(chunk) {
+ space = len(chunk)
+ }
+
+ queuedChunk = append(queuedChunk, chunk[:space]...)
+ if len(queuedChunk) < metaint {
+ break
}
- if _, err := bufrw.Write(chunk); err != nil {
+ if _, err := bufrw.Write(queuedChunk); err != nil {
return
}
+
+ queuedChunk = chunk[space:]
+ if wantMeta {
+ var meta [1 + 16*255]byte
+ meta[0] = byte((copy(meta[1:], queuedMetaUpdate) + 15) / 16)
+ queuedMetaUpdate = nil
+
+ if _, err := bufrw.Write(meta[:1+int(meta[0])*16]); err != nil {
+ return
+ }
+ }
if err := bufrw.Flush(); err != nil {
return
}
@@ -257,6 +338,7 @@ func socketActivationListener() net.Listener {
nfds, err := strconv.Atoi(os.Getenv("LISTEN_FDS"))
if err != nil || nfds == 0 {
+ log.Println("LISTEN_FDS unworkable")
return nil
} else if nfds > 1 {
log.Fatalln("not supporting more than one listening socket")
@@ -271,7 +353,7 @@ func socketActivationListener() net.Listener {
return ln
}
-// Had to copy this from Server.ListenAndServe()
+// Had to copy this from Server.ListenAndServe.
type tcpKeepAliveListener struct{ *net.TCPListener }
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
@@ -285,22 +367,22 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
}
func main() {
- listenAddr := ":8000"
- if len(os.Args) == 2 {
- listenAddr = os.Args[1]
- }
-
var listener net.Listener
- if ln := socketActivationListener(); listener != nil {
- // Keepalives can be set in the systemd unit, see systemd.socket(5)
+ if ln := socketActivationListener(); ln != nil {
+ // Keepalives can be set in the systemd unit, see systemd.socket(5).
listener = ln
- } else if ln, err := net.Listen("tcp", listenAddr); err != nil {
- log.Fatalln(err)
} else {
- listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
+ if len(os.Args) < 2 {
+ log.Fatalf("usage: %s LISTEN-ADDR\n", os.Args[0])
+ }
+ if ln, err := net.Listen("tcp", os.Args[1]); err != nil {
+ log.Fatalln(err)
+ } else {
+ listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
+ }
}
http.HandleFunc("/", proxy)
- // We don't need to clean up properly since we store no data
+ // We don't need to clean up properly since we store no data.
log.Fatalln(http.Serve(listener, nil))
}