aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main.go180
1 files changed, 127 insertions, 53 deletions
diff --git a/main.go b/main.go
index efa2da4..c22b5b5 100644
--- a/main.go
+++ b/main.go
@@ -789,6 +789,7 @@ func cmdImport(args []string) error {
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
type syncFileInfo struct {
+ dbID int64 // DB node ID, or zero if there was none
dbParent int64 // where the file was to be stored
dbName string // the name under which it was to be stored
fsPath string // symlink target
@@ -801,13 +802,12 @@ type syncFileInfo struct {
}
type syncContext struct {
+ // TODO: Do we need the context at all?
ctx context.Context
cancel context.CancelFunc
tx *sql.Tx
- // TODO: See if this isn't entirely duplicitous, also with taskSemaphore.
- wg sync.WaitGroup // asynchronous tasks
- semaphore chan struct{} // asynchronous tasks
+ semaphore chan struct{} // asynchronous tasks
info chan syncFileInfo
}
@@ -888,70 +888,123 @@ type syncPair struct {
}
// syncEnqueue runs file scanning, which can be CPU and I/O expensive,
-// in parallel. The goroutine only touches the filesystem.
+// in parallel. The goroutine only touches the filesystem, read-only.
func syncEnqueue(c *syncContext, info syncFileInfo) error {
- // TODO: Probably wait for result dequeues here as well.
- // TODO: Also give priority to context cancellation.
select {
case <-c.ctx.Done():
return c.ctx.Err()
case c.semaphore <- struct{}{}:
}
- c.wg.Add(1)
+ // Give priority to context cancellation.
+ select {
+ case <-c.ctx.Done():
+ <-c.semaphore
+ return c.ctx.Err()
+ default:
+ }
+
go func(info syncFileInfo) {
defer func() { <-c.semaphore }()
- defer c.wg.Done()
// TODO: Process the file and enqueue a result.
}(info)
return nil
}
+// syncDequeue flushes the result queue of finished asynchronous tasks.
func syncDequeue(c *syncContext) error {
- // TODO: Process the result, which can be one of these cases:
- // - * → err: Abort.
- // - F → 0: Collect references [[B]], remove it from DB [[A1]].
- // - F → F: Collect references [[B]]. Add an image entry. Update the node.
- // Only do the first two if the hash changes!
- // When replacing a file, which needn't become orphaned,
+ for {
+ select {
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case info := <-c.info:
+ if err := syncPostProcess(c, info); err != nil {
+ return err
+ }
+ default:
+ return nil
+ }
+ }
+}
+
+// TODO: [[B]]: A function to collect references.
+//
+// - This is almost always combined with [[A1]] or [[A2]],
+// so perhaps combine these.
+//
+// - When collecting node subtrees, we need to delete bottom-up
+// because of foreign key constraints,
+// so maybe in reverse order of recursive CTE results.
+//
+// - Orphans will keep their GD/thumbs file, as evidence.
+//
+// - Sadly, this can't be done with a DB trigger. (What and why?)
+//
+// - One of the inputs needs to be the FS path, for the orphan table.
+//
+// syncDispose creates orphan records for the entire subtree given by nodeID
+// as appropriate, then deletes all nodes within the subtree. The subtree root
+// node is not deleted if "keepNode" is true.
+func syncDispose(c *syncContext, nodeID int64, keepNode bool) error {
+ return nil
+}
+
+func syncPostProcess(c *syncContext, info syncFileInfo) error {
+ // TODO: Actually dequeue, or something.
+
+ // TODO:
+ // - When replacing a file, which needn't become orphaned,
// we could offer copying all tags, but this needs another table
// to track it (if it's equivalent enough, the dhash will stay the same,
// so user can resolve this through the duplicates feature).
- // - D → 0: Collect references [[B]], DB rm -rf [[A1]].
- // - D → F: Collect references [[B]], DB rm -rf [[A2]]. Update the node.
- // - 0 → F: Add an image entry. Create a node.
// - When creating or updating a node,
// try inserting an image record first as well.
// And test that we have a symlink, and that it works.
// (Just do os.Stat() on it, which fails on both dead and missing links.)
+ switch {
+ case info.err != nil:
+ // * → error
+ return info.err
+
+ case info.sha1 == "":
+ // 0 → 0
+ if info.dbID == 0 {
+ return nil
+ }
+
+ // D → 0, F → 0
+ return syncDispose(c, info.dbID, false /*keepNode*/)
+
+ case info.dbID == 0:
+ // 0 → F
+ // TODO: Add an image entry. Create a node.
+
+ default:
+ // D → F, F → F (this statement is a no-op with the latter)
+ if err := syncDispose(c, info.dbID, true /*keepNode*/); err != nil {
+ return err
+ }
+
+ // TODO: Ideally, do nothing if the hash doesn't change.
+ // But that will be a rare situation.
+
+ // TODO: Add an image entry.
+ // TODO: Update the node.
+ }
return nil
}
-// TODO: [[B]]: A function to collect references.
-// - This is almost always combined with [[A1]] or [[A2]],
-// so perhaps combine these.
-// - When collecting node subtrees, we need to delete bottom-up
-// because of foreign key constraints,
-// so maybe in reverse order of recursive CTE results.
-// - Orphans will keep their GD/thumbs file, as evidence.
-// - Sadly, this can't be done with a DB trigger. (What and why?)
-//
-// - One of the inputs needs to be the FS path, for the orphan table.
-
func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string,
pair syncPair) error {
- // TODO: Perhaps process the result queue in here,
- // and also check context cancellation (both non-blocking).
-
- db, fs, fsInfo := pair.db, pair.fs, syncFileInfo{}
+ db, fs, fsInfo := pair.db, pair.fs, syncFileInfo{dbParent: dbParent}
+ if db != nil {
+ fsInfo.dbID = db.dbID
+ }
if fs != nil {
- fsInfo = syncFileInfo{
- dbParent: dbParent,
- dbName: fs.fsName,
- fsPath: filepath.Join(fsPath, fs.fsName),
- fsMtime: fs.fsMtime,
- }
+ fsInfo.dbName = fs.fsName
+ fsInfo.fsPath = filepath.Join(fsPath, fs.fsName)
+ fsInfo.fsMtime = fs.fsMtime
}
switch {
@@ -967,13 +1020,9 @@ func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string,
// 0 → F (or 0 → 0)
return syncEnqueue(c, fsInfo)
- case fs == nil && db.dbIsDir():
- // D → 0
- // TODO: Collect references [[B]], DB rm -rf [[A1]].
-
case fs == nil:
- // F → 0
- // TODO: Collect references [[B]], remove it from DB [[A1]].
+ // D → 0, F → 0
+ return syncDispose(c, db.dbID, false /*keepNode*/)
case db.dbIsDir() && fs.fsIsDir:
// D → D
@@ -985,7 +1034,10 @@ func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string,
case fs.fsIsDir:
// F → D
- // TODO: Collect references [[B]], change it in DB to a directory.
+ if err := syncDispose(c, db.dbID, true /*keepNode*/); err != nil {
+ return err
+ }
+ // TODO: Change it in DB to a directory.
return syncDirectory(c, db.dbID, filepath.Join(fsPath, fs.fsName))
case db.dbMtime != fs.fsMtime:
@@ -1028,6 +1080,9 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error {
}
for _, pair := range pairs {
+ if err := syncDequeue(c); err != nil {
+ return err
+ }
if err := syncDirectoryPair(c, dbParent, fsPath, pair); err != nil {
return err
}
@@ -1036,16 +1091,26 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error {
}
func syncArgument(ctx context.Context, tx *sql.Tx, path string) error {
+ // TODO: Let the context be interruptible with SIGINT (on a higher level).
var c syncContext
c.ctx, c.cancel = context.WithCancel(ctx)
c.tx = tx
defer c.cancel()
- // Info tasks take a position in the semaphore channel,
- // then turn that into a position in the output channel.
- // TODO: Beware of release and push order. Check for needless waits.
+ // Info tasks take a position in the semaphore channel.
+ // then fill the info channel.
c.semaphore = make(chan struct{}, runtime.NumCPU())
- c.info = make(chan syncFileInfo, runtime.NumCPU())
+
+ // Immediately after syncDequeue(), this channel is empty,
+ // but the semaphore might be full.
+ //
+ // By having at least one position in the channel,
+ // we allow at least one info task to run to semaphore release,
+ // so that syncEnqueue() doesn't deadlock.
+ //
+ // By making it the same size as the semaphore,
+ // the end of this function doesn't need to dequeue while waiting.
+ c.info = make(chan syncFileInfo, cap(c.semaphore))
// At least for now, turn all roots into absolute paths.
fsPath, err := filepath.Abs(filepath.Clean(path))
@@ -1060,14 +1125,23 @@ func syncArgument(ctx context.Context, tx *sql.Tx, path string) error {
crumbs := decodeWebPath(filepath.ToSlash(fsPath))
dbParent, err := idForPath(c.tx, crumbs, true)
if err != nil {
- return nil
+ return err
}
if err := syncDirectory(&c, dbParent, fsPath); err != nil {
- return nil
+ return err
}
- // TODO: Finish processing the result/info queue.
- c.wg.Wait()
+ // Wait for all tasks to finish, and process the results of their work.
+ for i := 0; i < cap(c.semaphore); i++ {
+ select {
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case c.semaphore <- struct{}{}:
+ }
+ }
+ if err := syncDequeue(&c); err != nil {
+ return err
+ }
return nil
}