Skip to content

Commit 0062276

Browse files
committed
ct: Fix get_write_requests method
of the write_pipeline. The method can potentially return requests that belong to the wrong pipeline stage. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
1 parent 2b45607 commit 0062276

1 file changed

Lines changed: 19 additions & 15 deletions

File tree

src/v/cloud_topics/level_zero/pipeline/write_pipeline.cc

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,35 +170,39 @@ write_pipeline<Clock>::get_write_requests(
170170
size_t acc_size = 0;
171171
size_t acc_req = 0;
172172

173-
// The elements in the list are in the insertion order.
174173
auto it = pending.begin();
175-
for (; it != pending.end(); it++) {
174+
for (; it != pending.end();) {
176175
if (it->stage != stage) {
176+
it++;
177177
continue;
178178
}
179179
auto sz = it->data_chunk.payload.size_bytes();
180180
acc_size += sz;
181181
acc_req++;
182182
if (acc_size >= max_bytes || acc_req >= max_requests) {
183183
// Include last element
184+
auto& el = *it;
184185
it++;
186+
el._hook.unlink();
187+
if (el.stage != unassigned_pipeline_stage) {
188+
auto idx = static_cast<size_t>(el.stage()->get_numeric_id());
189+
_stage_bytes[idx] -= el.size_bytes();
190+
el.stage = unassigned_pipeline_stage;
191+
}
192+
result.requests.push_back(el);
185193
break;
186194
}
187-
}
188-
// There are three steps to sever the connection between the requests
189-
// and their original pipeline:
190-
// 1. Splice them out of the pending list.
191-
// 2. Decrement their bytes from their stage.
192-
// 3. Set the stage to unassigned.
193-
result.requests.splice(result.requests.end(), pending, pending.begin(), it);
194-
result.complete = pending.empty();
195-
for (auto& req : result.requests) {
196-
if (req.stage != unassigned_pipeline_stage) {
197-
auto idx = static_cast<size_t>(req.stage()->get_numeric_id());
198-
_stage_bytes[idx] -= req.size_bytes();
199-
req.stage = unassigned_pipeline_stage;
195+
auto& el = *it;
196+
it++;
197+
el._hook.unlink();
198+
if (el.stage != unassigned_pipeline_stage) {
199+
auto idx = static_cast<size_t>(el.stage()->get_numeric_id());
200+
_stage_bytes[idx] -= el.size_bytes();
201+
el.stage = unassigned_pipeline_stage;
200202
}
203+
result.requests.push_back(el);
201204
}
205+
result.complete = pending.empty();
202206
vlog(
203207
cd_log.trace,
204208
"get_write_requests returned {} elements, containing {} ({}B)",

0 commit comments

Comments
 (0)