From 8a1fedcc8fa9c17a8dd89d9b83fee34422e75fac Mon Sep 17 00:00:00 2001 From: Přemysl Eric Janouch Date: Sun, 10 Dec 2023 07:18:04 +0100 Subject: Parallelize imports and thumbnailing --- go.mod | 5 +- go.sum | 2 + main.go | 166 ++++++++++++++++++++++++++++++++++++++++++++-------------------- 3 files changed, 121 insertions(+), 52 deletions(-) diff --git a/go.mod b/go.mod index 6d8c4f9..42e6176 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module janouch.name/gallery go 1.21.4 -require github.com/mattn/go-sqlite3 v1.14.18 +require ( + github.com/mattn/go-sqlite3 v1.14.18 + golang.org/x/sync v0.5.0 +) diff --git a/go.sum b/go.sum index 810a101..a5de6f1 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= diff --git a/main.go b/main.go index e6eae07..a57f3af 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "crypto/sha1" "database/sql" "encoding/hex" @@ -17,32 +18,38 @@ import ( "os/exec" "path/filepath" "regexp" + "runtime" "strconv" "strings" + "sync" "time" _ "github.com/mattn/go-sqlite3" + "golang.org/x/sync/semaphore" ) var ( - db *sql.DB // sqlite database - gd string // gallery directory + db *sql.DB // sqlite database + galleryDirectory string // gallery directory + + // taskSemaphore limits parallel computations. + taskSemaphore *semaphore.Weighted ) func openDB(directory string) error { var err error db, err = sql.Open("sqlite3", "file:"+filepath.Join(directory, "gallery.db?_foreign_keys=1")) - gd = directory + galleryDirectory = directory return err } func imagePath(sha1 string) string { - return filepath.Join(gd, "images", sha1[:2], sha1) + return filepath.Join(galleryDirectory, "images", sha1[:2], sha1) } func thumbPath(sha1 string) string { - return filepath.Join(gd, "thumbs", sha1[:2], sha1+".webp") + return filepath.Join(galleryDirectory, "thumbs", sha1[:2], sha1+".webp") } func dbCollectStrings(query string) ([]string, error) { @@ -70,27 +77,10 @@ type directoryManager struct { cache map[string]int64 // Unix-style paths to directory.id } -func (dm *directoryManager) IDForDirectoryPath( - tx *sql.Tx, path string) (int64, error) { - // Relative paths could be handled differently, - // but right now, they're assumed to start at the root. - path = filepath.ToSlash(filepath.Clean(path)) - list := strings.Split(path, "/") - if len(list) > 1 && list[0] == "" { - list = list[1:] - } - if len(list) == 0 { - return 0, nil - } - - if dm.cache == nil { - dm.cache = make(map[string]int64) - } else if id, ok := dm.cache[path]; ok { - return id, nil - } - +func (dm *directoryManager) uncachedIDForPath( + tx *sql.Tx, path []string) (int64, error) { var parent sql.NullInt64 - for _, name := range list { + for _, name := range path { if err := tx.QueryRow( `SELECT id FROM directory WHERE name = ? AND parent IS ?`, name, parent).Scan(&parent); err == nil { @@ -109,10 +99,36 @@ func (dm *directoryManager) IDForDirectoryPath( parent = sql.NullInt64{Int64: id, Valid: true} } } - dm.cache[path] = parent.Int64 return parent.Int64, nil } +func (dm *directoryManager) IDForDirectoryPath( + tx *sql.Tx, path string) (int64, error) { + // Relative paths could be handled differently, + // but right now, they're assumed to start at the root. + path = filepath.ToSlash(filepath.Clean(path)) + list := strings.Split(path, "/") + if len(list) > 1 && list[0] == "" { + list = list[1:] + } + if len(list) == 0 { + return 0, nil + } + + if dm.cache == nil { + dm.cache = make(map[string]int64) + } else if id, ok := dm.cache[path]; ok { + return id, nil + } + + id, err := dm.uncachedIDForPath(tx, list) + if err != nil { + return 0, err + } + dm.cache[path] = id + return id, nil +} + // XXX: This is preliminary. type entry struct { Parent int64 @@ -197,10 +213,12 @@ func cmdInit(args []string) error { // XXX: There's technically no reason to keep images as symlinks, // we might just keep absolute paths in the database as well. - if err := os.MkdirAll(filepath.Join(gd, "images"), 0755); err != nil { + if err := os.MkdirAll( + filepath.Join(galleryDirectory, "images"), 0755); err != nil { return err } - if err := os.MkdirAll(filepath.Join(gd, "thumbs"), 0755); err != nil { + if err := os.MkdirAll( + filepath.Join(galleryDirectory, "thumbs"), 0755); err != nil { return err } return nil @@ -323,7 +341,8 @@ func isImage(path string) (bool, error) { } type importer struct { - dm directoryManager + dm directoryManager + dmMutex sync.Mutex } func (i *importer) Import(path string, d fs.DirEntry, err error) error { @@ -376,6 +395,13 @@ func (i *importer) Import(path string, d fs.DirEntry, err error) error { return err } + // A concurrent transaction could be aborted, yet still result in + // creating directoryManager's cache entry, therefore this scope. + // TODO: Educate self about isolation levels and reconsider. + // Perhaps get rid of the cache. + i.dmMutex.Lock() + defer i.dmMutex.Unlock() + tx, err := db.Begin() if err != nil { return err @@ -393,9 +419,8 @@ func (i *importer) Import(path string, d fs.DirEntry, err error) error { return err } - _, err = tx.Exec(`INSERT INTO entry( - parent, name, mtime, sha1 - ) VALUES (?, ?, ?, ?)`, dbParent, dbBasename, s.ModTime().Unix(), hexSHA1) + _, err = tx.Exec(`INSERT INTO entry(parent, name, mtime, sha1) + VALUES (?, ?, ?, ?)`, dbParent, dbBasename, s.ModTime().Unix(), hexSHA1) if err != nil { return err } @@ -412,17 +437,34 @@ func cmdImport(args []string) error { return err } - // TODO: This would better be done in parallel (making hashes). // TODO: Show progress in some manner. Perhaps port my propeller code. + ctx, cancel := context.WithCancelCause(context.Background()) i := importer{} + wg := sync.WaitGroup{} for _, name := range args[1:] { - if err := filepath.WalkDir(name, - func(path string, d fs.DirEntry, err error) error { - return i.Import(path, d, err) - }); err != nil { + cb := func(path string, d fs.DirEntry, err error) error { + if taskSemaphore.Acquire(ctx, 1) != nil { + return context.Cause(ctx) + } + + wg.Add(1) + go func() { + defer taskSemaphore.Release(1) + defer wg.Done() + if err := i.Import(path, d, err); err != nil { + cancel(err) + } + }() + return nil + } + if err := filepath.WalkDir(name, cb); err != nil { return err } } + wg.Wait() + if ctx.Err() != nil { + return context.Cause(ctx) + } return nil } @@ -472,10 +514,12 @@ func makeThumbnail(pathImage, pathThumb string) (int, int, error) { // // TODO: See if we can optimize resulting WebP animations. // (Do -layers optimize* apply to this format at all?) - cmd := exec.Command("convert", pathImage, "-coalesce", "-colorspace", "RGB", - "-auto-orient", "-strip", "-resize", "256x128>", "-colorspace", "sRGB", + cmd := exec.Command("convert", "-limit", "thread", "1", pathImage, + "-coalesce", "-colorspace", "RGB", "-auto-orient", "-strip", + "-resize", "256x128>", "-colorspace", "sRGB", "-format", "%w %h", "+write", pathThumb, "-delete", "1--1", "info:") + // XXX: Early returns may leak resources. stdout, err := cmd.StdoutPipe() if err != nil { return 0, 0, err @@ -496,6 +540,19 @@ func makeThumbnail(pathImage, pathThumb string) (int, int, error) { return w, h, err } +func makeThumbnailFor(sha1 string) error { + pathImage := imagePath(sha1) + pathThumb := thumbPath(sha1) + w, h, err := makeThumbnail(pathImage, pathThumb) + if err != nil { + return err + } + + _, err = db.Exec(`UPDATE image SET thumbw = ?, thumbh = ? + WHERE sha1 = ?`, w, h, sha1) + return err +} + // cmdThumbnail generates missing thumbnails, in parallel. func cmdThumbnail(args []string) error { if len(args) < 1 { @@ -516,28 +573,34 @@ func cmdThumbnail(args []string) error { } } - // TODO: Try to run the thumbnailer in parallel, somehow. - // Then run convert with `-limit thread 1`. // TODO: Show progress in some manner. Perhaps port my propeller code. + ctx, cancel := context.WithCancelCause(context.Background()) + wg := sync.WaitGroup{} for _, sha1 := range hexSHA1 { - pathImage := imagePath(sha1) - pathThumb := thumbPath(sha1) - w, h, err := makeThumbnail(pathImage, pathThumb) - if err != nil { - return err + if taskSemaphore.Acquire(ctx, 1) != nil { + break } - _, err = db.Exec(`UPDATE image SET thumbw = ?, thumbh = ? - WHERE sha1 = ?`, w, h, sha1) - if err != nil { - return err - } + wg.Add(1) + go func(sha1 string) { + defer taskSemaphore.Release(1) + defer wg.Done() + if err := makeThumbnailFor(sha1); err != nil { + cancel(err) + } + }(sha1) + } + wg.Wait() + if ctx.Err() != nil { + return context.Cause(ctx) } return nil } func makeDhash(hasher, pathThumb string) (uint64, error) { cmd := exec.Command(hasher, pathThumb) + + // XXX: Early returns may leak resources. stdout, err := cmd.StdoutPipe() if err != nil { return 0, err @@ -617,6 +680,7 @@ func main() { log.Fatalln("Unknown command: " + os.Args[1]) } + taskSemaphore = semaphore.NewWeighted(int64(runtime.NumCPU())) err := cmd.handler(os.Args[2:]) // Note that the database object has a closing finalizer, -- cgit v1.2.3-70-g09d2