|
19 | 19 | use OCP\App\IAppManager; |
20 | 20 | use OCP\AppFramework\Db\DoesNotExistException; |
21 | 21 | use OCP\AppFramework\Db\MultipleObjectsReturnedException; |
| 22 | +use OCP\AppFramework\Utility\ITimeFactory; |
22 | 23 | use OCP\BackgroundJob\IJobList; |
23 | 24 | use OCP\DB\Exception; |
24 | 25 | use OCP\EventDispatcher\IEventDispatcher; |
|
59 | 60 | use OCP\TaskProcessing\IInternalTaskType; |
60 | 61 | use OCP\TaskProcessing\IManager; |
61 | 62 | use OCP\TaskProcessing\IProvider; |
62 | | -use OCP\TaskProcessing\ISynchronousOptionsProvider; |
| 63 | +use OCP\TaskProcessing\ISynchronousOptionsAwareProvider; |
63 | 64 | use OCP\TaskProcessing\ISynchronousProvider; |
64 | 65 | use OCP\TaskProcessing\ISynchronousWatermarkingProvider; |
65 | 66 | use OCP\TaskProcessing\ITaskType; |
@@ -158,6 +159,7 @@ public function __construct( |
158 | 159 | private IUserSession $userSession, |
159 | 160 | ICacheFactory $cacheFactory, |
160 | 161 | private IFactory $l10nFactory, |
| 162 | + private ITimeFactory $timeFactory, |
161 | 163 | ) { |
162 | 164 | $this->appData = $appDataFactory->get('core'); |
163 | 165 | $this->distributedCache = $cacheFactory->createDistributed('task_processing::'); |
@@ -1134,7 +1136,7 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { |
1134 | 1136 | $this->setTaskStatus($task, Task::STATUS_RUNNING); |
1135 | 1137 | if ($provider instanceof ISynchronousWatermarkingProvider) { |
1136 | 1138 | $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); |
1137 | | - } elseif ($provider instanceof ISynchronousOptionsProvider) { |
| 1139 | + } elseif ($provider instanceof ISynchronousOptionsAwareProvider) { |
1138 | 1140 | $options = new SynchronousProviderOptions( |
1139 | 1141 | $task->getIncludeWatermark(), |
1140 | 1142 | $task->getPreferStreaming(), |
@@ -1249,14 +1251,17 @@ public function setTaskIntermediateOutput(int $id, array $output): bool { |
1249 | 1251 | 'message' => 'task_' . $task->getId(), |
1250 | 1252 | 'body' => $output, |
1251 | 1253 | ]); |
1252 | | - // we don't update the DB if something was sent via notify_push |
1253 | | - // so if the push messages are not received for some reason, the polling will still not see any intermediate output |
1254 | | - // but will receive the final output |
1255 | | - return true; |
1256 | 1254 | } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { |
1257 | 1255 | $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); |
1258 | 1256 | } |
1259 | 1257 | } |
| 1258 | + |
| 1259 | + // throttle DB update |
| 1260 | + $now = $this->timeFactory->now()->getTimestamp(); |
| 1261 | + if ($now - $task->getLastUpdated() < 2) { |
| 1262 | + return true; |
| 1263 | + } |
| 1264 | + |
1260 | 1265 | // no output shape validation for now |
1261 | 1266 | $task->setOutput($output); |
1262 | 1267 | $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); |
|
0 commit comments