diff options
author | Přemysl Eric Janouch <p@janouch.name> | 2023-12-22 00:34:00 +0100 |
---|---|---|
committer | Přemysl Eric Janouch <p@janouch.name> | 2023-12-22 00:34:00 +0100 |
commit | 0867eecb6737fb4300408877d8918945aaf697ee (patch) | |
tree | 6762a99bea76f2a840e66ea854143d670403b945 | |
parent | a3129ce9179ea8816c6442b73e61b7ba7bd2e49a (diff) | |
download | gallery-0867eecb6737fb4300408877d8918945aaf697ee.tar.gz gallery-0867eecb6737fb4300408877d8918945aaf697ee.tar.xz gallery-0867eecb6737fb4300408877d8918945aaf697ee.zip |
WIP: FS to DB sync
-rw-r--r-- | main.go | 194 |
1 file changed, 96 insertions, 98 deletions
@@ -820,6 +820,10 @@ type syncContext struct { tx *sql.Tx info chan syncFileInfo pb *progressBar + + stmtOrphan *sql.Stmt + stmtDisposeSub *sql.Stmt + stmtDisposeAll *sql.Stmt } func syncPrintf(c *syncContext, format string, v ...any) { @@ -975,73 +979,19 @@ func syncDequeue(c *syncContext) error { // // Orphans keep their thumbnail files, as evidence. func syncDispose(c *syncContext, nodeID int64, keepNode bool) error { - const cte = `WITH RECURSIVE - root(id, sha1, parent, path) AS ( - SELECT id, sha1, parent, name FROM node WHERE id = ? - UNION ALL - SELECT r.id, r.sha1, n.parent, n.name || '/' || r.path - FROM node AS n JOIN root AS r ON n.id = r.parent - ), - children(id, sha1, path, level) AS ( - SELECT id, sha1, path, 1 FROM root WHERE parent IS NULL - UNION ALL - SELECT n.id, n.sha1, c.path || '/' || n.name, c.level + 1 - FROM node AS n JOIN children AS c ON n.parent = c.id - ), - removed(sha1, count, path) AS ( - SELECT sha1, COUNT(*) AS count, MIN(path) AS path - FROM children - GROUP BY sha1 - ), - orphaned(sha1, path, count, total) AS ( - SELECT r.sha1, r.path, r.count, COUNT(*) AS total - FROM removed AS r - JOIN node ON node.sha1 = r.sha1 - GROUP BY node.sha1 - HAVING count = total - )` - - // TODO: Prepare the statements. - var err error - if _, err = c.tx.Exec(cte+` - INSERT OR IGNORE INTO orphan(sha1, path) - SELECT sha1, path FROM orphaned`, nodeID); err != nil { + if _, err := c.stmtOrphan.Exec(nodeID); err != nil { return err } + if keepNode { - _, err = c.tx.Exec(cte+` - DELETE FROM node - WHERE id IN (SELECT DISTINCT id FROM children WHERE level <> 1)`, - nodeID) + if _, err := c.stmtDisposeSub.Exec(nodeID); err != nil { + return err + } } else { - _, err = c.tx.Exec(cte+` - DELETE FROM node - WHERE id IN (SELECT DISTINCT id FROM children)`, - nodeID) - } - if err != nil { - return err - } - - rows, err := c.tx.Query(`WITH RECURSIVE - root(id, sha1, parent, path) AS ( - SELECT id, sha1, parent, name FROM node WHERE id = ? 
- UNION ALL - SELECT r.id, r.sha1, node.parent, node.name || '/' || r.path - FROM node JOIN root AS r ON node.id = r.parent - ), - children(id, sha1, path, level) AS ( - SELECT id, sha1, path, 1 FROM root WHERE parent IS NULL - UNION ALL - SELECT node.id, node.sha1, c.path || '/' || node.name, c.level + 1 - FROM node JOIN children AS c ON node.parent = c.id - ) - SELECT id, sha1, path FROM children ORDER BY level DESC`, nodeID) - if err != nil { - return err + if _, err := c.stmtDisposeAll.Exec(nodeID); err != nil { + return err + } } - defer rows.Close() - return nil } @@ -1242,12 +1192,84 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error { return nil } -func syncRoot(ctx context.Context, tx *sql.Tx, fsPath string) error { - c := syncContext{ctx: ctx, tx: tx} +func syncRoot(c *syncContext, fsPath string) error { + // Figure out a database root (not trying to convert F → D on conflict, + // also because we don't know yet if the argument is a directory). + // + // Synchronizing F → D or * → F are special cases not worth implementing. + crumbs := decodeWebPath(filepath.ToSlash(fsPath)) + dbParent, err := idForPath(c.tx, crumbs, true) + if err != nil { + return err + } + if err := syncDirectory(c, dbParent, fsPath); err != nil { + return err + } - c.pb = newProgressBar(-1) + // Wait for all tasks to finish, and process the results of their work. + for i := 0; i < cap(taskSemaphore); i++ { + if err := taskSemaphore.acquire(c.ctx); err != nil { + return err + } + } + if err := syncDequeue(c); err != nil { + return err + } + + // This is not our semaphore, so prepare it for the next user. + for i := 0; i < cap(taskSemaphore); i++ { + taskSemaphore.release() + } + return nil +} + +const disposeCTE = `WITH RECURSIVE + root(id, sha1, parent, path) AS ( + SELECT id, sha1, parent, name FROM node WHERE id = ? 
+ UNION ALL + SELECT r.id, r.sha1, n.parent, n.name || '/' || r.path + FROM node AS n JOIN root AS r ON n.id = r.parent + ), + children(id, sha1, path, level) AS ( + SELECT id, sha1, path, 1 FROM root WHERE parent IS NULL + UNION ALL + SELECT n.id, n.sha1, c.path || '/' || n.name, c.level + 1 + FROM node AS n JOIN children AS c ON n.parent = c.id + ), + removed(sha1, count, path) AS ( + SELECT sha1, COUNT(*) AS count, MIN(path) AS path + FROM children + GROUP BY sha1 + ), + orphaned(sha1, path, count, total) AS ( + SELECT r.sha1, r.path, r.count, COUNT(*) AS total + FROM removed AS r + JOIN node ON node.sha1 = r.sha1 + GROUP BY node.sha1 + HAVING count = total + )` + +func syncRun(ctx context.Context, tx *sql.Tx, roots []string) error { + c := syncContext{ctx: ctx, tx: tx, pb: newProgressBar(-1)} defer c.pb.Stop() + var err error + if c.stmtOrphan, err = c.tx.Prepare(disposeCTE + ` + INSERT OR IGNORE INTO orphan(sha1, path) + SELECT sha1, path FROM orphaned`); err != nil { + return err + } + if c.stmtDisposeSub, err = c.tx.Prepare(disposeCTE + ` + DELETE FROM node WHERE id + IN (SELECT DISTINCT id FROM children WHERE level <> 1)`); err != nil { + return err + } + if c.stmtDisposeAll, err = c.tx.Prepare(disposeCTE + ` + DELETE FROM node WHERE id + IN (SELECT DISTINCT id FROM children)`); err != nil { + return err + } + // Info tasks take a position in the task semaphore channel. // then fill the info channel. // @@ -1269,33 +1291,17 @@ func syncRoot(ctx context.Context, tx *sql.Tx, fsPath string) error { // i.e., it is for the result of the task that syncEnqueue() spawns. c.info = make(chan syncFileInfo, cap(taskSemaphore)+1) - // Figure out a database root (not trying to convert F → D on conflict, - // also because we don't know yet if the argument is a directory). - // - // Synchronizing F → D or * → F are special cases not worth implementing. 
- crumbs := decodeWebPath(filepath.ToSlash(fsPath)) - dbParent, err := idForPath(c.tx, crumbs, true) - if err != nil { - return err - } - if err := syncDirectory(&c, dbParent, fsPath); err != nil { - return err - } - - // Wait for all tasks to finish, and process the results of their work. - for i := 0; i < cap(taskSemaphore); i++ { - if err := taskSemaphore.acquire(c.ctx); err != nil { + for _, path := range roots { + if err := syncRoot(&c, path); err != nil { return err } } - if err := syncDequeue(&c); err != nil { - return err - } - // This is not our semaphore, so prepare it for the next user. - for i := 0; i < cap(taskSemaphore); i++ { - taskSemaphore.release() - } + // TODO: Garbage collect empty directories, recursively. + // Ideally, stop at the affected DB roots (assuming we go bottom-up). + // + // We need to do this at the end, due to our recursive handling, + // as well as because of asynchronous file filtering. return nil } @@ -1355,17 +1361,9 @@ func cmdSync(args []string) error { return true }) - for _, path := range roots { - if err := syncRoot(ctxSignal, tx, path); err != nil { - return err - } + if err := syncRun(ctxSignal, tx, roots); err != nil { + return err } - - // TODO: Garbage collect empty directories, recursively. - // Ideally, stop at the affected DB roots (assuming we go bottom-up). - // - // We need to do this at the end, due to our recursive handling, - // as well as because of asynchronous file filtering. return tx.Commit() } |