Diffstat (limited to 'acid.go')
-rw-r--r-- | acid.go | 1157 |
1 file changed, 1157 insertions, 0 deletions
@@ -0,0 +1,1157 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"crypto/hmac"
+	"crypto/sha256"
+	"database/sql"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"flag"
+	"fmt"
+	"html/template"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"os/exec"
+	"os/signal"
+	"sort"
+	"strconv"
+	"sync"
+	"syscall"
+	ttemplate "text/template"
+	"time"
+
+	_ "github.com/mattn/go-sqlite3"
+	"golang.org/x/crypto/ssh"
+	"gopkg.in/yaml.v3"
+)
+
+var (
+	projectName    = "acid"
+	projectVersion = "?"
+
+	gConfig       Config = Config{Listen: ":http"}
+	gNotifyScript *ttemplate.Template
+	gDB           *sql.DB
+
+	gNotifierSignal = make(chan struct{}, 1)
+	gExecutorSignal = make(chan struct{}, 1)
+
+	// The mutex is meant to guard not just the map,
+	// but also the tasks stored inside it.
+	gRunningMutex sync.Mutex
+	gRunning      = make(map[int64]*RunningTask)
+)
+
+// --- Config ------------------------------------------------------------------
+
+type Config struct {
+	DB     string `yaml:"db"`     // database file path
+	Listen string `yaml:"listen"` // HTTP listener address
+	Root   string `yaml:"root"`   // HTTP root URI
+	Gitea  string `yaml:"gitea"`  // Gitea base URL
+	Secret string `yaml:"secret"` // Gitea hook secret
+	Token  string `yaml:"token"`  // Gitea API token
+	Notify string `yaml:"notify"` // notifier script
+
+	Runners  map[string]ConfigRunner  `yaml:"runners"`  // script runners
+	Projects map[string]ConfigProject `yaml:"projects"` // configured projects
+}
+
+type ConfigRunner struct {
+	Name  string `yaml:"name"`  // descriptive name
+	Run   string `yaml:"run"`   // runner executable
+	Setup string `yaml:"setup"` // runner setup script (SSH)
+	SSH   struct {
+		User     string `yaml:"user"`     // remote username
+		Address  string `yaml:"address"`  // TCP host:port
+		Identity string `yaml:"identity"` // private key path
+	} `yaml:"ssh"` // shell access
+}
+
+type ConfigProject struct {
+	Runners map[string]ConfigProjectRunner `yaml:"runners"`
+}
+
+type ConfigProjectRunner struct {
+	Setup string `yaml:"setup"` // project setup script (SSH)
+	Build string `yaml:"build"` // project build script (SSH)
+}
+
+func parseConfig(path string) error {
+	f, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if err := yaml.NewDecoder(f).Decode(&gConfig); err != nil {
+		return err
+	}
+
+	gNotifyScript, err = ttemplate.New("notify").Parse(gConfig.Notify)
+	return err
+}
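+
+// For illustration, a minimal configuration file might look like the
+// following. This is only a sketch derived from the structures above;
+// every name, path, and script in it is an example, not a requirement.
+//
+//	db: acid.db
+//	listen: 127.0.0.1:8080
+//	root: https://acid.example.com
+//	gitea: https://gitea.example.com
+//	secret: webhook-hmac-key
+//	token: gitea-api-token
+//	notify: |
+//	  echo '{{.FullName}} {{.Hash}}: {{.State}}' | mail -s acid admin
+//	runners:
+//	  linux:
+//	    name: Linux
+//	    run: runners/linux.sh
+//	    setup: |
+//	      git clone --recursive {{.CloneURL}} project
+//	      cd project && git checkout {{.Hash}}
+//	    ssh: {user: ci, address: "localhost:8022", identity: ssh/id_ed25519}
+//	projects:
+//	  owner/repo:
+//	    runners:
+//	      linux:
+//	        build: make all check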
+
+// --- Task views --------------------------------------------------------------
+
+func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
+	rows, err := gDB.QueryContext(ctx, `
+		SELECT id, owner, repo, hash, runner,
+			state, detail, notified, runlog, tasklog FROM task `+query,
+		args...)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	tasks := []Task{}
+	for rows.Next() {
+		var t Task
+		err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
+			&t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog)
+		if err != nil {
+			return nil, err
+		}
+		tasks = append(tasks, t)
+	}
+	return tasks, rows.Err()
+}
+
+var templateTasks = template.Must(template.New("tasks").Parse(`
+<!DOCTYPE html>
+<html>
+<head>
+<meta charset="utf-8">
+<title>Tasks</title>
+</head>
+<body>
+<h1>Tasks</h1>
+<table border="1">
+<thead>
+  <tr>
+    <th>ID</th>
+    <th>Repository</th>
+    <th>Hash</th>
+    <th>Runner</th>
+    <th>State</th>
+    <th>Detail</th>
+    <th>Notified</th>
+  </tr>
+</thead>
+<tbody>
+{{range .}}
+  <tr>
+    <td><a href="task/{{.ID}}">{{.ID}}</a></td>
+    <td><a href="{{.RepoURL}}">{{.FullName}}</a></td>
+    <td><a href="{{.CommitURL}}">{{.Hash}}</a></td>
+    <td>{{.RunnerName}}</td>
+    <td>{{.State}}</td>
+    <td>{{.Detail}}</td>
+    <td>{{.Notified}}</td>
+  </tr>
+{{end}}
+</tbody>
+</table>
+</body>
+</html>
+`))
+
+func handleTasks(w http.ResponseWriter, r *http.Request) {
+	tasks, err := getTasks(r.Context(), `ORDER BY id DESC`)
+	if err != nil {
+		http.Error(w,
+			"Error retrieving tasks: "+err.Error(),
+			http.StatusInternalServerError)
+		return
+	}
+
+	if err := templateTasks.Execute(w, tasks); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+	}
+}
+
+var templateTask = template.Must(template.New("task").Parse(`
+<!DOCTYPE html>
+<html>
+<head>
+<title>Task {{.ID}}</title>
+<meta charset="utf-8">
+{{if .IsRunning}}
+<meta http-equiv="refresh" content="5">
+{{end}}
+</head>
+<body>
+<h1><a href="/">Tasks</a> » {{.ID}}</h1>
+<dl>
+<dt>Project</dt>
+	<dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
+<dt>Commit</dt>
+	<dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
+<dt>Runner</dt>
+	<dd>{{.RunnerName}}</dd>
+<dt>State</dt>
+	<dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
+<dt>Notified</dt>
+	<dd>{{.Notified}}</dd>
+</dl>
+{{if .RunLog}}
+<h2>Runner log</h2>
+<pre>{{printf "%s" .RunLog}}</pre>
+{{end}}
+{{if .TaskLog}}
+<h2>Task log</h2>
+<pre>{{printf "%s" .TaskLog}}</pre>
+{{end}}
+</body>
+</html>
+`))
+
+func handleTask(w http.ResponseWriter, r *http.Request) {
+	id, err := strconv.Atoi(r.PathValue("id"))
+	if err != nil {
+		http.Error(w,
+			"Invalid ID: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	tasks, err := getTasks(r.Context(), `WHERE id = ?`, id)
+	if err != nil {
+		http.Error(w,
+			"Error retrieving task: "+err.Error(),
+			http.StatusInternalServerError)
+		return
+	}
+	if len(tasks) == 0 {
+		http.NotFound(w, r)
+		return
+	}
+
+	task := struct {
+		Task
+		IsRunning bool
+	}{Task: tasks[0]}
+	func() {
+		gRunningMutex.Lock()
+		defer gRunningMutex.Unlock()
+
+		rt, ok := gRunning[task.ID]
+		task.IsRunning = ok
+		if !ok {
+			return
+		}
+
+		rt.RunLog.mu.Lock()
+		defer rt.RunLog.mu.Unlock()
+		rt.TaskLog.mu.Lock()
+		defer rt.TaskLog.mu.Unlock()
+
+		task.RunLog = rt.RunLog.b
+		task.TaskLog = rt.TaskLog.b
+	}()
+
+	if err := templateTask.Execute(w, &task); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+	}
+}
+
+// --- Push hook ---------------------------------------------------------------
+
+type GiteaPushEvent struct {
+	HeadCommit struct {
+		ID string `json:"id"`
+	} `json:"head_commit"`
+	Repository struct {
+		Name     string `json:"name"`
+		FullName string `json:"full_name"`
+		Owner    struct {
+			Username string `json:"username"`
+		} `json:"owner"`
+	} `json:"repository"`
+}
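+
+// Of the fairly large push event, we only consume a handful of fields.
+// A trimmed-down example of what Gitea sends (all values illustrative):
+//
+//	{
+//	  "head_commit": {"id": "0b9278f0f76adda6398395c7cb739b8b762afd38"},
+//	  "repository": {
+//	    "name": "repo",
+//	    "full_name": "owner/repo",
+//	    "owner": {"username": "owner"}
+//	  }
+//	}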
+
+func createTasks(ctx context.Context,
+	owner, repo, hash string, runners []string) error {
+	tx, err := gDB.BeginTx(ctx, nil)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner)
+		VALUES (?, ?, ?, ?)`)
+	if err != nil {
+		return err
+	}
+	defer stmt.Close()
+
+	for _, runner := range runners {
+		if _, err := stmt.ExecContext(ctx,
+			owner, repo, hash, runner); err != nil {
+			return err
+		}
+	}
+	if err := tx.Commit(); err != nil {
+		return err
+	}
+
+	notifierAwaken()
+	executorAwaken()
+	return nil
+}
+
+func giteaSign(b []byte) string {
+	payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
+	payloadHmac.Write(b)
+	return hex.EncodeToString(payloadHmac.Sum(nil))
+}
+
+func handlePush(w http.ResponseWriter, r *http.Request) {
+	// X-Gitea-Delivery doesn't seem useful,
+	// as pushes explode into multiple tasks.
+	if r.Header.Get("X-Gitea-Event") != "push" {
+		http.Error(w,
+			"Expected a Gitea push event", http.StatusBadRequest)
+		return
+	}
+
+	body, err := io.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w,
+			"Error reading request body", http.StatusInternalServerError)
+		return
+	}
+	if r.Header.Get("X-Gitea-Signature") != giteaSign(body) {
+		http.Error(w,
+			"Signature mismatch", http.StatusBadRequest)
+		return
+	}
+
+	var event GiteaPushEvent
+	if err := json.Unmarshal(body, &event); err != nil {
+		http.Error(w,
+			"Invalid request body: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	log.Printf("received push: %s %s\n",
+		event.Repository.FullName, event.HeadCommit.ID)
+
+	project, ok := gConfig.Projects[event.Repository.FullName]
+	if !ok {
+		// This is okay, don't set any commit statuses.
+		fmt.Fprintln(w, "The project is not configured.")
+		return
+	}
+
+	runners := []string{}
+	for name := range project.Runners {
+		runners = append(runners, name)
+	}
+	sort.Strings(runners)
+
+	if err := createTasks(r.Context(),
+		event.Repository.Owner.Username, event.Repository.Name,
+		event.HeadCommit.ID, runners); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
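+
+// When poking the hook by hand, the payload needs to be signed the same way
+// giteaSign does it. A rough sketch of building such a test request, where
+// secret, root, and body are stand-ins for actual values:
+//
+//	mac := hmac.New(sha256.New, []byte(secret))
+//	mac.Write(body)
+//	req, _ := http.NewRequest(http.MethodPost,
+//		root+"/push", bytes.NewReader(body))
+//	req.Header.Set("X-Gitea-Event", "push")
+//	req.Header.Set("X-Gitea-Signature", hex.EncodeToString(mac.Sum(nil)))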
+
+// --- RPC ---------------------------------------------------------------------
+
+const rpcHeaderSignature = "X-ACID-Signature"
+
+func rpcRestart(w io.Writer, ids []int64) {
+	gRunningMutex.Lock()
+	defer gRunningMutex.Unlock()
+
+	for _, id := range ids {
+		if _, ok := gRunning[id]; ok {
+			fmt.Fprintf(w, "%d: not restarting running tasks\n", id)
+			continue
+		}
+
+		// The executor bumps the state to "running" only after inserting
+		// into gRunning, so we should not need to exclude that state here.
+		result, err := gDB.ExecContext(context.Background(),
+			`UPDATE task SET state = ? WHERE id = ?`, taskStateNew, id)
+		if err != nil {
+			fmt.Fprintf(w, "%d: %s\n", id, err)
+		} else if n, _ := result.RowsAffected(); n != 1 {
+			fmt.Fprintf(w, "%d: no such task\n", id)
+		}
+	}
+	executorAwaken()
+}
+
+func handleRPC(w http.ResponseWriter, r *http.Request) {
+	body, err := io.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w,
+			"Error reading request body", http.StatusInternalServerError)
+		return
+	}
+	if r.Header.Get(rpcHeaderSignature) != giteaSign(body) {
+		http.Error(w,
+			"Signature mismatch", http.StatusBadRequest)
+		return
+	}
+
+	var args []string
+	if err := json.Unmarshal(body, &args); err != nil {
+		http.Error(w,
+			"Invalid request body: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+	if len(args) == 0 {
+		http.Error(w, "Missing command", http.StatusBadRequest)
+		return
+	}
+
+	command, args := args[0], args[1:]
+	switch command {
+	case "restart":
+		ids := []int64{}
+		for _, arg := range args {
+			id, err := strconv.ParseInt(arg, 10, 64)
+			if err != nil {
+				http.Error(w, err.Error(), http.StatusBadRequest)
+				return
+			}
+			ids = append(ids, id)
+		}
+		rpcRestart(w, ids)
+	default:
+		http.Error(w, "Unknown command: "+command, http.StatusBadRequest)
+	}
+}
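+
+// On the wire, an RPC request is nothing but a signed JSON array of strings,
+// command first. Restarting task 42 thus amounts to POSTing the following
+// to /rpc, with an X-ACID-Signature header computed as in giteaSign:
+//
+//	["restart", "42"]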
+
+// --- Notifier ----------------------------------------------------------------
+
+func notifierRunCommand(ctx context.Context, task Task) {
+	script := bytes.NewBuffer(nil)
+	if err := gNotifyScript.Execute(script, &task); err != nil {
+		log.Printf("error: notify: %s", err)
+		return
+	}
+
+	cmd := exec.CommandContext(ctx, "sh")
+	cmd.Stdin = script
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		log.Printf("error: notify: %s", err)
+	}
+}
+
+func notifierNotify(ctx context.Context, task Task) error {
+	// Loosely assuming that this only runs on state changes.
+	if task.State != taskStateNew && task.State != taskStateRunning {
+		go notifierRunCommand(ctx, task)
+	}
+
+	payload := struct {
+		Context     string `json:"context"`
+		Description string `json:"description"`
+		State       string `json:"state"`
+		TargetURL   string `json:"target_url"`
+	}{}
+
+	runner, ok := gConfig.Runners[task.Runner]
+	if !ok {
+		log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner)
+		return nil
+	}
+	payload.Context = runner.Name
+	payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID)
+
+	switch task.State {
+	case taskStateNew:
+		payload.State, payload.Description = "pending", "Pending"
+	case taskStateRunning:
+		payload.State, payload.Description = "pending", "Running"
+	case taskStateError:
+		payload.State, payload.Description = "error", "Error"
+	case taskStateFailed:
+		payload.State, payload.Description = "failure", "Failure"
+	case taskStateSuccess:
+		payload.State, payload.Description = "success", "Success"
+	default:
+		log.Printf("task %d is in unknown state %d\n", task.ID, task.State)
+		return nil
+	}
+
+	// Only override the generic description
+	// when we have something more specific to say.
+	if task.Detail != "" {
+		payload.Description = task.Detail
+	}
+
+	body, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+
+	log.Printf("task %d for %s: notifying: %s: %s: %s (%s)\n",
+		task.ID, task.FullName(), task.Hash,
+		payload.Context, payload.State, payload.Description)
+
+	uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s",
+		gConfig.Gitea, task.Owner, task.Repo, task.Hash)
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri,
+		bytes.NewReader(body))
+	if err != nil {
+		return err
+	}
+
+	req.Header.Set("Accept", "application/json")
+	req.Header.Set("Authorization", "token "+gConfig.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Drain the body to enable connection reuse; we don't act on it.
+	if _, err := io.ReadAll(resp.Body); err != nil {
+		return err
+	}
+
+	_, err = gDB.ExecContext(ctx, `UPDATE task SET notified = 1
+		WHERE id = ? AND state = ? AND detail = ? AND notified = 0`,
+		task.ID, task.State, task.Detail)
+	return err
+}
+
+func notifierRun(ctx context.Context) error {
+	tasks, err := getTasks(ctx, `WHERE notified = 0 ORDER BY id ASC`)
+	if err != nil {
+		return err
+	}
+
+	for _, task := range tasks {
+		if err := notifierNotify(ctx, task); err != nil {
+			return fmt.Errorf(
+				"task %d for %s: %w", task.ID, task.FullName(), err)
+		}
+	}
+	return nil
+}
+
+func notifier(ctx context.Context) {
+	for {
+		select {
+		case <-gNotifierSignal:
+		case <-ctx.Done():
+			return
+		}
+
+		if err := notifierRun(ctx); err != nil {
+			log.Printf("error: notifier: %s\n", err)
+		}
+	}
+}
+
+func notifierAwaken() {
+	select {
+	case gNotifierSignal <- struct{}{}:
+	default:
+	}
+}
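+
+// The resulting Gitea commit status payload is small; for a successful
+// task it might read as follows (the target URL being an example):
+//
+//	{
+//	  "context": "Linux",
+//	  "description": "Success",
+//	  "state": "success",
+//	  "target_url": "https://acid.example.com/task/42"
+//	}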
+
+// --- Executor ----------------------------------------------------------------
+
+type terminalWriter struct {
+	b   []byte
+	cur int
+	mu  sync.Mutex
+}
+
+func (tw *terminalWriter) Write(p []byte) (written int, err error) {
+	tw.mu.Lock()
+	defer tw.mu.Unlock()
+
+	// Extremely rudimentary emulation of a dumb terminal.
+	for _, b := range p {
+		// Enough is enough, writing too much is highly suspicious.
+		if len(tw.b) > 64<<20 {
+			return written, errors.New("too much terminal output")
+		}
+
+		switch b {
+		case '\b':
+			if tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
+				tw.cur--
+			}
+		case '\r':
+			for tw.cur > 0 && tw.b[tw.cur-1] != '\n' {
+				tw.cur--
+			}
+		case '\n':
+			tw.b = append(tw.b, b)
+			tw.cur = len(tw.b)
+		default:
+			tw.b = append(tw.b[:tw.cur], b)
+			tw.cur = len(tw.b)
+		}
+
+		written++
+	}
+	return
+}
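+
+// To make the overwriting behaviour concrete, a few example writes and the
+// buffer contents they leave behind (note that anything a carriage return
+// or backspace moves the cursor over is dropped, not merely overprinted):
+//
+//	Write([]byte("12.5%\r25.0%\r"))  →  "25.0%"
+//	Write([]byte("abc\bd"))          →  "abd"
+//	Write([]byte("done\n"))          →  "done\n", cursor after the newline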
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+type RunningTask struct {
+	DB            Task
+	Runner        ConfigRunner
+	ProjectRunner ConfigProjectRunner
+
+	RunLog  terminalWriter
+	TaskLog terminalWriter
+}
+
+func executorUpdate(rt *RunningTask) error {
+	rt.RunLog.mu.Lock()
+	defer rt.RunLog.mu.Unlock()
+	rt.DB.RunLog = bytes.Clone(rt.RunLog.b)
+	if rt.DB.RunLog == nil {
+		rt.DB.RunLog = []byte{}
+	}
+
+	rt.TaskLog.mu.Lock()
+	defer rt.TaskLog.mu.Unlock()
+	rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b)
+	if rt.DB.TaskLog == nil {
+		rt.DB.TaskLog = []byte{}
+	}
+
+	_, err := gDB.ExecContext(context.Background(), `UPDATE task
+		SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ?
+		WHERE id = ?`,
+		rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog,
+		rt.DB.ID)
+	if err == nil {
+		notifierAwaken()
+	}
+	return err
+}
+
+func executorConnect(
+	ctx context.Context, config *ssh.ClientConfig, address string) (
+	*ssh.Client, error) {
+	deadline := time.Now().Add(3 * time.Minute)
+	ctxDeadlined, cancel := context.WithDeadline(ctx, deadline)
+	defer cancel()
+
+	var d net.Dialer
+	for {
+		// net.DNSError eats the cause, as in it cannot be unwrapped
+		// and tested for a particular subtype.
+		conn, err := d.DialContext(ctxDeadlined, "tcp", address)
+		if e := ctxDeadlined.Err(); e != nil {
+			// The dial error may provide a little bit more information.
+			if err != nil {
+				return nil, err
+			}
+			conn.Close()
+			return nil, e
+		}
+		if err != nil {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		// We ignore the parent context, but at least we try.
+		conn.SetDeadline(deadline)
+		sc, chans, reqs, err := ssh.NewClientConn(conn, address, config)
+		conn.SetDeadline(time.Time{})
+
+		// cloud-init-enabled machines, such as OpenBSD,
+		// may have a race condition between sshd starting for the first time,
+		// and having a configured user.
+		//
+		// Authentication may therefore regularly fail,
+		// and we need to ignore all errors whatsoever,
+		// not just spurious partial successes resulting in RST or FIN.
+		if err != nil {
+			conn.Close()
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		return ssh.NewClient(sc, chans, reqs), nil
+	}
+}
+
+func executorRunTask(ctx context.Context, task Task) error {
+	rt := &RunningTask{DB: task}
+
+	var ok bool
+	rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
+	if !ok {
+		return fmt.Errorf("unknown runner: %s", rt.DB.Runner)
+	}
+	project, ok := gConfig.Projects[rt.DB.FullName()]
+	if !ok {
+		return fmt.Errorf("project configuration not found")
+	}
+	rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
+	if !ok {
+		return fmt.Errorf(
+			"project not configured for runner %s", rt.DB.Runner)
+	}
+
+	wd, err := os.Getwd()
+	if err != nil {
+		return err
+	}
+
+	// The runner setup script may change the working directory,
+	// so do everything in one go. However, this approach also makes it
+	// difficult to distinguish project-independent runner failures.
+	// (For that, we can start multiple ssh.Sessions.)
+	//
+	// We could pass variables through SSH environment variables,
+	// which would require enabling PermitUserEnvironment in sshd_config,
+	// or through prepended script lines, but templates are a bit simpler.
+	//
+	// We let the runner itself clone the repository:
+	//  - it is more flexible, in that it can test AUR packages more naturally,
+	//  - we might have to clone submodules as well.
+	// Otherwise, we could download a source archive from Gitea,
+	// and use SFTP to upload it to the runner.
+	tmplScript, err := ttemplate.New("script").Parse(rt.Runner.Setup + "\n" +
+		rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+	if err != nil {
+		return fmt.Errorf("script: %w", err)
+	}
+
+	privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
+	if err != nil {
+		return fmt.Errorf(
+			"cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
+	}
+	signer, err := ssh.ParsePrivateKey(privateKey)
+	if err != nil {
+		return fmt.Errorf(
+			"cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
+	}
+
+	defer func() {
+		gRunningMutex.Lock()
+		defer gRunningMutex.Unlock()
+
+		delete(gRunning, rt.DB.ID)
+	}()
+	func() {
+		gRunningMutex.Lock()
+		defer gRunningMutex.Unlock()
+
+		rt.DB.State, rt.DB.Detail = taskStateRunning, ""
+		rt.DB.Notified = 0
+		rt.DB.RunLog = []byte{}
+		rt.DB.TaskLog = []byte{}
+		gRunning[rt.DB.ID] = rt
+	}()
+	if err := executorUpdate(rt); err != nil {
+		return fmt.Errorf("SQL: %w", err)
+	}
+
+	// Errors happening while trying to write an error are unfortunate,
+	// but not important enough to abort entirely.
+	setError := func(detail string) {
+		gRunningMutex.Lock()
+		defer gRunningMutex.Unlock()
+
+		rt.DB.State, rt.DB.Detail = taskStateError, detail
+		if err := executorUpdate(rt); err != nil {
+			log.Printf("error: task %d for %s: SQL: %s",
+				rt.DB.ID, rt.DB.FullName(), err)
+		}
+	}
+
+	script := bytes.NewBuffer(nil)
+	if err := tmplScript.Execute(script, &rt.DB); err != nil {
+		setError("Script template failed")
+		return err
+	}
+
+	cmd := exec.CommandContext(ctx, rt.Runner.Run)
+	cmd.Env = append(
+		os.Environ(),
+		"ACID_ROOT="+wd,
+		"ACID_RUNNER="+rt.DB.Runner,
+	)
+
+	log.Printf("task %d for %s: starting %s\n",
+		rt.DB.ID, rt.DB.FullName(), rt.Runner.Name)
+
+	cmd.Stdout = &rt.RunLog
+	cmd.Stderr = &rt.RunLog
+	if err := cmd.Start(); err != nil {
+		setError("Runner failed to start")
+		return err
+	}
+
+	ctxRunner, cancelRunner := context.WithCancelCause(ctx)
+	defer cancelRunner(context.Canceled)
+	go func() {
+		if err := cmd.Wait(); err != nil {
+			cancelRunner(err)
+		} else {
+			cancelRunner(errors.New("runner exited successfully but early"))
+		}
+	}()
+	defer func() {
+		_ = cmd.Process.Signal(os.Interrupt)
+		select {
+		case <-ctxRunner.Done():
+			// This leaves the runner hardly any time on our shutdown,
+			// but whatever--they're supposed to be ephemeral.
+			// Moreover, we don't even override cmd.Cancel.
+		case <-time.After(5 * time.Second):
+		}
+		_ = cmd.Process.Kill()
+	}()
+
+	client, err := executorConnect(ctxRunner, &ssh.ClientConfig{
+		User:            rt.Runner.SSH.User,
+		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)},
+		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+	}, rt.Runner.SSH.Address)
+	if err != nil {
+		fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+		setError("SSH failure")
+		return err
+	}
+	defer client.Close()
+
+	session, err := client.NewSession()
+	if err != nil {
+		fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+		setError("SSH failure")
+		return err
+	}
+	defer session.Close()
+
+	modes := ssh.TerminalModes{ssh.ECHO: 0}
+	if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
+		fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+		setError("SSH failure")
+		return err
+	}
+
+	log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
+
+	session.Stdout = &rt.TaskLog
+	session.Stderr = &rt.TaskLog
+
+	// Although passing the script directly takes away the option to specify
+	// a particular shell (barring here-documents!), it is simple and reliable.
+	//
+	// Passing the script over Stdin to sh tended to end up with commands
+	// eating the script during processing, and resulted in a hang,
+	// because closing the Stdin does not result in remote processes
+	// getting a stream of EOF.
+	//
+	// Piping the script into `cat | sh` while appending a ^D to the end of it
+	// appeared to work, but it seems likely that commands might still steal
+	// script bytes from the cat program if they choose to read from the tty
+	// and the script is longer than the buffer.
+	chSession := make(chan error, 1)
+	go func() {
+		chSession <- session.Run(script.String())
+		close(chSession)
+	}()
+
+	select {
+	case <-ctxRunner.Done():
+		// Either a shutdown, or early runner termination.
+		// The runner is not supposed to finish before the session does.
+		err = context.Cause(ctxRunner)
+	case err = <-chSession:
+		// Killing a runner may perfectly well trigger this case first,
+		// in particular when it runs on the same machine.
+	}
+
+	gRunningMutex.Lock()
+	defer gRunningMutex.Unlock()
+
+	var ee *ssh.ExitError
+	if err == nil {
+		rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
+	} else if errors.As(err, &ee) {
+		rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
+		fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+	} else {
+		rt.DB.State, rt.DB.Detail = taskStateError, ""
+		fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+	}
+	return executorUpdate(rt)
+}
+
+func executorRun(ctx context.Context) error {
+	tasks, err := getTasks(ctx, `WHERE state = ? OR state = ? ORDER BY id ASC`,
+		taskStateNew, taskStateRunning)
+	if err != nil {
+		return err
+	}
+
+	for _, task := range tasks {
+		if err := executorRunTask(ctx, task); err != nil {
+			return fmt.Errorf("task %d for %s: %w",
+				task.ID, task.FullName(), err)
+		}
+	}
+	return nil
+}
+
+func executor(ctx context.Context) {
+	for {
+		select {
+		case <-gExecutorSignal:
+		case <-ctx.Done():
+			return
+		}
+
+		if err := executorRun(ctx); err != nil {
+			log.Printf("error: executor: %s\n", err)
+		}
+	}
+}
+
+func executorAwaken() {
+	select {
+	case gExecutorSignal <- struct{}{}:
+	default:
+	}
+}
+
+// --- Main --------------------------------------------------------------------
+
+type taskState int64
+
+const (
+	taskStateNew     taskState = iota // → · pending (queued)
+	taskStateRunning                  // → · pending (running)
+	taskStateError                    // → ! error (internal issue)
+	taskStateFailed                   // → × failure (runner issue)
+	taskStateSuccess                  // → ✓ success (runner finished)
+)
+
+func (ts taskState) String() string {
+	switch ts {
+	case taskStateNew:
+		return "New"
+	case taskStateRunning:
+		return "Running"
+	case taskStateError:
+		return "Error"
+	case taskStateFailed:
+		return "Failed"
+	case taskStateSuccess:
+		return "Success"
+	default:
+		return fmt.Sprintf("%d", ts)
+	}
+}
+
+// Task mirrors SQL task table records, adding a few convenience methods.
+type Task struct {
+	ID int64
+
+	Owner  string
+	Repo   string
+	Hash   string
+	Runner string
+
+	State    taskState
+	Detail   string
+	Notified int64
+	RunLog   []byte
+	TaskLog  []byte
+}
+
+func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
+
+func (t *Task) RunnerName() string {
+	if runner, ok := gConfig.Runners[t.Runner]; ok {
+		return runner.Name
+	}
+	return t.Runner
+}
+
+func (t *Task) URL() string {
+	return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID)
+}
+
+func (t *Task) RepoURL() string {
+	return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo)
+}
+
+func (t *Task) CommitURL() string {
+	return fmt.Sprintf("%s/%s/%s/commit/%s",
+		gConfig.Gitea, t.Owner, t.Repo, t.Hash)
+}
+
+func (t *Task) CloneURL() string {
+	return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo)
+}
+
+const schemaSQL = `
+CREATE TABLE IF NOT EXISTS task(
+	id INTEGER NOT NULL,                  -- unique ID
+
+	owner TEXT NOT NULL,                  -- Gitea username
+	repo TEXT NOT NULL,                   -- Gitea repository name
+	hash TEXT NOT NULL,                   -- commit hash
+	runner TEXT NOT NULL,                 -- the runner to use
+
+	state INTEGER NOT NULL DEFAULT 0,     -- task state
+	detail TEXT NOT NULL DEFAULT '',      -- task state detail
+	notified INTEGER NOT NULL DEFAULT 0,  -- Gitea knows the state
+	runlog BLOB NOT NULL DEFAULT x'',     -- combined task runner output
+	tasklog BLOB NOT NULL DEFAULT x'',    -- combined task SSH output
+
+	PRIMARY KEY (id)
+) STRICT;
+`
+
+func openDB(path string) error {
+	var err error
+	gDB, err = sql.Open("sqlite3",
+		"file:"+path+"?_foreign_keys=1&_busy_timeout=1000")
+	if err != nil {
+		return err
+	}
+
+	_, err = gDB.Exec(schemaSQL)
+	return err
+}
+
+// callRPC forwards command-line commands to a running server.
+func callRPC(args []string) error {
+	body, err := json.Marshal(args)
+	if err != nil {
+		return err
+	}
+
+	req, err := http.NewRequest(http.MethodPost,
+		fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body))
+	if err != nil {
+		return err
+	}
+
+	req.Header.Set(rpcHeaderSignature, giteaSign(body))
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if _, err = io.Copy(os.Stdout, resp.Body); err != nil {
+		return err
+	}
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		os.Exit(1)
+	}
+	return nil
+}
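+
+// From the command line, RPC calls take the form of extra arguments after
+// the configuration file, e.g. (assuming the configuration is in acid.yaml):
+//
+//	acid acid.yaml restart 42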
+
+func main() {
+	version := flag.Bool("version", false, "show version and exit")
+
+	flag.Usage = func() {
+		f := flag.CommandLine.Output()
+		fmt.Fprintf(f,
+			"Usage: %s [OPTION]... CONFIG [COMMAND...]\n", os.Args[0])
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+
+	// Handle -version before demanding a CONFIG argument.
+	if *version {
+		fmt.Printf("%s %s\n", projectName, projectVersion)
+		return
+	}
+	if flag.NArg() < 1 {
+		flag.Usage()
+		os.Exit(2)
+	}
+
+	if err := parseConfig(flag.Arg(0)); err != nil {
+		log.Fatalln(err)
+	}
+	if flag.NArg() > 1 {
+		if err := callRPC(flag.Args()[1:]); err != nil {
+			log.Fatalln(err)
+		}
+		return
+	}
+
+	if err := openDB(gConfig.DB); err != nil {
+		log.Fatalln(err)
+	}
+	defer gDB.Close()
+
+	var wg sync.WaitGroup
+	ctx, stop := signal.NotifyContext(
+		context.Background(), syscall.SIGINT, syscall.SIGTERM)
+
+	server := &http.Server{Addr: gConfig.Listen}
+	http.HandleFunc("/{$}", handleTasks)
+	http.HandleFunc("/task/{id}", handleTask)
+	http.HandleFunc("/push", handlePush)
+	http.HandleFunc("/rpc", handleRPC)
+
+	ln, err := (&net.ListenConfig{}).Listen(ctx, "tcp", server.Addr)
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	notifierAwaken()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		notifier(ctx)
+	}()
+
+	executorAwaken()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		executor(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer stop()
+		if err := server.Serve(ln); !errors.Is(err, http.ErrServerClosed) {
+			log.Println(err)
+		}
+	}()
+
+	// Wait until we either receive a signal, or get a server failure.
+	<-ctx.Done()
+	log.Println("shutting down")
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		if err := server.Shutdown(context.Background()); err != nil {
+			log.Println(err)
+		}
+	}()
+
+	// Repeated signal deliveries during shutdown assume default behaviour.
+	// This might or might not be desirable.
+	stop()
+	wg.Wait()
+}