aboutsummaryrefslogtreecommitdiff
path: root/acid.go
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2024-03-29 14:08:15 +0100
committerPřemysl Eric Janouch <p@janouch.name>2024-04-04 19:40:14 +0200
commitfd6959fff82a87e92d9e73cb07e210cebb675050 (patch)
treed7fd170022d99e9d6a978208ab31f169f3574ad8 /acid.go
downloadacid-fd6959fff82a87e92d9e73cb07e210cebb675050.tar.gz
acid-fd6959fff82a87e92d9e73cb07e210cebb675050.tar.xz
acid-fd6959fff82a87e92d9e73cb07e210cebb675050.zip
Initial commit
Diffstat (limited to 'acid.go')
-rw-r--r--acid.go1157
1 files changed, 1157 insertions, 0 deletions
diff --git a/acid.go b/acid.go
new file mode 100644
index 0000000..2f676d2
--- /dev/null
+++ b/acid.go
@@ -0,0 +1,1157 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "crypto/hmac"
+ "crypto/sha256"
+ "database/sql"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "html/template"
+ "io"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "os/exec"
+ "os/signal"
+ "sort"
+ "strconv"
+ "sync"
+ "syscall"
+ ttemplate "text/template"
+ "time"
+
+ _ "github.com/mattn/go-sqlite3"
+ "golang.org/x/crypto/ssh"
+ "gopkg.in/yaml.v3"
+)
+
+var (
+ projectName = "acid"
+ projectVersion = "?"
+
+ gConfig Config = Config{Listen: ":http"}
+ gNotifyScript *ttemplate.Template
+ gDB *sql.DB
+
+ gNotifierSignal = make(chan struct{}, 1)
+ gExecutorSignal = make(chan struct{}, 1)
+
+ // The mutex is at least supposed to lock over the tasks as well.
+ gRunningMutex sync.Mutex
+ gRunning = make(map[int64]*RunningTask)
+)
+
+// --- Config ------------------------------------------------------------------
+
+type Config struct {
+ DB string `yaml:"db"` // database file path
+ Listen string `yaml:"listen"` // HTTP listener address
+ Root string `yaml:"root"` // HTTP root URI
+ Gitea string `yaml:"gitea"` // Gitea base URL
+ Secret string `yaml:"secret"` // Gitea hook secret
+ Token string `yaml:"token"` // Gitea API token
+ Notify string `yaml:"notify"` // notifier script
+
+ Runners map[string]ConfigRunner `yaml:"runners"` // script runners
+ Projects map[string]ConfigProject `yaml:"projects"` // configured projects
+}
+
+type ConfigRunner struct {
+ Name string `yaml:"name"` // descriptive name
+ Run string `yaml:"run"` // runner executable
+ Setup string `yaml:"setup"` // runner setup script (SSH)
+ SSH struct {
+ User string `yaml:"user"` // remote username
+ Address string `yaml:"address"` // TCP host:port
+ Identity string `yaml:"identity"` // private key path
+ } `yaml:"ssh"` // shell access
+}
+
+type ConfigProject struct {
+ Runners map[string]ConfigProjectRunner `yaml:"runners"`
+}
+
+type ConfigProjectRunner struct {
+ Setup string `yaml:"setup"` // project setup script (SSH)
+ Build string `yaml:"build"` // project build script (SSH)
+}
+
+func parseConfig(path string) error {
+ if f, err := os.Open(path); err != nil {
+ return err
+ } else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil {
+ return err
+ }
+
+ var err error
+ gNotifyScript, err = ttemplate.New("notify").Parse(gConfig.Notify)
+ return err
+}
+
+// --- Task views --------------------------------------------------------------
+
+func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
+ rows, err := gDB.QueryContext(ctx, `
+ SELECT id, owner, repo, hash, runner,
+ state, detail, notified, runlog, tasklog FROM task `+query, args...)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ tasks := []Task{}
+ for rows.Next() {
+ var t Task
+ err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
+ &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog)
+ if err != nil {
+ return nil, err
+ }
+ tasks = append(tasks, t)
+ }
+ return tasks, rows.Err()
+}
+
+var templateTasks = template.Must(template.New("tasks").Parse(`
+<!DOCTYPE html>
+<html>
+<head>
+<meta charset="utf-8">
+<title>Tasks</title>
+</head>
+<body>
+<h1>Tasks</h1>
+<table border="1">
+<thead>
+ <tr>
+ <th>ID</th>
+ <th>Repository</th>
+ <th>Hash</th>
+ <th>Runner</th>
+ <th>State</th>
+ <th>Detail</th>
+ <th>Notified</th>
+ </tr>
+</thead>
+<tbody>
+{{range .}}
+ <tr>
+ <td><a href="task/{{.ID}}">{{.ID}}</a></td>
+ <td><a href="{{.RepoURL}}">{{.FullName}}</a></td>
+ <td><a href="{{.CommitURL}}">{{.Hash}}</a></td>
+ <td>{{.RunnerName}}</td>
+ <td>{{.State}}</td>
+ <td>{{.Detail}}</td>
+ <td>{{.Notified}}</td>
+ </tr>
+{{end}}
+</tbody>
+</table>
+</body>
+</html>
+`))
+
+func handleTasks(w http.ResponseWriter, r *http.Request) {
+ tasks, err := getTasks(r.Context(), `ORDER BY id DESC`)
+ if err != nil {
+ http.Error(w,
+ "Error retrieving tasks: "+err.Error(),
+ http.StatusInternalServerError)
+ return
+ }
+
+ if err := templateTasks.Execute(w, tasks); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+}
+
+var templateTask = template.Must(template.New("tasks").Parse(`
+<!DOCTYPE html>
+<html>
+<head>
+<title>Task {{.ID}}</title>
+<meta charset="utf-8">
+{{if .IsRunning}}
+<meta http-equiv="refresh" content="5">
+{{end}}
+</head>
+<body>
+<h1><a href="/">Tasks</a> &raquo; {{.ID}}</h1>
+<dl>
+<dt>Project</dt>
+ <dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
+<dt>Commit</dt>
+ <dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
+<dt>Runner</dt>
+ <dd>{{.RunnerName}}</dd>
+<dt>State</dt>
+ <dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
+<dt>Notified</dt>
+ <dd>{{.Notified}}</dd>
+</dl>
+{{if .RunLog}}
+<h2>Runner log</h2>
+<pre>{{printf "%s" .RunLog}}</pre>
+{{end}}
+{{if .TaskLog}}
+<h2>Task log</h2>
+<pre>{{printf "%s" .TaskLog}}</pre>
+{{end}}
+</table>
+</body>
+</html>
+`))
+
+func handleTask(w http.ResponseWriter, r *http.Request) {
+ id, err := strconv.Atoi(r.PathValue("id"))
+ if err != nil {
+ http.Error(w,
+ "Invalid ID: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ tasks, err := getTasks(r.Context(), `WHERE id = ?`, id)
+ if err != nil {
+ http.Error(w,
+ "Error retrieving task: "+err.Error(),
+ http.StatusInternalServerError)
+ return
+ }
+ if len(tasks) == 0 {
+ http.NotFound(w, r)
+ return
+ }
+
+ task := struct {
+ Task
+ IsRunning bool
+ }{Task: tasks[0]}
+ func() {
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ rt, ok := gRunning[task.ID]
+ task.IsRunning = ok
+ if !ok {
+ return
+ }
+
+ rt.RunLog.mu.Lock()
+ defer rt.RunLog.mu.Unlock()
+ rt.TaskLog.mu.Lock()
+ defer rt.TaskLog.mu.Unlock()
+
+ task.RunLog = rt.RunLog.b
+ task.TaskLog = rt.TaskLog.b
+ }()
+
+ if err := templateTask.Execute(w, &task); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+}
+
+// --- Push hook ---------------------------------------------------------------
+
+type GiteaPushEvent struct {
+ HeadCommit struct {
+ ID string `json:"id"`
+ } `json:"head_commit"`
+ Repository struct {
+ Name string `json:"name"`
+ FullName string `json:"full_name"`
+ Owner struct {
+ Username string `json:"username"`
+ } `json:"owner"`
+ } `json:"repository"`
+}
+
+func createTasks(ctx context.Context,
+ owner, repo, hash string, runners []string) error {
+ tx, err := gDB.BeginTx(ctx, nil)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner)
+ VALUES (?, ?, ?, ?)`)
+ if err != nil {
+ return err
+ }
+
+ for _, runner := range runners {
+ if _, err := stmt.Exec(owner, repo, hash, runner); err != nil {
+ return err
+ }
+ }
+ if err := tx.Commit(); err != nil {
+ return err
+ }
+
+ notifierAwaken()
+ executorAwaken()
+ return nil
+}
+
+func giteaSign(b []byte) string {
+ payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
+ payloadHmac.Write(b)
+ return hex.EncodeToString(payloadHmac.Sum(nil))
+}
+
+func handlePush(w http.ResponseWriter, r *http.Request) {
+ // X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
+ if r.Header.Get("X-Gitea-Event") != "push" {
+ http.Error(w,
+ "Expected a Gitea push event", http.StatusBadRequest)
+ return
+ }
+
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w,
+ "Error reading request body", http.StatusInternalServerError)
+ return
+ }
+ if r.Header.Get("X-Gitea-Signature") != giteaSign(body) {
+ http.Error(w,
+ "Signature mismatch", http.StatusBadRequest)
+ return
+ }
+
+ var event GiteaPushEvent
+ if err := json.Unmarshal(body, &event); err != nil {
+ http.Error(w,
+ "Invalid request body: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ log.Printf("received push: %s %s\n",
+ event.Repository.FullName, event.HeadCommit.ID)
+
+ project, ok := gConfig.Projects[event.Repository.FullName]
+ if !ok {
+ // This is okay, don't set any commit statuses.
+ fmt.Fprintf(w, "The project is not configured.")
+ return
+ }
+
+ runners := []string{}
+ for name := range project.Runners {
+ runners = append(runners, name)
+ }
+ sort.Strings(runners)
+
+ if err := createTasks(r.Context(),
+ event.Repository.Owner.Username, event.Repository.Name,
+ event.HeadCommit.ID, runners); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+// --- RPC ---------------------------------------------------------------------
+
+const rpcHeaderSignature = "X-ACID-Signature"
+
+func rpcRestart(w io.Writer, ids []int64) {
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ for _, id := range ids {
+ if _, ok := gRunning[id]; ok {
+ fmt.Fprintf(w, "%d: not restarting running tasks\n", id)
+ continue
+ }
+
+ // The executor bumps to "running" after inserting into gRunning,
+ // so we should not need to exclude that state here.
+ result, err := gDB.ExecContext(context.Background(),
+ `UPDATE task SET state = ? WHERE id = ?`, taskStateNew, id)
+ if err != nil {
+ fmt.Fprintf(w, "%d: %s\n", id, err)
+ } else if n, _ := result.RowsAffected(); n != 1 {
+ fmt.Fprintf(w, "%d: no such task\n", id)
+ }
+ }
+ executorAwaken()
+}
+
+func handleRPC(w http.ResponseWriter, r *http.Request) {
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w,
+ "Error reading request body", http.StatusInternalServerError)
+ return
+ }
+ if r.Header.Get(rpcHeaderSignature) != giteaSign(body) {
+ http.Error(w,
+ "Signature mismatch", http.StatusBadRequest)
+ return
+ }
+
+ var args []string
+ if err := json.Unmarshal(body, &args); err != nil {
+ http.Error(w,
+ "Invalid request body: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+ if len(args) == 0 {
+ http.Error(w, "Missing command", http.StatusBadRequest)
+ return
+ }
+
+ command, args := args[0], args[1:]
+ switch command {
+ case "restart":
+ ids := []int64{}
+ for _, arg := range args {
+ id, err := strconv.ParseInt(arg, 10, 64)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ ids = append(ids, id)
+ }
+ rpcRestart(w, ids)
+ default:
+ http.Error(w, "Unknown command: "+command, http.StatusBadRequest)
+ }
+}
+
+// --- Notifier ----------------------------------------------------------------
+
+func notifierRunCommand(ctx context.Context, task Task) {
+ script := bytes.NewBuffer(nil)
+ if err := gNotifyScript.Execute(script, &task); err != nil {
+ log.Printf("error: notify: %s", err)
+ return
+ }
+
+ cmd := exec.CommandContext(ctx, "sh")
+ cmd.Stdin = script
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ if err := cmd.Run(); err != nil {
+ log.Printf("error: notify: %s", err)
+ }
+}
+
+func notifierNotify(ctx context.Context, task Task) error {
+ // Loosely assuming that this only runs on state changes.
+ if task.State != taskStateNew && task.State != taskStateRunning {
+ go notifierRunCommand(ctx, task)
+ }
+
+ payload := struct {
+ Context string `json:"context"`
+ Description string `json:"description"`
+ State string `json:"state"`
+ TargetURL string `json:"target_url"`
+ }{}
+
+ runner, ok := gConfig.Runners[task.Runner]
+ if !ok {
+ log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner)
+ return nil
+ }
+ payload.Context = runner.Name
+ payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID)
+
+ switch task.State {
+ case taskStateNew:
+ payload.State, payload.Description = "pending", "Pending"
+ case taskStateRunning:
+ payload.State, payload.Description = "pending", "Running"
+ case taskStateError:
+ payload.State, payload.Description = "error", "Error"
+ case taskStateFailed:
+ payload.State, payload.Description = "failure", "Failure"
+ case taskStateSuccess:
+ payload.State, payload.Description = "success", "Success"
+ default:
+ log.Printf("task %d is in unknown state %d\n", task.ID, task.State)
+ return nil
+ }
+
+ // We should only fill this in case we have some specific information.
+ if task.Detail != "" {
+ payload.Description = task.Detail
+ }
+
+ body, err := json.Marshal(payload)
+ if err != nil {
+ return err
+ }
+
+ log.Printf("task %d for %s: notifying: %s: %s: %s (%s)\n",
+ task.ID, task.FullName(), task.Hash,
+ payload.Context, payload.State, payload.Description)
+
+ uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s",
+ gConfig.Gitea, task.Owner, task.Repo, task.Hash)
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri,
+ bytes.NewReader(body))
+ if err != nil {
+ return err
+ }
+
+ req.Header.Set("Accept", "application/json")
+ req.Header.Set("Authorization", "token "+gConfig.Token)
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ body, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+
+ _, err = gDB.ExecContext(ctx, `UPDATE task SET notified = 1
+ WHERE id = ? AND state = ? AND detail = ? AND notified = 0`,
+ task.ID, task.State, task.Detail)
+ return err
+}
+
+func notifierRun(ctx context.Context) error {
+ tasks, err := getTasks(ctx, `WHERE notified = 0 ORDER BY id ASC`)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tasks {
+ if err := notifierNotify(ctx, task); err != nil {
+ return fmt.Errorf(
+ "task %d for %s: %w", task.ID, task.FullName(), err)
+ }
+ }
+ return nil
+}
+
+func notifier(ctx context.Context) {
+ for {
+ select {
+ case <-gNotifierSignal:
+ case <-ctx.Done():
+ return
+ }
+
+ if err := notifierRun(ctx); err != nil {
+ log.Printf("error: notifier: %s\n", err)
+ }
+ }
+}
+
+func notifierAwaken() {
+ select {
+ case gNotifierSignal <- struct{}{}:
+ default:
+ }
+}
+
+// --- Executor ----------------------------------------------------------------
+
+type terminalWriter struct {
+ b []byte
+ cur int
+ mu sync.Mutex
+}
+
+func (tw *terminalWriter) Write(p []byte) (written int, err error) {
+ tw.mu.Lock()
+ defer tw.mu.Unlock()
+
+ // Extremely rudimentary emulation of a dumb terminal.
+ for _, b := range p {
+ // Enough is enough, writing too much is highly suspicious.
+ if len(tw.b) > 64<<20 {
+ return written, errors.New("too much terminal output")
+ }
+
+ switch b {
+ case '\b':
+ if tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
+ tw.cur--
+ }
+ case '\r':
+ for tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
+ tw.cur--
+ }
+ case '\n':
+ tw.b = append(tw.b, b)
+ tw.cur = len(tw.b)
+ default:
+ tw.b = append(tw.b[:tw.cur], b)
+ tw.cur = len(tw.b)
+ }
+
+ if err != nil {
+ break
+ }
+ written += 1
+ }
+ return
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+type RunningTask struct {
+ DB Task
+ Runner ConfigRunner
+ ProjectRunner ConfigProjectRunner
+
+ RunLog terminalWriter
+ TaskLog terminalWriter
+}
+
+func executorUpdate(rt *RunningTask) error {
+ rt.RunLog.mu.Lock()
+ defer rt.RunLog.mu.Unlock()
+ rt.DB.RunLog = bytes.Clone(rt.RunLog.b)
+ if rt.DB.RunLog == nil {
+ rt.DB.RunLog = []byte{}
+ }
+
+ rt.TaskLog.mu.Lock()
+ defer rt.TaskLog.mu.Unlock()
+ rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b)
+ if rt.DB.TaskLog == nil {
+ rt.DB.TaskLog = []byte{}
+ }
+
+ _, err := gDB.ExecContext(context.Background(), `UPDATE task
+ SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ?
+ WHERE id = ?`,
+ rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog,
+ rt.DB.ID)
+ if err == nil {
+ notifierAwaken()
+ }
+ return err
+}
+
+func executorConnect(
+ ctx context.Context, config *ssh.ClientConfig, address string) (
+ *ssh.Client, error) {
+ deadline := time.Now().Add(3 * time.Minute)
+ ctxDeadlined, cancel := context.WithDeadline(ctx, deadline)
+ defer cancel()
+
+ var d net.Dialer
+ for {
+ // net.DNSError eats the cause, as in it cannot be unwrapped
+ // and tested for a particular subtype.
+ conn, err := d.DialContext(ctxDeadlined, "tcp", address)
+ if e := ctxDeadlined.Err(); e != nil {
+ // This may provide a little bit more information.
+ if err != nil {
+ return nil, err
+ }
+ return nil, e
+ }
+ if err != nil {
+ time.Sleep(1 * time.Second)
+ continue
+ }
+
+ // We ignore the parent context, but at least we try.
+ conn.SetDeadline(deadline)
+ sc, chans, reqs, err := ssh.NewClientConn(conn, address, config)
+ conn.SetDeadline(time.Time{})
+
+ // cloud-init-enabled machines, such as OpenBSD,
+ // may have a race condition between sshd starting for the first time,
+ // and having a configured user.
+ //
+ // Authentication may therefore regularly fail,
+ // and we need to ignore all errors whatsoever,
+ // not just spurious partial successes resulting in RST or FIN.
+ var neterr net.Error
+ if errors.As(err, &neterr) || errors.Is(err, io.EOF) || err != nil {
+ time.Sleep(1 * time.Second)
+ continue
+ }
+
+ return ssh.NewClient(sc, chans, reqs), nil
+ }
+}
+
+func executorRunTask(ctx context.Context, task Task) error {
+ rt := &RunningTask{DB: task}
+
+ var ok bool
+ rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
+ if !ok {
+ return fmt.Errorf("unknown runner: %s", rt.DB.Runner)
+ }
+ project, ok := gConfig.Projects[rt.DB.FullName()]
+ if !ok {
+ return fmt.Errorf("project configuration not found")
+ }
+ rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
+ if !ok {
+ return fmt.Errorf(
+ "project not configured for runner %s", rt.DB.Runner)
+ }
+
+ wd, err := os.Getwd()
+ if err != nil {
+ return err
+ }
+
+ // The runner setup script may change the working directory,
+ // so do everything in one go. However, this approach also makes it
+ // difficult to distinguish project-independent runner failures.
+ // (For that, we can start multiple ssh.Sessions.)
+ //
+ // We could pass variables through SSH environment variables,
+ // which would require enabling PermitUserEnvironment in sshd_config,
+ // or through prepending script lines, but templates are a bit simpler.
+ //
+ // We let the runner itself clone the repository:
+ // - it is a more flexible in that it can test AUR packages more normally,
+ // - we might have to clone submodules as well.
+ // Otherwise, we could download a source archive from Gitea,
+ // and use SFTP to upload it to the runner.
+ tmplScript, err := ttemplate.New("script").Parse(rt.Runner.Setup + "\n" +
+ rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ if err != nil {
+ return fmt.Errorf("script: %w", err)
+ }
+
+ privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
+ if err != nil {
+ return fmt.Errorf(
+ "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+ signer, err := ssh.ParsePrivateKey(privateKey)
+ if err != nil {
+ return fmt.Errorf(
+ "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+
+ defer func() {
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ delete(gRunning, rt.DB.ID)
+ }()
+ func() {
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ rt.DB.State, rt.DB.Detail = taskStateRunning, ""
+ rt.DB.Notified = 0
+ rt.DB.RunLog = []byte{}
+ rt.DB.TaskLog = []byte{}
+ gRunning[rt.DB.ID] = rt
+ }()
+ if err := executorUpdate(rt); err != nil {
+ return fmt.Errorf("SQL: %w", err)
+ }
+
+ // Errors happening while trying to write an error are unfortunate,
+ // but not important enough to abort entirely.
+ setError := func(detail string) {
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ rt.DB.State, rt.DB.Detail = taskStateError, detail
+ if err := executorUpdate(rt); err != nil {
+ log.Printf("error: task %d for %s: SQL: %s",
+ rt.DB.ID, rt.DB.FullName(), err)
+ }
+ }
+
+ script := bytes.NewBuffer(nil)
+ if err := tmplScript.Execute(script, &rt.DB); err != nil {
+ setError("Script template failed")
+ return err
+ }
+
+ cmd := exec.CommandContext(ctx, rt.Runner.Run)
+ cmd.Env = append(
+ os.Environ(),
+ "ACID_ROOT="+wd,
+ "ACID_RUNNER="+rt.DB.Runner,
+ )
+
+ log.Printf("task %d for %s: starting %s\n",
+ rt.DB.ID, rt.DB.FullName(), rt.Runner.Name)
+
+ cmd.Stdout = &rt.RunLog
+ cmd.Stderr = &rt.RunLog
+ if err := cmd.Start(); err != nil {
+ setError("Runner failed to start")
+ return err
+ }
+
+ ctxRunner, cancelRunner := context.WithCancelCause(ctx)
+ defer cancelRunner(context.Canceled)
+ go func() {
+ if err := cmd.Wait(); err != nil {
+ cancelRunner(err)
+ } else {
+ cancelRunner(errors.New("runner exited successfully but early"))
+ }
+ }()
+ defer func() {
+ _ = cmd.Process.Signal(os.Interrupt)
+ select {
+ case <-ctxRunner.Done():
+ // This doesn't leave the runner almost any time on our shutdown,
+ // but whatever--they're supposed to be ephemeral.
+ // Moreover, we don't even override cmd.CancelFunc.
+ case <-time.After(5 * time.Second):
+ }
+ _ = cmd.Process.Kill()
+ }()
+
+ client, err := executorConnect(ctxRunner, &ssh.ClientConfig{
+ User: rt.Runner.SSH.User,
+ Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
+ HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+ }, rt.Runner.SSH.Address)
+ if err != nil {
+ fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+ setError("SSH failure")
+ return err
+ }
+ defer client.Close()
+
+ session, err := client.NewSession()
+ if err != nil {
+ fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+ setError("SSH failure")
+ return err
+ }
+ defer session.Close()
+
+ modes := ssh.TerminalModes{ssh.ECHO: 0}
+ if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
+ fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+ setError("SSH failure")
+ return err
+ }
+
+ log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
+
+ session.Stdout = &rt.TaskLog
+ session.Stderr = &rt.TaskLog
+
+ // Although passing the script directly takes away the option to specify
+ // a particular shell (barring here-documents!), it is simple and reliable.
+ //
+ // Passing the script over Stdin to sh tended to end up with commands
+ // eating the script during processing, and resulted in a hang,
+ // because closing the Stdin does not result in remote processes
+ // getting a stream of EOF.
+ //
+ // Piping the script into `cat | sh` while appending a ^D to the end of it
+ // appeared to work, but it seems likely that commands might still steal
+ // script bytes from the cat program if they choose to read from the tty
+ // and the script is longer than the buffer.
+ chSession := make(chan error, 1)
+ go func() {
+ chSession <- session.Run(script.String())
+ close(chSession)
+ }()
+
+ select {
+ case <-ctxRunner.Done():
+ // Either shutdown, or early runner termination.
+ // The runner is not supposed to finish before the session.
+ err = context.Cause(ctxRunner)
+ case err = <-chSession:
+ // Killing a runner may perfectly well trigger this first,
+ // in particular when it's on the same machine.
+ }
+
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ var ee *ssh.ExitError
+ if err == nil {
+ rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
+ } else if errors.As(err, &ee) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ }
+ return executorUpdate(rt)
+}
+
+func executorRun(ctx context.Context) error {
+ tasks, err := getTasks(ctx, `WHERE state = ? OR state = ? ORDER BY id ASC`,
+ taskStateNew, taskStateRunning)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tasks {
+ if err := executorRunTask(ctx, task); err != nil {
+ return fmt.Errorf("task %d for %s: %w",
+ task.ID, task.FullName(), err)
+ }
+ }
+ return nil
+}
+
+func executor(ctx context.Context) {
+ for {
+ select {
+ case <-gExecutorSignal:
+ case <-ctx.Done():
+ return
+ }
+
+ if err := executorRun(ctx); err != nil {
+ log.Printf("error: executor: %s\n", err)
+ }
+ }
+}
+
+func executorAwaken() {
+ select {
+ case gExecutorSignal <- struct{}{}:
+ default:
+ }
+}
+
+// --- Main --------------------------------------------------------------------
+
+type taskState int64
+
+const (
+ taskStateNew taskState = iota // → · pending (queued)
+ taskStateRunning // → · pending (running)
+ taskStateError // → ! error (internal issue)
+ taskStateFailed // → × failure (runner issue)
+ taskStateSuccess // → ✓ success (runner finished)
+)
+
+func (ts taskState) String() string {
+ switch ts {
+ case taskStateNew:
+ return "New"
+ case taskStateRunning:
+ return "Running"
+ case taskStateError:
+ return "Error"
+ case taskStateFailed:
+ return "Failed"
+ case taskStateSuccess:
+ return "Success"
+ default:
+ return fmt.Sprintf("%d", ts)
+ }
+}
+
+// Task mirrors SQL task table records, adding a few convenience methods.
+type Task struct {
+ ID int64
+
+ Owner string
+ Repo string
+ Hash string
+ Runner string
+
+ State taskState
+ Detail string
+ Notified int64
+ RunLog []byte
+ TaskLog []byte
+}
+
+func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
+
+func (t *Task) RunnerName() string {
+ if runner, ok := gConfig.Runners[t.Runner]; !ok {
+ return t.Runner
+ } else {
+ return runner.Name
+ }
+}
+
+func (t *Task) URL() string {
+ return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID)
+}
+
+func (t *Task) RepoURL() string {
+ return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo)
+}
+
+func (t *Task) CommitURL() string {
+ return fmt.Sprintf("%s/%s/%s/commit/%s",
+ gConfig.Gitea, t.Owner, t.Repo, t.Hash)
+}
+
+func (t *Task) CloneURL() string {
+ return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo)
+}
+
+const schemaSQL = `
+CREATE TABLE IF NOT EXISTS task(
+ id INTEGER NOT NULL, -- unique ID
+
+ owner TEXT NOT NULL, -- Gitea username
+ repo TEXT NOT NULL, -- Gitea repository name
+ hash TEXT NOT NULL, -- commit hash
+ runner TEXT NOT NULL, -- the runner to use
+
+ state INTEGER NOT NULL DEFAULT 0, -- task state
+ detail TEXT NOT NULL DEFAULT '', -- task state detail
+ notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
+ runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
+ tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+
+ PRIMARY KEY (id)
+) STRICT;
+`
+
+func openDB(path string) error {
+ var err error
+ gDB, err = sql.Open("sqlite3",
+ "file:"+path+"?_foreign_keys=1&_busy_timeout=1000")
+ if err != nil {
+ return err
+ }
+
+ _, err = gDB.Exec(schemaSQL)
+ return err
+}
+
+// callRPC forwards command line commands to a running server.
+func callRPC(args []string) error {
+ body, err := json.Marshal(args)
+ if err != nil {
+ return err
+ }
+
+ req, err := http.NewRequest(http.MethodPost,
+ fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body))
+ if err != nil {
+ return err
+ }
+
+ req.Header.Set(rpcHeaderSignature, giteaSign(body))
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if _, err = io.Copy(os.Stdout, resp.Body); err != nil {
+ return err
+ }
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+ os.Exit(1)
+ }
+ return nil
+}
+
+func main() {
+ version := flag.Bool("version", false, "show version and exit")
+
+ flag.Usage = func() {
+ f := flag.CommandLine.Output()
+ fmt.Fprintf(f,
+ "Usage: %s [OPTION]... CONFIG [COMMAND...]\n", os.Args[0])
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+ if flag.NArg() < 1 {
+ flag.Usage()
+ os.Exit(2)
+ }
+
+ if *version {
+ fmt.Printf("%s %s\n", projectName, projectVersion)
+ return
+ }
+
+ if err := parseConfig(flag.Arg(0)); err != nil {
+ log.Fatalln(err)
+ }
+ if flag.NArg() > 1 {
+ if err := callRPC(flag.Args()[1:]); err != nil {
+ log.Fatalln(err)
+ }
+ return
+ }
+
+ if err := openDB(gConfig.DB); err != nil {
+ log.Fatalln(err)
+ }
+ defer gDB.Close()
+
+ var wg sync.WaitGroup
+ ctx, stop := signal.NotifyContext(
+ context.Background(), syscall.SIGINT, syscall.SIGTERM)
+
+ server := &http.Server{Addr: gConfig.Listen}
+ http.HandleFunc("/{$}", handleTasks)
+ http.HandleFunc("/task/{id}", handleTask)
+ http.HandleFunc("/push", handlePush)
+ http.HandleFunc("/rpc", handleRPC)
+
+ ln, err := (&net.ListenConfig{}).Listen(ctx, "tcp", server.Addr)
+ if err != nil {
+ log.Fatalln(err)
+ }
+
+ notifierAwaken()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ notifier(ctx)
+ }()
+
+ executorAwaken()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ executor(ctx)
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer stop()
+ if err := server.Serve(ln); err != http.ErrServerClosed {
+ log.Println(err)
+ }
+ }()
+
+ // Wait until we either receive a signal, or get a server failure.
+ <-ctx.Done()
+ log.Println("shutting down")
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := server.Shutdown(context.Background()); err != nil {
+ log.Println(err)
+ }
+ }()
+
+ // Repeated signal deliveries during shutdown assume default behaviour.
+ // This might or might not be desirable.
+ stop()
+ wg.Wait()
+}