summaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go73
1 files changed, 36 insertions, 37 deletions
diff --git a/main.go b/main.go
index c22b5b5..59ca682 100644
--- a/main.go
+++ b/main.go
@@ -19,6 +19,7 @@ import (
"net/http"
"os"
"os/exec"
+ "os/signal"
"path/filepath"
"regexp"
"runtime"
@@ -802,13 +803,9 @@ type syncFileInfo struct {
}
type syncContext struct {
- // TODO: Do we need the context at all?
- ctx context.Context
- cancel context.CancelFunc
- tx *sql.Tx
-
- semaphore chan struct{} // asynchronous tasks
- info chan syncFileInfo
+ ctx context.Context
+ tx *sql.Tx
+ info chan syncFileInfo
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -890,22 +887,12 @@ type syncPair struct {
// syncEnqueue runs file scanning, which can be CPU and I/O expensive,
// in parallel. The goroutine only touches the filesystem, read-only.
func syncEnqueue(c *syncContext, info syncFileInfo) error {
- select {
- case <-c.ctx.Done():
- return c.ctx.Err()
- case c.semaphore <- struct{}{}:
- }
-
- // Give priority to context cancellation.
- select {
- case <-c.ctx.Done():
- <-c.semaphore
- return c.ctx.Err()
- default:
+ if err := taskSemaphore.acquire(c.ctx); err != nil {
+ return err
}
go func(info syncFileInfo) {
- defer func() { <-c.semaphore }()
+ defer taskSemaphore.release()
// TODO: Process the file and enqueue a result.
}(info)
@@ -1091,26 +1078,28 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error {
}
func syncArgument(ctx context.Context, tx *sql.Tx, path string) error {
- // TODO: Let the context be interruptible with SIGINT (on a higher level).
- var c syncContext
- c.ctx, c.cancel = context.WithCancel(ctx)
- c.tx = tx
- defer c.cancel()
+ c := syncContext{ctx: ctx, tx: tx}
- // Info tasks take a position in the semaphore channel.
+ // Info tasks take a position in the task semaphore channel.
// then fill the info channel.
- c.semaphore = make(chan struct{}, runtime.NumCPU())
-
- // Immediately after syncDequeue(), this channel is empty,
+ //
+ // Immediately after syncDequeue(), the info channel is empty,
// but the semaphore might be full.
//
- // By having at least one position in the channel,
+ // By having at least one position in the info channel,
// we allow at least one info task to run to semaphore release,
// so that syncEnqueue() doesn't deadlock.
//
// By making it the same size as the semaphore,
// the end of this function doesn't need to dequeue while waiting.
- c.info = make(chan syncFileInfo, cap(c.semaphore))
+ // It also prevents goroutine leaks despite leaving them running--
+ // once they finish their job, they're gone,
+ // and eventually the info channel would get garbage collected.
+ //
+ // The additional slot is there to handle the one result
+ // that may be placed while syncEnqueue() waits for the semaphore,
+ // i.e., it is for the result of the task that syncEnqueue() spawns.
+ c.info = make(chan syncFileInfo, cap(taskSemaphore)+1)
// At least for now, turn all roots into absolute paths.
fsPath, err := filepath.Abs(filepath.Clean(path))
@@ -1132,16 +1121,19 @@ func syncArgument(ctx context.Context, tx *sql.Tx, path string) error {
}
// Wait for all tasks to finish, and process the results of their work.
- for i := 0; i < cap(c.semaphore); i++ {
- select {
- case <-c.ctx.Done():
- return c.ctx.Err()
- case c.semaphore <- struct{}{}:
+ for i := 0; i < cap(taskSemaphore); i++ {
+ if err := taskSemaphore.acquire(c.ctx); err != nil {
+ return err
}
}
if err := syncDequeue(&c); err != nil {
return err
}
+
+ // This is not our semaphore, so prepare it for the next user.
+ for i := 0; i < cap(taskSemaphore); i++ {
+ taskSemaphore.release()
+ }
return nil
}
@@ -1175,14 +1167,21 @@ func cmdSync(args []string) error {
BEGIN IMMEDIATE TRANSACTION`); err != nil {
return err
}
+
+ // XXX: By not using the context for the transaction,
+ // interrupts can get ignored around the Commit.
+ ctxSignal, stop := signal.NotifyContext(ctx, os.Interrupt)
+ defer stop()
+
// TODO: Check if one is not a prefix of another,
// and either filter out these duplicates (easy to do, may warn),
// or plainly refuse to work.
for _, path := range args[1:] {
- if err := syncArgument(ctx, tx, path); err != nil {
+ if err := syncArgument(ctxSignal, tx, path); err != nil {
return err
}
}
+
// TODO: Garbage collect empty directories, recursively.
// Ideally, stop at the affected DB roots (assuming we go bottom-up).
//