Skip to content

Commit be42875

Browse files
author
Grok Compression
committed
Add SCX scheduling option for freebyrd T1 decode (Phase 3)
When GRK_USE_SCX_SCHEDULING=ON, freebyrd uses sicorax's Rust-based scx_parallel_for for T1 block decoding instead of TaskFlow. The SCX path produces bit-identical output to the TaskFlow path. CMake integration looks for the pre-built libscx_scheduling.a static lib (running `cargo build --release` in the crate directory at configure time if it is missing) and links it with pthread/dl on non-Apple UNIX platforms such as Linux.
1 parent e27e04e commit be42875

3 files changed

Lines changed: 93 additions & 0 deletions

File tree

CMakeLists.txt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,30 @@ option(GRK_BUILD_CODEC "Build the CODEC executables" ON)
211211
option(GRK_BUILD_PLUGIN_LOADER "Enable loading of T1 plugin" OFF)
212212
mark_as_advanced(GRK_BUILD_PLUGIN_LOADER)
213213

214+
# Sicorax Rust scheduling library (used by freebyrd scheduler)
215+
option(GRK_USE_SCX_SCHEDULING "Link sicorax Rust scheduling library for freebyrd scheduler" OFF)
216+
if(GRK_USE_SCX_SCHEDULING)
217+
set(SCX_SCHEDULING_DIR "${CMAKE_SOURCE_DIR}/../sicorax" CACHE PATH "Path to sicorax source tree")
218+
set(SCX_SCHEDULING_LIB "${SCX_SCHEDULING_DIR}/crates/scx-scheduling/target/release/libscx_scheduling.a")
219+
if(NOT EXISTS "${SCX_SCHEDULING_LIB}")
220+
message(STATUS "Building sicorax scx-scheduling Rust crate...")
221+
execute_process(
222+
COMMAND cargo build --release
223+
WORKING_DIRECTORY "${SCX_SCHEDULING_DIR}/crates/scx-scheduling"
224+
RESULT_VARIABLE SCX_BUILD_RESULT
225+
)
226+
if(NOT SCX_BUILD_RESULT EQUAL 0)
227+
message(FATAL_ERROR "Failed to build scx-scheduling Rust crate")
228+
endif()
229+
endif()
230+
add_library(scx_scheduling STATIC IMPORTED)
231+
set_target_properties(scx_scheduling PROPERTIES IMPORTED_LOCATION "${SCX_SCHEDULING_LIB}")
232+
# Rust static libs on Linux need pthread and dl
233+
if(UNIX AND NOT APPLE)
234+
set_target_properties(scx_scheduling PROPERTIES INTERFACE_LINK_LIBRARIES "pthread;dl")
235+
endif()
236+
endif()
237+
214238
set(SPDLOG_INSTALL OFF CACHE BOOL "" FORCE)
215239
set(SPDLOG_BUILD_EXAMPLE OFF CACHE BOOL "" FORCE)
216240
set(SPDLOG_BUILD_TESTS OFF CACHE BOOL "" FORCE)

