From 38d61556ecc6499b4ea17adf8c57cc84f320b2cd Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch Date: Wed, 20 Dec 2023 06:48:51 +0100 Subject: WIP: FS to DB sync --- main.go | 325 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 321 insertions(+), 4 deletions(-) (limited to 'main.go') diff --git a/main.go b/main.go index e86bb36..efa2da4 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "regexp" "runtime" "slices" + "sort" "strconv" "strings" "sync" @@ -785,19 +786,335 @@ func cmdImport(args []string) error { return nil } +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +type syncFileInfo struct { + dbParent int64 // where the file was to be stored + dbName string // the name under which it was to be stored + fsPath string // symlink target + fsMtime int64 // last modified Unix timestamp, used a bit like an ID + + err error // any processing error + sha1 string // raw content hash, empty to skip file + width int // image width in pixels + height int // image height in pixels +} + +type syncContext struct { + ctx context.Context + cancel context.CancelFunc + tx *sql.Tx + + // TODO: See if this isn't entirely duplicitous, also with taskSemaphore. + wg sync.WaitGroup // asynchronous tasks + semaphore chan struct{} // asynchronous tasks + info chan syncFileInfo +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +type syncNode struct { + dbID int64 + dbName string + dbMtime int64 + dbSHA1 string +} + +func (n *syncNode) dbIsDir() bool { return n.dbSHA1 == "" } + +type syncFile struct { + fsName string + fsMtime int64 + fsIsDir bool +} + +// syncGetNodes returns direct children of a DB node, ordered by name. +// SQLite, like Go, compares strings byte-wise by default. +func syncGetNodes(tx *sql.Tx, dbParent int64) (nodes []syncNode, err error) { + // This works even for the root, which doesn't exist as a DB node. + rows, err := tx.Query(`SELECT id, name, IFNULL(mtime, 0), IFNULL(sha1, '') + FROM node WHERE IFNULL(parent, 0) = ? ORDER BY name`, dbParent) + if err != nil { + return + } + defer rows.Close() + + for rows.Next() { + var node syncNode + if err = rows.Scan(&node.dbID, + &node.dbName, &node.dbMtime, &node.dbSHA1); err != nil { + return + } + nodes = append(nodes, node) + } + return nodes, rows.Err() +} + +// syncGetFiles returns direct children of a FS directory, ordered by name. +func syncGetFiles(fsPath string) (files []syncFile, err error) { + dir, err := os.Open(fsPath) + if err != nil { + return + } + defer dir.Close() + + entries, err := dir.ReadDir(0) + if err != nil { + return + } + + for _, entry := range entries { + info, err := entry.Info() + if err != nil { + return files, err + } + + files = append(files, syncFile{ + fsName: entry.Name(), + fsMtime: info.ModTime().Unix(), + fsIsDir: entry.IsDir(), + }) + } + sort.Slice(files, + func(a, b int) bool { return files[a].fsName < files[b].fsName }) + return +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +type syncPair struct { + db *syncNode + fs *syncFile +} + +// syncEnqueue runs file scanning, which can be CPU and I/O expensive, +// in parallel. The goroutine only touches the filesystem. +func syncEnqueue(c *syncContext, info syncFileInfo) error { + // TODO: Probably wait for result dequeues here as well. + // TODO: Also give priority to context cancellation. + select { + case <-c.ctx.Done(): + return c.ctx.Err() + case c.semaphore <- struct{}{}: + } + + c.wg.Add(1) + go func(info syncFileInfo) { + defer func() { <-c.semaphore }() + defer c.wg.Done() + + // TODO: Process the file and enqueue a result. + }(info) + return nil +} + +func syncDequeue(c *syncContext) error { + // TODO: Process the result, which can be one of these cases: + // - * → err: Abort. + // - F → 0: Collect references [[B]], remove it from DB [[A1]]. + // - F → F: Collect references [[B]]. Add an image entry. Update the node. + // Only do the first two if the hash changes! + // When replacing a file, which needn't become orphaned, + // we could offer copying all tags, but this needs another table + // to track it (if it's equivalent enough, the dhash will stay the same, + // so user can resolve this through the duplicates feature). + // - D → 0: Collect references [[B]], DB rm -rf [[A1]]. + // - D → F: Collect references [[B]], DB rm -rf [[A2]]. Update the node. + // - 0 → F: Add an image entry. Create a node. + // - When creating or updating a node, + // try inserting an image record first as well. + // And test that we have a symlink, and that it works. + // (Just do os.Stat() on it, which fails on both dead and missing links.) + return nil +} + +// TODO: [[B]]: A function to collect references. +// - This is almost always combined with [[A1]] or [[A2]], +// so perhaps combine these. +// - When collecting node subtrees, we need to delete bottom-up +// because of foreign key constraints, +// so maybe in reverse order of recursive CTE results. +// - Orphans will keep their GD/thumbs file, as evidence. +// - Sadly, this can't be done with a DB trigger. (What and why?) +// +// - One of the inputs needs to be the FS path, for the orphan table. + +func syncDirectoryPair(c *syncContext, dbParent int64, fsPath string, + pair syncPair) error { + // TODO: Perhaps process the result queue in here, + // and also check context cancellation (both non-blocking). + + db, fs, fsInfo := pair.db, pair.fs, syncFileInfo{} + if fs != nil { + fsInfo = syncFileInfo{ + dbParent: dbParent, + dbName: fs.fsName, + fsPath: filepath.Join(fsPath, fs.fsName), + fsMtime: fs.fsMtime, + } + } + + switch { + case db == nil && fs == nil: + // 0 → 0, unreachable. + + case db == nil && fs.fsIsDir: + // 0 → D + // TODO: Create a directory node. + return syncDirectory(c, 42, filepath.Join(fsPath, fs.fsName)) + + case db == nil: + // 0 → F (or 0 → 0) + return syncEnqueue(c, fsInfo) + + case fs == nil && db.dbIsDir(): + // D → 0 + // TODO: Collect references [[B]], DB rm -rf [[A1]]. + + case fs == nil: + // F → 0 + // TODO: Collect references [[B]], remove it from DB [[A1]]. + + case db.dbIsDir() && fs.fsIsDir: + // D → D + return syncDirectory(c, db.dbID, filepath.Join(fsPath, fs.fsName)) + + case db.dbIsDir(): + // D → F (or D → 0) + return syncEnqueue(c, fsInfo) + + case fs.fsIsDir: + // F → D + // TODO: Collect references [[B]], change it in DB to a directory. + return syncDirectory(c, db.dbID, filepath.Join(fsPath, fs.fsName)) + + case db.dbMtime != fs.fsMtime: + // F → F (or F → 0) + // Assuming that any content modifications change the timestamp. + return syncEnqueue(c, fsInfo) + } + return nil +} + +func syncDirectory(c *syncContext, dbParent int64, fsPath string) error { + db, err := syncGetNodes(c.tx, dbParent) + if err != nil { + return err + } + + fs, err := syncGetFiles(fsPath) + if err != nil { + return err + } + + // Convert differences to a more convenient form for processing. + iDB, iFS, pairs := 0, 0, []syncPair{} + for iDB < len(db) && iFS < len(fs) { + if db[iDB].dbName == fs[iFS].fsName { + pairs = append(pairs, syncPair{&db[iDB], &fs[iFS]}) + } else if db[iDB].dbName < fs[iFS].fsName { + pairs = append(pairs, syncPair{&db[iDB], nil}) + iDB++ + } else { + pairs = append(pairs, syncPair{nil, &fs[iFS]}) + iFS++ + } + } + for i := range db[iDB:] { + pairs = append(pairs, syncPair{&db[iDB+i], nil}) + } + for i := range fs[iFS:] { + pairs = append(pairs, syncPair{nil, &fs[iFS+i]}) + } + + for _, pair := range pairs { + if err := syncDirectoryPair(c, dbParent, fsPath, pair); err != nil { + return err + } + } + return nil +} + +func syncArgument(ctx context.Context, tx *sql.Tx, path string) error { + var c syncContext + c.ctx, c.cancel = context.WithCancel(ctx) + c.tx = tx + defer c.cancel() + + // Info tasks take a position in the semaphore channel, + // then turn that into a position in the output channel. + // TODO: Beware of release and push order. Check for needless waits. + c.semaphore = make(chan struct{}, runtime.NumCPU()) + c.info = make(chan syncFileInfo, runtime.NumCPU()) + + // At least for now, turn all roots into absolute paths. + fsPath, err := filepath.Abs(filepath.Clean(path)) + if err != nil { + return err + } + + // Figure out a database root (not trying to convert F → D on conflict, + // also because we don't know yet if the argument is a directory). + // + // Synchronizing F → D or * → F are special cases not worth implementing. + crumbs := decodeWebPath(filepath.ToSlash(fsPath)) + dbParent, err := idForPath(c.tx, crumbs, true) + if err != nil { + return nil + } + if err := syncDirectory(&c, dbParent, fsPath); err != nil { + return nil + } + + // TODO: Finish processing the result/info queue. + c.wg.Wait() + return nil +} + // cmdSync ensures the given (sub)roots are accurately reflected // in the database. func cmdSync(args []string) error { - if len(args) < 1 { + if len(args) < 2 { return errors.New("usage: GD ROOT...") } if err := openDB(args[0]); err != nil { return err } - // TODO: Probably make this run in a transaction, - // if only to get exclusivity. - return nil + // TODO: See if the SQLite can cancel anything in a useful manner. + // If using this, beware that a cancel prevents commiting transactions. + ctx := context.Background() + + // In case of a failure during processing, the only retained side effects + // on the filesystem tree are: + // - Fixing dead symlinks to images. + // - Creating symlinks to images that aren't necessary. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + // Mild hack: upgrade the transaction to a write one straight away, + // in order to rule out deadlocks (preventable failure). + if _, err := tx.Exec(`END TRANSACTION; + BEGIN IMMEDIATE TRANSACTION`); err != nil { + return err + } + // TODO: Check if one is not a prefix of another, + // and either filter out these duplicates (easy to do, may warn), + // or plainly refuse to work. + for _, path := range args[1:] { + if err := syncArgument(ctx, tx, path); err != nil { + return err + } + } + // TODO: Garbage collect empty directories, recursively. + // Ideally, stop at the affected DB roots (assuming we go bottom-up). + // + // We need to do this at the end, due to our recursive handling, + // as well as because of asynchronous file filtering. + return tx.Commit() } // --- Tagging ----------------------------------------------------------------- -- cgit v1.2.3-70-g09d2