Skip to content

Commit c1b76c0

Browse files
author
Grok Compression
committed
Phase 3: fine-grained dependency signaling via gates
Replace strict-sequencing DWT scheduling with a gated approach:
- Two domains per component: T1 (free-running) and DWT (gated sequences).
- Gates are created per resolution: the H-DWT at resolution R is gated on the T1 blocks completing at that resolution (resolution 1 also waits for the resolution 0 blocks).
- The T1 decode callback signals the gate after each block completes.
- H-DWT batches are submitted as gated (scx_engine_submit_gated_batch).
- V-DWT uses sequence ordering within the DWT domain (no gate needed).
- DWT stripes become eligible incrementally as T1 blocks complete, rather than waiting for ALL T1 blocks to finish.

Also includes incidental clang-format fixes from format.sh.
1 parent 593b576 commit c1b76c0

13 files changed

Lines changed: 233 additions & 166 deletions

File tree

src/lib/codec/apps/GrkCompress.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ GrkRC GrkCompress::parseCommandLine(int argc, const char* argv[], CompressInitPa
941941

942942
bool xyzTransform;
943943
auto xyzOpt = app.add_flag("--xyz", xyzTransform,
944-
"Apply Rec.709 RGB to DCI X'Y'Z' colour transform before compression");
944+
"Apply Rec.709 RGB to DCI X'Y'Z' colour transform before compression");
945945

946946
app.set_help_flag("-h", "Show abreviated usage");
947947
app.set_version_flag("-v,--version", grk_version(), "Show version");

src/lib/core/canvas/resolution/ResWindow.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,20 +171,20 @@ struct ResWindow
171171
Buf2dAligned* bandWindowsBuffersPaddedXL,
172172
Buf2dAligned* bandWindowsBuffersPaddedXH, bool useRelative)
173173
{
174-
auto lowerRes = useRelative ? Rect32(tileCompAtLowerRes_).toRelative() : Rect32(tileCompAtLowerRes_);
174+
auto lowerRes =
175+
useRelative ? Rect32(tileCompAtLowerRes_).toRelative() : Rect32(tileCompAtLowerRes_);
175176
auto xlBounds = useRelative ? Rect32(bandWindowsBuffersPaddedXL).toRelative()
176177
: Rect32(bandWindowsBuffersPaddedXL);
177178
auto xhBounds = useRelative ? Rect32(bandWindowsBuffersPaddedXH).toRelative()
178179
: Rect32(bandWindowsBuffersPaddedXH);
179180

180181
// two windows formed by horizontal pass and used as input for vertical pass
181-
auto splitResWindowBounds = Rect32(resWindowBuffer->x0, xlBounds.y0,
182-
resWindowBuffer->x1, xlBounds.y1);
182+
auto splitResWindowBounds =
183+
Rect32(resWindowBuffer->x0, xlBounds.y0, resWindowBuffer->x1, xlBounds.y1);
183184
resWindowBufferSplit[SPLIT_L] = new Buf2dAligned(splitResWindowBounds);
184185

185-
splitResWindowBounds =
186-
Rect32(resWindowBuffer->x0, lowerRes.y1 + xhBounds.y0,
187-
resWindowBuffer->x1, lowerRes.y1 + xhBounds.y1);
186+
splitResWindowBounds = Rect32(resWindowBuffer->x0, lowerRes.y1 + xhBounds.y0,
187+
resWindowBuffer->x1, lowerRes.y1 + xhBounds.y1);
188188
resWindowBufferSplit[SPLIT_H] = new Buf2dAligned(splitResWindowBounds);
189189
}
190190
bool alloc(bool clear)

src/lib/core/codestream/decompress/CodeStreamDecompress.cpp

