Skip to content

Commit 1cd198a

Browse files
committed
fix(taskprocessing): claim tasks atomically with SKIP LOCKED + composite index
Replace the worker retry/ignore-list claim-loop with a single atomic SELECT ... FOR UPDATE SKIP LOCKED claim (SQLite bounded-retry fallback), preserving the no-duplicate guarantee while removing the thundering-herd contention that throttled backlog draining. Add a (status,type,last_updated) index via the table-creating migration + db:add-missing-indices listener. Signed-off-by: Yoan Bozhilov <bygadd@gmail.com> Assisted-by: Claude Code:claude-opus-4-8
1 parent e6346b2 commit 1cd198a

9 files changed

Lines changed: 451 additions & 25 deletions

File tree

core/Command/TaskProcessing/WorkerCommand.php

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
use OCP\AppFramework\Utility\ITimeFactory;
1515
use OCP\IAppConfig;
1616
use OCP\TaskProcessing\Exception\Exception;
17-
use OCP\TaskProcessing\Exception\NotFoundException;
1817
use OCP\TaskProcessing\IManager;
1918
use OCP\TaskProcessing\ISynchronousProvider;
2019
use Psr\Log\LoggerInterface;
@@ -118,9 +117,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
118117
* Attempt to process one task across all preferred synchronous providers.
119118
*
120119
* To avoid starvation, all eligible task types are first collected and then
121-
* the oldest scheduled task across all of them is fetched in a single query.
122-
* This ensures that tasks are processed in the order they were scheduled,
123-
* regardless of which provider handles them.
120+
* the oldest scheduled task across all of them is claimed in a single atomic
121+
* query (FOR UPDATE SKIP LOCKED, with a SQLite fallback). This ensures tasks
122+
* are processed in the order they were scheduled, regardless of which provider
123+
* handles them, and guarantees no two workers ever claim the same task.
124124
*
125125
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
126126
* @return bool True if a task was processed, false if no task was found
@@ -162,15 +162,21 @@ private function processNextTask(OutputInterface $output, array $taskTypes = [])
162162
return false;
163163
}
164164

165-
// Fetch the oldest scheduled task across all eligible task types in one query.
166-
// This naturally prevents starvation: regardless of how many tasks one provider
167-
// has queued, another provider's older tasks will be picked up first.
165+
// Atomically claim the oldest scheduled task across all eligible task types in
166+
// one query. SELECT ... FOR UPDATE SKIP LOCKED (with a SQLite fallback) both
167+
// fetches and marks the task RUNNING, so multiple workers never claim the same
168+
// task and no per-worker ignore-list / retry loop is needed. This also naturally
169+
// prevents starvation: regardless of how many tasks one provider has queued,
170+
// another provider's older tasks are picked up first.
168171
try {
169-
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
170-
} catch (NotFoundException) {
171-
return false;
172+
$task = $this->taskProcessingManager->claimNextScheduledTask(array_keys($eligibleProviders));
172173
} catch (Exception $e) {
173-
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
174+
$this->logger->error('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $e]);
175+
return false;
176+
}
177+
178+
if ($task === null) {
179+
// No schedulable task available right now.
174180
return false;
175181
}
176182

core/Listener/AddMissingIndicesListener.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,5 +223,11 @@ public function handle(Event $event): void {
223223
['user', 'mountpoint'],
224224
['lengths' => [null, 128]]
225225
);
226+
227+
$event->addMissingIndex(
228+
'taskprocessing_tasks',
229+
'taskp_status_type_upd',
230+
['status', 'type', 'last_updated']
231+
);
226232
}
227233
}

core/Migrations/Version30000Date20240429122720.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt
9898
$table->addIndex(['status', 'type'], 'taskp_tasks_status_type');
9999
$table->addIndex(['last_updated'], 'taskp_tasks_updated');
100100
$table->addIndex(['user_id', 'app_id', 'custom_id'], 'taskp_tasks_uid_appid_cid');
101+
$table->addIndex(['status', 'type', 'last_updated'], 'taskp_status_type_upd');
101102

102103
return $schema;
103104
}

lib/private/TaskProcessing/Db/TaskMapper.php

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use OCP\AppFramework\Db\QBMapper;
1616
use OCP\AppFramework\Utility\ITimeFactory;
1717
use OCP\DB\Exception;
18+
use OCP\DB\QueryBuilder\ConflictResolutionMode;
1819
use OCP\DB\QueryBuilder\IQueryBuilder;
1920
use OCP\IDBConnection;
2021

@@ -75,6 +76,160 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno
7576
return $this->findEntity($qb);
7677
}
7778

