Skip to content

Commit 2bbfda3

Browse files
author
Grok Compression
committed
selective fetch: reuse initial 4k fetch for relevant packets
1 parent ccddf68 commit 2bbfda3

1 file changed

Lines changed: 143 additions & 1 deletion

File tree

src/lib/core/codestream/decompress/CodeStreamDecompress.cpp

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2039,7 +2039,119 @@ bool CodeStreamDecompress::fetchByTileSelective(
20392039
selectiveFetchTiles->insert(tileIndex);
20402040
}
20412041

2042-
if(selectiveFetchTiles->empty())
2042+
// Reuse Phase 1 data for tiles where all needed data fits within the header fetch
2043+
struct PrefetchedTile
2044+
{
2045+
uint16_t tileIndex;
2046+
std::shared_ptr<TPFetchSeq> decompressSeq;
2047+
bool disjoint;
2048+
};
2049+
std::vector<PrefetchedTile> prefetchedTiles;
2050+
for(auto it = selectiveFetchTiles->begin(); it != selectiveFetchTiles->end();)
2051+
{
2052+
auto tileIndex = *it;
2053+
auto& selParts = selectiveTileParts_[tileIndex];
2054+
auto headerIt = headerResults->find(tileIndex);
2055+
if(!selParts || headerIt == headerResults->end())
2056+
{
2057+
++it;
2058+
continue;
2059+
}
2060+
auto& hdr = headerIt->second;
2061+
auto& srcParts = allTileParts[tileIndex];
2062+
bool isDjoint = (selParts->size() != srcParts->size());
2063+
2064+
// Check if all entries' data fits within Phase 1 buffers
2065+
bool allFit = true;
2066+
if(!isDjoint)
2067+
{
2068+
// Contiguous: each entry maps 1:1 to a tile-part's Phase 1 buffer
2069+
for(size_t i = 0; i < selParts->size(); ++i)
2070+
{
2071+
if(i >= hdr.headerSizes.size() || !hdr.headerData[i] ||
2072+
(*selParts)[i]->length_ > hdr.headerSizes[i])
2073+
{
2074+
allFit = false;
2075+
break;
2076+
}
2077+
}
2078+
}
2079+
else
2080+
{
2081+
// Disjoint (single tile-part only): entries are sub-ranges within the one buffer
2082+
if(srcParts->size() != 1 || hdr.headerSizes.empty() || !hdr.headerData[0])
2083+
{
2084+
allFit = false;
2085+
}
2086+
else
2087+
{
2088+
uint64_t baseOffset = (*srcParts)[0]->offset_;
2089+
uint32_t bufSize = hdr.headerSizes[0];
2090+
for(size_t i = 0; i < selParts->size(); ++i)
2091+
{
2092+
auto& entry = (*selParts)[i];
2093+
uint64_t relOff = entry->offset_ - baseOffset;
2094+
if(relOff + entry->length_ > bufSize)
2095+
{
2096+
allFit = false;
2097+
break;
2098+
}
2099+
}
2100+
}
2101+
}
2102+
if(!allFit)
2103+
{
2104+
++it;
2105+
continue;
2106+
}
2107+
// Build decompression sequence from Phase 1 data
2108+
if(!isDjoint)
2109+
{
2110+
auto decompressSeq = std::make_shared<TPFetchSeq>();
2111+
for(size_t i = 0; i < selParts->size(); ++i)
2112+
{
2113+
auto& entry = (*selParts)[i];
2114+
auto fetch = std::make_shared<TPFetch>(entry->offset_, entry->length_, tileIndex);
2115+
fetch->data_ = std::make_unique<uint8_t[]>(entry->length_);
2116+
std::memcpy(fetch->data_.get(), hdr.headerData[i].get(), entry->length_);
2117+
fetch->stream_ = std::unique_ptr<IStream>(memStreamCreate(
2118+
fetch->data_.get(), fetch->length_, false, nullptr, stream_->getFormat(), true));
2119+
decompressSeq->SharedPtrSeq<TPFetch>::push_back(fetch);
2120+
}
2121+
prefetchedTiles.push_back({tileIndex, decompressSeq, false});
2122+
}
2123+
else
2124+
{
2125+
// Disjoint: assemble header + data ranges into single contiguous buffer
2126+
uint64_t baseOffset = (*srcParts)[0]->offset_;
2127+
uint64_t totalSize = 0;
2128+
for(size_t i = 0; i < selParts->size(); ++i)
2129+
totalSize += (*selParts)[i]->length_;
2130+
2131+
auto assembled = std::make_unique<uint8_t[]>(totalSize);
2132+
uint64_t pos = 0;
2133+
for(size_t i = 0; i < selParts->size(); ++i)
2134+
{
2135+
auto& entry = (*selParts)[i];
2136+
uint64_t relOff = entry->offset_ - baseOffset;
2137+
std::memcpy(assembled.get() + pos, hdr.headerData[0].get() + relOff, entry->length_);
2138+
pos += entry->length_;
2139+
}
2140+
2141+
auto decompressSeq = std::make_shared<TPFetchSeq>();
2142+
auto assembledFetch = std::make_shared<TPFetch>(0, totalSize, tileIndex);
2143+
assembledFetch->data_ = std::move(assembled);
2144+
assembledFetch->stream_ = std::unique_ptr<IStream>(memStreamCreate(
2145+
assembledFetch->data_.get(), totalSize, false, nullptr, stream_->getFormat(), true));
2146+
decompressSeq->SharedPtrSeq<TPFetch>::push_back(assembledFetch);
2147+
prefetchedTiles.push_back({tileIndex, decompressSeq, true});
2148+
}
2149+
it = selectiveFetchTiles->erase(it);
2150+
}
2151+
if(!prefetchedTiles.empty())
2152+
grklog.debug("fetchByTileSelective: %zu tiles reusing Phase 1 data", prefetchedTiles.size());
2153+
2154+
if(selectiveFetchTiles->empty() && prefetchedTiles.empty())
20432155
return false;
20442156
grklog.debug("fetchByTileSelective: %zu tiles to fetch", selectiveFetchTiles->size());
20452157

