@@ -126,30 +126,37 @@ void RangeCursor::forEachChunk(std::function<void(const ChunkRowRange&)> callbac
126126}
127127
128128void RangeCursor::findFirstValid () {
129- // Linear scan to find the first chunk whose t_max >= t_min_
130- // (i.e., that could contain data in our range)
131- for (chunk_index_ = 0 ; chunk_index_ < chunks_->size (); ++chunk_index_) {
132- const auto & chunk = (*chunks_)[chunk_index_];
133- if (chunk.stats .t_max >= t_min_) {
134- // This chunk might contain data in range.
135- // Now find the first row where timestamp >= t_min_.
136- for (row_index_ = 0 ; row_index_ < chunk.stats .row_count ; ++row_index_) {
137- Timestamp ts = chunk.readTimestamp (row_index_);
138- if (ts >= t_min_) {
139- // Check if this row is also within t_max
140- if (ts <= t_max_) {
141- return ; // Found a valid starting position
142- }
143- // ts > t_max_ means no valid data in range at all
144- chunk_index_ = chunks_->size ();
145- return ;
146- }
147- }
148- // All rows in this chunk are before t_min, try next chunk
149- continue ;
150- }
129+ const auto & chunks = *chunks_;
130+
131+ // First chunk that could contain a row in range, i.e. whose t_max >= t_min_.
132+ // Committed chunks are non-empty and time-ordered (each chunk's t_min >= the
133+ // previous chunk's t_max), so t_max is non-decreasing across the deque and we
134+ // can binary-search it.
135+ const auto chunk_it = std::lower_bound (
136+ chunks.begin (), chunks.end (), t_min_,
137+ [](const TopicChunk& chunk, Timestamp value) { return chunk.stats .t_max < value; });
138+ if (chunk_it == chunks.end ()) {
139+ // All data is strictly before t_min_.
140+ chunk_index_ = chunks.size ();
141+ row_index_ = 0 ;
142+ return ;
143+ }
144+ chunk_index_ = static_cast <std::size_t >(chunk_it - chunks.begin ());
145+
146+ // First row with timestamp >= t_min_ within that chunk. Such a row exists
147+ // because t_max (the chunk's last timestamp) >= t_min_.
148+ const TopicChunk& chunk = *chunk_it;
149+ const auto ts_begin = chunk.timestamps .begin ();
150+ const auto ts_end = ts_begin + static_cast <std::ptrdiff_t >(chunk.stats .row_count );
151+ const auto row_it = std::lower_bound (ts_begin, ts_end, t_min_);
152+ row_index_ = static_cast <std::size_t >(row_it - ts_begin);
153+
154+ // If the first row at or after t_min_ is already past t_max_, nothing in the
155+ // deque falls inside [t_min_, t_max_].
156+ if (row_it == ts_end || *row_it > t_max_) {
157+ chunk_index_ = chunks.size ();
158+ row_index_ = 0 ;
151159 }
152- // No valid chunk found: chunk_index_ == chunks_->size() (past-end)
153160}
154161
155162void RangeCursor::skipToValid () {
@@ -171,30 +178,30 @@ void RangeCursor::skipToValid() {
171178// ===========================================================================
172179
173180std::optional<SampleRow> latestAt (const std::deque<TopicChunk>& chunks, Timestamp t) {
174- if (chunks.empty ()) {
181+ // Last chunk that can contain a row at or before t, i.e. the latest chunk
182+ // whose t_min <= t. Committed chunks are non-empty and have non-decreasing
183+ // t_min, so upper_bound finds the first chunk strictly after t; the chunk
184+ // before it is the answer. (At a shared boundary timestamp this selects the
185+ // later chunk, matching the previous reverse-scan behaviour.)
186+ const auto after = std::upper_bound (chunks.begin (), chunks.end (), t, [](Timestamp value, const TopicChunk& chunk) {
187+ return value < chunk.stats .t_min ;
188+ });
189+ if (after == chunks.begin ()) {
190+ // Empty deque, or every chunk starts strictly after t.
175191 return std::nullopt ;
176192 }
177-
178- // Reverse iterate chunks. For each chunk, if t_min <= t, search within it.
179- for (std::size_t ci = chunks.size (); ci > 0 ; --ci) {
180- const auto & chunk = chunks[ci - 1 ];
181- if (chunk.stats .t_min > t) {
182- continue ; // Entire chunk is after t
183- }
184- // chunk.stats.t_min <= t, so there might be a row <= t in this chunk.
185- // Reverse scan within the chunk to find the last row with timestamp <= t.
186- for (std::size_t ri = chunk.stats .row_count ; ri > 0 ; --ri) {
187- Timestamp ts = chunk.readTimestamp (ri - 1 );
188- if (ts <= t) {
189- return SampleRow{ts, &chunk, ri - 1 };
190- }
191- }
192- // All rows in this chunk are after t, but t_min <= t was true.
193- // This shouldn't happen with sorted data, but handle gracefully
194- // by continuing to the previous chunk.
193+ const TopicChunk& chunk = *(after - 1 );
194+
195+ // Last row with timestamp <= t within that chunk. Such a row exists because
196+ // the chunk's first timestamp (t_min) is <= t.
197+ const auto ts_begin = chunk.timestamps .begin ();
198+ const auto ts_end = ts_begin + static_cast <std::ptrdiff_t >(chunk.stats .row_count );
199+ const auto row_after = std::upper_bound (ts_begin, ts_end, t);
200+ if (row_after == ts_begin) {
201+ return std::nullopt ; // unreachable for committed chunks (row 0 ts == t_min <= t)
195202 }
196-
197- return std:: nullopt ;
203+ const std:: size_t row = static_cast <std:: size_t >((row_after - 1 ) - ts_begin);
204+ return SampleRow{chunk. readTimestamp (row), &chunk, row} ;
198205}
199206
200207// ===========================================================================
0 commit comments