Skip to content

Commit b3eff9a

Browse files
committed
* Add streamed exceptions
1 parent c2133c6 commit b3eff9a

13 files changed

Lines changed: 101 additions & 86 deletions

File tree

src/DataflowType/AbstractDataflowType.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function process(array $options, ?int $jobId = null): Result
4646
});
4747
$this->buildDataflow($builder, $options);
4848
$dataflow = $builder->getDataflow();
49-
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
49+
if ($this->logger instanceof LoggerInterface) {
5050
$dataflow->setLogger($this->logger);
5151
}
5252

src/DataflowType/Dataflow/Dataflow.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use Psr\Log\LoggerAwareInterface;
1010
use Psr\Log\LoggerAwareTrait;
1111

12-
class Dataflow implements DataflowInterface, LoggerAwareInterface
12+
class Dataflow implements DataflowInterface
1313
{
1414
use LoggerAwareTrait;
1515

@@ -73,7 +73,7 @@ public function setAfterItemProcessors(array $processors): self
7373
public function process(): Result
7474
{
7575
$count = 0;
76-
$exceptions = [];
76+
$countExceptions = 0;
7777
$startTime = new \DateTime();
7878

7979
try {
@@ -91,10 +91,10 @@ public function process(): Result
9191
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
9292
}
9393
} catch (\Throwable $e2) {
94-
$exceptions[$index] = $e2;
94+
++$countExceptions;
9595
$this->logException($e2, $index);
9696
}
97-
$exceptions[$exceptionIndex] = $e;
97+
++$countExceptions;
9898
$this->logException($e, $exceptionIndex);
9999
}
100100

@@ -109,11 +109,11 @@ public function process(): Result
109109
$writer->finish();
110110
}
111111
} catch (\Throwable $e) {
112-
$exceptions[] = $e;
112+
++$countExceptions;
113113
$this->logException($e);
114114
}
115115

116-
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
116+
return new Result($this->name, $startTime, new \DateTime(), $count, $countExceptions);
117117
}
118118

119119
private function processItem(mixed $item): void

src/DataflowType/Dataflow/DataflowInterface.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
66

77
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
8+
use Psr\Log\LoggerAwareInterface;
89

910
/**
1011
* Combines a reader, steps and writers as a data processing workflow.
1112
*/
12-
interface DataflowInterface
13+
interface DataflowInterface extends LoggerAwareInterface
1314
{
1415
/**
1516
* Processes the data.

src/DataflowType/DataflowTypeInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
namespace CodeRhapsodie\DataflowBundle\DataflowType;
66

7-
interface DataflowTypeInterface
7+
use Psr\Log\LoggerAwareInterface;
8+
9+
interface DataflowTypeInterface extends LoggerAwareInterface
810
{
911
public function getLabel(): string;
1012

src/DataflowType/Result.php

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,12 @@ class Result
1111
{
1212
private readonly \DateInterval $elapsed;
1313

14-
private int $errorCount = 0;
15-
1614
private int $successCount = 0;
1715

18-
private readonly array $exceptions;
19-
20-
public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private readonly \DateTimeInterface $endTime, private readonly int $totalProcessedCount, array $exceptions)
16+
public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, private int $errorCount)
2117
{
2218
$this->elapsed = $startTime->diff($endTime);
23-
$this->errorCount = \count($exceptions);
2419
$this->successCount = $totalProcessedCount - $this->errorCount;
25-
$this->exceptions = $exceptions;
2620
}
2721

2822
public function getName(): string

src/Entity/Job.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ class Job
6262

6363
private ?\DateTimeInterface $endTime = null;
6464

65+
/**
66+
* @var resource|null
67+
*/
68+
private $streamExceptions = null;
69+
6570
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
6671
{
6772
return (new self())
@@ -244,4 +249,22 @@ public function setEndTime(?\DateTimeInterface $endTime): self
244249

245250
return $this;
246251
}
252+
253+
/**
254+
* @return resource|null
255+
*/
256+
public function getStreamExceptions()
257+
{
258+
return $this->streamExceptions;
259+
}
260+
261+
/**
262+
* @param resource $streamExceptions
263+
*/
264+
public function setStreamExceptions($streamExceptions): self
265+
{
266+
$this->streamExceptions = $streamExceptions;
267+
268+
return $this;
269+
}
247270
}

