1616
1717use CodeRhapsodie \DataflowBundle \DataflowType \Result ;
1818use CodeRhapsodie \DataflowBundle \DataflowType \Writer \WriterInterface ;
19- use Psr \Log \LoggerAwareInterface ;
2019use Psr \Log \LoggerAwareTrait ;
2120
22- class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
21+ class AMPAsyncDataflow implements DataflowInterface
2322{
2423 use LoggerAwareTrait;
2524
@@ -65,7 +64,7 @@ public function addWriter(WriterInterface $writer): self
6564 public function process (): Result
6665 {
6766 $ count = 0 ;
68- $ exceptions = [] ;
67+ $ countExceptions = 0 ;
6968 $ startTime = new \DateTime ();
7069
7170 try {
@@ -82,7 +81,7 @@ public function process(): Result
8281 }
8382 });
8483
85- $ watcherId = Loop::repeat ($ this ->loopInterval , function () use ($ deferred , &$ resolved , $ producer , &$ count , &$ exceptions ) {
84+ $ watcherId = Loop::repeat ($ this ->loopInterval , function () use ($ deferred , &$ resolved , $ producer , &$ count , &$ countExceptions ) {
8685 if (yield $ producer ->advance ()) {
8786 $ it = $ producer ->getCurrent ();
8887 [$ index , $ item ] = $ it ;
@@ -93,7 +92,7 @@ public function process(): Result
9392 }
9493
9594 foreach ($ this ->states as $ state ) {
96- $ this ->processState ($ state , $ count , $ exceptions );
95+ $ this ->processState ($ state , $ count , $ countExceptions );
9796 }
9897 });
9998
@@ -104,18 +103,14 @@ public function process(): Result
104103 $ writer ->finish ();
105104 }
106105 } catch (\Throwable $ e ) {
107- $ exceptions [] = $ e ;
106+ ++ $ countExceptions ;
108107 $ this ->logException ($ e );
109108 }
110109
111- return new Result ($ this ->name , $ startTime , new \DateTime (), $ count , $ exceptions );
110+ return new Result ($ this ->name , $ startTime , new \DateTime (), $ count , $ countExceptions );
112111 }
113112
114- /**
115- * @param int $count internal count reference
116- * @param array $exceptions internal exceptions
117- */
118- private function processState (mixed $ state , int &$ count , array &$ exceptions ): void
113+ private function processState (mixed $ state , int &$ count , int &$ countExceptions ): void
119114 {
120115 [$ readIndex , $ stepIndex , $ item ] = $ state ;
121116 if ($ stepIndex < \count ($ this ->steps )) {
@@ -127,9 +122,9 @@ private function processState(mixed $state, int &$count, array &$exceptions): vo
127122 $ this ->stepsJobs [$ stepIndex ][$ readIndex ] = true ;
128123 /** @var Promise<void> $promise */
129124 $ promise = coroutine ($ step )($ item );
130- $ promise ->onResolve (function (?\Throwable $ exception = null , $ newItem = null ) use ($ stepIndex , $ readIndex , &$ exceptions ) {
125+ $ promise ->onResolve (function (?\Throwable $ exception = null , $ newItem = null ) use ($ stepIndex , $ readIndex , &$ countExceptions ): void {
131126 if ($ exception ) {
132- $ exceptions [ $ stepIndex ] = $ exception ;
127+ ++ $ countExceptions ;
133128 $ this ->logException ($ exception , (string ) $ stepIndex );
134129 } elseif ($ newItem === false ) {
135130 unset($ this ->states [$ readIndex ]);
0 commit comments