Skip to content

Commit 80b9554

Browse files
author
Grok Compression
committed
fetch refactor II
1 parent e681012 commit 80b9554

6 files changed

Lines changed: 1095 additions & 975 deletions

File tree

src/lib/core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ set(GROK_LIBRARY_SRCS
9191
${CMAKE_CURRENT_SOURCE_DIR}/stream/MappedFile.cpp
9292
${CMAKE_CURRENT_SOURCE_DIR}/stream/StreamIO.cpp
9393
${CMAKE_CURRENT_SOURCE_DIR}/stream/StreamGenerator.cpp
94+
${CMAKE_CURRENT_SOURCE_DIR}/stream/fetchers/CurlFetcher.cpp
9495

9596
${CMAKE_CURRENT_SOURCE_DIR}/plugin/minpf_dynamic_library.cpp
9697
${CMAKE_CURRENT_SOURCE_DIR}/plugin/minpf_plugin_manager.cpp

src/lib/core/codestream/decompress/CodeStreamDecompress.cpp

Lines changed: 25 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,28 +1682,8 @@ bool CodeStreamDecompress::fetchByTile(
16821682
startDecompressConsumer(std::min((uint16_t)TFSingleton::num_threads(),
16831683
(uint16_t)((maxRowsAhead_ + 1) * cp_.t_grid_width_)));
16841684

1685-
// Back pressure: prevent the fetcher from scheduling more HTTP requests
1686-
// when either (a) too many tiles are in-flight for decompression, or
1687-
// (b) the fetcher is too far ahead of the consumer (swath-based release).
16881685
auto numTileCols = cp_.t_grid_width_;
1689-
fetcher->setFetchThrottle([this]() {
1690-
// Row-based: don't fetch tiles more than maxRowsAhead_ rows beyond
1691-
// the last row released by the consumer.
1692-
if(tileCompletion_)
1693-
{
1694-
int32_t lastCleared = tileCompletion_->getLastClearedTileY();
1695-
int32_t maxAllowed = lastCleared + maxRowsAhead_ + 2;
1696-
if(maxFetchedTileRow_.load(std::memory_order_acquire) >= maxAllowed)
1697-
return false;
1698-
}
1699-
// In-flight: don't overwhelm the decompress pipeline
1700-
uint16_t inFlight;
1701-
{
1702-
std::lock_guard<std::mutex> lock(decompressThrottleMutex_);
1703-
inFlight = decompressInFlight_;
1704-
}
1705-
return decompressQueue_->size() + inFlight < maxDecompressInFlight_;
1706-
});
1686+
installFetchThrottle(fetcher);
17071687

17081688
fetchByTileFutures_.push_back(fetcher->fetchTiles(
17091689
cp_.tlmMarkers_->getTileParts(), slated, nullptr,
@@ -2053,6 +2033,28 @@ std::vector<std::pair<uint16_t, std::shared_ptr<TPFetchSeq>>>
20532033
return prefetchedTiles;
20542034
}
20552035

2036+
void CodeStreamDecompress::installFetchThrottle(IFetcher* fetcher)
2037+
{
2038+
fetcher->setFetchThrottle([this]() {
2039+
// Row-based: don't fetch tiles more than maxRowsAhead_ rows beyond
2040+
// the last row released by the consumer.
2041+
if(tileCompletion_)
2042+
{
2043+
int32_t lastCleared = tileCompletion_->getLastClearedTileY();
2044+
int32_t maxAllowed = lastCleared + maxRowsAhead_ + 2;
2045+
if(maxFetchedTileRow_.load(std::memory_order_acquire) >= maxAllowed)
2046+
return false;
2047+
}
2048+
// In-flight: don't overwhelm the decompress pipeline
2049+
uint16_t inFlight;
2050+
{
2051+
std::lock_guard<std::mutex> lock(decompressThrottleMutex_);
2052+
inFlight = decompressInFlight_;
2053+
}
2054+
return decompressQueue_->size() + inFlight < maxDecompressInFlight_;
2055+
});
2056+
}
2057+
20562058
bool CodeStreamDecompress::fetchByTileSelective(
20572059
std::set<uint16_t>& slated, Rect32 unreducedImageBounds,
20582060
std::function<std::function<void()>(ITileProcessor*)> postGenerator)
@@ -2092,9 +2094,6 @@ bool CodeStreamDecompress::fetchByTileSelective(
20922094
auto phase1Future = phase1Promise.get_future();
20932095
auto remainingTiles = std::make_shared<std::atomic<size_t>>(slated.size());
20942096

2095-
// Save slated tiles before Phase 1 fetch (fetchTiles moves the set)
2096-
auto slatedTiles = slated;
2097-
20982097
// Phase 1 fetch: get tile-part headers
20992098
fetcher->fetchTiles(headerTileParts, slated, nullptr,
21002099
[&headerResults, &headerMutex, &remainingTiles,
@@ -2149,7 +2148,7 @@ bool CodeStreamDecompress::fetchByTileSelective(
21492148
selectiveTileParts_.resize(allTileParts.size());
21502149
auto selectiveFetchTiles = std::make_shared<std::set<uint16_t>>();
21512150

2152-
for(auto tileIndex : slatedTiles)
2151+
for(auto tileIndex : slated)
21532152
{
21542153
auto it = headerResults->find(tileIndex);
21552154
if(it == headerResults->end())
@@ -2196,21 +2195,7 @@ bool CodeStreamDecompress::fetchByTileSelective(
21962195

21972196
if(!selectiveFetchTiles->empty())
21982197
{
2199-
fetcher->setFetchThrottle([this]() {
2200-
if(tileCompletion_)
2201-
{
2202-
int32_t lastCleared = tileCompletion_->getLastClearedTileY();
2203-
int32_t maxAllowed = lastCleared + maxRowsAhead_ + 2;
2204-
if(maxFetchedTileRow_.load(std::memory_order_acquire) >= maxAllowed)
2205-
return false;
2206-
}
2207-
uint16_t inFlight;
2208-
{
2209-
std::lock_guard<std::mutex> lock(decompressThrottleMutex_);
2210-
inFlight = decompressInFlight_;
2211-
}
2212-
return decompressQueue_->size() + inFlight < maxDecompressInFlight_;
2213-
});
2198+
installFetchThrottle(fetcher);
22142199

22152200
fetchByTileFutures_.push_back(fetcher->fetchTiles(
22162201
selectiveTileParts_, *selectiveFetchTiles, nullptr,

src/lib/core/codestream/decompress/CodeStreamDecompress.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,15 @@ class CodeStreamDecompress final : public CodeStream, public IDecompressor
552552
const std::unordered_map<uint16_t, TileHeaderResult>& headerResults,
553553
const TPSEQ_VEC& allTileParts);
554554

555+
/**
556+
* @brief Install fetch throttle on the given fetcher.
557+
*
558+
* Back pressure: prevent the fetcher from scheduling more HTTP requests
559+
* when either (a) the fetcher is too far ahead of the consumer (swath-based
560+
* release), or (b) too many tiles are in-flight for decompression.
561+
*/
562+
void installFetchThrottle(IFetcher* fetcher);
563+
555564
/**
556565
* @brief Scratch @ref GrkImage for decompressor
557566
* This image may composite multiple tiles, if needed.

src/lib/core/stream/TPFetchSeq.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717

1818
#pragma once
1919

20+
#include <cassert>
21+
#include <cstring>
2022
#include <vector>
2123
#include <memory>
2224
#include <set>
2325
#include <unordered_map>
2426
#include <atomic>
2527

2628
#include "FetchCommon.h"
29+
#include "Logger.h"
30+
#include "IStream.h"
2731

2832
namespace grk
2933
{
@@ -56,6 +60,15 @@ struct SharedPtrSeq
5660
return objStore_.begin();
5761
}
5862

63+
/**
64+
* @brief Returns const iterator to beginning of sequence
65+
* @return Const iterator pointing to first shared_ptr
66+
*/
67+
const_iterator begin() const
68+
{
69+
return objStore_.begin();
70+
}
71+
5972
/**
6073
* @brief Returns const iterator to end of sequence
6174
* @return Const iterator pointing past the last shared_ptr
@@ -211,6 +224,12 @@ struct TPFetch : public DataSlice
211224
{
212225
data_ = std::make_unique<uint8_t[]>(length_);
213226
}
227+
if(fetchOffset_ + chunkLen > length_)
228+
{
229+
grklog.error("TPFetch::copy: buffer overflow (offset=%zu + len=%zu > capacity=%zu)",
230+
fetchOffset_, chunkLen, (size_t)length_);
231+
return;
232+
}
214233
std::memcpy(data_.get() + fetchOffset_, chunk, chunkLen);
215234
fetchOffset_ += chunkLen;
216235
}
@@ -249,7 +268,7 @@ struct TPFetchSeq : SharedPtrSeq<TPFetch>
249268
}
250269

251270
static void
252-
genCollections(const TPSEQ_VEC* allTileParts, std::set<uint16_t>& slated,
271+
genCollections(const TPSEQ_VEC* allTileParts, const std::set<uint16_t>& slated,
253272
std::shared_ptr<TPFetchSeq>& tilePartFetchFlat,
254273
std::shared_ptr<std::unordered_map<uint16_t, std::shared_ptr<TPFetchSeq>>>&
255274
tilePartFetchByTile)
@@ -272,6 +291,13 @@ struct TPFetchSeq : SharedPtrSeq<TPFetch>
272291
}
273292
}
274293

294+
/**
295+
* @brief Atomically increment and return the fetch-completed count.
296+
*
297+
* When the returned value equals size(), all tile-parts for this tile
298+
* have been fetched. Uses uint8_t storage, limiting to 255 tile-parts
299+
* per tile (the JPEG 2000 spec allows up to 255 tile-parts per tile).
300+
*/
275301
uint8_t incrementFetchCount()
276302
{
277303
std::atomic_ref<uint8_t> atomicCount(fetchCount_);

0 commit comments

Comments
 (0)