Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cluster-faces-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
# do not stop on another job's failure
fail-fast: false
matrix:
php-versions: ['8.2']
php-versions: ['8.3']
databases: ['sqlite']
server-versions: ['master']
pure-js-mode: ['false']
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/files-scan-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
# do not stop on another job's failure
fail-fast: false
matrix:
php-versions: ['8.2']
php-versions: ['8.3']
databases: ['sqlite', 'mysql', 'pgsql']
server-versions: ['master']

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/full-run-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
# do not stop on another job's failure
fail-fast: false
matrix:
php-versions: ['8.2']
php-versions: ['8.3']
databases: ['sqlite']
server-versions: ['master']
pure-js-mode: ['false']
Expand All @@ -41,7 +41,7 @@ jobs:
# test pure-js once
- server-versions: master
databases: sqlite
php-versions: 8.2
php-versions: 8.3
pure-js-mode: true
imagenet-enabled: true
faces-enabled: true
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"php": "8.2.0"
},
"allow-plugins": {
"bamarni/composer-bin-plugin": true,
"composer/package-versions-deprecated": true
},
"autoloader-suffix": "Recognize",
Expand Down
15 changes: 15 additions & 0 deletions lib/AppInfo/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
use OCA\DAV\Events\SabrePluginAddEvent;
use OCA\Recognize\Dav\Faces\PropFindPlugin;
use OCA\Recognize\Hooks\FileListener;
use OCA\Recognize\TaskProcessing\AudioClassificationTaskType;
use OCA\Recognize\TaskProcessing\ImageClassificationTaskType;
use OCA\Recognize\TaskProcessing\ImageFaceRecognitionTaskType;
use OCA\Recognize\TaskProcessing\TaskResultListener;
use OCA\Recognize\TaskProcessing\VideoClassificationTaskType;
use OCP\AppFramework\App;
use OCP\AppFramework\Bootstrap\IBootContext;
use OCP\AppFramework\Bootstrap\IBootstrap;
Expand All @@ -23,6 +28,8 @@
use OCP\Files\Events\Node\NodeDeletedEvent;
use OCP\Files\Events\Node\NodeRenamedEvent;
use OCP\Files\Events\NodeRemovedFromCache;
use OCP\TaskProcessing\Events\TaskFailedEvent;
use OCP\TaskProcessing\Events\TaskSuccessfulEvent;

