aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile5
-rw-r--r--acid.adoc18
-rw-r--r--acid.go1098
-rw-r--r--acid.yaml.example8
-rw-r--r--acid_test.go17
-rw-r--r--go.mod10
-rw-r--r--go.sum69
-rw-r--r--terminal.go379
-rw-r--r--terminal_test.go100
9 files changed, 1399 insertions, 305 deletions
diff --git a/Makefile b/Makefile
index 455c22d..79d35e6 100644
--- a/Makefile
+++ b/Makefile
@@ -5,10 +5,11 @@ version = dev
outputs = acid acid.1
all: $(outputs)
-acid: acid.go
+acid: acid.go terminal.go
go build -ldflags "-X 'main.projectVersion=$(version)'" -o $@
acid.1: acid.adoc
- asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc
+ asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc || \
+ a2x -d manpage -f manpage -a release-version=$(version) acid.adoc
test: all
go test
clean:
diff --git a/acid.adoc b/acid.adoc
index ff5d69b..b60cc80 100644
--- a/acid.adoc
+++ b/acid.adoc
@@ -37,6 +37,8 @@ Commands
*restart* [_ID_]...::
Schedule tasks with the given IDs to be rerun.
Run this command without arguments to pick up external database changes.
+*reload*::
+ Reload configuration.
Configuration
-------------
@@ -45,7 +47,7 @@ file present in the distribution.
All paths are currently relative to the directory you launch *acid* from.
-The *notify*, *setup*, and *build* scripts are processed using Go's
+The *notify*, *setup*, *build*, and *deploy* scripts are processed using Go's
_text/template_ package, and take an object describing the task,
which has the following fields:
@@ -65,6 +67,17 @@ which has the following fields:
*RunnerName*::
Descriptive name of the runner.
+// Intentionally not documenting CreatedUnix, ChangedUnix, DurationSeconds,
+// which can be derived from the objects.
+*Created*, *Changed*::
+ `*time.Time` of task creation and last task state change respectively,
+ or nil if not known.
+*CreatedAgo*, *ChangedAgo*::
+ Abbreviated human-friendly relative elapsed time duration
+ since *Created* and *Changed* respectively.
+*Duration*::
+ `*time.Duration` of the last run in seconds, or nil if not known.
+
*URL*::
*acid* link to the task, where its log output can be seen.
*RepoURL*::
@@ -79,7 +92,8 @@ in *sh*(1) command arguments.
Runners
-------
-Runners receive the following additional environment variables:
+Runners and deploy scripts receive the following additional
+environment variables:
*ACID_ROOT*:: The same as the base directory for configuration.
*ACID_RUNNER*:: The same as *Runner* in script templates.
diff --git a/acid.go b/acid.go
index f938d02..9fe9fe4 100644
--- a/acid.go
+++ b/acid.go
@@ -21,15 +21,18 @@ import (
"os"
"os/exec"
"os/signal"
+ "path/filepath"
"sort"
"strconv"
"strings"
"sync"
+ "sync/atomic"
"syscall"
ttemplate "text/template"
"time"
_ "github.com/mattn/go-sqlite3"
+ "github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"gopkg.in/yaml.v3"
)
@@ -38,9 +41,9 @@ var (
projectName = "acid"
projectVersion = "?"
- gConfig Config = Config{Listen: ":http"}
- gNotifyScript *ttemplate.Template
- gDB *sql.DB
+ gConfigPath string
+ gConfig atomic.Pointer[Config]
+ gDB *sql.DB
gNotifierSignal = make(chan struct{}, 1)
gExecutorSignal = make(chan struct{}, 1)
@@ -50,6 +53,8 @@ var (
gRunning = make(map[int64]*RunningTask)
)
+func getConfig() *Config { return gConfig.Load() }
+
// --- Config ------------------------------------------------------------------
type Config struct {
@@ -63,6 +68,8 @@ type Config struct {
Runners map[string]ConfigRunner `yaml:"runners"` // script runners
Projects map[string]ConfigProject `yaml:"projects"` // configured projects
+
+ notifyTemplate *ttemplate.Template
}
type ConfigRunner struct {
@@ -84,8 +91,9 @@ type ConfigProject struct {
func (cf *ConfigProject) AutomaticRunners() (runners []string) {
// We pass through unknown runner names,
// so that they can cause reference errors later.
+ config := getConfig()
for runner := range cf.Runners {
- if r, _ := gConfig.Runners[runner]; !r.Manual {
+ if r, _ := config.Runners[runner]; !r.Manual {
runners = append(runners, runner)
}
}
@@ -96,20 +104,32 @@ func (cf *ConfigProject) AutomaticRunners() (runners []string) {
type ConfigProjectRunner struct {
Setup string `yaml:"setup"` // project setup script (SSH)
Build string `yaml:"build"` // project build script (SSH)
+ Deploy string `yaml:"deploy"` // project deploy script (local)
Timeout string `yaml:"timeout"` // timeout duration
}
-func parseConfig(path string) error {
- if f, err := os.Open(path); err != nil {
+// loadConfig reloads configuration.
+// Beware that changes do not get applied globally at the same moment.
+func loadConfig() error {
+ new := &Config{}
+ if f, err := os.Open(gConfigPath); err != nil {
return err
- } else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil {
+ } else if err = yaml.NewDecoder(f).Decode(new); err != nil {
return err
}
+ if old := getConfig(); old != nil && old.DB != new.DB {
+ return fmt.Errorf("the database file cannot be changed in runtime")
+ }
var err error
- gNotifyScript, err =
- ttemplate.New("notify").Funcs(shellFuncs).Parse(gConfig.Notify)
- return err
+ new.notifyTemplate, err =
+ ttemplate.New("notify").Funcs(shellFuncs).Parse(new.Notify)
+ if err != nil {
+ return err
+ }
+
+ gConfig.Store(new)
+ return nil
}
var shellFuncs = ttemplate.FuncMap{
@@ -133,8 +153,16 @@ var shellFuncs = ttemplate.FuncMap{
// --- Utilities ---------------------------------------------------------------
+func localShell() string {
+ if shell := os.Getenv("SHELL"); shell != "" {
+ return shell
+ }
+ // The os/user package doesn't store the parsed out shell field.
+ return "/bin/sh"
+}
+
func giteaSign(b []byte) string {
- payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
+ payloadHmac := hmac.New(sha256.New, []byte(getConfig().Secret))
payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil))
}
@@ -142,9 +170,9 @@ func giteaSign(b []byte) string {
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
*http.Request, error) {
req, err := http.NewRequestWithContext(
- ctx, method, gConfig.Gitea+path, body)
+ ctx, method, getConfig().Gitea+path, body)
if req != nil {
- req.Header.Set("Authorization", "token "+gConfig.Token)
+ req.Header.Set("Authorization", "token "+getConfig().Token)
req.Header.Set("Accept", "application/json")
}
return req, err
@@ -153,7 +181,9 @@ func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, `
SELECT id, owner, repo, hash, runner,
- state, detail, notified, runlog, tasklog FROM task `+query, args...)
+ created, changed, duration,
+ state, detail, notified,
+ runlog, tasklog, deploylog FROM task `+query, args...)
if err != nil {
return nil, err
}
@@ -163,10 +193,13 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
for rows.Next() {
var t Task
err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
- &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog)
+ &t.CreatedUnix, &t.ChangedUnix, &t.DurationSeconds,
+ &t.State, &t.Detail, &t.Notified,
+ &t.RunLog, &t.TaskLog, &t.DeployLog)
if err != nil {
return nil, err
}
+ // We could also update some fields from gRunning.
tasks = append(tasks, t)
}
return tasks, rows.Err()
@@ -187,6 +220,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
<thead>
<tr>
<th>ID</th>
+ <th>Created</th>
+ <th>Changed</th>
<th>Repository</th>
<th>Hash</th>
<th>Runner</th>
@@ -199,6 +234,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
{{range .}}
<tr>
<td><a href="task/{{.ID}}">{{.ID}}</a></td>
+ <td align="right"><span title="{{.Created}}">{{.CreatedAgo}}</span></td>
+ <td align="right"><span title="{{.Changed}}">{{.ChangedAgo}}</span></td>
<td><a href="{{.RepoURL}}">{{.FullName}}</a></td>
<td><a href="{{.CommitURL}}">{{.Hash}}</a></td>
<td>{{.RunnerName}}</td>
@@ -214,7 +251,7 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
`))
func handleTasks(w http.ResponseWriter, r *http.Request) {
- tasks, err := getTasks(r.Context(), `ORDER BY id DESC`)
+ tasks, err := getTasks(r.Context(), `ORDER BY changed DESC, id DESC`)
if err != nil {
http.Error(w,
"Error retrieving tasks: "+err.Error(),
@@ -233,37 +270,179 @@ var templateTask = template.Must(template.New("tasks").Parse(`
<head>
<title>Task {{.ID}}</title>
<meta charset="utf-8">
-{{if .IsRunning}}
-<meta http-equiv="refresh" content="5">
-{{end}}
</head>
<body>
<h1><a href="..">Tasks</a> &raquo; {{.ID}}</h1>
<dl>
+<!-- Remember to synchronise these lists with Javascript updates. -->
+{{if .Created -}}
+<dt>Created</dt>
+ <dd><span id="created" title="{{.Created}}">{{.CreatedAgo}} ago</span></dd>
+{{end -}}
+{{if .Changed -}}
+<dt>Changed</dt>
+ <dd><span id="changed" title="{{.Changed}}">{{.ChangedAgo}} ago</span></dd>
+{{end -}}
<dt>Project</dt>
- <dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
+ <dd id="project"><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
<dt>Commit</dt>
- <dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
+ <dd id="commit"><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
<dt>Runner</dt>
- <dd>{{.RunnerName}}</dd>
+ <dd id="runner">{{.RunnerName}}</dd>
<dt>State</dt>
- <dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
+ <dd id="state">{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
<dt>Notified</dt>
- <dd>{{.Notified}}</dd>
+ <dd id="notified">{{.Notified}}</dd>
+<dt>Duration</dt>
+ <dd id="duration">{{if .Duration}}{{.Duration}}{{else}}&mdash;{{end}}</dd>
</dl>
-{{if .RunLog}}
-<h2>Runner log</h2>
-<pre>{{printf "%s" .RunLog}}</pre>
-{{end}}
-{{if .TaskLog}}
-<h2>Task log</h2>
-<pre>{{printf "%s" .TaskLog}}</pre>
-{{end}}
-</table>
+
+<h2 id="run"{{if not .RunLog}} hidden{{end}}>Runner log</h2>
+<pre id="runlog"{{if not .RunLog}} hidden{{
+end}}>{{printf "%s" .RunLog}}</pre>
+<h2 id="task"{{if not .TaskLog}} hidden{{end}}>Task log</h2>
+<pre id="tasklog"{{if not .TaskLog}} hidden{{
+end}}>{{printf "%s" .TaskLog}}</pre>
+<h2 id="deploy"{{if not .DeployLog}} hidden{{end}}>Deploy log</h2>
+<pre id="deploylog"{{if not .DeployLog}} hidden{{
+end}}>{{printf "%s" .DeployLog}}</pre>
+
+{{if .IsRunning -}}
+<script>
+function get(id) {
+ return document.getElementById(id)
+}
+function getLog(id) {
+ const header = get(id), log = get(id + 'log'), text = log.textContent
+ // lines[-1] is an implementation detail of terminalWriter.Serialize,
+ // lines[-2] is the actual last line.
+ const last = Math.max(0, text.split('\n').length - 2)
+ return {header, log, text, last}
+}
+function refreshLog(log, top, changed) {
+ if (top <= 0)
+ log.log.textContent = changed
+ else
+ log.log.textContent =
+ log.text.split('\n').slice(0, top).join('\n') + '\n' + changed
+
+ const empty = log.log.textContent === ''
+ log.header.hidden = empty
+ log.log.hidden = empty
+}
+let refresher = setInterval(() => {
+ const run = getLog('run'), task = getLog('task'), deploy = getLog('deploy')
+ const url = new URL(window.location.href)
+ url.search = ''
+ url.searchParams.set('json', '')
+ url.searchParams.set('run', run.last)
+ url.searchParams.set('task', task.last)
+ url.searchParams.set('deploy', deploy.last)
+ fetch(url.toString()).then(response => {
+ if (!response.ok)
+ throw response.statusText
+ return response.json()
+ }).then(data => {
+ const scroll = window.scrollY + window.innerHeight
+ >= document.documentElement.scrollHeight
+ if (data.Created) {
+ get('created').title = data.Created
+ get('created').textContent = data.CreatedAgo + " ago"
+ }
+ if (data.Changed) {
+ get('changed').title = data.Changed
+ get('changed').textContent = data.ChangedAgo + " ago"
+ }
+ get('state').textContent = data.State
+ if (data.Detail !== '')
+ get('state').textContent += " (" + data.Detail + ")"
+ get('notified').textContent = String(data.Notified)
+ if (data.Duration)
+ get('duration').textContent = data.Duration
+
+ refreshLog(run, data.RunLogTop, data.RunLog)
+ refreshLog(task, data.TaskLogTop, data.TaskLog)
+ refreshLog(deploy, data.DeployLogTop, data.DeployLog)
+
+ if (scroll)
+ document.documentElement.scrollTop =
+ document.documentElement.scrollHeight
+ if (!data.IsRunning)
+ clearInterval(refresher)
+ }).catch(error => {
+ clearInterval(refresher)
+ alert(error)
+ })
+}, 1000 /* For faster updates than this, we should use WebSockets. */)
+</script>
+{{end -}}
+
</body>
</html>
`))
+// handlerTask serves as the data for JSON encoding and the task HTML template.
+// It needs to explicitly include many convenience method results.
+type handlerTask struct {
+ Task
+ IsRunning bool
+
+ Created *string // Task.Created?.String()
+ Changed *string // Task.Changed?.String()
+ CreatedAgo string // Task.CreatedAgo()
+ ChangedAgo string // Task.ChangedAgo()
+ Duration *string // Task.Duration?.String()
+ State string // Task.State.String()
+ RunLog string
+ RunLogTop int
+ TaskLog string
+ TaskLogTop int
+ DeployLog string
+ DeployLogTop int
+}
+
+func toNilableString[T fmt.Stringer](stringer *T) *string {
+ if stringer == nil {
+ return nil
+ }
+ s := (*stringer).String()
+ return &s
+}
+
+func newHandlerTask(task Task) handlerTask {
+ return handlerTask{
+ Task: task,
+ RunLog: string(task.RunLog),
+ TaskLog: string(task.TaskLog),
+ DeployLog: string(task.DeployLog),
+
+ Created: toNilableString(task.Created()),
+ Changed: toNilableString(task.Changed()),
+ CreatedAgo: task.CreatedAgo(),
+ ChangedAgo: task.ChangedAgo(),
+ Duration: toNilableString(task.Duration()),
+ State: task.State.String(),
+ }
+}
+
+func (ht *handlerTask) updateFromRunning(
+ rt *RunningTask, lastRun, lastTask, lastDeploy int) {
+ ht.IsRunning = true
+ ht.Task.DurationSeconds = rt.elapsed()
+ ht.Duration = toNilableString(ht.Task.Duration())
+
+ rt.RunLog.Lock()
+ defer rt.RunLog.Unlock()
+ rt.TaskLog.Lock()
+ defer rt.TaskLog.Unlock()
+ rt.DeployLog.Lock()
+ defer rt.DeployLog.Unlock()
+
+ ht.RunLog, ht.RunLogTop = rt.RunLog.SerializeUpdates(lastRun)
+ ht.TaskLog, ht.TaskLogTop = rt.TaskLog.SerializeUpdates(lastTask)
+ ht.DeployLog, ht.DeployLogTop = rt.DeployLog.SerializeUpdates(lastDeploy)
+}
+
func handleTask(w http.ResponseWriter, r *http.Request) {
id, err := strconv.Atoi(r.PathValue("id"))
if err != nil {
@@ -284,31 +463,32 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
return
}
- task := struct {
- Task
- IsRunning bool
- }{Task: tasks[0]}
+ // These are intended for running tasks,
+ // so don't reprocess DB logs, which would only help the last update.
+ q := r.URL.Query()
+ lastRun, _ := strconv.Atoi(q.Get("run"))
+ lastTask, _ := strconv.Atoi(q.Get("task"))
+ lastDeploy, _ := strconv.Atoi(q.Get("deploy"))
+
+ task := newHandlerTask(tasks[0])
func() {
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
- rt, ok := gRunning[task.ID]
- task.IsRunning = ok
- if !ok {
- return
+ if rt, ok := gRunning[task.ID]; ok {
+ task.updateFromRunning(
+ rt, int(lastRun), int(lastTask), int(lastDeploy))
}
-
- rt.RunLog.mu.Lock()
- defer rt.RunLog.mu.Unlock()
- rt.TaskLog.mu.Lock()
- defer rt.TaskLog.mu.Unlock()
-
- task.RunLog = rt.RunLog.b
- task.TaskLog = rt.TaskLog.b
}()
- if err := templateTask.Execute(w, &task); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ if q.Has("json") {
+ w.Header().Set("Content-Type", "application/json")
+ err = json.NewEncoder(w).Encode(&task)
+ } else {
+ err = templateTask.Execute(w, &task)
+ }
+ if err != nil {
+ log.Println(err)
}
}
@@ -335,8 +515,9 @@ func createTasks(ctx context.Context,
}
defer tx.Rollback()
- stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner)
- VALUES (?, ?, ?, ?)`)
+ stmt, err := tx.Prepare(
+ `INSERT INTO task(owner, repo, hash, runner, created, changed)
+ VALUES (?, ?, ?, ?, unixepoch('now'), unixepoch('now'))`)
if err != nil {
return err
}
@@ -385,7 +566,7 @@ func handlePush(w http.ResponseWriter, r *http.Request) {
log.Printf("received push: %s %s\n",
event.Repository.FullName, event.HeadCommit.ID)
- project, ok := gConfig.Projects[event.Repository.FullName]
+ project, ok := getConfig().Projects[event.Repository.FullName]
if !ok {
// This is okay, don't set any commit statuses.
fmt.Fprintf(w, "The project is not configured.")
@@ -417,8 +598,11 @@ func rpcRestartOne(ctx context.Context, id int64) error {
// The executor bumps to "running" after inserting into gRunning,
// so we should not need to exclude that state here.
- result, err := gDB.ExecContext(ctx, `UPDATE task
- SET state = ?, detail = '', notified = 0 WHERE id = ?`,
+ //
+ // We deliberately do not clear previous run data (duration, *log).
+ result, err := gDB.ExecContext(ctx,
+ `UPDATE task SET changed = unixepoch('now'),
+ state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id)
if err != nil {
return fmt.Errorf("%d: %w", id, err)
@@ -494,7 +678,7 @@ func rpcEnqueue(ctx context.Context,
return fmt.Errorf("%s: %w", ref, err)
}
- project, ok := gConfig.Projects[owner+"/"+repo]
+ project, ok := getConfig().Projects[owner+"/"+repo]
if !ok {
return fmt.Errorf("project configuration not found")
}
@@ -546,6 +730,17 @@ func rpcRestart(ctx context.Context,
return nil
}
+func rpcReload(ctx context.Context,
+ w io.Writer, fs *flag.FlagSet, args []string) error {
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if fs.NArg() > 0 {
+ return errWrongUsage
+ }
+ return loadConfig()
+}
+
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
var rpcCommands = map[string]struct {
@@ -558,6 +753,8 @@ var rpcCommands = map[string]struct {
"Create or restart tasks for the given reference."},
"restart": {rpcRestart, "[ID]...",
"Schedule tasks with the given IDs to be rerun."},
+ "reload": {rpcReload, "",
+ "Reload configuration."},
}
func rpcPrintCommands(w io.Writer) {
@@ -644,12 +841,12 @@ func handleRPC(w http.ResponseWriter, r *http.Request) {
func notifierRunCommand(ctx context.Context, task Task) {
script := bytes.NewBuffer(nil)
- if err := gNotifyScript.Execute(script, &task); err != nil {
+ if err := getConfig().notifyTemplate.Execute(script, &task); err != nil {
log.Printf("error: notify: %s", err)
return
}
- cmd := exec.CommandContext(ctx, "sh")
+ cmd := exec.CommandContext(ctx, localShell())
cmd.Stdin = script
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -671,13 +868,14 @@ func notifierNotify(ctx context.Context, task Task) error {
TargetURL string `json:"target_url"`
}{}
- runner, ok := gConfig.Runners[task.Runner]
+ config := getConfig()
+ runner, ok := config.Runners[task.Runner]
if !ok {
log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner)
return nil
}
payload.Context = runner.Name
- payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID)
+ payload.TargetURL = fmt.Sprintf("%s/task/%d", config.Root, task.ID)
switch task.State {
case taskStateNew:
@@ -773,86 +971,318 @@ func notifierAwaken() {
}
// --- Executor ----------------------------------------------------------------
+// ~~~ Running task ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-type terminalWriter struct {
- b []byte
- cur int
- mu sync.Mutex
+// RunningTask stores all data pertaining to a currently running task.
+type RunningTask struct {
+ DB Task
+ Runner ConfigRunner
+ ProjectRunner ConfigProjectRunner
+
+ RunLog terminalWriter
+ TaskLog terminalWriter
+ DeployLog terminalWriter
+
+ wd string // acid working directory
+ timeout time.Duration // time limit on task execution
+ signer ssh.Signer // SSH private key
+ tmplScript *ttemplate.Template // remote build script
+ tmplDeploy *ttemplate.Template // local deployment script
}
-func (tw *terminalWriter) Write(p []byte) (written int, err error) {
- tw.mu.Lock()
- defer tw.mu.Unlock()
+// newRunningTask prepares a task for running, without executing anything yet.
+func newRunningTask(task Task) (*RunningTask, error) {
+ rt := &RunningTask{DB: task}
+ config := getConfig()
+
+ // This is for our own tracking, not actually written to database.
+ rt.DB.ChangedUnix = time.Now().Unix()
+
+ var ok bool
+ rt.Runner, ok = config.Runners[rt.DB.Runner]
+ if !ok {
+ return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner)
+ }
+ project, ok := config.Projects[rt.DB.FullName()]
+ if !ok {
+ return nil, fmt.Errorf("project configuration not found")
+ }
+ rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
+ if !ok {
+ return nil, fmt.Errorf(
+ "project not configured for runner %s", rt.DB.Runner)
+ }
+
+ var err error
+ if rt.wd, err = os.Getwd(); err != nil {
+ return nil, err
+ }
- // Extremely rudimentary emulation of a dumb terminal.
- for _, b := range p {
- // Enough is enough, writing too much is highly suspicious.
- if len(tw.b) > 64<<20 {
- return written, errors.New("too much terminal output")
+ // Lenient or not, some kind of a time limit is desirable.
+ rt.timeout = time.Hour
+ if rt.ProjectRunner.Timeout != "" {
+ rt.timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout)
+ if err != nil {
+ return nil, fmt.Errorf("timeout: %w", err)
}
+ }
- switch b {
- case '\b':
- if tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
- tw.cur--
- }
- case '\r':
- for tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
- tw.cur--
- }
- case '\n':
- tw.b = append(tw.b, b)
- tw.cur = len(tw.b)
- default:
- tw.b = append(tw.b[:tw.cur], b)
- tw.cur = len(tw.b)
+ privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+ rt.signer, err = ssh.ParsePrivateKey(privateKey)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+
+ // The runner setup script may change the working directory,
+ // so do everything in one go. However, this approach also makes it
+ // difficult to distinguish project-independent runner failures.
+ // (For that, we can start multiple ssh.Sessions.)
+ //
+ // We could pass variables through SSH environment variables,
+ // which would require enabling PermitUserEnvironment in sshd_config,
+ // or through prepending script lines, but templates are a bit simpler.
+ //
+ // We let the runner itself clone the repository:
+ // - it is a more flexible in that it can test AUR packages more normally,
+ // - we might have to clone submodules as well.
+ // Otherwise, we could download a source archive from Gitea,
+ // and use SFTP to upload it to the runner.
+ rt.tmplScript, err = ttemplate.New("script").Funcs(shellFuncs).
+ Parse(rt.Runner.Setup + "\n" +
+ rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ if err != nil {
+ return nil, fmt.Errorf("script/build: %w", err)
+ }
+
+ rt.tmplDeploy, err = ttemplate.New("deploy").Funcs(shellFuncs).
+ Parse(rt.ProjectRunner.Deploy)
+ if err != nil {
+ return nil, fmt.Errorf("script/deploy: %w", err)
+ }
+
+ if os.Getenv("ACID_TERMINAL_DEBUG") != "" {
+ base := filepath.Join(executorTmpDir("/tmp"),
+ fmt.Sprintf("acid-%d-%s-%s-%s-",
+ task.ID, task.Owner, task.Repo, task.Runner))
+ rt.RunLog.Tee, _ = os.Create(base + "runlog")
+ rt.TaskLog.Tee, _ = os.Create(base + "tasklog")
+ // The deployment log should not be interesting.
+ }
+ return rt, nil
+}
+
+func (rt *RunningTask) close() {
+ for _, tee := range []io.WriteCloser{
+ rt.RunLog.Tee, rt.TaskLog.Tee, rt.DeployLog.Tee} {
+ if tee != nil {
+ tee.Close()
}
+ }
+}
+
+// localEnv creates a process environment for locally run executables.
+func (rt *RunningTask) localEnv() []string {
+ return append(os.Environ(),
+ "ACID_ROOT="+rt.wd,
+ "ACID_RUNNER="+rt.DB.Runner,
+ )
+}
+
+func (rt *RunningTask) elapsed() int64 {
+ return int64(time.Since(time.Unix(rt.DB.ChangedUnix, 0)).Seconds())
+}
+
+// update stores the running task's state in the database.
+func (rt *RunningTask) update() error {
+ for _, i := range []struct {
+ tw *terminalWriter
+ log *[]byte
+ }{
+ {&rt.RunLog, &rt.DB.RunLog},
+ {&rt.TaskLog, &rt.DB.TaskLog},
+ {&rt.DeployLog, &rt.DB.DeployLog},
+ } {
+ i.tw.Lock()
+ defer i.tw.Unlock()
+ if *i.log = i.tw.Serialize(0); *i.log == nil {
+ *i.log = []byte{}
+ }
+ }
+ rt.DB.DurationSeconds = rt.elapsed()
+ return rt.DB.update()
+}
+
+// ~~~ Deploy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func executorDownloadNode(sc *sftp.Client, remotePath, localPath string,
+ info os.FileInfo) error {
+ if info.IsDir() {
+ // Hoping that caller invokes us on parents first.
+ return os.MkdirAll(localPath, info.Mode().Perm())
+ }
+
+ src, err := sc.Open(remotePath)
+ if err != nil {
+ return fmt.Errorf("failed to open remote file %s: %w", remotePath, err)
+ }
+ defer src.Close()
+
+ dst, err := os.OpenFile(
+ localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
+ if err != nil {
+ return fmt.Errorf("failed to create local file: %w", err)
+ }
+ defer dst.Close()
+
+ if _, err = io.Copy(dst, src); err != nil {
+ return fmt.Errorf("failed to copy file from remote %s to local %s: %w",
+ remotePath, localPath, err)
+ }
+ return nil
+}
+func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error {
+ sc, err := sftp.NewClient(client)
+ if err != nil {
+ return err
+ }
+ defer sc.Close()
+
+ walker := sc.Walk(remoteRoot)
+ for walker.Step() {
+ if walker.Err() != nil {
+ return walker.Err()
+ }
+ relativePath, err := filepath.Rel(remoteRoot, walker.Path())
if err != nil {
- break
+ return err
+ }
+ if err = executorDownloadNode(sc, walker.Path(),
+ filepath.Join(localRoot, relativePath), walker.Stat()); err != nil {
+ return err
}
- written += 1
}
- return
+ return nil
}
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+func executorTmpDir(fallback string) string {
+ // See also: https://systemd.io/TEMPORARY_DIRECTORIES/
+ if tmp := os.Getenv("TMPDIR"); tmp != "" {
+ return tmp
+ }
+ return fallback
+}
-type RunningTask struct {
- DB Task
- Runner ConfigRunner
- ProjectRunner ConfigProjectRunner
+func executorDeploy(
+ ctx context.Context, client *ssh.Client, rt *RunningTask) error {
+ script := bytes.NewBuffer(nil)
+ if err := rt.tmplDeploy.Execute(script, &rt.DB); err != nil {
+ return &executorError{"Deploy template failed", err}
+ }
+
+ // Thus the deployment directory must exist iff the script is not empty.
+ if script.Len() == 0 {
+ return nil
+ }
+
+ // We expect the files to be moved elsewhere on the filesystem,
+ // and they may get very large, so avoid /tmp.
+ dir := filepath.Join(executorTmpDir("/var/tmp"), "acid-deploy")
+ if err := os.RemoveAll(dir); err != nil {
+ return err
+ }
+ if err := os.Mkdir(dir, 0755); err != nil {
+ return err
+ }
- RunLog terminalWriter
- TaskLog terminalWriter
+ // The passed remoteRoot is relative to sc.Getwd.
+ if err := executorDownload(client, "acid-deploy", dir); err != nil {
+ return err
+ }
+
+ cmd := exec.CommandContext(ctx, localShell())
+ cmd.Env = rt.localEnv()
+ cmd.Dir = dir
+ cmd.Stdin = script
+ cmd.Stdout = &rt.DeployLog
+ cmd.Stderr = &rt.DeployLog
+ return cmd.Run()
}
-func executorUpdate(rt *RunningTask) error {
- rt.RunLog.mu.Lock()
- defer rt.RunLog.mu.Unlock()
- rt.DB.RunLog = bytes.Clone(rt.RunLog.b)
- if rt.DB.RunLog == nil {
- rt.DB.RunLog = []byte{}
+// ~~~ Build ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func executorBuild(
+ ctx context.Context, client *ssh.Client, rt *RunningTask) error {
+ // This is here to fail early, though logically it is misplaced.
+ script := bytes.NewBuffer(nil)
+ if err := rt.tmplScript.Execute(script, &rt.DB); err != nil {
+ return &executorError{"Script template failed", err}
}
- rt.TaskLog.mu.Lock()
- defer rt.TaskLog.mu.Unlock()
- rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b)
- if rt.DB.TaskLog == nil {
- rt.DB.TaskLog = []byte{}
+ session, err := client.NewSession()
+ if err != nil {
+ return &executorError{"SSH failure", err}
+ }
+ defer session.Close()
+
+ modes := ssh.TerminalModes{ssh.ECHO: 0}
+ if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
+ return &executorError{"SSH failure", err}
}
- _, err := gDB.ExecContext(context.Background(), `UPDATE task
- SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ?
- WHERE id = ?`,
- rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog,
- rt.DB.ID)
- if err == nil {
- notifierAwaken()
+ log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
+
+ session.Stdout = &rt.TaskLog
+ session.Stderr = &rt.TaskLog
+
+ // Although passing the script directly takes away the option to specify
+ // a particular shell (barring here-documents!), it is simple and reliable.
+ //
+ // Passing the script over Stdin to sh tended to end up with commands
+ // eating the script during processing, and resulted in a hang,
+ // because closing the Stdin does not result in remote processes
+ // getting a stream of EOF.
+ //
+ // Piping the script into `cat | sh` while appending a ^D to the end of it
+ // appeared to work, but it seems likely that commands might still steal
+ // script bytes from the cat program if they choose to read from the tty
+ // and the script is longer than the buffer.
+ chSession := make(chan error, 1)
+ go func() {
+ chSession <- session.Run(script.String())
+ close(chSession)
+ }()
+
+ select {
+ case <-ctx.Done():
+ // Either shutdown, or early runner termination.
+ // The runner is not supposed to finish before the session.
+ err = context.Cause(ctx)
+ case err = <-chSession:
+ // Killing a runner may perfectly well trigger this first,
+ // in particular when it's on the same machine.
}
return err
}
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+// executorError describes a taskStateError.
+type executorError struct {
+ Detail string
+ Err error
+}
+
+func (e *executorError) Unwrap() error { return e.Err }
+func (e *executorError) Error() string {
+ return fmt.Sprintf("%s: %s", e.Detail, e.Err)
+}
+
func executorConnect(
ctx context.Context, config *ssh.ClientConfig, address string) (
*ssh.Client, error) {
@@ -894,122 +1324,60 @@ func executorConnect(
time.Sleep(1 * time.Second)
continue
}
-
return ssh.NewClient(sc, chans, reqs), nil
}
}
func executorRunTask(ctx context.Context, task Task) error {
- rt := &RunningTask{DB: task}
-
- var ok bool
- rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
- if !ok {
- return fmt.Errorf("unknown runner: %s", rt.DB.Runner)
- }
- project, ok := gConfig.Projects[rt.DB.FullName()]
- if !ok {
- return fmt.Errorf("project configuration not found")
- }
- rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
- if !ok {
- return fmt.Errorf(
- "project not configured for runner %s", rt.DB.Runner)
- }
-
- wd, err := os.Getwd()
- if err != nil {
- return err
- }
-
- // The runner setup script may change the working directory,
- // so do everything in one go. However, this approach also makes it
- // difficult to distinguish project-independent runner failures.
- // (For that, we can start multiple ssh.Sessions.)
- //
- // We could pass variables through SSH environment variables,
- // which would require enabling PermitUserEnvironment in sshd_config,
- // or through prepending script lines, but templates are a bit simpler.
- //
- // We let the runner itself clone the repository:
- // - it is a more flexible in that it can test AUR packages more normally,
- // - we might have to clone submodules as well.
- // Otherwise, we could download a source archive from Gitea,
- // and use SFTP to upload it to the runner.
- tmplScript, err := ttemplate.New("script").Funcs(shellFuncs).
- Parse(rt.Runner.Setup + "\n" +
- rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ rt, err := newRunningTask(task)
if err != nil {
- return fmt.Errorf("script: %w", err)
+ task.DurationSeconds = 0
+ task.State, task.Detail = taskStateError, "Misconfigured"
+ task.Notified = 0
+ task.RunLog = []byte(err.Error())
+ task.TaskLog = []byte{}
+ task.DeployLog = []byte{}
+ return task.update()
}
+ defer rt.close()
- // Lenient or not, some kind of a time limit is desirable.
- timeout := time.Hour
- if rt.ProjectRunner.Timeout != "" {
- timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout)
- if err != nil {
- return fmt.Errorf("timeout: %w", err)
- }
- }
- ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
+ ctx, cancelTimeout := context.WithTimeout(ctx, rt.timeout)
defer cancelTimeout()
- privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
- if err != nil {
- return fmt.Errorf(
- "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
- }
- signer, err := ssh.ParsePrivateKey(privateKey)
- if err != nil {
- return fmt.Errorf(
- "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
- }
-
- defer func() {
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
- delete(gRunning, rt.DB.ID)
- }()
- func() {
+ // RunningTasks can be concurrently accessed by HTTP handlers.
+ locked := func(f func()) {
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
-
+ f()
+ }
+ locked(func() {
+ rt.DB.DurationSeconds = 0
rt.DB.State, rt.DB.Detail = taskStateRunning, ""
rt.DB.Notified = 0
rt.DB.RunLog = []byte{}
rt.DB.TaskLog = []byte{}
+ rt.DB.DeployLog = []byte{}
gRunning[rt.DB.ID] = rt
- }()
- if err := executorUpdate(rt); err != nil {
+ })
+ defer locked(func() {
+ delete(gRunning, rt.DB.ID)
+ })
+ if err := rt.update(); err != nil {
return fmt.Errorf("SQL: %w", err)
}
// Errors happening while trying to write an error are unfortunate,
// but not important enough to abort entirely.
setError := func(detail string) {
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
rt.DB.State, rt.DB.Detail = taskStateError, detail
- if err := executorUpdate(rt); err != nil {
+ if err := rt.update(); err != nil {
log.Printf("error: task %d for %s: SQL: %s",
rt.DB.ID, rt.DB.FullName(), err)
}
}
- script := bytes.NewBuffer(nil)
- if err := tmplScript.Execute(script, &rt.DB); err != nil {
- setError("Script template failed")
- return err
- }
-
cmd := exec.CommandContext(ctx, rt.Runner.Run)
- cmd.Env = append(
- os.Environ(),
- "ACID_ROOT="+wd,
- "ACID_RUNNER="+rt.DB.Runner,
- )
+ cmd.Env = rt.localEnv()
// Pushing the runner into a new process group that can be killed at once
// with all its children isn't bullet-proof, it messes with job control
@@ -1033,7 +1401,8 @@ func executorRunTask(ctx context.Context, task Task) error {
cmd.Stdout = &rt.RunLog
cmd.Stderr = &rt.RunLog
if err := cmd.Start(); err != nil {
- setError("Runner failed to start")
+ fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+ locked(func() { setError("Runner failed to start") })
return err
}
@@ -1059,78 +1428,62 @@ func executorRunTask(ctx context.Context, task Task) error {
client, err := executorConnect(ctxRunner, &ssh.ClientConfig{
User: rt.Runner.SSH.User,
- Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
+ Auth: []ssh.AuthMethod{ssh.PublicKeys(rt.signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}, rt.Runner.SSH.Address)
if err != nil {
fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
+ locked(func() { setError("SSH failure") })
return err
}
defer client.Close()
- session, err := client.NewSession()
- if err != nil {
- fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
- return err
- }
- defer session.Close()
+ var (
+ eeSSH *ssh.ExitError
+ eeExec *exec.ExitError
+ ee3 *executorError
+ )
- modes := ssh.TerminalModes{ssh.ECHO: 0}
- if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
- fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
- return err
+ err = executorBuild(ctxRunner, client, rt)
+ if err != nil {
+ locked(func() {
+ if errors.As(err, &eeSSH) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ } else if errors.As(err, &ee3) {
+ rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee3.Err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ }
+ })
+ return rt.update()
}
- log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
-
- session.Stdout = &rt.TaskLog
- session.Stderr = &rt.TaskLog
-
- // Although passing the script directly takes away the option to specify
- // a particular shell (barring here-documents!), it is simple and reliable.
- //
- // Passing the script over Stdin to sh tended to end up with commands
- // eating the script during processing, and resulted in a hang,
- // because closing the Stdin does not result in remote processes
- // getting a stream of EOF.
- //
- // Piping the script into `cat | sh` while appending a ^D to the end of it
- // appeared to work, but it seems likely that commands might still steal
- // script bytes from the cat program if they choose to read from the tty
- // and the script is longer than the buffer.
- chSession := make(chan error, 1)
+ // This is so that it doesn't stay hanging within the sftp package,
+ // which uses context.Background() everywhere.
go func() {
- chSession <- session.Run(script.String())
- close(chSession)
+ <-ctxRunner.Done()
+ client.Close()
}()
- select {
- case <-ctxRunner.Done():
- // Either shutdown, or early runner termination.
- // The runner is not supposed to finish before the session.
- err = context.Cause(ctxRunner)
- case err = <-chSession:
- // Killing a runner may perfectly well trigger this first,
- // in particular when it's on the same machine.
- }
-
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
- var ee *ssh.ExitError
- if err == nil {
- rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
- } else if errors.As(err, &ee) {
- rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
- fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
- } else {
- rt.DB.State, rt.DB.Detail = taskStateError, ""
- fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
- }
- return executorUpdate(rt)
+ err = executorDeploy(ctxRunner, client, rt)
+ locked(func() {
+ if err == nil {
+ rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
+ } else if errors.As(err, &eeExec) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed"
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
+ } else if errors.As(err, &ee3) {
+ rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee3.Err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
+ }
+ })
+ return rt.update()
}
func executorRun(ctx context.Context) error {
@@ -1208,17 +1561,23 @@ type Task struct {
Hash string
Runner string
- State taskState
- Detail string
- Notified int64
- RunLog []byte
- TaskLog []byte
+ // True database names for these are occupied by accessors.
+ CreatedUnix int64
+ ChangedUnix int64
+ DurationSeconds int64
+
+ State taskState
+ Detail string
+ Notified int64
+ RunLog []byte
+ TaskLog []byte
+ DeployLog []byte
}
func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
func (t *Task) RunnerName() string {
- if runner, ok := gConfig.Runners[t.Runner]; !ok {
+ if runner, ok := getConfig().Runners[t.Runner]; !ok {
return t.Runner
} else {
return runner.Name
@@ -1226,42 +1585,129 @@ func (t *Task) RunnerName() string {
}
func (t *Task) URL() string {
- return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID)
+ return fmt.Sprintf("%s/task/%d", getConfig().Root, t.ID)
}
func (t *Task) RepoURL() string {
- return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo)
+ return fmt.Sprintf("%s/%s/%s", getConfig().Gitea, t.Owner, t.Repo)
}
func (t *Task) CommitURL() string {
return fmt.Sprintf("%s/%s/%s/commit/%s",
- gConfig.Gitea, t.Owner, t.Repo, t.Hash)
+ getConfig().Gitea, t.Owner, t.Repo, t.Hash)
}
func (t *Task) CloneURL() string {
- return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo)
+ return fmt.Sprintf("%s/%s/%s.git", getConfig().Gitea, t.Owner, t.Repo)
+}
+
+func shortDurationString(d time.Duration) string {
+ if d.Abs() >= 24*time.Hour {
+ return strconv.FormatInt(int64(d/time.Hour/24), 10) + "d"
+ } else if d.Abs() >= time.Hour {
+ return strconv.FormatInt(int64(d/time.Hour), 10) + "h"
+ } else if d.Abs() >= time.Minute {
+ return strconv.FormatInt(int64(d/time.Minute), 10) + "m"
+ } else {
+ return strconv.FormatInt(int64(d/time.Second), 10) + "s"
+ }
+}
+
+func (t *Task) Created() *time.Time {
+ if t.CreatedUnix == 0 {
+ return nil
+ }
+ tt := time.Unix(t.CreatedUnix, 0)
+ return &tt
+}
+func (t *Task) Changed() *time.Time {
+ if t.ChangedUnix == 0 {
+ return nil
+ }
+ tt := time.Unix(t.ChangedUnix, 0)
+ return &tt
+}
+
+func (t *Task) CreatedAgo() string {
+ if t.CreatedUnix == 0 {
+ return ""
+ }
+ return shortDurationString(time.Since(*t.Created()))
+}
+
+func (t *Task) ChangedAgo() string {
+ if t.ChangedUnix == 0 {
+ return ""
+ }
+ return shortDurationString(time.Since(*t.Changed()))
+}
+
+func (t *Task) Duration() *time.Duration {
+ if t.DurationSeconds == 0 {
+ return nil
+ }
+ td := time.Duration(t.DurationSeconds * int64(time.Second))
+ return &td
+}
+
+func (t *Task) update() error {
+ _, err := gDB.ExecContext(context.Background(),
+ `UPDATE task SET changed = unixepoch('now'), duration = ?,
+ state = ?, detail = ?, notified = ?,
+ runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`,
+ t.DurationSeconds,
+ t.State, t.Detail, t.Notified,
+ t.RunLog, t.TaskLog, t.DeployLog, t.ID)
+ if err == nil {
+ notifierAwaken()
+ }
+ return err
}
-const schemaSQL = `
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+const initializeSQL = `
+PRAGMA application_id = 0x61636964; -- "acid" in big endian
+
CREATE TABLE IF NOT EXISTS task(
- id INTEGER NOT NULL, -- unique ID
+ id INTEGER NOT NULL, -- unique ID
+
+ owner TEXT NOT NULL, -- Gitea username
+ repo TEXT NOT NULL, -- Gitea repository name
+ hash TEXT NOT NULL, -- commit hash
+ runner TEXT NOT NULL, -- the runner to use
- owner TEXT NOT NULL, -- Gitea username
- repo TEXT NOT NULL, -- Gitea repository name
- hash TEXT NOT NULL, -- commit hash
- runner TEXT NOT NULL, -- the runner to use
+ created INTEGER NOT NULL DEFAULT 0, -- creation timestamp
+ changed INTEGER NOT NULL DEFAULT 0, -- last state change timestamp
+ duration INTEGER NOT NULL DEFAULT 0, -- duration of last run
- state INTEGER NOT NULL DEFAULT 0, -- task state
- detail TEXT NOT NULL DEFAULT '', -- task state detail
- notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
- runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
- tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+ state INTEGER NOT NULL DEFAULT 0, -- task state
+ detail TEXT NOT NULL DEFAULT '', -- task state detail
+ notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
+ runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
+ tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+ deploylog BLOB NOT NULL DEFAULT x'', -- deployment output
PRIMARY KEY (id)
) STRICT;
`
-func openDB(path string) error {
+func dbEnsureColumn(tx *sql.Tx, table, column, definition string) error {
+ var count int64
+ if err := tx.QueryRow(
+ `SELECT count(*) FROM pragma_table_info(?) WHERE name = ?`,
+ table, column).Scan(&count); err != nil {
+ return err
+ } else if count == 1 {
+ return nil
+ }
+
+ _, err := tx.Exec(
+ `ALTER TABLE ` + table + ` ADD COLUMN ` + column + ` ` + definition)
+ return err
+}
+
+func dbOpen(path string) error {
var err error
gDB, err = sql.Open("sqlite3",
"file:"+path+"?_foreign_keys=1&_busy_timeout=1000")
@@ -1269,10 +1715,55 @@ func openDB(path string) error {
return err
}
- _, err = gDB.Exec(schemaSQL)
- return err
+ tx, err := gDB.BeginTx(context.Background(), nil)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ var version int64
+ if err = tx.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
+ return err
+ }
+
+ switch version {
+ case 0:
+ if _, err = tx.Exec(initializeSQL); err != nil {
+ return err
+ }
+
+ // We had not initially set a database schema version,
+ // so we're stuck checking this column even on new databases.
+ if err = dbEnsureColumn(tx,
+ `task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil {
+ return err
+ }
+ case 1:
+ if err = dbEnsureColumn(tx,
+ `task`, `created`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
+ return err
+ }
+ if err = dbEnsureColumn(tx,
+ `task`, `changed`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
+ return err
+ }
+ if err = dbEnsureColumn(tx,
+ `task`, `duration`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
+ return err
+ }
+ case 2:
+ // The next migration goes here, remember to increment the number below.
+ }
+
+ if _, err = tx.Exec(
+ `PRAGMA user_version = ` + strconv.Itoa(2)); err != nil {
+ return err
+ }
+ return tx.Commit()
}
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
// callRPC forwards command line commands to a running server.
func callRPC(args []string) error {
body, err := json.Marshal(args)
@@ -1281,7 +1772,7 @@ func callRPC(args []string) error {
}
req, err := http.NewRequest(http.MethodPost,
- fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body))
+ fmt.Sprintf("%s/rpc", getConfig().Root), bytes.NewReader(body))
if err != nil {
return err
}
@@ -1304,8 +1795,30 @@ func callRPC(args []string) error {
return nil
}
+// filterTTY exposes the internal virtual terminal filter.
+func filterTTY(path string) {
+ var r io.Reader = os.Stdin
+ if path != "-" {
+ if f, err := os.Open(path); err != nil {
+ log.Println(err)
+ } else {
+ r = f
+ defer f.Close()
+ }
+ }
+
+ var tw terminalWriter
+ if _, err := io.Copy(&tw, r); err != nil {
+ log.Printf("%s: %s\n", path, err)
+ }
+ if _, err := os.Stdout.Write(tw.Serialize(0)); err != nil {
+ log.Printf("%s: %s\n", path, err)
+ }
+}
+
func main() {
version := flag.Bool("version", false, "show version and exit")
+ tty := flag.Bool("tty", false, "run the internal virtual terminal filter")
flag.Usage = func() {
f := flag.CommandLine.Output()
@@ -1323,8 +1836,15 @@ func main() {
fmt.Printf("%s %s\n", projectName, projectVersion)
return
}
+ if *tty {
+ for _, path := range flag.Args() {
+ filterTTY(path)
+ }
+ return
+ }
- if err := parseConfig(flag.Arg(0)); err != nil {
+ gConfigPath = flag.Arg(0)
+ if err := loadConfig(); err != nil {
log.Fatalln(err)
}
if flag.NArg() > 1 {
@@ -1334,7 +1854,7 @@ func main() {
return
}
- if err := openDB(gConfig.DB); err != nil {
+ if err := dbOpen(getConfig().DB); err != nil {
log.Fatalln(err)
}
defer gDB.Close()
@@ -1343,7 +1863,7 @@ func main() {
ctx, stop := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM)
- server := &http.Server{Addr: gConfig.Listen}
+ server := &http.Server{Addr: getConfig().Listen}
http.HandleFunc("/{$}", handleTasks)
http.HandleFunc("/task/{id}", handleTask)
http.HandleFunc("/push", handlePush)
diff --git a/acid.yaml.example b/acid.yaml.example
index 499366e..c0b9b28 100644
--- a/acid.yaml.example
+++ b/acid.yaml.example
@@ -61,7 +61,13 @@ projects:
# Project build script.
build: |
echo Computing line count...
- find . -not -path '*/.*' -type f -print0 | xargs -0 cat | wc -l
+ mkdir ~/acid-deploy
+ find . -not -path '*/.*' -type f -print0 | xargs -0 cat | wc -l \
+ > ~/acid-deploy/count
+
+ # Project deployment script (runs locally in a temporary directory).
+ deploy: |
+ cat count
# Time limit in time.ParseDuration format.
# The default of one hour should suffice.
diff --git a/acid_test.go b/acid_test.go
index 614fd0a..e8bd5c2 100644
--- a/acid_test.go
+++ b/acid_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"testing"
ttemplate "text/template"
+ "time"
)
func TestTemplateQuote(t *testing.T) {
@@ -30,3 +31,19 @@ func TestTemplateQuote(t *testing.T) {
}
}
}
+
+func TestShortDurationString(t *testing.T) {
+ for _, test := range []struct {
+ d time.Duration
+ expect string
+ }{
+ {72 * time.Hour, "3d"},
+ {-3 * time.Hour, "-3h"},
+ {12 * time.Minute, "12m"},
+ {time.Millisecond, "0s"},
+ } {
+ if sd := shortDurationString(test.d); sd != test.expect {
+ t.Errorf("%s = %s; want %s\n", test.d, sd, test.expect)
+ }
+ }
+}
diff --git a/go.mod b/go.mod
index 534f6bd..58ce64c 100644
--- a/go.mod
+++ b/go.mod
@@ -3,9 +3,13 @@ module janouch.name/acid
go 1.22.0
require (
- github.com/mattn/go-sqlite3 v1.14.22
- golang.org/x/crypto v0.21.0
+ github.com/mattn/go-sqlite3 v1.14.24
+ github.com/pkg/sftp v1.13.7
+ golang.org/x/crypto v0.31.0
gopkg.in/yaml.v3 v3.0.1
)
-require golang.org/x/sys v0.18.0 // indirect
+require (
+ github.com/kr/fs v0.1.0 // indirect
+ golang.org/x/sys v0.28.0 // indirect
+)
diff --git a/go.sum b/go.sum
index 5e353af..4468ea8 100644
--- a/go.sum
+++ b/go.sum
@@ -1,12 +1,65 @@
-github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
-github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
-golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
-golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
-golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
-golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
-golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
+github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
+github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
+github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
+github.com/pkg/sftp v1.13.7 h1:uv+I3nNJvlKZIQGSr8JVQLNHFU9YhhNpvC14Y6KgmSM=
+github.com/pkg/sftp v1.13.7/go.mod h1:KMKI0t3T6hfA+lTR/ssZdunHo+uwq7ghoN09/FSu3DY=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
+golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
+golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
+golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
+golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
+golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/terminal.go b/terminal.go
new file mode 100644
index 0000000..55eabb5
--- /dev/null
+++ b/terminal.go
@@ -0,0 +1,379 @@
+package main
+
+import (
+ "bytes"
+ "io"
+ "log"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "unicode/utf8"
+)
+
+type terminalLine struct {
+ // For simplicity, we assume that all runes take up one cell,
+ // including TAB and non-spacing ones.
+ // The next step would be grouping non-spacing characters,
+ // in particular Unicode modifier letters, with their base.
+ columns []rune
+
+ // updateGroup is the topmost line that has changed since this line
+ // has appeared, for the purpose of update tracking.
+ updateGroup int
+}
+
+// terminalWriter does a best-effort approximation of an infinite-size
+// virtual terminal.
+type terminalWriter struct {
+ sync.Mutex
+ Tee io.WriteCloser
+ lines []terminalLine
+
+ // Zero-based coordinates within lines.
+ column, line int
+
+ // lineTop is used as the base for positioning commands.
+ lineTop int
+
+ written int
+ byteBuffer []byte
+ runeBuffer []rune
+}
+
+func (tw *terminalWriter) log(format string, v ...interface{}) {
+ if os.Getenv("ACID_TERMINAL_DEBUG") != "" {
+ log.Printf("terminal: "+format+"\n", v...)
+ }
+}
+
+// SerializeUpdates returns an update block for a client with a given last line,
+// and the index of the first line in the update block.
+func (tw *terminalWriter) SerializeUpdates(last int) (string, int) {
+ if last < 0 || last >= len(tw.lines) {
+ return "", last
+ }
+ top := tw.lines[last].updateGroup
+ return string(tw.Serialize(top)), top
+}
+
+func (tw *terminalWriter) Serialize(top int) []byte {
+ var b bytes.Buffer
+ for i := top; i < len(tw.lines); i++ {
+ b.WriteString(string(tw.lines[i].columns))
+ b.WriteByte('\n')
+ }
+ return b.Bytes()
+}
+
+func (tw *terminalWriter) Write(p []byte) (written int, err error) {
+ tw.Lock()
+ defer tw.Unlock()
+
+ // TODO(p): Rather use io.MultiWriter?
+ // Though I'm not sure what to do about closing (FD leaks).
+ // Eventually, any handles would be garbage collected in any case.
+ if tw.Tee != nil {
+ tw.Tee.Write(p)
+ }
+
+ // Enough is enough, writing too much is highly suspicious.
+ ok, remaining := true, 64<<20-tw.written
+ if remaining < 0 {
+ ok, p = false, nil
+ } else if remaining < len(p) {
+ ok, p = false, p[:remaining]
+ }
+ tw.written += len(p)
+
+ // By now, more or less everything should run in UTF-8.
+ //
+ // This might have better performance with a ring buffer,
+ // so as to avoid reallocations.
+ b := append(tw.byteBuffer, p...)
+ if !ok {
+ b = append(b, "\nToo much terminal output\n"...)
+ }
+ for utf8.FullRune(b) {
+ r, len := utf8.DecodeRune(b)
+ b, tw.runeBuffer = b[len:], append(tw.runeBuffer, r)
+ }
+ tw.byteBuffer = b
+ for tw.processRunes() {
+ }
+ return len(p), nil
+}
+
+func (tw *terminalWriter) processPrint(r rune) {
+ // Extend the buffer vertically.
+ for len(tw.lines) <= tw.line {
+ tw.lines = append(tw.lines,
+ terminalLine{updateGroup: len(tw.lines)})
+ }
+
+ // Refresh update trackers, if necessary.
+ if tw.lines[len(tw.lines)-1].updateGroup > tw.line {
+ for i := tw.line; i < len(tw.lines); i++ {
+ tw.lines[i].updateGroup = min(tw.lines[i].updateGroup, tw.line)
+ }
+ }
+
+ // Emulate `cat -v` for C0 characters.
+ seq := make([]rune, 0, 2)
+ if r < 32 && r != '\t' {
+ seq = append(seq, '^', 64+r)
+ } else {
+ seq = append(seq, r)
+ }
+
+ // Extend the line horizontally and write the rune.
+ for _, r := range seq {
+ line := &tw.lines[tw.line]
+ for len(line.columns) <= tw.column {
+ line.columns = append(line.columns, ' ')
+ }
+
+ line.columns[tw.column] = r
+ tw.column++
+ }
+}
+
+func (tw *terminalWriter) processFlush() {
+ tw.column = 0
+ tw.line = len(tw.lines)
+ tw.lineTop = tw.line
+}
+
+func (tw *terminalWriter) processParsedCSI(
+ private rune, param, intermediate []rune, final rune) bool {
+ var params []int
+ if len(param) > 0 {
+ for _, p := range strings.Split(string(param), ";") {
+ i, _ := strconv.Atoi(p)
+ params = append(params, i)
+ }
+ }
+
+ if private == '?' && len(intermediate) == 0 &&
+ (final == 'h' || final == 'l') {
+ for _, p := range params {
+ // 25 (DECTCEM): There is no cursor to show or hide.
+ // 7 (DECAWM): We cannot wrap, we're infinite.
+ if !(p == 25 || (p == 7 && final == 'l')) {
+ return false
+ }
+ }
+ return true
+ }
+ if private != 0 || len(intermediate) > 0 {
+ return false
+ }
+
+ switch {
+ case final == 'C': // Cursor Forward
+ if len(params) == 0 {
+ tw.column++
+ } else if len(params) >= 1 {
+ tw.column += params[0]
+ }
+ return true
+ case final == 'D': // Cursor Backward
+ if len(params) == 0 {
+ tw.column--
+ } else if len(params) >= 1 {
+ tw.column -= params[0]
+ }
+ if tw.column < 0 {
+ tw.column = 0
+ }
+ return true
+ case final == 'E': // Cursor Next Line
+ if len(params) == 0 {
+ tw.line++
+ } else if len(params) >= 1 {
+ tw.line += params[0]
+ }
+ tw.column = 0
+ return true
+ case final == 'F': // Cursor Preceding Line
+ if len(params) == 0 {
+ tw.line--
+ } else if len(params) >= 1 {
+ tw.line -= params[0]
+ }
+ if tw.line < tw.lineTop {
+ tw.line = tw.lineTop
+ }
+ tw.column = 0
+ return true
+ case final == 'H': // Cursor Position
+ if len(params) == 0 {
+ tw.line = tw.lineTop
+ tw.column = 0
+ } else if len(params) >= 2 && params[0] != 0 && params[1] != 0 {
+ tw.line = tw.lineTop + params[0] - 1
+ tw.column = params[1] - 1
+ } else {
+ return false
+ }
+ return true
+ case final == 'J': // Erase in Display
+ if len(params) == 0 || params[0] == 0 || params[0] == 2 {
+ // We're not going to erase anything, thank you very much.
+ tw.processFlush()
+ } else {
+ return false
+ }
+ return true
+ case final == 'K': // Erase in Line
+ if tw.line >= len(tw.lines) {
+ return true
+ }
+ line := &tw.lines[tw.line]
+ if len(params) == 0 || params[0] == 0 {
+ if len(line.columns) > tw.column {
+ line.columns = line.columns[:tw.column]
+ }
+ } else if params[0] == 1 {
+ for i := 0; i < tw.column; i++ {
+ line.columns[i] = ' '
+ }
+ } else if params[0] == 2 {
+ line.columns = nil
+ } else {
+ return false
+ }
+ return true
+ case final == 'm':
+ // Straight up ignoring all attributes, at least for now.
+ return true
+ }
+ return false
+}
+
+func (tw *terminalWriter) processCSI(rb []rune) ([]rune, bool) {
+ if len(rb) < 3 {
+ return nil, true
+ }
+
+ i, private, param, intermediate := 2, rune(0), []rune{}, []rune{}
+ if rb[i] >= 0x3C && rb[i] <= 0x3F {
+ private = rb[i]
+ i++
+ }
+ for i < len(rb) && ((rb[i] >= '0' && rb[i] <= '9') || rb[i] == ';') {
+ param = append(param, rb[i])
+ i++
+ }
+ for i < len(rb) && rb[i] >= 0x20 && rb[i] <= 0x2F {
+ intermediate = append(intermediate, rb[i])
+ i++
+ }
+ if i == len(rb) {
+ return nil, true
+ }
+ if rb[i] < 0x40 || rb[i] > 0x7E {
+ return rb, false
+ }
+ if !tw.processParsedCSI(private, param, intermediate, rb[i]) {
+ tw.log("unhandled CSI %s", string(rb[2:i+1]))
+ return rb, false
+ }
+ return rb[i+1:], true
+}
+
+func (tw *terminalWriter) processEscape(rb []rune) ([]rune, bool) {
+ if len(rb) < 2 {
+ return nil, true
+ }
+
+ // Very roughly following https://vt100.net/emu/dec_ansi_parser
+ // but being a bit stricter.
+ switch r := rb[1]; {
+ case r == '[':
+ return tw.processCSI(rb)
+ case r == ']':
+ // TODO(p): Skip this properly, once we actually hit it.
+ tw.log("unhandled OSC")
+ return rb, false
+ case r == 'P':
+ // TODO(p): Skip this properly, once we actually hit it.
+ tw.log("unhandled DCS")
+ return rb, false
+
+ // Only handling sequences we've seen bother us in real life.
+ case r == 'c':
+ // Full reset, use this to flush all output.
+ tw.processFlush()
+ return rb[2:], true
+ case r == 'M':
+ tw.line--
+ return rb[2:], true
+
+ case (r >= 0x30 && r <= 0x4F) || (r >= 0x51 && r <= 0x57) ||
+ r == 0x59 || r == 0x5A || r == 0x5C || (r >= 0x60 && r <= 0x7E):
+ // → esc_dispatch
+ tw.log("unhandled ESC %c", r)
+ return rb, false
+ //return rb[2:], true
+ case r >= 0x20 && r <= 0x2F:
+ // → escape intermediate
+ i := 2
+ for i < len(rb) && rb[i] >= 0x20 && rb[i] <= 0x2F {
+ i++
+ }
+ if i == len(rb) {
+ return nil, true
+ }
+ if rb[i] < 0x30 || rb[i] > 0x7E {
+ return rb, false
+ }
+ // → esc_dispatch
+ tw.log("unhandled ESC %s", string(rb[1:i+1]))
+ return rb, false
+ //return rb[i+1:], true
+ default:
+ // Note that Debian 12 has been seen to produce ESC<U+2026>
+ // and such due to some very blind string processing.
+ return rb, false
+ }
+}
+
+func (tw *terminalWriter) processRunes() bool {
+ rb := tw.runeBuffer
+ if len(rb) == 0 {
+ return false
+ }
+
+ switch rb[0] {
+ case '\a':
+ // Ding dong!
+ case '\b':
+ if tw.column > 0 {
+ tw.column--
+ }
+ case '\n', '\v':
+ tw.line++
+
+ // Forced ONLCR flag, because that's what most shell output expects.
+ fallthrough
+ case '\r':
+ tw.column = 0
+
+ case '\x1b':
+ var ok bool
+ if rb, ok = tw.processEscape(rb); rb == nil {
+ return false
+ } else if ok {
+ tw.runeBuffer = rb
+ return true
+ }
+
+ // Unsuccessful parses get printed for later inspection.
+ fallthrough
+ default:
+ tw.processPrint(rb[0])
+ }
+ tw.runeBuffer = rb[1:]
+ return true
+}
diff --git a/terminal_test.go b/terminal_test.go
new file mode 100644
index 0000000..b0686ed
--- /dev/null
+++ b/terminal_test.go
@@ -0,0 +1,100 @@
+package main
+
+import (
+ "slices"
+ "testing"
+)
+
+// This could be way more extensive, but we're not aiming for perfection.
+var tests = []struct {
+ push, want string
+}{
+ {
+ // Escaping and UTF-8.
+ "\x03\x1bž\bř",
+ "^C^[ř\n",
+ },
+ {
+ // Several kinds of sequences to be ignored.
+ "\x1bc\x1b[?7l\x1b[2J\x1b[0;1mSeaBIOS\rTea",
+ "TeaBIOS\n",
+ },
+ {
+ // New origin and absolute positioning.
+ "Line 1\n\x1bcWine B\nFine 3\x1b[1;6H2\x1b[HL\nL",
+ "Line 1\nLine 2\nLine 3\n",
+ },
+ {
+ // In-line positioning (without corner cases).
+ "A\x1b[CB\x1b[2C?\x1b[DC\x1b[2D\b->",
+ "A B->C\n",
+ },
+ {
+ // Up and down.
+ "\nB\x1bMA\v\vC" + "\x1b[4EG" + "\x1b[FF" + "\x1b[2FD" + "\x1b[EE",
+ " A\nB\nC\nD\nE\nF\nG\n",
+ },
+ {
+ // In-line erasing.
+ "1234\b\b\x1b[K\n5678\b\b\x1b[0K\n" + "abcd\b\b\x1b[1K\nefgh\x1b[2K",
+ "12\n56\n cd\n\n",
+ },
+}
+
+func TestTerminal(t *testing.T) {
+ for _, test := range tests {
+ tw := terminalWriter{}
+ if _, err := tw.Write([]byte(test.push)); err != nil {
+ t.Errorf("%#v: %s", test.push, err)
+ continue
+ }
+ have := string(tw.Serialize(0))
+ if have != test.want {
+ t.Errorf("%#v: %#v; want %#v", test.push, have, test.want)
+ }
+ }
+}
+
+func TestTerminalExploded(t *testing.T) {
+Loop:
+ for _, test := range tests {
+ tw := terminalWriter{}
+ for _, b := range []byte(test.push) {
+ if _, err := tw.Write([]byte{b}); err != nil {
+ t.Errorf("%#v: %s", test.push, err)
+ continue Loop
+ }
+ }
+ have := string(tw.Serialize(0))
+ if have != test.want {
+ t.Errorf("%#v: %#v; want %#v", test.push, have, test.want)
+ }
+ }
+}
+
+func TestTerminalUpdateGroups(t *testing.T) {
+ tw := terminalWriter{}
+ collect := func() (have []int) {
+ for _, line := range tw.lines {
+ have = append(have, line.updateGroup)
+ }
+ return
+ }
+
+ // 0: A 0 0 0
+ // 1: B X 1 1 1
+ // 2: C Y 1 2 1 1
+ // 3: Z 2 3 2
+ // 4: 3 4
+ tw.Write([]byte("A\nB\nC\x1b[FX\nY\nZ"))
+ have, want := collect(), []int{0, 1, 1, 3}
+ if !slices.Equal(want, have) {
+ t.Errorf("update groups: %+v; want: %+v", have, want)
+ }
+
+ tw.Write([]byte("\x1b[F1\n2\n3"))
+ have, want = collect(), []int{0, 1, 1, 2, 4}
+ if !slices.Equal(want, have) {
+ t.Errorf("update groups: %+v; want: %+v", have, want)
+ }
+}