Skip to content

Commit 19bc0ee

Browse files
committed
Add auto update count processed item while running job
1 parent 33e253b commit 19bc0ee

7 files changed

Lines changed: 29 additions & 7 deletions

File tree

src/Command/ExecuteDataflowCommand.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace CodeRhapsodie\DataflowBundle\Command;
66

7+
use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface;
78
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
89
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
910
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
@@ -62,7 +63,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
6263
$io = new SymfonyStyle($input, $output);
6364

6465
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
65-
$dataflowType->setRepository($this->jobRepository);
66+
if ($dataflowType instanceof RepositoryInterface) {
67+
$dataflowType->setRepository($this->jobRepository);
68+
}
69+
6670
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
6771
$dataflowType->setLogger($this->logger);
6872
}

src/DataflowType/AbstractDataflowType.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use Psr\Log\LoggerInterface;
1111
use Symfony\Component\OptionsResolver\OptionsResolver;
1212

13-
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
13+
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, RepositoryInterface
1414
{
1515
use LoggerAwareTrait;
1616

@@ -28,18 +28,21 @@ public function getAliases(): iterable
2828

2929
public function process(array $options, ?int $jobId = null): Result
3030
{
31+
$this->saveDate = new \DateTime();
32+
3133
$optionsResolver = new OptionsResolver();
3234
$this->configureOptions($optionsResolver);
3335
$options = $optionsResolver->resolve($options);
3436

3537
$builder = $this->createDataflowBuilder();
3638
$builder->setName($this->getLabel());
3739
$builder->addAfterItemProcessors(function (int|string $index, mixed $item, int $count) use ($jobId) {
38-
if ($jobId === null || $this->saveDate->modify('+1 minute') > new \DateTime()) {
40+
if ($jobId === null || $this->saveDate > new \DateTime()) {
3941
return;
4042
}
4143

4244
$this->repository->updateCount($jobId, $count);
45+
$this->saveDate = new \DateTime('+1 minute');
4346
});
4447
$this->buildDataflow($builder, $options);
4548
$dataflow = $builder->getDataflow();

src/DataflowType/Dataflow/Dataflow.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public function process(): Result
9595
$exceptionIndex = $index;
9696
try {
9797
if (is_callable($this->customExceptionIndex)) {
98-
$exceptionIndex = (string)($this->customExceptionIndex)($item, $index);
98+
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
9999
}
100100
} catch (\Throwable $e2) {
101101
$exceptions[$index] = $e2;

src/DataflowType/DataflowTypeInterface.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,4 @@ public function getLabel(): string;
1313
public function getAliases(): iterable;
1414

1515
public function process(array $options, ?int $jobId = null): Result;
16-
17-
public function setRepository(JobRepository $repository): void;
1816
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace CodeRhapsodie\DataflowBundle\DataflowType;
6+
7+
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
8+
9+
interface RepositoryInterface
10+
{
11+
public function setRepository(JobRepository $repository): void;
12+
}

src/Processor/JobProcessor.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace CodeRhapsodie\DataflowBundle\Processor;
66

7+
use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface;
78
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
89
use CodeRhapsodie\DataflowBundle\Entity\Job;
910
use CodeRhapsodie\DataflowBundle\Event\Events;
@@ -30,7 +31,10 @@ public function process(Job $job): void
3031
$this->beforeProcessing($job);
3132

3233
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
33-
$dataflowType->setRepository($this->repository);
34+
if ($dataflowType instanceof RepositoryInterface) {
35+
$dataflowType->setRepository($this->repository);
36+
}
37+
3438
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
3539
if (isset($this->logger)) {
3640
$loggers[] = $this->logger;

src/Resources/config/services.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ services:
2323
arguments:
2424
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
2525
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
26+
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
2627
tags: ['console.command']
2728

2829
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:

0 commit comments

Comments
 (0)