Skip to content

Commit f83d5c6

Browse files
authored
Merge pull request redpanda-data#29084 from rockwotj/ct-producer-queue
ct: introduce producer queue
2 parents 176e0a1 + b4ad1b4 commit f83d5c6

21 files changed

Lines changed: 1047 additions & 199 deletions

src/v/cloud_topics/data_plane_api.h

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,27 @@
2121
#include <seastar/core/future.hh>
2222
#include <seastar/core/lowres_clock.hh>
2323

24+
#include <expected>
25+
#include <system_error>
26+
2427
namespace cloud_topics {
2528

29+
// staged_write is a write operation that has been reserved in the pipeline.
30+
// it is decoupled from uploading so that we can provide backpressure before
31+
// accepting more batches into the pipeline
32+
struct staged_write {
33+
struct batch_data {
34+
batch_data() = default;
35+
batch_data(const batch_data&) = default;
36+
batch_data(batch_data&&) = delete;
37+
batch_data& operator=(const batch_data&) = default;
38+
batch_data& operator=(batch_data&&) = delete;
39+
virtual ~batch_data() = default;
40+
};
41+
42+
std::unique_ptr<batch_data> staged;
43+
};
44+
2645
/// Dataplane API
2746
class data_plane_api {
2847
public:
@@ -37,11 +56,17 @@ class data_plane_api {
3756
virtual ss::future<> start() = 0;
3857
virtual ss::future<> stop() = 0;
3958

40-
/// Write data batches and get back placeholder batches
41-
virtual ss::future<result<chunked_vector<extent_meta>>> write_and_debounce(
59+
// Reserve the space needed for this write.
60+
virtual ss::future<std::expected<staged_write, std::error_code>>
61+
stage_write(chunked_vector<model::record_batch> batches) = 0;
62+
63+
// Execute this write using the reservation.
64+
virtual ss::future<
65+
std::expected<chunked_vector<extent_meta>, std::error_code>>
66+
execute_write(
4267
model::ntp ntp,
4368
cluster_epoch min_epoch,
44-
chunked_vector<model::record_batch> batches,
69+
staged_write reservation,
4570
model::timeout_clock::time_point deadline)
4671
= 0;
4772

src/v/cloud_topics/data_plane_impl.cc

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636

3737
namespace cloud_topics {
3838

39+
struct staged_pipeline_write : public staged_write::batch_data {
40+
l0::write_pipeline<>::prepared_data data;
41+
};
42+
3943
class impl
4044
: public data_plane_api
4145
, public ssx::sharded_service_container {
@@ -132,17 +136,28 @@ class impl
132136
co_return;
133137
}
134138

135-
ss::future<result<chunked_vector<extent_meta>>> write_and_debounce(
139+
ss::future<std::expected<staged_write, std::error_code>>
140+
stage_write(chunked_vector<model::record_batch> batches) override {
141+
auto reservation = co_await _write_pipeline.local().prepare_write(
142+
std::move(batches));
143+
if (!reservation.has_value()) {
144+
co_return std::unexpected(reservation.error());
145+
}
146+
auto staged = std::make_unique<staged_pipeline_write>();
147+
staged->data = std::move(reservation.value());
148+
co_return staged_write{.staged = std::move(staged)};
149+
}
150+
151+
ss::future<std::expected<chunked_vector<extent_meta>, std::error_code>>
152+
execute_write(
136153
model::ntp ntp,
137154
cluster_epoch min_epoch,
138-
chunked_vector<model::record_batch> r,
139-
model::timeout_clock::time_point timeout) override {
140-
auto res = co_await _write_pipeline.local().write_and_debounce(
141-
std::move(ntp), min_epoch, std::move(r), timeout);
142-
if (res.has_value()) {
143-
co_return std::move(res.value());
144-
}
145-
co_return res.error();
155+
staged_write reservation,
156+
model::timeout_clock::time_point deadline) override {
157+
auto staged = std::unique_ptr<staged_pipeline_write>(
158+
static_cast<staged_pipeline_write*>(reservation.staged.release()));
159+
co_return co_await _write_pipeline.local().execute_write(
160+
std::move(ntp), min_epoch, std::move(staged->data), deadline);
146161
}
147162

148163
ss::future<result<chunked_vector<model::record_batch>>> materialize(

src/v/cloud_topics/frontend/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ redpanda_cc_library(
2626
"//src/v/cloud_storage:types",
2727
"//src/v/cloud_topics:logger",
2828
"//src/v/cloud_topics/level_one/metastore",
29+
"//src/v/cloud_topics/level_zero/common:producer_queue",
2930
"//src/v/cloud_topics/level_zero/stm:ctp_stm",
3031
"//src/v/cloud_topics/level_zero/stm:placeholder",
3132
"//src/v/cluster",

0 commit comments

Comments
 (0)