aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPřemysl Eric Janouch <p@janouch.name>2023-12-22 00:34:00 +0100
committerPřemysl Eric Janouch <p@janouch.name>2023-12-22 00:34:00 +0100
commit0867eecb6737fb4300408877d8918945aaf697ee (patch)
tree6762a99bea76f2a840e66ea854143d670403b945
parenta3129ce9179ea8816c6442b73e61b7ba7bd2e49a (diff)
downloadgallery-0867eecb6737fb4300408877d8918945aaf697ee.tar.gz
gallery-0867eecb6737fb4300408877d8918945aaf697ee.tar.xz
gallery-0867eecb6737fb4300408877d8918945aaf697ee.zip
WIP: FS to DB sync
-rw-r--r--main.go194
1 files changed, 96 insertions, 98 deletions
diff --git a/main.go b/main.go
index 4f65e0b..8b70dec 100644
--- a/main.go
+++ b/main.go
@@ -820,6 +820,10 @@ type syncContext struct {
tx *sql.Tx
info chan syncFileInfo
pb *progressBar
+
+ stmtOrphan *sql.Stmt
+ stmtDisposeSub *sql.Stmt
+ stmtDisposeAll *sql.Stmt
}
func syncPrintf(c *syncContext, format string, v ...any) {
@@ -975,73 +979,19 @@ func syncDequeue(c *syncContext) error {
//
// Orphans keep their thumbnail files, as evidence.
func syncDispose(c *syncContext, nodeID int64, keepNode bool) error {
- const cte = `WITH RECURSIVE
- root(id, sha1, parent, path) AS (
- SELECT id, sha1, parent, name FROM node WHERE id = ?
- UNION ALL
- SELECT r.id, r.sha1, n.parent, n.name || '/' || r.path
- FROM node AS n JOIN root AS r ON n.id = r.parent
- ),
- children(id, sha1, path, level) AS (
- SELECT id, sha1, path, 1 FROM root WHERE parent IS NULL
- UNION ALL
- SELECT n.id, n.sha1, c.path || '/' || n.name, c.level + 1
- FROM node AS n JOIN children AS c ON n.parent = c.id
- ),
- removed(sha1, count, path) AS (
- SELECT sha1, COUNT(*) AS count, MIN(path) AS path
- FROM children
- GROUP BY sha1
- ),
- orphaned(sha1, path, count, total) AS (
- SELECT r.sha1, r.path, r.count, COUNT(*) AS total
- FROM removed AS r
- JOIN node ON node.sha1 = r.sha1
- GROUP BY node.sha1
- HAVING count = total
- )`
-
- // TODO: Prepare the statements.
- var err error
- if _, err = c.tx.Exec(cte+`
- INSERT OR IGNORE INTO orphan(sha1, path)
- SELECT sha1, path FROM orphaned`, nodeID); err != nil {
+ if _, err := c.stmtOrphan.Exec(nodeID); err != nil {
return err
}
+
if keepNode {
- _, err = c.tx.Exec(cte+`
- DELETE FROM node
- WHERE id IN (SELECT DISTINCT id FROM children WHERE level <> 1)`,
- nodeID)
+ if _, err := c.stmtDisposeSub.Exec(nodeID); err != nil {
+ return err
+ }
} else {
- _, err = c.tx.Exec(cte+`
- DELETE FROM node
- WHERE id IN (SELECT DISTINCT id FROM children)`,
- nodeID)
- }
- if err != nil {
- return err
- }
-
- rows, err := c.tx.Query(`WITH RECURSIVE
- root(id, sha1, parent, path) AS (
- SELECT id, sha1, parent, name FROM node WHERE id = ?
- UNION ALL
- SELECT r.id, r.sha1, node.parent, node.name || '/' || r.path
- FROM node JOIN root AS r ON node.id = r.parent
- ),
- children(id, sha1, path, level) AS (
- SELECT id, sha1, path, 1 FROM root WHERE parent IS NULL
- UNION ALL
- SELECT node.id, node.sha1, c.path || '/' || node.name, c.level + 1
- FROM node JOIN children AS c ON node.parent = c.id
- )
- SELECT id, sha1, path FROM children ORDER BY level DESC`, nodeID)
- if err != nil {
- return err
+ if _, err := c.stmtDisposeAll.Exec(nodeID); err != nil {
+ return err
+ }
}
- defer rows.Close()
-
return nil
}
@@ -1242,12 +1192,84 @@ func syncDirectory(c *syncContext, dbParent int64, fsPath string) error {
return nil
}
-func syncRoot(ctx context.Context, tx *sql.Tx, fsPath string) error {
- c := syncContext{ctx: ctx, tx: tx}
+func syncRoot(c *syncContext, fsPath string) error {
+ // Figure out a database root (not trying to convert F → D on conflict,
+ // also because we don't know yet if the argument is a directory).
+ //
+ // Synchronizing F → D or * → F are special cases not worth implementing.
+ crumbs := decodeWebPath(filepath.ToSlash(fsPath))
+ dbParent, err := idForPath(c.tx, crumbs, true)
+ if err != nil {
+ return err
+ }
+ if err := syncDirectory(c, dbParent, fsPath); err != nil {
+ return err
+ }
- c.pb = newProgressBar(-1)
+ // Wait for all tasks to finish, and process the results of their work.
+ for i := 0; i < cap(taskSemaphore); i++ {
+ if err := taskSemaphore.acquire(c.ctx); err != nil {
+ return err
+ }
+ }
+ if err := syncDequeue(c); err != nil {
+ return err
+ }
+
+ // This is not our semaphore, so prepare it for the next user.
+ for i := 0; i < cap(taskSemaphore); i++ {
+ taskSemaphore.release()
+ }
+ return nil
+}
+
+const disposeCTE = `WITH RECURSIVE
+ root(id, sha1, parent, path) AS (
+ SELECT id, sha1, parent, name FROM node WHERE id = ?
+ UNION ALL
+ SELECT r.id, r.sha1, n.parent, n.name || '/' || r.path
+ FROM node AS n JOIN root AS r ON n.id = r.parent
+ ),
+ children(id, sha1, path, level) AS (
+ SELECT id, sha1, path, 1 FROM root WHERE parent IS NULL
+ UNION ALL
+ SELECT n.id, n.sha1, c.path || '/' || n.name, c.level + 1
+ FROM node AS n JOIN children AS c ON n.parent = c.id
+ ),
+ removed(sha1, count, path) AS (
+ SELECT sha1, COUNT(*) AS count, MIN(path) AS path
+ FROM children
+ GROUP BY sha1
+ ),
+ orphaned(sha1, path, count, total) AS (
+ SELECT r.sha1, r.path, r.count, COUNT(*) AS total
+ FROM removed AS r
+ JOIN node ON node.sha1 = r.sha1
+ GROUP BY node.sha1
+ HAVING count = total
+ )`
+
+func syncRun(ctx context.Context, tx *sql.Tx, roots []string) error {
+ c := syncContext{ctx: ctx, tx: tx, pb: newProgressBar(-1)}
defer c.pb.Stop()
+ var err error
+ if c.stmtOrphan, err = c.tx.Prepare(disposeCTE + `
+ INSERT OR IGNORE INTO orphan(sha1, path)
+ SELECT sha1, path FROM orphaned`); err != nil {
+ return err
+ }
+ if c.stmtDisposeSub, err = c.tx.Prepare(disposeCTE + `
+ DELETE FROM node WHERE id
+ IN (SELECT DISTINCT id FROM children WHERE level <> 1)`); err != nil {
+ return err
+ }
+ if c.stmtDisposeAll, err = c.tx.Prepare(disposeCTE + `
+ DELETE FROM node WHERE id
+ IN (SELECT DISTINCT id FROM children)`); err != nil {
+ return err
+ }
+
// Info tasks take a position in the task semaphore channel,
// then fill the info channel.
//
@@ -1269,33 +1291,17 @@ func syncRoot(ctx context.Context, tx *sql.Tx, fsPath string) error {
// i.e., it is for the result of the task that syncEnqueue() spawns.
c.info = make(chan syncFileInfo, cap(taskSemaphore)+1)
- // Figure out a database root (not trying to convert F → D on conflict,
- // also because we don't know yet if the argument is a directory).
- //
- // Synchronizing F → D or * → F are special cases not worth implementing.
- crumbs := decodeWebPath(filepath.ToSlash(fsPath))
- dbParent, err := idForPath(c.tx, crumbs, true)
- if err != nil {
- return err
- }
- if err := syncDirectory(&c, dbParent, fsPath); err != nil {
- return err
- }
-
- // Wait for all tasks to finish, and process the results of their work.
- for i := 0; i < cap(taskSemaphore); i++ {
- if err := taskSemaphore.acquire(c.ctx); err != nil {
+ for _, path := range roots {
+ if err := syncRoot(&c, path); err != nil {
return err
}
}
- if err := syncDequeue(&c); err != nil {
- return err
- }
- // This is not our semaphore, so prepare it for the next user.
- for i := 0; i < cap(taskSemaphore); i++ {
- taskSemaphore.release()
- }
+ // TODO: Garbage collect empty directories, recursively.
+ // Ideally, stop at the affected DB roots (assuming we go bottom-up).
+ //
+ // We need to do this at the end, due to our recursive handling,
+ // as well as because of asynchronous file filtering.
return nil
}
@@ -1355,17 +1361,9 @@ func cmdSync(args []string) error {
return true
})
- for _, path := range roots {
- if err := syncRoot(ctxSignal, tx, path); err != nil {
- return err
- }
+ if err := syncRun(ctxSignal, tx, roots); err != nil {
+ return err
}
-
- // TODO: Garbage collect empty directories, recursively.
- // Ideally, stop at the affected DB roots (assuming we go bottom-up).
- //
- // We need to do this at the end, due to our recursive handling,
- // as well as because of asynchronous file filtering.
return tx.Commit()
}