From 3910e351123dec0f1caa3625cbc6cb63c84ee369 Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch Date: Thu, 21 Dec 2023 03:14:49 +0100 Subject: WIP: Sync FS to DB --- main.go | 73 ++++++++++++++++++++++++++++++++--------------------------------- 1 file changed, 36 insertions(+), 37 deletions(-) (limited to 'main.go') diff --git a/main.go b/main.go index c22b5b5..59ca682 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "net/http" "os" "os/exec" + "os/signal" "path/filepath" "regexp" "runtime" @@ -802,13 +803,9 @@ type syncFileInfo struct { } type syncContext struct { - // TODO: Do we need the context at all? - ctx context.Context - cancel context.CancelFunc - tx *sql.Tx - - semaphore chan struct{} // asynchronous tasks - info chan syncFileInfo + ctx context.Context + tx *sql.Tx + info chan syncFileInfo } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -890,22 +887,12 @@ type syncPair struct { // syncEnqueue runs file scanning, which can be CPU and I/O expensive, // in parallel. The goroutine only touches the filesystem, read-only. func syncEnqueue(c *syncContext, info syncFileInfo) error { - select { - case <-c.ctx.Done(): - return c.ctx.Err() - case c.semaphore <- struct{}{}: - } - - // Give priority to context cancellation. - select { - case <-c.ctx.Done(): - <-c.semaphore - return c.ctx.Err() - default: + if err := taskSemaphore.acquire(c.ctx); err != nil { + return err } go func(info syncFileInfo) { - defer func() { <-c.semaphore }() + defer taskSemaphore.release() // TODO: Process the file and enqueue a result. }(info) @@ -1091,26 +1078,28 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error { } func syncArgument(ctx context.Context, tx *sql.Tx, path string) error { - // TODO: Let the context be interruptible with SIGINT (on a higher level). - var c syncContext - c.ctx, c.cancel = context.WithCancel(ctx) - c.tx = tx - defer c.cancel() + c := syncContext{ctx: ctx, tx: tx} - // Info tasks take a position in the semaphore channel. + // Info tasks take a position in the task semaphore channel. // then fill the info channel. - c.semaphore = make(chan struct{}, runtime.NumCPU()) - - // Immediately after syncDequeue(), this channel is empty, + // + // Immediately after syncDequeue(), the info channel is empty, // but the semaphore might be full. // - // By having at least one position in the channel, + // By having at least one position in the info channel, // we allow at least one info task to run to semaphore release, // so that syncEnqueue() doesn't deadlock. // // By making it the same size as the semaphore, // the end of this function doesn't need to dequeue while waiting. - c.info = make(chan syncFileInfo, cap(c.semaphore)) + // It also prevents goroutine leaks despite leaving them running-- + // once they finish their job, they're gone, + // and eventually the info channel would get garbage collected. + // + // The additional slot is there to handle the one result + // that may be placed while syncEnqueue() waits for the semaphore, + // i.e., it is for the result of the task that syncEnqueue() spawns. + c.info = make(chan syncFileInfo, cap(taskSemaphore)+1) // At least for now, turn all roots into absolute paths. fsPath, err := filepath.Abs(filepath.Clean(path)) @@ -1132,16 +1121,19 @@ func syncArgument(ctx context.Context, tx *sql.Tx, path string) error { } // Wait for all tasks to finish, and process the results of their work. - for i := 0; i < cap(c.semaphore); i++ { - select { - case <-c.ctx.Done(): - return c.ctx.Err() - case c.semaphore <- struct{}{}: + for i := 0; i < cap(taskSemaphore); i++ { + if err := taskSemaphore.acquire(c.ctx); err != nil { + return err } } if err := syncDequeue(&c); err != nil { return err } + + // This is not our semaphore, so prepare it for the next user. + for i := 0; i < cap(taskSemaphore); i++ { + taskSemaphore.release() + } return nil } @@ -1175,14 +1167,21 @@ func cmdSync(args []string) error { BEGIN IMMEDIATE TRANSACTION`); err != nil { return err } + + // XXX: By not using the context for the transaction, + // interrupts can get ignored around the Commit. + ctxSignal, stop := signal.NotifyContext(ctx, os.Interrupt) + defer stop() + // TODO: Check if one is not a prefix of another, // and either filter out these duplicates (easy to do, may warn), // or plainly refuse to work. for _, path := range args[1:] { - if err := syncArgument(ctx, tx, path); err != nil { + if err := syncArgument(ctxSignal, tx, path); err != nil { return err } } + // TODO: Garbage collect empty directories, recursively. // Ideally, stop at the affected DB roots (assuming we go bottom-up). // -- cgit v1.2.3-70-g09d2