src/lib/core/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,11 @@ if(UNIX)
276276
target_link_libraries(${GROK_CORE_NAME} PUBLIC ${CMAKE_THREAD_LIBS_INIT})
277277
endif(UNIX)
278278
target_link_libraries(${GROK_CORE_NAME} PRIVATE hwy ${LCMS_LIBNAME})
279+
if(GRK_USE_SCX_SCHEDULING)
280+
target_link_libraries(${GROK_CORE_NAME} PRIVATE scx_scheduling)
281+
target_include_directories(${GROK_CORE_NAME} PRIVATE "${SCX_SCHEDULING_DIR}/src/scheduling")
282+
target_compile_definitions(${GROK_CORE_NAME} PRIVATE GRK_USE_SCX_SCHEDULING)
283+
endif()
279284
if(GRK_ENABLE_LIBCURL AND GRK_HAVE_LIBCURL)
280285
target_link_libraries(${GROK_CORE_NAME} PRIVATE ${GRK_CURL_TARGET})
281286
target_include_directories(${GROK_CORE_NAME} PRIVATE

src/lib/core/scheduling/freebyrd/SchedulerFreebyrd.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ struct ITileProcessor;
6363
#include "TileBlocks.h"
6464
#include "SchedulerStandard.h"
6565
#include "ImageComponentFlow.h"
66+
#ifdef GRK_USE_SCX_SCHEDULING
67+
#include "scx_scheduling_ffi.h"
68+
#include <unordered_map>
69+
#endif
6670
#include "SchedulerFreebyrd.h"
6771

6872
namespace grk
@@ -234,6 +238,65 @@ bool SchedulerFreebyrd::decodeBlocks(ITileProcessor* tileProcessor)
234238
if(allBlocks.empty())
235239
return true;
236240

241+
#ifdef GRK_USE_SCX_SCHEDULING
242+
// Decode all blocks in parallel using sicorax Rust scheduler.
243+
// We create local coders per-thread to avoid races with the shared CoderPool,
244+
// since multiple tiles may call runDecompressT1() concurrently from different
245+
// TF workers, and scx_parallel_for spawns its own threads.
246+
struct ParallelDecodeCtx
247+
{
248+
SchedulerFreebyrd* self;
249+
std::vector<std::shared_ptr<t1::DecompressBlockExec>>* blocks;
250+
std::vector<BlockDecodeContext>* contexts;
251+
// One coder map per thread (indexed by thread_id); each map is keyed by {cblkwExpn, cblkhExpn}
252+
std::vector<std::unordered_map<uint32_t, std::shared_ptr<t1::ICoder>>> threadCoders;
253+
254+
static uint32_t coderKey(uint8_t cblkwExpn, uint8_t cblkhExpn)
255+
{
256+
return ((uint32_t)cblkwExpn << 16) | (uint32_t)cblkhExpn;
257+
}
258+
259+
static void decode(size_t i, size_t thread_id, void* ud)
260+
{
261+
auto* c = static_cast<ParallelDecodeCtx*>(ud);
262+
if(!c->self->success_)
263+
return;
264+
try
265+
{
266+
auto& block = (*c->blocks)[i];
267+
auto& bctx = (*c->contexts)[i];
268+
t1::ICoder* coder = nullptr;
269+
if(block->needsCachedCoder())
270+
{
271+
coder = t1::CoderFactory::makeCoder(bctx.isHT, false, bctx.cbw, bctx.cbh,
272+
bctx.tileCacheStrategy);
273+
}
274+
else if(!bctx.cacheAll)
275+
{
276+
auto key = coderKey(bctx.cblkwExpn, bctx.cblkhExpn);
277+
auto& coderMap = c->threadCoders[thread_id];
278+
auto it = coderMap.find(key);
279+
if(it == coderMap.end())
280+
{
281+
auto newCoder = std::shared_ptr<t1::ICoder>(t1::CoderFactory::makeCoder(
282+
bctx.isHT, false, bctx.cbw, bctx.cbh, bctx.tileCacheStrategy));
283+
it = coderMap.emplace(key, std::move(newCoder)).first;
284+
}
285+
coder = it->second.get();
286+
}
287+
if(!block->open(coder))
288+
c->self->success_ = false;
289+
}
290+
catch(...)
291+
{
292+
c->self->success_ = false;
293+
}
294+
}
295+
};
296+
ParallelDecodeCtx ctx{this, &allBlocks, &blockContexts, {}};
297+
ctx.threadCoders.resize(num_threads);
298+
scx_parallel_for(allBlocks.size(), num_threads, ParallelDecodeCtx::decode, &ctx);
299+
#else
237300
// Decode all blocks in parallel using TaskFlow
238301
tf::Taskflow taskflow;
239302
for(size_t i = 0; i < allBlocks.size(); ++i)
@@ -267,6 +330,7 @@ bool SchedulerFreebyrd::decodeBlocks(ITileProcessor* tileProcessor)
267330
});
268331
}
269332
TFSingleton::get().run(taskflow).wait();
333+
#endif
270334

271335
return success_;
272336
}

0 commit comments

Comments
 (0)