diff options
author | Přemysl Eric Janouch <p@janouch.name> | 2023-12-21 03:14:49 +0100 |
---|---|---|
committer | Přemysl Eric Janouch <p@janouch.name> | 2023-12-21 03:25:12 +0100 |
commit | 3910e351123dec0f1caa3625cbc6cb63c84ee369 (patch) | |
tree | 0d03bb7d1bd9c327dfc472f8e668b32c841c1cd1 | |
parent | 075f391730ac4084ac24fbde1cc92a768b7c5d6b (diff) | |
download | gallery-3910e351123dec0f1caa3625cbc6cb63c84ee369.tar.gz gallery-3910e351123dec0f1caa3625cbc6cb63c84ee369.tar.xz gallery-3910e351123dec0f1caa3625cbc6cb63c84ee369.zip |
WIP: Sync FS to DB
-rw-r--r-- | main.go | 73 |
1 files changed, 36 insertions, 37 deletions
@@ -19,6 +19,7 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
+	"os/signal"
 	"path/filepath"
 	"regexp"
 	"runtime"
@@ -802,13 +803,9 @@ type syncFileInfo struct {
 }
 
 type syncContext struct {
-	// TODO: Do we need the context at all?
-	ctx    context.Context
-	cancel context.CancelFunc
-	tx     *sql.Tx
-
-	semaphore chan struct{} // asynchronous tasks
-	info      chan syncFileInfo
+	ctx  context.Context
+	tx   *sql.Tx
+	info chan syncFileInfo
 }
 
 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -890,22 +887,12 @@ type syncPair struct {
 // syncEnqueue runs file scanning, which can be CPU and I/O expensive,
 // in parallel. The goroutine only touches the filesystem, read-only.
 func syncEnqueue(c *syncContext, info syncFileInfo) error {
-	select {
-	case <-c.ctx.Done():
-		return c.ctx.Err()
-	case c.semaphore <- struct{}{}:
-	}
-
-	// Give priority to context cancellation.
-	select {
-	case <-c.ctx.Done():
-		<-c.semaphore
-		return c.ctx.Err()
-	default:
+	if err := taskSemaphore.acquire(c.ctx); err != nil {
+		return err
 	}
 
 	go func(info syncFileInfo) {
-		defer func() { <-c.semaphore }()
+		defer taskSemaphore.release()
 
 		// TODO: Process the file and enqueue a result.
 	}(info)
@@ -1091,26 +1078,28 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error {
 }
 
 func syncArgument(ctx context.Context, tx *sql.Tx, path string) error {
-	// TODO: Let the context be interruptible with SIGINT (on a higher level).
-	var c syncContext
-	c.ctx, c.cancel = context.WithCancel(ctx)
-	c.tx = tx
-	defer c.cancel()
+	c := syncContext{ctx: ctx, tx: tx}
 
-	// Info tasks take a position in the semaphore channel.
+	// Info tasks take a position in the task semaphore channel.
 	// then fill the info channel.
-	c.semaphore = make(chan struct{}, runtime.NumCPU())
-
-	// Immediately after syncDequeue(), this channel is empty,
+	//
+	// Immediately after syncDequeue(), the info channel is empty,
 	// but the semaphore might be full.
 	//
-	// By having at least one position in the channel,
+	// By having at least one position in the info channel,
 	// we allow at least one info task to run to semaphore release,
 	// so that syncEnqueue() doesn't deadlock.
 	//
 	// By making it the same size as the semaphore,
 	// the end of this function doesn't need to dequeue while waiting.
-	c.info = make(chan syncFileInfo, cap(c.semaphore))
+	// It also prevents goroutine leaks despite leaving them running--
+	// once they finish their job, they're gone,
+	// and eventually the info channel would get garbage collected.
+	//
+	// The additional slot is there to handle the one result
+	// that may be placed while syncEnqueue() waits for the semaphore,
+	// i.e., it is for the result of the task that syncEnqueue() spawns.
+	c.info = make(chan syncFileInfo, cap(taskSemaphore)+1)
 
 	// At least for now, turn all roots into absolute paths.
 	fsPath, err := filepath.Abs(filepath.Clean(path))
@@ -1132,16 +1121,19 @@ func syncArgument(ctx context.Context, tx *sql.Tx, path string) error {
 	}
 
 	// Wait for all tasks to finish, and process the results of their work.
-	for i := 0; i < cap(c.semaphore); i++ {
-		select {
-		case <-c.ctx.Done():
-			return c.ctx.Err()
-		case c.semaphore <- struct{}{}:
+	for i := 0; i < cap(taskSemaphore); i++ {
+		if err := taskSemaphore.acquire(c.ctx); err != nil {
+			return err
 		}
 	}
 	if err := syncDequeue(&c); err != nil {
 		return err
 	}
+
+	// This is not our semaphore, so prepare it for the next user.
+	for i := 0; i < cap(taskSemaphore); i++ {
+		taskSemaphore.release()
+	}
 	return nil
 }
 
@@ -1175,14 +1167,21 @@ func cmdSync(args []string) error {
 		BEGIN IMMEDIATE TRANSACTION`); err != nil {
 		return err
 	}
+
+	// XXX: By not using the context for the transaction,
+	// interrupts can get ignored around the Commit.
+	ctxSignal, stop := signal.NotifyContext(ctx, os.Interrupt)
+	defer stop()
+
 	// TODO: Check if one is not a prefix of another,
 	// and either filter out these duplicates (easy to do, may warn),
 	// or plainly refuse to work.
 	for _, path := range args[1:] {
-		if err := syncArgument(ctx, tx, path); err != nil {
+		if err := syncArgument(ctxSignal, tx, path); err != nil {
 			return err
 		}
 	}
+
 	// TODO: Garbage collect empty directories, recursively.
 	// Ideally, stop at the affected DB roots (assuming we go bottom-up).
 	// |