Skip to content

Commit e27e04e

Browse files
author
Grok Compression
committed
Phase 1: Revive SchedulerFreebyrd with full T1+DWT+MCT pipeline
Implement the freebyrd scheduler as a working alternative decompression path (activated via GRK_SCHEDULER=freebyrd). The scheduler runs a sequential pipeline: parallel T1 block decode → DWT → MCT/DC-shift. Key changes: - SchedulerFreebyrd.cpp: Full implementation of decodeBlocks(), runDWT(), and postProcess(). Blocks are collected across all components and decoded in parallel via tf::Taskflow::emplace(). DWT uses WaveletReverse with a DwtFlowHelper (minimal SchedulerStandard subclass) to provide the ImageComponentFlow infrastructure. MCT/DC-shift runs via Mct schedule_decompress_rev/irrev. - SchedulerFreebyrd.h: Added DwtFlowHelper forward decl, CoderPool member, updated constructor to accept CoderPool*. - TileProcessor.cpp: Pass coderPool to SchedulerFreebyrd constructor. Output is bit-exact with the standard scheduler for lossless (5/3) and matches for lossy (9/7). All 2069 existing tests pass.
1 parent 8315bd3 commit e27e04e

3 files changed

Lines changed: 328 additions & 21 deletions

File tree

src/lib/core/scheduling/freebyrd/SchedulerFreebyrd.cpp

Lines changed: 312 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,45 +15,302 @@
1515
*
1616
*/
1717

18-
// Stub implementation — freebyrd thread pool has been removed.
19-
// SchedulerFreebyrd remains as a skeleton so the build succeeds but
20-
// setting GRK_SCHEDULER=freebyrd will log an error and fall back.
18+
#include "TFSingleton.h"
2119

20+
#include "geometry.h"
21+
#include "ISparseCanvas.h"
22+
#include "grk_restrict.h"
23+
#include "CodeStreamLimits.h"
24+
#include "TileWindow.h"
25+
#include "Quantizer.h"
2226
#include "Logger.h"
27+
#include "buffer.h"
28+
#include "GrkObjectWrapper.h"
29+
#include "TileFutureManager.h"
30+
#include "ImageComponentFlow.h"
31+
#include "IStream.h"
32+
#include "FetchCommon.h"
33+
#include "TPFetchSeq.h"
34+
#include "GrkImageMeta.h"
35+
#include "GrkImage.h"
36+
#include "MarkerParser.h"
37+
#include "PLMarker.h"
38+
#include "SIZMarker.h"
39+
#include "PPMMarker.h"
40+
namespace grk
41+
{
42+
struct ITileProcessor;
43+
}
44+
#include "CodeStream.h"
45+
#include "PacketLengthCache.h"
46+
#include "ICoder.h"
47+
#include "CoderPool.h"
48+
#include "BitIO.h"
49+
#include "TagTree.h"
50+
#include "CodeblockCompress.h"
51+
#include "CodeblockDecompress.h"
52+
#include "Precinct.h"
53+
#include "Subband.h"
54+
#include "Resolution.h"
55+
#include "CodecScheduler.h"
56+
#include "TileComponentWindow.h"
57+
#include "canvas/tile/Tile.h"
58+
#include "mct.h"
59+
#include "ITileProcessor.h"
60+
#include "CoderFactory.h"
61+
#include "WaveletReverse.h"
62+
#include "WaveletPoolData.h"
63+
#include "TileBlocks.h"
64+
#include "SchedulerStandard.h"
65+
#include "ImageComponentFlow.h"
2366
#include "SchedulerFreebyrd.h"
2467