Lines changed: 70 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1763,25 +1763,22 @@ void CodeStreamDecompress::enqueueTileForDecompress(
17631763

17641764
int32_t tileRow = tileIndex / numTileCols;
17651765
int32_t prev = maxFetchedTileRow_.load(std::memory_order_acquire);
1766-
while(prev < tileRow &&
1767-
!maxFetchedTileRow_.compare_exchange_weak(prev, tileRow, std::memory_order_release,
1768-
std::memory_order_acquire))
1766+
while(prev < tileRow && !maxFetchedTileRow_.compare_exchange_weak(
1767+
prev, tileRow, std::memory_order_release, std::memory_order_acquire))
17691768
{
17701769
}
1771-
decompressQueue_->push(
1772-
[this, tileIndex, decompressSeq, unreducedImageBounds, postGenerator]() {
1773-
const auto tileProcessor = getTileProcessor(tileIndex);
1774-
auto* tp = dynamic_cast<TileProcessor*>(tileProcessor);
1775-
if(tp)
1776-
tp->setSelectiveFetch(true);
1777-
auto decompressTask = genDecompressTileTLMTask(tileProcessor, decompressSeq,
1778-
unreducedImageBounds, postGenerator);
1779-
decompressTask();
1780-
});
1770+
decompressQueue_->push([this, tileIndex, decompressSeq, unreducedImageBounds, postGenerator]() {
1771+
const auto tileProcessor = getTileProcessor(tileIndex);
1772+
auto* tp = dynamic_cast<TileProcessor*>(tileProcessor);
1773+
if(tp)
1774+
tp->setSelectiveFetch(true);
1775+
auto decompressTask =
1776+
genDecompressTileTLMTask(tileProcessor, decompressSeq, unreducedImageBounds, postGenerator);
1777+
decompressTask();
1778+
});
17811779
}
17821780

