diff options
| author | Přemysl Eric Janouch <p@janouch.name> | 2024-03-29 14:08:15 +0100 | 
|---|---|---|
| committer | Přemysl Eric Janouch <p@janouch.name> | 2024-04-04 19:40:14 +0200 | 
| commit | fd6959fff82a87e92d9e73cb07e210cebb675050 (patch) | |
| tree | d7fd170022d99e9d6a978208ab31f169f3574ad8 | |
| download | acid-fd6959fff82a87e92d9e73cb07e210cebb675050.tar.gz acid-fd6959fff82a87e92d9e73cb07e210cebb675050.tar.xz acid-fd6959fff82a87e92d9e73cb07e210cebb675050.zip | |
Initial commit
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | LICENSE | 12 | ||||
| -rw-r--r-- | Makefile | 13 | ||||
| -rw-r--r-- | README.adoc | 27 | ||||
| -rw-r--r-- | acid.adoc | 80 | ||||
| -rw-r--r-- | acid.go | 1157 | ||||
| -rw-r--r-- | acid.yaml.example | 64 | ||||
| -rw-r--r-- | go.mod | 11 | ||||
| -rw-r--r-- | go.sum | 12 | 
9 files changed, 1378 insertions, 0 deletions
| diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..39a51af --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/acid +/acid.1 @@ -0,0 +1,12 @@ +Copyright (c) 2024, Přemysl Eric Janouch <p@janouch.name> + +Permission to use, copy, modify, and/or distribute this software for any +purpose with or without fee is hereby granted. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY +SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION +OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1fd3dc3 --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.POSIX: +.SUFFIXES: + +version = dev +outputs = acid acid.1 +all: $(outputs) + +acid: acid.go +	go build -ldflags "-X 'main.projectVersion=$(version)'" -o $@ +acid.1: acid.adoc +	asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc +clean: +	rm -f $(outputs) diff --git a/README.adoc b/README.adoc new file mode 100644 index 0000000..27be041 --- /dev/null +++ b/README.adoc @@ -0,0 +1,27 @@ +acid +==== + +'acid' is A Continuous Integration Daemon. + +The aim of this project is to provide a trivial CI daemon for Gitea. +I find most alternatives way too complex to set up and run in a local setting, +while the gist of it is actually very simple--run some stuff on new git commits. + +'acid' provides a simple web frontend, as well as a webhook endpoint +for notifications about new commits.  The daemon is supposed to be "firewalled" +by a normal HTTP server, and it will not provide TLS support to secure +communications. + +'acid' runs tasks over SSH, which should be universal enough. +It can tell you the build results via any method you can put in a shell script. + +Getting it to work +------------------ + # apt install git golang asciidoctor + $ git clone https://git.janouch.name/p/acid.git + $ cd acid + $ make + $ man -l acid.1 + +You will need to write your own runner scripts, which may be nontrivial. +The author suggests using __cloud-init__-enabled virtual machines with QEMU. diff --git a/acid.adoc b/acid.adoc new file mode 100644 index 0000000..e72be60 --- /dev/null +++ b/acid.adoc @@ -0,0 +1,80 @@ +acid(1) +======= +:doctype: manpage +:manmanual: acid Manual +:mansource: acid {release-version} + +Name +---- +acid - A Continuous Integration Daemon + +Synopsis +-------- +*acid* [_OPTION_]... acid.yaml [_COMMAND_...] + +Description +----------- +*acid* run without command arguments will start an HTTP server that creates +and executes tasks upon receiving push notifications from a Gitea instance, +according to the passed configuration file. + +When a command is passed, *acid* will relay it to that running instance +as an RPC call. + +Options +------- +*-version*:: +	Output version information and exit. + +Commands +-------- +*restart* _ID_...:: +	Schedule tasks with the given IDs to be rerun. + +Configuration +------------- +For help with creating the configuration file, consult the _acid.yaml.example_ +file present in the distribution. + +All paths are currently relative to the directory you launch *acid* from. + +The *notify*, *setup*, and *build* scripts are processed using Go's +_text/template_ package, and take an object describing the task, +which has the following fields: + +*ID*:: +	Unique integer ID for each task. + +*Owner*:: +	Gitea user owning the repository. +*Repo*:: +	Name of the repository. +*FullName*:: +	Full name of the repository, including the owner. +*Hash*:: +	Commit hash pertaining to the task. +*Runner*:: +	Runner ID. +*RunnerName*:: +	Descriptive name of the runner. + +*URL*:: +	*acid* link to the task, where its log output can be seen. +*RepoURL*:: +	Gitea link to the repository. +*CommitURL*:: +	Gitea link to the commit. +*CloneURL*:: +	Gitea link for cloning the repository over HTTP. + +Runners +------- +Runners receive the following additional environment variables: + +*ACID_ROOT*::   The same as the base directory for configuration. +*ACID_RUNNER*:: The same as *Runner* in script templates. + +Reporting bugs +-------------- +Use https://git.janouch.name/p/acid to report bugs, request features, +or submit pull requests. @@ -0,0 +1,1157 @@ +package main + +import ( +	"bytes" +	"context" +	"crypto/hmac" +	"crypto/sha256" +	"database/sql" +	"encoding/hex" +	"encoding/json" +	"errors" +	"flag" +	"fmt" +	"html/template" +	"io" +	"io/ioutil" +	"log" +	"net" +	"net/http" +	"os" +	"os/exec" +	"os/signal" +	"sort" +	"strconv" +	"sync" +	"syscall" +	ttemplate "text/template" +	"time" + +	_ "github.com/mattn/go-sqlite3" +	"golang.org/x/crypto/ssh" +	"gopkg.in/yaml.v3" +) + +var ( +	projectName    = "acid" +	projectVersion = "?" + +	gConfig       Config = Config{Listen: ":http"} +	gNotifyScript *ttemplate.Template +	gDB           *sql.DB + +	gNotifierSignal = make(chan struct{}, 1) +	gExecutorSignal = make(chan struct{}, 1) + +	// The mutex is at least supposed to lock over the tasks as well. +	gRunningMutex sync.Mutex +	gRunning      = make(map[int64]*RunningTask) +) + +// --- Config ------------------------------------------------------------------ + +type Config struct { +	DB     string `yaml:"db"`     // database file path +	Listen string `yaml:"listen"` // HTTP listener address +	Root   string `yaml:"root"`   // HTTP root URI +	Gitea  string `yaml:"gitea"`  // Gitea base URL +	Secret string `yaml:"secret"` // Gitea hook secret +	Token  string `yaml:"token"`  // Gitea API token +	Notify string `yaml:"notify"` // notifier script + +	Runners  map[string]ConfigRunner  `yaml:"runners"`  // script runners +	Projects map[string]ConfigProject `yaml:"projects"` // configured projects +} + +type ConfigRunner struct { +	Name  string `yaml:"name"`  // descriptive name +	Run   string `yaml:"run"`   // runner executable +	Setup string `yaml:"setup"` // runner setup script (SSH) +	SSH   struct { +		User     string `yaml:"user"`     // remote username +		Address  string `yaml:"address"`  // TCP host:port +		Identity string `yaml:"identity"` // private key path +	} `yaml:"ssh"` // shell access +} + +type ConfigProject struct { +	Runners map[string]ConfigProjectRunner `yaml:"runners"` +} + +type ConfigProjectRunner struct { +	Setup string `yaml:"setup"` // project setup script (SSH) +	Build string `yaml:"build"` // project build script (SSH) +} + +func parseConfig(path string) error { +	if f, err := os.Open(path); err != nil { +		return err +	} else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil { +		return err +	} + +	var err error +	gNotifyScript, err = ttemplate.New("notify").Parse(gConfig.Notify) +	return err +} + +// --- Task views -------------------------------------------------------------- + +func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { +	rows, err := gDB.QueryContext(ctx, ` +		SELECT id, owner, repo, hash, runner, +			state, detail, notified, runlog, tasklog FROM task `+query, args...) +	if err != nil { +		return nil, err +	} +	defer rows.Close() + +	tasks := []Task{} +	for rows.Next() { +		var t Task +		err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner, +			&t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog) +		if err != nil { +			return nil, err +		} +		tasks = append(tasks, t) +	} +	return tasks, rows.Err() +} + +var templateTasks = template.Must(template.New("tasks").Parse(` +<!DOCTYPE html> +<html> +<head> +<meta charset="utf-8"> +<title>Tasks</title> +</head> +<body> +<h1>Tasks</h1> +<table border="1"> +<thead> +	<tr> +		<th>ID</th> +		<th>Repository</th> +		<th>Hash</th> +		<th>Runner</th> +		<th>State</th> +		<th>Detail</th> +		<th>Notified</th> +	</tr> +</thead> +<tbody> +{{range .}} +	<tr> +		<td><a href="task/{{.ID}}">{{.ID}}</a></td> +		<td><a href="{{.RepoURL}}">{{.FullName}}</a></td> +		<td><a href="{{.CommitURL}}">{{.Hash}}</a></td> +		<td>{{.RunnerName}}</td> +		<td>{{.State}}</td> +		<td>{{.Detail}}</td> +		<td>{{.Notified}}</td> +	</tr> +{{end}} +</tbody> +</table> +</body> +</html> +`)) + +func handleTasks(w http.ResponseWriter, r *http.Request) { +	tasks, err := getTasks(r.Context(), `ORDER BY id DESC`) +	if err != nil { +		http.Error(w, +			"Error retrieving tasks: "+err.Error(), +			http.StatusInternalServerError) +		return +	} + +	if err := templateTasks.Execute(w, tasks); err != nil { +		http.Error(w, err.Error(), http.StatusInternalServerError) +	} +} + +var templateTask = template.Must(template.New("tasks").Parse(` +<!DOCTYPE html> +<html> +<head> +<title>Task {{.ID}}</title> +<meta charset="utf-8"> +{{if .IsRunning}} +<meta http-equiv="refresh" content="5"> +{{end}} +</head> +<body> +<h1><a href="/">Tasks</a> » {{.ID}}</h1> +<dl> +<dt>Project</dt> +	<dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd> +<dt>Commit</dt> +	<dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd> +<dt>Runner</dt> +	<dd>{{.RunnerName}}</dd> +<dt>State</dt> +	<dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd> +<dt>Notified</dt> +	<dd>{{.Notified}}</dd> +</dl> +{{if .RunLog}} +<h2>Runner log</h2> +<pre>{{printf "%s" .RunLog}}</pre> +{{end}} +{{if .TaskLog}} +<h2>Task log</h2> +<pre>{{printf "%s" .TaskLog}}</pre> +{{end}} +</table> +</body> +</html> +`)) + +func handleTask(w http.ResponseWriter, r *http.Request) { +	id, err := strconv.Atoi(r.PathValue("id")) +	if err != nil { +		http.Error(w, +			"Invalid ID: "+err.Error(), http.StatusBadRequest) +		return +	} + +	tasks, err := getTasks(r.Context(), `WHERE id = ?`, id) +	if err != nil { +		http.Error(w, +			"Error retrieving task: "+err.Error(), +			http.StatusInternalServerError) +		return +	} +	if len(tasks) == 0 { +		http.NotFound(w, r) +		return +	} + +	task := struct { +		Task +		IsRunning bool +	}{Task: tasks[0]} +	func() { +		gRunningMutex.Lock() +		defer gRunningMutex.Unlock() + +		rt, ok := gRunning[task.ID] +		task.IsRunning = ok +		if !ok { +			return +		} + +		rt.RunLog.mu.Lock() +		defer rt.RunLog.mu.Unlock() +		rt.TaskLog.mu.Lock() +		defer rt.TaskLog.mu.Unlock() + +		task.RunLog = rt.RunLog.b +		task.TaskLog = rt.TaskLog.b +	}() + +	if err := templateTask.Execute(w, &task); err != nil { +		http.Error(w, err.Error(), http.StatusInternalServerError) +	} +} + +// --- Push hook --------------------------------------------------------------- + +type GiteaPushEvent struct { +	HeadCommit struct { +		ID string `json:"id"` +	} `json:"head_commit"` +	Repository struct { +		Name     string `json:"name"` +		FullName string `json:"full_name"` +		Owner    struct { +			Username string `json:"username"` +		} `json:"owner"` +	} `json:"repository"` +} + +func createTasks(ctx context.Context, +	owner, repo, hash string, runners []string) error { +	tx, err := gDB.BeginTx(ctx, nil) +	if err != nil { +		return err +	} +	defer tx.Rollback() + +	stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner) +		VALUES (?, ?, ?, ?)`) +	if err != nil { +		return err +	} + +	for _, runner := range runners { +		if _, err := stmt.Exec(owner, repo, hash, runner); err != nil { +			return err +		} +	} +	if err := tx.Commit(); err != nil { +		return err +	} + +	notifierAwaken() +	executorAwaken() +	return nil +} + +func giteaSign(b []byte) string { +	payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret)) +	payloadHmac.Write(b) +	return hex.EncodeToString(payloadHmac.Sum(nil)) +} + +func handlePush(w http.ResponseWriter, r *http.Request) { +	// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks. +	if r.Header.Get("X-Gitea-Event") != "push" { +		http.Error(w, +			"Expected a Gitea push event", http.StatusBadRequest) +		return +	} + +	body, err := ioutil.ReadAll(r.Body) +	if err != nil { +		http.Error(w, +			"Error reading request body", http.StatusInternalServerError) +		return +	} +	if r.Header.Get("X-Gitea-Signature") != giteaSign(body) { +		http.Error(w, +			"Signature mismatch", http.StatusBadRequest) +		return +	} + +	var event GiteaPushEvent +	if err := json.Unmarshal(body, &event); err != nil { +		http.Error(w, +			"Invalid request body: "+err.Error(), http.StatusBadRequest) +		return +	} + +	log.Printf("received push: %s %s\n", +		event.Repository.FullName, event.HeadCommit.ID) + +	project, ok := gConfig.Projects[event.Repository.FullName] +	if !ok { +		// This is okay, don't set any commit statuses. +		fmt.Fprintf(w, "The project is not configured.") +		return +	} + +	runners := []string{} +	for name := range project.Runners { +		runners = append(runners, name) +	} +	sort.Strings(runners) + +	if err := createTasks(r.Context(), +		event.Repository.Owner.Username, event.Repository.Name, +		event.HeadCommit.ID, runners); err != nil { +		http.Error(w, err.Error(), http.StatusInternalServerError) +		return +	} +} + +// --- RPC --------------------------------------------------------------------- + +const rpcHeaderSignature = "X-ACID-Signature" + +func rpcRestart(w io.Writer, ids []int64) { +	gRunningMutex.Lock() +	defer gRunningMutex.Unlock() + +	for _, id := range ids { +		if _, ok := gRunning[id]; ok { +			fmt.Fprintf(w, "%d: not restarting running tasks\n", id) +			continue +		} + +		// The executor bumps to "running" after inserting into gRunning, +		// so we should not need to exclude that state here. +		result, err := gDB.ExecContext(context.Background(), +			`UPDATE task SET state = ? WHERE id = ?`, taskStateNew, id) +		if err != nil { +			fmt.Fprintf(w, "%d: %s\n", id, err) +		} else if n, _ := result.RowsAffected(); n != 1 { +			fmt.Fprintf(w, "%d: no such task\n", id) +		} +	} +	executorAwaken() +} + +func handleRPC(w http.ResponseWriter, r *http.Request) { +	body, err := ioutil.ReadAll(r.Body) +	if err != nil { +		http.Error(w, +			"Error reading request body", http.StatusInternalServerError) +		return +	} +	if r.Header.Get(rpcHeaderSignature) != giteaSign(body) { +		http.Error(w, +			"Signature mismatch", http.StatusBadRequest) +		return +	} + +	var args []string +	if err := json.Unmarshal(body, &args); err != nil { +		http.Error(w, +			"Invalid request body: "+err.Error(), http.StatusBadRequest) +		return +	} +	if len(args) == 0 { +		http.Error(w, "Missing command", http.StatusBadRequest) +		return +	} + +	command, args := args[0], args[1:] +	switch command { +	case "restart": +		ids := []int64{} +		for _, arg := range args { +			id, err := strconv.ParseInt(arg, 10, 64) +			if err != nil { +				http.Error(w, err.Error(), http.StatusBadRequest) +				return +			} +			ids = append(ids, id) +		} +		rpcRestart(w, ids) +	default: +		http.Error(w, "Unknown command: "+command, http.StatusBadRequest) +	} +} + +// --- Notifier ---------------------------------------------------------------- + +func notifierRunCommand(ctx context.Context, task Task) { +	script := bytes.NewBuffer(nil) +	if err := gNotifyScript.Execute(script, &task); err != nil { +		log.Printf("error: notify: %s", err) +		return +	} + +	cmd := exec.CommandContext(ctx, "sh") +	cmd.Stdin = script +	cmd.Stdout = os.Stdout +	cmd.Stderr = os.Stderr +	if err := cmd.Run(); err != nil { +		log.Printf("error: notify: %s", err) +	} +} + +func notifierNotify(ctx context.Context, task Task) error { +	// Loosely assuming that this only runs on state changes. +	if task.State != taskStateNew && task.State != taskStateRunning { +		go notifierRunCommand(ctx, task) +	} + +	payload := struct { +		Context     string `json:"context"` +		Description string `json:"description"` +		State       string `json:"state"` +		TargetURL   string `json:"target_url"` +	}{} + +	runner, ok := gConfig.Runners[task.Runner] +	if !ok { +		log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner) +		return nil +	} +	payload.Context = runner.Name +	payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID) + +	switch task.State { +	case taskStateNew: +		payload.State, payload.Description = "pending", "Pending" +	case taskStateRunning: +		payload.State, payload.Description = "pending", "Running" +	case taskStateError: +		payload.State, payload.Description = "error", "Error" +	case taskStateFailed: +		payload.State, payload.Description = "failure", "Failure" +	case taskStateSuccess: +		payload.State, payload.Description = "success", "Success" +	default: +		log.Printf("task %d is in unknown state %d\n", task.ID, task.State) +		return nil +	} + +	// We should only fill this in case we have some specific information. +	if task.Detail != "" { +		payload.Description = task.Detail +	} + +	body, err := json.Marshal(payload) +	if err != nil { +		return err +	} + +	log.Printf("task %d for %s: notifying: %s: %s: %s (%s)\n", +		task.ID, task.FullName(), task.Hash, +		payload.Context, payload.State, payload.Description) + +	uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s", +		gConfig.Gitea, task.Owner, task.Repo, task.Hash) +	req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, +		bytes.NewReader(body)) +	if err != nil { +		return err +	} + +	req.Header.Set("Accept", "application/json") +	req.Header.Set("Authorization", "token "+gConfig.Token) +	req.Header.Set("Content-Type", "application/json") + +	resp, err := http.DefaultClient.Do(req) +	if err != nil { +		return err +	} +	defer resp.Body.Close() + +	body, err = ioutil.ReadAll(resp.Body) +	if err != nil { +		return err +	} + +	_, err = gDB.ExecContext(ctx, `UPDATE task SET notified = 1 +		WHERE id = ? AND state = ? AND detail = ? AND notified = 0`, +		task.ID, task.State, task.Detail) +	return err +} + +func notifierRun(ctx context.Context) error { +	tasks, err := getTasks(ctx, `WHERE notified = 0 ORDER BY id ASC`) +	if err != nil { +		return err +	} + +	for _, task := range tasks { +		if err := notifierNotify(ctx, task); err != nil { +			return fmt.Errorf( +				"task %d for %s: %w", task.ID, task.FullName(), err) +		} +	} +	return nil +} + +func notifier(ctx context.Context) { +	for { +		select { +		case <-gNotifierSignal: +		case <-ctx.Done(): +			return +		} + +		if err := notifierRun(ctx); err != nil { +			log.Printf("error: notifier: %s\n", err) +		} +	} +} + +func notifierAwaken() { +	select { +	case gNotifierSignal <- struct{}{}: +	default: +	} +} + +// --- Executor ---------------------------------------------------------------- + +type terminalWriter struct { +	b   []byte +	cur int +	mu  sync.Mutex +} + +func (tw *terminalWriter) Write(p []byte) (written int, err error) { +	tw.mu.Lock() +	defer tw.mu.Unlock() + +	// Extremely rudimentary emulation of a dumb terminal. +	for _, b := range p { +		// Enough is enough, writing too much is highly suspicious. +		if len(tw.b) > 64<<20 { +			return written, errors.New("too much terminal output") +		} + +		switch b { +		case '\b': +			if tw.cur > 0 && tw.b[tw.cur-1] != '\n' { +				tw.cur-- +			} +		case '\r': +			for tw.cur > 0 && tw.b[tw.cur-1] != '\n' { +				tw.cur-- +			} +		case '\n': +			tw.b = append(tw.b, b) +			tw.cur = len(tw.b) +		default: +			tw.b = append(tw.b[:tw.cur], b) +			tw.cur = len(tw.b) +		} + +		if err != nil { +			break +		} +		written += 1 +	} +	return +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +type RunningTask struct { +	DB            Task +	Runner        ConfigRunner +	ProjectRunner ConfigProjectRunner + +	RunLog  terminalWriter +	TaskLog terminalWriter +} + +func executorUpdate(rt *RunningTask) error { +	rt.RunLog.mu.Lock() +	defer rt.RunLog.mu.Unlock() +	rt.DB.RunLog = bytes.Clone(rt.RunLog.b) +	if rt.DB.RunLog == nil { +		rt.DB.RunLog = []byte{} +	} + +	rt.TaskLog.mu.Lock() +	defer rt.TaskLog.mu.Unlock() +	rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b) +	if rt.DB.TaskLog == nil { +		rt.DB.TaskLog = []byte{} +	} + +	_, err := gDB.ExecContext(context.Background(), `UPDATE task +		SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ? +		WHERE id = ?`, +		rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog, +		rt.DB.ID) +	if err == nil { +		notifierAwaken() +	} +	return err +} + +func executorConnect( +	ctx context.Context, config *ssh.ClientConfig, address string) ( +	*ssh.Client, error) { +	deadline := time.Now().Add(3 * time.Minute) +	ctxDeadlined, cancel := context.WithDeadline(ctx, deadline) +	defer cancel() + +	var d net.Dialer +	for { +		// net.DNSError eats the cause, as in it cannot be unwrapped +		// and tested for a particular subtype. +		conn, err := d.DialContext(ctxDeadlined, "tcp", address) +		if e := ctxDeadlined.Err(); e != nil { +			// This may provide a little bit more information. +			if err != nil { +				return nil, err +			} +			return nil, e +		} +		if err != nil { +			time.Sleep(1 * time.Second) +			continue +		} + +		// We ignore the parent context, but at least we try. +		conn.SetDeadline(deadline) +		sc, chans, reqs, err := ssh.NewClientConn(conn, address, config) +		conn.SetDeadline(time.Time{}) + +		// cloud-init-enabled machines, such as OpenBSD, +		// may have a race condition between sshd starting for the first time, +		// and having a configured user. +		// +		// Authentication may therefore regularly fail, +		// and we need to ignore all errors whatsoever, +		// not just spurious partial successes resulting in RST or FIN. +		var neterr net.Error +		if errors.As(err, &neterr) || errors.Is(err, io.EOF) || err != nil { +			time.Sleep(1 * time.Second) +			continue +		} + +		return ssh.NewClient(sc, chans, reqs), nil +	} +} + +func executorRunTask(ctx context.Context, task Task) error { +	rt := &RunningTask{DB: task} + +	var ok bool +	rt.Runner, ok = gConfig.Runners[rt.DB.Runner] +	if !ok { +		return fmt.Errorf("unknown runner: %s", rt.DB.Runner) +	} +	project, ok := gConfig.Projects[rt.DB.FullName()] +	if !ok { +		return fmt.Errorf("project configuration not found") +	} +	rt.ProjectRunner, ok = project.Runners[rt.DB.Runner] +	if !ok { +		return fmt.Errorf( +			"project not configured for runner %s", rt.DB.Runner) +	} + +	wd, err := os.Getwd() +	if err != nil { +		return err +	} + +	// The runner setup script may change the working directory, +	// so do everything in one go. However, this approach also makes it +	// difficult to distinguish project-independent runner failures. +	// (For that, we can start multiple ssh.Sessions.) +	// +	// We could pass variables through SSH environment variables, +	// which would require enabling PermitUserEnvironment in sshd_config, +	// or through prepending script lines, but templates are a bit simpler. +	// +	// We let the runner itself clone the repository: +	//  - it is a more flexible in that it can test AUR packages more normally, +	//  - we might have to clone submodules as well. +	// Otherwise, we could download a source archive from Gitea, +	// and use SFTP to upload it to the runner. +	tmplScript, err := ttemplate.New("script").Parse(rt.Runner.Setup + "\n" + +		rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build) +	if err != nil { +		return fmt.Errorf("script: %w", err) +	} + +	privateKey, err := os.ReadFile(rt.Runner.SSH.Identity) +	if err != nil { +		return fmt.Errorf( +			"cannot read SSH identity for runner %s: %w", rt.DB.Runner, err) +	} +	signer, err := ssh.ParsePrivateKey(privateKey) +	if err != nil { +		return fmt.Errorf( +			"cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err) +	} + +	defer func() { +		gRunningMutex.Lock() +		defer gRunningMutex.Unlock() + +		delete(gRunning, rt.DB.ID) +	}() +	func() { +		gRunningMutex.Lock() +		defer gRunningMutex.Unlock() + +		rt.DB.State, rt.DB.Detail = taskStateRunning, "" +		rt.DB.Notified = 0 +		rt.DB.RunLog = []byte{} +		rt.DB.TaskLog = []byte{} +		gRunning[rt.DB.ID] = rt +	}() +	if err := executorUpdate(rt); err != nil { +		return fmt.Errorf("SQL: %w", err) +	} + +	// Errors happening while trying to write an error are unfortunate, +	// but not important enough to abort entirely. +	setError := func(detail string) { +		gRunningMutex.Lock() +		defer gRunningMutex.Unlock() + +		rt.DB.State, rt.DB.Detail = taskStateError, detail +		if err := executorUpdate(rt); err != nil { +			log.Printf("error: task %d for %s: SQL: %s", +				rt.DB.ID, rt.DB.FullName(), err) +		} +	} + +	script := bytes.NewBuffer(nil) +	if err := tmplScript.Execute(script, &rt.DB); err != nil { +		setError("Script template failed") +		return err +	} + +	cmd := exec.CommandContext(ctx, rt.Runner.Run) +	cmd.Env = append( +		os.Environ(), +		"ACID_ROOT="+wd, +		"ACID_RUNNER="+rt.DB.Runner, +	) + +	log.Printf("task %d for %s: starting %s\n", +		rt.DB.ID, rt.DB.FullName(), rt.Runner.Name) + +	cmd.Stdout = &rt.RunLog +	cmd.Stderr = &rt.RunLog +	if err := cmd.Start(); err != nil { +		setError("Runner failed to start") +		return err +	} + +	ctxRunner, cancelRunner := context.WithCancelCause(ctx) +	defer cancelRunner(context.Canceled) +	go func() { +		if err := cmd.Wait(); err != nil { +			cancelRunner(err) +		} else { +			cancelRunner(errors.New("runner exited successfully but early")) +		} +	}() +	defer func() { +		_ = cmd.Process.Signal(os.Interrupt) +		select { +		case <-ctxRunner.Done(): +			// This doesn't leave the runner almost any time on our shutdown, +			// but whatever--they're supposed to be ephemeral. +			// Moreover, we don't even override cmd.CancelFunc. +		case <-time.After(5 * time.Second): +		} +		_ = cmd.Process.Kill() +	}() + +	client, err := executorConnect(ctxRunner, &ssh.ClientConfig{ +		User:            rt.Runner.SSH.User, +		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)}, +		HostKeyCallback: ssh.InsecureIgnoreHostKey(), +	}, rt.Runner.SSH.Address) +	if err != nil { +		fmt.Fprintf(&rt.TaskLog, "%s\n", err) +		setError("SSH failure") +		return err +	} +	defer client.Close() + +	session, err := client.NewSession() +	if err != nil { +		fmt.Fprintf(&rt.TaskLog, "%s\n", err) +		setError("SSH failure") +		return err +	} +	defer session.Close() + +	modes := ssh.TerminalModes{ssh.ECHO: 0} +	if err := session.RequestPty("dumb", 24, 80, modes); err != nil { +		fmt.Fprintf(&rt.TaskLog, "%s\n", err) +		setError("SSH failure") +		return err +	} + +	log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName()) + +	session.Stdout = &rt.TaskLog +	session.Stderr = &rt.TaskLog + +	// Although passing the script directly takes away the option to specify +	// a particular shell (barring here-documents!), it is simple and reliable. +	// +	// Passing the script over Stdin to sh tended to end up with commands +	// eating the script during processing, and resulted in a hang, +	// because closing the Stdin does not result in remote processes +	// getting a stream of EOF. +	// +	// Piping the script into `cat | sh` while appending a ^D to the end of it +	// appeared to work, but it seems likely that commands might still steal +	// script bytes from the cat program if they choose to read from the tty +	// and the script is longer than the buffer. +	chSession := make(chan error, 1) +	go func() { +		chSession <- session.Run(script.String()) +		close(chSession) +	}() + +	select { +	case <-ctxRunner.Done(): +		// Either shutdown, or early runner termination. +		// The runner is not supposed to finish before the session. +		err = context.Cause(ctxRunner) +	case err = <-chSession: +		// Killing a runner may perfectly well trigger this first, +		// in particular when it's on the same machine. +	} + +	gRunningMutex.Lock() +	defer gRunningMutex.Unlock() + +	var ee *ssh.ExitError +	if err == nil { +		rt.DB.State, rt.DB.Detail = taskStateSuccess, "" +	} else if errors.As(err, &ee) { +		rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed" +		fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) +	} else { +		rt.DB.State, rt.DB.Detail = taskStateError, "" +		fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) +	} +	return executorUpdate(rt) +} + +func executorRun(ctx context.Context) error { +	tasks, err := getTasks(ctx, `WHERE state = ? OR state = ? ORDER BY id ASC`, +		taskStateNew, taskStateRunning) +	if err != nil { +		return err +	} + +	for _, task := range tasks { +		if err := executorRunTask(ctx, task); err != nil { +			return fmt.Errorf("task %d for %s: %w", +				task.ID, task.FullName(), err) +		} +	} +	return nil +} + +func executor(ctx context.Context) { +	for { +		select { +		case <-gExecutorSignal: +		case <-ctx.Done(): +			return +		} + +		if err := executorRun(ctx); err != nil { +			log.Printf("error: executor: %s\n", err) +		} +	} +} + +func executorAwaken() { +	select { +	case gExecutorSignal <- struct{}{}: +	default: +	} +} + +// --- Main -------------------------------------------------------------------- + +type taskState int64 + +const ( +	taskStateNew     taskState = iota // → · pending (queued) +	taskStateRunning                  // → · pending (running) +	taskStateError                    // → ! error (internal issue) +	taskStateFailed                   // → × failure (runner issue) +	taskStateSuccess                  // → ✓ success (runner finished) +) + +func (ts taskState) String() string { +	switch ts { +	case taskStateNew: +		return "New" +	case taskStateRunning: +		return "Running" +	case taskStateError: +		return "Error" +	case taskStateFailed: +		return "Failed" +	case taskStateSuccess: +		return "Success" +	default: +		return fmt.Sprintf("%d", ts) +	} +} + +// Task mirrors SQL task table records, adding a few convenience methods. +type Task struct { +	ID int64 + +	Owner  string +	Repo   string +	Hash   string +	Runner string + +	State    taskState +	Detail   string +	Notified int64 +	RunLog   []byte +	TaskLog  []byte +} + +func (t *Task) FullName() string { return t.Owner + "/" + t.Repo } + +func (t *Task) RunnerName() string { +	if runner, ok := gConfig.Runners[t.Runner]; !ok { +		return t.Runner +	} else { +		return runner.Name +	} +} + +func (t *Task) URL() string { +	return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID) +} + +func (t *Task) RepoURL() string { +	return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo) +} + +func (t *Task) CommitURL() string { +	return fmt.Sprintf("%s/%s/%s/commit/%s", +		gConfig.Gitea, t.Owner, t.Repo, t.Hash) +} + +func (t *Task) CloneURL() string { +	return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo) +} + +const schemaSQL = ` +CREATE TABLE IF NOT EXISTS task( +	id       INTEGER NOT NULL,  -- unique ID + +	owner    TEXT NOT NULL,     -- Gitea username +	repo     TEXT NOT NULL,     -- Gitea repository name +	hash     TEXT NOT NULL,     -- commit hash +	runner   TEXT NOT NULL,     -- the runner to use + +	state    INTEGER NOT NULL DEFAULT 0,    -- task state +	detail   TEXT    NOT NULL DEFAULT '',   -- task state detail +	notified INTEGER NOT NULL DEFAULT 0,    -- Gitea knows the state +	runlog   BLOB    NOT NULL DEFAULT x'',  -- combined task runner output +	tasklog  BLOB    NOT NULL DEFAULT x'',  -- combined task SSH output + +	PRIMARY KEY (id) +) STRICT; +` + +func openDB(path string) error { +	var err error +	gDB, err = sql.Open("sqlite3", +		"file:"+path+"?_foreign_keys=1&_busy_timeout=1000") +	if err != nil { +		return err +	} + +	_, err = gDB.Exec(schemaSQL) +	return err +} + +// callRPC forwards command line commands to a running server. +func callRPC(args []string) error { +	body, err := json.Marshal(args) +	if err != nil { +		return err +	} + +	req, err := http.NewRequest(http.MethodPost, +		fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body)) +	if err != nil { +		return err +	} + +	req.Header.Set(rpcHeaderSignature, giteaSign(body)) +	req.Header.Set("Content-Type", "application/json") + +	resp, err := http.DefaultClient.Do(req) +	if err != nil { +		return err +	} +	defer resp.Body.Close() + +	if _, err = io.Copy(os.Stdout, resp.Body); err != nil { +		return err +	} +	if resp.StatusCode < 200 || resp.StatusCode >= 300 { +		os.Exit(1) +	} +	return nil +} + +func main() { +	version := flag.Bool("version", false, "show version and exit") + +	flag.Usage = func() { +		f := flag.CommandLine.Output() +		fmt.Fprintf(f, +			"Usage: %s [OPTION]... CONFIG [COMMAND...]\n", os.Args[0]) +		flag.PrintDefaults() +	} +	flag.Parse() +	if flag.NArg() < 1 { +		flag.Usage() +		os.Exit(2) +	} + +	if *version { +		fmt.Printf("%s %s\n", projectName, projectVersion) +		return +	} + +	if err := parseConfig(flag.Arg(0)); err != nil { +		log.Fatalln(err) +	} +	if flag.NArg() > 1 { +		if err := callRPC(flag.Args()[1:]); err != nil { +			log.Fatalln(err) +		} +		return +	} + +	if err := openDB(gConfig.DB); err != nil { +		log.Fatalln(err) +	} +	defer gDB.Close() + +	var wg sync.WaitGroup +	ctx, stop := signal.NotifyContext( +		context.Background(), syscall.SIGINT, syscall.SIGTERM) + +	server := &http.Server{Addr: gConfig.Listen} +	http.HandleFunc("/{$}", handleTasks) +	http.HandleFunc("/task/{id}", handleTask) +	http.HandleFunc("/push", handlePush) +	http.HandleFunc("/rpc", handleRPC) + +	ln, err := (&net.ListenConfig{}).Listen(ctx, "tcp", server.Addr) +	if err != nil { +		log.Fatalln(err) +	} + +	notifierAwaken() +	wg.Add(1) +	go func() { +		defer wg.Done() +		notifier(ctx) +	}() + +	executorAwaken() +	wg.Add(1) +	go func() { +		defer wg.Done() +		executor(ctx) +	}() + +	wg.Add(1) +	go func() { +		defer wg.Done() +		defer stop() +		if err := server.Serve(ln); err != http.ErrServerClosed { +			log.Println(err) +		} +	}() + +	// Wait until we either receive a signal, or get a server failure. +	<-ctx.Done() +	log.Println("shutting down") + +	wg.Add(1) +	go func() { +		defer wg.Done() +		if err := server.Shutdown(context.Background()); err != nil { +			log.Println(err) +		} +	}() + +	// Repeated signal deliveries during shutdown assume default behaviour. +	// This might or might not be desirable. +	stop() +	wg.Wait() +} diff --git a/acid.yaml.example b/acid.yaml.example new file mode 100644 index 0000000..a80cc4c --- /dev/null +++ b/acid.yaml.example @@ -0,0 +1,64 @@ +--- +# Path to an SQLite database file, which will be automatically created. +db: acid.db +# Address to listen on. +listen: :http +# Externally visible base URL that Gitea, its users, and RPC can connect to. +# The root/push endpoint accepts Gitea push notifications. +root: http://acid + +# Arbitrary secret that Gitea and RPC will sign their requests with. +secret: 0123456789abcde +# Base URL of the Gitea instance. +gitea: http://gitea +# Gitea access token used for writing commit statuses back to repositories. +token: 0123456789abcdefghijklmnopqrstuvwxyzABCD + +# Arbitrary sh script to notify about the results of finished tasks. +notify: | +  xN irc://acid@/acid?skipjoin&usenotice <<END +  {{.FullName}} {{.Hash}}: {{.RunnerName}} {{.State}} {{.URL}} +  END + +# List of all available runners for projects, keyed by an ID (.Runner). +runners: +  arch: +    # Descriptive name of the runner (.RunnerName). +    name: Arch Linux + +    # Executable to make the runner present. +    # It may spawn a container, run a virtual machine, or even just sleep. +    # If it exits prematurely, the task fails. +    run: runners/arch.sh + +    # SSH configuration for connecting to the runner. +    ssh: +      # Username to connect as. +      user: ci +      # Adress of the SSH server. +      address: arch:22 +      # Path to an SSH private key file, which may be used for public key auth. +      identity: data/id_rsa + +    # Arbitrary shell script to prepare the stage for project scripts. +    setup: | +      set -ex +      sudo pacman -Syu --noconfirm git +      git clone --recursive '{{.CloneURL}}' '{{.Repo}}' +      cd '{{.Repo}}' +      git -c advice.detachedHead=false checkout '{{.Hash}}' + +# Configuration for individual Gitea repositories. +projects: +  # Full repository name (.FullName, .Owner/.Repo). +  owner/repo: +    runners: +      arch: +        # Project setup script, meant to install dependencies. +        setup: | +          sudo pacman -S --noconfirm findutils coreutils + +        # Project build script. +        build: | +          echo Computing line count... +          find . -not -path '*/.*' -type f -print0 | xargs -0 cat | wc -l @@ -0,0 +1,11 @@ +module janouch.name/acid + +go 1.22.0 + +require ( +	github.com/mattn/go-sqlite3 v1.14.22 +	golang.org/x/crypto v0.21.0 +	gopkg.in/yaml.v3 v3.0.1 +) + +require golang.org/x/sys v0.18.0 // indirect @@ -0,0 +1,12 @@ +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | 
