Skip to content

Commit 8f4f8e7

Browse files
committed
* Add streamed exceptions
1 parent 6ebbdbd commit 8f4f8e7

14 files changed

Lines changed: 103 additions & 87 deletions

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
"phpunit/phpunit": "^11",
6565
"portphp/portphp": "^1.9",
6666
"rector/rector": "^2.0",
67-
"symfony/messenger": "^7.0"
67+
"symfony/messenger": "^7.0",
68+
"league/flysystem": "3.*"
6869
},
6970
"suggest": {
7071
"amphp/amp": "Provide asynchronous steps for your dataflows",

src/DataflowType/AbstractDataflowType.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function process(array $options, ?int $jobId = null): Result
4646
});
4747
$this->buildDataflow($builder, $options);
4848
$dataflow = $builder->getDataflow();
49-
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
49+
if ($this->logger instanceof LoggerInterface) {
5050
$dataflow->setLogger($this->logger);
5151
}
5252

src/DataflowType/Dataflow/Dataflow.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use Psr\Log\LoggerAwareInterface;
1010
use Psr\Log\LoggerAwareTrait;
1111

12-
class Dataflow implements DataflowInterface, LoggerAwareInterface
12+
class Dataflow implements DataflowInterface
1313
{
1414
use LoggerAwareTrait;
1515

@@ -75,7 +75,7 @@ public function setAfterItemProcessors(array $processors): self
7575
public function process(): Result
7676
{
7777
$count = 0;
78-
$exceptions = [];
78+
$countExceptions = 0;
7979
$startTime = new \DateTime();
8080

8181
try {
@@ -93,10 +93,10 @@ public function process(): Result
9393
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
9494
}
9595
} catch (\Throwable $e2) {
96-
$exceptions[$index] = $e2;
96+
++$countExceptions;
9797
$this->logException($e2, $index);
9898
}
99-
$exceptions[$exceptionIndex] = $e;
99+
++$countExceptions;
100100
$this->logException($e, $exceptionIndex);
101101
}
102102

@@ -111,11 +111,11 @@ public function process(): Result
111111
$writer->finish();
112112
}
113113
} catch (\Throwable $e) {
114-
$exceptions[] = $e;
114+
++$countExceptions;
115115
$this->logException($e);
116116
}
117117

118-
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
118+
return new Result($this->name, $startTime, new \DateTime(), $count, $countExceptions);
119119
}
120120

121121
private function processItem(mixed $item): void

src/DataflowType/Dataflow/DataflowInterface.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
66

77
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
8+
use Psr\Log\LoggerAwareInterface;
89

910
/**
1011
* Combines a reader, steps and writers as a data processing workflow.
1112
*/
12-
interface DataflowInterface
13+
interface DataflowInterface extends LoggerAwareInterface
1314
{
1415
/**
1516
* Processes the data.

src/DataflowType/DataflowTypeInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
namespace CodeRhapsodie\DataflowBundle\DataflowType;
66

7-
interface DataflowTypeInterface
7+
use Psr\Log\LoggerAwareInterface;
8+
9+
interface DataflowTypeInterface extends LoggerAwareInterface
810
{
911
public function getLabel(): string;
1012

src/DataflowType/Result.php

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,12 @@ class Result
1111
{
1212
private \DateInterval $elapsed;
1313

14-
private int $errorCount = 0;
15-
1614
private int $successCount = 0;
1715

18-
private array $exceptions;
19-
20-
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions)
16+
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, private int $errorCount)
2117
{
2218
$this->elapsed = $startTime->diff($endTime);
23-
$this->errorCount = \count($exceptions);
2419
$this->successCount = $totalProcessedCount - $this->errorCount;
25-
$this->exceptions = $exceptions;
2620
}
2721

2822
public function getName(): string

src/Entity/Job.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ class Job
6161

6262
private ?\DateTimeInterface $endTime = null;
6363

64+
/**
65+
* @var resource|null
66+
*/
67+
private $streamExceptions = null;
68+
6469
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
6570
{
6671
return (new static())
@@ -243,4 +248,22 @@ public function setEndTime(?\DateTimeInterface $endTime): self
243248

244249
return $this;
245250
}
251+
252+
/**
253+
* @return resource|null
254+
*/
255+
public function getStreamExceptions()
256+
{
257+
return $this->streamExceptions;
258+
}
259+
260+
/**
261+
* @param resource $streamExceptions
262+
*/
263+
public function setStreamExceptions($streamExceptions): self
264+
{
265+
$this->streamExceptions = $streamExceptions;
266+
267+
return $this;
268+
}
246269
}

src/ExceptionsHandler/ExceptionHandlerInterface.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
interface ExceptionHandlerInterface
88
{
9-
public function save(?int $jobId, ?array $exceptions): void;
9+
/** @param resource $exceptions */
10+
public function save(?int $jobId, $exceptions): void;
1011

11-
public function find(int $jobId): ?array;
12+
/** @return null|resource */
13+
public function find(int $jobId);
1214
}

src/ExceptionsHandler/FilesystemExceptionHandler.php

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,51 @@ public function __construct(private Filesystem $filesystem)
1313
{
1414
}
1515

16-
public function save(?int $jobId, ?array $exceptions): void
16+
/** @inheritDoc */
17+
public function save(?int $jobId, $exceptions): void
1718
{
18-
if ($jobId === null || empty($exceptions)) {
19+
if ($jobId === null || stream_get_contents($exceptions, 1) === false) {
1920
return;
2021
}
2122

22-
$this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions));
23+
$path = \sprintf('dataflow-job-%s.log', $jobId);
24+
rewind($exceptions);
25+
26+
if ($this->filesystem->fileExists($path)) {
27+
$existingStream = $this->filesystem->readStream($path);
28+
29+
$combined = fopen('php://temp', 'r+');
30+
31+
stream_copy_to_stream($existingStream, $combined);
32+
stream_copy_to_stream($exceptions, $combined);
33+
34+
rewind($combined);
35+
36+
$this->filesystem->delete($path);
37+
$this->filesystem->writeStream($path, $combined);
38+
39+
fclose($existingStream);
40+
fclose($combined);
41+
42+
return;
43+
}
44+
45+
$this->filesystem->writeStream($path, $exceptions);
46+
47+
fclose($exceptions);
2348
}
2449

25-
public function find(int $jobId): ?array
50+
/** @inheritDoc */
51+
public function find(int $jobId)
2652
{
2753
try {
2854
if (!$this->filesystem->fileExists(\sprintf('dataflow-job-%s.log', $jobId))) {
29-
return [];
55+
return null;
3056
}
3157

32-
return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true);
58+
return $this->filesystem->readStream(\sprintf('dataflow-job-%s.log', $jobId));
3359
} catch (FilesystemException) {
34-
return [];
60+
return null;
3561
}
3662
}
3763
}

src/ExceptionsHandler/NullExceptionHandler.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66

77
class NullExceptionHandler implements ExceptionHandlerInterface
88
{
9-
public function save(?int $jobId, ?array $exceptions): void
9+
/** @inheritDoc */
10+
public function save(?int $jobId, $exceptions): void
1011
{
1112
}
1213

13-
public function find(int $jobId): ?array
14+
/** @inheritDoc */
15+
public function find(int $jobId)
1416
{
1517
return null;
1618
}

0 commit comments

Comments
 (0)