aboutsummaryrefslogtreecommitdiff
path: root/acid.go
diff options
context:
space:
mode:
Diffstat (limited to 'acid.go')
-rw-r--r--acid.go1125
1 files changed, 831 insertions, 294 deletions
diff --git a/acid.go b/acid.go
index 17ca23b..9fe9fe4 100644
--- a/acid.go
+++ b/acid.go
@@ -21,15 +21,18 @@ import (
"os"
"os/exec"
"os/signal"
+ "path/filepath"
"sort"
"strconv"
"strings"
"sync"
+ "sync/atomic"
"syscall"
ttemplate "text/template"
"time"
_ "github.com/mattn/go-sqlite3"
+ "github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"gopkg.in/yaml.v3"
)
@@ -38,9 +41,9 @@ var (
projectName = "acid"
projectVersion = "?"
- gConfig Config = Config{Listen: ":http"}
- gNotifyScript *ttemplate.Template
- gDB *sql.DB
+ gConfigPath string
+ gConfig atomic.Pointer[Config]
+ gDB *sql.DB
gNotifierSignal = make(chan struct{}, 1)
gExecutorSignal = make(chan struct{}, 1)
@@ -50,6 +53,8 @@ var (
gRunning = make(map[int64]*RunningTask)
)
+func getConfig() *Config { return gConfig.Load() }
+
// --- Config ------------------------------------------------------------------
type Config struct {
@@ -63,13 +68,16 @@ type Config struct {
Runners map[string]ConfigRunner `yaml:"runners"` // script runners
Projects map[string]ConfigProject `yaml:"projects"` // configured projects
+
+ notifyTemplate *ttemplate.Template
}
type ConfigRunner struct {
- Name string `yaml:"name"` // descriptive name
- Run string `yaml:"run"` // runner executable
- Setup string `yaml:"setup"` // runner setup script (SSH)
- SSH struct {
+ Name string `yaml:"name"` // descriptive name
+ Manual bool `yaml:"manual"` // only run on request
+ Run string `yaml:"run"` // runner executable
+ Setup string `yaml:"setup"` // runner setup script (SSH)
+ SSH struct {
User string `yaml:"user"` // remote username
Address string `yaml:"address"` // TCP host:port
Identity string `yaml:"identity"` // private key path
@@ -80,22 +88,48 @@ type ConfigProject struct {
Runners map[string]ConfigProjectRunner `yaml:"runners"`
}
+func (cf *ConfigProject) AutomaticRunners() (runners []string) {
+ // We pass through unknown runner names,
+ // so that they can cause reference errors later.
+ config := getConfig()
+ for runner := range cf.Runners {
+ if r, _ := config.Runners[runner]; !r.Manual {
+ runners = append(runners, runner)
+ }
+ }
+ sort.Strings(runners)
+ return
+}
+
type ConfigProjectRunner struct {
- Setup string `yaml:"setup"` // project setup script (SSH)
- Build string `yaml:"build"` // project build script (SSH)
+ Setup string `yaml:"setup"` // project setup script (SSH)
+ Build string `yaml:"build"` // project build script (SSH)
+ Deploy string `yaml:"deploy"` // project deploy script (local)
+ Timeout string `yaml:"timeout"` // timeout duration
}
-func parseConfig(path string) error {
- if f, err := os.Open(path); err != nil {
+// loadConfig reloads configuration.
+// Beware that changes do not get applied globally at the same moment.
+func loadConfig() error {
+ new := &Config{}
+ if f, err := os.Open(gConfigPath); err != nil {
return err
- } else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil {
+ } else if err = yaml.NewDecoder(f).Decode(new); err != nil {
return err
}
+ if old := getConfig(); old != nil && old.DB != new.DB {
+ return fmt.Errorf("the database file cannot be changed in runtime")
+ }
var err error
- gNotifyScript, err =
- ttemplate.New("notify").Funcs(shellFuncs).Parse(gConfig.Notify)
- return err
+ new.notifyTemplate, err =
+ ttemplate.New("notify").Funcs(shellFuncs).Parse(new.Notify)
+ if err != nil {
+ return err
+ }
+
+ gConfig.Store(new)
+ return nil
}
var shellFuncs = ttemplate.FuncMap{
@@ -119,8 +153,16 @@ var shellFuncs = ttemplate.FuncMap{
// --- Utilities ---------------------------------------------------------------
+func localShell() string {
+ if shell := os.Getenv("SHELL"); shell != "" {
+ return shell
+ }
+	// The os/user package doesn't store the parsed-out shell field.
+ return "/bin/sh"
+}
+
func giteaSign(b []byte) string {
- payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
+ payloadHmac := hmac.New(sha256.New, []byte(getConfig().Secret))
payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil))
}
@@ -128,9 +170,9 @@ func giteaSign(b []byte) string {
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
*http.Request, error) {
req, err := http.NewRequestWithContext(
- ctx, method, gConfig.Gitea+path, body)
+ ctx, method, getConfig().Gitea+path, body)
if req != nil {
- req.Header.Set("Authorization", "token "+gConfig.Token)
+ req.Header.Set("Authorization", "token "+getConfig().Token)
req.Header.Set("Accept", "application/json")
}
return req, err
@@ -139,7 +181,9 @@ func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, `
SELECT id, owner, repo, hash, runner,
- state, detail, notified, runlog, tasklog FROM task `+query, args...)
+ created, changed, duration,
+ state, detail, notified,
+ runlog, tasklog, deploylog FROM task `+query, args...)
if err != nil {
return nil, err
}
@@ -149,10 +193,13 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
for rows.Next() {
var t Task
err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
- &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog)
+ &t.CreatedUnix, &t.ChangedUnix, &t.DurationSeconds,
+ &t.State, &t.Detail, &t.Notified,
+ &t.RunLog, &t.TaskLog, &t.DeployLog)
if err != nil {
return nil, err
}
+ // We could also update some fields from gRunning.
tasks = append(tasks, t)
}
return tasks, rows.Err()
@@ -173,6 +220,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
<thead>
<tr>
<th>ID</th>
+ <th>Created</th>
+ <th>Changed</th>
<th>Repository</th>
<th>Hash</th>
<th>Runner</th>
@@ -185,6 +234,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
{{range .}}
<tr>
<td><a href="task/{{.ID}}">{{.ID}}</a></td>
+ <td align="right"><span title="{{.Created}}">{{.CreatedAgo}}</span></td>
+ <td align="right"><span title="{{.Changed}}">{{.ChangedAgo}}</span></td>
<td><a href="{{.RepoURL}}">{{.FullName}}</a></td>
<td><a href="{{.CommitURL}}">{{.Hash}}</a></td>
<td>{{.RunnerName}}</td>
@@ -200,7 +251,7 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
`))
func handleTasks(w http.ResponseWriter, r *http.Request) {
- tasks, err := getTasks(r.Context(), `ORDER BY id DESC`)
+ tasks, err := getTasks(r.Context(), `ORDER BY changed DESC, id DESC`)
if err != nil {
http.Error(w,
"Error retrieving tasks: "+err.Error(),
@@ -219,37 +270,179 @@ var templateTask = template.Must(template.New("tasks").Parse(`
<head>
<title>Task {{.ID}}</title>
<meta charset="utf-8">
-{{if .IsRunning}}
-<meta http-equiv="refresh" content="5">
-{{end}}
</head>
<body>
<h1><a href="..">Tasks</a> &raquo; {{.ID}}</h1>
<dl>
+<!-- Remember to synchronise these lists with Javascript updates. -->
+{{if .Created -}}
+<dt>Created</dt>
+ <dd><span id="created" title="{{.Created}}">{{.CreatedAgo}} ago</span></dd>
+{{end -}}
+{{if .Changed -}}
+<dt>Changed</dt>
+ <dd><span id="changed" title="{{.Changed}}">{{.ChangedAgo}} ago</span></dd>
+{{end -}}
<dt>Project</dt>
- <dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
+ <dd id="project"><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
<dt>Commit</dt>
- <dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
+ <dd id="commit"><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
<dt>Runner</dt>
- <dd>{{.RunnerName}}</dd>
+ <dd id="runner">{{.RunnerName}}</dd>
<dt>State</dt>
- <dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
+ <dd id="state">{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
<dt>Notified</dt>
- <dd>{{.Notified}}</dd>
+ <dd id="notified">{{.Notified}}</dd>
+<dt>Duration</dt>
+ <dd id="duration">{{if .Duration}}{{.Duration}}{{else}}&mdash;{{end}}</dd>
</dl>
-{{if .RunLog}}
-<h2>Runner log</h2>
-<pre>{{printf "%s" .RunLog}}</pre>
-{{end}}
-{{if .TaskLog}}
-<h2>Task log</h2>
-<pre>{{printf "%s" .TaskLog}}</pre>
-{{end}}
-</table>
+
+<h2 id="run"{{if not .RunLog}} hidden{{end}}>Runner log</h2>
+<pre id="runlog"{{if not .RunLog}} hidden{{
+end}}>{{printf "%s" .RunLog}}</pre>
+<h2 id="task"{{if not .TaskLog}} hidden{{end}}>Task log</h2>
+<pre id="tasklog"{{if not .TaskLog}} hidden{{
+end}}>{{printf "%s" .TaskLog}}</pre>
+<h2 id="deploy"{{if not .DeployLog}} hidden{{end}}>Deploy log</h2>
+<pre id="deploylog"{{if not .DeployLog}} hidden{{
+end}}>{{printf "%s" .DeployLog}}</pre>
+
+{{if .IsRunning -}}
+<script>
+function get(id) {
+ return document.getElementById(id)
+}
+function getLog(id) {
+ const header = get(id), log = get(id + 'log'), text = log.textContent
+ // lines[-1] is an implementation detail of terminalWriter.Serialize,
+ // lines[-2] is the actual last line.
+ const last = Math.max(0, text.split('\n').length - 2)
+ return {header, log, text, last}
+}
+function refreshLog(log, top, changed) {
+ if (top <= 0)
+ log.log.textContent = changed
+ else
+ log.log.textContent =
+ log.text.split('\n').slice(0, top).join('\n') + '\n' + changed
+
+ const empty = log.log.textContent === ''
+ log.header.hidden = empty
+ log.log.hidden = empty
+}
+let refresher = setInterval(() => {
+ const run = getLog('run'), task = getLog('task'), deploy = getLog('deploy')
+ const url = new URL(window.location.href)
+ url.search = ''
+ url.searchParams.set('json', '')
+ url.searchParams.set('run', run.last)
+ url.searchParams.set('task', task.last)
+ url.searchParams.set('deploy', deploy.last)
+ fetch(url.toString()).then(response => {
+ if (!response.ok)
+ throw response.statusText
+ return response.json()
+ }).then(data => {
+ const scroll = window.scrollY + window.innerHeight
+ >= document.documentElement.scrollHeight
+ if (data.Created) {
+ get('created').title = data.Created
+ get('created').textContent = data.CreatedAgo + " ago"
+ }
+ if (data.Changed) {
+ get('changed').title = data.Changed
+ get('changed').textContent = data.ChangedAgo + " ago"
+ }
+ get('state').textContent = data.State
+ if (data.Detail !== '')
+ get('state').textContent += " (" + data.Detail + ")"
+ get('notified').textContent = String(data.Notified)
+ if (data.Duration)
+ get('duration').textContent = data.Duration
+
+ refreshLog(run, data.RunLogTop, data.RunLog)
+ refreshLog(task, data.TaskLogTop, data.TaskLog)
+ refreshLog(deploy, data.DeployLogTop, data.DeployLog)
+
+ if (scroll)
+ document.documentElement.scrollTop =
+ document.documentElement.scrollHeight
+ if (!data.IsRunning)
+ clearInterval(refresher)
+ }).catch(error => {
+ clearInterval(refresher)
+ alert(error)
+ })
+}, 1000 /* For faster updates than this, we should use WebSockets. */)
+</script>
+{{end -}}
+
</body>
</html>
`))
+// handlerTask serves as the data for JSON encoding and the task HTML template.
+// It needs to explicitly include many convenience method results.
+type handlerTask struct {
+ Task
+ IsRunning bool
+
+ Created *string // Task.Created?.String()
+ Changed *string // Task.Changed?.String()
+ CreatedAgo string // Task.CreatedAgo()
+ ChangedAgo string // Task.ChangedAgo()
+ Duration *string // Task.Duration?.String()
+ State string // Task.State.String()
+ RunLog string
+ RunLogTop int
+ TaskLog string
+ TaskLogTop int
+ DeployLog string
+ DeployLogTop int
+}
+
+func toNilableString[T fmt.Stringer](stringer *T) *string {
+ if stringer == nil {
+ return nil
+ }
+ s := (*stringer).String()
+ return &s
+}
+
+func newHandlerTask(task Task) handlerTask {
+ return handlerTask{
+ Task: task,
+ RunLog: string(task.RunLog),
+ TaskLog: string(task.TaskLog),
+ DeployLog: string(task.DeployLog),
+
+ Created: toNilableString(task.Created()),
+ Changed: toNilableString(task.Changed()),
+ CreatedAgo: task.CreatedAgo(),
+ ChangedAgo: task.ChangedAgo(),
+ Duration: toNilableString(task.Duration()),
+ State: task.State.String(),
+ }
+}
+
+func (ht *handlerTask) updateFromRunning(
+ rt *RunningTask, lastRun, lastTask, lastDeploy int) {
+ ht.IsRunning = true
+ ht.Task.DurationSeconds = rt.elapsed()
+ ht.Duration = toNilableString(ht.Task.Duration())
+
+ rt.RunLog.Lock()
+ defer rt.RunLog.Unlock()
+ rt.TaskLog.Lock()
+ defer rt.TaskLog.Unlock()
+ rt.DeployLog.Lock()
+ defer rt.DeployLog.Unlock()
+
+ ht.RunLog, ht.RunLogTop = rt.RunLog.SerializeUpdates(lastRun)
+ ht.TaskLog, ht.TaskLogTop = rt.TaskLog.SerializeUpdates(lastTask)
+ ht.DeployLog, ht.DeployLogTop = rt.DeployLog.SerializeUpdates(lastDeploy)
+}
+
func handleTask(w http.ResponseWriter, r *http.Request) {
id, err := strconv.Atoi(r.PathValue("id"))
if err != nil {
@@ -270,31 +463,32 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
return
}
- task := struct {
- Task
- IsRunning bool
- }{Task: tasks[0]}
+ // These are intended for running tasks,
+ // so don't reprocess DB logs, which would only help the last update.
+ q := r.URL.Query()
+ lastRun, _ := strconv.Atoi(q.Get("run"))
+ lastTask, _ := strconv.Atoi(q.Get("task"))
+ lastDeploy, _ := strconv.Atoi(q.Get("deploy"))
+
+ task := newHandlerTask(tasks[0])
func() {
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
- rt, ok := gRunning[task.ID]
- task.IsRunning = ok
- if !ok {
- return
+ if rt, ok := gRunning[task.ID]; ok {
+ task.updateFromRunning(
+ rt, int(lastRun), int(lastTask), int(lastDeploy))
}
-
- rt.RunLog.mu.Lock()
- defer rt.RunLog.mu.Unlock()
- rt.TaskLog.mu.Lock()
- defer rt.TaskLog.mu.Unlock()
-
- task.RunLog = rt.RunLog.b
- task.TaskLog = rt.TaskLog.b
}()
- if err := templateTask.Execute(w, &task); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ if q.Has("json") {
+ w.Header().Set("Content-Type", "application/json")
+ err = json.NewEncoder(w).Encode(&task)
+ } else {
+ err = templateTask.Execute(w, &task)
+ }
+ if err != nil {
+ log.Println(err)
}
}
@@ -321,8 +515,9 @@ func createTasks(ctx context.Context,
}
defer tx.Rollback()
- stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner)
- VALUES (?, ?, ?, ?)`)
+ stmt, err := tx.Prepare(
+ `INSERT INTO task(owner, repo, hash, runner, created, changed)
+ VALUES (?, ?, ?, ?, unixepoch('now'), unixepoch('now'))`)
if err != nil {
return err
}
@@ -371,19 +566,14 @@ func handlePush(w http.ResponseWriter, r *http.Request) {
log.Printf("received push: %s %s\n",
event.Repository.FullName, event.HeadCommit.ID)
- project, ok := gConfig.Projects[event.Repository.FullName]
+ project, ok := getConfig().Projects[event.Repository.FullName]
if !ok {
// This is okay, don't set any commit statuses.
fmt.Fprintf(w, "The project is not configured.")
return
}
- runners := []string{}
- for name := range project.Runners {
- runners = append(runners, name)
- }
- sort.Strings(runners)
-
+ runners := project.AutomaticRunners()
if err := createTasks(r.Context(),
event.Repository.Owner.Username, event.Repository.Name,
event.HeadCommit.ID, runners); err != nil {
@@ -408,8 +598,11 @@ func rpcRestartOne(ctx context.Context, id int64) error {
// The executor bumps to "running" after inserting into gRunning,
// so we should not need to exclude that state here.
- result, err := gDB.ExecContext(ctx, `UPDATE task
- SET state = ?, detail = '', notified = 0 WHERE id = ?`,
+ //
+ // We deliberately do not clear previous run data (duration, *log).
+ result, err := gDB.ExecContext(ctx,
+ `UPDATE task SET changed = unixepoch('now'),
+ state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id)
if err != nil {
return fmt.Errorf("%d: %w", id, err)
@@ -485,18 +678,15 @@ func rpcEnqueue(ctx context.Context,
return fmt.Errorf("%s: %w", ref, err)
}
- project, ok := gConfig.Projects[owner+"/"+repo]
+ project, ok := getConfig().Projects[owner+"/"+repo]
if !ok {
return fmt.Errorf("project configuration not found")
}
runners := fs.Args()[3:]
if len(runners) == 0 {
- for runner := range project.Runners {
- runners = append(runners, runner)
- }
+ runners = project.AutomaticRunners()
}
- sort.Strings(runners)
for _, runner := range runners {
if _, ok := project.Runners[runner]; !ok {
@@ -540,6 +730,17 @@ func rpcRestart(ctx context.Context,
return nil
}
+func rpcReload(ctx context.Context,
+ w io.Writer, fs *flag.FlagSet, args []string) error {
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if fs.NArg() > 0 {
+ return errWrongUsage
+ }
+ return loadConfig()
+}
+
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
var rpcCommands = map[string]struct {
@@ -552,6 +753,8 @@ var rpcCommands = map[string]struct {
"Create or restart tasks for the given reference."},
"restart": {rpcRestart, "[ID]...",
"Schedule tasks with the given IDs to be rerun."},
+ "reload": {rpcReload, "",
+ "Reload configuration."},
}
func rpcPrintCommands(w io.Writer) {
@@ -638,12 +841,12 @@ func handleRPC(w http.ResponseWriter, r *http.Request) {
func notifierRunCommand(ctx context.Context, task Task) {
script := bytes.NewBuffer(nil)
- if err := gNotifyScript.Execute(script, &task); err != nil {
+ if err := getConfig().notifyTemplate.Execute(script, &task); err != nil {
log.Printf("error: notify: %s", err)
return
}
- cmd := exec.CommandContext(ctx, "sh")
+ cmd := exec.CommandContext(ctx, localShell())
cmd.Stdin = script
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -665,13 +868,14 @@ func notifierNotify(ctx context.Context, task Task) error {
TargetURL string `json:"target_url"`
}{}
- runner, ok := gConfig.Runners[task.Runner]
+ config := getConfig()
+ runner, ok := config.Runners[task.Runner]
if !ok {
log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner)
return nil
}
payload.Context = runner.Name
- payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID)
+ payload.TargetURL = fmt.Sprintf("%s/task/%d", config.Root, task.ID)
switch task.State {
case taskStateNew:
@@ -767,86 +971,318 @@ func notifierAwaken() {
}
// --- Executor ----------------------------------------------------------------
+// ~~~ Running task ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-type terminalWriter struct {
- b []byte
- cur int
- mu sync.Mutex
+// RunningTask stores all data pertaining to a currently running task.
+type RunningTask struct {
+ DB Task
+ Runner ConfigRunner
+ ProjectRunner ConfigProjectRunner
+
+ RunLog terminalWriter
+ TaskLog terminalWriter
+ DeployLog terminalWriter
+
+ wd string // acid working directory
+ timeout time.Duration // time limit on task execution
+ signer ssh.Signer // SSH private key
+ tmplScript *ttemplate.Template // remote build script
+ tmplDeploy *ttemplate.Template // local deployment script
}
-func (tw *terminalWriter) Write(p []byte) (written int, err error) {
- tw.mu.Lock()
- defer tw.mu.Unlock()
+// newRunningTask prepares a task for running, without executing anything yet.
+func newRunningTask(task Task) (*RunningTask, error) {
+ rt := &RunningTask{DB: task}
+ config := getConfig()
+
+ // This is for our own tracking, not actually written to database.
+ rt.DB.ChangedUnix = time.Now().Unix()
- // Extremely rudimentary emulation of a dumb terminal.
- for _, b := range p {
- // Enough is enough, writing too much is highly suspicious.
- if len(tw.b) > 64<<20 {
- return written, errors.New("too much terminal output")
+ var ok bool
+ rt.Runner, ok = config.Runners[rt.DB.Runner]
+ if !ok {
+ return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner)
+ }
+ project, ok := config.Projects[rt.DB.FullName()]
+ if !ok {
+ return nil, fmt.Errorf("project configuration not found")
+ }
+ rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
+ if !ok {
+ return nil, fmt.Errorf(
+ "project not configured for runner %s", rt.DB.Runner)
+ }
+
+ var err error
+ if rt.wd, err = os.Getwd(); err != nil {
+ return nil, err
+ }
+
+ // Lenient or not, some kind of a time limit is desirable.
+ rt.timeout = time.Hour
+ if rt.ProjectRunner.Timeout != "" {
+ rt.timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout)
+ if err != nil {
+ return nil, fmt.Errorf("timeout: %w", err)
}
+ }
- switch b {
- case '\b':
- if tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
- tw.cur--
- }
- case '\r':
- for tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
- tw.cur--
- }
- case '\n':
- tw.b = append(tw.b, b)
- tw.cur = len(tw.b)
- default:
- tw.b = append(tw.b[:tw.cur], b)
- tw.cur = len(tw.b)
+ privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+ rt.signer, err = ssh.ParsePrivateKey(privateKey)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+
+ // The runner setup script may change the working directory,
+ // so do everything in one go. However, this approach also makes it
+ // difficult to distinguish project-independent runner failures.
+ // (For that, we can start multiple ssh.Sessions.)
+ //
+ // We could pass variables through SSH environment variables,
+ // which would require enabling PermitUserEnvironment in sshd_config,
+ // or through prepending script lines, but templates are a bit simpler.
+ //
+ // We let the runner itself clone the repository:
+	//   - it is more flexible in that it can test AUR packages more normally,
+ // - we might have to clone submodules as well.
+ // Otherwise, we could download a source archive from Gitea,
+ // and use SFTP to upload it to the runner.
+ rt.tmplScript, err = ttemplate.New("script").Funcs(shellFuncs).
+ Parse(rt.Runner.Setup + "\n" +
+ rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ if err != nil {
+ return nil, fmt.Errorf("script/build: %w", err)
+ }
+
+ rt.tmplDeploy, err = ttemplate.New("deploy").Funcs(shellFuncs).
+ Parse(rt.ProjectRunner.Deploy)
+ if err != nil {
+ return nil, fmt.Errorf("script/deploy: %w", err)
+ }
+
+ if os.Getenv("ACID_TERMINAL_DEBUG") != "" {
+ base := filepath.Join(executorTmpDir("/tmp"),
+ fmt.Sprintf("acid-%d-%s-%s-%s-",
+ task.ID, task.Owner, task.Repo, task.Runner))
+ rt.RunLog.Tee, _ = os.Create(base + "runlog")
+ rt.TaskLog.Tee, _ = os.Create(base + "tasklog")
+ // The deployment log should not be interesting.
+ }
+ return rt, nil
+}
+
+func (rt *RunningTask) close() {
+ for _, tee := range []io.WriteCloser{
+ rt.RunLog.Tee, rt.TaskLog.Tee, rt.DeployLog.Tee} {
+ if tee != nil {
+ tee.Close()
}
+ }
+}
+
+// localEnv creates a process environment for locally run executables.
+func (rt *RunningTask) localEnv() []string {
+ return append(os.Environ(),
+ "ACID_ROOT="+rt.wd,
+ "ACID_RUNNER="+rt.DB.Runner,
+ )
+}
+
+func (rt *RunningTask) elapsed() int64 {
+ return int64(time.Since(time.Unix(rt.DB.ChangedUnix, 0)).Seconds())
+}
+
+// update stores the running task's state in the database.
+func (rt *RunningTask) update() error {
+ for _, i := range []struct {
+ tw *terminalWriter
+ log *[]byte
+ }{
+ {&rt.RunLog, &rt.DB.RunLog},
+ {&rt.TaskLog, &rt.DB.TaskLog},
+ {&rt.DeployLog, &rt.DB.DeployLog},
+ } {
+ i.tw.Lock()
+ defer i.tw.Unlock()
+ if *i.log = i.tw.Serialize(0); *i.log == nil {
+ *i.log = []byte{}
+ }
+ }
+ rt.DB.DurationSeconds = rt.elapsed()
+ return rt.DB.update()
+}
+
+// ~~~ Deploy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func executorDownloadNode(sc *sftp.Client, remotePath, localPath string,
+ info os.FileInfo) error {
+ if info.IsDir() {
+ // Hoping that caller invokes us on parents first.
+ return os.MkdirAll(localPath, info.Mode().Perm())
+ }
+
+ src, err := sc.Open(remotePath)
+ if err != nil {
+ return fmt.Errorf("failed to open remote file %s: %w", remotePath, err)
+ }
+ defer src.Close()
+
+ dst, err := os.OpenFile(
+ localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
+ if err != nil {
+ return fmt.Errorf("failed to create local file: %w", err)
+ }
+ defer dst.Close()
+
+ if _, err = io.Copy(dst, src); err != nil {
+ return fmt.Errorf("failed to copy file from remote %s to local %s: %w",
+ remotePath, localPath, err)
+ }
+ return nil
+}
+
+func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error {
+ sc, err := sftp.NewClient(client)
+ if err != nil {
+ return err
+ }
+ defer sc.Close()
+ walker := sc.Walk(remoteRoot)
+ for walker.Step() {
+ if walker.Err() != nil {
+ return walker.Err()
+ }
+ relativePath, err := filepath.Rel(remoteRoot, walker.Path())
if err != nil {
- break
+ return err
+ }
+ if err = executorDownloadNode(sc, walker.Path(),
+ filepath.Join(localRoot, relativePath), walker.Stat()); err != nil {
+ return err
}
- written += 1
}
- return
+ return nil
}
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+func executorTmpDir(fallback string) string {
+ // See also: https://systemd.io/TEMPORARY_DIRECTORIES/
+ if tmp := os.Getenv("TMPDIR"); tmp != "" {
+ return tmp
+ }
+ return fallback
+}
-type RunningTask struct {
- DB Task
- Runner ConfigRunner
- ProjectRunner ConfigProjectRunner
+func executorDeploy(
+ ctx context.Context, client *ssh.Client, rt *RunningTask) error {
+ script := bytes.NewBuffer(nil)
+ if err := rt.tmplDeploy.Execute(script, &rt.DB); err != nil {
+ return &executorError{"Deploy template failed", err}
+ }
+
+ // Thus the deployment directory must exist iff the script is not empty.
+ if script.Len() == 0 {
+ return nil
+ }
+
+ // We expect the files to be moved elsewhere on the filesystem,
+ // and they may get very large, so avoid /tmp.
+ dir := filepath.Join(executorTmpDir("/var/tmp"), "acid-deploy")
+ if err := os.RemoveAll(dir); err != nil {
+ return err
+ }
+ if err := os.Mkdir(dir, 0755); err != nil {
+ return err
+ }
+
+ // The passed remoteRoot is relative to sc.Getwd.
+ if err := executorDownload(client, "acid-deploy", dir); err != nil {
+ return err
+ }
- RunLog terminalWriter
- TaskLog terminalWriter
+ cmd := exec.CommandContext(ctx, localShell())
+ cmd.Env = rt.localEnv()
+ cmd.Dir = dir
+ cmd.Stdin = script
+ cmd.Stdout = &rt.DeployLog
+ cmd.Stderr = &rt.DeployLog
+ return cmd.Run()
}
-func executorUpdate(rt *RunningTask) error {
- rt.RunLog.mu.Lock()
- defer rt.RunLog.mu.Unlock()
- rt.DB.RunLog = bytes.Clone(rt.RunLog.b)
- if rt.DB.RunLog == nil {
- rt.DB.RunLog = []byte{}
+// ~~~ Build ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func executorBuild(
+ ctx context.Context, client *ssh.Client, rt *RunningTask) error {
+ // This is here to fail early, though logically it is misplaced.
+ script := bytes.NewBuffer(nil)
+ if err := rt.tmplScript.Execute(script, &rt.DB); err != nil {
+ return &executorError{"Script template failed", err}
}
- rt.TaskLog.mu.Lock()
- defer rt.TaskLog.mu.Unlock()
- rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b)
- if rt.DB.TaskLog == nil {
- rt.DB.TaskLog = []byte{}
+ session, err := client.NewSession()
+ if err != nil {
+ return &executorError{"SSH failure", err}
}
+ defer session.Close()
- _, err := gDB.ExecContext(context.Background(), `UPDATE task
- SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ?
- WHERE id = ?`,
- rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog,
- rt.DB.ID)
- if err == nil {
- notifierAwaken()
+ modes := ssh.TerminalModes{ssh.ECHO: 0}
+ if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
+ return &executorError{"SSH failure", err}
+ }
+
+ log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
+
+ session.Stdout = &rt.TaskLog
+ session.Stderr = &rt.TaskLog
+
+ // Although passing the script directly takes away the option to specify
+ // a particular shell (barring here-documents!), it is simple and reliable.
+ //
+ // Passing the script over Stdin to sh tended to end up with commands
+ // eating the script during processing, and resulted in a hang,
+ // because closing the Stdin does not result in remote processes
+ // getting a stream of EOF.
+ //
+ // Piping the script into `cat | sh` while appending a ^D to the end of it
+ // appeared to work, but it seems likely that commands might still steal
+ // script bytes from the cat program if they choose to read from the tty
+ // and the script is longer than the buffer.
+ chSession := make(chan error, 1)
+ go func() {
+ chSession <- session.Run(script.String())
+ close(chSession)
+ }()
+
+ select {
+ case <-ctx.Done():
+ // Either shutdown, or early runner termination.
+ // The runner is not supposed to finish before the session.
+ err = context.Cause(ctx)
+ case err = <-chSession:
+ // Killing a runner may perfectly well trigger this first,
+ // in particular when it's on the same machine.
}
return err
}
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+// executorError describes a taskStateError.
+type executorError struct {
+ Detail string
+ Err error
+}
+
+func (e *executorError) Unwrap() error { return e.Err }
+func (e *executorError) Error() string {
+ return fmt.Sprintf("%s: %s", e.Detail, e.Err)
+}
+
func executorConnect(
ctx context.Context, config *ssh.ClientConfig, address string) (
*ssh.Client, error) {
@@ -888,111 +1324,60 @@ func executorConnect(
time.Sleep(1 * time.Second)
continue
}
-
return ssh.NewClient(sc, chans, reqs), nil
}
}
func executorRunTask(ctx context.Context, task Task) error {
- rt := &RunningTask{DB: task}
-
- var ok bool
- rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
- if !ok {
- return fmt.Errorf("unknown runner: %s", rt.DB.Runner)
- }
- project, ok := gConfig.Projects[rt.DB.FullName()]
- if !ok {
- return fmt.Errorf("project configuration not found")
- }
- rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
- if !ok {
- return fmt.Errorf(
- "project not configured for runner %s", rt.DB.Runner)
- }
-
- wd, err := os.Getwd()
- if err != nil {
- return err
- }
-
- // The runner setup script may change the working directory,
- // so do everything in one go. However, this approach also makes it
- // difficult to distinguish project-independent runner failures.
- // (For that, we can start multiple ssh.Sessions.)
- //
- // We could pass variables through SSH environment variables,
- // which would require enabling PermitUserEnvironment in sshd_config,
- // or through prepending script lines, but templates are a bit simpler.
- //
- // We let the runner itself clone the repository:
- // - it is a more flexible in that it can test AUR packages more normally,
- // - we might have to clone submodules as well.
- // Otherwise, we could download a source archive from Gitea,
- // and use SFTP to upload it to the runner.
- tmplScript, err := ttemplate.New("script").Funcs(shellFuncs).
- Parse(rt.Runner.Setup + "\n" +
- rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ rt, err := newRunningTask(task)
if err != nil {
- return fmt.Errorf("script: %w", err)
+ task.DurationSeconds = 0
+ task.State, task.Detail = taskStateError, "Misconfigured"
+ task.Notified = 0
+ task.RunLog = []byte(err.Error())
+ task.TaskLog = []byte{}
+ task.DeployLog = []byte{}
+ return task.update()
}
+ defer rt.close()
- privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
- if err != nil {
- return fmt.Errorf(
- "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
- }
- signer, err := ssh.ParsePrivateKey(privateKey)
- if err != nil {
- return fmt.Errorf(
- "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
- }
+ ctx, cancelTimeout := context.WithTimeout(ctx, rt.timeout)
+ defer cancelTimeout()
- defer func() {
+ // RunningTasks can be concurrently accessed by HTTP handlers.
+ locked := func(f func()) {
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
-
- delete(gRunning, rt.DB.ID)
- }()
- func() {
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
+ f()
+ }
+ locked(func() {
+ rt.DB.DurationSeconds = 0
rt.DB.State, rt.DB.Detail = taskStateRunning, ""
rt.DB.Notified = 0
rt.DB.RunLog = []byte{}
rt.DB.TaskLog = []byte{}
+ rt.DB.DeployLog = []byte{}
gRunning[rt.DB.ID] = rt
- }()
- if err := executorUpdate(rt); err != nil {
+ })
+ defer locked(func() {
+ delete(gRunning, rt.DB.ID)
+ })
+ if err := rt.update(); err != nil {
return fmt.Errorf("SQL: %w", err)
}
// Errors happening while trying to write an error are unfortunate,
// but not important enough to abort entirely.
setError := func(detail string) {
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
rt.DB.State, rt.DB.Detail = taskStateError, detail
- if err := executorUpdate(rt); err != nil {
+ if err := rt.update(); err != nil {
log.Printf("error: task %d for %s: SQL: %s",
rt.DB.ID, rt.DB.FullName(), err)
}
}
- script := bytes.NewBuffer(nil)
- if err := tmplScript.Execute(script, &rt.DB); err != nil {
- setError("Script template failed")
- return err
- }
-
cmd := exec.CommandContext(ctx, rt.Runner.Run)
- cmd.Env = append(
- os.Environ(),
- "ACID_ROOT="+wd,
- "ACID_RUNNER="+rt.DB.Runner,
- )
+ cmd.Env = rt.localEnv()
// Pushing the runner into a new process group that can be killed at once
// with all its children isn't bullet-proof, it messes with job control
@@ -1016,7 +1401,8 @@ func executorRunTask(ctx context.Context, task Task) error {
cmd.Stdout = &rt.RunLog
cmd.Stderr = &rt.RunLog
if err := cmd.Start(); err != nil {
- setError("Runner failed to start")
+ fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+ locked(func() { setError("Runner failed to start") })
return err
}
@@ -1042,78 +1428,62 @@ func executorRunTask(ctx context.Context, task Task) error {
client, err := executorConnect(ctxRunner, &ssh.ClientConfig{
User: rt.Runner.SSH.User,
- Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
+ Auth: []ssh.AuthMethod{ssh.PublicKeys(rt.signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}, rt.Runner.SSH.Address)
if err != nil {
fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
+ locked(func() { setError("SSH failure") })
return err
}
defer client.Close()
- session, err := client.NewSession()
- if err != nil {
- fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
- return err
- }
- defer session.Close()
+ var (
+ eeSSH *ssh.ExitError
+ eeExec *exec.ExitError
+ ee3 *executorError
+ )
- modes := ssh.TerminalModes{ssh.ECHO: 0}
- if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
- fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
- return err
+ err = executorBuild(ctxRunner, client, rt)
+ if err != nil {
+ locked(func() {
+ if errors.As(err, &eeSSH) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ } else if errors.As(err, &ee3) {
+ rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee3.Err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ }
+ })
+ return rt.update()
}
- log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
-
- session.Stdout = &rt.TaskLog
- session.Stderr = &rt.TaskLog
-
- // Although passing the script directly takes away the option to specify
- // a particular shell (barring here-documents!), it is simple and reliable.
- //
- // Passing the script over Stdin to sh tended to end up with commands
- // eating the script during processing, and resulted in a hang,
- // because closing the Stdin does not result in remote processes
- // getting a stream of EOF.
- //
- // Piping the script into `cat | sh` while appending a ^D to the end of it
- // appeared to work, but it seems likely that commands might still steal
- // script bytes from the cat program if they choose to read from the tty
- // and the script is longer than the buffer.
- chSession := make(chan error, 1)
+ // This is so that it doesn't stay hanging within the sftp package,
+ // which uses context.Background() everywhere.
go func() {
- chSession <- session.Run(script.String())
- close(chSession)
+ <-ctxRunner.Done()
+ client.Close()
}()
- select {
- case <-ctxRunner.Done():
- // Either shutdown, or early runner termination.
- // The runner is not supposed to finish before the session.
- err = context.Cause(ctxRunner)
- case err = <-chSession:
- // Killing a runner may perfectly well trigger this first,
- // in particular when it's on the same machine.
- }
-
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
- var ee *ssh.ExitError
- if err == nil {
- rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
- } else if errors.As(err, &ee) {
- rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
- fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
- } else {
- rt.DB.State, rt.DB.Detail = taskStateError, ""
- fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
- }
- return executorUpdate(rt)
+ err = executorDeploy(ctxRunner, client, rt)
+ locked(func() {
+ if err == nil {
+ rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
+ } else if errors.As(err, &eeExec) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed"
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
+ } else if errors.As(err, &ee3) {
+ rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee3.Err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
+ }
+ })
+ return rt.update()
}
func executorRun(ctx context.Context) error {
@@ -1191,17 +1561,23 @@ type Task struct {
Hash string
Runner string
- State taskState
- Detail string
- Notified int64
- RunLog []byte
- TaskLog []byte
+ // True database names for these are occupied by accessors.
+ CreatedUnix int64
+ ChangedUnix int64
+ DurationSeconds int64
+
+ State taskState
+ Detail string
+ Notified int64
+ RunLog []byte
+ TaskLog []byte
+ DeployLog []byte
}
func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
func (t *Task) RunnerName() string {
- if runner, ok := gConfig.Runners[t.Runner]; !ok {
+ if runner, ok := getConfig().Runners[t.Runner]; !ok {
return t.Runner
} else {
return runner.Name
@@ -1209,42 +1585,129 @@ func (t *Task) RunnerName() string {
}
func (t *Task) URL() string {
- return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID)
+ return fmt.Sprintf("%s/task/%d", getConfig().Root, t.ID)
}
func (t *Task) RepoURL() string {
- return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo)
+ return fmt.Sprintf("%s/%s/%s", getConfig().Gitea, t.Owner, t.Repo)
}
func (t *Task) CommitURL() string {
return fmt.Sprintf("%s/%s/%s/commit/%s",
- gConfig.Gitea, t.Owner, t.Repo, t.Hash)
+ getConfig().Gitea, t.Owner, t.Repo, t.Hash)
}
func (t *Task) CloneURL() string {
- return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo)
+ return fmt.Sprintf("%s/%s/%s.git", getConfig().Gitea, t.Owner, t.Repo)
+}
+
+func shortDurationString(d time.Duration) string {
+ if d.Abs() >= 24*time.Hour {
+ return strconv.FormatInt(int64(d/time.Hour/24), 10) + "d"
+ } else if d.Abs() >= time.Hour {
+ return strconv.FormatInt(int64(d/time.Hour), 10) + "h"
+ } else if d.Abs() >= time.Minute {
+ return strconv.FormatInt(int64(d/time.Minute), 10) + "m"
+ } else {
+ return strconv.FormatInt(int64(d/time.Second), 10) + "s"
+ }
+}
+
+func (t *Task) Created() *time.Time {
+ if t.CreatedUnix == 0 {
+ return nil
+ }
+ tt := time.Unix(t.CreatedUnix, 0)
+ return &tt
+}
+func (t *Task) Changed() *time.Time {
+ if t.ChangedUnix == 0 {
+ return nil
+ }
+ tt := time.Unix(t.ChangedUnix, 0)
+ return &tt
}
-const schemaSQL = `
+func (t *Task) CreatedAgo() string {
+ if t.CreatedUnix == 0 {
+ return ""
+ }
+ return shortDurationString(time.Since(*t.Created()))
+}
+
+func (t *Task) ChangedAgo() string {
+ if t.ChangedUnix == 0 {
+ return ""
+ }
+ return shortDurationString(time.Since(*t.Changed()))
+}
+
+func (t *Task) Duration() *time.Duration {
+ if t.DurationSeconds == 0 {
+ return nil
+ }
+ td := time.Duration(t.DurationSeconds * int64(time.Second))
+ return &td
+}
+
+func (t *Task) update() error {
+ _, err := gDB.ExecContext(context.Background(),
+ `UPDATE task SET changed = unixepoch('now'), duration = ?,
+ state = ?, detail = ?, notified = ?,
+ runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`,
+ t.DurationSeconds,
+ t.State, t.Detail, t.Notified,
+ t.RunLog, t.TaskLog, t.DeployLog, t.ID)
+ if err == nil {
+ notifierAwaken()
+ }
+ return err
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+const initializeSQL = `
+PRAGMA application_id = 0x61636964; -- "acid" in big endian
+
CREATE TABLE IF NOT EXISTS task(
- id INTEGER NOT NULL, -- unique ID
+ id INTEGER NOT NULL, -- unique ID
+
+ owner TEXT NOT NULL, -- Gitea username
+ repo TEXT NOT NULL, -- Gitea repository name
+ hash TEXT NOT NULL, -- commit hash
+ runner TEXT NOT NULL, -- the runner to use
- owner TEXT NOT NULL, -- Gitea username
- repo TEXT NOT NULL, -- Gitea repository name
- hash TEXT NOT NULL, -- commit hash
- runner TEXT NOT NULL, -- the runner to use
+ created INTEGER NOT NULL DEFAULT 0, -- creation timestamp
+ changed INTEGER NOT NULL DEFAULT 0, -- last state change timestamp
+ duration INTEGER NOT NULL DEFAULT 0, -- duration of last run
- state INTEGER NOT NULL DEFAULT 0, -- task state
- detail TEXT NOT NULL DEFAULT '', -- task state detail
- notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
- runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
- tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+ state INTEGER NOT NULL DEFAULT 0, -- task state
+ detail TEXT NOT NULL DEFAULT '', -- task state detail
+ notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
+ runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
+ tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+ deploylog BLOB NOT NULL DEFAULT x'', -- deployment output
PRIMARY KEY (id)
) STRICT;
`
-func openDB(path string) error {
+func dbEnsureColumn(tx *sql.Tx, table, column, definition string) error {
+ var count int64
+ if err := tx.QueryRow(
+ `SELECT count(*) FROM pragma_table_info(?) WHERE name = ?`,
+ table, column).Scan(&count); err != nil {
+ return err
+ } else if count == 1 {
+ return nil
+ }
+
+ _, err := tx.Exec(
+ `ALTER TABLE ` + table + ` ADD COLUMN ` + column + ` ` + definition)
+ return err
+}
+
+func dbOpen(path string) error {
var err error
gDB, err = sql.Open("sqlite3",
"file:"+path+"?_foreign_keys=1&_busy_timeout=1000")
@@ -1252,10 +1715,55 @@ func openDB(path string) error {
return err
}
- _, err = gDB.Exec(schemaSQL)
- return err
+ tx, err := gDB.BeginTx(context.Background(), nil)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ var version int64
+ if err = tx.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
+ return err
+ }
+
+ switch version {
+ case 0:
+ if _, err = tx.Exec(initializeSQL); err != nil {
+ return err
+ }
+
+ // We had not initially set a database schema version,
+ // so we're stuck checking this column even on new databases.
+ if err = dbEnsureColumn(tx,
+ `task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil {
+ return err
+ }
+ case 1:
+ if err = dbEnsureColumn(tx,
+ `task`, `created`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
+ return err
+ }
+ if err = dbEnsureColumn(tx,
+ `task`, `changed`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
+ return err
+ }
+ if err = dbEnsureColumn(tx,
+ `task`, `duration`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
+ return err
+ }
+ case 2:
+ // The next migration goes here, remember to increment the number below.
+ }
+
+ if _, err = tx.Exec(
+ `PRAGMA user_version = ` + strconv.Itoa(2)); err != nil {
+ return err
+ }
+ return tx.Commit()
}
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
// callRPC forwards command line commands to a running server.
func callRPC(args []string) error {
body, err := json.Marshal(args)
@@ -1264,7 +1772,7 @@ func callRPC(args []string) error {
}
req, err := http.NewRequest(http.MethodPost,
- fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body))
+ fmt.Sprintf("%s/rpc", getConfig().Root), bytes.NewReader(body))
if err != nil {
return err
}
@@ -1287,8 +1795,30 @@ func callRPC(args []string) error {
return nil
}
+// filterTTY exposes the internal virtual terminal filter.
+func filterTTY(path string) {
+ var r io.Reader = os.Stdin
+ if path != "-" {
+ if f, err := os.Open(path); err != nil {
+ log.Println(err)
+ } else {
+ r = f
+ defer f.Close()
+ }
+ }
+
+ var tw terminalWriter
+ if _, err := io.Copy(&tw, r); err != nil {
+ log.Printf("%s: %s\n", path, err)
+ }
+ if _, err := os.Stdout.Write(tw.Serialize(0)); err != nil {
+ log.Printf("%s: %s\n", path, err)
+ }
+}
+
func main() {
version := flag.Bool("version", false, "show version and exit")
+ tty := flag.Bool("tty", false, "run the internal virtual terminal filter")
flag.Usage = func() {
f := flag.CommandLine.Output()
@@ -1306,8 +1836,15 @@ func main() {
fmt.Printf("%s %s\n", projectName, projectVersion)
return
}
+ if *tty {
+ for _, path := range flag.Args() {
+ filterTTY(path)
+ }
+ return
+ }
- if err := parseConfig(flag.Arg(0)); err != nil {
+ gConfigPath = flag.Arg(0)
+ if err := loadConfig(); err != nil {
log.Fatalln(err)
}
if flag.NArg() > 1 {
@@ -1317,7 +1854,7 @@ func main() {
return
}
- if err := openDB(gConfig.DB); err != nil {
+ if err := dbOpen(getConfig().DB); err != nil {
log.Fatalln(err)
}
defer gDB.Close()
@@ -1326,7 +1863,7 @@ func main() {
ctx, stop := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM)
- server := &http.Server{Addr: gConfig.Listen}
+ server := &http.Server{Addr: getConfig().Listen}
http.HandleFunc("/{$}", handleTasks)
http.HandleFunc("/task/{id}", handleTask)
http.HandleFunc("/push", handlePush)