aboutsummaryrefslogtreecommitdiff
path: root/acid.go
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2024-12-22 09:00:02 +0100
committerPřemysl Eric Janouch <p@janouch.name>2024-12-23 14:35:46 +0100
commita09b11256b1ad540d6606912dab82a992f74b130 (patch)
tree9a91e97fe321f49435600b3466894d8e7efbc90d /acid.go
parentbd130537736d74d8d77080d9876767981df759d9 (diff)
downloadacid-a09b11256b1ad540d6606912dab82a992f74b130.tar.gz
acid-a09b11256b1ad540d6606912dab82a992f74b130.tar.xz
acid-a09b11256b1ad540d6606912dab82a992f74b130.zip
Clean up, add a deployment stage
Errors should be handled a bit more nicely now. The SFTP part could also be done from deploy scripts like: scp -i {runner.ssh.identity} \ -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \ {runner.ssh.user}@{runner.ssh.address%:*} -p {runner.ssh.address#*:} but that is deemed way too annoying, so we do it from Go.
Diffstat (limited to 'acid.go')
-rw-r--r--acid.go627
1 files changed, 439 insertions, 188 deletions
diff --git a/acid.go b/acid.go
index f938d02..6f62893 100644
--- a/acid.go
+++ b/acid.go
@@ -21,6 +21,7 @@ import (
"os"
"os/exec"
"os/signal"
+ "path/filepath"
"sort"
"strconv"
"strings"
@@ -30,6 +31,7 @@ import (
"time"
_ "github.com/mattn/go-sqlite3"
+ "github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"gopkg.in/yaml.v3"
)
@@ -96,6 +98,7 @@ func (cf *ConfigProject) AutomaticRunners() (runners []string) {
type ConfigProjectRunner struct {
Setup string `yaml:"setup"` // project setup script (SSH)
Build string `yaml:"build"` // project build script (SSH)
+ Deploy string `yaml:"deploy"` // project deploy script (local)
Timeout string `yaml:"timeout"` // timeout duration
}
@@ -153,7 +156,8 @@ func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, `
SELECT id, owner, repo, hash, runner,
- state, detail, notified, runlog, tasklog FROM task `+query, args...)
+ state, detail, notified,
+ runlog, tasklog, deploylog FROM task `+query, args...)
if err != nil {
return nil, err
}
@@ -163,7 +167,8 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
for rows.Next() {
var t Task
err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
- &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog)
+ &t.State, &t.Detail, &t.Notified,
+ &t.RunLog, &t.TaskLog, &t.DeployLog)
if err != nil {
return nil, err
}
@@ -259,6 +264,10 @@ var templateTask = template.Must(template.New("tasks").Parse(`
<h2>Task log</h2>
<pre>{{printf "%s" .TaskLog}}</pre>
{{end}}
+{{if .DeployLog}}
+<h2>Deploy log</h2>
+<pre>{{printf "%s" .DeployLog}}</pre>
+{{end}}
</table>
</body>
</html>
@@ -302,9 +311,12 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
defer rt.RunLog.mu.Unlock()
rt.TaskLog.mu.Lock()
defer rt.TaskLog.mu.Unlock()
+ rt.DeployLog.mu.Lock()
+ defer rt.DeployLog.mu.Unlock()
task.RunLog = rt.RunLog.b
task.TaskLog = rt.TaskLog.b
+ task.DeployLog = rt.DeployLog.b
}()
if err := templateTask.Execute(w, &task); err != nil {
@@ -816,43 +828,297 @@ func (tw *terminalWriter) Write(p []byte) (written int, err error) {
return
}
-// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+// ~~~ Running task ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+// RunningTask stores all data pertaining to a currently running task.
type RunningTask struct {
DB Task
Runner ConfigRunner
ProjectRunner ConfigProjectRunner
- RunLog terminalWriter
- TaskLog terminalWriter
+ RunLog terminalWriter
+ TaskLog terminalWriter
+ DeployLog terminalWriter
+
+ wd string // acid working directory
+ timeout time.Duration // time limit on task execution
+ signer ssh.Signer // SSH private key
+ tmplScript *ttemplate.Template // remote build script
+ tmplDeploy *ttemplate.Template // local deployment script
}
-func executorUpdate(rt *RunningTask) error {
- rt.RunLog.mu.Lock()
- defer rt.RunLog.mu.Unlock()
- rt.DB.RunLog = bytes.Clone(rt.RunLog.b)
- if rt.DB.RunLog == nil {
- rt.DB.RunLog = []byte{}
+// newRunningTask prepares a task for running, without executing anything yet.
+func newRunningTask(task Task) (*RunningTask, error) {
+ rt := &RunningTask{DB: task}
+
+ var ok bool
+ rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
+ if !ok {
+ return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner)
+ }
+ project, ok := gConfig.Projects[rt.DB.FullName()]
+ if !ok {
+ return nil, fmt.Errorf("project configuration not found")
+ }
+ rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
+ if !ok {
+ return nil, fmt.Errorf(
+ "project not configured for runner %s", rt.DB.Runner)
}
- rt.TaskLog.mu.Lock()
- defer rt.TaskLog.mu.Unlock()
- rt.DB.TaskLog = bytes.Clone(rt.TaskLog.b)
- if rt.DB.TaskLog == nil {
- rt.DB.TaskLog = []byte{}
+ var err error
+ if rt.wd, err = os.Getwd(); err != nil {
+ return nil, err
}
- _, err := gDB.ExecContext(context.Background(), `UPDATE task
- SET state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ?
- WHERE id = ?`,
- rt.DB.State, rt.DB.Detail, rt.DB.Notified, rt.DB.RunLog, rt.DB.TaskLog,
- rt.DB.ID)
- if err == nil {
- notifierAwaken()
+ // Lenient or not, some kind of a time limit is desirable.
+ rt.timeout = time.Hour
+ if rt.ProjectRunner.Timeout != "" {
+ rt.timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout)
+ if err != nil {
+ return nil, fmt.Errorf("timeout: %w", err)
+ }
+ }
+
+ privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+ rt.signer, err = ssh.ParsePrivateKey(privateKey)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
+ }
+
+ // The runner setup script may change the working directory,
+ // so do everything in one go. However, this approach also makes it
+ // difficult to distinguish project-independent runner failures.
+ // (For that, we can start multiple ssh.Sessions.)
+ //
+ // We could pass variables through SSH environment variables,
+ // which would require enabling PermitUserEnvironment in sshd_config,
+ // or through prepending script lines, but templates are a bit simpler.
+ //
+ // We let the runner itself clone the repository:
+ // - it is a more flexible in that it can test AUR packages more normally,
+ // - we might have to clone submodules as well.
+ // Otherwise, we could download a source archive from Gitea,
+ // and use SFTP to upload it to the runner.
+ rt.tmplScript, err = ttemplate.New("script").Funcs(shellFuncs).
+ Parse(rt.Runner.Setup + "\n" +
+ rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ if err != nil {
+ return nil, fmt.Errorf("script/build: %w", err)
+ }
+
+ rt.tmplDeploy, err = ttemplate.New("deploy").Funcs(shellFuncs).
+ Parse(rt.ProjectRunner.Deploy)
+ if err != nil {
+ return nil, fmt.Errorf("script/deploy: %w", err)
+ }
+
+ return rt, nil
+}
+
+// localEnv creates a process environment for locally run executables.
+func (rt *RunningTask) localEnv() []string {
+ return append(os.Environ(),
+ "ACID_ROOT="+rt.wd,
+ "ACID_RUNNER="+rt.DB.Runner,
+ )
+}
+
+// update stores the running task's state in the database.
+func (rt *RunningTask) update() error {
+ for _, i := range []struct {
+ tw *terminalWriter
+ log *[]byte
+ }{
+ {&rt.RunLog, &rt.DB.RunLog},
+ {&rt.TaskLog, &rt.DB.TaskLog},
+ {&rt.DeployLog, &rt.DB.DeployLog},
+ } {
+ i.tw.mu.Lock()
+ defer i.tw.mu.Unlock()
+ if *i.log = bytes.Clone(i.tw.b); *i.log == nil {
+ *i.log = []byte{}
+ }
+ }
+ return rt.DB.update()
+}
+
+// ~~~ Deploy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func executorDownloadNode(sc *sftp.Client, remotePath, localPath string,
+ info os.FileInfo) error {
+ if info.IsDir() {
+ // Hoping that caller invokes us on parents first.
+ return os.MkdirAll(localPath, info.Mode().Perm())
+ }
+
+ src, err := sc.Open(remotePath)
+ if err != nil {
+ return fmt.Errorf("failed to open remote file %s: %w", remotePath, err)
+ }
+ defer src.Close()
+
+ dst, err := os.OpenFile(
+ localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
+ if err != nil {
+ return fmt.Errorf("failed to create local file: %w", err)
+ }
+ defer dst.Close()
+
+ if _, err = io.Copy(dst, src); err != nil {
+ return fmt.Errorf("failed to copy file from remote %s to local %s: %w",
+ remotePath, localPath, err)
+ }
+ return nil
+}
+
+func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error {
+ sc, err := sftp.NewClient(client)
+ if err != nil {
+ return err
+ }
+ defer sc.Close()
+
+ walker := sc.Walk(remoteRoot)
+ for walker.Step() {
+ if walker.Err() != nil {
+ return walker.Err()
+ }
+ relativePath, err := filepath.Rel(remoteRoot, walker.Path())
+ if err != nil {
+ return err
+ }
+ if err = executorDownloadNode(sc, walker.Path(),
+ filepath.Join(localRoot, relativePath), walker.Stat()); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func executorLocalShell() string {
+ if shell := os.Getenv("SHELL"); shell != "" {
+ return shell
+ }
+ // The os/user package doesn't store the parsed out shell field.
+ return "/bin/sh"
+}
+
+func executorDeploy(
+ ctx context.Context, client *ssh.Client, rt *RunningTask) error {
+ script := bytes.NewBuffer(nil)
+ if err := rt.tmplDeploy.Execute(script, &rt.DB); err != nil {
+ return &executorError{"Deploy template failed", err}
+ }
+
+ // Thus the deployment directory must exist iff the script is not empty.
+ if script.Len() == 0 {
+ return nil
+ }
+
+ // We expect the files to be moved elsewhere on the filesystem,
+ // and they may get very large, so avoid /tmp.
+ //
+ // See also: https://systemd.io/TEMPORARY_DIRECTORIES/
+ tmp := os.Getenv("TMPDIR")
+ if tmp == "" {
+ tmp = "/var/tmp"
+ }
+ dir := filepath.Join(tmp, "acid-deploy")
+ if err := os.RemoveAll(dir); err != nil {
+ return err
+ }
+ if err := os.Mkdir(dir, 0755); err != nil {
+ return err
+ }
+
+ // The passed remoteRoot is relative to sc.Getwd.
+ if err := executorDownload(client, "acid-deploy", dir); err != nil {
+ return err
+ }
+
+ cmd := exec.CommandContext(ctx, executorLocalShell(), "-c", script.String())
+ cmd.Env = rt.localEnv()
+ cmd.Dir = dir
+ cmd.Stdout = &rt.DeployLog
+ cmd.Stderr = &rt.DeployLog
+ return cmd.Run()
+}
+
+// ~~~ Build ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func executorBuild(
+ ctx context.Context, client *ssh.Client, rt *RunningTask) error {
+ // This is here to fail early, though logically it is misplaced.
+ script := bytes.NewBuffer(nil)
+ if err := rt.tmplScript.Execute(script, &rt.DB); err != nil {
+ return &executorError{"Script template failed", err}
+ }
+
+ session, err := client.NewSession()
+ if err != nil {
+ return &executorError{"SSH failure", err}
+ }
+ defer session.Close()
+
+ modes := ssh.TerminalModes{ssh.ECHO: 0}
+ if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
+ return &executorError{"SSH failure", err}
+ }
+
+ log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
+
+ session.Stdout = &rt.TaskLog
+ session.Stderr = &rt.TaskLog
+
+ // Although passing the script directly takes away the option to specify
+ // a particular shell (barring here-documents!), it is simple and reliable.
+ //
+ // Passing the script over Stdin to sh tended to end up with commands
+ // eating the script during processing, and resulted in a hang,
+ // because closing the Stdin does not result in remote processes
+ // getting a stream of EOF.
+ //
+ // Piping the script into `cat | sh` while appending a ^D to the end of it
+ // appeared to work, but it seems likely that commands might still steal
+ // script bytes from the cat program if they choose to read from the tty
+ // and the script is longer than the buffer.
+ chSession := make(chan error, 1)
+ go func() {
+ chSession <- session.Run(script.String())
+ close(chSession)
+ }()
+
+ select {
+ case <-ctx.Done():
+ // Either shutdown, or early runner termination.
+ // The runner is not supposed to finish before the session.
+ err = context.Cause(ctx)
+ case err = <-chSession:
+ // Killing a runner may perfectly well trigger this first,
+ // in particular when it's on the same machine.
}
return err
}
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+// executorError describes a taskStateError.
+type executorError struct {
+ Detail string
+ Err error
+}
+
+func (e *executorError) Unwrap() error { return e.Err }
+func (e *executorError) Error() string {
+ return fmt.Sprintf("%s: %s", e.Detail, e.Err)
+}
+
func executorConnect(
ctx context.Context, config *ssh.ClientConfig, address string) (
*ssh.Client, error) {
@@ -894,122 +1160,57 @@ func executorConnect(
time.Sleep(1 * time.Second)
continue
}
-
return ssh.NewClient(sc, chans, reqs), nil
}
}
func executorRunTask(ctx context.Context, task Task) error {
- rt := &RunningTask{DB: task}
-
- var ok bool
- rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
- if !ok {
- return fmt.Errorf("unknown runner: %s", rt.DB.Runner)
- }
- project, ok := gConfig.Projects[rt.DB.FullName()]
- if !ok {
- return fmt.Errorf("project configuration not found")
- }
- rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
- if !ok {
- return fmt.Errorf(
- "project not configured for runner %s", rt.DB.Runner)
- }
-
- wd, err := os.Getwd()
- if err != nil {
- return err
- }
-
- // The runner setup script may change the working directory,
- // so do everything in one go. However, this approach also makes it
- // difficult to distinguish project-independent runner failures.
- // (For that, we can start multiple ssh.Sessions.)
- //
- // We could pass variables through SSH environment variables,
- // which would require enabling PermitUserEnvironment in sshd_config,
- // or through prepending script lines, but templates are a bit simpler.
- //
- // We let the runner itself clone the repository:
- // - it is a more flexible in that it can test AUR packages more normally,
- // - we might have to clone submodules as well.
- // Otherwise, we could download a source archive from Gitea,
- // and use SFTP to upload it to the runner.
- tmplScript, err := ttemplate.New("script").Funcs(shellFuncs).
- Parse(rt.Runner.Setup + "\n" +
- rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
+ rt, err := newRunningTask(task)
if err != nil {
- return fmt.Errorf("script: %w", err)
+ task.State, task.Detail = taskStateError, "Misconfigured"
+ task.Notified = 0
+ task.RunLog = []byte(err.Error())
+ task.TaskLog = []byte{}
+ task.DeployLog = []byte{}
+ return task.update()
}
- // Lenient or not, some kind of a time limit is desirable.
- timeout := time.Hour
- if rt.ProjectRunner.Timeout != "" {
- timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout)
- if err != nil {
- return fmt.Errorf("timeout: %w", err)
- }
- }
- ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
+ ctx, cancelTimeout := context.WithTimeout(ctx, rt.timeout)
defer cancelTimeout()
- privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
- if err != nil {
- return fmt.Errorf(
- "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
- }
- signer, err := ssh.ParsePrivateKey(privateKey)
- if err != nil {
- return fmt.Errorf(
- "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
- }
-
- defer func() {
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
- delete(gRunning, rt.DB.ID)
- }()
- func() {
+ // RunningTasks can be concurrently accessed by HTTP handlers.
+ locked := func(f func()) {
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
-
+ f()
+ }
+ locked(func() {
rt.DB.State, rt.DB.Detail = taskStateRunning, ""
rt.DB.Notified = 0
rt.DB.RunLog = []byte{}
rt.DB.TaskLog = []byte{}
+ rt.DB.DeployLog = []byte{}
gRunning[rt.DB.ID] = rt
- }()
- if err := executorUpdate(rt); err != nil {
+ })
+ defer locked(func() {
+ delete(gRunning, rt.DB.ID)
+ })
+ if err := rt.update(); err != nil {
return fmt.Errorf("SQL: %w", err)
}
// Errors happening while trying to write an error are unfortunate,
// but not important enough to abort entirely.
setError := func(detail string) {
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
rt.DB.State, rt.DB.Detail = taskStateError, detail
- if err := executorUpdate(rt); err != nil {
+ if err := rt.update(); err != nil {
log.Printf("error: task %d for %s: SQL: %s",
rt.DB.ID, rt.DB.FullName(), err)
}
}
- script := bytes.NewBuffer(nil)
- if err := tmplScript.Execute(script, &rt.DB); err != nil {
- setError("Script template failed")
- return err
- }
-
cmd := exec.CommandContext(ctx, rt.Runner.Run)
- cmd.Env = append(
- os.Environ(),
- "ACID_ROOT="+wd,
- "ACID_RUNNER="+rt.DB.Runner,
- )
+ cmd.Env = rt.localEnv()
// Pushing the runner into a new process group that can be killed at once
// with all its children isn't bullet-proof, it messes with job control
@@ -1033,7 +1234,8 @@ func executorRunTask(ctx context.Context, task Task) error {
cmd.Stdout = &rt.RunLog
cmd.Stderr = &rt.RunLog
if err := cmd.Start(); err != nil {
- setError("Runner failed to start")
+ fmt.Fprintf(&rt.TaskLog, "%s\n", err)
+ locked(func() { setError("Runner failed to start") })
return err
}
@@ -1059,78 +1261,61 @@ func executorRunTask(ctx context.Context, task Task) error {
client, err := executorConnect(ctxRunner, &ssh.ClientConfig{
User: rt.Runner.SSH.User,
- Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
+ Auth: []ssh.AuthMethod{ssh.PublicKeys(rt.signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}, rt.Runner.SSH.Address)
if err != nil {
fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
+ locked(func() { setError("SSH failure") })
return err
}
defer client.Close()
- session, err := client.NewSession()
- if err != nil {
- fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
- return err
- }
- defer session.Close()
+ var (
+ ee1 *ssh.ExitError
+ ee2 *executorError
+ )
- modes := ssh.TerminalModes{ssh.ECHO: 0}
- if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
- fmt.Fprintf(&rt.TaskLog, "%s\n", err)
- setError("SSH failure")
- return err
+ err = executorBuild(ctxRunner, client, rt)
+ if err != nil {
+ locked(func() {
+ if errors.As(err, &ee1) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ } else if errors.As(err, &ee2) {
+ rt.DB.State, rt.DB.Detail = taskStateError, ee2.Detail
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee2.Err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
+ }
+ })
+ return rt.update()
}
- log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
-
- session.Stdout = &rt.TaskLog
- session.Stderr = &rt.TaskLog
-
- // Although passing the script directly takes away the option to specify
- // a particular shell (barring here-documents!), it is simple and reliable.
- //
- // Passing the script over Stdin to sh tended to end up with commands
- // eating the script during processing, and resulted in a hang,
- // because closing the Stdin does not result in remote processes
- // getting a stream of EOF.
- //
- // Piping the script into `cat | sh` while appending a ^D to the end of it
- // appeared to work, but it seems likely that commands might still steal
- // script bytes from the cat program if they choose to read from the tty
- // and the script is longer than the buffer.
- chSession := make(chan error, 1)
+ // This is so that it doesn't stay hanging within the sftp package,
+ // which uses context.Background() everywhere.
go func() {
- chSession <- session.Run(script.String())
- close(chSession)
+ <-ctxRunner.Done()
+ client.Close()
}()
- select {
- case <-ctxRunner.Done():
- // Either shutdown, or early runner termination.
- // The runner is not supposed to finish before the session.
- err = context.Cause(ctxRunner)
- case err = <-chSession:
- // Killing a runner may perfectly well trigger this first,
- // in particular when it's on the same machine.
- }
-
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
- var ee *ssh.ExitError
- if err == nil {
- rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
- } else if errors.As(err, &ee) {
- rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
- fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
- } else {
- rt.DB.State, rt.DB.Detail = taskStateError, ""
- fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
- }
- return executorUpdate(rt)
+ err = executorDeploy(ctxRunner, client, rt)
+ locked(func() {
+ if err == nil {
+ rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
+ } else if errors.As(err, &ee1) {
+ rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed"
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
+ } else if errors.As(err, &ee2) {
+ rt.DB.State, rt.DB.Detail = taskStateError, ee2.Detail
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee2.Err)
+ } else {
+ rt.DB.State, rt.DB.Detail = taskStateError, ""
+ fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
+ }
+ })
+ return rt.update()
}
func executorRun(ctx context.Context) error {
@@ -1208,11 +1393,12 @@ type Task struct {
Hash string
Runner string
- State taskState
- Detail string
- Notified int64
- RunLog []byte
- TaskLog []byte
+ State taskState
+ Detail string
+ Notified int64
+ RunLog []byte
+ TaskLog []byte
+ DeployLog []byte
}
func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
@@ -1242,26 +1428,58 @@ func (t *Task) CloneURL() string {
return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo)
}
-const schemaSQL = `
+func (t *Task) update() error {
+ _, err := gDB.ExecContext(context.Background(), `UPDATE task
+ SET state = ?, detail = ?, notified = ?,
+ runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`,
+ t.State, t.Detail, t.Notified,
+ t.RunLog, t.TaskLog, t.DeployLog, t.ID)
+ if err == nil {
+ notifierAwaken()
+ }
+ return err
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+const initializeSQL = `
+PRAGMA application_id = 0x61636964; -- "acid" in big endian
+
CREATE TABLE IF NOT EXISTS task(
- id INTEGER NOT NULL, -- unique ID
+ id INTEGER NOT NULL, -- unique ID
- owner TEXT NOT NULL, -- Gitea username
- repo TEXT NOT NULL, -- Gitea repository name
- hash TEXT NOT NULL, -- commit hash
- runner TEXT NOT NULL, -- the runner to use
+ owner TEXT NOT NULL, -- Gitea username
+ repo TEXT NOT NULL, -- Gitea repository name
+ hash TEXT NOT NULL, -- commit hash
+ runner TEXT NOT NULL, -- the runner to use
- state INTEGER NOT NULL DEFAULT 0, -- task state
- detail TEXT NOT NULL DEFAULT '', -- task state detail
- notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
- runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
- tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+ state INTEGER NOT NULL DEFAULT 0, -- task state
+ detail TEXT NOT NULL DEFAULT '', -- task state detail
+ notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
+ runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
+ tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
+ deploylog BLOB NOT NULL DEFAULT x'', -- deployment output
PRIMARY KEY (id)
) STRICT;
`
-func openDB(path string) error {
+func dbEnsureColumn(tx *sql.Tx, table, column, definition string) error {
+ var count int64
+ if err := tx.QueryRow(
+ `SELECT count(*) FROM pragma_table_info(?) WHERE name = ?`,
+ table, column).Scan(&count); err != nil {
+ return err
+ } else if count == 1 {
+ return nil
+ }
+
+ _, err := tx.Exec(
+ `ALTER TABLE ` + table + ` ADD COLUMN ` + column + ` ` + definition)
+ return err
+}
+
+func dbOpen(path string) error {
var err error
gDB, err = sql.Open("sqlite3",
"file:"+path+"?_foreign_keys=1&_busy_timeout=1000")
@@ -1269,10 +1487,43 @@ func openDB(path string) error {
return err
}
- _, err = gDB.Exec(schemaSQL)
- return err
+ tx, err := gDB.BeginTx(context.Background(), nil)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ var version int64
+ if err = tx.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
+ return err
+ }
+
+ switch version {
+ case 0:
+ if _, err = tx.Exec(initializeSQL); err != nil {
+ return err
+ }
+
+ // We had not initially set a database schema version,
+ // so we're stuck checking this column even on new databases.
+ if err = dbEnsureColumn(tx,
+ `task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil {
+ return err
+ }
+ break
+ case 1:
+ // The next migration goes here, remember to increment the number below.
+ }
+
+ if _, err = tx.Exec(
+ `PRAGMA user_version = ` + strconv.Itoa(1)); err != nil {
+ return err
+ }
+ return tx.Commit()
}
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
// callRPC forwards command line commands to a running server.
func callRPC(args []string) error {
body, err := json.Marshal(args)
@@ -1334,7 +1585,7 @@ func main() {
return
}
- if err := openDB(gConfig.DB); err != nil {
+ if err := dbOpen(gConfig.DB); err != nil {
log.Fatalln(err)
}
defer gDB.Close()