aboutsummaryrefslogtreecommitdiff
path: root/acid.go
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2024-04-14 22:24:33 +0200
committerPřemysl Eric Janouch <p@janouch.name>2024-04-15 00:20:39 +0200
commitfe81d713e1c59f2175974f0bc3eda5ff7a5f0749 (patch)
treeb0628788e23e0e0ffceaca58bcbe0d8779935e96 /acid.go
parenteda0f22f072b7985c6919770858bcdd566290f86 (diff)
downloadacid-fe81d713e1c59f2175974f0bc3eda5ff7a5f0749.tar.gz
acid-fe81d713e1c59f2175974f0bc3eda5ff7a5f0749.tar.xz
acid-fe81d713e1c59f2175974f0bc3eda5ff7a5f0749.zip
Add an enqueue command
Diffstat (limited to 'acid.go')
-rw-r--r--acid.go184
1 files changed, 150 insertions, 34 deletions
diff --git a/acid.go b/acid.go
index 64d69dc..b029c2a 100644
--- a/acid.go
+++ b/acid.go
@@ -17,6 +17,7 @@ import (
"log"
"net"
"net/http"
+ "net/url"
"os"
"os/exec"
"os/signal"
@@ -96,7 +97,24 @@ func parseConfig(path string) error {
return err
}
-// --- Task views --------------------------------------------------------------
+// --- Utilities ---------------------------------------------------------------
+
+func giteaSign(b []byte) string {
+ payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
+ payloadHmac.Write(b)
+ return hex.EncodeToString(payloadHmac.Sum(nil))
+}
+
+func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
+ *http.Request, error) {
+ req, err := http.NewRequestWithContext(
+ ctx, method, gConfig.Gitea+path, body)
+ if req != nil {
+ req.Header.Set("Authorization", "token "+gConfig.Token)
+ req.Header.Set("Accept", "application/json")
+ }
+ return req, err
+}
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, `
@@ -120,6 +138,8 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
return tasks, rows.Err()
}
+// --- Task views --------------------------------------------------------------
+
var templateTasks = template.Must(template.New("tasks").Parse(`
<!DOCTYPE html>
<html>
@@ -301,12 +321,6 @@ func createTasks(ctx context.Context,
return nil
}
-func giteaSign(b []byte) string {
- payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
- payloadHmac.Write(b)
- return hex.EncodeToString(payloadHmac.Sum(nil))
-}
-
func handlePush(w http.ResponseWriter, r *http.Request) {
// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
if r.Header.Get("X-Gitea-Event") != "push" {
@@ -364,6 +378,120 @@ const rpcHeaderSignature = "X-ACID-Signature"
var errWrongUsage = errors.New("wrong usage")
+func rpcRestartOne(ctx context.Context, id int64) error {
+ gRunningMutex.Lock()
+ defer gRunningMutex.Unlock()
+
+ if _, ok := gRunning[id]; ok {
+ return fmt.Errorf("%d: not restarting running tasks", id)
+ }
+
+ // The executor bumps to "running" after inserting into gRunning,
+ // so we should not need to exclude that state here.
+ result, err := gDB.ExecContext(ctx, `UPDATE task
+ SET state = ?, detail = '', notified = 0 WHERE id = ?`,
+ taskStateNew, id)
+ if err != nil {
+ return fmt.Errorf("%d: %w", id, err)
+ } else if n, _ := result.RowsAffected(); n != 1 {
+ return fmt.Errorf("%d: no such task", id)
+ }
+
+ notifierAwaken()
+ executorAwaken()
+ return nil
+}
+
+func rpcEnqueueOne(ctx context.Context,
+ owner, repo, hash, runner string) error {
+ tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ?
+ AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner)
+ if err != nil {
+ return err
+ }
+
+ if len(tasks) != 0 {
+ return rpcRestartOne(ctx, tasks[0].ID)
+ } else {
+ return createTasks(ctx, owner, repo, hash, []string{runner})
+ }
+}
+
+func giteaResolveRef(ctx context.Context, owner, repo, ref string) (
+ string, error) {
+ req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf(
+ "/api/v1/repos/%s/%s/git/commits/%s",
+ url.PathEscape(owner),
+ url.PathEscape(repo),
+ url.PathEscape(ref)), nil)
+ if err != nil {
+ return "", err
+ }
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return "", err
+ }
+ defer resp.Body.Close()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return "", err
+ }
+
+ commit := struct {
+ SHA string `json:"sha"`
+ }{}
+ if resp.StatusCode != http.StatusOK {
+ return "", errors.New(resp.Status)
+ } else if err := json.Unmarshal(body, &commit); err != nil {
+ return "", err
+ }
+ return commit.SHA, nil
+}
+
+func rpcEnqueue(ctx context.Context,
+ w io.Writer, fs *flag.FlagSet, args []string) error {
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if fs.NArg() < 3 {
+ return errWrongUsage
+ }
+
+ owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2)
+ hash, err := giteaResolveRef(ctx, owner, repo, ref)
+ if err != nil {
+ return fmt.Errorf("%s: %w", ref, err)
+ }
+
+ project, ok := gConfig.Projects[owner+"/"+repo]
+ if !ok {
+ return fmt.Errorf("project configuration not found")
+ }
+
+ runners := fs.Args()[3:]
+ if len(runners) == 0 {
+ for runner := range project.Runners {
+ runners = append(runners, runner)
+ }
+ }
+ sort.Strings(runners)
+
+ for _, runner := range runners {
+ if _, ok := project.Runners[runner]; !ok {
+ return fmt.Errorf("project not configured for runner %s", runner)
+ }
+ }
+ for _, runner := range runners {
+ err := rpcEnqueueOne(ctx, owner, repo, hash, runner)
+ if err != nil {
+ fmt.Fprintf(w, "runner %s: %s\n", runner, err)
+ }
+ }
+ return nil
+}
+
func rpcRestart(ctx context.Context,
w io.Writer, fs *flag.FlagSet, args []string) error {
if err := fs.Parse(args); err != nil {
@@ -378,29 +506,17 @@ func rpcRestart(ctx context.Context,
}
ids = append(ids, id)
}
-
- gRunningMutex.Lock()
- defer gRunningMutex.Unlock()
-
for _, id := range ids {
- if _, ok := gRunning[id]; ok {
- fmt.Fprintf(w, "%d: not restarting running tasks\n", id)
- continue
+ if err := rpcRestartOne(ctx, id); err != nil {
+ fmt.Fprintln(w, err)
}
+ }
- // The executor bumps to "running" after inserting into gRunning,
- // so we should not need to exclude that state here.
- result, err := gDB.ExecContext(ctx, `UPDATE task
- SET state = ?, detail = '', notified = 0 WHERE id = ?`,
- taskStateNew, id)
- if err != nil {
- fmt.Fprintf(w, "%d: %s\n", id, err)
- } else if n, _ := result.RowsAffected(); n != 1 {
- fmt.Fprintf(w, "%d: no such task\n", id)
- }
+ // Mainly to allow scripts to touch the database directly.
+ if len(ids) == 0 {
+ notifierAwaken()
+ executorAwaken()
}
- notifierAwaken()
- executorAwaken()
return nil
}
@@ -412,7 +528,9 @@ var rpcCommands = map[string]struct {
usage string
function string
}{
- "restart": {rpcRestart, "ID...",
+ "enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...",
+ "Create or restart tasks for the given reference."},
+ "restart": {rpcRestart, "[ID]...",
"Schedule tasks with the given IDs to be rerun."},
}
@@ -565,18 +683,16 @@ func notifierNotify(ctx context.Context, task Task) error {
task.ID, task.FullName(), task.Hash,
payload.Context, payload.State, payload.Description)
- uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s",
- gConfig.Gitea, task.Owner, task.Repo, task.Hash)
- req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri,
- bytes.NewReader(body))
+ req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf(
+ "/api/v1/repos/%s/%s/statuses/%s",
+ url.PathEscape(task.Owner),
+ url.PathEscape(task.Repo),
+ url.PathEscape(task.Hash)), bytes.NewReader(body))
if err != nil {
return err
}
- req.Header.Set("Accept", "application/json")
- req.Header.Set("Authorization", "token "+gConfig.Token)
req.Header.Set("Content-Type", "application/json")
-
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err