Skip to content

Commit 495e2b4

Browse files
Merge pull request #61367 from nextcloud/backport/61053/stable34
[stable34] fix(taskprocessing): claim tasks atomically so parallel workers don't duplicate
2 parents 7106aaf + 86b4dc6 commit 495e2b4

9 files changed

Lines changed: 492 additions & 26 deletions

File tree

core/Command/TaskProcessing/WorkerCommand.php

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
use OCP\AppFramework\Utility\ITimeFactory;
1414
use OCP\IAppConfig;
1515
use OCP\TaskProcessing\Exception\Exception;
16-
use OCP\TaskProcessing\Exception\NotFoundException;
1716
use OCP\TaskProcessing\IManager;
1817
use OCP\TaskProcessing\ISynchronousProvider;
1918
use Psr\Log\LoggerInterface;
@@ -117,9 +116,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
117116
* Attempt to process one task across all preferred synchronous providers.
118117
*
119118
* To avoid starvation, all eligible task types are first collected and then
120-
* the oldest scheduled task across all of them is fetched in a single query.
121-
* This ensures that tasks are processed in the order they were scheduled,
122-
* regardless of which provider handles them.
119+
* the oldest scheduled task across all of them is claimed in a single atomic
120+
* query (FOR UPDATE SKIP LOCKED, with a SQLite fallback). Each claim prefers the
121+
* oldest available scheduled task -- under parallel workers SKIP LOCKED skips rows
122+
* another worker has locked, so this reduces starvation rather than guaranteeing a
123+
* strict global processing order -- and no two workers ever claim the same task.
123124
*
124125
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
125126
* @return bool True if a task was processed, false if no task was found
@@ -161,15 +162,22 @@ private function processNextTask(OutputInterface $output, array $taskTypes = [])
161162
return false;
162163
}
163164

164-
// Fetch the oldest scheduled task across all eligible task types in one query.
165-
// This naturally prevents starvation: regardless of how many tasks one provider
166-
// has queued, another provider's older tasks will be picked up first.
165+
// Atomically claim the oldest scheduled task across all eligible task types in
166+
// one query. SELECT ... FOR UPDATE SKIP LOCKED (with a SQLite fallback) both
167+
// fetches and marks the task RUNNING, so multiple workers never claim the same
168+
// task and no per-worker ignore-list / retry loop is needed. This also reduces
169+
// starvation: each claim prefers the oldest available task, so a provider with a
170+
// large queue does not indefinitely block another provider's older tasks (though a
171+
// worker may claim a newer task while an older one is locked by another worker).
167172
try {
168-
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
169-
} catch (NotFoundException) {
170-
return false;
173+
$task = $this->taskProcessingManager->claimNextScheduledTask(array_keys($eligibleProviders));
171174
} catch (Exception $e) {
172-
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
175+
$this->logger->error('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $e]);
176+
return false;
177+
}
178+
179+
if ($task === null) {
180+
// No schedulable task available right now.
173181
return false;
174182
}
175183

core/Listener/AddMissingIndicesListener.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,5 +224,11 @@ public function handle(Event $event): void {
224224
['user', 'mountpoint'],
225225
['lengths' => [null, 128]]
226226
);
227+
228+
$event->addMissingIndex(
229+
'taskprocessing_tasks',
230+
'taskp_status_type_upd',
231+
['status', 'type', 'last_updated']
232+
);
227233
}
228234
}

core/Migrations/Version30000Date20240429122720.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt
9797
$table->addIndex(['status', 'type'], 'taskp_tasks_status_type');
9898
$table->addIndex(['last_updated'], 'taskp_tasks_updated');
9999
$table->addIndex(['user_id', 'app_id', 'custom_id'], 'taskp_tasks_uid_appid_cid');
100+
$table->addIndex(['status', 'type', 'last_updated'], 'taskp_status_type_upd');
100101

101102
return $schema;
102103
}

lib/private/TaskProcessing/Db/TaskMapper.php

Lines changed: 169 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use OCP\AppFramework\Db\QBMapper;
1616
use OCP\AppFramework\Utility\ITimeFactory;
1717
use OCP\DB\Exception;
18+
use OCP\DB\QueryBuilder\ConflictResolutionMode;
1819
use OCP\DB\QueryBuilder\IQueryBuilder;
1920
use OCP\IDBConnection;
2021

@@ -75,6 +76,155 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno
7576
return $this->findEntity($qb);
7677
}
7778

