Commit a8d117d

Datastore query binary search + SPDX/CI hygiene (#104)
* perf(datastore): binary-search RangeCursor start and latestAt

ARCHITECTURE.md documents both as binary searches, but findFirstValid() and latestAt() linear-scanned the chunk deque and the rows within a chunk (O(C*R)).

Committed chunks are never empty and are time-ordered (each chunk's t_min >= the previous chunk's t_max), so t_min/t_max are monotonic across the deque and timestamps are monotonic within a chunk -- enabling std::lower_bound / std::upper_bound for an O(log C + log R) start.

Behaviour-preserving: lower_bound keeps the first-duplicate range start, and upper_bound keeps the last-duplicate / later-chunk-at-shared-boundary semantics of the previous reverse scans. Adds regression tests for duplicate timestamps, shared chunk boundaries, and single-point ranges.

Makes the ARCHITECTURE.md "binary-searches" claim accurate. Internal optimization; no API/ABI change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* chore: add missing SPDX headers; drop stale `development` CI trigger

number_parse.{hpp,cpp} (pj_base, Apache-2.0) lacked their SPDX-License-Identifier headers -- the dual-license boundary relies on per-file headers, so add them.

All four branch-triggered workflows still keyed on the deleted `development` branch; restrict them to `main`.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
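The range-start search described in the commit message can be illustrated with a small standalone sketch. It is not the project's code: `MiniChunk`, `Position`, and `findRangeStart` are hypothetical names, and the chunk model is reduced to a sorted, non-empty timestamp vector. Under those assumptions, one `std::lower_bound` over the chunks followed by one over the rows locates the first row inside an inclusive range.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>
#include <vector>

using Timestamp = std::int64_t;

// Hypothetical stand-in for a committed chunk: non-empty, non-decreasing timestamps.
struct MiniChunk {
  std::vector<Timestamp> timestamps;

  Timestamp t_min() const {
    assert(!timestamps.empty());
    return timestamps.front();
  }
  Timestamp t_max() const {
    assert(!timestamps.empty());
    return timestamps.back();
  }
};

// Hypothetical result type: where the range starts.
struct Position {
  std::size_t chunk_index;
  std::size_t row_index;
};

// Returns the first (chunk, row) whose timestamp lies in [t_min, t_max],
// or std::nullopt when no row falls inside the range.
// Assumes each chunk's t_min is >= the previous chunk's t_max.
std::optional<Position> findRangeStart(const std::deque<MiniChunk>& chunks,
                                       Timestamp t_min, Timestamp t_max) {
  // Step 1: first chunk whose t_max >= t_min (t_max is non-decreasing across chunks).
  const auto chunk_it = std::lower_bound(
      chunks.begin(), chunks.end(), t_min,
      [](const MiniChunk& chunk, Timestamp value) { return chunk.t_max() < value; });
  if (chunk_it == chunks.end()) {
    return std::nullopt;  // every row is strictly before t_min
  }

  // Step 2: first row in that chunk with timestamp >= t_min.
  const auto& ts = chunk_it->timestamps;
  const auto row_it = std::lower_bound(ts.begin(), ts.end(), t_min);
  if (row_it == ts.end() || *row_it > t_max) {
    return std::nullopt;  // the first candidate row is already past t_max
  }

  return Position{static_cast<std::size_t>(chunk_it - chunks.begin()),
                  static_cast<std::size_t>(row_it - ts.begin())};
}
```

Because `std::lower_bound` returns the first element that is not less than the key, a run of duplicate timestamps resolves to its first row, which is the "first-duplicate range start" behaviour the commit message refers to.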
1 parent b85757d commit a8d117d

8 files changed

Lines changed: 133 additions & 52 deletions

.github/workflows/linux-ci.yml

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 name: Linux CI
 on:
   push:
-    branches: [development, main]
+    branches: [main]
   pull_request:
-    branches: [development, main]
+    branches: [main]
     types: [opened, synchronize, reopened, ready_for_review]
   workflow_dispatch: {}

.github/workflows/macos-ci.yml

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 name: macOS CI
 on:
   push:
-    branches: [development, main]
+    branches: [main]
   pull_request:
-    branches: [development, main]
+    branches: [main]
     types: [opened, synchronize, reopened, ready_for_review]
   workflow_dispatch: {}

.github/workflows/pre-commit.yml

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 name: pre-commit
 on:
   push:
-    branches: [development, main]
+    branches: [main]
   pull_request:
-    branches: [development, main]
+    branches: [main]
     types: [opened, synchronize, reopened, ready_for_review]
   workflow_dispatch: {}

.github/workflows/windows-ci.yml

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 name: Windows CI
 on:
   push:
-    branches: [development, main]
+    branches: [main]
   pull_request:
-    branches: [development, main]
+    branches: [main]
     types: [opened, synchronize, reopened, ready_for_review]
   workflow_dispatch: {}

pj_base/include/pj_base/number_parse.hpp

Lines changed: 2 additions & 0 deletions
@@ -1,4 +1,6 @@
 #pragma once
+// Copyright 2026 Davide Faconti
+// SPDX-License-Identifier: Apache-2.0

 #include <charconv>
 #include <optional>

pj_base/src/number_parse.cpp

Lines changed: 3 additions & 0 deletions
@@ -1,3 +1,6 @@
+// Copyright 2026 Davide Faconti
+// SPDX-License-Identifier: Apache-2.0
+
 #include "pj_base/number_parse.hpp"

 #include <cerrno>

pj_datastore/src/query.cpp

Lines changed: 51 additions & 44 deletions
@@ -126,30 +126,37 @@ void RangeCursor::forEachChunk(std::function<void(const ChunkRowRange&)> callbac
 }

 void RangeCursor::findFirstValid() {
-  // Linear scan to find the first chunk whose t_max >= t_min_
-  // (i.e., that could contain data in our range)
-  for (chunk_index_ = 0; chunk_index_ < chunks_->size(); ++chunk_index_) {
-    const auto& chunk = (*chunks_)[chunk_index_];
-    if (chunk.stats.t_max >= t_min_) {
-      // This chunk might contain data in range.
-      // Now find the first row where timestamp >= t_min_.
-      for (row_index_ = 0; row_index_ < chunk.stats.row_count; ++row_index_) {
-        Timestamp ts = chunk.readTimestamp(row_index_);
-        if (ts >= t_min_) {
-          // Check if this row is also within t_max
-          if (ts <= t_max_) {
-            return;  // Found a valid starting position
-          }
-          // ts > t_max_ means no valid data in range at all
-          chunk_index_ = chunks_->size();
-          return;
-        }
-      }
-      // All rows in this chunk are before t_min, try next chunk
-      continue;
-    }
+  const auto& chunks = *chunks_;
+
+  // First chunk that could contain a row in range, i.e. whose t_max >= t_min_.
+  // Committed chunks are non-empty and time-ordered (each chunk's t_min >= the
+  // previous chunk's t_max), so t_max is non-decreasing across the deque and we
+  // can binary-search it.
+  const auto chunk_it = std::lower_bound(
+      chunks.begin(), chunks.end(), t_min_,
+      [](const TopicChunk& chunk, Timestamp value) { return chunk.stats.t_max < value; });
+  if (chunk_it == chunks.end()) {
+    // All data is strictly before t_min_.
+    chunk_index_ = chunks.size();
+    row_index_ = 0;
+    return;
+  }
+  chunk_index_ = static_cast<std::size_t>(chunk_it - chunks.begin());
+
+  // First row with timestamp >= t_min_ within that chunk. Such a row exists
+  // because t_max (the chunk's last timestamp) >= t_min_.
+  const TopicChunk& chunk = *chunk_it;
+  const auto ts_begin = chunk.timestamps.begin();
+  const auto ts_end = ts_begin + static_cast<std::ptrdiff_t>(chunk.stats.row_count);
+  const auto row_it = std::lower_bound(ts_begin, ts_end, t_min_);
+  row_index_ = static_cast<std::size_t>(row_it - ts_begin);
+
+  // If the first row at or after t_min_ is already past t_max_, nothing in the
+  // deque falls inside [t_min_, t_max_].
+  if (row_it == ts_end || *row_it > t_max_) {
+    chunk_index_ = chunks.size();
+    row_index_ = 0;
   }
-  // No valid chunk found: chunk_index_ == chunks_->size() (past-end)
 }

 void RangeCursor::skipToValid() {
@@ -171,30 +178,30 @@ void RangeCursor::skipToValid() {
 // ===========================================================================

 std::optional<SampleRow> latestAt(const std::deque<TopicChunk>& chunks, Timestamp t) {
-  if (chunks.empty()) {
+  // Last chunk that can contain a row at or before t, i.e. the latest chunk
+  // whose t_min <= t. Committed chunks are non-empty and have non-decreasing
+  // t_min, so upper_bound finds the first chunk strictly after t; the chunk
+  // before it is the answer. (At a shared boundary timestamp this selects the
+  // later chunk, matching the previous reverse-scan behaviour.)
+  const auto after = std::upper_bound(chunks.begin(), chunks.end(), t, [](Timestamp value, const TopicChunk& chunk) {
+    return value < chunk.stats.t_min;
+  });
+  if (after == chunks.begin()) {
+    // Empty deque, or every chunk starts strictly after t.
     return std::nullopt;
   }
-
-  // Reverse iterate chunks. For each chunk, if t_min <= t, search within it.
-  for (std::size_t ci = chunks.size(); ci > 0; --ci) {
-    const auto& chunk = chunks[ci - 1];
-    if (chunk.stats.t_min > t) {
-      continue;  // Entire chunk is after t
-    }
-    // chunk.stats.t_min <= t, so there might be a row <= t in this chunk.
-    // Reverse scan within the chunk to find the last row with timestamp <= t.
-    for (std::size_t ri = chunk.stats.row_count; ri > 0; --ri) {
-      Timestamp ts = chunk.readTimestamp(ri - 1);
-      if (ts <= t) {
-        return SampleRow{ts, &chunk, ri - 1};
-      }
-    }
-    // All rows in this chunk are after t, but t_min <= t was true.
-    // This shouldn't happen with sorted data, but handle gracefully
-    // by continuing to the previous chunk.
+  const TopicChunk& chunk = *(after - 1);
+
+  // Last row with timestamp <= t within that chunk. Such a row exists because
+  // the chunk's first timestamp (t_min) is <= t.
+  const auto ts_begin = chunk.timestamps.begin();
+  const auto ts_end = ts_begin + static_cast<std::ptrdiff_t>(chunk.stats.row_count);
+  const auto row_after = std::upper_bound(ts_begin, ts_end, t);
+  if (row_after == ts_begin) {
+    return std::nullopt;  // unreachable for committed chunks (row 0 ts == t_min <= t)
   }
-
-  return std::nullopt;
+  const std::size_t row = static_cast<std::size_t>((row_after - 1) - ts_begin);
+  return SampleRow{chunk.readTimestamp(row), &chunk, row};
 }

 // ===========================================================================
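The latestAt change above relies on `std::upper_bound` picking the later chunk at a shared boundary and the last row among duplicates. A minimal standalone sketch of that behaviour follows. The names `MiniChunk`, `Hit`, and `latestAtSketch` are hypothetical and stand in for the project's `TopicChunk`, `SampleRow`, and `latestAt`; the sketch assumes non-empty chunks with non-decreasing timestamps and non-decreasing t_min across the deque.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>
#include <vector>

using Timestamp = std::int64_t;

// Hypothetical stand-in for a committed chunk: non-empty, non-decreasing timestamps.
struct MiniChunk {
  std::vector<Timestamp> timestamps;

  Timestamp t_min() const {
    assert(!timestamps.empty());
    return timestamps.front();
  }
};

// Hypothetical result type: which row was selected and its timestamp.
struct Hit {
  std::size_t chunk_index;
  std::size_t row_index;
  Timestamp timestamp;
};

// Returns the last row with timestamp <= t, or std::nullopt if every row is after t.
std::optional<Hit> latestAtSketch(const std::deque<MiniChunk>& chunks, Timestamp t) {
  // First chunk whose t_min is strictly greater than t; the chunk before it
  // is the latest chunk that can hold a row at or before t.
  const auto after = std::upper_bound(
      chunks.begin(), chunks.end(), t,
      [](Timestamp value, const MiniChunk& chunk) { return value < chunk.t_min(); });
  if (after == chunks.begin()) {
    return std::nullopt;  // empty deque, or all chunks start after t
  }
  const auto chunk_it = after - 1;

  // First row strictly greater than t; the row before it is the answer.
  const auto& ts = chunk_it->timestamps;
  const auto row_after = std::upper_bound(ts.begin(), ts.end(), t);
  if (row_after == ts.begin()) {
    return std::nullopt;  // not expected when the chunk's t_min <= t
  }
  const auto row = static_cast<std::size_t>((row_after - 1) - ts.begin());
  return Hit{static_cast<std::size_t>(chunk_it - chunks.begin()), row, ts[row]};
}
```

With timestamps {10, 20, 20, 20, 30} and t = 20, the row search stops just past the last 20, so the selected row is index 3. With chunks {70, 80, 90} and {90, 100, 110} and t = 90, the chunk search passes both chunks, so the second chunk's row 0 is selected.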

pj_datastore/tests/query_test.cpp

Lines changed: 69 additions & 0 deletions
@@ -28,6 +28,20 @@ TopicChunk make_test_chunk(Timestamp t_start, uint32_t num_rows, Timestamp step)
   return builder.seal();
 }

+// Helper: build a chunk from an explicit (possibly non-uniform / duplicated)
+// timestamp list. The column value equals the row index, so a returned
+// row_index can be cross-checked against value.
+TopicChunk make_chunk_from_timestamps(const std::vector<Timestamp>& ts) {
+  std::vector<ColumnDescriptor> cols = {{0, PrimitiveType::kFloat32, "value"}};
+  TopicChunkBuilder builder(1, 1, cols, static_cast<uint32_t>(ts.size()));
+  for (std::size_t i = 0; i < ts.size(); ++i) {
+    builder.beginRow(ts[i]);
+    builder.set(0, static_cast<float>(i));
+    builder.finishRow();
+  }
+  return builder.seal();
+}
+
 // Build the standard 5-chunk test fixture:
 // Chunk 0: t=[0, 90], step=10
 // Chunk 1: t=[100, 190], step=10
@@ -160,6 +174,61 @@ TEST(QueryTest, LatestAtBetweenChunks) {
   EXPECT_EQ(result->timestamp, 90);
 }

+// =========================================================================
+// Binary-search edge cases (duplicate timestamps, shared chunk boundaries)
+// =========================================================================
+
+TEST(QueryTest, LatestAtWithDuplicateTimestampsReturnsLastDuplicate) {
+  std::deque<TopicChunk> chunks;
+  // Rows: 0 1 2 3 4
+  chunks.push_back(make_chunk_from_timestamps({10, 20, 20, 20, 30}));
+
+  auto result = latestAt(chunks, 20);
+  ASSERT_TRUE(result.has_value());
+  EXPECT_EQ(result->timestamp, 20);
+  // upper_bound semantics: the last row with ts <= 20 is row index 3.
+  EXPECT_EQ(result->row_index, 3u);
+}
+
+TEST(QueryTest, RangeQueryWithDuplicateTimestampsStartsAtFirstDuplicate) {
+  std::deque<TopicChunk> chunks;
+  // Rows: 0 1 2 3 4
+  chunks.push_back(make_chunk_from_timestamps({10, 20, 20, 20, 30}));
+
+  auto cursor = rangeQuery(chunks, 20, 20);
+  std::vector<std::size_t> rows;
+  cursor.forEach([&](const SampleRow& row) { rows.push_back(row.row_index); });
+
+  // lower_bound semantics: starts at the first ts >= 20 (row 1) and includes
+  // every row with ts <= 20 (rows 1, 2, 3).
+  ASSERT_EQ(rows.size(), 3u);
+  EXPECT_EQ(rows.front(), 1u);
+  EXPECT_EQ(rows.back(), 3u);
+}
+
+TEST(QueryTest, LatestAtAtSharedChunkBoundarySelectsLaterChunk) {
+  std::deque<TopicChunk> chunks;
+  chunks.push_back(make_chunk_from_timestamps({70, 80, 90}));    // chunk A, t_max=90
+  chunks.push_back(make_chunk_from_timestamps({90, 100, 110}));  // chunk B, t_min=90
+
+  auto result = latestAt(chunks, 90);
+  ASSERT_TRUE(result.has_value());
+  EXPECT_EQ(result->timestamp, 90);
+  // The boundary value 90 exists in both chunks; the later chunk (B, row 0) wins.
+  EXPECT_EQ(result->chunk, &chunks[1]);
+  EXPECT_EQ(result->row_index, 0u);
+}
+
+TEST(QueryTest, RangeQuerySingleTimestampPoint) {
+  auto chunks = make_standard_chunks();
+  // Degenerate inclusive range [200, 200] hits exactly one row.
+  auto cursor = rangeQuery(chunks, 200, 200);
+  std::vector<Timestamp> timestamps;
+  cursor.forEach([&](const SampleRow& row) { timestamps.push_back(row.timestamp); });
+  ASSERT_EQ(timestamps.size(), 1u);
+  EXPECT_EQ(timestamps.front(), 200);
+}
+
 // =========================================================================
 // Empty deque tests
 // =========================================================================
