@@ -43,6 +43,14 @@ struct write_pipeline_accessor {
4343 return pipeline->get_pending ().size () == n;
4444 }
4545
46+ // Manually add a write request to the pending list with a specific stage
47+ void add_request_with_stage (
48+ write_request<ss::manual_clock>& req, pipeline_stage stage) {
49+ req.stage = stage;
50+ pipeline->get_pending ().push_back (req);
51+ pipeline->signal (stage);
52+ }
53+
4654 write_pipeline<ss::manual_clock>* pipeline;
4755};
4856} // namespace cloud_topics::l0
@@ -202,3 +210,86 @@ TEST_CORO(write_pipeline_test, stage_bytes_accounting_on_timeout) {
202210 ASSERT_FALSE_CORO (write_res.has_value ());
203211 ASSERT_EQ_CORO (pipeline.stage_bytes (stage.id ()), 0 );
204212}
213+
214+ TEST_CORO (write_pipeline_test, interleaving_stages_bug) {
215+ // This test demonstrates a bug where get_write_requests can return
216+ // requests that belong to the wrong pipeline stage when requests
217+ // with different stages are interleaved.
218+ cloud_topics::l0::write_pipeline<ss::manual_clock> pipeline;
219+ cloud_topics::l0::write_pipeline_accessor accessor{
220+ .pipeline = &pipeline,
221+ };
222+
223+ auto stage1 = pipeline.register_write_pipeline_stage ();
224+ auto stage2 = pipeline.register_write_pipeline_stage ();
225+
226+ const auto timeout = ss::manual_clock::now () + 10s;
227+
228+ auto test_data = co_await model::test::make_random_batches (
229+ {.count = 1 , .records = 5 });
230+
231+ // Helper to create a serialized chunk from test data
232+ auto make_chunk = [&]() -> ss::future<cloud_topics::l0::serialized_chunk> {
233+ chunked_vector<model::record_batch> batches;
234+ auto data = co_await model::test::make_random_batches (
235+ {.count = 1 , .records = 5 });
236+ std::ranges::move (std::move (data), std::back_inserter (batches));
237+ co_return co_await cloud_topics::l0::serialize_batches (
238+ std::move (batches));
239+ };
240+
241+ // Create 6 write requests with interleaved stages:
242+ // stage1, stage2, stage1, stage2, stage1, stage2
243+ std::vector<
244+ std::unique_ptr<cloud_topics::l0::write_request<ss::manual_clock>>>
245+ requests;
246+
247+ for (int i = 0 ; i < 6 ; i++) {
248+ auto chunk = co_await make_chunk ();
249+ auto req
250+ = std::make_unique<cloud_topics::l0::write_request<ss::manual_clock>>(
251+ model::controller_ntp, min_epoch, std::move (chunk), timeout);
252+ requests.push_back (std::move (req));
253+ }
254+
255+ // Add requests with interleaved stages
256+ accessor.add_request_with_stage (*requests[0 ], stage1.id ());
257+ accessor.add_request_with_stage (*requests[1 ], stage2.id ());
258+ accessor.add_request_with_stage (*requests[2 ], stage1.id ());
259+ accessor.add_request_with_stage (*requests[3 ], stage2.id ());
260+ accessor.add_request_with_stage (*requests[4 ], stage1.id ());
261+ accessor.add_request_with_stage (*requests[5 ], stage2.id ());
262+
263+ co_await ss::yield ();
264+
265+ ASSERT_EQ_CORO (accessor.write_requests_pending (6 ), true );
266+
267+ // Try to get requests for stage1 only - should get exactly 3 requests
268+ auto result = stage1.pull_write_requests (
269+ std::numeric_limits<size_t >::max ());
270+
271+ ASSERT_EQ_CORO (result.requests .size (), 3 );
272+
273+ for (auto & req : result.requests ) {
274+ // Stage should be unassigned after extraction
275+ ASSERT_TRUE_CORO (
276+ req.stage == cloud_topics::l0::unassigned_pipeline_stage);
277+ req.set_value (chunked_vector<cloud_topics::extent_meta>{});
278+ }
279+
280+ // The remaining 3 requests should still be in the pending queue
281+ ASSERT_TRUE_CORO (accessor.write_requests_pending (3 ));
282+
283+ // Get stage2 requests - should get exactly 3 requests
284+ auto result2 = stage2.pull_write_requests (
285+ std::numeric_limits<size_t >::max ());
286+ ASSERT_EQ_CORO (result2.requests .size (), 3 );
287+
288+ for (auto & req : result2.requests ) {
289+ ASSERT_TRUE_CORO (
290+ req.stage == cloud_topics::l0::unassigned_pipeline_stage);
291+ req.set_value (chunked_vector<cloud_topics::extent_meta>{});
292+ }
293+
294+ ASSERT_EQ_CORO (accessor.write_requests_pending (0 ), true );
295+ }
0 commit comments