Skip to content

Commit 514294f

Browse files
committed
ct/l1: fix unmanage_cb_t and some life-time issues
The `unmanage_cb_t` type is typedef'd as returning `void`, but it was actually invoking a function that returns an `ss::future<>`, because some asynchronous work must occur when unmanaging a partition. Fix the `compaction_scheduler::unmanage_partition()` implementation and some of the functions in its call stack so that the asynchronous work is pushed to a detached fiber lower down, in `worker_manager`. To ensure proper lifetimes, add an `ss::gate` to the `worker_manager` and use it to prevent any requests from entering the `worker_manager` while it is being stopped or after it has been stopped.
1 parent 9eae36d commit 514294f

8 files changed

Lines changed: 38 additions & 30 deletions

File tree

src/v/cloud_topics/level_one/compaction/log_collector.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ ss::future<> partition_leader_log_collector::stop_collecting_logs() {
6666
}
6767

6868
void partition_leader_log_collector::on_ntp_change(
69-
cluster::topic_table::ntp_delta delta) {
69+
const cluster::topic_table::ntp_delta& delta) {
7070
auto& ntp = delta.ntp;
7171
auto is_managed = _is_managed_cb(ntp);
7272

@@ -75,7 +75,7 @@ void partition_leader_log_collector::on_ntp_change(
7575
case delta_type::removed: {
7676
// Partition/possibly topic was removed. Unmanage it if necessary.
7777
if (is_managed) {
78-
_unmanage_cb(std::move(ntp), "Partition removed");
78+
_unmanage_cb(ntp, "Partition removed");
7979
}
8080
return;
8181
}
@@ -104,7 +104,7 @@ void partition_leader_log_collector::on_ntp_change(
104104
if (!is_compacted_cloud_topic && is_managed) {
105105
// This is likely an existing cloud topic which is no longer
106106
// `compact` enabled.
107-
_unmanage_cb(std::move(ntp), "Disabled compaction");
107+
_unmanage_cb(ntp, "Disabled compaction");
108108
}
109109
return;
110110
}

src/v/cloud_topics/level_one/compaction/log_collector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class partition_leader_log_collector : public log_collector {
102102
// Register operations can be performed synchronously while unregister
103103
// operations are performed in a backgrounded fiber (see
104104
// `compaction_scheduler::unmanage_partition()`).
105-
void on_ntp_change(cluster::topic_table::ntp_delta);
105+
void on_ntp_change(const cluster::topic_table::ntp_delta&);
106106

107107
// Registers/unregisters `ntp`s with the `compaction_scheduler` using
108108
// leadership notifications from the `partition_leaders_table`. The

src/v/cloud_topics/level_one/compaction/scheduler.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ compaction_scheduler::compaction_scheduler(
3434
const model::ntp& ntp,
3535
const model::topic_id_partition& tidp,
3636
std::string_view ctx) { manage_partition(ntp, tidp, ctx); },
37-
[this](model::ntp ntp, std::string_view ctx) {
38-
return unmanage_partition(std::move(ntp), ctx);
37+
[this](const model::ntp& ntp, std::string_view ctx) {
38+
unmanage_partition(ntp, ctx);
3939
},
4040
[this](const model::ntp& ntp) { return is_managed(ntp); },
4141
state))
@@ -96,8 +96,8 @@ void compaction_scheduler::manage_partition(
9696
_probe.set_log_count(_logs.size());
9797
}
9898

99-
ss::future<>
100-
compaction_scheduler::unmanage_partition(model::ntp ntp, std::string_view ctx) {
99+
void compaction_scheduler::unmanage_partition(
100+
const model::ntp& ntp, std::string_view ctx) {
101101
auto tidp_entry = _ntp_to_tidp.extract(ntp);
102102
if (!tidp_entry.has_value()) {
103103
vassert(
@@ -132,7 +132,7 @@ compaction_scheduler::unmanage_partition(model::ntp ntp, std::string_view ctx) {
132132
// Request that compaction of this CTP be stopped, if in flight. `handle` is
133133
// a `lw_shared_ptr`- we can allow it to go out of scope here without fear
134134
// of UAF elsewhere.
135-
co_await _worker_manager.request_stop_compaction(handle);
135+
_worker_manager.request_stop_compaction(std::move(handle));
136136
_probe.set_log_count(_logs.size());
137137
}
138138

src/v/cloud_topics/level_one/compaction/scheduler.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ class compaction_scheduler {
7373

7474
// Removes the `tidp` from the list of managed partitions. No-ops if the
7575
// provided `tidp` is not managed by this scheduler. Because the `tidp`
76-
// may be undergoing an inflight compaction, this function will block until
77-
// it is complete (an early stop is requested by this function).
78-
ss::future<> unmanage_partition(model::ntp, std::string_view);
76+
// may be undergoing an inflight compaction, an early stop is requested by
77+
// the `_worker_manager`.
78+
void unmanage_partition(const model::ntp&, std::string_view);
7979

8080
private:
8181
// Starts the backgrounded scheduling loop.

src/v/cloud_topics/level_one/compaction/tests/scheduler_multithread_test.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,15 @@ TEST_F(SchedulerTestFixture, TestSchedulerMultithread) {
5454
};
5555
auto unmanage_random_partition_func = [this, &managed_ntps]() {
5656
if (managed_ntps.empty()) {
57-
return ss::now();
57+
return;
5858
}
5959
auto ntp_to_remove
6060
= random_generators::random_choice(
6161
std::vector<std::pair<model::ntp, model::topic_id_partition>>(
6262
managed_ntps.begin(), managed_ntps.end()))
6363
.first;
6464
managed_ntps.erase(ntp_to_remove);
65-
return scheduler->unmanage_partition(
66-
std::move(ntp_to_remove), "unmanage_partition_func");
65+
scheduler->unmanage_partition(ntp_to_remove, "unmanage_partition_func");
6766
};
6867
auto pause_random_worker_func = [this, &paused_workers]() {
6968
auto random_shard = random_generators::get_int(ss::smp::count - 1);
@@ -109,8 +108,8 @@ TEST_F(SchedulerTestFixture, TestSchedulerMultithread) {
109108
auto unmanage_ntp_fut = ss::do_until(
110109
[&] { return as.abort_requested(); },
111110
[&]() {
112-
return unmanage_random_partition_func().then(
113-
[&]() { return ss::sleep(unmanage_sleep); });
111+
unmanage_random_partition_func();
112+
return ss::sleep(unmanage_sleep);
114113
});
115114

116115
static constexpr auto pause_worker_sleep = 50ms;

src/v/cloud_topics/level_one/compaction/tests/scheduler_test.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,15 @@ TEST_F(SchedulerTestFixture, TestScheduler) {
5656
};
5757
auto unmanage_random_partition_func = [this, &managed_ntps]() {
5858
if (managed_ntps.empty()) {
59-
return ss::now();
59+
return;
6060
}
6161
auto ntp_to_remove
6262
= random_generators::random_choice(
6363
std::vector<std::pair<model::ntp, model::topic_id_partition>>(
6464
managed_ntps.begin(), managed_ntps.end()))
6565
.first;
6666
managed_ntps.erase(ntp_to_remove);
67-
return scheduler->unmanage_partition(
68-
std::move(ntp_to_remove), "unmanage_partition_func");
67+
scheduler->unmanage_partition(ntp_to_remove, "unmanage_partition_func");
6968
};
7069
auto pause_random_worker_func = [this, &paused_workers]() {
7170
auto random_shard = random_generators::get_int(ss::smp::count - 1);
@@ -132,8 +131,8 @@ TEST_F(SchedulerTestFixture, TestScheduler) {
132131
auto unmanage_ntp_fut = ss::do_until(
133132
[&] { return as.abort_requested(); },
134133
[&]() {
135-
return unmanage_random_partition_func().then(
136-
[&]() { return ss::sleep(unmanage_sleep); });
134+
unmanage_random_partition_func();
135+
return ss::sleep(unmanage_sleep);
137136
});
138137

139138
static constexpr auto pause_worker_sleep = 50ms;

src/v/cloud_topics/level_one/compaction/worker_manager.cc

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "cloud_topics/level_one/compaction/worker.h"
1717
#include "cloud_topics/level_one/metastore/replicated_metastore.h"
1818
#include "model/fundamental.h"
19+
#include "ssx/future-util.h"
1920

2021
namespace cloud_topics::l1 {
2122

@@ -43,7 +44,10 @@ ss::future<> worker_manager::start() {
4344
co_await _workers.invoke_on_all(&compaction_worker::start);
4445
}
4546

46-
ss::future<> worker_manager::stop() { co_await _workers.stop(); }
47+
ss::future<> worker_manager::stop() {
48+
co_await _gate.close();
49+
co_await _workers.stop();
50+
}
4751

4852
std::optional<foreign_log_compaction_meta_ptr>
4953
worker_manager::try_acquire_work(ss::shard_id shard) {
@@ -93,35 +97,39 @@ void worker_manager::complete_work(log_compaction_meta* log) {
9397
_probe.log_compacted();
9498
}
9599

96-
ss::future<>
97-
worker_manager::request_stop_compaction(log_compaction_meta_ptr log) {
100+
void worker_manager::request_stop_compaction(log_compaction_meta_ptr log) {
98101
if (!log) {
99-
co_return;
102+
return;
100103
}
101104

102105
auto shard_opt = log->inflight_shard;
103106
if (!shard_opt.has_value()) {
104-
co_return;
107+
return;
105108
}
106109

107110
auto shard = shard_opt.value();
108111

109-
co_await _workers.invoke_on(shard, [](compaction_worker& worker) {
110-
return worker.terminate_current_job();
112+
ssx::spawn_with_gate(_gate, [this, shard]() {
113+
return _workers.invoke_on(shard, [](compaction_worker& worker) {
114+
return worker.terminate_current_job();
115+
});
111116
});
112117
}
113118

114119
ss::future<> worker_manager::alert_workers() {
120+
auto guard = _gate.hold();
115121
co_await _workers.invoke_on_all(
116122
[](compaction_worker& worker) { worker.alert_worker(); });
117123
}
118124

119125
ss::future<> worker_manager::pause_worker(ss::shard_id worker) {
126+
auto guard = _gate.hold();
120127
co_await _workers.invoke_on(
121128
worker, [](compaction_worker& worker) { return worker.pause_worker(); });
122129
}
123130

124131
ss::future<> worker_manager::resume_worker(ss::shard_id worker) {
132+
auto guard = _gate.hold();
125133
co_await _workers.invoke_on(
126134
worker, [](compaction_worker& worker) { return worker.resume_worker(); });
127135
}

src/v/cloud_topics/level_one/compaction/worker_manager.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class worker_manager {
7777
// compaction jobs to be ran. This function is ideally used when e.g. a
7878
// partition is removed or the `cleanup.policy` for a topic is changed and a
7979
// single compaction job must be stopped.
80-
ss::future<> request_stop_compaction(log_compaction_meta_ptr);
80+
void request_stop_compaction(log_compaction_meta_ptr);
8181

8282
// Alert all workers that new jobs have become available in the
8383
// `_work_queue`.
@@ -112,6 +112,8 @@ class worker_manager {
112112

113113
// A sharded pool of compaction workers.
114114
ss::sharded<compaction_worker> _workers;
115+
116+
ss::gate _gate;
115117
};
116118

117119
} // namespace cloud_topics::l1

0 commit comments

Comments
 (0)