Skip to content

Commit e833ba9

Browse files
committed
ct/l1: use l1_reader_cache in the fetch path
Wire the reader cache into frontend::make_l1_reader. On each L1 fetch the frontend first checks the cache for a reader matching the topic_id_partition and start offset. On a hit the positioned reader is reused, skipping the metastore RPC, footer fetch, and stream open. On a miss a fresh reader is constructed and handed to the cache so it will be returned to the idle pool when the fetch completes.
1 parent 81e4c68 commit e833ba9

3 files changed

Lines changed: 18 additions & 5 deletions

File tree

src/v/cloud_topics/frontend/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ redpanda_cc_library(
2626
"//src/v/cloud_storage:types",
2727
"//src/v/cloud_topics:logger",
2828
"//src/v/cloud_topics:topic_id_partition",
29+
"//src/v/cloud_topics/level_one/frontend_reader:l1_reader_cache",
2930
"//src/v/cloud_topics/level_one/metastore",
3031
"//src/v/cloud_topics/level_zero/common:producer_queue",
3132
"//src/v/cloud_topics/level_zero/stm:ctp_stm",

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "cloud_storage/types.h"
1313
#include "cloud_topics/data_plane_api.h"
1414
#include "cloud_topics/frontend/errc.h"
15+
#include "cloud_topics/level_one/frontend_reader/l1_reader_cache.h"
1516
#include "cloud_topics/level_one/frontend_reader/level_one_reader.h"
1617
#include "cloud_topics/level_one/metastore/metastore.h"
1718
#include "cloud_topics/level_zero/common/extent_meta.h"
@@ -287,8 +288,7 @@ frontend::make_reader(cloud_topic_log_reader_config cfg) {
287288
if (!tidp) {
288289
throw topic_config_not_found_exception(ntp());
289290
}
290-
co_return storage::translating_reader{
291-
model::record_batch_reader(make_l1_reader(cfg, *tidp))};
291+
co_return storage::translating_reader{make_l1_reader(cfg, *tidp)};
292292
}
293293
co_return storage::translating_reader{
294294
model::record_batch_reader(make_l0_reader(cfg)),
@@ -387,16 +387,28 @@ ss::future<size_t> frontend::size_bytes() {
387387
co_return l0_size + l1_size_res.value().size;
388388
}
389389

390-
std::unique_ptr<model::record_batch_reader::impl> frontend::make_l1_reader(
390+
model::record_batch_reader frontend::make_l1_reader(
391391
const cloud_topic_log_reader_config& cfg,
392392
model::topic_id_partition tidp) const {
393393
auto ct_state = _partition->get_cloud_topics_state();
394+
auto* cache = ct_state->local().get_l1_reader_cache();
395+
if (cache) {
396+
if (auto hit = cache->get_reader(tidp, cfg); hit) {
397+
return std::move(*hit);
398+
}
399+
}
400+
394401
auto l1_metastore = ct_state->local().get_l1_metastore();
395402
auto l1_io = ct_state->local().get_l1_io();
396403
auto l1_reader_probe = ct_state->local().get_l1_reader_probe();
397404

398-
return std::make_unique<level_one_log_reader_impl>(
405+
auto reader = std::make_unique<level_one_log_reader_impl>(
399406
cfg, _partition->ntp(), tidp, l1_metastore, l1_io, l1_reader_probe);
407+
408+
if (cache) {
409+
return cache->put(std::move(reader));
410+
}
411+
return model::record_batch_reader(std::move(reader));
400412
}
401413

402414
ss::future<std::optional<storage::timequery_result>>

src/v/cloud_topics/frontend/frontend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ class frontend final {
238238
std::unique_ptr<model::record_batch_reader::impl>
239239
make_l0_reader(const cloud_topic_log_reader_config& cfg) const;
240240

241-
std::unique_ptr<model::record_batch_reader::impl> make_l1_reader(
241+
model::record_batch_reader make_l1_reader(
242242
const cloud_topic_log_reader_config& cfg,
243243
model::topic_id_partition tidp) const;
244244

0 commit comments

Comments
 (0)