final class Application extends App implements IBootstrap {
public const APP_ID = 'recognize';
Expand All @@ -44,6 +51,9 @@ public function __construct() {
$dispatcher->addServiceListener('OCP\Files\Config\Event\UserMountRemovedEvent', FileListener::class);
// it is not fired as of now, Added and Removed events are fired instead in that order
// $context->addServiceListener('OCP\Files\Config\Event\UserMountUpdatedEvent', FileListener::class);

$dispatcher->addServiceListener(TaskSuccessfulEvent::class, TaskResultListener::class);
$dispatcher->addServiceListener(TaskFailedEvent::class, TaskResultListener::class);
}

public function register(IRegistrationContext $context): void {
Expand All @@ -53,6 +63,11 @@ public function register(IRegistrationContext $context): void {

/** Register $principalBackend for the DAV collection */
$context->registerServiceAlias('principalBackend', Principal::class);

$context->registerTaskProcessingTaskType(ImageClassificationTaskType::class);
$context->registerTaskProcessingTaskType(VideoClassificationTaskType::class);
$context->registerTaskProcessingTaskType(AudioClassificationTaskType::class);
$context->registerTaskProcessingTaskType(ImageFaceRecognitionTaskType::class);
}

/**
Expand Down
18 changes: 11 additions & 7 deletions lib/BackgroundJobs/ClassifierJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function __construct(
private SettingsService $settingsService,
) {
parent::__construct($time);
$this->setInterval(60 * 5);
$this->setInterval(60);
$this->setTimeSensitivity(self::TIME_INSENSITIVE);
$this->setAllowParallelRuns($settingsService->getSetting('concurrency.enabled') === 'true');
}
Expand All @@ -38,10 +38,13 @@ public function __construct(
* @param array{storageId: int, rootId: int} $argument
*/
protected function runClassifier(string $model, array $argument): void {
sleep(10);
if ($this->settingsService->getSetting('concurrency.enabled') !== 'true' && $this->anyOtherClassifierJobsRunning()) {
$this->logger->debug('Stalling job '.static::class.' with argument ' . var_export($argument, true) . ' because other classifiers are already reserved');
return;
$taskProcessingMode = $this->settingsService->getSetting('taskprocessing.enabled') === 'true';
if (!$taskProcessingMode) {
sleep(10);
if ($this->settingsService->getSetting('concurrency.enabled') !== 'true' && $this->anyOtherClassifierJobsRunning()) {
$this->logger->debug('Stalling job '.static::class.' with argument ' . var_export($argument, true) . ' because other classifiers are already reserved');
return;
}
}

$storageId = $argument['storageId'];
Expand All @@ -53,9 +56,10 @@ protected function runClassifier(string $model, array $argument): void {
return;
}
$this->logger->debug('Classifying files of storage '.$storageId. ' using '.$model);
$batchSize = $taskProcessingMode ? 500 : $this->getBatchSize();
try {
$this->logger->debug('fetching '.$this->getBatchSize().' files from '.$model.' queue');
$files = $this->queue->getFromQueue($model, $storageId, $rootId, $this->getBatchSize());
$this->logger->debug('fetching '.$batchSize.' files from '.$model.' queue');
$files = $this->queue->getFromQueue($model, $storageId, $rootId, $batchSize);
} catch (Exception $e) {
$this->settingsService->setSetting($model.'.status', 'false');
$this->logger->error('Cannot retrieve items from '.$model.' queue', ['exception' => $e]);
Expand Down
9 changes: 8 additions & 1 deletion lib/BackgroundJobs/ClassifyFacesJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace OCA\Recognize\BackgroundJobs;

use OCA\Recognize\Classifiers\Images\ClusteringFaceClassifier;
use OCA\Recognize\Classifiers\TaskProcessing\ImageFaceRecognitionClassifier as TaskProcessingFaceClassifier;
use OCA\Recognize\Service\Logger;
use OCA\Recognize\Service\QueueService;
use OCA\Recognize\Service\SettingsService;
Expand All @@ -20,11 +21,13 @@ final class ClassifyFacesJob extends ClassifierJob {

private SettingsService $settingsService;
private ClusteringFaceClassifier $faces;
private TaskProcessingFaceClassifier $tpFaces;

public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, ClusteringFaceClassifier $faceClassifier, IUserMountCache $mountCache, IJobList $jobList) {
public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, ClusteringFaceClassifier $faceClassifier, TaskProcessingFaceClassifier $tpFaces, IUserMountCache $mountCache, IJobList $jobList) {
parent::__construct($time, $logger, $queue, $mountCache, $jobList, $settingsService);
$this->settingsService = $settingsService;
$this->faces = $faceClassifier;
$this->tpFaces = $tpFaces;
}

/**
Expand All @@ -39,6 +42,10 @@ protected function run($argument): void {
* @return void
*/
protected function classify(array $files) : void {
if ($this->settingsService->getSetting('taskprocessing.enabled') === 'true') {
$this->tpFaces->classify($files);
return;
}
$this->faces->classify($files);
}

Expand Down
9 changes: 8 additions & 1 deletion lib/BackgroundJobs/ClassifyImagenetJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace OCA\Recognize\BackgroundJobs;

use OCA\Recognize\Classifiers\Images\ImagenetClassifier;
use OCA\Recognize\Classifiers\TaskProcessing\ImageClassifier as TaskProcessingImageClassifier;
use OCA\Recognize\Service\Logger;
use OCA\Recognize\Service\QueueService;
use OCA\Recognize\Service\SettingsService;
Expand All @@ -20,11 +21,13 @@ final class ClassifyImagenetJob extends ClassifierJob {

private SettingsService $settingsService;
private ImagenetClassifier $imagenet;
private TaskProcessingImageClassifier $tpImage;

public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, ImagenetClassifier $imagenet, IUserMountCache $mountCache, IJobList $jobList) {
public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, ImagenetClassifier $imagenet, TaskProcessingImageClassifier $tpImage, IUserMountCache $mountCache, IJobList $jobList) {
parent::__construct($time, $logger, $queue, $mountCache, $jobList, $settingsService);
$this->settingsService = $settingsService;
$this->imagenet = $imagenet;
$this->tpImage = $tpImage;
}

/**
Expand All @@ -39,6 +42,10 @@ protected function run($argument): void {
* @return void
*/
protected function classify(array $files) : void {
if ($this->settingsService->getSetting('taskprocessing.enabled') === 'true') {
$this->tpImage->classify($files);
return;
}
$this->imagenet->classify($files);
}

Expand Down
9 changes: 8 additions & 1 deletion lib/BackgroundJobs/ClassifyMovinetJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
declare(strict_types=1);
namespace OCA\Recognize\BackgroundJobs;

use OCA\Recognize\Classifiers\TaskProcessing\VideoClassifier as TaskProcessingVideoClassifier;
use OCA\Recognize\Classifiers\Video\MovinetClassifier;
use OCA\Recognize\Service\Logger;
use OCA\Recognize\Service\QueueService;
Expand All @@ -20,11 +21,13 @@ final class ClassifyMovinetJob extends ClassifierJob {

private SettingsService $settingsService;
private MovinetClassifier $movinet;
private TaskProcessingVideoClassifier $tpVideo;

public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, MovinetClassifier $movinet, IUserMountCache $mountCache, IJobList $jobList) {
public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, MovinetClassifier $movinet, TaskProcessingVideoClassifier $tpVideo, IUserMountCache $mountCache, IJobList $jobList) {
parent::__construct($time, $logger, $queue, $mountCache, $jobList, $settingsService);
$this->settingsService = $settingsService;
$this->movinet = $movinet;
$this->tpVideo = $tpVideo;
}

/**
Expand All @@ -40,6 +43,10 @@ protected function run($argument): void {
* @throws \OCA\Recognize\Exception\Exception
*/
protected function classify(array $files) : void {
if ($this->settingsService->getSetting('taskprocessing.enabled') === 'true') {
$this->tpVideo->classify($files);
return;
}
$this->movinet->classify($files);
}

Expand Down
9 changes: 8 additions & 1 deletion lib/BackgroundJobs/ClassifyMusicnnJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace OCA\Recognize\BackgroundJobs;

use OCA\Recognize\Classifiers\Audio\MusicnnClassifier;
use OCA\Recognize\Classifiers\TaskProcessing\AudioClassifier as TaskProcessingAudioClassifier;
use OCA\Recognize\Service\Logger;
use OCA\Recognize\Service\QueueService;
use OCA\Recognize\Service\SettingsService;
Expand All @@ -20,11 +21,13 @@ final class ClassifyMusicnnJob extends ClassifierJob {

private SettingsService $settingsService;
private MusicnnClassifier $musicnn;
private TaskProcessingAudioClassifier $tpAudio;

public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, MusicnnClassifier $musicnn, IUserMountCache $mountCache, IJobList $jobList) {
public function __construct(ITimeFactory $time, Logger $logger, QueueService $queue, SettingsService $settingsService, MusicnnClassifier $musicnn, TaskProcessingAudioClassifier $tpAudio, IUserMountCache $mountCache, IJobList $jobList) {
parent::__construct($time, $logger, $queue, $mountCache, $jobList, $settingsService);
$this->settingsService = $settingsService;
$this->musicnn = $musicnn;
$this->tpAudio = $tpAudio;
}

/**
Expand All @@ -39,6 +42,10 @@ protected function run($argument): void {
* @return void
*/
protected function classify(array $files) : void {
if ($this->settingsService->getSetting('taskprocessing.enabled') === 'true') {
$this->tpAudio->classify($files);
return;
}
$this->musicnn->classify($files);
}

Expand Down
114 changes: 114 additions & 0 deletions lib/Classifiers/AbstractTaskProcessingClassifier.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

declare(strict_types=1);

/**
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

namespace OCA\Recognize\Classifiers;

use OCA\Recognize\AppInfo\Application;
use OCA\Recognize\Db\QueueFile;
use OCA\Recognize\Service\QueueService;
use OCP\DB\Exception;
use OCP\Files\Config\ICachedMountInfo;
use OCP\Files\Config\IUserMountCache;
use OCP\TaskProcessing\IManager as ITaskProcessingManager;
use OCP\TaskProcessing\Task;
use Psr\Log\LoggerInterface;

abstract class AbstractTaskProcessingClassifier {
public function __construct(
protected LoggerInterface $logger,
protected ITaskProcessingManager $taskProcessingManager,
protected IUserMountCache $userMountCache,
protected QueueService $queue,
) {
}

/**
* The TaskProcessing task type id this classifier schedules.
*/
abstract protected function getTaskTypeId(): string;

/**
* The queue model name this classifier consumes.
*/
abstract protected function getModelName(): string;

/**
* Schedule a TaskProcessing task for the given batch of queue files.
*
* Results will be applied asynchronously by {@see \OCA\Recognize\TaskProcessing\TaskResultListener}
* when the provider emits a TaskSuccessfulEvent.
*
* @param list<QueueFile> $queueFiles
*/
public function classify(array $queueFiles): void {
if (count($queueFiles) === 0) {
return;
}

$storageId = $queueFiles[0]->getStorageId();
$rootId = $queueFiles[0]->getRootId();
$userId = $this->findUserForStorage($storageId, $rootId);
if ($userId === null) {
$this->logger->warning('No user with access for storage ' . $storageId . '/' . $rootId . ' found; dropping ' . count($queueFiles) . ' files from ' . $this->getModelName() . ' queue');
$this->dropFromQueue($queueFiles);
return;
}

$fileIds = array_values(array_unique(array_map(static fn (QueueFile $qf): int => $qf->getFileId(), $queueFiles)));

$task = new Task(
$this->getTaskTypeId(),
['input' => $fileIds],
Application::APP_ID,
$userId,
$this->getModelName(),
);

try {
$this->taskProcessingManager->scheduleTask($task);
} catch (\Throwable $e) {
// Leave files in the queue so they can be retried on the next job run
$this->logger->error('Failed to schedule ' . $this->getTaskTypeId() . ' task', ['exception' => $e]);
throw new \RuntimeException('Could not schedule ' . $this->getTaskTypeId() . ' task', 0, $e);
}

/**
* @psalm-suppress PossiblyNullOperand
* @psalm-suppress InvalidOperand
*/
$this->logger->debug('Scheduled ' . $this->getTaskTypeId() . ' task #' . $task->getId() . ' for ' . count($fileIds) . ' files');

// Once scheduled, files leave the queue. The TaskResultListener applies results when the task completes.
$this->dropFromQueue($queueFiles);
}

private function findUserForStorage(int $storageId, int $rootId): ?string {
$mounts = array_values(array_filter(
$this->userMountCache->getMountsForStorageId($storageId),
static fn (ICachedMountInfo $m): bool => $m->getRootId() === $rootId,
));
if (count($mounts) === 0) {
return null;
}
return $mounts[0]->getUser()->getUID();
}

/**
* @param list<QueueFile> $queueFiles
*/
private function dropFromQueue(array $queueFiles): void {
foreach ($queueFiles as $qf) {
try {
$this->queue->removeFromQueue($this->getModelName(), $qf);
} catch (Exception $e) {
$this->logger->warning('Could not remove file ' . $qf->getFileId() . ' from ' . $this->getModelName() . ' queue', ['exception' => $e]);
}
}
}
}
Loading
Loading