-rw-r--r--  Makefile           |    5
-rw-r--r--  acid.adoc          |   18
-rw-r--r--  acid.go            | 1098
-rw-r--r--  acid.yaml.example  |    8
-rw-r--r--  acid_test.go       |   17
-rw-r--r--  go.mod             |   10
-rw-r--r--  go.sum             |   69
-rw-r--r--  terminal.go        |  379
-rw-r--r--  terminal_test.go   |  100
9 files changed, 1399 insertions, 305 deletions
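The most far-reaching change in acid.go below is replacing the mutable global configuration with an atomically swapped pointer, which is what makes the new reload RPC safe to call while tasks are running. A condensed sketch of that pattern, reusing names from the diff rather than being a drop-in excerpt:

package main

import (
	"os"
	"sync/atomic"

	"gopkg.in/yaml.v3"
)

// Config is trimmed to two fields here; the real struct also carries
// runners, projects, and the parsed notify template.
type Config struct {
	DB     string `yaml:"db"`
	Listen string `yaml:"listen"`
}

var (
	gConfigPath string
	gConfig     atomic.Pointer[Config]
)

// getConfig returns the current snapshot; two consecutive calls may observe
// different configurations, so callers should hold on to one snapshot.
func getConfig() *Config { return gConfig.Load() }

// loadConfig parses the file and atomically publishes the result,
// which is what the reload RPC triggers in the diff.
func loadConfig() error {
	f, err := os.Open(gConfigPath)
	if err != nil {
		return err
	}
	defer f.Close()

	new := &Config{}
	if err := yaml.NewDecoder(f).Decode(new); err != nil {
		return err
	}
	gConfig.Store(new)
	return nil
}
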
@@ -5,10 +5,11 @@ version = dev outputs = acid acid.1 all: $(outputs) -acid: acid.go +acid: acid.go terminal.go go build -ldflags "-X 'main.projectVersion=$(version)'" -o $@ acid.1: acid.adoc - asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc + asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc || \ + a2x -d manpage -f manpage -a release-version=$(version) acid.adoc test: all go test clean: @@ -37,6 +37,8 @@ Commands *restart* [_ID_]...:: Schedule tasks with the given IDs to be rerun. Run this command without arguments to pick up external database changes. +*reload*:: + Reload configuration. Configuration ------------- @@ -45,7 +47,7 @@ file present in the distribution. All paths are currently relative to the directory you launch *acid* from. -The *notify*, *setup*, and *build* scripts are processed using Go's +The *notify*, *setup*, *build*, and *deploy* scripts are processed using Go's _text/template_ package, and take an object describing the task, which has the following fields: @@ -65,6 +67,17 @@ which has the following fields: *RunnerName*:: Descriptive name of the runner. +// Intentionally not documenting CreatedUnix, ChangedUnix, DurationSeconds, +// which can be derived from the objects. +*Created*, *Changed*:: + `*time.Time` of task creation and last task state change respectively, + or nil if not known. +*CreatedAgo*, *ChangedAgo*:: + Abbreviated human-friendly relative elapsed time duration + since *Created* and *Changed* respectively. +*Duration*:: + `*time.Duration` of the last run in seconds, or nil if not known. + *URL*:: *acid* link to the task, where its log output can be seen. *RepoURL*:: @@ -79,7 +92,8 @@ in *sh*(1) command arguments. Runners ------- -Runners receive the following additional environment variables: +Runners and deploy scripts receive the following additional +environment variables: *ACID_ROOT*:: The same as the base directory for configuration. *ACID_RUNNER*:: The same as *Runner* in script templates. @@ -21,15 +21,18 @@ import ( "os" "os/exec" "os/signal" + "path/filepath" "sort" "strconv" "strings" "sync" + "sync/atomic" "syscall" ttemplate "text/template" "time" _ "github.com/mattn/go-sqlite3" + "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "gopkg.in/yaml.v3" ) @@ -38,9 +41,9 @@ var ( projectName = "acid" projectVersion = "?" - gConfig Config = Config{Listen: ":http"} - gNotifyScript *ttemplate.Template - gDB *sql.DB + gConfigPath string + gConfig atomic.Pointer[Config] + gDB *sql.DB gNotifierSignal = make(chan struct{}, 1) gExecutorSignal = make(chan struct{}, 1) @@ -50,6 +53,8 @@ var ( gRunning = make(map[int64]*RunningTask) ) +func getConfig() *Config { return gConfig.Load() } + // --- Config ------------------------------------------------------------------ type Config struct { @@ -63,6 +68,8 @@ type Config struct { Runners map[string]ConfigRunner `yaml:"runners"` // script runners Projects map[string]ConfigProject `yaml:"projects"` // configured projects + + notifyTemplate *ttemplate.Template } type ConfigRunner struct { @@ -84,8 +91,9 @@ type ConfigProject struct { func (cf *ConfigProject) AutomaticRunners() (runners []string) { // We pass through unknown runner names, // so that they can cause reference errors later. 
+ config := getConfig() for runner := range cf.Runners { - if r, _ := gConfig.Runners[runner]; !r.Manual { + if r, _ := config.Runners[runner]; !r.Manual { runners = append(runners, runner) } } @@ -96,20 +104,32 @@ func (cf *ConfigProject) AutomaticRunners() (runners []string) { type ConfigProjectRunner struct { Setup string `yaml:"setup"` // project setup script (SSH) Build string `yaml:"build"` // project build script (SSH) + Deploy string `yaml:"deploy"` // project deploy script (local) Timeout string `yaml:"timeout"` // timeout duration } -func parseConfig(path string) error { - if f, err := os.Open(path); err != nil { +// loadConfig reloads configuration. +// Beware that changes do not get applied globally at the same moment. +func loadConfig() error { + new := &Config{} + if f, err := os.Open(gConfigPath); err != nil { return err - } else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil { + } else if err = yaml.NewDecoder(f).Decode(new); err != nil { return err } + if old := getConfig(); old != nil && old.DB != new.DB { + return fmt.Errorf("the database file cannot be changed in runtime") + } var err error - gNotifyScript, err = - ttemplate.New("notify").Funcs(shellFuncs).Parse(gConfig.Notify) - return err + new.notifyTemplate, err = + ttemplate.New("notify").Funcs(shellFuncs).Parse(new.Notify) + if err != nil { + return err + } + + gConfig.Store(new) + return nil } var shellFuncs = ttemplate.FuncMap{ @@ -133,8 +153,16 @@ var shellFuncs = ttemplate.FuncMap{ // --- Utilities --------------------------------------------------------------- +func localShell() string { + if shell := os.Getenv("SHELL"); shell != "" { + return shell + } + // The os/user package doesn't store the parsed out shell field. + return "/bin/sh" +} + func giteaSign(b []byte) string { - payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret)) + payloadHmac := hmac.New(sha256.New, []byte(getConfig().Secret)) payloadHmac.Write(b) return hex.EncodeToString(payloadHmac.Sum(nil)) } @@ -142,9 +170,9 @@ func giteaSign(b []byte) string { func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) ( *http.Request, error) { req, err := http.NewRequestWithContext( - ctx, method, gConfig.Gitea+path, body) + ctx, method, getConfig().Gitea+path, body) if req != nil { - req.Header.Set("Authorization", "token "+gConfig.Token) + req.Header.Set("Authorization", "token "+getConfig().Token) req.Header.Set("Accept", "application/json") } return req, err @@ -153,7 +181,9 @@ func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) ( func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { rows, err := gDB.QueryContext(ctx, ` SELECT id, owner, repo, hash, runner, - state, detail, notified, runlog, tasklog FROM task `+query, args...) + created, changed, duration, + state, detail, notified, + runlog, tasklog, deploylog FROM task `+query, args...) if err != nil { return nil, err } @@ -163,10 +193,13 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { for rows.Next() { var t Task err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner, - &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog) + &t.CreatedUnix, &t.ChangedUnix, &t.DurationSeconds, + &t.State, &t.Detail, &t.Notified, + &t.RunLog, &t.TaskLog, &t.DeployLog) if err != nil { return nil, err } + // We could also update some fields from gRunning. 
tasks = append(tasks, t) } return tasks, rows.Err() @@ -187,6 +220,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(` <thead> <tr> <th>ID</th> + <th>Created</th> + <th>Changed</th> <th>Repository</th> <th>Hash</th> <th>Runner</th> @@ -199,6 +234,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(` {{range .}} <tr> <td><a href="task/{{.ID}}">{{.ID}}</a></td> + <td align="right"><span title="{{.Created}}">{{.CreatedAgo}}</span></td> + <td align="right"><span title="{{.Changed}}">{{.ChangedAgo}}</span></td> <td><a href="{{.RepoURL}}">{{.FullName}}</a></td> <td><a href="{{.CommitURL}}">{{.Hash}}</a></td> <td>{{.RunnerName}}</td> @@ -214,7 +251,7 @@ var templateTasks = template.Must(template.New("tasks").Parse(` `)) func handleTasks(w http.ResponseWriter, r *http.Request) { - tasks, err := getTasks(r.Context(), `ORDER BY id DESC`) + tasks, err := getTasks(r.Context(), `ORDER BY changed DESC, id DESC`) if err != nil { http.Error(w, "Error retrieving tasks: "+err.Error(), @@ -233,37 +270,179 @@ var templateTask = template.Must(template.New("tasks").Parse(` <head> <title>Task {{.ID}}</title> <meta charset="utf-8"> -{{if .IsRunning}} -<meta http-equiv="refresh" content="5"> -{{end}} </head> <body> <h1><a href="..">Tasks</a> » {{.ID}}</h1> <dl> +<!-- Remember to synchronise these lists with Javascript updates. --> +{{if .Created -}} +<dt>Created</dt> + <dd><span id="created" title="{{.Created}}">{{.CreatedAgo}} ago</span></dd> +{{end -}} +{{if .Changed -}} +<dt>Changed</dt> + <dd><span id="changed" title="{{.Changed}}">{{.ChangedAgo}} ago</span></dd> +{{end -}} <dt>Project</dt> - <dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd> + <dd id="project"><a href="{{.RepoURL}}">{{.FullName}}</a></dd> <dt>Commit</dt> - <dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd> + <dd id="commit"><a href="{{.CommitURL}}">{{.Hash}}</a></dd> <dt>Runner</dt> - <dd>{{.RunnerName}}</dd> + <dd id="runner">{{.RunnerName}}</dd> <dt>State</dt> - <dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd> + <dd id="state">{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd> <dt>Notified</dt> - <dd>{{.Notified}}</dd> + <dd id="notified">{{.Notified}}</dd> +<dt>Duration</dt> + <dd id="duration">{{if .Duration}}{{.Duration}}{{else}}—{{end}}</dd> </dl> -{{if .RunLog}} -<h2>Runner log</h2> -<pre>{{printf "%s" .RunLog}}</pre> -{{end}} -{{if .TaskLog}} -<h2>Task log</h2> -<pre>{{printf "%s" .TaskLog}}</pre> -{{end}} -</table> + +<h2 id="run"{{if not .RunLog}} hidden{{end}}>Runner log</h2> +<pre id="runlog"{{if not .RunLog}} hidden{{ +end}}>{{printf "%s" .RunLog}}</pre> +<h2 id="task"{{if not .TaskLog}} hidden{{end}}>Task log</h2> +<pre id="tasklog"{{if not .TaskLog}} hidden{{ +end}}>{{printf "%s" .TaskLog}}</pre> +<h2 id="deploy"{{if not .DeployLog}} hidden{{end}}>Deploy log</h2> +<pre id="deploylog"{{if not .DeployLog}} hidden{{ +end}}>{{printf "%s" .DeployLog}}</pre> + +{{if .IsRunning -}} +<script> +function get(id) { + return document.getElementById(id) +} +function getLog(id) { + const header = get(id), log = get(id + 'log'), text = log.textContent + // lines[-1] is an implementation detail of terminalWriter.Serialize, + // lines[-2] is the actual last line. 
+ const last = Math.max(0, text.split('\n').length - 2) + return {header, log, text, last} +} +function refreshLog(log, top, changed) { + if (top <= 0) + log.log.textContent = changed + else + log.log.textContent = + log.text.split('\n').slice(0, top).join('\n') + '\n' + changed + + const empty = log.log.textContent === '' + log.header.hidden = empty + log.log.hidden = empty +} +let refresher = setInterval(() => { + const run = getLog('run'), task = getLog('task'), deploy = getLog('deploy') + const url = new URL(window.location.href) + url.search = '' + url.searchParams.set('json', '') + url.searchParams.set('run', run.last) + url.searchParams.set('task', task.last) + url.searchParams.set('deploy', deploy.last) + fetch(url.toString()).then(response => { + if (!response.ok) + throw response.statusText + return response.json() + }).then(data => { + const scroll = window.scrollY + window.innerHeight + >= document.documentElement.scrollHeight + if (data.Created) { + get('created').title = data.Created + get('created').textContent = data.CreatedAgo + " ago" + } + if (data.Changed) { + get('changed').title = data.Changed + get('changed').textContent = data.ChangedAgo + " ago" + } + get('state').textContent = data.State + if (data.Detail !== '') + get('state').textContent += " (" + data.Detail + ")" + get('notified').textContent = String(data.Notified) + if (data.Duration) + get('duration').textContent = data.Duration + + refreshLog(run, data.RunLogTop, data.RunLog) + refreshLog(task, data.TaskLogTop, data.TaskLog) + refreshLog(deploy, data.DeployLogTop, data.DeployLog) + + if (scroll) + document.documentElement.scrollTop = + document.documentElement.scrollHeight + if (!data.IsRunning) + clearInterval(refresher) + }).catch(error => { + clearInterval(refresher) + alert(error) + }) +}, 1000 /* For faster updates than this, we should use WebSockets. */) +</script> +{{end -}} + </body> </html> `)) +// handlerTask serves as the data for JSON encoding and the task HTML template. +// It needs to explicitly include many convenience method results. 
+type handlerTask struct { + Task + IsRunning bool + + Created *string // Task.Created?.String() + Changed *string // Task.Changed?.String() + CreatedAgo string // Task.CreatedAgo() + ChangedAgo string // Task.ChangedAgo() + Duration *string // Task.Duration?.String() + State string // Task.State.String() + RunLog string + RunLogTop int + TaskLog string + TaskLogTop int + DeployLog string + DeployLogTop int +} + +func toNilableString[T fmt.Stringer](stringer *T) *string { + if stringer == nil { + return nil + } + s := (*stringer).String() + return &s +} + +func newHandlerTask(task Task) handlerTask { + return handlerTask{ + Task: task, + RunLog: string(task.RunLog), + TaskLog: string(task.TaskLog), + DeployLog: string(task.DeployLog), + + Created: toNilableString(task.Created()), + Changed: toNilableString(task.Changed()), + CreatedAgo: task.CreatedAgo(), + ChangedAgo: task.ChangedAgo(), + Duration: toNilableString(task.Duration()), + State: task.State.String(), + } +} + +func (ht *handlerTask) updateFromRunning( + rt *RunningTask, lastRun, lastTask, lastDeploy int) { + ht.IsRunning = true + ht.Task.DurationSeconds = rt.elapsed() + ht.Duration = toNilableString(ht.Task.Duration()) + + rt.RunLog.Lock() + defer rt.RunLog.Unlock() + rt.TaskLog.Lock() + defer rt.TaskLog.Unlock() + rt.DeployLog.Lock() + defer rt.DeployLog.Unlock() + + ht.RunLog, ht.RunLogTop = rt.RunLog.SerializeUpdates(lastRun) + ht.TaskLog, ht.TaskLogTop = rt.TaskLog.SerializeUpdates(lastTask) + ht.DeployLog, ht.DeployLogTop = rt.DeployLog.SerializeUpdates(lastDeploy) +} + func handleTask(w http.ResponseWriter, r *http.Request) { id, err := strconv.Atoi(r.PathValue("id")) if err != nil { @@ -284,31 +463,32 @@ func handleTask(w http.ResponseWriter, r *http.Request) { return } - task := struct { - Task - IsRunning bool - }{Task: tasks[0]} + // These are intended for running tasks, + // so don't reprocess DB logs, which would only help the last update. 
+ q := r.URL.Query() + lastRun, _ := strconv.Atoi(q.Get("run")) + lastTask, _ := strconv.Atoi(q.Get("task")) + lastDeploy, _ := strconv.Atoi(q.Get("deploy")) + + task := newHandlerTask(tasks[0]) func() { gRunningMutex.Lock() defer gRunningMutex.Unlock() - rt, ok := gRunning[task.ID] - task.IsRunning = ok - if !ok { - return + if rt, ok := gRunning[task.ID]; ok { + task.updateFromRunning( + rt, int(lastRun), int(lastTask), int(lastDeploy)) } - - rt.RunLog.mu.Lock() - defer rt.RunLog.mu.Unlock() - rt.TaskLog.mu.Lock() - defer rt.TaskLog.mu.Unlock() - - task.RunLog = rt.RunLog.b - task.TaskLog = rt.TaskLog.b }() - if err := templateTask.Execute(w, &task); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + if q.Has("json") { + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(&task) + } else { + err = templateTask.Execute(w, &task) + } + if err != nil { + log.Println(err) } } @@ -335,8 +515,9 @@ func createTasks(ctx context.Context, } defer tx.Rollback() - stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner) - VALUES (?, ?, ?, ?)`) + stmt, err := tx.Prepare( + `INSERT INTO task(owner, repo, hash, runner, created, changed) + VALUES (?, ?, ?, ?, unixepoch('now'), unixepoch('now'))`) if err != nil { return err } @@ -385,7 +566,7 @@ func handlePush(w http.ResponseWriter, r *http.Request) { log.Printf("received push: %s %s\n", event.Repository.FullName, event.HeadCommit.ID) - project, ok := gConfig.Projects[event.Repository.FullName] + project, ok := getConfig().Projects[event.Repository.FullName] if !ok { // This is okay, don't set any commit statuses. fmt.Fprintf(w, "The project is not configured.") @@ -417,8 +598,11 @@ func rpcRestartOne(ctx context.Context, id int64) error { // The executor bumps to "running" after inserting into gRunning, // so we should not need to exclude that state here. - result, err := gDB.ExecContext(ctx, `UPDATE task - SET state = ?, detail = '', notified = 0 WHERE id = ?`, + // + // We deliberately do not clear previous run data (duration, *log). 
+ result, err := gDB.ExecContext(ctx, + `UPDATE task SET changed = unixepoch('now'), + state = ?, detail = '', notified = 0 WHERE id = ?`, taskStateNew, id) if err != nil { return fmt.Errorf("%d: %w", id, err) @@ -494,7 +678,7 @@ func rpcEnqueue(ctx context.Context, return fmt.Errorf("%s: %w", ref, err) } - project, ok := gConfig.Projects[owner+"/"+repo] + project, ok := getConfig().Projects[owner+"/"+repo] if !ok { return fmt.Errorf("project configuration not found") } @@ -546,6 +730,17 @@ func rpcRestart(ctx context.Context, return nil } +func rpcReload(ctx context.Context, + w io.Writer, fs *flag.FlagSet, args []string) error { + if err := fs.Parse(args); err != nil { + return err + } + if fs.NArg() > 0 { + return errWrongUsage + } + return loadConfig() +} + // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - var rpcCommands = map[string]struct { @@ -558,6 +753,8 @@ var rpcCommands = map[string]struct { "Create or restart tasks for the given reference."}, "restart": {rpcRestart, "[ID]...", "Schedule tasks with the given IDs to be rerun."}, + "reload": {rpcReload, "", + "Reload configuration."}, } func rpcPrintCommands(w io.Writer) { @@ -644,12 +841,12 @@ func handleRPC(w http.ResponseWriter, r *http.Request) { func notifierRunCommand(ctx context.Context, task Task) { script := bytes.NewBuffer(nil) - if err := gNotifyScript.Execute(script, &task); err != nil { + if err := getConfig().notifyTemplate.Execute(script, &task); err != nil { log.Printf("error: notify: %s", err) return } - cmd := exec.CommandContext(ctx, "sh") + cmd := exec.CommandContext(ctx, localShell()) cmd.Stdin = script cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -671,13 +868,14 @@ func notifierNotify(ctx context.Context, task Task) error { TargetURL string `json:"target_url"` }{} - runner, ok := gConfig.Runners[task.Runner] + config := getConfig() + runner, ok := config.Runners[task.Runner] if !ok { log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner) return nil } payload.Context = runner.Name - payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID) + payload.TargetURL = fmt.Sprintf("%s/task/%d", config.Root, task.ID) switch task.State { case taskStateNew: @@ -773,86 +971,318 @@ func notifierAwaken() { } // --- Executor ---------------------------------------------------------------- +// ~~~ Running task ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -type terminalWriter struct { - b []byte - cur int - mu sync.Mutex +// RunningTask stores all data pertaining to a currently running task. +type RunningTask struct { + DB Task + Runner ConfigRunner + ProjectRunner ConfigProjectRunner + + RunLog terminalWriter + TaskLog terminalWriter + DeployLog terminalWriter + + wd string // acid working directory + timeout time.Duration // time limit on task execution + signer ssh.Signer // SSH private key + tmplScript *ttemplate.Template // remote build script + tmplDeploy *ttemplate.Template // local deployment script } -func (tw *terminalWriter) Write(p []byte) (written int, err error) { - tw.mu.Lock() - defer tw.mu.Unlock() +// newRunningTask prepares a task for running, without executing anything yet. +func newRunningTask(task Task) (*RunningTask, error) { + rt := &RunningTask{DB: task} + config := getConfig() + + // This is for our own tracking, not actually written to database. 
+ rt.DB.ChangedUnix = time.Now().Unix() + + var ok bool + rt.Runner, ok = config.Runners[rt.DB.Runner] + if !ok { + return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner) + } + project, ok := config.Projects[rt.DB.FullName()] + if !ok { + return nil, fmt.Errorf("project configuration not found") + } + rt.ProjectRunner, ok = project.Runners[rt.DB.Runner] + if !ok { + return nil, fmt.Errorf( + "project not configured for runner %s", rt.DB.Runner) + } + + var err error + if rt.wd, err = os.Getwd(); err != nil { + return nil, err + } - // Extremely rudimentary emulation of a dumb terminal. - for _, b := range p { - // Enough is enough, writing too much is highly suspicious. - if len(tw.b) > 64<<20 { - return written, errors.New("too much terminal output") + // Lenient or not, some kind of a time limit is desirable. + rt.timeout = time.Hour + if rt.ProjectRunner.Timeout != "" { + rt.timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout) + if err != nil { + return nil, fmt.Errorf("timeout: %w", err) } + } - switch b { - case '\b': - if tw.cur > 0 && tw.b[tw.cur-1] != '\n' { - tw.cur-- - } - case '\r': - for tw.cur > 0 && tw.b[tw.cur-1] != '\n' { - tw.cur-- - } - case '\n': - tw.b = append(tw.b, b) - tw.cur = len(tw.b) - default: - tw.b = append(tw.b[:tw.cur], b) - tw.cur = len(tw.b) + privateKey, err := os.ReadFile(rt.Runner.SSH.Identity) + if err != nil { + return nil, fmt.Errorf( + "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err) + } + rt.signer, err = ssh.ParsePrivateKey(privateKey) + if err != nil { + return nil, fmt.Errorf( + "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err) + } + + // The runner setup script may change the working directory, + // so do everything in one go. However, this approach also makes it + // difficult to distinguish project-independent runner failures. + // (For that, we can start multiple ssh.Sessions.) + // + // We could pass variables through SSH environment variables, + // which would require enabling PermitUserEnvironment in sshd_config, + // or through prepending script lines, but templates are a bit simpler. + // + // We let the runner itself clone the repository: + // - it is a more flexible in that it can test AUR packages more normally, + // - we might have to clone submodules as well. + // Otherwise, we could download a source archive from Gitea, + // and use SFTP to upload it to the runner. + rt.tmplScript, err = ttemplate.New("script").Funcs(shellFuncs). + Parse(rt.Runner.Setup + "\n" + + rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build) + if err != nil { + return nil, fmt.Errorf("script/build: %w", err) + } + + rt.tmplDeploy, err = ttemplate.New("deploy").Funcs(shellFuncs). + Parse(rt.ProjectRunner.Deploy) + if err != nil { + return nil, fmt.Errorf("script/deploy: %w", err) + } + + if os.Getenv("ACID_TERMINAL_DEBUG") != "" { + base := filepath.Join(executorTmpDir("/tmp"), + fmt.Sprintf("acid-%d-%s-%s-%s-", + task.ID, task.Owner, task.Repo, task.Runner)) + rt.RunLog.Tee, _ = os.Create(base + "runlog") + rt.TaskLog.Tee, _ = os.Create(base + "tasklog") + // The deployment log should not be interesting. + } + return rt, nil +} + +func (rt *RunningTask) close() { + for _, tee := range []io.WriteCloser{ + rt.RunLog.Tee, rt.TaskLog.Tee, rt.DeployLog.Tee} { + if tee != nil { + tee.Close() } + } +} + +// localEnv creates a process environment for locally run executables. 
+func (rt *RunningTask) localEnv() []string { + return append(os.Environ(), + "ACID_ROOT="+rt.wd, + "ACID_RUNNER="+rt.DB.Runner, + ) +} + +func (rt *RunningTask) elapsed() int64 { + return int64(time.Since(time.Unix(rt.DB.ChangedUnix, 0)).Seconds()) +} + +// update stores the running task's state in the database. +func (rt *RunningTask) update() error { + for _, i := range []struct { + tw *terminalWriter + log *[]byte + }{ + {&rt.RunLog, &rt.DB.RunLog}, + {&rt.TaskLog, &rt.DB.TaskLog}, + {&rt.DeployLog, &rt.DB.DeployLog}, + } { + i.tw.Lock() + defer i.tw.Unlock() + if *i.log = i.tw.Serialize(0); *i.log == nil { + *i.log = []byte{} + } + } + rt.DB.DurationSeconds = rt.elapsed() + return rt.DB.update() +} + +// ~~~ Deploy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +func executorDownloadNode(sc *sftp.Client, remotePath, localPath string, + info os.FileInfo) error { + if info.IsDir() { + // Hoping that caller invokes us on parents first. + return os.MkdirAll(localPath, info.Mode().Perm()) + } + + src, err := sc.Open(remotePath) + if err != nil { + return fmt.Errorf("failed to open remote file %s: %w", remotePath, err) + } + defer src.Close() + + dst, err := os.OpenFile( + localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm()) + if err != nil { + return fmt.Errorf("failed to create local file: %w", err) + } + defer dst.Close() + + if _, err = io.Copy(dst, src); err != nil { + return fmt.Errorf("failed to copy file from remote %s to local %s: %w", + remotePath, localPath, err) + } + return nil +} +func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error { + sc, err := sftp.NewClient(client) + if err != nil { + return err + } + defer sc.Close() + + walker := sc.Walk(remoteRoot) + for walker.Step() { + if walker.Err() != nil { + return walker.Err() + } + relativePath, err := filepath.Rel(remoteRoot, walker.Path()) if err != nil { - break + return err + } + if err = executorDownloadNode(sc, walker.Path(), + filepath.Join(localRoot, relativePath), walker.Stat()); err != nil { + return err } - written += 1 } - return + return nil } -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +func executorTmpDir(fallback string) string { + // See also: https://systemd.io/TEMPORARY_DIRECTORIES/ + if tmp := os.Getenv("TMPDIR"); tmp != "" { + return tmp + } + return fallback +} -type RunningTask struct { - DB Task - Runner ConfigRunner - ProjectRunner ConfigProjectRunner +func executorDeploy( + ctx context.Context, client *ssh.Client, rt *RunningTask) error { + script := bytes.NewBuffer(nil) + if err := rt.tmplDeploy.Execute(script, &rt.DB); err != nil { + return &executorError{"Deploy template failed", err} + } + + // Thus the deployment directory must exist iff the script is not empty. + if script.Len() == 0 { + return nil + } + + // We expect the files to be moved elsewhere on the filesystem, + // and they may get very large, so avoid /tmp. + dir := filepath.Join(executorTmpDir("/var/tmp"), "acid-deploy") + if err := os.RemoveAll(dir); err != nil { + return err + } + if err := os.Mkdir(dir, 0755); err != nil { + return err + } - RunLog terminalWriter - TaskLog terminalWriter + // The passed remoteRoot is relative to sc.Getwd. 
+ if err := executorDownload(client, "acid-deploy", dir); err != nil { + return err + } + + cmd := exec.CommandContext(ctx, localShell()) + cmd.Env = rt.localEnv() + cmd.Dir = dir + cmd.Stdin = script + cmd.Stdout = &rt.DeployLog + cmd.Stderr = &rt.DeployLog + return cmd.Run() } -func executorUpdate(rt *RunningTask) error { - rt.RunLog.mu.Lock() - defer rt.RunLog.mu.Unlock() - rt.DB.RunLog = bytes.Clone(rt.RunLog.b) - if rt.DB.RunLog == nil { - rt.DB.RunLog = []byte{} +// ~~~ Build ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +func executorBuild( + ctx context.Context, client *ssh.Client, rt *RunningTask) error { + // This is here to fail early, though logically it is misplaced. + script := bytes.NewBuffer(nil) + if err := rt.tmplScript.Execute(script, &rt.DB); err != nil { + return &executorError{"Script template failed", err} } - rt.TaskLog.mu.Lock() - defer rt.TaskLog.mu.Unlock() - rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b) - if rt.DB.TaskLog == nil { - rt.DB.TaskLog = []byte{} + session, err := client.NewSession() + if err != nil { + return &executorError{"SSH failure", err} + } + defer session.Close() + + modes := ssh.TerminalModes{ssh.ECHO: 0} + if err := session.RequestPty("dumb", 24, 80, modes); err != nil { + return &executorError{"SSH failure", err} } - _, err := gDB.ExecContext(context.Background(), `UPDATE task - SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ? - WHERE id = ?`, - rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog, - rt.DB.ID) - if err == nil { - notifierAwaken() + log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName()) + + session.Stdout = &rt.TaskLog + session.Stderr = &rt.TaskLog + + // Although passing the script directly takes away the option to specify + // a particular shell (barring here-documents!), it is simple and reliable. + // + // Passing the script over Stdin to sh tended to end up with commands + // eating the script during processing, and resulted in a hang, + // because closing the Stdin does not result in remote processes + // getting a stream of EOF. + // + // Piping the script into `cat | sh` while appending a ^D to the end of it + // appeared to work, but it seems likely that commands might still steal + // script bytes from the cat program if they choose to read from the tty + // and the script is longer than the buffer. + chSession := make(chan error, 1) + go func() { + chSession <- session.Run(script.String()) + close(chSession) + }() + + select { + case <-ctx.Done(): + // Either shutdown, or early runner termination. + // The runner is not supposed to finish before the session. + err = context.Cause(ctx) + case err = <-chSession: + // Killing a runner may perfectly well trigger this first, + // in particular when it's on the same machine. } return err } +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// executorError describes a taskStateError. 
+type executorError struct { + Detail string + Err error +} + +func (e *executorError) Unwrap() error { return e.Err } +func (e *executorError) Error() string { + return fmt.Sprintf("%s: %s", e.Detail, e.Err) +} + func executorConnect( ctx context.Context, config *ssh.ClientConfig, address string) ( *ssh.Client, error) { @@ -894,122 +1324,60 @@ func executorConnect( time.Sleep(1 * time.Second) continue } - return ssh.NewClient(sc, chans, reqs), nil } } func executorRunTask(ctx context.Context, task Task) error { - rt := &RunningTask{DB: task} - - var ok bool - rt.Runner, ok = gConfig.Runners[rt.DB.Runner] - if !ok { - return fmt.Errorf("unknown runner: %s", rt.DB.Runner) - } - project, ok := gConfig.Projects[rt.DB.FullName()] - if !ok { - return fmt.Errorf("project configuration not found") - } - rt.ProjectRunner, ok = project.Runners[rt.DB.Runner] - if !ok { - return fmt.Errorf( - "project not configured for runner %s", rt.DB.Runner) - } - - wd, err := os.Getwd() - if err != nil { - return err - } - - // The runner setup script may change the working directory, - // so do everything in one go. However, this approach also makes it - // difficult to distinguish project-independent runner failures. - // (For that, we can start multiple ssh.Sessions.) - // - // We could pass variables through SSH environment variables, - // which would require enabling PermitUserEnvironment in sshd_config, - // or through prepending script lines, but templates are a bit simpler. - // - // We let the runner itself clone the repository: - // - it is a more flexible in that it can test AUR packages more normally, - // - we might have to clone submodules as well. - // Otherwise, we could download a source archive from Gitea, - // and use SFTP to upload it to the runner. - tmplScript, err := ttemplate.New("script").Funcs(shellFuncs). - Parse(rt.Runner.Setup + "\n" + - rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build) + rt, err := newRunningTask(task) if err != nil { - return fmt.Errorf("script: %w", err) + task.DurationSeconds = 0 + task.State, task.Detail = taskStateError, "Misconfigured" + task.Notified = 0 + task.RunLog = []byte(err.Error()) + task.TaskLog = []byte{} + task.DeployLog = []byte{} + return task.update() } + defer rt.close() - // Lenient or not, some kind of a time limit is desirable. - timeout := time.Hour - if rt.ProjectRunner.Timeout != "" { - timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout) - if err != nil { - return fmt.Errorf("timeout: %w", err) - } - } - ctx, cancelTimeout := context.WithTimeout(ctx, timeout) + ctx, cancelTimeout := context.WithTimeout(ctx, rt.timeout) defer cancelTimeout() - privateKey, err := os.ReadFile(rt.Runner.SSH.Identity) - if err != nil { - return fmt.Errorf( - "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err) - } - signer, err := ssh.ParsePrivateKey(privateKey) - if err != nil { - return fmt.Errorf( - "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err) - } - - defer func() { - gRunningMutex.Lock() - defer gRunningMutex.Unlock() - - delete(gRunning, rt.DB.ID) - }() - func() { + // RunningTasks can be concurrently accessed by HTTP handlers. 
+ locked := func(f func()) { gRunningMutex.Lock() defer gRunningMutex.Unlock() - + f() + } + locked(func() { + rt.DB.DurationSeconds = 0 rt.DB.State, rt.DB.Detail = taskStateRunning, "" rt.DB.Notified = 0 rt.DB.RunLog = []byte{} rt.DB.TaskLog = []byte{} + rt.DB.DeployLog = []byte{} gRunning[rt.DB.ID] = rt - }() - if err := executorUpdate(rt); err != nil { + }) + defer locked(func() { + delete(gRunning, rt.DB.ID) + }) + if err := rt.update(); err != nil { return fmt.Errorf("SQL: %w", err) } // Errors happening while trying to write an error are unfortunate, // but not important enough to abort entirely. setError := func(detail string) { - gRunningMutex.Lock() - defer gRunningMutex.Unlock() - rt.DB.State, rt.DB.Detail = taskStateError, detail - if err := executorUpdate(rt); err != nil { + if err := rt.update(); err != nil { log.Printf("error: task %d for %s: SQL: %s", rt.DB.ID, rt.DB.FullName(), err) } } - script := bytes.NewBuffer(nil) - if err := tmplScript.Execute(script, &rt.DB); err != nil { - setError("Script template failed") - return err - } - cmd := exec.CommandContext(ctx, rt.Runner.Run) - cmd.Env = append( - os.Environ(), - "ACID_ROOT="+wd, - "ACID_RUNNER="+rt.DB.Runner, - ) + cmd.Env = rt.localEnv() // Pushing the runner into a new process group that can be killed at once // with all its children isn't bullet-proof, it messes with job control @@ -1033,7 +1401,8 @@ func executorRunTask(ctx context.Context, task Task) error { cmd.Stdout = &rt.RunLog cmd.Stderr = &rt.RunLog if err := cmd.Start(); err != nil { - setError("Runner failed to start") + fmt.Fprintf(&rt.TaskLog, "%s\n", err) + locked(func() { setError("Runner failed to start") }) return err } @@ -1059,78 +1428,62 @@ func executorRunTask(ctx context.Context, task Task) error { client, err := executorConnect(ctxRunner, &ssh.ClientConfig{ User: rt.Runner.SSH.User, - Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)}, + Auth: []ssh.AuthMethod{ssh.PublicKeys(rt.signer)}, HostKeyCallback: ssh.InsecureIgnoreHostKey(), }, rt.Runner.SSH.Address) if err != nil { fmt.Fprintf(&rt.TaskLog, "%s\n", err) - setError("SSH failure") + locked(func() { setError("SSH failure") }) return err } defer client.Close() - session, err := client.NewSession() - if err != nil { - fmt.Fprintf(&rt.TaskLog, "%s\n", err) - setError("SSH failure") - return err - } - defer session.Close() + var ( + eeSSH *ssh.ExitError + eeExec *exec.ExitError + ee3 *executorError + ) - modes := ssh.TerminalModes{ssh.ECHO: 0} - if err := session.RequestPty("dumb", 24, 80, modes); err != nil { - fmt.Fprintf(&rt.TaskLog, "%s\n", err) - setError("SSH failure") - return err + err = executorBuild(ctxRunner, client, rt) + if err != nil { + locked(func() { + if errors.As(err, &eeSSH) { + rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed" + fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) + } else if errors.As(err, &ee3) { + rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail + fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee3.Err) + } else { + rt.DB.State, rt.DB.Detail = taskStateError, "" + fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) + } + }) + return rt.update() } - log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName()) - - session.Stdout = &rt.TaskLog - session.Stderr = &rt.TaskLog - - // Although passing the script directly takes away the option to specify - // a particular shell (barring here-documents!), it is simple and reliable. 
- // - // Passing the script over Stdin to sh tended to end up with commands - // eating the script during processing, and resulted in a hang, - // because closing the Stdin does not result in remote processes - // getting a stream of EOF. - // - // Piping the script into `cat | sh` while appending a ^D to the end of it - // appeared to work, but it seems likely that commands might still steal - // script bytes from the cat program if they choose to read from the tty - // and the script is longer than the buffer. - chSession := make(chan error, 1) + // This is so that it doesn't stay hanging within the sftp package, + // which uses context.Background() everywhere. go func() { - chSession <- session.Run(script.String()) - close(chSession) + <-ctxRunner.Done() + client.Close() }() - select { - case <-ctxRunner.Done(): - // Either shutdown, or early runner termination. - // The runner is not supposed to finish before the session. - err = context.Cause(ctxRunner) - case err = <-chSession: - // Killing a runner may perfectly well trigger this first, - // in particular when it's on the same machine. - } - - gRunningMutex.Lock() - defer gRunningMutex.Unlock() - - var ee *ssh.ExitError - if err == nil { - rt.DB.State, rt.DB.Detail = taskStateSuccess, "" - } else if errors.As(err, &ee) { - rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed" - fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) - } else { - rt.DB.State, rt.DB.Detail = taskStateError, "" - fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) - } - return executorUpdate(rt) + err = executorDeploy(ctxRunner, client, rt) + locked(func() { + if err == nil { + rt.DB.State, rt.DB.Detail = taskStateSuccess, "" + } else if errors.As(err, &eeExec) { + rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed" + fmt.Fprintf(&rt.DeployLog, "\n%s\n", err) + } else if errors.As(err, &ee3) { + rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail + fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee3.Err) + } else { + rt.DB.State, rt.DB.Detail = taskStateError, "" + fmt.Fprintf(&rt.DeployLog, "\n%s\n", err) + } + }) + return rt.update() } func executorRun(ctx context.Context) error { @@ -1208,17 +1561,23 @@ type Task struct { Hash string Runner string - State taskState - Detail string - Notified int64 - RunLog []byte - TaskLog []byte + // True database names for these are occupied by accessors. 
+ CreatedUnix int64 + ChangedUnix int64 + DurationSeconds int64 + + State taskState + Detail string + Notified int64 + RunLog []byte + TaskLog []byte + DeployLog []byte } func (t *Task) FullName() string { return t.Owner + "/" + t.Repo } func (t *Task) RunnerName() string { - if runner, ok := gConfig.Runners[t.Runner]; !ok { + if runner, ok := getConfig().Runners[t.Runner]; !ok { return t.Runner } else { return runner.Name @@ -1226,42 +1585,129 @@ func (t *Task) RunnerName() string { } func (t *Task) URL() string { - return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID) + return fmt.Sprintf("%s/task/%d", getConfig().Root, t.ID) } func (t *Task) RepoURL() string { - return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo) + return fmt.Sprintf("%s/%s/%s", getConfig().Gitea, t.Owner, t.Repo) } func (t *Task) CommitURL() string { return fmt.Sprintf("%s/%s/%s/commit/%s", - gConfig.Gitea, t.Owner, t.Repo, t.Hash) + getConfig().Gitea, t.Owner, t.Repo, t.Hash) } func (t *Task) CloneURL() string { - return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo) + return fmt.Sprintf("%s/%s/%s.git", getConfig().Gitea, t.Owner, t.Repo) +} + +func shortDurationString(d time.Duration) string { + if d.Abs() >= 24*time.Hour { + return strconv.FormatInt(int64(d/time.Hour/24), 10) + "d" + } else if d.Abs() >= time.Hour { + return strconv.FormatInt(int64(d/time.Hour), 10) + "h" + } else if d.Abs() >= time.Minute { + return strconv.FormatInt(int64(d/time.Minute), 10) + "m" + } else { + return strconv.FormatInt(int64(d/time.Second), 10) + "s" + } +} + +func (t *Task) Created() *time.Time { + if t.CreatedUnix == 0 { + return nil + } + tt := time.Unix(t.CreatedUnix, 0) + return &tt +} +func (t *Task) Changed() *time.Time { + if t.ChangedUnix == 0 { + return nil + } + tt := time.Unix(t.ChangedUnix, 0) + return &tt +} + +func (t *Task) CreatedAgo() string { + if t.CreatedUnix == 0 { + return "" + } + return shortDurationString(time.Since(*t.Created())) +} + +func (t *Task) ChangedAgo() string { + if t.ChangedUnix == 0 { + return "" + } + return shortDurationString(time.Since(*t.Changed())) +} + +func (t *Task) Duration() *time.Duration { + if t.DurationSeconds == 0 { + return nil + } + td := time.Duration(t.DurationSeconds * int64(time.Second)) + return &td +} + +func (t *Task) update() error { + _, err := gDB.ExecContext(context.Background(), + `UPDATE task SET changed = unixepoch('now'), duration = ?, + state = ?, detail = ?, notified = ?, + runlog = ?, tasklog = ?, deploylog = ? 
WHERE id = ?`, + t.DurationSeconds, + t.State, t.Detail, t.Notified, + t.RunLog, t.TaskLog, t.DeployLog, t.ID) + if err == nil { + notifierAwaken() + } + return err } -const schemaSQL = ` +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +const initializeSQL = ` +PRAGMA application_id = 0x61636964; -- "acid" in big endian + CREATE TABLE IF NOT EXISTS task( - id INTEGER NOT NULL, -- unique ID + id INTEGER NOT NULL, -- unique ID + + owner TEXT NOT NULL, -- Gitea username + repo TEXT NOT NULL, -- Gitea repository name + hash TEXT NOT NULL, -- commit hash + runner TEXT NOT NULL, -- the runner to use - owner TEXT NOT NULL, -- Gitea username - repo TEXT NOT NULL, -- Gitea repository name - hash TEXT NOT NULL, -- commit hash - runner TEXT NOT NULL, -- the runner to use + created INTEGER NOT NULL DEFAULT 0, -- creation timestamp + changed INTEGER NOT NULL DEFAULT 0, -- last state change timestamp + duration INTEGER NOT NULL DEFAULT 0, -- duration of last run - state INTEGER NOT NULL DEFAULT 0, -- task state - detail TEXT NOT NULL DEFAULT '', -- task state detail - notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state - runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output - tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output + state INTEGER NOT NULL DEFAULT 0, -- task state + detail TEXT NOT NULL DEFAULT '', -- task state detail + notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state + runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output + tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output + deploylog BLOB NOT NULL DEFAULT x'', -- deployment output PRIMARY KEY (id) ) STRICT; ` -func openDB(path string) error { +func dbEnsureColumn(tx *sql.Tx, table, column, definition string) error { + var count int64 + if err := tx.QueryRow( + `SELECT count(*) FROM pragma_table_info(?) WHERE name = ?`, + table, column).Scan(&count); err != nil { + return err + } else if count == 1 { + return nil + } + + _, err := tx.Exec( + `ALTER TABLE ` + table + ` ADD COLUMN ` + column + ` ` + definition) + return err +} + +func dbOpen(path string) error { var err error gDB, err = sql.Open("sqlite3", "file:"+path+"?_foreign_keys=1&_busy_timeout=1000") @@ -1269,10 +1715,55 @@ func openDB(path string) error { return err } - _, err = gDB.Exec(schemaSQL) - return err + tx, err := gDB.BeginTx(context.Background(), nil) + if err != nil { + return err + } + defer tx.Rollback() + + var version int64 + if err = tx.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil { + return err + } + + switch version { + case 0: + if _, err = tx.Exec(initializeSQL); err != nil { + return err + } + + // We had not initially set a database schema version, + // so we're stuck checking this column even on new databases. + if err = dbEnsureColumn(tx, + `task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil { + return err + } + case 1: + if err = dbEnsureColumn(tx, + `task`, `created`, `INTEGER NOT NULL DEFAULT 0`); err != nil { + return err + } + if err = dbEnsureColumn(tx, + `task`, `changed`, `INTEGER NOT NULL DEFAULT 0`); err != nil { + return err + } + if err = dbEnsureColumn(tx, + `task`, `duration`, `INTEGER NOT NULL DEFAULT 0`); err != nil { + return err + } + case 2: + // The next migration goes here, remember to increment the number below. 
+ } + + if _, err = tx.Exec( + `PRAGMA user_version = ` + strconv.Itoa(2)); err != nil { + return err + } + return tx.Commit() } +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + // callRPC forwards command line commands to a running server. func callRPC(args []string) error { body, err := json.Marshal(args) @@ -1281,7 +1772,7 @@ func callRPC(args []string) error { } req, err := http.NewRequest(http.MethodPost, - fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body)) + fmt.Sprintf("%s/rpc", getConfig().Root), bytes.NewReader(body)) if err != nil { return err } @@ -1304,8 +1795,30 @@ func callRPC(args []string) error { return nil } +// filterTTY exposes the internal virtual terminal filter. +func filterTTY(path string) { + var r io.Reader = os.Stdin + if path != "-" { + if f, err := os.Open(path); err != nil { + log.Println(err) + } else { + r = f + defer f.Close() + } + } + + var tw terminalWriter + if _, err := io.Copy(&tw, r); err != nil { + log.Printf("%s: %s\n", path, err) + } + if _, err := os.Stdout.Write(tw.Serialize(0)); err != nil { + log.Printf("%s: %s\n", path, err) + } +} + func main() { version := flag.Bool("version", false, "show version and exit") + tty := flag.Bool("tty", false, "run the internal virtual terminal filter") flag.Usage = func() { f := flag.CommandLine.Output() @@ -1323,8 +1836,15 @@ func main() { fmt.Printf("%s %s\n", projectName, projectVersion) return } + if *tty { + for _, path := range flag.Args() { + filterTTY(path) + } + return + } - if err := parseConfig(flag.Arg(0)); err != nil { + gConfigPath = flag.Arg(0) + if err := loadConfig(); err != nil { log.Fatalln(err) } if flag.NArg() > 1 { @@ -1334,7 +1854,7 @@ func main() { return } - if err := openDB(gConfig.DB); err != nil { + if err := dbOpen(getConfig().DB); err != nil { log.Fatalln(err) } defer gDB.Close() @@ -1343,7 +1863,7 @@ func main() { ctx, stop := signal.NotifyContext( context.Background(), syscall.SIGINT, syscall.SIGTERM) - server := &http.Server{Addr: gConfig.Listen} + server := &http.Server{Addr: getConfig().Listen} http.HandleFunc("/{$}", handleTasks) http.HandleFunc("/task/{id}", handleTask) http.HandleFunc("/push", handlePush) diff --git a/acid.yaml.example b/acid.yaml.example index 499366e..c0b9b28 100644 --- a/acid.yaml.example +++ b/acid.yaml.example @@ -61,7 +61,13 @@ projects: # Project build script. build: | echo Computing line count... - find . -not -path '*/.*' -type f -print0 | xargs -0 cat | wc -l + mkdir ~/acid-deploy + find . -not -path '*/.*' -type f -print0 | xargs -0 cat | wc -l \ + > ~/acid-deploy/count + + # Project deployment script (runs locally in a temporary directory). + deploy: | + cat count # Time limit in time.ParseDuration format. # The default of one hour should suffice. 
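The example configuration above depends on a convention established by this commit: the remote build script leaves its artifacts in ~/acid-deploy on the runner, acid mirrors that directory to a local temporary directory over SFTP, and the deploy script then runs locally inside it with the same ACID_* environment variables as the runner. A simplified sketch of that hand-off, assuming an already established *ssh.Client (condensed from executorDownload and executorDeploy in the diff; permissions and logging are left out):

package main

import (
	"context"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"

	"github.com/pkg/sftp"
	"golang.org/x/crypto/ssh"
)

// deploySketch mirrors the runner's ./acid-deploy directory to a local
// temporary directory and runs the deploy script there.
func deploySketch(
	ctx context.Context, client *ssh.Client, script string, env []string) error {
	sc, err := sftp.NewClient(client)
	if err != nil {
		return err
	}
	defer sc.Close()

	dir, err := os.MkdirTemp("/var/tmp", "acid-deploy-")
	if err != nil {
		return err
	}

	// The walk root is relative to the SFTP server's working directory,
	// i.e. the runner account's home directory.
	walker := sc.Walk("acid-deploy")
	for walker.Step() {
		if walker.Err() != nil {
			return walker.Err()
		}
		rel, err := filepath.Rel("acid-deploy", walker.Path())
		if err != nil {
			return err
		}
		local := filepath.Join(dir, rel)
		if walker.Stat().IsDir() {
			if err := os.MkdirAll(local, 0755); err != nil {
				return err
			}
			continue
		}

		src, err := sc.Open(walker.Path())
		if err != nil {
			return err
		}
		dst, err := os.Create(local)
		if err != nil {
			src.Close()
			return err
		}
		_, err = io.Copy(dst, src)
		src.Close()
		dst.Close()
		if err != nil {
			return err
		}
	}

	cmd := exec.CommandContext(ctx, "/bin/sh")
	cmd.Dir = dir
	cmd.Env = env
	cmd.Stdin = strings.NewReader(script)
	return cmd.Run()
}
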
diff --git a/acid_test.go b/acid_test.go index 614fd0a..e8bd5c2 100644 --- a/acid_test.go +++ b/acid_test.go @@ -4,6 +4,7 @@ import ( "bytes" "testing" ttemplate "text/template" + "time" ) func TestTemplateQuote(t *testing.T) { @@ -30,3 +31,19 @@ func TestTemplateQuote(t *testing.T) { } } } + +func TestShortDurationString(t *testing.T) { + for _, test := range []struct { + d time.Duration + expect string + }{ + {72 * time.Hour, "3d"}, + {-3 * time.Hour, "-3h"}, + {12 * time.Minute, "12m"}, + {time.Millisecond, "0s"}, + } { + if sd := shortDurationString(test.d); sd != test.expect { + t.Errorf("%s = %s; want %s\n", test.d, sd, test.expect) + } + } +} @@ -3,9 +3,13 @@ module janouch.name/acid go 1.22.0 require ( - github.com/mattn/go-sqlite3 v1.14.22 - golang.org/x/crypto v0.21.0 + github.com/mattn/go-sqlite3 v1.14.24 + github.com/pkg/sftp v1.13.7 + golang.org/x/crypto v0.31.0 gopkg.in/yaml.v3 v3.0.1 ) -require golang.org/x/sys v0.18.0 // indirect +require ( + github.com/kr/fs v0.1.0 // indirect + golang.org/x/sys v0.28.0 // indirect +) @@ -1,12 +1,65 @@ -github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= -github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= +github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= +github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/pkg/sftp v1.13.7 h1:uv+I3nNJvlKZIQGSr8JVQLNHFU9YhhNpvC14Y6KgmSM= +github.com/pkg/sftp v1.13.7/go.mod h1:KMKI0t3T6hfA+lTR/ssZdunHo+uwq7ghoN09/FSu3DY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.31.0 
h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= 
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/terminal.go b/terminal.go new file mode 100644 index 0000000..55eabb5 --- /dev/null +++ b/terminal.go @@ -0,0 +1,379 @@ +package main + +import ( + "bytes" + "io" + "log" + "os" + "strconv" + "strings" + "sync" + "unicode/utf8" +) + +type terminalLine struct { + // For simplicity, we assume that all runes take up one cell, + // including TAB and non-spacing ones. + // The next step would be grouping non-spacing characters, + // in particular Unicode modifier letters, with their base. + columns []rune + + // updateGroup is the topmost line that has changed since this line + // has appeared, for the purpose of update tracking. + updateGroup int +} + +// terminalWriter does a best-effort approximation of an infinite-size +// virtual terminal. +type terminalWriter struct { + sync.Mutex + Tee io.WriteCloser + lines []terminalLine + + // Zero-based coordinates within lines. + column, line int + + // lineTop is used as the base for positioning commands. + lineTop int + + written int + byteBuffer []byte + runeBuffer []rune +} + +func (tw *terminalWriter) log(format string, v ...interface{}) { + if os.Getenv("ACID_TERMINAL_DEBUG") != "" { + log.Printf("terminal: "+format+"\n", v...) + } +} + +// SerializeUpdates returns an update block for a client with a given last line, +// and the index of the first line in the update block. +func (tw *terminalWriter) SerializeUpdates(last int) (string, int) { + if last < 0 || last >= len(tw.lines) { + return "", last + } + top := tw.lines[last].updateGroup + return string(tw.Serialize(top)), top +} + +func (tw *terminalWriter) Serialize(top int) []byte { + var b bytes.Buffer + for i := top; i < len(tw.lines); i++ { + b.WriteString(string(tw.lines[i].columns)) + b.WriteByte('\n') + } + return b.Bytes() +} + +func (tw *terminalWriter) Write(p []byte) (written int, err error) { + tw.Lock() + defer tw.Unlock() + + // TODO(p): Rather use io.MultiWriter? + // Though I'm not sure what to do about closing (FD leaks). + // Eventually, any handles would be garbage collected in any case. + if tw.Tee != nil { + tw.Tee.Write(p) + } + + // Enough is enough, writing too much is highly suspicious. + ok, remaining := true, 64<<20-tw.written + if remaining < 0 { + ok, p = false, nil + } else if remaining < len(p) { + ok, p = false, p[:remaining] + } + tw.written += len(p) + + // By now, more or less everything should run in UTF-8. + // + // This might have better performance with a ring buffer, + // so as to avoid reallocations. + b := append(tw.byteBuffer, p...) + if !ok { + b = append(b, "\nToo much terminal output\n"...) + } + for utf8.FullRune(b) { + r, len := utf8.DecodeRune(b) + b, tw.runeBuffer = b[len:], append(tw.runeBuffer, r) + } + tw.byteBuffer = b + for tw.processRunes() { + } + return len(p), nil +} + +func (tw *terminalWriter) processPrint(r rune) { + // Extend the buffer vertically. 
+ for len(tw.lines) <= tw.line { + tw.lines = append(tw.lines, + terminalLine{updateGroup: len(tw.lines)}) + } + + // Refresh update trackers, if necessary. + if tw.lines[len(tw.lines)-1].updateGroup > tw.line { + for i := tw.line; i < len(tw.lines); i++ { + tw.lines[i].updateGroup = min(tw.lines[i].updateGroup, tw.line) + } + } + + // Emulate `cat -v` for C0 characters. + seq := make([]rune, 0, 2) + if r < 32 && r != '\t' { + seq = append(seq, '^', 64+r) + } else { + seq = append(seq, r) + } + + // Extend the line horizontally and write the rune. + for _, r := range seq { + line := &tw.lines[tw.line] + for len(line.columns) <= tw.column { + line.columns = append(line.columns, ' ') + } + + line.columns[tw.column] = r + tw.column++ + } +} + +func (tw *terminalWriter) processFlush() { + tw.column = 0 + tw.line = len(tw.lines) + tw.lineTop = tw.line +} + +func (tw *terminalWriter) processParsedCSI( + private rune, param, intermediate []rune, final rune) bool { + var params []int + if len(param) > 0 { + for _, p := range strings.Split(string(param), ";") { + i, _ := strconv.Atoi(p) + params = append(params, i) + } + } + + if private == '?' && len(intermediate) == 0 && + (final == 'h' || final == 'l') { + for _, p := range params { + // 25 (DECTCEM): There is no cursor to show or hide. + // 7 (DECAWM): We cannot wrap, we're infinite. + if !(p == 25 || (p == 7 && final == 'l')) { + return false + } + } + return true + } + if private != 0 || len(intermediate) > 0 { + return false + } + + switch { + case final == 'C': // Cursor Forward + if len(params) == 0 { + tw.column++ + } else if len(params) >= 1 { + tw.column += params[0] + } + return true + case final == 'D': // Cursor Backward + if len(params) == 0 { + tw.column-- + } else if len(params) >= 1 { + tw.column -= params[0] + } + if tw.column < 0 { + tw.column = 0 + } + return true + case final == 'E': // Cursor Next Line + if len(params) == 0 { + tw.line++ + } else if len(params) >= 1 { + tw.line += params[0] + } + tw.column = 0 + return true + case final == 'F': // Cursor Preceding Line + if len(params) == 0 { + tw.line-- + } else if len(params) >= 1 { + tw.line -= params[0] + } + if tw.line < tw.lineTop { + tw.line = tw.lineTop + } + tw.column = 0 + return true + case final == 'H': // Cursor Position + if len(params) == 0 { + tw.line = tw.lineTop + tw.column = 0 + } else if len(params) >= 2 && params[0] != 0 && params[1] != 0 { + tw.line = tw.lineTop + params[0] - 1 + tw.column = params[1] - 1 + } else { + return false + } + return true + case final == 'J': // Erase in Display + if len(params) == 0 || params[0] == 0 || params[0] == 2 { + // We're not going to erase anything, thank you very much. + tw.processFlush() + } else { + return false + } + return true + case final == 'K': // Erase in Line + if tw.line >= len(tw.lines) { + return true + } + line := &tw.lines[tw.line] + if len(params) == 0 || params[0] == 0 { + if len(line.columns) > tw.column { + line.columns = line.columns[:tw.column] + } + } else if params[0] == 1 { + for i := 0; i < tw.column; i++ { + line.columns[i] = ' ' + } + } else if params[0] == 2 { + line.columns = nil + } else { + return false + } + return true + case final == 'm': + // Straight up ignoring all attributes, at least for now. 
+ return true + } + return false +} + +func (tw *terminalWriter) processCSI(rb []rune) ([]rune, bool) { + if len(rb) < 3 { + return nil, true + } + + i, private, param, intermediate := 2, rune(0), []rune{}, []rune{} + if rb[i] >= 0x3C && rb[i] <= 0x3F { + private = rb[i] + i++ + } + for i < len(rb) && ((rb[i] >= '0' && rb[i] <= '9') || rb[i] == ';') { + param = append(param, rb[i]) + i++ + } + for i < len(rb) && rb[i] >= 0x20 && rb[i] <= 0x2F { + intermediate = append(intermediate, rb[i]) + i++ + } + if i == len(rb) { + return nil, true + } + if rb[i] < 0x40 || rb[i] > 0x7E { + return rb, false + } + if !tw.processParsedCSI(private, param, intermediate, rb[i]) { + tw.log("unhandled CSI %s", string(rb[2:i+1])) + return rb, false + } + return rb[i+1:], true +} + +func (tw *terminalWriter) processEscape(rb []rune) ([]rune, bool) { + if len(rb) < 2 { + return nil, true + } + + // Very roughly following https://vt100.net/emu/dec_ansi_parser + // but being a bit stricter. + switch r := rb[1]; { + case r == '[': + return tw.processCSI(rb) + case r == ']': + // TODO(p): Skip this properly, once we actually hit it. + tw.log("unhandled OSC") + return rb, false + case r == 'P': + // TODO(p): Skip this properly, once we actually hit it. + tw.log("unhandled DCS") + return rb, false + + // Only handling sequences we've seen bother us in real life. + case r == 'c': + // Full reset, use this to flush all output. + tw.processFlush() + return rb[2:], true + case r == 'M': + tw.line-- + return rb[2:], true + + case (r >= 0x30 && r <= 0x4F) || (r >= 0x51 && r <= 0x57) || + r == 0x59 || r == 0x5A || r == 0x5C || (r >= 0x60 && r <= 0x7E): + // → esc_dispatch + tw.log("unhandled ESC %c", r) + return rb, false + //return rb[2:], true + case r >= 0x20 && r <= 0x2F: + // → escape intermediate + i := 2 + for i < len(rb) && rb[i] >= 0x20 && rb[i] <= 0x2F { + i++ + } + if i == len(rb) { + return nil, true + } + if rb[i] < 0x30 || rb[i] > 0x7E { + return rb, false + } + // → esc_dispatch + tw.log("unhandled ESC %s", string(rb[1:i+1])) + return rb, false + //return rb[i+1:], true + default: + // Note that Debian 12 has been seen to produce ESC<U+2026> + // and such due to some very blind string processing. + return rb, false + } +} + +func (tw *terminalWriter) processRunes() bool { + rb := tw.runeBuffer + if len(rb) == 0 { + return false + } + + switch rb[0] { + case '\a': + // Ding dong! + case '\b': + if tw.column > 0 { + tw.column-- + } + case '\n', '\v': + tw.line++ + + // Forced ONLCR flag, because that's what most shell output expects. + fallthrough + case '\r': + tw.column = 0 + + case '\x1b': + var ok bool + if rb, ok = tw.processEscape(rb); rb == nil { + return false + } else if ok { + tw.runeBuffer = rb + return true + } + + // Unsuccessful parses get printed for later inspection. + fallthrough + default: + tw.processPrint(rb[0]) + } + tw.runeBuffer = rb[1:] + return true +} diff --git a/terminal_test.go b/terminal_test.go new file mode 100644 index 0000000..b0686ed --- /dev/null +++ b/terminal_test.go @@ -0,0 +1,100 @@ +package main + +import ( + "slices" + "testing" +) + +// This could be way more extensive, but we're not aiming for perfection. +var tests = []struct { + push, want string +}{ + { + // Escaping and UTF-8. + "\x03\x1bž\bř", + "^C^[ř\n", + }, + { + // Several kinds of sequences to be ignored. + "\x1bc\x1b[?7l\x1b[2J\x1b[0;1mSeaBIOS\rTea", + "TeaBIOS\n", + }, + { + // New origin and absolute positioning. 
+ "Line 1\n\x1bcWine B\nFine 3\x1b[1;6H2\x1b[HL\nL", + "Line 1\nLine 2\nLine 3\n", + }, + { + // In-line positioning (without corner cases). + "A\x1b[CB\x1b[2C?\x1b[DC\x1b[2D\b->", + "A B->C\n", + }, + { + // Up and down. + "\nB\x1bMA\v\vC" + "\x1b[4EG" + "\x1b[FF" + "\x1b[2FD" + "\x1b[EE", + " A\nB\nC\nD\nE\nF\nG\n", + }, + { + // In-line erasing. + "1234\b\b\x1b[K\n5678\b\b\x1b[0K\n" + "abcd\b\b\x1b[1K\nefgh\x1b[2K", + "12\n56\n cd\n\n", + }, +} + +func TestTerminal(t *testing.T) { + for _, test := range tests { + tw := terminalWriter{} + if _, err := tw.Write([]byte(test.push)); err != nil { + t.Errorf("%#v: %s", test.push, err) + continue + } + have := string(tw.Serialize(0)) + if have != test.want { + t.Errorf("%#v: %#v; want %#v", test.push, have, test.want) + } + } +} + +func TestTerminalExploded(t *testing.T) { +Loop: + for _, test := range tests { + tw := terminalWriter{} + for _, b := range []byte(test.push) { + if _, err := tw.Write([]byte{b}); err != nil { + t.Errorf("%#v: %s", test.push, err) + continue Loop + } + } + have := string(tw.Serialize(0)) + if have != test.want { + t.Errorf("%#v: %#v; want %#v", test.push, have, test.want) + } + } +} + +func TestTerminalUpdateGroups(t *testing.T) { + tw := terminalWriter{} + collect := func() (have []int) { + for _, line := range tw.lines { + have = append(have, line.updateGroup) + } + return + } + + // 0: A 0 0 0 + // 1: B X 1 1 1 + // 2: C Y 1 2 1 1 + // 3: Z 2 3 2 + // 4: 3 4 + tw.Write([]byte("A\nB\nC\x1b[FX\nY\nZ")) + have, want := collect(), []int{0, 1, 1, 3} + if !slices.Equal(want, have) { + t.Errorf("update groups: %+v; want: %+v", have, want) + } + + tw.Write([]byte("\x1b[F1\n2\n3")) + have, want = collect(), []int{0, 1, 1, 2, 4} + if !slices.Equal(want, have) { + t.Errorf("update groups: %+v; want: %+v", have, want) + } +} |