Skip to content

Commit 3f9fc60

Browse files
committed
ct/l0: add better logging for stale epochs
Previously the log messages about stale epochs were unhelpful ("fence unit is empty"); with this change the error message at least reports the latest epoch value seen in the STM.
1 parent 0e81b9e commit 3f9fc60

8 files changed

Lines changed: 42 additions & 25 deletions

File tree

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -580,12 +580,13 @@ ss::future<result<raft::replicate_result>> do_upload_and_replicate(
580580
co_return default_errc;
581581
}
582582
auto fence = std::move(fence_fut.get());
583-
if (!fence.unit.has_value()) {
583+
if (!fence.has_value()) {
584584
vlog(
585585
cd_log.warn,
586-
"Failed to fence epoch {} for ntp {}, fence unit is empty",
586+
"Failed to fence epoch {} for ntp {}, ctp latest seen epoch is {}",
587587
upload_res.value().front().id.epoch,
588-
ntp);
588+
ntp,
589+
fence.error().latest_seen);
589590
co_return default_errc;
590591
}
591592

@@ -605,7 +606,7 @@ ss::future<result<raft::replicate_result>> do_upload_and_replicate(
605606
co_await ticket.redeem();
606607
}
607608
// Replicate now that our ticket is redeemed
608-
opts = update_replicate_options(opts, fence.term);
609+
opts = update_replicate_options(opts, fence->term);
609610
auto replicate_stages = partition->replicate_in_stages(
610611
batch_id, std::move(placeholders.batches.front()), opts);
611612
// Once the request is enqueued in raft and our order is guaranteed we can
@@ -722,14 +723,13 @@ ss::future<std::expected<kafka::offset, std::error_code>> frontend::replicate(
722723
std::rethrow_exception(e);
723724
}
724725
auto fence = std::move(fence_fut.get());
725-
if (!fence.unit.has_value()) {
726+
if (!fence.has_value()) {
726727
vlog(
727728
cd_log.warn,
728-
"Failed to fence epoch {} for ntp {}, fence unit is empty",
729+
"Failed to fence epoch {} for ntp {}, ctp latest seen epoch is {}",
729730
res.value().front().id.epoch,
730-
ntp());
731-
732-
/// TODO: Maybe return different error code here?
731+
ntp(),
732+
fence.error().latest_seen);
733733
co_return std::unexpected(
734734
kafka::make_error_code(kafka::error_code::request_timed_out));
735735
}
@@ -741,7 +741,7 @@ ss::future<std::expected<kafka::offset, std::error_code>> frontend::replicate(
741741
placeholder_batches.push_back(std::move(batch));
742742
}
743743

744-
opts = update_replicate_options(opts, fence.term);
744+
opts = update_replicate_options(opts, fence->term);
745745
auto result = co_await _partition->replicate(
746746
std::move(placeholder_batches), opts);
747747

