Skip to content

Commit e4f7afc

Browse files
author
Grok Compression
committed
fetch: HTTP/2 multiplexing, adaptive Phase 1, configurable batch size, request sorting
- Enable HTTP/2 multiplexing (CURLPIPE_MULTIPLEX + HTTP_VERSION_2TLS) so all concurrent range requests share a single TCP connection - Adaptive Phase 1 sizing for contiguous progressions (RLCP, RPCL, single-layer LRCP): estimate needed data from tile size and reduce level so reusePhase1Data() can skip the Phase 2 round-trip - Add fetch_batch_size to grk_stream_params / IFetcher::setBatchSize() allowing callers to tune HTTP concurrency (default remains 30) - Sort flat fetch request list by file offset after genCollections() for better server-side read coalescing with HTTP/2 streams
1 parent 80b9554 commit e4f7afc

7 files changed

Lines changed: 79 additions & 5 deletions

File tree

src/lib/core/codestream/decompress/CodeStreamDecompress.cpp

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
#include <chrono>
19+
#include <cmath>
1920
#include <functional>
2021

2122
#include "TFSingleton.h"
@@ -2066,12 +2067,36 @@ bool CodeStreamDecompress::fetchByTileSelective(
20662067
const auto& allTileParts = cp_.tlmMarkers_->getTileParts();
20672068
uint8_t reduce = cp_.codingParams_.dec_.reduce_;
20682069

2069-
// Phase 1: Fetch 4KB header of each tile-part
2070+
// Determine if the progression order produces contiguous data from the start.
2071+
// For these cases, Phase 1 can speculatively fetch the estimated needed amount
2072+
// so that reusePhase1Data() can skip Phase 2 entirely.
2073+
auto tcp = defaultTcp_.get();
2074+
bool contiguousProgression = false;
2075+
switch(tcp->prg_)
2076+
{
2077+
case GRK_RLCP:
2078+
case GRK_RPCL:
2079+
contiguousProgression = true;
2080+
break;
2081+
case GRK_LRCP:
2082+
contiguousProgression = (tcp->numLayers_ == 1);
2083+
break;
2084+
default:
2085+
break;
2086+
}
2087+
2088+
// Phase 1: fetch tile-part headers (and possibly all needed data for contiguous cases)
20702089
constexpr uint32_t headerFetchSize = 4096;
2071-
grklog.debug("fetchByTileSelective: Phase 1 starting, %zu tiles, reduce=%u", slated.size(),
2072-
(unsigned)reduce);
2090+
grklog.debug("fetchByTileSelective: Phase 1 starting, %zu tiles, reduce=%u, contiguous=%d",
2091+
slated.size(), (unsigned)reduce, (int)contiguousProgression);
2092+
2093+
// Create modified tile-part info for Phase 1 fetch.
2094+
// For contiguous progressions, estimate the needed data size and fetch that
2095+
// directly, eliminating the need for a second round-trip in most cases.
2096+
double pixelRatio = (contiguousProgression && reduce > 0)
2097+
? (1.0 / std::pow(4.0, reduce)) * 2.0
2098+
: 0.0;
20732099

2074-
// Create modified tile-part info with 4KB lengths for header fetch
20752100
TPSEQ_VEC headerTileParts(allTileParts.size());
20762101
for(auto tileIndex : slated)
20772102
{
@@ -2082,7 +2107,17 @@ bool CodeStreamDecompress::fetchByTileSelective(
20822107
for(size_t i = 0; i < srcParts->size(); ++i)
20832108
{
20842109
auto& part = (*srcParts)[i];
2085-
uint32_t fetchLen = std::min<uint32_t>((uint32_t)part->length_, headerFetchSize);
2110+
uint32_t fetchLen;
2111+
if(contiguousProgression && reduce > 0)
2112+
{
2113+
uint64_t estimated = std::max<uint64_t>(
2114+
headerFetchSize, (uint64_t)(part->length_ * pixelRatio));
2115+
fetchLen = (uint32_t)std::min<uint64_t>(estimated, part->length_);
2116+
}
2117+
else
2118+
{
2119+
fetchLen = std::min<uint32_t>((uint32_t)part->length_, headerFetchSize);
2120+
}
20862121
headerTileParts[tileIndex]->push_back((uint8_t)i, (uint8_t)srcParts->size(), part->offset_,
20872122
fetchLen);
20882123
}

src/lib/core/grok.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,9 @@ typedef struct _grk_stream_params
625625
uint32_t max_retry; /* maximum number of retries */
626626
uint32_t retry_delay; /* delay between retries in seconds */
627627

628+
/* 13 Fetch concurrency (0 = use default of 30) */
629+
uint32_t fetch_batch_size; /* max concurrent HTTP range requests */
630+
628631
} grk_stream_params;
629632

630633
/**

src/lib/core/stream/StreamGenerator.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ IStream* StreamGenerator::createCurlFetchStream(void)
160160
auth.connect_timeout_ = streamParams_.connect_timeout;
161161
auth.max_retry_ = streamParams_.max_retry;
162162
auth.retry_delay_ = streamParams_.retry_delay;
163+
auth.fetch_batch_size_ = streamParams_.fetch_batch_size;
163164
grklog.debug("StreamGenerator: s3_allow_insecure: streamParams=%d, auth=%d",
164165
(int)streamParams_.s3_allow_insecure, (int)auth.s3_allow_insecure_);
165166
std::string_view file{streamParams_.file};

src/lib/core/stream/TPFetchSeq.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919

20+
#include <algorithm>
2021
#include <cassert>
2122
#include <cstring>
2223
#include <vector>
@@ -114,6 +115,12 @@ struct SharedPtrSeq
114115
objStore_.clear();
115116
}
116117

118+
template<typename Compare>
119+
void sort(Compare comp)
120+
{
121+
std::sort(objStore_.begin(), objStore_.end(), comp);
122+
}
123+
117124
private:
118125
std::vector<std::shared_ptr<T>> objStore_;
119126
};
@@ -242,6 +249,13 @@ struct TPFetch : public DataSlice
242249

243250
struct TPFetchSeq : SharedPtrSeq<TPFetch>
244251
{
252+
void sortByOffset()
253+
{
254+
sort([](const std::shared_ptr<TPFetch>& a, const std::shared_ptr<TPFetch>& b) {
255+
return a->offset_ < b->offset_;
256+
});
257+
}
258+
245259
void push_back(uint16_t tileIndex, const std::unique_ptr<TPSeq>& tileParts)
246260
{
247261
if(!tileParts)
@@ -289,6 +303,11 @@ struct TPFetchSeq : SharedPtrSeq<TPFetch>
289303
}
290304
}
291305
}
306+
307+
// Sort requests by file offset so that batched HTTP requests target
308+
// sequential file regions, improving server-side read coalescing
309+
// and CDN cache efficiency.
310+
tilePartFetchFlat->sortByOffset();
292311
}
293312

294313
/**

src/lib/core/stream/fetchers/CurlFetcher.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ CurlFetcher::CurlFetcher(void) : tileWriteCallback_(tileWriteCallback)
7676
if(!multi_handle_)
7777
throw std::runtime_error("Failed to initialize CURL multi handle");
7878
curl_multi_setopt(multi_handle_, CURLMOPT_MAX_TOTAL_CONNECTIONS, 100L);
79+
curl_multi_setopt(multi_handle_, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
7980
fetchThread_ = std::thread(&CurlFetcher::fetchWorker, this);
8081
}
8182

@@ -105,13 +106,20 @@ void CurlFetcher::notifyThrottleRelease()
105106
throttleCV_.notify_one();
106107
}
107108

109+
void CurlFetcher::setBatchSize(size_t batchSize)
110+
{
111+
batchSize_ = std::max<size_t>(1, batchSize);
112+
}
113+
108114
void CurlFetcher::init(const std::string& path, const FetchAuth& auth)
109115
{
110116
auth_ = auth;
111117
if(auth_.max_retry_ > 0)
112118
maxRetries_ = auth_.max_retry_;
113119
if(auth_.retry_delay_ > 0)
114120
retryDelayMs_ = auth_.retry_delay_ * 1000;
121+
if(auth_.fetch_batch_size_ > 0)
122+
batchSize_ = auth_.fetch_batch_size_;
115123
parse(path);
116124
fetch_total_size();
117125
}
@@ -561,6 +569,8 @@ CURL* CurlFetcher::configureHandle(uint64_t offset, uint64_t end, FetchResult& r
561569
curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
562570
curl_initiate_retry(curl);
563571
curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
572+
curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
573+
curl_easy_setopt(curl, CURLOPT_PIPEWAIT, 1L);
564574
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
565575
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &result);
566576
curl_easy_setopt(curl, CURLOPT_PRIVATE, &result);

src/lib/core/stream/fetchers/CurlFetcher.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class IFetcher
9494
// fetcher so it can re-check the condition.
9595
virtual void setFetchThrottle(std::function<bool()> throttle) = 0;
9696
virtual void notifyThrottleRelease() = 0;
97+
98+
virtual void setBatchSize(size_t batchSize) = 0;
9799
};
98100

99101
struct TileFetchContext : public std::enable_shared_from_this<TileFetchContext>
@@ -131,6 +133,7 @@ class CurlFetcher : public IFetcher
131133

132134
void setFetchThrottle(std::function<bool()> throttle) override;
133135
void notifyThrottleRelease() override;
136+
void setBatchSize(size_t batchSize) override;
134137
void init(const std::string& path, const FetchAuth& auth) override;
135138
size_t read(uint8_t* buffer, size_t numBytes) override;
136139
bool seek(uint64_t offset) override;

src/lib/core/stream/fetchers/FetchCommon.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ struct FetchAuth
7979
uint32_t max_retry_ = 0;
8080
uint32_t retry_delay_ = 0;
8181

82+
// Fetch concurrency (0 = use default of 30)
83+
uint32_t fetch_batch_size_ = 0;
84+
8285
FetchAuth() = default;
8386
FetchAuth(const std::string& u, const std::string& p, const std::string& t,
8487
const std::vector<std::string>& h = {}, const std::string& r = "",

0 commit comments

Comments
 (0)