diff options
author | Přemysl Eric Janouch <p@janouch.name> | 2023-12-24 05:45:04 +0100 |
---|---|---|
committer | Přemysl Eric Janouch <p@janouch.name> | 2023-12-24 05:48:15 +0100 |
commit | 940dd30e564ae97796dde52517f02398cbf91196 (patch) | |
tree | 5f8c2e1c3f5325238b419c1fdc16b865e5d17b7a | |
parent | 2898e8dbb11e5d69934ae39cb8152120b8cc0b21 (diff) | |
download | gallery-940dd30e564ae97796dde52517f02398cbf91196.tar.gz gallery-940dd30e564ae97796dde52517f02398cbf91196.tar.xz gallery-940dd30e564ae97796dde52517f02398cbf91196.zip |
Fix stdout races
-rw-r--r-- | main.go | 173 |
1 files changed, 73 insertions, 100 deletions
@@ -130,7 +130,7 @@ func (s semaphore) acquire(ctx context.Context) error { // --- Progress bar ------------------------------------------------------------ type progressBar struct { - mutex sync.Mutex + sync.Mutex current int target int } @@ -162,13 +162,58 @@ func (pb *progressBar) Update() { } func (pb *progressBar) Step() { - pb.mutex.Lock() - defer pb.mutex.Unlock() + pb.Lock() + defer pb.Unlock() pb.current++ pb.Update() } +func (pb *progressBar) Interrupt(callback func()) { + pb.Lock() + defer pb.Unlock() + pb.Stop() + defer pb.Update() + + callback() +} + +// --- Parallelization --------------------------------------------------------- + +type parallelFunc func(item string) (message string, err error) + +// parallelize runs the callback in parallel on a list of strings, +// reporting progress and any non-fatal messages. +func parallelize(strings []string, callback parallelFunc) error { + pb := newProgressBar(len(strings)) + defer pb.Stop() + + ctx, cancel := context.WithCancelCause(context.Background()) + wg := sync.WaitGroup{} + for _, item := range strings { + if taskSemaphore.acquire(ctx) != nil { + break + } + + wg.Add(1) + go func(item string) { + defer taskSemaphore.release() + defer wg.Done() + if message, err := callback(item); err != nil { + cancel(err) + } else if message != "" { + pb.Interrupt(func() { log.Printf("%s: %s\n", item, message) }) + } + pb.Step() + }(item) + } + wg.Wait() + if ctx.Err() != nil { + return context.Cause(ctx) + } + return nil +} + // --- Initialization ---------------------------------------------------------- // cmdInit initializes a "gallery directory" that contains gallery.sqlite, @@ -1032,33 +1077,10 @@ func cmdImport(args []string) error { } } - pb := newProgressBar(len(paths)) - defer pb.Stop() - i := importer{} - ctx, cancel := context.WithCancelCause(context.Background()) - wg := sync.WaitGroup{} - for _, path := range paths { - if taskSemaphore.acquire(ctx) != nil { - break - } - - wg.Add(1) - go func(path string) { - defer taskSemaphore.release() - defer wg.Done() - if err := i.Import(path); err != nil { - cancel(err) - } else { - pb.Step() - } - }(path) - } - wg.Wait() - if ctx.Err() != nil { - return context.Cause(ctx) - } - return nil + return parallelize(paths, func(path string) (string, error) { + return "", i.Import(path) + }) } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1914,17 +1936,20 @@ func makeThumbnail(pathImage, pathThumb string) (int, int, error) { return w, h, err } -func makeThumbnailFor(sha1 string) error { +func makeThumbnailFor(sha1 string) (message string, err error) { pathImage := imagePath(sha1) pathThumb := thumbPath(sha1) w, h, err := makeThumbnail(pathImage, pathThumb) if err != nil { - return err + if ee, ok := err.(*exec.ExitError); ok { + return string(ee.Stderr), nil + } + return "", err } _, err = db.Exec(`UPDATE image SET thumbw = ?, thumbh = ? WHERE sha1 = ?`, w, h, sha1) - return err + return "", err } // cmdThumbnail generates missing thumbnails, in parallel. @@ -1946,39 +1971,7 @@ func cmdThumbnail(args []string) error { return err } } - - pb := newProgressBar(len(hexSHA1)) - defer pb.Stop() - - ctx, cancel := context.WithCancelCause(context.Background()) - wg := sync.WaitGroup{} - for _, sha1 := range hexSHA1 { - if taskSemaphore.acquire(ctx) != nil { - break - } - - wg.Add(1) - go func(sha1 string) { - defer taskSemaphore.release() - defer wg.Done() - if err := makeThumbnailFor(sha1); err != nil { - if ee, ok := err.(*exec.ExitError); ok { - // FIXME: Not in the goroutine, or lock it. - pb.Stop() - log.Printf("%s: %s\n", sha1, ee.Stderr) - pb.Update() - } else { - cancel(err) - } - } - pb.Step() - }(sha1) - } - wg.Wait() - if ctx.Err() != nil { - return context.Cause(ctx) - } - return nil + return parallelize(hexSHA1, makeThumbnailFor) } // --- Perceptual hash --------------------------------------------------------- @@ -2100,6 +2093,20 @@ func makeDhash(sha1 string) (uint64, error) { return dhashWebP(f) } +func makeDhashFor(sha1 string) (message string, err error) { + hash, err := makeDhash(sha1) + if errors.Is(err, errIsAnimation) { + // Ignoring this common condition. + return "", nil + } else if err != nil { + return err.Error(), nil + } + + _, err = db.Exec( + `UPDATE image SET dhash = ? WHERE sha1 = ?`, int64(hash), sha1) + return "", err +} + // cmdDhash generates perceptual hash from thumbnails. func cmdDhash(args []string) error { if len(args) < 1 { @@ -2118,41 +2125,7 @@ func cmdDhash(args []string) error { return err } } - - pb := newProgressBar(len(hexSHA1)) - defer pb.Stop() - - ctx, cancel := context.WithCancelCause(context.Background()) - wg := sync.WaitGroup{} - for _, sha1 := range hexSHA1 { - if taskSemaphore.acquire(ctx) != nil { - break - } - - wg.Add(1) - go func(sha1 string) { - defer taskSemaphore.release() - defer wg.Done() - if hash, err := makeDhash(sha1); errors.Is(err, errIsAnimation) { - // Ignoring this common condition. - } else if err != nil { - // FIXME: Not in the goroutine, or lock it. - pb.Stop() - log.Printf("%s: %s\n", sha1, err) - pb.Update() - } else if _, err = db.Exec( - `UPDATE image SET dhash = ? WHERE sha1 = ?`, - int64(hash), sha1); err != nil { - cancel(err) - } - pb.Step() - }(sha1) - } - wg.Wait() - if ctx.Err() != nil { - return context.Cause(ctx) - } - return nil + return parallelize(hexSHA1, makeDhashFor) } // --- Main -------------------------------------------------------------------- |