src/v/cloud_topics/level_zero/stm/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ redpanda_cc_library(
6363
hdrs = ["types.h"],
6464
visibility = [":__subpackages__"],
6565
deps = [
66+
"//src/v/cloud_topics:types",
6667
"//src/v/model",
6768
"@fmt",
6869
"@seastar",

src/v/cloud_topics/level_zero/stm/ctp_stm.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ ss::future<iobuf> ctp_stm::take_raft_snapshot(model::offset snapshot_at) {
332332
co_return serde::to_iobuf(_state);
333333
}
334334

335-
ss::future<cluster_epoch_fence> ctp_stm::fence_epoch(cluster_epoch e) {
335+
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
336+
ctp_stm::fence_epoch(cluster_epoch e) {
336337
auto holder = _gate.hold();
337338
if (!co_await sync(sync_timeout)) {
338339
vlog(_log.warn, "ctp_stm::fence_epoch sync timeout");
@@ -366,7 +367,10 @@ ss::future<cluster_epoch_fence> ctp_stm::fence_epoch(cluster_epoch e) {
366367
}
367368
}
368369
// If we reach here, it means that we need to discard the batch.
369-
co_return cluster_epoch_fence{};
370+
co_return std::unexpected(
371+
stale_cluster_epoch(_state.get_max_seen_epoch()
372+
.or_else(get_applied_epoch)
373+
.value_or(cluster_epoch{-1})));
370374
}
371375

372376
model::offset ctp_stm::max_removable_local_log_offset() {

src/v/cloud_topics/level_zero/stm/ctp_stm.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include <seastar/core/rwlock.hh>
1919

20+
#include <expected>
21+
2022
namespace cloud_topics {
2123

2224
class ctp_stm_api;
@@ -54,7 +56,8 @@ class ctp_stm final : public raft::persisted_stm<> {
5456
_state.advance_max_seen_epoch(epoch);
5557
}
5658

57-
ss::future<cluster_epoch_fence> fence_epoch(cluster_epoch e);
59+
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
60+
fence_epoch(cluster_epoch e);
5861

5962
/// Return inactive epoch of the CTP
6063
///

src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,15 @@ ss::future<bool> ctp_stm_api::sync_in_term(
174174
co_return co_await _stm->sync_in_term(deadline, as);
175175
}
176176

177-
ss::future<cluster_epoch_fence> ctp_stm_api::fence_epoch(cluster_epoch e) {
177+
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
178+
ctp_stm_api::fence_epoch(cluster_epoch e) {
178179
vlog(_log.debug, "Fencing epoch {} in term {}", e, _stm->_raft->term());
179180
auto res = co_await _stm->fence_epoch(e);
180181
vlog(
181182
_log.debug,
182183
"Fence acquired = {} in term {}",
183-
res.unit.has_value(),
184-
res.term);
184+
res.has_value(),
185+
res.has_value() ? res->term : model::term_id{-1});
185186
co_return std::move(res);
186187
}
187188

src/v/cloud_topics/level_zero/stm/ctp_stm_api.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ class ctp_stm_api {
9191
sync_in_term(model::timeout_clock::time_point deadline, ss::abort_source&);
9292

9393
/// Fence writes
94-
ss::future<cluster_epoch_fence> fence_epoch(cluster_epoch e);
94+
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
95+
fence_epoch(cluster_epoch e);
9596

9697
std::optional<cluster_epoch> get_max_epoch() const;
9798

src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,20 +158,20 @@ TEST_F_CORO(ctp_stm_fixture, test_fencing) {
158158
{
159159
auto fence
160160
= co_await api(node(*get_leader())).fence_epoch(ct::cluster_epoch{2});
161-
ASSERT_TRUE_CORO(fence.unit.has_value());
161+
ASSERT_TRUE_CORO(fence.has_value());
162162
}
163163

164164
// Acquire the fence for epoch 1 (should fail)
165165
{
166166
auto fence
167167
= co_await api(node(*get_leader())).fence_epoch(ct::cluster_epoch{1});
168-
ASSERT_FALSE_CORO(fence.unit.has_value());
168+
ASSERT_FALSE_CORO(fence.has_value());
169169
}
170170

171171
// Advance max_seen_epoch to 3.
172172
auto write_fence
173173
= co_await api(node(*get_leader())).fence_epoch(ct::cluster_epoch{3});
174-
ASSERT_TRUE_CORO(write_fence.unit.has_value());
174+
ASSERT_TRUE_CORO(write_fence.has_value());
175175

176176
// Out of order fence for epoch 2 (should be waiting for the fence to be
177177
// released)
@@ -182,7 +182,7 @@ TEST_F_CORO(ctp_stm_fixture, test_fencing) {
182182
write_fence = {};
183183

184184
auto read_fence = co_await std::move(fut);
185-
ASSERT_FALSE_CORO(read_fence.unit.has_value());
185+
ASSERT_FALSE_CORO(read_fence.has_value());
186186
}
187187

188188
TEST_F_CORO(ctp_stm_fixture, test_last_reconciled_offset) {
@@ -457,6 +457,6 @@ TEST_F_CORO(ctp_stm_fixture, test_snapshot) {
457457
// Acquire the fence for epoch 1 (should fail)
458458
{
459459
auto fence = co_await api(leader).fence_epoch(ct::cluster_epoch{1});
460-
ASSERT_FALSE_CORO(fence.unit.has_value());
460+
ASSERT_FALSE_CORO(fence.has_value());
461461
}
462462
}

src/v/cloud_topics/level_zero/stm/types.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#pragma once
1212

13+
#include "cloud_topics/types.h"
1314
#include "model/fundamental.h"
1415

1516
#include <seastar/core/rwlock.hh>
@@ -23,12 +24,18 @@ enum class ctp_stm_key : uint8_t {
2324

2425
struct [[nodiscard]] cluster_epoch_fence {
2526
// Units protecting the epoch state.
26-
// If it's set to nullopt the batch has to be discarded
27-
// because of the out of order epoch.
28-
std::optional<ss::rwlock::holder> unit;
27+
ss::rwlock::holder unit;
2928
// Term in which the batch is replicated.
3029
model::term_id term;
3130
};
31+
32+
// The error returned when the CTP STM has seen a newer epoch than the one
33+
// attempting to be used.
34+
struct [[nodiscard]] stale_cluster_epoch {
35+
// The latest cluster epoch
36+
cluster_epoch latest_seen;
37+
};
38+
3239
} // namespace cloud_topics
3340

3441
template<>

0 commit comments

Comments (0)