Skip to content

Commit ee7f9c2

Browse files
committed
fix(taskprocessing): claim tasks atomically in worker via lockTask()
Signed-off-by: bygadd <bygadd@gmail.com>
1 parent dbe2329 commit ee7f9c2

1 file changed

Lines changed: 24 additions & 10 deletions

File tree

core/Command/TaskProcessing/WorkerCommand.php

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,17 +165,31 @@ private function processNextTask(OutputInterface $output, array $taskTypes = [])
165165
// Fetch the oldest scheduled task across all eligible task types in one query.
166166
// This naturally prevents starvation: regardless of how many tasks one provider
167167
// has queued, another provider's older tasks will be picked up first.
168-
try {
169-
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
170-
} catch (NotFoundException) {
171-
return false;
172-
} catch (Exception $e) {
173-
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
174-
return false;
175-
}
168+
$taskIdsToIgnore = [];
169+
while (true) {
170+
try {
171+
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders), $taskIdsToIgnore);
172+
} catch (NotFoundException) {
173+
return false;
174+
} catch (Exception $e) {
175+
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
176+
return false;
177+
}
176178

177-
$taskTypeId = $task->getTaskTypeId();
178-
$provider = $eligibleProviders[$taskTypeId];
179+
$taskTypeId = $task->getTaskTypeId();
180+
if (!isset($eligibleProviders[$taskTypeId])) {
181+
$taskIdsToIgnore[] = (int)$task->getId();
182+
continue;
183+
}
184+
$provider = $eligibleProviders[$taskTypeId];
185+
186+
// Atomically claim the task; if another worker grabbed it between
187+
// the SELECT above and now, lockTask() returns false -> skip it.
188+
if ($this->taskProcessingManager->lockTask($task)) {
189+
break;
190+
}
191+
$taskIdsToIgnore[] = (int)$task->getId();
192+
}
179193

180194
$output->writeln(
181195
'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(),

0 commit comments

Comments
 (0)