79+
/**
 * Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
 *
 * This is the structural fix for the worker "claim loop": instead of every worker
 * racing for the single oldest task (a thundering herd that grows a per-worker
 * `id NOT IN (...)` ignore list and slows the SELECT), each worker claims a
 * *distinct* task in one round trip.
 *
 * On MySQL/MariaDB/PostgreSQL the claim is a single transaction:
 *   SELECT ... WHERE status = SCHEDULED [AND type IN (...)]
 *   ORDER BY last_updated ASC LIMIT 1 FOR UPDATE SKIP LOCKED
 * followed by a guarded UPDATE to RUNNING. Concurrent workers skip rows already
 * locked by another transaction, so no two workers ever claim the same task.
 *
 * Two platforms cannot run that statement and use {@see claimWithBoundedRetry} instead:
 *  - SQLite does not support SKIP LOCKED at all (Doctrine throws "Operation
 *    'SKIP LOCKED' is not supported by platform").
 *  - Oracle supports SKIP LOCKED but rejects FOR UPDATE on a row-limited SELECT,
 *    which is exactly what setMaxResults(1) produces, so the query would fail on
 *    every claim attempt.
 * The fallback relies on the guarded UPDATE issued by {@see lockTask}, so a task
 * that another worker already took is not claimed a second time.
 *
 * A task is only ever transitioned SCHEDULED -> RUNNING here; it is never marked
 * FAILED by claiming. If the task cannot be claimed (none scheduled, or it was
 * taken by another worker between SELECT and UPDATE) this returns null.
 *
 * @param list<string> $taskTypes When non-empty, only tasks of these task type IDs are considered.
 * @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
 * @throws Exception
 */
public function claimOldestScheduledTask(array $taskTypes): ?Task {
	// Platforms on which "SELECT ... LIMIT 1 FOR UPDATE SKIP LOCKED" cannot be issued.
	$platformsNeedingFallback = [
		IDBConnection::PLATFORM_SQLITE,
		IDBConnection::PLATFORM_ORACLE,
	];

	if (in_array($this->db->getDatabaseProvider(), $platformsNeedingFallback, true)) {
		// Fall back to the bounded lock-and-retry claim.
		return $this->claimWithBoundedRetry($taskTypes);
	}

	return $this->claimWithSkipLocked($taskTypes);
}
116+
117+
/**
 * Claim one task inside a single transaction using SELECT ... FOR UPDATE SKIP LOCKED.
 *
 * The oldest SCHEDULED row (optionally restricted to $taskTypes) is selected with a
 * row lock, then switched to RUNNING with started_at set, and the transaction is
 * committed. Any throwable raised along the way rolls the transaction back and is
 * re-thrown unchanged.
 *
 * @param list<string> $taskTypes When non-empty, only tasks of these task type IDs are considered.
 * @return Task|null The claimed task, or null when no row was selected or the guarded UPDATE changed no row.
 * @throws Exception
 */
private function claimWithSkipLocked(array $taskTypes): ?Task {
	$scheduled = \OCP\TaskProcessing\Task::STATUS_SCHEDULED;
	$running = \OCP\TaskProcessing\Task::STATUS_RUNNING;

	$this->db->beginTransaction();
	try {
		// Positional parameters are bound in creation order, so the status
		// parameter must be created before any type parameter.
		$select = $this->db->getQueryBuilder();
		$select->select(Task::COLUMNS)
			->from($this->tableName)
			->where($select->expr()->eq('status', $select->createPositionalParameter($scheduled, IQueryBuilder::PARAM_INT)));

		if ($taskTypes !== []) {
			$typeConditions = [];
			foreach ($taskTypes as $type) {
				$typeConditions[] = $select->expr()->eq('type', $select->createPositionalParameter($type));
			}
			$select->andWhere($select->expr()->orX(...$typeConditions));
		}

		$select->orderBy('last_updated', 'ASC')
			->setMaxResults(1)
			->forUpdate(ConflictResolutionMode::SkipLocked);

		$cursor = $select->executeQuery();
		$row = $cursor->fetch();
		$cursor->closeCursor();

		if ($row === false) {
			// Either nothing is scheduled or every candidate row is locked by another worker.
			$this->db->commit();
			return null;
		}

		/** @var Task $claimed */
		$claimed = $this->mapRowToEntity($row);

		// started_at is written here because the caller receives the task already
		// RUNNING, so no later SCHEDULED -> RUNNING transition will record it.
		$claimedAt = $this->timeFactory->now()->getTimestamp();

		// The status guard is defensive: the selected row is locked by this transaction.
		$transition = $this->db->getQueryBuilder();
		$transition->update($this->tableName)
			->set('status', $transition->createPositionalParameter($running, IQueryBuilder::PARAM_INT))
			->set('started_at', $transition->createPositionalParameter($claimedAt, IQueryBuilder::PARAM_INT))
			->where($transition->expr()->eq('id', $transition->createPositionalParameter($claimed->getId(), IQueryBuilder::PARAM_INT)))
			->andWhere($transition->expr()->eq('status', $transition->createPositionalParameter($scheduled, IQueryBuilder::PARAM_INT)));
		$updatedRows = $transition->executeStatement();

		$this->db->commit();

		if ($updatedRows !== 0) {
			// Mirror the persisted state on the in-memory entity.
			$claimed->setStatus($running);
			$claimed->setStartedAt($claimedAt);
			return $claimed;
		}

		// No row was updated (not expected under SKIP LOCKED); the task stays SCHEDULED.
		return null;
	} catch (\Throwable $failure) {
		$this->db->rollBack();
		throw $failure;
	}
}
186+
187+
/**
 * Claim a task without SKIP LOCKED by fetching and locking in a bounded loop.
 *
 * Each attempt looks up the oldest scheduled task (excluding ids already lost to
 * other workers) and tries to lock it via {@see lockTask}. A zero result from
 * lockTask is treated as "someone else got it": the id is remembered and the next
 * attempt moves on. The loop gives up after a fixed number of attempts so that a
 * contended queue cannot keep a worker spinning.
 *
 * @param list<string> $taskTypes When non-empty, only tasks of these task type IDs are considered.
 * @return Task|null The claimed task, or null when nothing is scheduled or every attempt was lost.
 * @throws Exception
 */
