From fd6959fff82a87e92d9e73cb07e210cebb675050 Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch
Date: Fri, 29 Mar 2024 14:08:15 +0100 Subject: Initial commit --- .gitignore | 2 + LICENSE | 12 + Makefile | 13 + README.adoc | 27 ++ acid.adoc | 80 ++++ acid.go | 1157 +++++++++++++++++++++++++++++++++++++++++++++++++++++ acid.yaml.example | 64 +++ go.mod | 11 + go.sum | 12 + 9 files changed, 1378 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.adoc create mode 100644 acid.adoc create mode 100644 acid.go create mode 100644 acid.yaml.example create mode 100644 go.mod create mode 100644 go.sum diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..39a51af --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/acid +/acid.1 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d48c2ad --- /dev/null +++ b/LICENSE @@ -0,0 +1,12 @@ +Copyright (c) 2024, Přemysl Eric Janouch
+ +Permission to use, copy, modify, and/or distribute this software for any +purpose with or without fee is hereby granted. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY +SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION +OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1fd3dc3 --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.POSIX: +.SUFFIXES: + +version = dev +outputs = acid acid.1 +all: $(outputs) + +acid: acid.go + go build -ldflags "-X 'main.projectVersion=$(version)'" -o $@ +acid.1: acid.adoc + asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc +clean: + rm -f $(outputs) diff --git a/README.adoc b/README.adoc new file mode 100644 index 0000000..27be041 --- /dev/null +++ b/README.adoc @@ -0,0 +1,27 @@ +acid +==== + +'acid' is A Continuous Integration Daemon. + +The aim of this project is to provide a trivial CI daemon for Gitea. +I find most alternatives way too complex to set up and run in a local setting, +while the gist of it is actually very simple--run some stuff on new git commits. + +'acid' provides a simple web frontend, as well as a webhook endpoint +for notifications about new commits. The daemon is supposed to be "firewalled" +by a normal HTTP server, and it will not provide TLS support to secure +communications. + +'acid' runs tasks over SSH, which should be universal enough. +It can tell you the build results via any method you can put in a shell script. + +Getting it to work +------------------ + # apt install git golang asciidoctor + $ git clone https://git.janouch.name/p/acid.git + $ cd acid + $ make + $ man -l acid.1 + +You will need to write your own runner scripts, which may be nontrivial. +The author suggests using __cloud-init__-enabled virtual machines with QEMU. diff --git a/acid.adoc b/acid.adoc new file mode 100644 index 0000000..e72be60 --- /dev/null +++ b/acid.adoc @@ -0,0 +1,80 @@ +acid(1) +======= +:doctype: manpage +:manmanual: acid Manual +:mansource: acid {release-version} + +Name +---- +acid - A Continuous Integration Daemon + +Synopsis +-------- +*acid* [_OPTION_]... acid.yaml [_COMMAND_...] + +Description +----------- +*acid* run without command arguments will start an HTTP server that creates +and executes tasks upon receiving push notifications from a Gitea instance, +according to the passed configuration file. + +When a command is passed, *acid* will relay it to that running instance +as an RPC call. + +Options +------- +*-version*:: + Output version information and exit. + +Commands +-------- +*restart* _ID_...:: + Schedule tasks with the given IDs to be rerun. + +Configuration +------------- +For help with creating the configuration file, consult the _acid.yaml.example_ +file present in the distribution. + +All paths are currently relative to the directory you launch *acid* from. + +The *notify*, *setup*, and *build* scripts are processed using Go's +_text/template_ package, and take an object describing the task, +which has the following fields: + +*ID*:: + Unique integer ID for each task. + +*Owner*:: + Gitea user owning the repository. +*Repo*:: + Name of the repository. +*FullName*:: + Full name of the repository, including the owner. +*Hash*:: + Commit hash pertaining to the task. +*Runner*:: + Runner ID. +*RunnerName*:: + Descriptive name of the runner. + +*URL*:: + *acid* link to the task, where its log output can be seen. +*RepoURL*:: + Gitea link to the repository. +*CommitURL*:: + Gitea link to the commit. +*CloneURL*:: + Gitea link for cloning the repository over HTTP. + +Runners +------- +Runners receive the following additional environment variables: + +*ACID_ROOT*:: The same as the base directory for configuration. +*ACID_RUNNER*:: The same as *Runner* in script templates. + +Reporting bugs +-------------- +Use https://git.janouch.name/p/acid to report bugs, request features, +or submit pull requests. diff --git a/acid.go b/acid.go new file mode 100644 index 0000000..2f676d2 --- /dev/null +++ b/acid.go @@ -0,0 +1,1157 @@ +package main + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "database/sql" + "encoding/hex" + "encoding/json" + "errors" + "flag" + "fmt" + "html/template" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "os" + "os/exec" + "os/signal" + "sort" + "strconv" + "sync" + "syscall" + ttemplate "text/template" + "time" + + _ "github.com/mattn/go-sqlite3" + "golang.org/x/crypto/ssh" + "gopkg.in/yaml.v3" +) + +var ( + projectName = "acid" + projectVersion = "?" + + gConfig Config = Config{Listen: ":http"} + gNotifyScript *ttemplate.Template + gDB *sql.DB + + gNotifierSignal = make(chan struct{}, 1) + gExecutorSignal = make(chan struct{}, 1) + + // The mutex is at least supposed to lock over the tasks as well. + gRunningMutex sync.Mutex + gRunning = make(map[int64]*RunningTask) +) + +// --- Config ------------------------------------------------------------------ + +type Config struct { + DB string `yaml:"db"` // database file path + Listen string `yaml:"listen"` // HTTP listener address + Root string `yaml:"root"` // HTTP root URI + Gitea string `yaml:"gitea"` // Gitea base URL + Secret string `yaml:"secret"` // Gitea hook secret + Token string `yaml:"token"` // Gitea API token + Notify string `yaml:"notify"` // notifier script + + Runners map[string]ConfigRunner `yaml:"runners"` // script runners + Projects map[string]ConfigProject `yaml:"projects"` // configured projects +} + +type ConfigRunner struct { + Name string `yaml:"name"` // descriptive name + Run string `yaml:"run"` // runner executable + Setup string `yaml:"setup"` // runner setup script (SSH) + SSH struct { + User string `yaml:"user"` // remote username + Address string `yaml:"address"` // TCP host:port + Identity string `yaml:"identity"` // private key path + } `yaml:"ssh"` // shell access +} + +type ConfigProject struct { + Runners map[string]ConfigProjectRunner `yaml:"runners"` +} + +type ConfigProjectRunner struct { + Setup string `yaml:"setup"` // project setup script (SSH) + Build string `yaml:"build"` // project build script (SSH) +} + +func parseConfig(path string) error { + if f, err := os.Open(path); err != nil { + return err + } else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil { + return err + } + + var err error + gNotifyScript, err = ttemplate.New("notify").Parse(gConfig.Notify) + return err +} + +// --- Task views -------------------------------------------------------------- + +func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { + rows, err := gDB.QueryContext(ctx, ` + SELECT id, owner, repo, hash, runner, + state, detail, notified, runlog, tasklog FROM task `+query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + tasks := []Task{} + for rows.Next() { + var t Task + err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner, + &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog) + if err != nil { + return nil, err + } + tasks = append(tasks, t) + } + return tasks, rows.Err() +} + +var templateTasks = template.Must(template.New("tasks").Parse(` + + +
+ +ID | +Repository | +Hash | +Runner | +State | +Detail | +Notified | +
---|---|---|---|---|---|---|
{{.ID}} | +{{.FullName}} | +{{.Hash}} | +{{.RunnerName}} | +{{.State}} | +{{.Detail}} | +{{.Notified}} | +
{{printf "%s" .RunLog}}+{{end}} +{{if .TaskLog}} +
{{printf "%s" .TaskLog}}+{{end}} + + + +`)) + +func handleTask(w http.ResponseWriter, r *http.Request) { + id, err := strconv.Atoi(r.PathValue("id")) + if err != nil { + http.Error(w, + "Invalid ID: "+err.Error(), http.StatusBadRequest) + return + } + + tasks, err := getTasks(r.Context(), `WHERE id = ?`, id) + if err != nil { + http.Error(w, + "Error retrieving task: "+err.Error(), + http.StatusInternalServerError) + return + } + if len(tasks) == 0 { + http.NotFound(w, r) + return + } + + task := struct { + Task + IsRunning bool + }{Task: tasks[0]} + func() { + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + rt, ok := gRunning[task.ID] + task.IsRunning = ok + if !ok { + return + } + + rt.RunLog.mu.Lock() + defer rt.RunLog.mu.Unlock() + rt.TaskLog.mu.Lock() + defer rt.TaskLog.mu.Unlock() + + task.RunLog = rt.RunLog.b + task.TaskLog = rt.TaskLog.b + }() + + if err := templateTask.Execute(w, &task); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +// --- Push hook --------------------------------------------------------------- + +type GiteaPushEvent struct { + HeadCommit struct { + ID string `json:"id"` + } `json:"head_commit"` + Repository struct { + Name string `json:"name"` + FullName string `json:"full_name"` + Owner struct { + Username string `json:"username"` + } `json:"owner"` + } `json:"repository"` +} + +func createTasks(ctx context.Context, + owner, repo, hash string, runners []string) error { + tx, err := gDB.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner) + VALUES (?, ?, ?, ?)`) + if err != nil { + return err + } + + for _, runner := range runners { + if _, err := stmt.Exec(owner, repo, hash, runner); err != nil { + return err + } + } + if err := tx.Commit(); err != nil { + return err + } + + notifierAwaken() + executorAwaken() + return nil +} + +func giteaSign(b []byte) string { + payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret)) + payloadHmac.Write(b) + return hex.EncodeToString(payloadHmac.Sum(nil)) +} + +func handlePush(w http.ResponseWriter, r *http.Request) { + // X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks. + if r.Header.Get("X-Gitea-Event") != "push" { + http.Error(w, + "Expected a Gitea push event", http.StatusBadRequest) + return + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, + "Error reading request body", http.StatusInternalServerError) + return + } + if r.Header.Get("X-Gitea-Signature") != giteaSign(body) { + http.Error(w, + "Signature mismatch", http.StatusBadRequest) + return + } + + var event GiteaPushEvent + if err := json.Unmarshal(body, &event); err != nil { + http.Error(w, + "Invalid request body: "+err.Error(), http.StatusBadRequest) + return + } + + log.Printf("received push: %s %s\n", + event.Repository.FullName, event.HeadCommit.ID) + + project, ok := gConfig.Projects[event.Repository.FullName] + if !ok { + // This is okay, don't set any commit statuses. + fmt.Fprintf(w, "The project is not configured.") + return + } + + runners := []string{} + for name := range project.Runners { + runners = append(runners, name) + } + sort.Strings(runners) + + if err := createTasks(r.Context(), + event.Repository.Owner.Username, event.Repository.Name, + event.HeadCommit.ID, runners); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// --- RPC --------------------------------------------------------------------- + +const rpcHeaderSignature = "X-ACID-Signature" + +func rpcRestart(w io.Writer, ids []int64) { + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + for _, id := range ids { + if _, ok := gRunning[id]; ok { + fmt.Fprintf(w, "%d: not restarting running tasks\n", id) + continue + } + + // The executor bumps to "running" after inserting into gRunning, + // so we should not need to exclude that state here. + result, err := gDB.ExecContext(context.Background(), + `UPDATE task SET state = ? WHERE id = ?`, taskStateNew, id) + if err != nil { + fmt.Fprintf(w, "%d: %s\n", id, err) + } else if n, _ := result.RowsAffected(); n != 1 { + fmt.Fprintf(w, "%d: no such task\n", id) + } + } + executorAwaken() +} + +func handleRPC(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, + "Error reading request body", http.StatusInternalServerError) + return + } + if r.Header.Get(rpcHeaderSignature) != giteaSign(body) { + http.Error(w, + "Signature mismatch", http.StatusBadRequest) + return + } + + var args []string + if err := json.Unmarshal(body, &args); err != nil { + http.Error(w, + "Invalid request body: "+err.Error(), http.StatusBadRequest) + return + } + if len(args) == 0 { + http.Error(w, "Missing command", http.StatusBadRequest) + return + } + + command, args := args[0], args[1:] + switch command { + case "restart": + ids := []int64{} + for _, arg := range args { + id, err := strconv.ParseInt(arg, 10, 64) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + ids = append(ids, id) + } + rpcRestart(w, ids) + default: + http.Error(w, "Unknown command: "+command, http.StatusBadRequest) + } +} + +// --- Notifier ---------------------------------------------------------------- + +func notifierRunCommand(ctx context.Context, task Task) { + script := bytes.NewBuffer(nil) + if err := gNotifyScript.Execute(script, &task); err != nil { + log.Printf("error: notify: %s", err) + return + } + + cmd := exec.CommandContext(ctx, "sh") + cmd.Stdin = script + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + log.Printf("error: notify: %s", err) + } +} + +func notifierNotify(ctx context.Context, task Task) error { + // Loosely assuming that this only runs on state changes. + if task.State != taskStateNew && task.State != taskStateRunning { + go notifierRunCommand(ctx, task) + } + + payload := struct { + Context string `json:"context"` + Description string `json:"description"` + State string `json:"state"` + TargetURL string `json:"target_url"` + }{} + + runner, ok := gConfig.Runners[task.Runner] + if !ok { + log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner) + return nil + } + payload.Context = runner.Name + payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID) + + switch task.State { + case taskStateNew: + payload.State, payload.Description = "pending", "Pending" + case taskStateRunning: + payload.State, payload.Description = "pending", "Running" + case taskStateError: + payload.State, payload.Description = "error", "Error" + case taskStateFailed: + payload.State, payload.Description = "failure", "Failure" + case taskStateSuccess: + payload.State, payload.Description = "success", "Success" + default: + log.Printf("task %d is in unknown state %d\n", task.ID, task.State) + return nil + } + + // We should only fill this in case we have some specific information. + if task.Detail != "" { + payload.Description = task.Detail + } + + body, err := json.Marshal(payload) + if err != nil { + return err + } + + log.Printf("task %d for %s: notifying: %s: %s: %s (%s)\n", + task.ID, task.FullName(), task.Hash, + payload.Context, payload.State, payload.Description) + + uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s", + gConfig.Gitea, task.Owner, task.Repo, task.Hash) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, + bytes.NewReader(body)) + if err != nil { + return err + } + + req.Header.Set("Accept", "application/json") + req.Header.Set("Authorization", "token "+gConfig.Token) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + _, err = gDB.ExecContext(ctx, `UPDATE task SET notified = 1 + WHERE id = ? AND state = ? AND detail = ? AND notified = 0`, + task.ID, task.State, task.Detail) + return err +} + +func notifierRun(ctx context.Context) error { + tasks, err := getTasks(ctx, `WHERE notified = 0 ORDER BY id ASC`) + if err != nil { + return err + } + + for _, task := range tasks { + if err := notifierNotify(ctx, task); err != nil { + return fmt.Errorf( + "task %d for %s: %w", task.ID, task.FullName(), err) + } + } + return nil +} + +func notifier(ctx context.Context) { + for { + select { + case <-gNotifierSignal: + case <-ctx.Done(): + return + } + + if err := notifierRun(ctx); err != nil { + log.Printf("error: notifier: %s\n", err) + } + } +} + +func notifierAwaken() { + select { + case gNotifierSignal <- struct{}{}: + default: + } +} + +// --- Executor ---------------------------------------------------------------- + +type terminalWriter struct { + b []byte + cur int + mu sync.Mutex +} + +func (tw *terminalWriter) Write(p []byte) (written int, err error) { + tw.mu.Lock() + defer tw.mu.Unlock() + + // Extremely rudimentary emulation of a dumb terminal. + for _, b := range p { + // Enough is enough, writing too much is highly suspicious. + if len(tw.b) > 64<<20 { + return written, errors.New("too much terminal output") + } + + switch b { + case '\b': + if tw.cur > 0 && tw.b[tw.cur-1] != '\n' { + tw.cur-- + } + case '\r': + for tw.cur > 0 && tw.b[tw.cur-1] != '\n' { + tw.cur-- + } + case '\n': + tw.b = append(tw.b, b) + tw.cur = len(tw.b) + default: + tw.b = append(tw.b[:tw.cur], b) + tw.cur = len(tw.b) + } + + if err != nil { + break + } + written += 1 + } + return +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +type RunningTask struct { + DB Task + Runner ConfigRunner + ProjectRunner ConfigProjectRunner + + RunLog terminalWriter + TaskLog terminalWriter +} + +func executorUpdate(rt *RunningTask) error { + rt.RunLog.mu.Lock() + defer rt.RunLog.mu.Unlock() + rt.DB.RunLog = bytes.Clone(rt.RunLog.b) + if rt.DB.RunLog == nil { + rt.DB.RunLog = []byte{} + } + + rt.TaskLog.mu.Lock() + defer rt.TaskLog.mu.Unlock() + rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b) + if rt.DB.TaskLog == nil { + rt.DB.TaskLog = []byte{} + } + + _, err := gDB.ExecContext(context.Background(), `UPDATE task + SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ? + WHERE id = ?`, + rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog, + rt.DB.ID) + if err == nil { + notifierAwaken() + } + return err +} + +func executorConnect( + ctx context.Context, config *ssh.ClientConfig, address string) ( + *ssh.Client, error) { + deadline := time.Now().Add(3 * time.Minute) + ctxDeadlined, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + + var d net.Dialer + for { + // net.DNSError eats the cause, as in it cannot be unwrapped + // and tested for a particular subtype. + conn, err := d.DialContext(ctxDeadlined, "tcp", address) + if e := ctxDeadlined.Err(); e != nil { + // This may provide a little bit more information. + if err != nil { + return nil, err + } + return nil, e + } + if err != nil { + time.Sleep(1 * time.Second) + continue + } + + // We ignore the parent context, but at least we try. + conn.SetDeadline(deadline) + sc, chans, reqs, err := ssh.NewClientConn(conn, address, config) + conn.SetDeadline(time.Time{}) + + // cloud-init-enabled machines, such as OpenBSD, + // may have a race condition between sshd starting for the first time, + // and having a configured user. + // + // Authentication may therefore regularly fail, + // and we need to ignore all errors whatsoever, + // not just spurious partial successes resulting in RST or FIN. + var neterr net.Error + if errors.As(err, &neterr) || errors.Is(err, io.EOF) || err != nil { + time.Sleep(1 * time.Second) + continue + } + + return ssh.NewClient(sc, chans, reqs), nil + } +} + +func executorRunTask(ctx context.Context, task Task) error { + rt := &RunningTask{DB: task} + + var ok bool + rt.Runner, ok = gConfig.Runners[rt.DB.Runner] + if !ok { + return fmt.Errorf("unknown runner: %s", rt.DB.Runner) + } + project, ok := gConfig.Projects[rt.DB.FullName()] + if !ok { + return fmt.Errorf("project configuration not found") + } + rt.ProjectRunner, ok = project.Runners[rt.DB.Runner] + if !ok { + return fmt.Errorf( + "project not configured for runner %s", rt.DB.Runner) + } + + wd, err := os.Getwd() + if err != nil { + return err + } + + // The runner setup script may change the working directory, + // so do everything in one go. However, this approach also makes it + // difficult to distinguish project-independent runner failures. + // (For that, we can start multiple ssh.Sessions.) + // + // We could pass variables through SSH environment variables, + // which would require enabling PermitUserEnvironment in sshd_config, + // or through prepending script lines, but templates are a bit simpler. + // + // We let the runner itself clone the repository: + // - it is a more flexible in that it can test AUR packages more normally, + // - we might have to clone submodules as well. + // Otherwise, we could download a source archive from Gitea, + // and use SFTP to upload it to the runner. + tmplScript, err := ttemplate.New("script").Parse(rt.Runner.Setup + "\n" + + rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build) + if err != nil { + return fmt.Errorf("script: %w", err) + } + + privateKey, err := os.ReadFile(rt.Runner.SSH.Identity) + if err != nil { + return fmt.Errorf( + "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err) + } + signer, err := ssh.ParsePrivateKey(privateKey) + if err != nil { + return fmt.Errorf( + "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err) + } + + defer func() { + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + delete(gRunning, rt.DB.ID) + }() + func() { + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + rt.DB.State, rt.DB.Detail = taskStateRunning, "" + rt.DB.Notified = 0 + rt.DB.RunLog = []byte{} + rt.DB.TaskLog = []byte{} + gRunning[rt.DB.ID] = rt + }() + if err := executorUpdate(rt); err != nil { + return fmt.Errorf("SQL: %w", err) + } + + // Errors happening while trying to write an error are unfortunate, + // but not important enough to abort entirely. + setError := func(detail string) { + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + rt.DB.State, rt.DB.Detail = taskStateError, detail + if err := executorUpdate(rt); err != nil { + log.Printf("error: task %d for %s: SQL: %s", + rt.DB.ID, rt.DB.FullName(), err) + } + } + + script := bytes.NewBuffer(nil) + if err := tmplScript.Execute(script, &rt.DB); err != nil { + setError("Script template failed") + return err + } + + cmd := exec.CommandContext(ctx, rt.Runner.Run) + cmd.Env = append( + os.Environ(), + "ACID_ROOT="+wd, + "ACID_RUNNER="+rt.DB.Runner, + ) + + log.Printf("task %d for %s: starting %s\n", + rt.DB.ID, rt.DB.FullName(), rt.Runner.Name) + + cmd.Stdout = &rt.RunLog + cmd.Stderr = &rt.RunLog + if err := cmd.Start(); err != nil { + setError("Runner failed to start") + return err + } + + ctxRunner, cancelRunner := context.WithCancelCause(ctx) + defer cancelRunner(context.Canceled) + go func() { + if err := cmd.Wait(); err != nil { + cancelRunner(err) + } else { + cancelRunner(errors.New("runner exited successfully but early")) + } + }() + defer func() { + _ = cmd.Process.Signal(os.Interrupt) + select { + case <-ctxRunner.Done(): + // This doesn't leave the runner almost any time on our shutdown, + // but whatever--they're supposed to be ephemeral. + // Moreover, we don't even override cmd.CancelFunc. + case <-time.After(5 * time.Second): + } + _ = cmd.Process.Kill() + }() + + client, err := executorConnect(ctxRunner, &ssh.ClientConfig{ + User: rt.Runner.SSH.User, + Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)}, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + }, rt.Runner.SSH.Address) + if err != nil { + fmt.Fprintf(&rt.TaskLog, "%s\n", err) + setError("SSH failure") + return err + } + defer client.Close() + + session, err := client.NewSession() + if err != nil { + fmt.Fprintf(&rt.TaskLog, "%s\n", err) + setError("SSH failure") + return err + } + defer session.Close() + + modes := ssh.TerminalModes{ssh.ECHO: 0} + if err := session.RequestPty("dumb", 24, 80, modes); err != nil { + fmt.Fprintf(&rt.TaskLog, "%s\n", err) + setError("SSH failure") + return err + } + + log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName()) + + session.Stdout = &rt.TaskLog + session.Stderr = &rt.TaskLog + + // Although passing the script directly takes away the option to specify + // a particular shell (barring here-documents!), it is simple and reliable. + // + // Passing the script over Stdin to sh tended to end up with commands + // eating the script during processing, and resulted in a hang, + // because closing the Stdin does not result in remote processes + // getting a stream of EOF. + // + // Piping the script into `cat | sh` while appending a ^D to the end of it + // appeared to work, but it seems likely that commands might still steal + // script bytes from the cat program if they choose to read from the tty + // and the script is longer than the buffer. + chSession := make(chan error, 1) + go func() { + chSession <- session.Run(script.String()) + close(chSession) + }() + + select { + case <-ctxRunner.Done(): + // Either shutdown, or early runner termination. + // The runner is not supposed to finish before the session. + err = context.Cause(ctxRunner) + case err = <-chSession: + // Killing a runner may perfectly well trigger this first, + // in particular when it's on the same machine. + } + + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + var ee *ssh.ExitError + if err == nil { + rt.DB.State, rt.DB.Detail = taskStateSuccess, "" + } else if errors.As(err, &ee) { + rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed" + fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) + } else { + rt.DB.State, rt.DB.Detail = taskStateError, "" + fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) + } + return executorUpdate(rt) +} + +func executorRun(ctx context.Context) error { + tasks, err := getTasks(ctx, `WHERE state = ? OR state = ? ORDER BY id ASC`, + taskStateNew, taskStateRunning) + if err != nil { + return err + } + + for _, task := range tasks { + if err := executorRunTask(ctx, task); err != nil { + return fmt.Errorf("task %d for %s: %w", + task.ID, task.FullName(), err) + } + } + return nil +} + +func executor(ctx context.Context) { + for { + select { + case <-gExecutorSignal: + case <-ctx.Done(): + return + } + + if err := executorRun(ctx); err != nil { + log.Printf("error: executor: %s\n", err) + } + } +} + +func executorAwaken() { + select { + case gExecutorSignal <- struct{}{}: + default: + } +} + +// --- Main -------------------------------------------------------------------- + +type taskState int64 + +const ( + taskStateNew taskState = iota // → · pending (queued) + taskStateRunning // → · pending (running) + taskStateError // → ! error (internal issue) + taskStateFailed // → × failure (runner issue) + taskStateSuccess // → ✓ success (runner finished) +) + +func (ts taskState) String() string { + switch ts { + case taskStateNew: + return "New" + case taskStateRunning: + return "Running" + case taskStateError: + return "Error" + case taskStateFailed: + return "Failed" + case taskStateSuccess: + return "Success" + default: + return fmt.Sprintf("%d", ts) + } +} + +// Task mirrors SQL task table records, adding a few convenience methods. +type Task struct { + ID int64 + + Owner string + Repo string + Hash string + Runner string + + State taskState + Detail string + Notified int64 + RunLog []byte + TaskLog []byte +} + +func (t *Task) FullName() string { return t.Owner + "/" + t.Repo } + +func (t *Task) RunnerName() string { + if runner, ok := gConfig.Runners[t.Runner]; !ok { + return t.Runner + } else { + return runner.Name + } +} + +func (t *Task) URL() string { + return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID) +} + +func (t *Task) RepoURL() string { + return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo) +} + +func (t *Task) CommitURL() string { + return fmt.Sprintf("%s/%s/%s/commit/%s", + gConfig.Gitea, t.Owner, t.Repo, t.Hash) +} + +func (t *Task) CloneURL() string { + return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo) +} + +const schemaSQL = ` +CREATE TABLE IF NOT EXISTS task( + id INTEGER NOT NULL, -- unique ID + + owner TEXT NOT NULL, -- Gitea username + repo TEXT NOT NULL, -- Gitea repository name + hash TEXT NOT NULL, -- commit hash + runner TEXT NOT NULL, -- the runner to use + + state INTEGER NOT NULL DEFAULT 0, -- task state + detail TEXT NOT NULL DEFAULT '', -- task state detail + notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state + runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output + tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output + + PRIMARY KEY (id) +) STRICT; +` + +func openDB(path string) error { + var err error + gDB, err = sql.Open("sqlite3", + "file:"+path+"?_foreign_keys=1&_busy_timeout=1000") + if err != nil { + return err + } + + _, err = gDB.Exec(schemaSQL) + return err +} + +// callRPC forwards command line commands to a running server. +func callRPC(args []string) error { + body, err := json.Marshal(args) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPost, + fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body)) + if err != nil { + return err + } + + req.Header.Set(rpcHeaderSignature, giteaSign(body)) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if _, err = io.Copy(os.Stdout, resp.Body); err != nil { + return err + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + os.Exit(1) + } + return nil +} + +func main() { + version := flag.Bool("version", false, "show version and exit") + + flag.Usage = func() { + f := flag.CommandLine.Output() + fmt.Fprintf(f, + "Usage: %s [OPTION]... CONFIG [COMMAND...]\n", os.Args[0]) + flag.PrintDefaults() + } + flag.Parse() + if flag.NArg() < 1 { + flag.Usage() + os.Exit(2) + } + + if *version { + fmt.Printf("%s %s\n", projectName, projectVersion) + return + } + + if err := parseConfig(flag.Arg(0)); err != nil { + log.Fatalln(err) + } + if flag.NArg() > 1 { + if err := callRPC(flag.Args()[1:]); err != nil { + log.Fatalln(err) + } + return + } + + if err := openDB(gConfig.DB); err != nil { + log.Fatalln(err) + } + defer gDB.Close() + + var wg sync.WaitGroup + ctx, stop := signal.NotifyContext( + context.Background(), syscall.SIGINT, syscall.SIGTERM) + + server := &http.Server{Addr: gConfig.Listen} + http.HandleFunc("/{$}", handleTasks) + http.HandleFunc("/task/{id}", handleTask) + http.HandleFunc("/push", handlePush) + http.HandleFunc("/rpc", handleRPC) + + ln, err := (&net.ListenConfig{}).Listen(ctx, "tcp", server.Addr) + if err != nil { + log.Fatalln(err) + } + + notifierAwaken() + wg.Add(1) + go func() { + defer wg.Done() + notifier(ctx) + }() + + executorAwaken() + wg.Add(1) + go func() { + defer wg.Done() + executor(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer stop() + if err := server.Serve(ln); err != http.ErrServerClosed { + log.Println(err) + } + }() + + // Wait until we either receive a signal, or get a server failure. + <-ctx.Done() + log.Println("shutting down") + + wg.Add(1) + go func() { + defer wg.Done() + if err := server.Shutdown(context.Background()); err != nil { + log.Println(err) + } + }() + + // Repeated signal deliveries during shutdown assume default behaviour. + // This might or might not be desirable. + stop() + wg.Wait() +} diff --git a/acid.yaml.example b/acid.yaml.example new file mode 100644 index 0000000..a80cc4c --- /dev/null +++ b/acid.yaml.example @@ -0,0 +1,64 @@ +--- +# Path to an SQLite database file, which will be automatically created. +db: acid.db +# Address to listen on. +listen: :http +# Externally visible base URL that Gitea, its users, and RPC can connect to. +# The root/push endpoint accepts Gitea push notifications. +root: http://acid + +# Arbitrary secret that Gitea and RPC will sign their requests with. +secret: 0123456789abcde +# Base URL of the Gitea instance. +gitea: http://gitea +# Gitea access token used for writing commit statuses back to repositories. +token: 0123456789abcdefghijklmnopqrstuvwxyzABCD + +# Arbitrary sh script to notify about the results of finished tasks. +notify: | + xN irc://acid@/acid?skipjoin&usenotice <