2568
namespace grk
2669
{
2770

28-
SchedulerFreebyrd::SchedulerFreebyrd(uint16_t numcomps, uint8_t prec)
29-
: numcomps_(numcomps), prec_(prec), success_(true)
71+
// Minimal concrete SchedulerStandard subclass for providing ImageComponentFlow to WaveletReverse
72+
class DwtFlowHelper : public SchedulerStandard
73+
{
74+
public:
75+
explicit DwtFlowHelper(uint16_t numComps) : SchedulerStandard(numComps) {}
76+
bool scheduleT1([[maybe_unused]] ITileProcessor* proc) override { return true; }
77+
void release(void) override { SchedulerStandard::release(); }
78+
79+
void setupComponentFlow(uint16_t compno, uint8_t numRes, bool regionDecompress)
80+
{
81+
if(compno >= numcomps_)
82+
return;
83+
if(imageComponentFlow_[compno])
84+
delete imageComponentFlow_[compno];
85+
imageComponentFlow_[compno] = new ImageComponentFlow(numRes);
86+
if(regionDecompress)
87+
imageComponentFlow_[compno]->setRegionDecompression();
88+
// addTo must be called before graph to initialize composition tasks
89+
imageComponentFlow_[compno]->addTo(*this);
90+
SchedulerStandard::graph(compno);
91+
}
92+
};
93+
94+
SchedulerFreebyrd::SchedulerFreebyrd(uint16_t numcomps, uint8_t prec, CoderPool* streamPool)
95+
: numcomps_(numcomps), prec_(prec), success_(true), streamPool_(streamPool),
96+
waveletPoolData_(new WaveletPoolData()), dwtHelper_(new DwtFlowHelper(numcomps))
3097
{}
3198

3299
SchedulerFreebyrd::~SchedulerFreebyrd()
33100
{
34101
release();
102+
delete waveletPoolData_;
103+
delete dwtHelper_;
35104
}
36105

37106
void SchedulerFreebyrd::release()
38107
{
39108
blocksByComp_.clear();
40109
}
41110

42-
bool SchedulerFreebyrd::decompressTile([[maybe_unused]] ITileProcessor* tileProcessor)
111+
bool SchedulerFreebyrd::decompressTile(ITileProcessor* tileProcessor)
43112
{
44-
grklog.error("SchedulerFreebyrd: freebyrd backend has been removed. "
45-
"Unset GRK_SCHEDULER or use the default scheduler.");
46-
return false;
113+
success_ = true;
114+
115+
if(!decodeBlocks(tileProcessor))
116+
return false;
117+
118+
if(!runDWT(tileProcessor))
119+
return false;
120+
121+
if(!postProcess(tileProcessor))
122+
return false;
123+
124+
return success_;
47125
}
48126

49-
bool SchedulerFreebyrd::decodeBlocks([[maybe_unused]] ITileProcessor* tileProcessor)
127+
bool SchedulerFreebyrd::decodeBlocks(ITileProcessor* tileProcessor)
50128
{
51-
return false;
129+
auto tcp = tileProcessor->getTCP();
130+
bool cacheAll =
131+
(tileProcessor->getTileCacheStrategy() & GRK_TILE_CACHE_ALL) == GRK_TILE_CACHE_ALL;
132+
uint32_t num_threads = (uint32_t)TFSingleton::num_threads();
133+
bool finalLayer = tcp->layersToDecompress_ == tcp->numLayers_;
134+
135+
// Collect all blocks across all components
136+
std::vector<std::shared_ptr<t1::DecompressBlockExec>> allBlocks;
137+
138+
struct BlockDecodeContext
139+
{
140+
CoderPool* pool;
141+
uint8_t cblkwExpn;
142+
uint8_t cblkhExpn;
143+
bool cacheAll;
144+
bool isHT;
145+
uint32_t tileCacheStrategy;
146+
uint16_t cbw;
147+
uint16_t cbh;
148+
};
149+
std::vector<BlockDecodeContext> blockContexts;
150+
151+
for(uint16_t compno = 0; compno < numcomps_; ++compno)
152+
{
153+
if(!tileProcessor->shouldDecodeComponent(compno))
154+
continue;
155+
156+
auto tccp = tcp->tccps_ + compno;
157+
uint16_t cbw = tccp->cblkw_expn_ ? (uint16_t)(1 << tccp->cblkw_expn_) : 0U;
158+
uint16_t cbh = tccp->cblkh_expn_ ? (uint16_t)(1 << tccp->cblkh_expn_) : 0U;
159+
auto activePool = &coderPool_;
160+
if(streamPool_ && streamPool_->contains(tccp->cblkw_expn_, tccp->cblkh_expn_))
161+
activePool = streamPool_;
162+
163+
if(!cacheAll)
164+
{
165+
activePool->makeCoders(
166+
num_threads, tccp->cblkw_expn_, tccp->cblkh_expn_,
167+
[tcp, cbw, cbh, tileProcessor]() -> std::shared_ptr<t1::ICoder> {
168+
return std::shared_ptr<t1::ICoder>(t1::CoderFactory::makeCoder(
169+
tcp->isHT(), false, cbw, cbh, tileProcessor->getTileCacheStrategy()));
170+
});
171+
}
172+
173+
auto tilec = tileProcessor->getTile()->comps_ + compno;
174+
auto wholeTileDecoding = tilec->isWholeTileDecoding();
175+
uint8_t resBegin =
176+
cacheAll ? (uint8_t)tilec->currentPacketProgressionState_.numResolutionsRead() : 0;
177+
uint8_t resUpperBound = tilec->nextPacketProgressionState_.numResolutionsRead();
178+
179+
for(uint8_t resno = resBegin; resno < resUpperBound; ++resno)
180+
{
181+
auto res = tilec->resolutions_ + resno;
182+
for(uint8_t bandIndex = 0; bandIndex < res->numBands_; ++bandIndex)
183+
{
184+
auto band = res->band + bandIndex;
185+
auto paddedBandWindow = tilec->getBandWindowPadded(resno, band->orientation_);
186+
for(auto precinct : band->precincts_)
187+
{
188+
if(!wholeTileDecoding && !paddedBandWindow->nonEmptyIntersection(precinct))
189+
continue;
190+
for(uint32_t cblkno = 0; cblkno < precinct->getNumCblks(); ++cblkno)
191+
{
192+
auto cblkBounds = precinct->getCodeBlockBounds(cblkno);
193+
if(!wholeTileDecoding && !paddedBandWindow->nonEmptyIntersection(&cblkBounds))
194+
continue;
195+
196+
auto cblk = precinct->getDecompressBlock(cblkno);
197+
auto block = std::make_shared<t1::DecompressBlockExec>(cacheAll);
198+
block->x = cblk->x0();
199+
block->y = cblk->y0();
200+
block->postProcessor_ =
201+
tcp->isHT() ? t1::DecompressBlockPostProcessor<int32_t>(
202+
[tilec](int32_t* srcData, t1::DecompressBlockExec* blk,
203+
uint16_t stride) {
204+
tilec->postProcessBlockHT(srcData, blk, stride);
205+
})
206+
: t1::DecompressBlockPostProcessor<int32_t>(
207+
[tilec](int32_t* srcData, t1::DecompressBlockExec* blk,
208+
[[maybe_unused]] uint16_t stride) {
209+
tilec->postProcessBlock(srcData, blk);
210+
});
211+
block->bandIndex = bandIndex;
212+
block->bandNumbps = band->maxBitPlanes_;
213+
block->bandOrientation = band->orientation_;
214+
block->cblk = cblk;
215+
block->cblk_sty = tccp->cblkStyle_;
216+
block->qmfbid = tccp->qmfbid_;
217+
block->resno = resno;
218+
block->roishift = tccp->roishift_;
219+
block->stepsize = band->stepsize_;
220+
block->k_msbs = (uint8_t)(band->maxBitPlanes_ - cblk->numbps());
221+
block->R_b = prec_ + gain_b[band->orientation_];
222+
block->finalLayer_ = finalLayer;
223+
224+
allBlocks.push_back(block);
225+
blockContexts.push_back({activePool, tccp->cblkw_expn_, tccp->cblkh_expn_, cacheAll,
226+
tcp->isHT(), tileProcessor->getTileCacheStrategy(), cbw, cbh});
227+
}
228+
}
229+
}
230+
}
231+
tilec->currentPacketProgressionState_ = tilec->nextPacketProgressionState_;
232+
}
233+
234+
if(allBlocks.empty())
235+
return true;
236+
237+
// Decode all blocks in parallel using TaskFlow
238+
tf::Taskflow taskflow;
239+
for(size_t i = 0; i < allBlocks.size(); ++i)
240+
{
241+
taskflow.emplace([this, i, &allBlocks, &blockContexts, tileProcessor]() {
242+
if(!success_)
243+
return;
244+
auto& block = allBlocks[i];
245+
auto& ctx = blockContexts[i];
246+
t1::ICoder* coder = nullptr;
247+
if(block->needsCachedCoder())
248+
{
249+
coder = t1::CoderFactory::makeCoder(ctx.isHT, false, ctx.cbw, ctx.cbh,
250+
ctx.tileCacheStrategy);
251+
}
252+
else if(!ctx.cacheAll)
253+
{
254+
auto threadnum = TFSingleton::get().this_worker_id();
255+
coder = ctx.pool->getCoder((size_t)threadnum, ctx.cblkwExpn, ctx.cblkhExpn).get();
256+
}
257+
try
258+
{
259+
if(!block->open(coder))
260+
success_ = false;
261+
}
262+
catch(const std::runtime_error& rerr)
263+
{
264+
grklog.error(rerr.what());
265+
success_ = false;
266+
}
267+
});
268+
}
269+
TFSingleton::get().run(taskflow).wait();
270+
271+
return success_;
52272
}
53273

54-
bool SchedulerFreebyrd::runDWT([[maybe_unused]] ITileProcessor* tileProcessor)
274+
bool SchedulerFreebyrd::runDWT(ITileProcessor* tileProcessor)
55275
{
56-
return false;
276+
auto tcp = tileProcessor->getTCP();
277+
278+
// Release any previous ImageComponentFlow state and clear the taskflow
279+
dwtHelper_->release();
280+
dwtHelper_->clear();
281+
282+
for(uint16_t compno = 0; compno < numcomps_; ++compno)
283+
{
284+
if(!tileProcessor->shouldDecodeComponent(compno))
285+
continue;
286+
287+
auto tilec = tileProcessor->getTile()->comps_ + compno;
288+
uint8_t numRes = tilec->nextPacketProgressionState_.numResolutionsRead();
289+
if(numRes <= 1)
290+
continue;
291+
292+
// Create ImageComponentFlow for this component (required by WaveletReverse)
293+
dwtHelper_->setupComponentFlow(compno, numRes, !tilec->isWholeTileDecoding());
294+
295+
auto tccp = tcp->tccps_ + compno;
296+
auto maxDim = std::max(tileProcessor->getCodingParams()->t_width_,
297+
tileProcessor->getCodingParams()->t_height_);
298+
299+
WaveletReverse wavelet(dwtHelper_, tilec, compno, tilec->windowUnreducedBounds(), numRes,
300+
tccp->qmfbid_, maxDim, tcp->wholeTileDecompress_, waveletPoolData_);
301+
if(!wavelet.decompress())
302+
return false;
303+
304+
// WaveletReverse::decompress() only schedules tasks into the flow —
305+
// we must actually run them.
306+
TFSingleton::get().run(*dwtHelper_).wait();
307+
308+
// Clear the taskflow for the next component
309+
dwtHelper_->release();
310+
dwtHelper_->clear();
311+
}
312+
313+
return true;
57314
}
58315

59316
bool SchedulerFreebyrd::runCascadeDWT97([[maybe_unused]] ITileProcessor* tileProcessor,
@@ -74,9 +331,49 @@ bool SchedulerFreebyrd::runSeparateDWT16([[maybe_unused]] ITileProcessor* tilePr
74331
return false;
75332
}
76333

77-
bool SchedulerFreebyrd::postProcess([[maybe_unused]] ITileProcessor* tileProcessor)
334+
bool SchedulerFreebyrd::postProcess(ITileProcessor* tileProcessor)
78335
{
79-
return false;
336+
if(!tileProcessor->doPostT1())
337+
return true;
338+
339+
auto tcp = tileProcessor->getTCP();
340+
auto mct = tileProcessor->getMCT();
341+
342+
if(tileProcessor->needsMctDecompress())
343+
{
344+
// MCT with DC shift
345+
FlowComponent mctComp;
346+
if(tcp->tccps_->qmfbid_ == 1)
347+
mct->schedule_decompress_rev(&mctComp, true);
348+
else
349+
mct->schedule_decompress_irrev(&mctComp, true);
350+
351+
TFSingleton::get().run(mctComp).wait();
352+
}
353+
else
354+
{
355+
// DC shift only, per component
356+
for(uint16_t compno = 0; compno < numcomps_; ++compno)
357+
{
358+
if(!tileProcessor->shouldDecodeComponent(compno))
359+
continue;
360+
361+
auto tccp = tcp->tccps_ + compno;
362+
auto tilec = tileProcessor->getTile()->comps_ + compno;
363+
uint8_t numRes = tilec->nextPacketProgressionState_.numResolutionsRead();
364+
365+
// Freebyrd doesn't fuse DC shift into wavelet, so always apply it
366+
FlowComponent dcComp;
367+
if(tccp->qmfbid_ == 1)
368+
mct->schedule_decompress_dc_shift_rev(&dcComp, compno);
369+
else
370+
mct->schedule_decompress_dc_shift_irrev(&dcComp, compno);
371+
372+
TFSingleton::get().run(dcComp).wait();
373+
}
374+
}
375+
376+
return true;
80377
}
81378

82379
} // namespace grk

0 commit comments

Comments
 (0)