Skip to content

Commit 440f93b

Browse files
author
Grok Compression
committed
Add fetchByTileSelective for RLCP/RPCL/LRCP(1-layer) two-phase HTTP fetch
Phase 2 implementation: two-phase HTTP fetch for reduced resolution streaming over network. Phase 1 fetches 4KB tile-part headers to extract PLT packet lengths and SOD offsets. Phase 2 fetches only the needed low-resolution packet data (truncated tile-parts). Supports RLCP, RPCL, and LRCP with single layer (all have contiguous low-res data at start of packet data). Falls back to full fetch for other progression orders or when PLT parsing fails.
1 parent 5d8b0c1 commit 440f93b

2 files changed

Lines changed: 346 additions & 0 deletions

File tree

src/lib/core/codestream/decompress/CodeStreamDecompress.cpp

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ struct ITileProcessor;
6060
#include "TileCompletion.h"
6161
#include "GrkImageSIMD.h"
6262
#include "CodeStreamDecompress.h"
63+
#include "SelectiveFetchRanges.h"
6364

6465
namespace grk
6566
{
@@ -511,6 +512,21 @@ bool CodeStreamDecompress::startTLMDecompress(std::set<uint16_t>& pendingTiles)
511512
// begin network fetch
512513
auto generator = [this](ITileProcessor* tp) { return postMultiTile(tp); };
513514

515+
// Try selective fetch for reduced resolution over network
516+
uint8_t reduce = cp_.codingParams_.dec_.reduce_;
517+
if(reduce > 0 && stream_->getFetcher())
518+
{
519+
// Check progression order (use first tile's TCP)
520+
auto tcp = cp_.tcps_.get(*pendingTiles.begin());
521+
auto prog = tcp->prg_;
522+
if(prog == GRK_RLCP || prog == GRK_RPCL ||
523+
(prog == GRK_LRCP && tcp->numLayers_ == 1))
524+
{
525+
if(fetchByTileSelective(pendingTiles, scratchImage_->getBounds(), generator))
526+
return true;
527+
}
528+
}
529+
514530
if(fetchByTile(pendingTiles, scratchImage_->getBounds(), generator))
515531
return true;
516532

@@ -1731,6 +1747,333 @@ bool CodeStreamDecompress::fetchByTile(
17311747
return true;
17321748
}
17331749

1750+
bool CodeStreamDecompress::fetchByTileSelective(
1751+
std::set<uint16_t>& slated, Rect32 unreducedImageBounds,
1752+
std::function<std::function<void()>(ITileProcessor*)> postGenerator)
1753+
{
1754+
auto fetcher = stream_->getFetcher();
1755+
if(!fetcher)
1756+
return false;
1757+
1758+
const auto& allTileParts = cp_.tlmMarkers_->getTileParts();
1759+
uint8_t reduce = cp_.codingParams_.dec_.reduce_;
1760+
1761+
// Phase 1: Fetch 4KB header of each tile-part
1762+
constexpr uint32_t headerFetchSize = 4096;
1763+
1764+
// Create modified tile-part info with 4KB lengths for header fetch
1765+
TPSEQ_VEC headerTileParts(allTileParts.size());
1766+
for(auto tileIndex : slated)
1767+
{
1768+
if(tileIndex >= allTileParts.size() || !allTileParts[tileIndex])
1769+
continue;
1770+
auto& srcParts = allTileParts[tileIndex];
1771+
headerTileParts[tileIndex] = std::make_unique<TPSeq>();
1772+
for(size_t i = 0; i < srcParts->size(); ++i)
1773+
{
1774+
auto& part = (*srcParts)[i];
1775+
uint32_t fetchLen = std::min<uint32_t>((uint32_t)part->length_, headerFetchSize);
1776+
headerTileParts[tileIndex]->push_back((uint8_t)i, (uint8_t)srcParts->size(),
1777+
part->offset_, fetchLen);
1778+
}
1779+
}
1780+
1781+
// Structure to collect Phase 1 results
1782+
struct TileHeaderResult
1783+
{
1784+
std::vector<TilePartHeaderInfo> headerInfos; // per tile-part
1785+
std::vector<std::unique_ptr<uint8_t[]>> headerData; // raw header bytes
1786+
std::vector<uint32_t> headerSizes; // actual header sizes
1787+
};
1788+
auto headerResults =
1789+
std::make_shared<std::unordered_map<uint16_t, TileHeaderResult>>();
1790+
std::mutex headerMutex;
1791+
std::promise<bool> phase1Promise;
1792+
auto phase1Future = phase1Promise.get_future();
1793+
auto remainingTiles = std::make_shared<std::atomic<size_t>>(slated.size());
1794+
1795+
// Phase 1 fetch: get tile-part headers
1796+
fetcher->fetchTiles(
1797+
headerTileParts, slated, nullptr,
1798+
[&headerResults, &headerMutex, &remainingTiles, &phase1Promise](
1799+
size_t requestIndex, TileFetchContext* context) {
1800+
auto& tilePart = (*context->requests_)[requestIndex];
1801+
auto tileIndex = tilePart->tileIndex_;
1802+
1803+
auto& tilePartSeq = (*context->tilePartFetchByTile_)[tileIndex];
1804+
if(tilePartSeq->incrementFetchCount() == tilePartSeq->size())
1805+
{
1806+
// All tile-parts for this tile have arrived
1807+
TileHeaderResult result;
1808+
for(size_t i = 0; i < tilePartSeq->size(); ++i)
1809+
{
1810+
auto& tp = (*tilePartSeq)[i];
1811+
// Parse PLT and SOD from the raw header bytes
1812+
// Skip SOT marker (12 bytes) at start of tile-part data
1813+
constexpr size_t sotLen = 12;
1814+
if(tp->length_ > sotLen && tp->data_)
1815+
{
1816+
auto info = extractTilePartHeaderInfo(
1817+
tp->data_.get() + sotLen, (size_t)(tp->length_ - sotLen));
1818+
// Adjust SOD offset to be relative to tile-part start
1819+
if(info.valid)
1820+
info.sodOffset += sotLen;
1821+
result.headerInfos.push_back(std::move(info));
1822+
result.headerData.push_back(std::move(tp->data_));
1823+
result.headerSizes.push_back((uint32_t)tp->length_);
1824+
}
1825+
else
1826+
{
1827+
result.headerInfos.push_back(TilePartHeaderInfo{0, {}, false});
1828+
result.headerData.push_back(nullptr);
1829+
result.headerSizes.push_back(0);
1830+
}
1831+
}
1832+
{
1833+
std::lock_guard<std::mutex> lock(headerMutex);
1834+
(*headerResults)[tileIndex] = std::move(result);
1835+
}
1836+
if(remainingTiles->fetch_sub(1) == 1)
1837+
phase1Promise.set_value(true);
1838+
}
1839+
});
1840+
1841+
// Wait for Phase 1 to complete
1842+
phase1Future.get();
1843+
1844+
// Phase 2: Compute needed lengths and fetch truncated tile-parts
1845+
auto numComps = headerImage_->numcomps;
1846+
Rect32 imageBounds(headerImage_->x0, headerImage_->y0, headerImage_->x1, headerImage_->y1);
1847+
1848+
// Build modified tile-part info with truncated lengths
1849+
TPSEQ_VEC selectiveTileParts(allTileParts.size());
1850+
auto selectiveFetchTiles = std::make_shared<std::set<uint16_t>>();
1851+
1852+
for(auto tileIndex : slated)
1853+
{
1854+
auto it = headerResults->find(tileIndex);
1855+
if(it == headerResults->end())
1856+
continue;
1857+
1858+
auto& hdr = it->second;
1859+
bool allValid = true;
1860+
for(auto& info : hdr.headerInfos)
1861+
{
1862+
if(!info.valid || info.pltLengths.empty())
1863+
{
1864+
allValid = false;
1865+
break;
1866+
}
1867+
}
1868+
1869+
if(!allValid)
1870+
{
1871+
// Fallback: fetch full tile-part (no PLT or parsing failed)
1872+
selectiveTileParts[tileIndex] = std::make_unique<TPSeq>();
1873+
auto& srcParts = allTileParts[tileIndex];
1874+
for(size_t i = 0; i < srcParts->size(); ++i)
1875+
{
1876+
auto& part = (*srcParts)[i];
1877+
selectiveTileParts[tileIndex]->push_back(
1878+
(uint8_t)i, (uint8_t)srcParts->size(), part->offset_, (uint32_t)part->length_);
1879+
}
1880+
selectiveFetchTiles->insert(tileIndex);
1881+
continue;
1882+
}
1883+
1884+
// Compute tile bounds
1885+
uint16_t tileX = tileIndex % cp_.t_grid_width_;
1886+
uint16_t tileY = tileIndex / cp_.t_grid_width_;
1887+
auto tileBounds = cp_.getTileBounds(imageBounds, tileX, tileY);
1888+
1889+
// Get coding params for this tile
1890+
auto tcp = cp_.tcps_.get(tileIndex);
1891+
1892+
// Combine PLT lengths from all tile-parts
1893+
std::vector<uint32_t> allPltLengths;
1894+
for(auto& info : hdr.headerInfos)
1895+
{
1896+
allPltLengths.insert(allPltLengths.end(),
1897+
info.pltLengths.begin(), info.pltLengths.end());
1898+
}
1899+
1900+
// Build TilePacketInfo
1901+
TilePacketInfo tpi;
1902+
tpi.progression = tcp->prg_;
1903+
tpi.numComponents = numComps;
1904+
tpi.numLayers = tcp->numLayers_;
1905+
tpi.numResolutions.resize(numComps);
1906+
tpi.precinctsPerRes.resize(numComps);
1907+
tpi.pltLengths = std::move(allPltLengths);
1908+
1909+
for(uint16_t compno = 0; compno < numComps; ++compno)
1910+
{
1911+
auto tccp = tcp->tccps_ + compno;
1912+
auto comp = headerImage_->comps + compno;
1913+
tpi.numResolutions[compno] = tccp->numresolutions_;
1914+
1915+
// Compute tile-component bounds (scale by component subsampling)
1916+
uint32_t tcx0 = (uint32_t)((tileBounds.x0 + comp->dx - 1) / comp->dx);
1917+
uint32_t tcy0 = (uint32_t)((tileBounds.y0 + comp->dy - 1) / comp->dy);
1918+
uint32_t tcx1 = (uint32_t)((tileBounds.x1 + comp->dx - 1) / comp->dx);
1919+
uint32_t tcy1 = (uint32_t)((tileBounds.y1 + comp->dy - 1) / comp->dy);
1920+
1921+
tpi.precinctsPerRes[compno].resize(tccp->numresolutions_);
1922+
for(uint8_t resno = 0; resno < tccp->numresolutions_; ++resno)
1923+
{
1924+
tpi.precinctsPerRes[compno][resno] = computeNumPrecincts(
1925+
tcx0, tcy0, tcx1, tcy1,
1926+
tccp->numresolutions_, resno,
1927+
tccp->precWidthExp_[resno], tccp->precHeightExp_[resno]);
1928+
}
1929+
}
1930+
1931+
// Compute target resolution count (total res - reduce)
1932+
uint8_t maxRes = 0;
1933+
for(uint16_t c = 0; c < numComps; ++c)
1934+
maxRes = std::max(maxRes, tpi.numResolutions[c]);
1935+
uint8_t targetRes = (maxRes > reduce) ? (maxRes - reduce) : 1;
1936+
1937+
auto ranges = computeSelectiveFetchRanges(tpi, targetRes);
1938+
1939+
if(ranges.empty())
1940+
{
1941+
// No savings possible, fetch full tile-parts
1942+
selectiveTileParts[tileIndex] = std::make_unique<TPSeq>();
1943+
auto& srcParts = allTileParts[tileIndex];
1944+
for(size_t i = 0; i < srcParts->size(); ++i)
1945+
{
1946+
auto& part = (*srcParts)[i];
1947+
selectiveTileParts[tileIndex]->push_back(
1948+
(uint8_t)i, (uint8_t)srcParts->size(), part->offset_, (uint32_t)part->length_);
1949+
}
1950+
selectiveFetchTiles->insert(tileIndex);
1951+
continue;
1952+
}
1953+
1954+
// For RLCP/RPCL: ranges should be a single contiguous block starting at offset 0
1955+
// Needed length = end of last range
1956+
uint64_t neededDataBytes = ranges.back().end();
1957+
1958+
// Create truncated tile-part entries
1959+
// For single tile-part: header + needed data
1960+
// For multiple tile-parts: first tile-part gets header + data, rest get header only
1961+
selectiveTileParts[tileIndex] = std::make_unique<TPSeq>();
1962+
auto& srcParts = allTileParts[tileIndex];
1963+
1964+
if(srcParts->size() == 1)
1965+
{
1966+
// Single tile-part: header + SOD(2) + needed data
1967+
auto& part = (*srcParts)[0];
1968+
auto& headerInfo = hdr.headerInfos[0];
1969+
uint64_t truncatedLen = headerInfo.sodOffset + 2 + neededDataBytes;
1970+
truncatedLen = std::min<uint64_t>(truncatedLen, part->length_);
1971+
selectiveTileParts[tileIndex]->push_back(0, 1, part->offset_, (uint32_t)truncatedLen);
1972+
}
1973+
else
1974+
{
1975+
// Multiple tile-parts: distribute needed bytes across tile-parts
1976+
// For now, fetch all tile-parts but truncate based on cumulative PLT
1977+
uint64_t remainingNeeded = neededDataBytes;
1978+
for(size_t i = 0; i < srcParts->size(); ++i)
1979+
{
1980+
auto& part = (*srcParts)[i];
1981+
auto& headerInfo = hdr.headerInfos[i];
1982+
if(!headerInfo.valid)
1983+
{
1984+
// Can't truncate, fetch full
1985+
selectiveTileParts[tileIndex]->push_back(
1986+
(uint8_t)i, (uint8_t)srcParts->size(), part->offset_, (uint32_t)part->length_);
1987+
continue;
1988+
}
1989+
uint64_t tpDataSize = part->length_ - headerInfo.sodOffset - 2;
1990+
if(remainingNeeded >= tpDataSize)
1991+
{
1992+
// Need all data from this tile-part
1993+
selectiveTileParts[tileIndex]->push_back(
1994+
(uint8_t)i, (uint8_t)srcParts->size(), part->offset_, (uint32_t)part->length_);
1995+
remainingNeeded -= tpDataSize;
1996+
}
1997+
else
1998+
{
1999+
// Truncate this tile-part
2000+
uint64_t truncatedLen = headerInfo.sodOffset + 2 + remainingNeeded;
2001+
selectiveTileParts[tileIndex]->push_back(
2002+
(uint8_t)i, (uint8_t)srcParts->size(), part->offset_, (uint32_t)truncatedLen);
2003+
remainingNeeded = 0;
2004+
}
2005+
}
2006+
}
2007+
selectiveFetchTiles->insert(tileIndex);
2008+
}
2009+
2010+
if(selectiveFetchTiles->empty())
2011+
return false;
2012+
2013+
// Phase 2: Fetch truncated tile-parts through existing pipeline
2014+
maxFetchedTileRow_.store(-1, std::memory_order_release);
2015+
2016+
startDecompressConsumer(std::min((uint16_t)TFSingleton::num_threads(),
2017+
(uint16_t)((maxRowsAhead_ + 1) * cp_.t_grid_width_)));
2018+
2019+
auto numTileCols = cp_.t_grid_width_;
2020+
fetcher->setFetchThrottle([this]() {
2021+
if(tileCompletion_)
2022+
{
2023+
int32_t lastCleared = tileCompletion_->getLastClearedTileY();
2024+
int32_t maxAllowed = lastCleared + maxRowsAhead_ + 2;
2025+
if(maxFetchedTileRow_.load(std::memory_order_acquire) >= maxAllowed)
2026+
return false;
2027+
}
2028+
uint16_t inFlight;
2029+
{
2030+
std::lock_guard<std::mutex> lock(decompressThrottleMutex_);
2031+
inFlight = decompressInFlight_;
2032+
}
2033+
return decompressQueue_->size() + inFlight < maxDecompressInFlight_;
2034+
});
2035+
2036+
fetchByTileFutures_.push_back(fetcher->fetchTiles(
2037+
selectiveTileParts, *selectiveFetchTiles, nullptr,
2038+
[this, numTileCols, unreducedImageBounds, postGenerator,
2039+
selectiveFetchTiles](size_t requestIndex, TileFetchContext* context) {
2040+
auto& tilePart = (*context->requests_)[requestIndex];
2041+
tilePart->stream_ = std::unique_ptr<IStream>(memStreamCreate(
2042+
tilePart->data_.get(), tilePart->length_, false, nullptr, stream_->getFormat(), true));
2043+
auto& tilePartSeq = (*context->tilePartFetchByTile_)[tilePart->tileIndex_];
2044+
if(tilePartSeq->incrementFetchCount() == tilePartSeq->size())
2045+
{
2046+
auto tileIndex = tilePart->tileIndex_;
2047+
2048+
if(compressedChunkCache_)
2049+
compressedChunkCache_->put(tileIndex, tilePartSeq);
2050+
2051+
int32_t tileRow = tileIndex / numTileCols;
2052+
int32_t prev = maxFetchedTileRow_.load(std::memory_order_acquire);
2053+
while(prev < tileRow &&
2054+
!maxFetchedTileRow_.compare_exchange_weak(prev, tileRow,
2055+
std::memory_order_release,
2056+
std::memory_order_acquire))
2057+
{
2058+
}
2059+
auto tilePartSeqCopy = tilePartSeq;
2060+
decompressQueue_->push(
2061+
[this, tileIndex, tilePartSeqCopy, unreducedImageBounds, postGenerator]() {
2062+
const auto tileProcessor = getTileProcessor(tileIndex);
2063+
// Enable selective fetch mode on the packet cache
2064+
auto tcp = tileProcessor->getTCP();
2065+
if(tcp && tcp->packets_)
2066+
tcp->packets_->setSelectiveFetch(true);
2067+
auto decompressTask = genDecompressTileTLMTask(tileProcessor, tilePartSeqCopy,
2068+
unreducedImageBounds, postGenerator);
2069+
decompressTask();
2070+
});
2071+
}
2072+
}));
2073+
2074+
return true;
2075+
}
2076+
17342077
/////////////////////////////////////////////////////////////////////
17352078

17362079
void CodeStreamDecompress::initTilesToDecompress(Rect16 region)

src/lib/core/codestream/decompress/CodeStreamDecompress.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,9 @@ class CodeStreamDecompress final : public CodeStream, public IDecompressor
463463
bool fetchByTile(std::set<uint16_t>& slated, Rect32 unreducedImageBounds,
464464
std::function<std::function<void()>(ITileProcessor*)> postGenerator);
465465

466+
bool fetchByTileSelective(std::set<uint16_t>& slated, Rect32 unreducedImageBounds,
467+
std::function<std::function<void()>(ITileProcessor*)> postGenerator);
468+
466469
/**
467470
* @brief Scratch @ref GrkImage for decompressor
468471
* This image may composite multiple tiles, if needed.

0 commit comments

Comments
 (0)