private function claimWithBoundedRetry(array $taskTypes): ?Task {
	// Upper bound on claim attempts per call.
	$maxAttempts = 10;
	$lostTaskIds = [];

	for ($remaining = $maxAttempts; $remaining > 0; $remaining--) {
		try {
			$candidate = $this->findOldestScheduledByType($taskTypes, $lostTaskIds);
		} catch (DoesNotExistException) {
			// Nothing (left) to claim.
			return null;
		}

		if ($this->lockTask($candidate) === 0) {
			// Another worker took this task; exclude it from the next lookup.
			$lostTaskIds[] = $candidate->getId();
			continue;
		}

		$candidate->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING);

		// lockTask is used here only to flip the status, so started_at is persisted
		// with a separate UPDATE targeted at the task id. The caller receives the
		// task already RUNNING and will not write it later.
		$claimedAt = $this->timeFactory->now()->getTimestamp();
		$stamp = $this->db->getQueryBuilder();
		$stamp->update($this->tableName)
			->set('started_at', $stamp->createPositionalParameter($claimedAt, IQueryBuilder::PARAM_INT))
			->where($stamp->expr()->eq('id', $stamp->createPositionalParameter($candidate->getId(), IQueryBuilder::PARAM_INT)));
		$stamp->executeStatement();

		$candidate->setStartedAt($claimedAt);
		return $candidate;
	}

	// Every attempt was lost to another worker.
	return null;
}
232+
78233
/**
79234
* @param int $id
80235
* @param string|null $userId

lib/private/TaskProcessing/Manager.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,6 +1341,21 @@ public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToI
13411341
}
13421342
}
13431343

1344+
/**
 * Claim the next scheduled task through the task mapper and convert it to a public Task.
 *
 * Database and JSON failures are wrapped in the TaskProcessing exception type, with
 * the original exception kept as `previous`.
 *
 * @param list<string> $taskTypeIds When non-empty, only tasks of these task type IDs are considered.
 * @return Task|null The claimed task, or null when the mapper claimed nothing.
 */
#[\Override]
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task {
	try {
		$claimedEntity = $this->taskMapper->claimOldestScheduledTask($taskTypeIds);

		// A null entity means nothing was claimed; the nullsafe call passes that through.
		return $claimedEntity?->toPublicTask();
	} catch (\OCP\DB\Exception $dbError) {
		throw new \OCP\TaskProcessing\Exception\Exception('There was a problem claiming the task', previous: $dbError);
	} catch (\JsonException $jsonError) {
		throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after claiming the task', previous: $jsonError);
	}
}
1358+
13441359
/**
13451360
* Takes task input data and replaces fileIds with File objects
13461361
*

lib/public/TaskProcessing/IManager.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,22 @@ public function getNextScheduledTask(array $taskTypeIds = [], array $taskIdsToIg
173173
*/
174174
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array;
175175

176+
/**
 * Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
 *
 * Unlike {@see getNextScheduledTask}, which only fetches a task, this selects and
 * locks the task in one step so that concurrent workers never claim the same task.
 * Databases that support it use SELECT ... FOR UPDATE SKIP LOCKED; SQLite falls
 * back to a bounded lock-and-retry. Claiming only ever moves a task from SCHEDULED
 * to RUNNING; it never marks a task FAILED.
 *
 * Having nothing to claim is not an error: callers must check for a null return
 * value rather than catch a not-found exception.
 *
 * @param list<string> $taskTypeIds When non-empty, only tasks of these task type IDs are considered.
 * @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
 * @throws Exception If the query failed
 * @since 34.0.0
 */