79+
/**
80+
* Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
81+
*
82+
* This is the structural fix for the worker "claim loop": instead of every worker
83+
* racing for the single oldest task (a thundering herd that grows a per-worker
84+
* `id NOT IN (...)` ignore list and slows the SELECT), each worker claims a
85+
* *distinct* task in a single claim attempt without a per-worker ignore-list.
86+
* On databases that support row-level locking with SKIP LOCKED
87+
* (MySQL/MariaDB/PostgreSQL) the claim is a single transaction:
88+
* SELECT ... WHERE status = SCHEDULED [AND type IN (...)]
89+
* ORDER BY last_updated ASC LIMIT 1 FOR UPDATE SKIP LOCKED
90+
* followed by a guarded UPDATE to RUNNING. Concurrent workers skip rows already
91+
* locked by another transaction, so no two workers ever claim the same task.
92+
*
93+
* Two databases cannot use the SKIP LOCKED path and fall back to a bounded
94+
* lock-and-retry claim instead:
95+
* - SQLite has no SKIP LOCKED (Doctrine throws "Operation 'SKIP LOCKED' is not
96+
* supported by platform").
97+
* - Oracle cannot combine a row-limiting clause with FOR UPDATE: the LIMIT is
98+
* emulated with a ROWNUM sub-select, and selecting FOR UPDATE from that derived
99+
* view raises ORA-02014.
100+
* The fallback is still safe because the UPDATE ... WHERE status = SCHEDULED is itself
101+
* atomic (SQLite additionally serialises writers).
102+
*
103+
* A task is only ever transitioned SCHEDULED -> RUNNING here; it is never marked
104+
* FAILED by claiming. If the task cannot be claimed (none scheduled, or it was
105+
* taken by another worker between SELECT and UPDATE) this returns null.
106+
*
107+
* @param list<string> $taskTypes When non-empty, only tasks of these task type IDs are considered.
108+
* @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
109+
* @throws Exception
110+
*/
111+
public function claimOldestScheduledTask(array $taskTypes): ?Task {
112+
$provider = $this->db->getDatabaseProvider();
113+
// SKIP LOCKED is unusable on SQLite (unsupported) and Oracle (LIMIT + FOR UPDATE =>
114+
// ORA-02014): both fall back to the bounded lock-and-retry claim.
115+
if ($provider === IDBConnection::PLATFORM_SQLITE || $provider === IDBConnection::PLATFORM_ORACLE) {
116+
return $this->claimWithBoundedRetry($taskTypes);
117+
}
118+
119+
return $this->claimWithSkipLocked($taskTypes);
120+
}
121+
122+
/**
123+
* Atomic claim using FOR UPDATE SKIP LOCKED in a single transaction.
124+
*
125+
* @param list<string> $taskTypes
126+
* @return Task|null
127+
* @throws Exception
128+
*/
129+
private function claimWithSkipLocked(array $taskTypes): ?Task {
130+
$this->db->beginTransaction();
131+
try {
132+
$qb = $this->db->getQueryBuilder();
133+
$qb->select(Task::$columns)
134+
->from($this->tableName)
135+
->where($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)))
136+
->orderBy('last_updated', 'ASC')
137+
->setMaxResults(1)
138+
->forUpdate(ConflictResolutionMode::SkipLocked);
139+
140+
if (!empty($taskTypes)) {
141+
$filter = [];
142+
foreach ($taskTypes as $taskType) {
143+
$filter[] = $qb->expr()->eq('type', $qb->createPositionalParameter($taskType));
144+
}
145+
$qb->andWhere($qb->expr()->orX(...$filter));
146+
}
147+
148+
$result = $qb->executeQuery();
149+
$row = $result->fetch();
150+
$result->closeCursor();
151+
152+
if ($row === false) {
153+
// Nothing schedulable (or every candidate is locked by another worker).
154+
$this->db->commit();
155+
return null;
156+
}
157+
158+
/** @var Task $task */
159+
$task = $this->mapRowToEntity($row);
160+
161+
// Record the start time at claim time: because the worker receives the task
162+
// already in status RUNNING, the later SCHEDULED -> RUNNING transition in
163+
// Manager::setTaskStatus is skipped and would otherwise never persist started_at.
164+
$startedAt = $this->timeFactory->now()->getTimestamp();
165+
166+
// Guarded transition SCHEDULED -> RUNNING. The row is locked for this
167+
// transaction, so the guard is belt-and-braces rather than strictly required.
168+
$update = $this->db->getQueryBuilder();
169+
$update->update($this->tableName)
170+
->set('status', $update->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT))
171+
->set('started_at', $update->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT))
172+
->where($update->expr()->eq('id', $update->createPositionalParameter($task->getId(), IQueryBuilder::PARAM_INT)))
173+
->andWhere($update->expr()->eq('status', $update->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)));
174+
$affected = $update->executeStatement();
175+
176+
$this->db->commit();
177+
178+
if ($affected === 0) {
179+
// Lost the race (should not happen under SKIP LOCKED); leave the task SCHEDULED.
180+
return null;
181+
}
182+
183+
$task->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING);
184+
$task->setStartedAt($startedAt);
185+
return $task;
186+
} catch (\Throwable $e) {
187+
$this->db->rollBack();
188+
throw $e;
189+
}
190+
}
191+
192+
/**
193+
* Fallback claim for databases that cannot use the SKIP LOCKED path (SQLite, Oracle).
194+
*
195+
* Repeatedly fetches the oldest scheduled task and attempts the atomic
196+
* UPDATE ... WHERE status = SCHEDULED. Tasks lost to another worker are added to a
197+
* short ignore list so the next iteration moves on. Bounded to avoid unbounded
198+
* looping under contention.
199+
*
200+
* @param list<string> $taskTypes
201+
* @return Task|null
202+
* @throws Exception
203+
*/
204+
private function claimWithBoundedRetry(array $taskTypes): ?Task {
205+
$taskIdsToIgnore = [];
206+
// A handful of attempts is plenty: on SQLite writers are serialised, so at most
207+
// a few rows can be claimed out from under us before we either win or run dry.
208+
for ($attempt = 0; $attempt < 10; $attempt++) {
209+
try {
210+
$task = $this->findOldestScheduledByType($taskTypes, $taskIdsToIgnore);
211+
} catch (DoesNotExistException) {
212+
return null;
213+
}
214+
215+
if ($this->lockTask($task) !== 0) {
216+
// lockTask atomically flipped SCHEDULED -> RUNNING and stamped started_at.
217+
// Re-read so the returned task reflects the persisted status and started_at.
218+
return $this->find($task->getId());
219+
}
220+
221+
// Another worker took it; skip this id and try the next oldest.
222+
$taskIdsToIgnore[] = $task->getId();
223+
}
224+
225+
return null;
226+
}
227+
78228
/**
79229
* @param int $id
80230
* @param string|null $userId
@@ -222,12 +372,30 @@ public function update(Entity $entity): Entity {
222372
return parent::update($entity);
223373
}
224374

375+
/**
376+
* Atomically claim a task by transitioning it SCHEDULED -> RUNNING.
377+
*
378+
* The UPDATE is guarded on `status = SCHEDULED` so a task another worker has already
379+
* finished (SUCCESSFUL/FAILED) between a caller's SELECT and this UPDATE can never be
380+
* re-claimed and processed twice. started_at is stamped in the same statement: the
381+
* worker receives the task already RUNNING, so the later SCHEDULED -> RUNNING edge in
382+
* Manager::setTaskStatus (which used to set started_at) no longer fires.
383+
*
384+
* Semantic change: this previously guarded on `status != RUNNING`, which allowed an
385+
* already SUCCESSFUL/FAILED task to be re-locked back to RUNNING. Callers must now
386+
* treat a 0 return as "the task is no longer claimable" (it is no longer SCHEDULED)
387+
* and move on, rather than assuming the lock succeeded.
388+
*
389+
* @return int Number of rows updated: 1 if the task was claimed, 0 if it was no longer scheduled.
390+
*/
225391
public function lockTask(Entity $entity): int {
392+
$startedAt = $this->timeFactory->now()->getTimestamp();
226393
$qb = $this->db->getQueryBuilder();
227394
$qb->update($this->tableName)
228395
->set('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT))
396+
->set('started_at', $qb->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT))
229397
->where($qb->expr()->eq('id', $qb->createPositionalParameter($entity->getId(), IQueryBuilder::PARAM_INT)))
230-
->andWhere($qb->expr()->neq('status', $qb->createPositionalParameter(2, IQueryBuilder::PARAM_INT)));
398+
->andWhere($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)));
231399
try {
232400
return $qb->executeStatement();
233401
} catch (Exception) {

lib/private/TaskProcessing/Manager.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,21 @@ public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToI
13381338
}
13391339
}
13401340

