@@ -43,10 +43,6 @@ final class ParallelFileProcessor
4343 * @var int
4444 */
4545 private const SYSTEM_ERROR_LIMIT = 50 ;
46- /**
47- * @var \Symplify\EasyCodingStandard\Parallel\ValueObject\ProcessPool|null
48- */
49- private $ processPool = null ;
5046 public function __construct (WorkerCommandLineFactory $ workerCommandLineFactory )
5147 {
5248 $ this ->workerCommandLineFactory = $ workerCommandLineFactory ;
@@ -68,20 +64,20 @@ public function check(Schedule $schedule, string $mainScript, callable $postFile
6864 $ fileDiffs = [];
6965 $ systemErrors = [];
7066 $ tcpServer = new TcpServer ('127.0.0.1:0 ' , $ streamSelectLoop );
71- $ this -> processPool = new ProcessPool ($ tcpServer );
72- $ tcpServer ->on (ReactEvent::CONNECTION , function (ConnectionInterface $ connection ) use (&$ jobs ): void {
67+ $ processPool = new ProcessPool ($ tcpServer );
68+ $ tcpServer ->on (ReactEvent::CONNECTION , function (ConnectionInterface $ connection ) use (&$ jobs, $ processPool ): void {
7369 $ inDecoder = new Decoder ($ connection , \true, 512 , 0 , 4 * 1024 * 1024 );
7470 $ outEncoder = new Encoder ($ connection );
75- $ inDecoder ->on (ReactEvent::DATA , function (array $ data ) use (&$ jobs , $ inDecoder , $ outEncoder ): void {
71+ $ inDecoder ->on (ReactEvent::DATA , function (array $ data ) use (&$ jobs , $ inDecoder , $ outEncoder, $ processPool ): void {
7672 $ action = $ data [ReactCommand::ACTION ];
7773 if ($ action !== Action::HELLO ) {
7874 return ;
7975 }
8076 $ processIdentifier = $ data [Option::PARALLEL_IDENTIFIER ];
81- $ parallelProcess = $ this -> processPool ->getProcess ($ processIdentifier );
77+ $ parallelProcess = $ processPool ->getProcess ($ processIdentifier );
8278 $ parallelProcess ->bindConnection ($ inDecoder , $ outEncoder );
8379 if ($ jobs === []) {
84- $ this -> processPool ->quitProcess ($ processIdentifier );
80+ $ processPool ->quitProcess ($ processIdentifier );
8581 return ;
8682 }
8783 $ job = array_pop ($ jobs );
@@ -94,11 +90,11 @@ public function check(Schedule $schedule, string $mainScript, callable $postFile
9490 $ serverPort = parse_url ($ serverAddress , \PHP_URL_PORT );
9591 $ systemErrorsCount = 0 ;
9692 $ reachedSystemErrorsCountLimit = \false;
97- $ handleErrorCallable = function (Throwable $ throwable ) use (&$ systemErrors , &$ systemErrorsCount , &$ reachedSystemErrorsCountLimit ): void {
93+ $ handleErrorCallable = function (Throwable $ throwable ) use (&$ systemErrors , &$ systemErrorsCount , &$ reachedSystemErrorsCountLimit, $ processPool ): void {
9894 $ systemErrors [] = new SystemError ($ throwable ->getLine (), $ throwable ->getMessage (), $ throwable ->getFile ());
9995 ++$ systemErrorsCount ;
10096 $ reachedSystemErrorsCountLimit = \true;
101- $ this -> processPool ->quitAll ();
97+ $ processPool ->quitAll ();
10298 };
10399 $ timeoutInSeconds = SimpleParameterProvider::getIntParameter (Option::PARALLEL_TIMEOUT_IN_SECONDS );
104100 // options mirrored to each worker sub-process
@@ -113,7 +109,7 @@ public function check(Schedule $schedule, string $mainScript, callable $postFile
113109 $ parallelProcess = new ParallelProcess ($ workerCommandLine , $ streamSelectLoop , $ timeoutInSeconds );
114110 $ parallelProcess ->start (
115111 // 1. callable on data
116- function (array $ json ) use ($ parallelProcess , &$ systemErrors , &$ fileDiffs , &$ codingStandardErrors , &$ jobs , $ postFileCallback , &$ systemErrorsCount , &$ reachedInternalErrorsCountLimit , $ processIdentifier ): void {
112+ function (array $ json ) use ($ parallelProcess , &$ systemErrors , &$ fileDiffs , &$ codingStandardErrors , &$ jobs , $ postFileCallback , &$ systemErrorsCount , &$ reachedInternalErrorsCountLimit , $ processIdentifier, $ processPool ): void {
117113 // decode arrays to objects
118114 foreach ($ json [Bridge::SYSTEM_ERRORS ] as $ jsonError ) {
119115 if (is_string ($ jsonError )) {
@@ -132,10 +128,10 @@ function (array $json) use ($parallelProcess, &$systemErrors, &$fileDiffs, &$cod
132128 $ systemErrorsCount += $ json [Bridge::SYSTEM_ERRORS_COUNT ];
133129 if ($ systemErrorsCount >= self ::SYSTEM_ERROR_LIMIT ) {
134130 $ reachedInternalErrorsCountLimit = \true;
135- $ this -> processPool ->quitAll ();
131+ $ processPool ->quitAll ();
136132 }
137133 if ($ jobs === []) {
138- $ this -> processPool ->quitProcess ($ processIdentifier );
134+ $ processPool ->quitProcess ($ processIdentifier );
139135 return ;
140136 }
141137 $ job = array_pop ($ jobs );
@@ -144,8 +140,8 @@ function (array $json) use ($parallelProcess, &$systemErrors, &$fileDiffs, &$cod
144140 // 2. callable on error
145141 $ handleErrorCallable ,
146142 // 3. callable on exit
147- function ($ exitCode , string $ stdErr ) use (&$ systemErrors , $ processIdentifier ): void {
148- $ this -> processPool ->tryQuitProcess ($ processIdentifier );
143+ function ($ exitCode , string $ stdErr ) use (&$ systemErrors , $ processIdentifier, $ processPool ): void {
144+ $ processPool ->tryQuitProcess ($ processIdentifier );
149145 if ($ exitCode === ExitCode::SUCCESS ) {
150146 return ;
151147 }
@@ -155,7 +151,7 @@ function ($exitCode, string $stdErr) use (&$systemErrors, $processIdentifier): v
155151 $ systemErrors [] = 'Child process error: ' . $ stdErr ;
156152 }
157153 );
158- $ this -> processPool ->attachProcess ($ processIdentifier , $ parallelProcess );
154+ $ processPool ->attachProcess ($ processIdentifier , $ parallelProcess );
159155 }
160156 $ streamSelectLoop ->run ();
161157 if ($ reachedSystemErrorsCountLimit ) {
0 commit comments