From 075f391730ac4084ac24fbde1cc92a768b7c5d6b Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch Date: Thu, 21 Dec 2023 02:17:59 +0100 Subject: WIP: FS to DB sync --- main.go | 180 +++++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 127 insertions(+), 53 deletions(-) (limited to 'main.go') diff --git a/main.go b/main.go index efa2da4..c22b5b5 100644 --- a/main.go +++ b/main.go @@ -789,6 +789,7 @@ func cmdImport(args []string) error { // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - type syncFileInfo struct { + dbID int64 // DB node ID, or zero if there was none dbParent int64 // where the file was to be stored dbName string // the name under which it was to be stored fsPath string // symlink target @@ -801,13 +802,12 @@ type syncFileInfo struct { } type syncContext struct { + // TODO: Do we need the context at all? ctx context.Context cancel context.CancelFunc tx *sql.Tx - // TODO: See if this isn't entirely duplicitous, also with taskSemaphore. - wg sync.WaitGroup // asynchronous tasks - semaphore chan struct{} // asynchronous tasks + semaphore chan struct{} // asynchronous tasks info chan syncFileInfo } @@ -888,70 +888,123 @@ type syncPair struct { } // syncEnqueue runs file scanning, which can be CPU and I/O expensive, -// in parallel. The goroutine only touches the filesystem. +// in parallel. The goroutine only touches the filesystem, read-only. func syncEnqueue(c *syncContext, info syncFileInfo) error { - // TODO: Probably wait for result dequeues here as well. - // TODO: Also give priority to context cancellation. select { case <-c.ctx.Done(): return c.ctx.Err() case c.semaphore <- struct{}{}: } - c.wg.Add(1) + // Give priority to context cancellation. + select { + case <-c.ctx.Done(): + <-c.semaphore + return c.ctx.Err() + default: + } + go func(info syncFileInfo) { defer func() { <-c.semaphore }() - defer c.wg.Done() // TODO: Process the file and enqueue a result. }(info) return nil } +// syncDequeue flushes the result queue of finished asynchronous tasks. func syncDequeue(c *syncContext) error { - // TODO: Process the result, which can be one of these cases: - // - * → err: Abort. - // - F → 0: Collect references [[B]], remove it from DB [[A1]]. - // - F → F: Collect references [[B]]. Add an image entry. Update the node. - // Only do the first two if the hash changes! - // When replacing a file, which needn't become orphaned, + for { + select { + case <-c.ctx.Done(): + return c.ctx.Err() + case info := <-c.info: + if err := syncPostProcess(c, info); err != nil { + return err + } + default: + return nil + } + } +} + +// TODO: [[B]]: A function to collect references. +// +// - This is almost always combined with [[A1]] or [[A2]], +// so perhaps combine these. +// +// - When collecting node subtrees, we need to delete bottom-up +// because of foreign key constraints, +// so maybe in reverse order of recursive CTE results. +// +// - Orphans will keep their GD/thumbs file, as evidence. +// +// - Sadly, this can't be done with a DB trigger. (What and why?) +// +// - One of the inputs needs to be the FS path, for the orphan table. +// +// syncDispose creates orphan records for the entire subtree given by nodeID +// as appropriate, then deletes all nodes within the subtree. The subtree root +// node is not deleted if "keepNode" is true. +func syncDispose(c *syncContext, nodeID int64, keepNode bool) error { + return nil +} + +func syncPostProcess(c *syncContext, info syncFileInfo) error { + // TODO: Actually dequeue, or something. + + // TODO: + // - When replacing a file, which needn't become orphaned, // we could offer copying all tags, but this needs another table // to track it (if it's equivalent enough, the dhash will stay the same, // so user can resolve this through the duplicates feature). - // - D → 0: Collect references [[B]], DB rm -rf [[A1]]. - // - D → F: Collect references [[B]], DB rm -rf [[A2]]. Update the node. - // - 0 → F: Add an image entry. Create a node. // - When creating or updating a node, // try inserting an image record first as well. // And test that we have a symlink, and that it works. // (Just do os.Stat() on it, which fails on both dead and missing links.) + switch { + case info.err != nil: + // * → error + return info.err + + case info.sha1 == "": + // 0 → 0 + if info.dbID == 0 { + return nil + } + + // D → 0, F → 0 + return syncDispose(c, info.dbID, false /*keepNode*/) + + case info.dbID == 0: + // 0 → F + // TODO: Add an image entry. Create a node. + + default: + // D → F, F → F (this statement is a no-op with the latter) + if err := syncDispose(c, info.dbID, true /*keepNode*/); err != nil { + return err + } + + // TODO: Ideally, do nothing if the hash doesn't change. + // But that will be a rare situation. + + // TODO: Add an image entry. + // TODO: Update the node. + } return nil } -// TODO: [[B]]: A function to collect references. -// - This is almost always combined with [[A1]] or [[A2]], -// so perhaps combine these. -// - When collecting node subtrees, we need to delete bottom-up -// because of foreign key constraints, -// so maybe in reverse order of recursive CTE results. -// - Orphans will keep their GD/thumbs file, as evidence. -// - Sadly, this can't be done with a DB trigger. (What and why?) -// -// - One of the inputs needs to be the FS path, for the orphan table. - func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string, pair syncPair) error { - // TODO: Perhaps process the result queue in here, - // and also check context cancellation (both non-blocking). - - db, fs, fsInfo := pair.db, pair.fs, syncFileInfo{} + db, fs, fsInfo := pair.db, pair.fs, syncFileInfo{dbParent: dbParent} + if db != nil { + fsInfo.dbID = db.dbID + } if fs != nil { - fsInfo = syncFileInfo{ - dbParent: dbParent, - dbName: fs.fsName, - fsPath: filepath.Join(fsPath, fs.fsName), - fsMtime: fs.fsMtime, - } + fsInfo.dbName = fs.fsName + fsInfo.fsPath = filepath.Join(fsPath, fs.fsName) + fsInfo.fsMtime = fs.fsMtime } switch { @@ -967,13 +1020,9 @@ func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string, // 0 → F (or 0 → 0) return syncEnqueue(c, fsInfo) - case fs == nil && db.dbIsDir(): - // D → 0 - // TODO: Collect references [[B]], DB rm -rf [[A1]]. - case fs == nil: - // F → 0 - // TODO: Collect references [[B]], remove it from DB [[A1]]. + // D → 0, F → 0 + return syncDispose(c, db.dbID, false /*keepNode*/) case db.dbIsDir() && fs.fsIsDir: // D → D @@ -985,7 +1034,10 @@ func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string, case fs.fsIsDir: // F → D - // TODO: Collect references [[B]], change it in DB to a directory. + if err := syncDispose(c, db.dbID, true /*keepNode*/); err != nil { + return err + } + // TODO: Change it in DB to a directory. return syncDirectory(c, db.dbID, filepath.Join(fsPath, fs.fsName)) case db.dbMtime != fs.fsMtime: @@ -1028,6 +1080,9 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error { } for _, pair := range pairs { + if err := syncDequeue(c); err != nil { + return err + } if err := syncDirectoryPair(c, dbParent, fsPath, pair); err != nil { return err } @@ -1036,16 +1091,26 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error { } func syncArgument(ctx context.Context, tx *sql.Tx, path string) error { + // TODO: Let the context be interruptible with SIGINT (on a higher level). var c syncContext c.ctx, c.cancel = context.WithCancel(ctx) c.tx = tx defer c.cancel() - // Info tasks take a position in the semaphore channel, - // then turn that into a position in the output channel. - // TODO: Beware of release and push order. Check for needless waits. + // Info tasks take a position in the semaphore channel. + // then fill the info channel. c.semaphore = make(chan struct{}, runtime.NumCPU()) - c.info = make(chan syncFileInfo, runtime.NumCPU()) + + // Immediately after syncDequeue(), this channel is empty, + // but the semaphore might be full. + // + // By having at least one position in the channel, + // we allow at least one info task to run to semaphore release, + // so that syncEnqueue() doesn't deadlock. + // + // By making it the same size as the semaphore, + // the end of this function doesn't need to dequeue while waiting. + c.info = make(chan syncFileInfo, cap(c.semaphore)) // At least for now, turn all roots into absolute paths. fsPath, err := filepath.Abs(filepath.Clean(path)) @@ -1060,14 +1125,23 @@ func syncArgument(ctx context.Context, tx *sql.Tx, path string) error { crumbs := decodeWebPath(filepath.ToSlash(fsPath)) dbParent, err := idForPath(c.tx, crumbs, true) if err != nil { - return nil + return err } if err := syncDirectory(&c, dbParent, fsPath); err != nil { - return nil + return err } - // TODO: Finish processing the result/info queue. - c.wg.Wait() + // Wait for all tasks to finish, and process the results of their work. + for i := 0; i < cap(c.semaphore); i++ { + select { + case <-c.ctx.Done(): + return c.ctx.Err() + case c.semaphore <- struct{}{}: + } + } + if err := syncDequeue(&c); err != nil { + return err + } return nil } -- cgit v1.2.3-70-g09d2