Skip to content

Commit ceaceec

Browse files
authored
Merge pull request redpanda-data#29134 from Lazin/ct/fix-get-requests
ct: Fix `get_fetch_requests`
2 parents 59089d4 + 6b97b2d commit ceaceec

6 files changed

Lines changed: 616 additions & 26 deletions

File tree

src/v/cloud_topics/level_zero/pipeline/pipeline_stage.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pipeline_stage pipeline_stage_container::next_stage(pipeline_stage old) const {
3434
auto next_ix = old_ix + 1;
3535
// Check that we have next stage
3636
vassert(
37-
static_cast<size_t>(next_ix) < _stages.size(),
37+
static_cast<size_t>(next_ix) < _registered,
3838
"Pipeline stage {} is not registered",
3939
next_ix);
4040
return pipeline_stage(&_stages.at(next_ix));

src/v/cloud_topics/level_zero/pipeline/read_pipeline.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -163,27 +163,29 @@ read_pipeline<Clock>::get_fetch_requests(
163163
read_requests_list result(this, stage);
164164
size_t acc_size = 0;
165165

166-
// The elements in the list are in the insertion order.
167166
auto it = pending.begin();
168-
for (; it != pending.end(); it++) {
167+
for (; it != pending.end();) {
169168
if (it->stage != stage) {
169+
it++;
170170
continue;
171171
}
172-
// TODO: avoid copy
173172
auto sz = it->query.output_size_estimate;
174173
acc_size += sz;
175174
vlog(
176175
it->rtc_logger.trace,
177176
"get_fetch_requests processing req for {}, size estimate: {}",
178177
it->ntp,
179178
acc_size);
180-
if (acc_size >= max_bytes) {
181-
// Include last element
182-
it++;
179+
// Always include the first request even if it exceeds max_bytes
180+
// to avoid stalling the pipeline with oversized requests
181+
if (acc_size >= max_bytes && !result.requests.empty()) {
183182
break;
184183
}
184+
auto& el = *it;
185+
it++;
186+
el._hook.unlink();
187+
result.requests.push_back(el);
185188
}
186-
result.requests.splice(result.requests.end(), pending, pending.begin(), it);
187189
result.complete = pending.empty();
188190
vlog(
189191
logger.debug,

src/v/cloud_topics/level_zero/pipeline/tests/BUILD

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
load("//bazel:test.bzl", "redpanda_cc_bench", "redpanda_cc_gtest")
22

3+
# Unit tests for the level-zero read pipeline (per-stage filtering and
# max_bytes batching behavior of get_fetch_requests).
redpanda_cc_gtest(
    name = "read_pipeline_test",
    timeout = "short",
    srcs = [
        "read_pipeline_test.cc",
    ],
    deps = [
        "//src/v/cloud_topics/level_zero/pipeline:read_pipeline",
        "//src/v/model",
        "//src/v/test_utils:gtest",
        "@googletest//:gtest",
        "@seastar",
    ],
)
17+
318
redpanda_cc_gtest(
419
name = "write_pipeline_test",
520
timeout = "short",
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
#include "cloud_topics/level_zero/pipeline/read_pipeline.h"
12+
#include "model/fundamental.h"
13+
#include "model/namespace.h"
14+
#include "test_utils/test.h"
15+
16+
#include <seastar/core/abort_source.hh>
17+
#include <seastar/core/lowres_clock.hh>
18+
#include <seastar/core/manual_clock.hh>
19+
#include <seastar/util/log.hh>
20+
21+
#include <chrono>
22+
#include <limits>
23+
24+
using namespace std::chrono_literals;
25+
26+
namespace cloud_topics::l0 {
// Test-only accessor exposing the private internals of
// read_pipeline<ss::manual_clock> so the tests below can inject requests
// into the pending list and drive fetching directly.
struct read_pipeline_accessor {
    // Returns true when exactly `n` read requests are currently queued in
    // the pipeline's `_pending` collection (NOTE: this is a count check,
    // not a membership check for a single request).
    bool read_requests_pending(size_t n) {
        return pipeline->get_pending().size() == n;
    }

    // Manually add a read request to the pending list with a specific
    // stage, then signal the stage so waiters are woken up. The caller
    // retains ownership of `req`; the pending list links it intrusively.
    void add_request_with_stage(
      read_request<ss::manual_clock>& req, pipeline_stage stage) {
        req.stage = stage;
        pipeline->get_pending().push_back(req);
        pipeline->signal(stage);
    }

    // Forward to the pipeline's private get_fetch_requests().
    read_pipeline<ss::manual_clock>::read_requests_list
    get_fetch_requests(size_t max_bytes, pipeline_stage stage) {
        return pipeline->get_fetch_requests(max_bytes, stage);
    }

    // Non-owning pointer to the pipeline under test.
    read_pipeline<ss::manual_clock>* pipeline;
};
} // namespace cloud_topics::l0
50+
51+
// Simulate sleep of certain duration and wait until the condition is met
52+
template<class Fn>
53+
ss::future<>
54+
sleep_until(std::chrono::milliseconds delta, Fn&& fn, int retry_limit = 100) {
55+
ss::manual_clock::advance(delta);
56+
for (int i = 0; i < retry_limit; i++) {
57+
co_await ss::yield();
58+
if (fn()) {
59+
co_return;
60+
}
61+
}
62+
GTEST_MESSAGE_("Test stalled", ::testing::TestPartResult::kFatalFailure);
63+
}
64+
65+
TEST_CORO(read_pipeline_test, interleaving_stages_bug) {
    // Regression test: get_fetch_requests used to return requests that
    // belong to the wrong pipeline stage when requests with different
    // stages were interleaved in the pending list.
    cloud_topics::l0::read_pipeline<ss::manual_clock> pipeline;
    cloud_topics::l0::read_pipeline_accessor accessor{
      .pipeline = &pipeline,
    };

    auto stage1 = pipeline.register_read_pipeline_stage();
    auto stage2 = pipeline.register_read_pipeline_stage();

    const auto timeout = ss::manual_clock::now() + 10s;

    // Build 6 read requests with distinct size estimates; stages are
    // assigned alternately below (stage1, stage2, stage1, ...).
    std::vector<
      std::unique_ptr<cloud_topics::l0::read_request<ss::manual_clock>>>
      requests;

    for (int i = 0; i < 6; i++) {
        cloud_topics::l0::dataplane_query query;
        query.output_size_estimate = 1000 + i * 100; // Different sizes
        requests.push_back(
          std::make_unique<cloud_topics::l0::read_request<ss::manual_clock>>(
            model::controller_ntp,
            std::move(query),
            timeout,
            &pipeline.get_root_rtc()));
    }

    // Interleave the stages: even slots -> stage1, odd slots -> stage2
    for (int i = 0; i < 6; i++) {
        accessor.add_request_with_stage(
          *requests[i], (i % 2 == 0) ? stage1.id() : stage2.id());
    }

    co_await ss::yield();

    ASSERT_EQ_CORO(accessor.read_requests_pending(6), true);

    // Fetching for stage1 must yield exactly the 3 stage1 requests
    auto stage1_batch = accessor.get_fetch_requests(
      std::numeric_limits<size_t>::max(), stage1.id());

    ASSERT_EQ_CORO(stage1_batch.requests.size(), 3);

    for (auto& req : stage1_batch.requests) {
        req.set_value(cloud_topics::l0::dataplane_query_result{});
    }

    // The 3 stage2 requests must still be queued
    ASSERT_EQ_CORO(accessor.read_requests_pending(3), true);

    // Fetching for stage2 must drain the remaining 3 requests
    auto stage2_batch = accessor.get_fetch_requests(
      std::numeric_limits<size_t>::max(), stage2.id());
    ASSERT_EQ_CORO(stage2_batch.requests.size(), 3);

    for (auto& req : stage2_batch.requests) {
        req.set_value(cloud_topics::l0::dataplane_query_result{});
    }

    ASSERT_EQ_CORO(accessor.read_requests_pending(0), true);
}
133+
134+
TEST_CORO(read_pipeline_test, oversized_request) {
    // Verify that get_fetch_requests returns at least one request even
    // when that request alone exceeds max_bytes, so a single oversized
    // request can never stall the pipeline.
    cloud_topics::l0::read_pipeline<ss::manual_clock> pipeline;
    cloud_topics::l0::read_pipeline_accessor accessor{
      .pipeline = &pipeline,
    };

    auto stage = pipeline.register_read_pipeline_stage();
    const auto timeout = ss::manual_clock::now() + 10s;

    std::vector<
      std::unique_ptr<cloud_topics::l0::read_request<ss::manual_clock>>>
      requests;

    // Helper that builds one request with the given size estimate
    auto make_request = [&](size_t size_estimate) {
        cloud_topics::l0::dataplane_query query;
        query.output_size_estimate = size_estimate;
        return std::make_unique<
          cloud_topics::l0::read_request<ss::manual_clock>>(
          model::controller_ntp,
          std::move(query),
          timeout,
          &pipeline.get_root_rtc());
    };

    // First request is oversized (10000 bytes), second is normal (1000)
    requests.push_back(make_request(10000));
    requests.push_back(make_request(1000));

    for (auto& req : requests) {
        accessor.add_request_with_stage(*req, stage.id());
    }

    co_await ss::yield();

    ASSERT_EQ_CORO(accessor.read_requests_pending(2), true);

    // max_bytes = 5000 is smaller than the first request's estimate; the
    // fetch must still hand it out instead of returning nothing.
    auto first_batch = accessor.get_fetch_requests(5000, stage.id());

    ASSERT_EQ_CORO(first_batch.requests.size(), 1);
    first_batch.requests.front().set_value(
      cloud_topics::l0::dataplane_query_result{});

    // The normal-sized request must still be pending
    ASSERT_EQ_CORO(accessor.read_requests_pending(1), true);

    // A follow-up fetch drains the remaining request
    auto second_batch = accessor.get_fetch_requests(
      std::numeric_limits<size_t>::max(), stage.id());
    ASSERT_EQ_CORO(second_batch.requests.size(), 1);
    second_batch.requests.front().set_value(
      cloud_topics::l0::dataplane_query_result{});

    ASSERT_EQ_CORO(accessor.read_requests_pending(0), true);
}
202+
203+
TEST_CORO(read_pipeline_test, multiple_requests_within_limit) {
    // Verify that get_fetch_requests batches as many requests as fit
    // under the max_bytes budget, leaving the remainder pending.
    cloud_topics::l0::read_pipeline<ss::manual_clock> pipeline;
    cloud_topics::l0::read_pipeline_accessor accessor{
      .pipeline = &pipeline,
    };

    auto stage = pipeline.register_read_pipeline_stage();
    const auto timeout = ss::manual_clock::now() + 10s;

    // Build 3 identical requests, 1000 bytes estimated each
    std::vector<
      std::unique_ptr<cloud_topics::l0::read_request<ss::manual_clock>>>
      requests;

    for (int i = 0; i < 3; i++) {
        cloud_topics::l0::dataplane_query query;
        query.output_size_estimate = 1000;
        requests.push_back(
          std::make_unique<cloud_topics::l0::read_request<ss::manual_clock>>(
            model::controller_ntp,
            std::move(query),
            timeout,
            &pipeline.get_root_rtc()));
    }

    // Queue every request on the same stage
    for (auto& req : requests) {
        accessor.add_request_with_stage(*req, stage.id());
    }

    co_await ss::yield();

    ASSERT_EQ_CORO(accessor.read_requests_pending(3), true);

    // With max_bytes = 2500 only the first two requests (2000 bytes
    // total) fit; the third would push the batch over the budget.
    auto first_batch = accessor.get_fetch_requests(2500, stage.id());

    ASSERT_EQ_CORO(first_batch.requests.size(), 2);

    for (auto& req : first_batch.requests) {
        req.set_value(cloud_topics::l0::dataplane_query_result{});
    }

    // Exactly one request should remain queued
    ASSERT_EQ_CORO(accessor.read_requests_pending(1), true);

    // An unbounded fetch drains the last request
    auto second_batch = accessor.get_fetch_requests(
      std::numeric_limits<size_t>::max(), stage.id());
    ASSERT_EQ_CORO(second_batch.requests.size(), 1);
    second_batch.requests.front().set_value(
      cloud_topics::l0::dataplane_query_result{});

    ASSERT_EQ_CORO(accessor.read_requests_pending(0), true);
}

0 commit comments

Comments (0)