Skip to content

Commit fdf9ca1

Browse files
authored
Merge branch 'main' into f/hehehe
2 parents 3879231 + 02c438d commit fdf9ca1

2 files changed

Lines changed: 70 additions & 2 deletions

File tree

src/tasks/sub_task_queue.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ namespace ccf::tasks
1010
{
1111
// Helper type for OrderedTasks, containing a list of sub-tasks to be
1212
// performed in-order. Modifiers return bools indicating whether the caller
13-
// is responsible for scheduling a future flush of this queue.
13+
// should schedule processing of this queue *now* (eg, enqueue the owning
14+
// runner), based on the current state.
1415
template <typename T>
1516
class SubTaskQueue
1617
{
@@ -21,10 +22,19 @@ namespace ccf::tasks
2122
std::atomic<bool> paused;
2223

2324
public:
25+
// Enqueue a new sub-task.
26+
//
27+
// Returns true iff this call made the queue non-empty, the queue was not
28+
// already active, and the queue is not paused. Callers should interpret
29+
// a true return as "schedule processing of this queue now" (eg, enqueue
30+
// the parent runner). While paused, this function intentionally never
31+
// requests scheduling (it will always return false), even if the queue
32+
// transitions from empty to non-empty. This avoids double-enqueue when
33+
// the queue is resumed elsewhere.
2434
bool push(T&& t)
2535
{
2636
std::lock_guard<std::mutex> lock(pending_mutex);
27-
const bool ret = pending.empty() && !active.load();
37+
const bool ret = pending.empty() && !active.load() && !paused.load();
2838
pending.emplace_back(std::forward<T>(t));
2939
return ret;
3040
}

src/tasks/test/ordered_tasks.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,61 @@ TEST_CASE("OrderedTasks" * doctest::test_suite("ordered_tasks"))
199199
}
200200
}
201201
}
202+
203+
TEST_CASE(
204+
"Concurrent pause + add_action does not double-enqueue" *
205+
doctest::test_suite("ordered_tasks"))
206+
{
207+
ccf::tasks::JobBoard job_board;
208+
auto tasks = ccf::tasks::OrderedTasks::create(job_board);
209+
210+
std::vector<size_t> execution_order;
211+
212+
// Step 1: Add an action that pauses itself mid-execution
213+
ccf::tasks::Resumable resumable;
214+
215+
tasks->add_action(ccf::tasks::make_basic_action([&]() {
216+
execution_order.push_back(1);
217+
218+
// Pause the task (simulating respond_on_commit)
219+
resumable = ccf::tasks::pause_current_task();
220+
}));
221+
222+
// Step 2: Execute the first action - it will pause
223+
{
224+
auto task = job_board.get_task();
225+
REQUIRE(task != nullptr);
226+
task->do_task();
227+
}
228+
229+
// Confirm results of action - board should be empty, and we hold a resumable
230+
// token to restore the paused queue
231+
REQUIRE(job_board.get_task() == nullptr);
232+
REQUIRE(resumable != nullptr);
233+
234+
// Step 3: Simulate concurrent operations — add_action + resume_task
235+
tasks->add_action(
236+
ccf::tasks::make_basic_action([&]() { execution_order.push_back(2); }));
237+
238+
ccf::tasks::resume_task(std::move(resumable));
239+
240+
// Step 4: Count how many times the task was enqueued
241+
size_t enqueue_count = 0;
242+
while (true)
243+
{
244+
auto task = job_board.get_task();
245+
if (task == nullptr)
246+
{
247+
break;
248+
}
249+
enqueue_count++;
250+
task->do_task();
251+
}
252+
253+
// Confirm that despite 2 potentially-queuing concurrent operations, only one
254+
// actual enqueue occurred
255+
REQUIRE(enqueue_count == 1);
256+
257+
// Verify the second action (added in step 3) actually executed
258+
REQUIRE(execution_order == std::vector<size_t>{1, 2});
259+
}

0 commit comments

Comments
 (0)