Skip to content

Commit bee10f9

Browse files
authored
Merge pull request #30157 from redpanda-data/l1-reader-cache
ct/l1: Remove L1 stream cache and add reader cache
2 parents cc999b6 + e833ba9 commit bee10f9

20 files changed

Lines changed: 865 additions & 508 deletions

src/v/cloud_topics/app.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "cloud_topics/topic_manifest_upload_manager.h"
2727
#include "cluster/controller.h"
2828
#include "cluster/utils/partition_change_notifier_impl.h"
29+
#include "config/configuration.h"
2930
#include "config/node_config.h"
3031
#include "resource_mgmt/cpu_scheduling.h"
3132
#include "ssx/future-util.h"
@@ -70,6 +71,17 @@ ss::future<> app::construct(
7071

7172
co_await construct_service(_l1_reader_probe);
7273

74+
co_await construct_service(
75+
_l1_reader_cache,
76+
ss::sharded_parameter([] {
77+
return config::shard_local_cfg()
78+
.cloud_topics_l1_reader_cache_eviction_timeout_ms.bind();
79+
}),
80+
ss::sharded_parameter([] {
81+
return config::shard_local_cfg()
82+
.cloud_topics_l1_reader_cache_max_size.bind();
83+
}));
84+
7385
co_await construct_service(
7486
l1_io,
7587
config::node().l1_staging_path(),
@@ -102,8 +114,6 @@ ss::future<> app::construct(
102114
ss::sharded_parameter([&remote] { return std::ref(remote->local()); }),
103115
bucket);
104116

105-
co_await construct_service(l1_reader_cache_);
106-
107117
co_await construct_service(
108118
rr_snapshot_manager_,
109119
config::node().l1_staging_path(),
@@ -131,7 +141,7 @@ ss::future<> app::construct(
131141
ss::sharded_parameter(
132142
[&metadata_cache] { return &metadata_cache->local(); }),
133143
ss::sharded_parameter([this] { return &_l1_reader_probe.local(); }),
134-
ss::sharded_parameter([this] { return &l1_reader_cache_.local(); }),
144+
ss::sharded_parameter([this] { return &_l1_reader_cache.local(); }),
135145
ss::sharded_parameter([this] { return &rr_metadata_manager_.local(); }),
136146
ss::sharded_parameter([this] { return &rr_snapshot_manager_.local(); }));
137147

src/v/cloud_topics/app.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ class app : public ssx::sharded_service_container {
102102

103103
ss::sstring _logger_name;
104104
ss::sharded<level_one_reader_probe> _l1_reader_probe;
105+
ss::sharded<l1_reader_cache> _l1_reader_cache;
105106
std::unique_ptr<data_plane_api> data_plane;
106-
ss::sharded<l1_reader_cache> l1_reader_cache_;
107107
ss::sharded<state_accessors> state;
108108
ss::sharded<l1::file_io> l1_io;
109109
ss::sharded<l1::replicated_metastore> replicated_metastore;

src/v/cloud_topics/frontend/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ redpanda_cc_library(
2626
"//src/v/cloud_storage:types",
2727
"//src/v/cloud_topics:logger",
2828
"//src/v/cloud_topics:topic_id_partition",
29+
"//src/v/cloud_topics/level_one/frontend_reader:l1_reader_cache",
2930
"//src/v/cloud_topics/level_one/metastore",
3031
"//src/v/cloud_topics/level_zero/common:producer_queue",
3132
"//src/v/cloud_topics/level_zero/stm:ctp_stm",

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "cloud_storage/types.h"
1313
#include "cloud_topics/data_plane_api.h"
1414
#include "cloud_topics/frontend/errc.h"
15+
#include "cloud_topics/level_one/frontend_reader/l1_reader_cache.h"
1516
#include "cloud_topics/level_one/frontend_reader/level_one_reader.h"
1617
#include "cloud_topics/level_one/metastore/metastore.h"
1718
#include "cloud_topics/level_zero/common/extent_meta.h"
@@ -287,8 +288,7 @@ frontend::make_reader(cloud_topic_log_reader_config cfg) {
287288
if (!tidp) {
288289
throw topic_config_not_found_exception(ntp());
289290
}
290-
co_return storage::translating_reader{
291-
model::record_batch_reader(make_l1_reader(cfg, *tidp))};
291+
co_return storage::translating_reader{make_l1_reader(cfg, *tidp)};
292292
}
293293
co_return storage::translating_reader{
294294
model::record_batch_reader(make_l0_reader(cfg)),
@@ -387,23 +387,28 @@ ss::future<size_t> frontend::size_bytes() {
387387
co_return l0_size + l1_size_res.value().size;
388388
}
389389

390-
std::unique_ptr<model::record_batch_reader::impl> frontend::make_l1_reader(
390+
model::record_batch_reader frontend::make_l1_reader(
391391
const cloud_topic_log_reader_config& cfg,
392392
model::topic_id_partition tidp) const {
393393
auto ct_state = _partition->get_cloud_topics_state();
394+
auto* cache = ct_state->local().get_l1_reader_cache();
395+
if (cache) {
396+
if (auto hit = cache->get_reader(tidp, cfg); hit) {
397+
return std::move(*hit);
398+
}
399+
}
400+
394401
auto l1_metastore = ct_state->local().get_l1_metastore();
395402
auto l1_io = ct_state->local().get_l1_io();
396403
auto l1_reader_probe = ct_state->local().get_l1_reader_probe();
397-
auto l1_cache = ct_state->local().get_l1_reader_cache();
398404

399-
return std::make_unique<level_one_log_reader_impl>(
400-
cfg,
401-
_partition->ntp(),
402-
tidp,
403-
l1_metastore,
404-
l1_io,
405-
l1_reader_probe,
406-
l1_cache);
405+
auto reader = std::make_unique<level_one_log_reader_impl>(
406+
cfg, _partition->ntp(), tidp, l1_metastore, l1_io, l1_reader_probe);
407+
408+
if (cache) {
409+
return cache->put(std::move(reader));
410+
}
411+
return model::record_batch_reader(std::move(reader));
407412
}
408413

409414
ss::future<std::optional<storage::timequery_result>>

src/v/cloud_topics/frontend/frontend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ class frontend final {
238238
std::unique_ptr<model::record_batch_reader::impl>
239239
make_l0_reader(const cloud_topic_log_reader_config& cfg) const;
240240

241-
std::unique_ptr<model::record_batch_reader::impl> make_l1_reader(
241+
model::record_batch_reader make_l1_reader(
242242
const cloud_topic_log_reader_config& cfg,
243243
model::topic_id_partition tidp) const;
244244

src/v/cloud_topics/level_one/compaction/source.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ ss::future<ss::stop_iteration> compaction_source::deduplication_iteration(
282282
config, _ntp, _tp, _metastore, _io, _l1_reader_probe));
283283

284284
co_await sink.prepare_iteration(start_offset);
285-
auto stats = co_await rdr.consume(
285+
auto stats = co_await std::move(rdr).consume(
286286
compaction_filter{sink, *_map, _ntp, _removable_tombstone_ranges},
287287
model::no_timeout);
288288
co_await sink.finish_iteration(start_offset, last_offset);

src/v/cloud_topics/level_one/frontend_reader/BUILD

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ redpanda_cc_library(
2626
],
2727
visibility = ["//visibility:public"],
2828
deps = [
29-
"//src/v/base",
29+
":reader",
30+
"//src/v/cloud_topics:log_reader_config",
3031
"//src/v/cloud_topics:logger",
31-
"//src/v/cloud_topics/level_one/common:object",
32-
"//src/v/cloud_topics/level_one/common:object_id",
32+
"//src/v/config",
3333
"//src/v/container:intrusive",
3434
"//src/v/model",
35+
"//src/v/random:time_jitter",
3536
"//src/v/ssx:future_util",
3637
"@seastar",
3738
],
@@ -47,7 +48,6 @@ redpanda_cc_library(
4748
],
4849
visibility = ["//visibility:public"],
4950
deps = [
50-
":l1_reader_cache",
5151
"//src/v/bytes:iostream",
5252
"//src/v/cloud_topics:log_reader_config",
5353
"//src/v/cloud_topics:logger",

0 commit comments

Comments
 (0)