Skip to content

Commit 70e0e88

Browse files
committed
ct: add compaction_scheduler to app
1 parent 1528c31 commit 70e0e88

3 files changed

Lines changed: 20 additions & 0 deletions

File tree

src/v/cloud_topics/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ redpanda_cc_library(
9999
"//src/v/base",
100100
"//src/v/cloud_topics:state_accessors",
101101
"//src/v/cloud_topics/level_one/common:file_io",
102+
"//src/v/cloud_topics/level_one/compaction:scheduler",
102103
"//src/v/cloud_topics/level_one/domain:domain_supervisor",
103104
"//src/v/cloud_topics/level_one/metastore:leader_router",
104105
"//src/v/cloud_topics/level_one/metastore:replicated_metastore",

src/v/cloud_topics/app.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "cloud_topics/data_plane_api.h"
1515
#include "cloud_topics/data_plane_impl.h"
1616
#include "cloud_topics/housekeeper/manager.h"
17+
#include "cloud_topics/level_one/compaction/scheduler.h"
1718
#include "cloud_topics/level_one/metastore/topic_purger.h"
1819
#include "cloud_topics/level_zero/gc/level_zero_gc.h"
1920
#include "cloud_topics/manager/manager.h"
@@ -113,6 +114,16 @@ ss::future<> app::construct(
113114
return &replicated_metastore.local();
114115
}));
115116

117+
construct_single_service(
118+
compaction_scheduler,
119+
l1::compaction_cluster_state{
120+
.self = self,
121+
.leaders_table = leaders_table,
122+
.topic_table = &controller->get_topics_state(),
123+
.metadata_cache = metadata_cache},
124+
&l1_io,
125+
&replicated_metastore);
126+
116127
// Must be last to register so it will be first to be stopped in
117128
// `app::stop`. This is to ensure that stopped services don't receive
118129
// callbacks.
@@ -129,6 +140,7 @@ ss::future<> app::start() {
129140
co_await domain_supervisor.invoke_on_all(
130141
[](auto& ds) { return ds.start(); });
131142
co_await housekeeper_manager.invoke_on_all(&housekeeper_manager::start);
143+
co_await compaction_scheduler->start();
132144

133145
// When start is called, we must have registered all the callbacks before
134146
// this as starting the manager will invoke callbacks for partitions already
@@ -228,4 +240,8 @@ ss::sharded<l1::replicated_metastore>* app::get_sharded_replicated_metastore() {
228240
return &replicated_metastore;
229241
}
230242

243+
l1::compaction_scheduler* app::get_compaction_scheduler() {
244+
return compaction_scheduler.get();
245+
}
246+
231247
} // namespace cloud_topics

src/v/cloud_topics/app.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#pragma once
1212

1313
#include "cloud_topics/level_one/common/file_io.h"
14+
#include "cloud_topics/level_one/compaction/scheduler.h"
1415
#include "cloud_topics/level_one/domain/domain_supervisor.h"
1516
#include "cloud_topics/level_one/metastore/leader_router.h"
1617
#include "cloud_topics/level_one/metastore/replicated_metastore.h"
@@ -74,6 +75,7 @@ class app : public ssx::sharded_service_container {
7475
ss::sharded<l1::domain_supervisor>* get_sharded_l1_domain_supervisor();
7576
ss::sharded<reconciler::reconciler>* get_reconciler();
7677
ss::sharded<l1::replicated_metastore>* get_sharded_replicated_metastore();
78+
l1::compaction_scheduler* get_compaction_scheduler();
7779

7880
// TODO: add 'get_control_plane_api' etc
7981

@@ -92,6 +94,7 @@ class app : public ssx::sharded_service_container {
9294
ss::sharded<cloud_topics_manager> manager;
9395
ss::sharded<level_zero_gc> l0_gc;
9496
ss::sharded<housekeeper_manager> housekeeper_manager;
97+
std::unique_ptr<l1::compaction_scheduler> compaction_scheduler;
9598
};
9699

97100
} // namespace cloud_topics

0 commit comments

Comments
 (0)