1818#include " raft/consensus.h"
1919#include " raft/persisted_stm.h"
2020#include " ssx/future-util.h"
21+ #include " ssx/watchdog.h"
2122
2223#include < seastar/core/abort_source.hh>
2324#include < seastar/core/sleep.hh>
@@ -61,7 +62,8 @@ class ctp_stm_consumer {
6162} // namespace
6263
6364ctp_stm::ctp_stm (ss::logger& logger, raft::consensus* raft)
64- : raft::persisted_stm<>(name, logger, raft) {}
65+ : raft::persisted_stm<>(name, logger, raft)
66+ , _lock(ss::semaphore::max_counter()) {}
6567
6668ss::future<> ctp_stm::start () {
6769 ssx::spawn_with_gate (_gate, [this ] { return prefix_truncate_below_lro (); });
@@ -71,7 +73,26 @@ ss::future<> ctp_stm::start() {
7173ss::future<> ctp_stm::stop () {
7274 _lro_advanced.broken ();
7375 _as.request_abort ();
76+ // We can't break the lock because that could cause a use-after-free (UAF),
77+ // as the units are held outside of this class.
78+ // However, lock acquisition uses the above abort_source, so
79+ // we should not be acquiring new waiters.
80+ // _lock.broken();
7481 co_await raft::persisted_stm<>::stop ();
82+ static constexpr auto epoch_fence_lock_timeout = 10s;
83+ ssx::watchdog wd (epoch_fence_lock_timeout, [this ] {
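84+ // Units not yet returned: roughly the number of produce requests still in flight when only read locks (1 unit each) are held; a held write lock accounts for max_counter() units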
85+ auto num_read_locks_held = ss::semaphore::max_counter ()
86+ - _lock.available_units ();
87+ vlog (
88+ _log.debug ,
89+ " timeout waiting for epoch fencing lock units to be returned: {} "
90+ " units outstanding" ,
91+ num_read_locks_held);
92+ });
93+ // Wait for all the units to be returned; otherwise, when the units are
94+ // destroyed later, we could get a UAF.
95+ co_await _lock.wait (ss::semaphore::max_counter ());
7596}
7697
7798ss::future<> ctp_stm::prefix_truncate_below_lro () {
@@ -126,7 +147,8 @@ ss::future<> ctp_stm::prefix_truncate_below_lro() {
126147 // truncating it again so if LRO is making lots of rapid but small
127148 // progress we aren't snapshotting too much.
128149 if (_raft->last_snapshot_index () > snapshot_index) {
129- co_await ss::sleep_abortable (min_truncate_period, _as);
150+ co_await ss::sleep_abortable<ss::lowres_clock>(
151+ min_truncate_period, _as);
130152 }
131153 }
132154}
@@ -335,7 +357,7 @@ ss::future<iobuf> ctp_stm::take_raft_snapshot(model::offset snapshot_at) {
335357ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
336358ctp_stm::fence_epoch (cluster_epoch e) {
337359 auto holder = _gate.hold ();
338- if (!co_await sync (sync_timeout)) {
360+ if (!co_await sync (sync_timeout, _as )) {
339361 vlog (_log.warn , " ctp_stm::fence_epoch sync timeout" );
340362 throw std::runtime_error (fmt_with_ctx (fmt::format, " Sync timeout" ));
341363 }
@@ -347,15 +369,16 @@ ctp_stm::fence_epoch(cluster_epoch e) {
347369 auto fence_epoch = _state.get_max_seen_epoch ().or_else (get_applied_epoch);
348370 if (fence_epoch.has_value () && fence_epoch.value () == e) {
349371 // Case 1. Same epoch, need to acquire read-lock.
350- auto unit = co_await _lock. hold_read_lock ( );
372+ auto unit = co_await ss::get_units (_lock, 1 , _as );
351373 if (_state.get_max_seen_epoch ().or_else (get_applied_epoch) == e) {
352374 // The max_seen_epoch didn't advance after the scheduling point
353375 co_return cluster_epoch_fence{
354376 .unit = std::move (unit), .term = term};
355377 }
356378 } else {
357379 // Case 2. New epoch, need to acquire write-lock.
358- auto unit = co_await _lock.hold_write_lock ();
380+ auto unit = co_await ss::get_units (
381+ _lock, ss::semaphore::max_counter (), _as);
359382 auto current_epoch = _state.get_max_seen_epoch ().or_else (
360383 get_applied_epoch);
361384 if (!current_epoch.has_value () || current_epoch.value () <= e) {
0 commit comments