1341+
#[\Override]
1342+
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task {
1343+
try {
1344+
$taskEntity = $this->taskMapper->claimOldestScheduledTask($taskTypeIds);
1345+
if ($taskEntity === null) {
1346+
return null;
1347+
}
1348+
return $taskEntity->toPublicTask();
1349+
} catch (\OCP\DB\Exception $e) {
1350+
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem claiming the task', previous: $e);
1351+
} catch (\JsonException $e) {
1352+
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after claiming the task', previous: $e);
1353+
}
1354+
}
1355+
13411356
/**
13421357
* Takes task input data and replaces fileIds with File objects
13431358
*

lib/public/TaskProcessing/IManager.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,22 @@ public function getNextScheduledTask(array $taskTypeIds = [], array $taskIdsToIg
174174
*/
175175
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array;
176176

177+
/**
178+
* Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
179+
*
180+
* Unlike {@see getNextScheduledTask} (which only fetches) this both selects and
181+
* locks the task in one step, so concurrent workers never claim the same task.
182+
* On databases supporting it this uses SELECT ... FOR UPDATE SKIP LOCKED; on
183+
* SQLite it falls back to a bounded lock-and-retry. The task is only ever
184+
* transitioned SCHEDULED -> RUNNING; it is never marked FAILED by claiming.
185+
*
186+
* @param list<string> $taskTypeIds When non-empty, only tasks of these task type IDs are considered.
187+
* @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
188+
* @throws Exception If the query failed
189+
* @since 35.0.0
190+
*/
191+
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task;
192+
177193
/**
178194
* @param int $id The id of the task
179195
* @param string|null $userId The user id that scheduled the task

0 commit comments

Comments
 (0)