Skip to content

Commit 5d57567

Browse files
author
Grok Compression
committed
feat(freebyrd): use ScxEngine persistent thread pool for T1 decode
Replace scx_parallel_for (which spawns/joins threads per tile) with ScxEngineSingleton — a persistent worker pool that stays alive for the lifetime of the process. This eliminates thread spawn/join overhead when decoding multiple tiles. Changes: - Add ScxEngineSingleton: global wrapper around Rust ScxEngine - decodeBlocks() now submits T1 jobs via scx_engine_submit_batch() and runs them with scx_engine_run() - CMake: conditionally compile ScxEngineSingleton.cpp when GRK_USE_SCX_SCHEDULING is ON The engine creates domains (currently 'T1') with sequence-based ordering, laying the groundwork for Phase 2 where DWT will also be submitted to the engine for natural T1/DWT interleaving.
1 parent 5c83e2e commit 5d57567

4 files changed

Lines changed: 114 additions & 6 deletions

File tree

src/lib/core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,8 @@ if(GRK_USE_SCX_SCHEDULING)
280280
target_link_libraries(${GROK_CORE_NAME} PRIVATE scx_scheduling)
281281
target_include_directories(${GROK_CORE_NAME} PRIVATE "${SCX_SCHEDULING_DIR}/src/scheduling")
282282
target_compile_definitions(${GROK_CORE_NAME} PRIVATE GRK_USE_SCX_SCHEDULING)
283+
target_sources(${GROK_CORE_NAME} PRIVATE
284+
${CMAKE_CURRENT_SOURCE_DIR}/scheduling/freebyrd/ScxEngineSingleton.cpp)
283285
endif()
284286
if(GRK_ENABLE_LIBCURL AND GRK_HAVE_LIBCURL)
285287
target_link_libraries(${GROK_CORE_NAME} PRIVATE ${GRK_CURL_TARGET})

src/lib/core/scheduling/freebyrd/SchedulerFreebyrd.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ struct ITileProcessor;
6565
#include "ImageComponentFlow.h"
6666
#ifdef GRK_USE_SCX_SCHEDULING
6767
#include "scx_scheduling_ffi.h"
68+
#include "ScxEngineSingleton.h"
6869
#include <unordered_map>
6970
#endif
7071
#include "SchedulerFreebyrd.h"
@@ -244,16 +245,13 @@ bool SchedulerFreebyrd::decodeBlocks(ITileProcessor* tileProcessor)
244245
return true;
245246

246247
#ifdef GRK_USE_SCX_SCHEDULING
247-
// Decode all blocks in parallel using sicorax Rust scheduler.
248-
// We create local coders per-thread to avoid races with the shared CoderPool,
249-
// since multiple tiles may call runDecompressT1() concurrently from different
250-
// TF workers, and scx_parallel_for spawns its own threads.
248+
// Decode all blocks in parallel using ScxEngine persistent thread pool.
249+
// Per-thread coder maps avoid races with the shared CoderPool.
251250
struct ParallelDecodeCtx
252251
{
253252
SchedulerFreebyrd* self;
254253
std::vector<std::shared_ptr<t1::DecompressBlockExec>>* blocks;
255254
std::vector<BlockDecodeContext>* contexts;
256-
// One coder per thread, keyed by {cblkwExpn, cblkhExpn}
257255
std::vector<std::unordered_map<uint32_t, std::shared_ptr<t1::ICoder>>> threadCoders;
258256

259257
static uint32_t coderKey(uint8_t cblkwExpn, uint8_t cblkhExpn)
@@ -298,9 +296,18 @@ bool SchedulerFreebyrd::decodeBlocks(ITileProcessor* tileProcessor)
298296
}
299297
}
300298
};
299+
300+
auto* engine = ScxEngineSingleton::get();
301+
// Use a thread-local domain ID — create once, reuse across tiles
302+
static uint32_t t1DomainId = scx_engine_create_domain(engine, "T1");
303+
301304
ParallelDecodeCtx ctx{this, &allBlocks, &blockContexts, {}};
302305
ctx.threadCoders.resize(num_threads);
303-
scx_parallel_for(allBlocks.size(), num_threads, ParallelDecodeCtx::decode, &ctx);
306+
307+
scx_engine_reset_domain(engine, t1DomainId);
308+
scx_engine_submit_batch(engine, t1DomainId, 0, allBlocks.size(),
309+
ParallelDecodeCtx::decode, &ctx);
310+
scx_engine_run(engine);
304311
#else
305312
// Decode all blocks in parallel using TaskFlow
306313
tf::Taskflow taskflow;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (C) 2016-2026 Grok Image Compression Inc.
3+
*
4+
* This source code is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License, version 3,
6+
* as published by the Free Software Foundation.
7+
*
8+
* This source code is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* GNU Affero General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU Affero General Public License
14+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
*
16+
*/
17+
18+
#include "ScxEngineSingleton.h"
19+
20+
ScxEngine* ScxEngineSingleton::instance_ = nullptr;
21+
std::mutex ScxEngineSingleton::mutex_;
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (C) 2016-2026 Grok Image Compression Inc.
3+
*
4+
* This source code is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License, version 3,
6+
* as published by the Free Software Foundation.
7+
*
8+
* This source code is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* GNU Affero General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU Affero General Public License
14+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
*
16+
*/
17+
18+
#pragma once
19+
20+
#include "scx_scheduling_ffi.h"
21+
#include <mutex>
22+
#include <cstdint>
23+
24+
/**
25+
* @class ScxEngineSingleton
26+
* @brief Manages a global ScxEngine instance (persistent thread pool).
27+
*
28+
* Like TFSingleton, but wraps the Rust ScxEngine for domain-based scheduling.
29+
* The engine is created once and reused across tiles to avoid thread spawn overhead.
30+
*/
31+
class ScxEngineSingleton
32+
{
33+
public:
34+
/**
35+
* @brief Creates singleton instance with specified thread count.
36+
* @param numThreads Number of worker threads (0 = use all cores)
37+
*/
38+
static void create(int32_t numThreads = 0)
39+
{
40+
std::lock_guard<std::mutex> lock(mutex_);
41+
if(instance_)
42+
return;
43+
instance_ = scx_engine_create(numThreads);
44+
}
45+
46+
/**
47+
* @brief Gets the engine instance (creates with default thread count if null).
48+
* @return Raw pointer to ScxEngine
49+
*/
50+
static ScxEngine* get()
51+
{
52+
std::lock_guard<std::mutex> lock(mutex_);
53+
if(!instance_)
54+
instance_ = scx_engine_create(0);
55+
return instance_;
56+
}
57+
58+
/**
59+
* @brief Destroys the engine singleton.
60+
*/
61+
static void destroy()
62+
{
63+
std::lock_guard<std::mutex> lock(mutex_);
64+
if(instance_)
65+
{
66+
scx_engine_destroy(instance_);
67+
instance_ = nullptr;
68+
}
69+
}
70+
71+
private:
72+
ScxEngineSingleton() = default;
73+
ScxEngineSingleton(const ScxEngineSingleton&) = delete;
74+
ScxEngineSingleton& operator=(const ScxEngineSingleton&) = delete;
75+
76+
static ScxEngine* instance_;
77+
static std::mutex mutex_;
78+
};

0 commit comments

Comments
 (0)