src/ExceptionsHandler/ExceptionHandlerInterface.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66

77
interface ExceptionHandlerInterface
88
{
9-
public function save(?int $jobId, ?array $exceptions): void;
9+
/** @param resource $exceptions */
10+
public function save(?int $jobId, $exceptions): void;
1011

11-
public function find(int $jobId): ?array;
12+
/** @return null|resource */
13+
public function find(int $jobId);
1214

1315
public function delete(int $jobId): void;
1416
}

src/ExceptionsHandler/FilesystemExceptionHandler.php

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,51 @@ public function __construct(private readonly Filesystem $filesystem)
1313
{
1414
}
1515

16-
public function save(?int $jobId, ?array $exceptions): void
16+
/** @inheritDoc */
17+
public function save(?int $jobId, $exceptions): void
1718
{
18-
if ($jobId === null || empty($exceptions)) {
19+
if ($jobId === null || stream_get_contents($exceptions, 1) === false) {
1920
return;
2021
}
2122

22-
$this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions));
23+
$path = \sprintf('dataflow-job-%s.log', $jobId);
24+
rewind($exceptions);
25+
26+
if ($this->filesystem->fileExists($path)) {
27+
$existingStream = $this->filesystem->readStream($path);
28+
29+
$combined = fopen('php://temp', 'r+');
30+
31+
stream_copy_to_stream($existingStream, $combined);
32+
stream_copy_to_stream($exceptions, $combined);
33+
34+
rewind($combined);
35+
36+
$this->filesystem->delete($path);
37+
$this->filesystem->writeStream($path, $combined);
38+
39+
fclose($existingStream);
40+
fclose($combined);
41+
42+
return;
43+
}
44+
45+
$this->filesystem->writeStream($path, $exceptions);
46+
47+
fclose($exceptions);
2348
}
2449

25-
public function find(int $jobId): ?array
50+
/** @inheritDoc */
51+
public function find(int $jobId)
2652
{
2753
try {
2854
if (!$this->filesystem->fileExists(\sprintf('dataflow-job-%s.log', $jobId))) {
29-
return [];
55+
return null;
3056
}
3157

32-
return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true);
58+
return $this->filesystem->readStream(\sprintf('dataflow-job-%s.log', $jobId));
3359
} catch (FilesystemException) {
34-
return [];
60+
return null;
3561
}
3662
}
3763

src/ExceptionsHandler/NullExceptionHandler.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66

77
class NullExceptionHandler implements ExceptionHandlerInterface
88
{
9-
public function save(?int $jobId, ?array $exceptions): void
9+
/** @inheritDoc */
10+
public function save(?int $jobId, $exceptions): void
1011
{
1112
// Nothing to do
1213
}
1314

14-
public function find(int $jobId): ?array
15+
/** @inheritDoc */
16+
public function find(int $jobId)
1517
{
1618
return null;
1719
}

src/Gateway/JobGateway.php

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,27 @@ public function find(int $jobId): ?Job
1919
{
2020
$job = $this->repository->find($jobId);
2121

22-
return $this->loadExceptions($job);
22+
return $this->loadStreamExceptions($job);
2323
}
2424

2525
public function save(Job $job): void
2626
{
27-
if (!$this->exceptionHandler instanceof NullExceptionHandler) {
28-
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
29-
$job->setExceptions([]);
30-
}
31-
3227
$this->repository->save($job);
3328
}
3429

3530
public function findLastForDataflowId(int $scheduleId): ?Job
3631
{
3732
$job = $this->repository->findLastForDataflowId($scheduleId);
3833

39-
return $this->loadExceptions($job);
34+
return $this->loadStreamExceptions($job);
4035
}
4136

42-
private function loadExceptions(?Job $job): ?Job
37+
private function loadStreamExceptions(?Job $job): ?Job
4338
{
4439
if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) {
4540
return $job;
4641
}
4742

48-
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
49-
50-
return $job->setExceptions($this->exceptionHandler->find($job->getId()));
43+
return $job->setStreamExceptions($this->exceptionHandler->find($job->getId()));
5144
}
5245
}

0 commit comments

Comments
 (0)