1783-
void CodeStreamDecompress::buildSelectiveTileParts(uint16_t tileIndex,
1784-
const TileHeaderResult& hdr,
1781+
void CodeStreamDecompress::buildSelectiveTileParts(uint16_t tileIndex, const TileHeaderResult& hdr,
17851782
const TPSEQ_VEC& allTileParts, uint8_t reduce)
17861783
{
17871784
auto numComps = headerImage_->numcomps;
@@ -1949,11 +1946,10 @@ void CodeStreamDecompress::buildSelectiveTileParts(uint16_t tileIndex,
19491946
}
19501947
}
19511948

1952-
std::vector<std::pair<uint16_t, std::shared_ptr<TPFetchSeq>>>
1953-
CodeStreamDecompress::reusePhase1Data(
1954-
std::shared_ptr<std::set<uint16_t>>& selectiveFetchTiles,
1955-
const std::unordered_map<uint16_t, TileHeaderResult>& headerResults,
1956-
const TPSEQ_VEC& allTileParts)
1949+
std::vector<std::pair<uint16_t, std::shared_ptr<TPFetchSeq>>> CodeStreamDecompress::reusePhase1Data(
1950+
std::shared_ptr<std::set<uint16_t>>& selectiveFetchTiles,
1951+
const std::unordered_map<uint16_t, TileHeaderResult>& headerResults,
1952+
const TPSEQ_VEC& allTileParts)
19571953
{
19581954
std::vector<std::pair<uint16_t, std::shared_ptr<TPFetchSeq>>> prefetchedTiles;
19591955

@@ -2122,9 +2118,8 @@ bool CodeStreamDecompress::fetchByTileSelective(
21222118
// Create modified tile-part info for Phase 1 fetch.
21232119
// For contiguous progressions, estimate the needed data size and fetch that
21242120
// directly, eliminating the need for a second round-trip in most cases.
2125-
double pixelRatio = (contiguousProgression && reduce > 0)
2126-
? (1.0 / std::pow(4.0, reduce)) * 2.0
2127-
: 0.0;
2121+
double pixelRatio =
2122+
(contiguousProgression && reduce > 0) ? (1.0 / std::pow(4.0, reduce)) * 2.0 : 0.0;
21282123

21292124
TPSEQ_VEC headerTileParts(allTileParts.size());
21302125
for(auto tileIndex : slated)
@@ -2139,8 +2134,8 @@ bool CodeStreamDecompress::fetchByTileSelective(
21392134
uint32_t fetchLen;
21402135
if(contiguousProgression && reduce > 0)
21412136
{
2142-
uint64_t estimated = std::max<uint64_t>(
2143-
headerFetchSize, (uint64_t)(part->length_ * pixelRatio));
2137+
uint64_t estimated =
2138+
std::max<uint64_t>(headerFetchSize, (uint64_t)(part->length_ * pixelRatio));
21442139
fetchLen = (uint32_t)std::min<uint64_t>(estimated, part->length_);
21452140
}
21462141
else
@@ -2259,64 +2254,64 @@ bool CodeStreamDecompress::fetchByTileSelective(
22592254

22602255
if(!selectiveFetchTiles->empty())
22612256
{
2262-
installFetchThrottle(fetcher);
2263-
2264-
fetchByTileFutures_.push_back(fetcher->fetchTiles(
2265-
selectiveTileParts_, *selectiveFetchTiles, nullptr,
2266-
[this, numTileCols, unreducedImageBounds, postGenerator, selectiveFetchTiles,
2267-
disjointTiles](size_t requestIndex, TileFetchContext* context) {
2268-
auto& tilePart = (*context->requests_)[requestIndex];
2269-
auto tileIndex = tilePart->tileIndex_;
2270-
auto& tilePartSeq = (*context->tilePartFetchByTile_)[tileIndex];
2257+
installFetchThrottle(fetcher);
22712258

2272-
if(!disjointTiles->count(tileIndex))
2273-
{
2274-
// Contiguous case: create MemStream per entry (standard path)
2275-
tilePart->stream_ =
2276-
std::unique_ptr<IStream>(memStreamCreate(tilePart->data_.get(), tilePart->length_,
2277-
false, nullptr, stream_->getFormat(), true));
2278-
}
2259+
fetchByTileFutures_.push_back(fetcher->fetchTiles(
2260+
selectiveTileParts_, *selectiveFetchTiles, nullptr,
2261+
[this, numTileCols, unreducedImageBounds, postGenerator, selectiveFetchTiles,
2262+
disjointTiles](size_t requestIndex, TileFetchContext* context) {
2263+
auto& tilePart = (*context->requests_)[requestIndex];
2264+
auto tileIndex = tilePart->tileIndex_;
2265+
auto& tilePartSeq = (*context->tilePartFetchByTile_)[tileIndex];
22792266

2280-
if(tilePartSeq->incrementFetchCount() == tilePartSeq->size())
2281-
{
2282-
std::shared_ptr<TPFetchSeq> decompressSeq;
2267+
if(!disjointTiles->count(tileIndex))
2268+
{
2269+
// Contiguous case: create MemStream per entry (standard path)
2270+
tilePart->stream_ = std::unique_ptr<IStream>(
2271+
memStreamCreate(tilePart->data_.get(), tilePart->length_, false, nullptr,
2272+
stream_->getFormat(), true));
2273+
}
22832274

2284-
if(disjointTiles->count(tileIndex))
2275+
if(tilePartSeq->incrementFetchCount() == tilePartSeq->size())
22852276
{
2286-
// Disjoint case: assemble header + data ranges into single buffer
2287-
// Entry 0 = header (with SOT), entries 1..N = data ranges
2288-
uint64_t totalSize = 0;
2289-
for(size_t i = 0; i < tilePartSeq->size(); ++i)
2290-
totalSize += (*tilePartSeq)[i]->length_;
2291-
2292-
auto assembled = std::make_unique<uint8_t[]>(totalSize);
2293-
uint64_t pos = 0;
2294-
for(size_t i = 0; i < tilePartSeq->size(); ++i)
2277+
std::shared_ptr<TPFetchSeq> decompressSeq;
2278+
2279+
if(disjointTiles->count(tileIndex))
2280+
{
2281+
// Disjoint case: assemble header + data ranges into single buffer
2282+
// Entry 0 = header (with SOT), entries 1..N = data ranges
2283+
uint64_t totalSize = 0;
2284+
for(size_t i = 0; i < tilePartSeq->size(); ++i)
2285+
totalSize += (*tilePartSeq)[i]->length_;
2286+
2287+
auto assembled = std::make_unique<uint8_t[]>(totalSize);
2288+
uint64_t pos = 0;
2289+
for(size_t i = 0; i < tilePartSeq->size(); ++i)
2290+
{
2291+
auto& tp = (*tilePartSeq)[i];
2292+
if(tp->data_)
2293+
std::memcpy(assembled.get() + pos, tp->data_.get(), tp->length_);
2294+
pos += tp->length_;
2295+
}
2296+
2297+
// Create single-entry TPFetchSeq with assembled data
2298+
decompressSeq = std::make_shared<TPFetchSeq>();
2299+
auto assembledFetch = std::make_shared<TPFetch>(0, totalSize, tileIndex);
2300+
assembledFetch->data_ = std::move(assembled);
2301+
assembledFetch->stream_ = std::unique_ptr<IStream>(
2302+
memStreamCreate(assembledFetch->data_.get(), totalSize, false, nullptr,
2303+
stream_->getFormat(), true));
2304+
decompressSeq->SharedPtrSeq<TPFetch>::push_back(assembledFetch);
2305+
}
2306+
else
22952307
{
2296-
auto& tp = (*tilePartSeq)[i];
2297-
if(tp->data_)
2298-
std::memcpy(assembled.get() + pos, tp->data_.get(), tp->length_);
2299-
pos += tp->length_;
2308+
decompressSeq = tilePartSeq;
23002309
}
23012310

2302-
// Create single-entry TPFetchSeq with assembled data
2303-
decompressSeq = std::make_shared<TPFetchSeq>();
2304-
auto assembledFetch = std::make_shared<TPFetch>(0, totalSize, tileIndex);
2305-
assembledFetch->data_ = std::move(assembled);
2306-
assembledFetch->stream_ = std::unique_ptr<IStream>(
2307-
memStreamCreate(assembledFetch->data_.get(), totalSize, false, nullptr,
2308-
stream_->getFormat(), true));
2309-
decompressSeq->SharedPtrSeq<TPFetch>::push_back(assembledFetch);
2310-
}
2311-
else
2312-
{
2313-
decompressSeq = tilePartSeq;
2311+
enqueueTileForDecompress(tileIndex, decompressSeq, numTileCols, unreducedImageBounds,
2312+
postGenerator);
23142313
}
2315-
2316-
enqueueTileForDecompress(tileIndex, decompressSeq, numTileCols, unreducedImageBounds,
2317-
postGenerator);
2318-
}
2319-
}));
2314+
}));
23202315
} // if(!selectiveFetchTiles->empty())
23212316

23222317
return true;

src/lib/core/codestream/decompress/CodeStreamDecompress.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -508,10 +508,10 @@ class CodeStreamDecompress final : public CodeStream, public IDecompressor
508508
* @param unreducedImageBounds unreduced image bounds for decompress tasks
509509
* @param postGenerator factory for post-decompress callbacks
510510
*/
511-
void enqueueTileForDecompress(
512-
uint16_t tileIndex, std::shared_ptr<TPFetchSeq> decompressSeq, uint16_t numTileCols,
513-
Rect32 unreducedImageBounds,
514-
std::function<std::function<void()>(ITileProcessor*)> postGenerator);
511+
void
512+
enqueueTileForDecompress(uint16_t tileIndex, std::shared_ptr<TPFetchSeq> decompressSeq,
513+
uint16_t numTileCols, Rect32 unreducedImageBounds,
514+
std::function<std::function<void()>(ITileProcessor*)> postGenerator);
515515

516516
/**
517517
* @brief Build selective tile-part entries for a single tile.
@@ -548,10 +548,10 @@ class CodeStreamDecompress final : public CodeStream, public IDecompressor
548548
* @param allTileParts full tile-part info from TLM
549549
* @return vector of tiles with pre-built decompression sequences
550550
*/
551-
std::vector<std::pair<uint16_t, std::shared_ptr<TPFetchSeq>>> reusePhase1Data(
552-
std::shared_ptr<std::set<uint16_t>>& selectiveFetchTiles,
553-
const std::unordered_map<uint16_t, TileHeaderResult>& headerResults,
554-
const TPSEQ_VEC& allTileParts);
551+
std::vector<std::pair<uint16_t, std::shared_ptr<TPFetchSeq>>>
552+
reusePhase1Data(std::shared_ptr<std::set<uint16_t>>& selectiveFetchTiles,
553+
const std::unordered_map<uint16_t, TileHeaderResult>& headerResults,
554+
const TPSEQ_VEC& allTileParts);
555555

556556
/**
557557
* @brief Install fetch throttle on the given fetcher.

src/lib/core/fileformat/decompress/FileFormatMJ2Decompress.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,7 @@ void FileFormatMJ2Decompress::stsc_decompact(mj2_tk* tk)
555555
samples_per_chunk, (uint32_t)tk->samples_.size());
556556
return;
557557
}
558-
auto num_chunks = (uint32_t)ceil((double)tk->samples_.size() /
559-
(double)samples_per_chunk);
558+
auto num_chunks = (uint32_t)ceil((double)tk->samples_.size() / (double)samples_per_chunk);
560559
for(uint32_t k = 0; k < num_chunks; k++)
561560
{
562561
mj2_chunk chunk;

0 commit comments

Comments
 (0)