Skip to content

Commit 2b45607

Browse files
committed
ct: Add read_pipeline_test
and add reproducer for the splice bug Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
1 parent b918d91 commit 2b45607

2 files changed

Lines changed: 147 additions & 0 deletions

File tree

src/v/cloud_topics/level_zero/pipeline/tests/BUILD

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
load("//bazel:test.bzl", "redpanda_cc_bench", "redpanda_cc_gtest")
22

3+
redpanda_cc_gtest(
4+
name = "read_pipeline_test",
5+
timeout = "short",
6+
srcs = [
7+
"read_pipeline_test.cc",
8+
],
9+
deps = [
10+
"//src/v/cloud_topics/level_zero/pipeline:read_pipeline",
11+
"//src/v/model",
12+
"//src/v/test_utils:gtest",
13+
"@googletest//:gtest",
14+
"@seastar",
15+
],
16+
)
17+
318
redpanda_cc_gtest(
419
name = "write_pipeline_test",
520
timeout = "short",
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
#include "cloud_topics/level_zero/pipeline/read_pipeline.h"
12+
#include "model/fundamental.h"
13+
#include "model/namespace.h"
14+
#include "test_utils/test.h"
15+
16+
#include <seastar/core/abort_source.hh>
17+
#include <seastar/core/lowres_clock.hh>
18+
#include <seastar/core/manual_clock.hh>
19+
#include <seastar/util/log.hh>
20+
21+
#include <chrono>
22+
#include <limits>
23+
24+
using namespace std::chrono_literals;
25+
26+
namespace cloud_topics::l0 {
27+
struct read_pipeline_accessor {
28+
// Returns true if the read request is in the `_pending` collection
29+
bool read_requests_pending(size_t n) {
30+
return pipeline->get_pending().size() == n;
31+
}
32+
33+
// Manually add a read request to the pending list with a specific stage
34+
void add_request_with_stage(
35+
read_request<ss::manual_clock>& req, pipeline_stage stage) {
36+
req.stage = stage;
37+
pipeline->get_pending().push_back(req);
38+
pipeline->signal(stage);
39+
}
40+
41+
// Call get_fetch_requests (which is private)
42+
read_pipeline<ss::manual_clock>::read_requests_list
43+
get_fetch_requests(size_t max_bytes, pipeline_stage stage) {
44+
return pipeline->get_fetch_requests(max_bytes, stage);
45+
}
46+
47+
read_pipeline<ss::manual_clock>* pipeline;
48+
};
49+
} // namespace cloud_topics::l0
50+
51+
// Simulate sleep of certain duration and wait until the condition is met
52+
template<class Fn>
53+
ss::future<>
54+
sleep_until(std::chrono::milliseconds delta, Fn&& fn, int retry_limit = 100) {
55+
ss::manual_clock::advance(delta);
56+
for (int i = 0; i < retry_limit; i++) {
57+
co_await ss::yield();
58+
if (fn()) {
59+
co_return;
60+
}
61+
}
62+
GTEST_MESSAGE_("Test stalled", ::testing::TestPartResult::kFatalFailure);
63+
}
64+
65+
TEST_CORO(read_pipeline_test, interleaving_stages_bug) {
66+
// This test demonstrates a bug where get_fetch_requests can return
67+
// requests that belong to the wrong pipeline stage when requests
68+
// with different stages are interleaved.
69+
cloud_topics::l0::read_pipeline<ss::manual_clock> pipeline;
70+
cloud_topics::l0::read_pipeline_accessor accessor{
71+
.pipeline = &pipeline,
72+
};
73+
74+
auto stage1 = pipeline.register_read_pipeline_stage();
75+
auto stage2 = pipeline.register_read_pipeline_stage();
76+
77+
const auto timeout = ss::manual_clock::now() + 10s;
78+
79+
// Create 6 read requests with interleaved stages:
80+
// stage1, stage2, stage1, stage2, stage1, stage2
81+
std::vector<
82+
std::unique_ptr<cloud_topics::l0::read_request<ss::manual_clock>>>
83+
requests;
84+
85+
for (int i = 0; i < 6; i++) {
86+
cloud_topics::l0::dataplane_query query;
87+
query.output_size_estimate = 1000 + i * 100; // Different sizes
88+
auto req
89+
= std::make_unique<cloud_topics::l0::read_request<ss::manual_clock>>(
90+
model::controller_ntp,
91+
std::move(query),
92+
timeout,
93+
&pipeline.get_root_rtc());
94+
requests.push_back(std::move(req));
95+
}
96+
97+
// Add requests with interleaved stages
98+
accessor.add_request_with_stage(*requests[0], stage1.id());
99+
accessor.add_request_with_stage(*requests[1], stage2.id());
100+
accessor.add_request_with_stage(*requests[2], stage1.id());
101+
accessor.add_request_with_stage(*requests[3], stage2.id());
102+
accessor.add_request_with_stage(*requests[4], stage1.id());
103+
accessor.add_request_with_stage(*requests[5], stage2.id());
104+
105+
co_await ss::yield();
106+
107+
ASSERT_EQ_CORO(accessor.read_requests_pending(6), true);
108+
109+
// Try to get requests for stage1 only - should get exactly 3 requests
110+
auto result = accessor.get_fetch_requests(
111+
std::numeric_limits<size_t>::max(), stage1.id());
112+
113+
ASSERT_EQ_CORO(result.requests.size(), 3);
114+
115+
for (auto& req : result.requests) {
116+
req.set_value(cloud_topics::l0::dataplane_query_result{});
117+
}
118+
119+
// The remaining 3 requests should still be in the pending queue
120+
ASSERT_EQ_CORO(accessor.read_requests_pending(3), true);
121+
122+
// Get stage2 requests - should get exactly 3 requests
123+
auto result2 = accessor.get_fetch_requests(
124+
std::numeric_limits<size_t>::max(), stage2.id());
125+
ASSERT_EQ_CORO(result2.requests.size(), 3);
126+
127+
for (auto& req : result2.requests) {
128+
req.set_value(cloud_topics::l0::dataplane_query_result{});
129+
}
130+
131+
ASSERT_EQ_CORO(accessor.read_requests_pending(0), true);
132+
}

0 commit comments

Comments
 (0)