From 36f661260321ff97842099b23e4c4999576ace77 Mon Sep 17 00:00:00 2001
From: Přemysl Eric Janouch 
Date: Thu, 18 Jan 2024 00:54:40 +0100
Subject: Load images in multiple threads
This worsens CPU-only times by some five percent,
but can also make GPU-accelerated runtime twice as fast.
---
 deeptagger/deeptagger.cpp | 119 ++++++++++++++++++++++++++++++++++++----------
 1 file changed, 94 insertions(+), 25 deletions(-)
diff --git a/deeptagger/deeptagger.cpp b/deeptagger/deeptagger.cpp
index 27be965..103047b 100644
--- a/deeptagger/deeptagger.cpp
+++ b/deeptagger/deeptagger.cpp
@@ -6,13 +6,17 @@
 #endif
 
 #include 
+#include 
 #include 
 #include 
 #include 
+#include 
+#include 
 #include 
 #include 
 #include 
 #include 
+#include 
 #include 
 
 #include 
@@ -435,6 +439,62 @@ add_providers(Ort::SessionOptions &options)
 
 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 
+struct Thumbnailing {
+	std::mutex input_mutex;
+	std::condition_variable input_cv;
+	std::queue input;      // All input paths
+	int work = 0;                       // Number of images requested
+
+	std::mutex output_mutex;
+	std::condition_variable output_cv;
+	std::vector output;  // Processed images
+	int done = 0;                       // Finished worker threads
+};
+
+static void
+thumbnail(const Config &config, int64_t width, int64_t height,
+	Thumbnailing &ctx)
+{
+	while (true) {
+		std::unique_lock input_lock(ctx.input_mutex);
+		ctx.input_cv.wait(input_lock,
+			[&]{ return ctx.input.empty() || ctx.work; });
+		if (ctx.input.empty())
+			break;
+
+		auto path = ctx.input.front();
+		ctx.input.pop();
+		ctx.work--;
+		input_lock.unlock();
+
+		Magick::Image image;
+		try {
+			image = load(path, config, width, height);
+			if (height != image.rows() || width != image.columns())
+				throw std::runtime_error("tensor mismatch");
+
+			std::unique_lock output_lock(ctx.output_mutex);
+			ctx.output.push_back(image);
+			output_lock.unlock();
+			ctx.output_cv.notify_all();
+		} catch (const std::exception &e) {
+			fprintf(stderr, "%s: %s\n", path.c_str(), e.what());
+
+			std::unique_lock input_lock(ctx.input_mutex);
+			ctx.work++;
+			input_lock.unlock();
+			ctx.input_cv.notify_all();
+		}
+	}
+
+	std::unique_lock output_lock(ctx.output_mutex);
+	ctx.done++;
+	output_lock.unlock();
+	ctx.output_cv.notify_all();
+}
+
+// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
 static std::string
 print_shape(const Ort::ConstTensorTypeAndShapeInfo &info)
 {
@@ -533,30 +593,34 @@ infer(Ort::Env &env, const char *path, const std::vector &images)
 		return;
 	}
 
-	// TODO: Image loading is heavily parallelizable. In theory.
-	std::vector batch;
-	for (const auto &filename : images) {
-		Magick::Image image;
-		try {
-			image = load(filename, config, *width, *height);
-		} catch (const std::exception &e) {
-			fprintf(stderr, "%s: %s\n", filename.c_str(), e.what());
-			continue;
-		}
-
-		if (*height != image.rows() || *width != image.columns()) {
-			fprintf(stderr, "%s: %s\n", filename.c_str(), "tensor mismatch");
-			continue;
-		}
-
-		batch.push_back(image);
-		if (batch.size() == g.batch) {
-			run(batch, config, session, shape);
-			batch.clear();
+	// By only parallelizing image loads here during batching,
+	// they never compete for CPU time with inference.
+	Thumbnailing ctx;
+	for (const auto &path : images)
+		ctx.input.push(path);
+	for (auto i = g.batch; i--; )
+		std::thread(thumbnail, std::ref(config), *width, *height,
+			std::ref(ctx)).detach();
+
+	while (true) {
+		std::unique_lock input_lock(ctx.input_mutex);
+		ctx.work = g.batch;
+		input_lock.unlock();
+		ctx.input_cv.notify_all();
+
+		std::unique_lock output_lock(ctx.output_mutex);
+		ctx.output_cv.wait(output_lock,
+			[&]{ return ctx.output.size() == g.batch || ctx.done == g.batch; });
+
+		// It would be possible to add dummy entries to the batch,
+		// so that the model doesn't need to be rebuilt.
+		if (!ctx.output.empty()) {
+			run(ctx.output, config, session, shape);
+			ctx.output.clear();
 		}
+		if (ctx.done == g.batch)
+			break;
 	}
-	if (!batch.empty())
-		run(batch, config, session, shape);
 }
 
 int
@@ -649,14 +713,19 @@ main(int argc, char *argv[])
 		paths.assign(argv + 1, argv + argc);
 	}
 
+	// Load batched images in parallel (the first is for GM, the other for IM).
+	if (g.batch > 1) {
+		auto value = std::to_string(
+			std::max(std::thread::hardware_concurrency() / g.batch, 1L));
+		setenv("OMP_NUM_THREADS", value.c_str(), true);
+		setenv("MAGICK_THREAD_LIMIT", value.c_str(), true);
+	}
+
 	// XXX: GraphicsMagick initializes signal handlers here,
 	// one needs to use MagickLib::InitializeMagickEx()
 	// with MAGICK_OPT_NO_SIGNAL_HANDER to prevent that.
 	//
 	// ImageMagick conveniently has the opposite default.
-	//
-	// Once processing images in parallel, consider presetting
-	// OMP_NUM_THREADS=1 (GM) and/or MAGICK_THREAD_LIMIT=1 (IM).
 	Magick::InitializeMagick(nullptr);
 
 	OrtLoggingLevel logging = g.debug > 1
-- 
cgit v1.2.3-70-g09d2