@@ -2060,6 +2172,35 @@ bool CodeStreamDecompress::fetchByTileSelective(
20602172
(uint16_t)((maxRowsAhead_ + 1) * cp_.t_grid_width_)));
20612173

20622174
auto numTileCols = cp_.t_grid_width_;
2175+
2176+
// Queue pre-fetched tiles directly for decompression (reusing Phase 1 data)
2177+
for(auto& pf : prefetchedTiles)
2178+
{
2179+
if(compressedChunkCache_)
2180+
compressedChunkCache_->put(pf.tileIndex, pf.decompressSeq);
2181+
2182+
int32_t tileRow = pf.tileIndex / numTileCols;
2183+
int32_t prev = maxFetchedTileRow_.load(std::memory_order_acquire);
2184+
while(prev < tileRow &&
2185+
!maxFetchedTileRow_.compare_exchange_weak(prev, tileRow, std::memory_order_release,
2186+
std::memory_order_acquire))
2187+
{
2188+
}
2189+
decompressQueue_->push(
2190+
[this, tileIndex = pf.tileIndex, decompressSeq = pf.decompressSeq,
2191+
unreducedImageBounds, postGenerator]() {
2192+
const auto tileProcessor = getTileProcessor(tileIndex);
2193+
auto* tp = dynamic_cast<TileProcessor*>(tileProcessor);
2194+
if(tp)
2195+
tp->setSelectiveFetch(true);
2196+
auto decompressTask = genDecompressTileTLMTask(tileProcessor, decompressSeq,
2197+
unreducedImageBounds, postGenerator);
2198+
decompressTask();
2199+
});
2200+
}
2201+
2202+
if(!selectiveFetchTiles->empty())
2203+
{
20632204
fetcher->setFetchThrottle([this]() {
20642205
if(tileCompletion_)
20652206
{
@@ -2150,6 +2291,7 @@ bool CodeStreamDecompress::fetchByTileSelective(
21502291
});
21512292
}
21522293
}));
2294+
} // if(!selectiveFetchTiles->empty())
21532295

21542296
return true;
21552297
}

0 commit comments

Comments
 (0)