190+
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task;
191+
176192
/**
177193
* @param int $id The id of the task
178194
* @param string|null $userId The user id that scheduled the task

tests/Core/Command/TaskProcessing/WorkerCommandTest.php

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
use OCP\AppFramework\Utility\ITimeFactory;
1414
use OCP\IAppConfig;
1515
use OCP\TaskProcessing\Exception\Exception;
16-
use OCP\TaskProcessing\Exception\NotFoundException;
1716
use OCP\TaskProcessing\IManager;
1817
use OCP\TaskProcessing\ISynchronousProvider;
1918
use OCP\TaskProcessing\Task;
@@ -89,7 +88,7 @@ public function testOnceProcessesOneTask(): void {
8988
->willReturn($provider);
9089

9190
$this->manager->expects($this->once())
92-
->method('getNextScheduledTask')
91+
->method('claimNextScheduledTask')
9392
->with([$taskTypeId])
9493
->willReturn($task);
9594

@@ -120,7 +119,7 @@ public function testSkipsNonSynchronousProviders(): void {
120119
->method('getPreferredProvider');
121120

122121
$this->manager->expects($this->never())
123-
->method('getNextScheduledTask');
122+
->method('claimNextScheduledTask');
124123

125124
$input = new ArrayInput(['--once' => true], $this->command->getDefinition());
126125
$output = new NullOutput();
@@ -144,9 +143,9 @@ public function testSkipsNonPreferredProviders(): void {
144143
->with($taskTypeId)
145144
->willReturn($preferredProvider);
146145

147-
// provider_a is not preferred (provider_b is), so getNextScheduledTask is never called
146+
// provider_a is not preferred (provider_b is), so claimNextScheduledTask is never called
148147
$this->manager->expects($this->never())
149-
->method('getNextScheduledTask');
148+
->method('claimNextScheduledTask');
150149

151150
$input = new ArrayInput(['--once' => true], $this->command->getDefinition());
152151
$output = new NullOutput();
@@ -169,10 +168,11 @@ public function testContinuesWhenNoTaskFound(): void {
169168
->with($taskTypeId)
170169
->willReturn($provider);
171170

171+
// The no-task path is now claimNextScheduledTask returning null (not an exception).
172172
$this->manager->expects($this->once())
173-
->method('getNextScheduledTask')
173+
->method('claimNextScheduledTask')
174174
->with([$taskTypeId])
175-
->willThrowException(new NotFoundException());
175+
->willReturn(null);
176176

177177
$this->manager->expects($this->never())
178178
->method('processTask');
@@ -200,13 +200,13 @@ public function testLogsErrorAndContinuesOnException(): void {
200200

201201
$exception = new Exception('DB error');
202202
$this->manager->expects($this->once())
203-
->method('getNextScheduledTask')
203+
->method('claimNextScheduledTask')
204204
->with([$taskTypeId])
205205
->willThrowException($exception);
206206

207207
$this->logger->expects($this->once())
208208
->method('error')
209-
->with('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $exception]);
209+
->with('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $exception]);
210210

211211
$this->manager->expects($this->never())
212212
->method('processTask');
@@ -256,9 +256,9 @@ public function testProcessesCorrectProviderForReturnedTaskType(): void {
256256
[$taskTypeId2, $provider2],
257257
]);
258258

259-
// All eligible task types are passed in a single query
259+
// All eligible task types are passed in a single atomic claim
260260
$this->manager->expects($this->once())
261-
->method('getNextScheduledTask')
261+
->method('claimNextScheduledTask')
262262
->with($this->equalTo([$taskTypeId1, $taskTypeId2]))
263263
->willReturn($task);
264264

@@ -294,7 +294,7 @@ public function testTaskTypesWhitelistFiltersProviders(): void {
294294
->willReturn($provider2);
295295

296296
$this->manager->expects($this->once())
297-
->method('getNextScheduledTask')
297+
->method('claimNextScheduledTask')
298298
->with([$taskTypeId2])
299299
->willReturn($task);
300300

@@ -323,7 +323,7 @@ public function testTaskTypesWhitelistWithNoMatchingProviders(): void {
323323
->method('getPreferredProvider');
324324

325325
$this->manager->expects($this->never())
326-
->method('getNextScheduledTask');
326+
->method('claimNextScheduledTask');
327327

328328
$input = new ArrayInput(['--once' => true, '--taskTypes' => ['type_b']], $this->command->getDefinition());
329329
$output = new NullOutput();
@@ -348,7 +348,7 @@ public function testEmptyTaskTypesAllowsAllProviders(): void {
348348
->willReturn($provider);
349349

350350
$this->manager->expects($this->once())
351-
->method('getNextScheduledTask')
351+
->method('claimNextScheduledTask')
352352
->with([$taskTypeId])
353353
->willReturn($task);
354354

0 commit comments

Comments
 (0)