Skip to content

Commit 46eb74d

Browse files
authored
Merge pull request #30114 from vbotbuildovich/backport-pr-30108-v26.1.x-353
[v26.1.x] `ct/l1`: support timestamp lookup by L1 index in footer
2 parents 31bb289 + b76909d commit 46eb74d

7 files changed

Lines changed: 77 additions & 23 deletions

File tree

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -518,16 +518,17 @@ ss::future<std::optional<storage::timequery_result>>
518518
frontend::refine_timequery_result(
519519
coarse_grained_timequery_result input,
520520
model::opt_abort_source_t abort_source) {
521+
// Pass the timestamp so the L1 reader can use the footer's timestamp
522+
// index to seek directly to the relevant position, avoiding unnecessary
523+
// cloud IO. In the case of L0, we should only need to materialize a
524+
// single batch here, because the local log is correct to the granularity of
525+
// a batch (but not within a batch due to placeholders).
521526
cloud_topic_log_reader_config reader_cfg(
522527
/*start_offset=*/input.start_offset,
523528
/*max_offset=*/input.last_offset,
524-
/*as=*/abort_source);
525-
// TODO(perf): In the case of L0, we should only need to materialize a
526-
// single batch here, because the local log is correct to the granularity of
527-
// a batch (but not within a batch due to placeholders). For L1, we could be
528-
// giving the reader a timestamp so it uses the L1 object indexes to seek
529-
// to the correct spot within the index, this would allow us to optimize IO
530-
// against the cloud.
529+
/*first_timestamp=*/input.time,
530+
/*as=*/abort_source,
531+
/*client_addr=*/std::nullopt);
531532
auto reader = co_await make_reader(reader_cfg);
532533
auto generator = std::move(reader.reader).generator(model::no_timeout);
533534
auto query_interval = model::bounded_offset_interval::checked(

src/v/cloud_topics/level_one/common/object.cc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ footer::seek_result footer::file_position_before_kafka_offset(
237237
}
238238

239239
footer::seek_result footer::file_position_before_max_timestamp(
240-
const model::topic_id_partition& tidp, model::timestamp target) {
240+
const model::topic_id_partition& tidp, model::timestamp target) const {
241241
auto [begin, end] = partitions.equal_range(tidp);
242242
auto filtered = std::views::filter(
243243
std::ranges::subrange{begin, end}, [&target](const auto& entry) {
@@ -256,20 +256,20 @@ footer::seek_result footer::file_position_before_max_timestamp(
256256
index, target, std::less<>{}, [](const auto& entry) {
257257
return entry.max_timestamp;
258258
});
259-
// If we're past all index entries, but still within the recorded file
260-
// bounds, the best we can do is start at the last well known offset (the
261-
// last index entry).
262-
if (it == index.end()) {
263-
--it;
264-
}
265-
// If at the first entry, we must start at the file beginning, because the
259+
// If we're at the first entry, we must start at the file beginning because
266260
// the max is inclusive of those entries.
267261
if (it == index.begin()) {
268262
return {
269263
.file_position = partition.file_position,
270264
.length = partition.length,
271265
};
272266
}
267+
// If we're past all index entries, but still within the recorded file
268+
// bounds, the best we can do is start at the last well known offset (the
269+
// last index entry).
270+
if (it == index.end()) {
271+
--it;
272+
}
273273
auto delta = it->file_position - partition.file_position;
274274
return {
275275
.file_position = it->file_position,

src/v/cloud_topics/level_one/common/object.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ struct footer
198198
// While a search for timestamp 25 or 40 would yield batch[4] and the
199199
// timestamp 50 would yield `npos`.
200200
seek_result file_position_before_max_timestamp(
201-
const model::topic_id_partition&, model::timestamp);
201+
const model::topic_id_partition&, model::timestamp) const;
202202

203203
// Read the footer using the suffix of an L1 object.
204204
//

src/v/cloud_topics/level_one/common/tests/object_test.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,36 @@ TEST(L1ObjectsIndex, TimestampSearch) {
197197
}
198198
}
199199

200+
// Regression test: file_position_before_max_timestamp must not crash when the
201+
// partition's index is empty. This can happen when the partition data is
202+
// smaller than the indexing interval.
203+
TEST(L1ObjectsIndex, TimestampSearchEmptyIndex) {
204+
footer index;
205+
auto tidp = model::topic_id_partition{
206+
model::topic_id(uuid_t::create()), model::partition_id(0)};
207+
index.partitions.emplace(
208+
tidp,
209+
footer::partition{
210+
.file_position = 0,
211+
.length = 200,
212+
.indexes = {},
213+
.first_offset = 0_o,
214+
.last_offset = 10_o,
215+
.max_timestamp = 1000_t,
216+
});
217+
218+
// Any timestamp within range should return the partition start.
219+
auto result = index.file_position_before_max_timestamp(tidp, 500_t);
220+
EXPECT_EQ(result, (footer::seek_result{.file_position = 0, .length = 200}));
221+
222+
result = index.file_position_before_max_timestamp(tidp, 1000_t);
223+
EXPECT_EQ(result, (footer::seek_result{.file_position = 0, .length = 200}));
224+
225+
// Timestamp beyond the partition max should return npos.
226+
result = index.file_position_before_max_timestamp(tidp, 1001_t);
227+
EXPECT_EQ(result, footer::npos);
228+
}
229+
200230
TEST(L1Objects, OffsetSearch) {
201231
auto test_topic_id = model::topic_id(uuid_t::create());
202232
auto specs_by_tidp = std::vector<batches_by_tidp>{

src/v/cloud_topics/level_one/compaction/source.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ ss::future<ss::stop_iteration> compaction_source::map_building_iteration() {
208208
const auto& start_offset = extent.base_offset;
209209
const auto& max_offset = extent.last_offset;
210210

211-
cloud_topic_log_reader_config config(start_offset, max_offset, _as);
211+
cloud_topic_log_reader_config config(
212+
start_offset, max_offset, std::nullopt, _as);
212213
auto rdr = model::record_batch_reader(
213214
std::make_unique<level_one_log_reader_impl>(
214215
config, _ntp, _tp, _metastore, _io, _l1_reader_probe));
@@ -276,7 +277,8 @@ ss::future<ss::stop_iteration> compaction_source::deduplication_iteration(
276277
if (should_compact_extent(extent, _min_compaction_lag_ms)) {
277278
kafka::offset start_offset{extent.base_offset};
278279
kafka::offset last_offset{extent.last_offset};
279-
cloud_topic_log_reader_config config(start_offset, last_offset, _as);
280+
cloud_topic_log_reader_config config(
281+
start_offset, last_offset, std::nullopt, _as);
280282
auto rdr = model::record_batch_reader(
281283
std::make_unique<level_one_log_reader_impl>(
282284
config, _ntp, _tp, _metastore, _io, _l1_reader_probe));

src/v/cloud_topics/level_one/frontend_reader/level_one_reader.cc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,28 @@ level_one_log_reader_impl::materialize_batches_from_object_offset(
424424
const object_info& object,
425425
kafka::offset offset,
426426
model::timeout_clock::time_point /*deadline*/) {
427-
auto seek_res = object.footer.file_position_before_kafka_offset(
428-
_tidp, offset);
427+
// When a timestamp hint is available, use the footer's timestamp index
428+
// to narrow the seek position. Both offset and timestamp constraints
429+
// must hold, so we start at whichever position is further into the
430+
// file.
431+
auto seek_res = [&] {
432+
auto offset_seek = object.footer.file_position_before_kafka_offset(
433+
_tidp, offset);
434+
if (!_config.first_timestamp) {
435+
return offset_seek;
436+
}
437+
auto time_seek = object.footer.file_position_before_max_timestamp(
438+
_tidp, *_config.first_timestamp);
439+
if (time_seek == l1::footer::npos) {
440+
return offset_seek;
441+
}
442+
if (offset_seek == l1::footer::npos) {
443+
return time_seek;
444+
}
445+
return time_seek.file_position > offset_seek.file_position
446+
? time_seek
447+
: offset_seek;
448+
}();
429449
if (seek_res == l1::footer::npos) {
430450
// Perhaps this object spans offsets in the metastore but has
431451
// no data because of compaction.

src/v/cloud_topics/log_reader_config.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct cloud_topic_log_reader_config {
3030
size_t min_bytes,
3131
size_t max_bytes,
3232
std::optional<model::record_batch_type> type_filter,
33-
std::optional<model::timestamp> time,
33+
std::optional<model::timestamp> first_timestamp,
3434
model::opt_abort_source_t as,
3535
model::opt_client_address_t client_addr = std::nullopt,
3636
bool strict_max_bytes = false)
@@ -39,7 +39,7 @@ struct cloud_topic_log_reader_config {
3939
, min_bytes(min_bytes)
4040
, max_bytes(max_bytes)
4141
, type_filter(type_filter)
42-
, first_timestamp(time)
42+
, first_timestamp(first_timestamp)
4343
, abort_source(as)
4444
, client_address(std::move(client_addr))
4545
, strict_max_bytes(strict_max_bytes) {}
@@ -50,6 +50,7 @@ struct cloud_topic_log_reader_config {
5050
cloud_topic_log_reader_config(
5151
kafka::offset start_offset,
5252
kafka::offset max_offset,
53+
std::optional<model::timestamp> first_timestamp = std::nullopt,
5354
model::opt_abort_source_t as = std::nullopt,
5455
model::opt_client_address_t client_addr = std::nullopt)
5556
: cloud_topic_log_reader_config(
@@ -58,7 +59,7 @@ struct cloud_topic_log_reader_config {
5859
0,
5960
std::numeric_limits<size_t>::max(),
6061
std::nullopt,
61-
std::nullopt,
62+
first_timestamp,
6263
as,
6364
std::move(client_addr),
6465
false) {}

0 commit comments

Comments
 (0)