@@ -105,16 +105,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int
105105 /**
106106 * Attempt to process one task across all preferred synchronous providers.
107107 *
108+ * To avoid starvation, all eligible task types are first collected and then
109+ * the oldest scheduled task across all of them is fetched in a single query.
110+ * This ensures that tasks are processed in the order they were scheduled,
111+ * regardless of which provider handles them.
112+ *
108113 * @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
109114 * @return bool True if a task was processed, false if no task was found
110115 */
111116 private function processNextTask (OutputInterface $ output , array $ taskTypes = []): bool {
112117 $ providers = $ this ->taskProcessingManager ->getProviders ();
113- // Shuffle providers to avoid starvation: if providers are always iterated
114- // in the same order, a provider with a constant stream of tasks would
115- // prevent all subsequent providers from ever being processed.
116- shuffle ($ providers );
117118
119+ // Build a map of eligible taskTypeId => provider for all preferred synchronous providers
120+ /** @var array<string, ISynchronousProvider> $eligibleProviders */
121+ $ eligibleProviders = [];
118122 foreach ($ providers as $ provider ) {
119123 if (!$ provider instanceof ISynchronousProvider) {
120124 continue ;
@@ -139,30 +143,40 @@ private function processNextTask(OutputInterface $output, array $taskTypes = [])
139143 continue ;
140144 }
141145
142- try {
143- $ task = $ this ->taskProcessingManager ->getNextScheduledTask ([$ taskTypeId ]);
144- } catch (NotFoundException ) {
145- continue ;
146- } catch (Exception $ e ) {
147- $ this ->logger ->error ('Unknown error while retrieving scheduled TaskProcessing tasks ' , ['exception ' => $ e ]);
148- continue ;
149- }
146+ $ eligibleProviders [$ taskTypeId ] = $ provider ;
147+ }
150148
151- $ output ->writeln (
152- 'Processing task ' . $ task ->getId () . ' of type ' . $ taskTypeId . ' with provider ' . $ provider ->getId (),
153- OutputInterface::VERBOSITY_VERBOSE
154- );
149+ if (empty ($ eligibleProviders )) {
150+ return false ;
151+ }
155152
156- $ this ->taskProcessingManager ->processTask ($ task , $ provider );
153+ // Fetch the oldest scheduled task across all eligible task types in one query.
154+ // This naturally prevents starvation: regardless of how many tasks one provider
155+ // has queued, another provider's older tasks will be picked up first.
156+ try {
157+ $ task = $ this ->taskProcessingManager ->getNextScheduledTask (array_keys ($ eligibleProviders ));
158+ } catch (NotFoundException ) {
159+ return false ;
160+ } catch (Exception $ e ) {
161+ $ this ->logger ->error ('Unknown error while retrieving scheduled TaskProcessing tasks ' , ['exception ' => $ e ]);
162+ return false ;
163+ }
157164
158- $ output ->writeln (
159- 'Finished processing task ' . $ task ->getId (),
160- OutputInterface::VERBOSITY_VERBOSE
161- );
165+ $ taskTypeId = $ task ->getTaskTypeId ();
166+ $ provider = $ eligibleProviders [$ taskTypeId ];
162167
163- return true ;
164- }
168+ $ output ->writeln (
169+ 'Processing task ' . $ task ->getId () . ' of type ' . $ taskTypeId . ' with provider ' . $ provider ->getId (),
170+ OutputInterface::VERBOSITY_VERBOSE
171+ );
172+
173+ $ this ->taskProcessingManager ->processTask ($ task , $ provider );
174+
175+ $ output ->writeln (
176+ 'Finished processing task ' . $ task ->getId (),
177+ OutputInterface::VERBOSITY_VERBOSE
178+ );
165179
166- return false ;
180+ return true ;
167181 }
168182}
0 commit comments