Skip to content

Commit 33e253b

Browse files
committed
Add auto update count processed item while running job
1 parent 47af0e2 commit 33e253b

8 files changed

Lines changed: 73 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# Version 5.3.0
2+
* Added auto update count processed item while running job
3+
14
# Version 5.2.0
25
* Added custom index for job status
36

src/Command/ExecuteDataflowCommand.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
88
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
9+
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
910
use Psr\Log\LoggerAwareInterface;
1011
use Psr\Log\LoggerAwareTrait;
1112
use Symfony\Component\Console\Attribute\AsCommand;
@@ -26,7 +27,7 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
2627
{
2728
use LoggerAwareTrait;
2829

29-
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
30+
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory, private JobRepository $jobRepository)
3031
{
3132
parent::__construct();
3233
}
@@ -61,6 +62,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
6162
$io = new SymfonyStyle($input, $output);
6263

6364
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
65+
$dataflowType->setRepository($this->jobRepository);
6466
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
6567
$dataflowType->setLogger($this->logger);
6668
}

src/DataflowType/AbstractDataflowType.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace CodeRhapsodie\DataflowBundle\DataflowType;
66

7+
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
78
use Psr\Log\LoggerAwareInterface;
89
use Psr\Log\LoggerAwareTrait;
910
use Psr\Log\LoggerInterface;
@@ -13,6 +14,10 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
1314
{
1415
use LoggerAwareTrait;
1516

17+
private JobRepository $repository;
18+
19+
private ?\DateTime $saveDate = null;
20+
1621
/**
1722
* @codeCoverageIgnore
1823
*/
@@ -21,14 +26,21 @@ public function getAliases(): iterable
2126
return [];
2227
}
2328

24-
public function process(array $options): Result
29+
public function process(array $options, ?int $jobId = null): Result
2530
{
2631
$optionsResolver = new OptionsResolver();
2732
$this->configureOptions($optionsResolver);
2833
$options = $optionsResolver->resolve($options);
2934

3035
$builder = $this->createDataflowBuilder();
3136
$builder->setName($this->getLabel());
37+
$builder->addAfterItemProcessors(function (int|string $index, mixed $item, int $count) use ($jobId) {
38+
if ($jobId === null || $this->saveDate->modify('+1 minute') > new \DateTime()) {
39+
return;
40+
}
41+
42+
$this->repository->updateCount($jobId, $count);
43+
});
3244
$this->buildDataflow($builder, $options);
3345
$dataflow = $builder->getDataflow();
3446
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
@@ -51,4 +63,9 @@ protected function configureOptions(OptionsResolver $optionsResolver): void
5163
}
5264

5365
abstract protected function buildDataflow(DataflowBuilder $builder, array $options): void;
66+
67+
public function setRepository(JobRepository $repository): void
68+
{
69+
$this->repository = $repository;
70+
}
5471
}

src/DataflowType/Dataflow/Dataflow.php

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
2121

2222
private ?\Closure $customExceptionIndex = null;
2323

24+
private ?\DateTimeInterface $dateTime = null;
25+
26+
/**
27+
* @var \Closure[]
28+
*/
29+
private array $afterItemProcessors = [];
30+
2431
public function __construct(private iterable $reader, private ?string $name)
2532
{
2633
}
@@ -55,6 +62,18 @@ public function setCustomExceptionIndex(callable $callable): self
5562
return $this;
5663
}
5764

65+
/**
66+
* @param array<callable> $processors
67+
*/
68+
public function setAfterItemProcessors(array $processors): self
69+
{
70+
$this->afterItemProcessors = array_map(function (callable $callable) {
71+
return \Closure::fromCallable($callable);
72+
}, $processors);
73+
74+
return $this;
75+
}
76+
5877
/**
5978
* {@inheritdoc}
6079
*/
@@ -76,7 +95,7 @@ public function process(): Result
7695
$exceptionIndex = $index;
7796
try {
7897
if (is_callable($this->customExceptionIndex)) {
79-
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
98+
$exceptionIndex = (string)($this->customExceptionIndex)($item, $index);
8099
}
81100
} catch (\Throwable $e2) {
82101
$exceptions[$index] = $e2;
@@ -87,6 +106,10 @@ public function process(): Result
87106
}
88107

89108
++$count;
109+
110+
foreach ($this->afterItemProcessors as $afterItemProcessor) {
111+
$afterItemProcessor($index, $item, $count);
112+
}
90113
}
91114

92115
foreach ($this->writers as $writer) {

src/DataflowType/DataflowBuilder.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ class DataflowBuilder
1818
private array $writers = [];
1919

2020
private ?\Closure $customExceptionIndex = null;
21+
/**
22+
* @var \Closure[]
23+
*/
24+
private array $afterItemProcessors = [];
2125

2226
public function setName(string $name): self
2327
{
@@ -54,6 +58,13 @@ public function setCustomExceptionIndex(callable $callable): self
5458
return $this;
5559
}
5660

61+
public function addAfterItemProcessors(callable $callable): self
62+
{
63+
$this->afterItemProcessors[] = \Closure::fromCallable($callable);
64+
65+
return $this;
66+
}
67+
5768
public function getDataflow(): DataflowInterface
5869
{
5970
$dataflow = new Dataflow($this->reader, $this->name);
@@ -73,6 +84,8 @@ public function getDataflow(): DataflowInterface
7384
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
7485
}
7586

87+
$dataflow->setAfterItemProcessors($this->afterItemProcessors);
88+
7689
return $dataflow;
7790
}
7891
}

src/DataflowType/DataflowTypeInterface.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44

55
namespace CodeRhapsodie\DataflowBundle\DataflowType;
66

7+
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
8+
79
interface DataflowTypeInterface
810
{
911
public function getLabel(): string;
1012

1113
public function getAliases(): iterable;
1214

13-
public function process(array $options): Result;
15+
public function process(array $options, ?int $jobId = null): Result;
16+
17+
public function setRepository(JobRepository $repository): void;
1418
}

src/Processor/JobProcessor.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public function process(Job $job): void
3030
$this->beforeProcessing($job);
3131

3232
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
33+
$dataflowType->setRepository($this->repository);
3334
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
3435
if (isset($this->logger)) {
3536
$loggers[] = $this->logger;
@@ -40,7 +41,7 @@ public function process(Job $job): void
4041
$dataflowType->setLogger($logger);
4142
}
4243

43-
$result = $dataflowType->process($job->getOptions());
44+
$result = $dataflowType->process($job->getOptions(), $job->getId());
4445

4546
if (!$dataflowType instanceof LoggerAwareInterface) {
4647
foreach ($result->getExceptions() as $index => $e) {

src/Repository/JobRepository.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ public function save(Job $job)
137137
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], $this->getFields());
138138
}
139139

140+
public function updateCount(int $jobId, int $count): void
141+
{
142+
$this->connection->update(static::TABLE_NAME, ['count' => $count], ['id' => $jobId]);
143+
}
144+
140145
public function createQueryBuilder($alias = null): QueryBuilder
141146
{
142147
$qb = $this->connection->createQueryBuilder();

0 commit comments

Comments
 (0)