Skip to content

Commit 0f306cb

Browse files
authored
Merge pull request #75 from mhugot/feature/try-fix-crash-2
Try to fix crash 2
2 parents e71a71a + 5008819 commit 0f306cb

5 files changed

Lines changed: 128 additions & 71 deletions

File tree

composer.lock

Lines changed: 9 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/activities/BasicActivity.php

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,51 @@ class BasicActivity extends CpeSdk\CpeActivity
5454
public function __construct($client = null, $params, $debug, $cpeLogger)
5555
{
5656
parent::__construct($client, $params, $debug, $cpeLogger);
57-
57+
5858
// S3 utils
5959
$this->s3Utils = new S3Utils($this->cpeLogger);
60+
61+
$this->registerSigtermHandler();
62+
}
63+
64+
private function registerSigtermHandler(): void
65+
{
66+
if (!function_exists('pcntl_async_signals')) {
67+
return;
68+
}
69+
pcntl_async_signals(true);
70+
pcntl_signal(SIGTERM, function () {
71+
if ($this->token) {
72+
$this->activityFail('SIGTERM', 'Worker shutting down (SIGTERM received)');
73+
}
74+
exit(0);
75+
});
76+
}
77+
78+
public function calculateLoops(): int
79+
{
80+
$now = new \DateTime('now', new \DateTimeZone('UTC'));
81+
$next3AM = (clone $now)->modify('tomorrow 03:00');
82+
$diff = $next3AM->getTimestamp() - $now->getTimestamp();
83+
return (int) round($diff / 60);
84+
}
85+
86+
public function writeHeartbeat(): void
87+
{
88+
file_put_contents('/tmp/heartbeat', (string) time());
89+
}
90+
91+
// Override to also refresh the heartbeat file on every SFN heartbeat
92+
public function activityHeartbeat($data = null)
93+
{
94+
$this->writeHeartbeat();
95+
return parent::activityHeartbeat($data);
6096
}
6197

6298
// Perform the activity
6399
public function process($task)
64100
{
101+
$this->writeHeartbeat();
65102
// Use workflowID to generate a unique TMP folder localy.
66103
$this->tmpInputPath = self::TMP_FOLDER
67104
. $this->logKey."/"

src/activities/TranscodeAssetActivity.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ function check_activity_arguments()
354354
$cpeLogger = new SA\CpeSdk\CpeLogger($name, $logPath);
355355
$cpeLogger->logOut("INFO", basename(__FILE__),
356356
"\033[1mStarting activity\033[0m: $name");
357+
file_put_contents('/tmp/heartbeat', (string) time());
357358

358359
// We instanciate the Activity 'ValidateAsset' and give it a name for Snf
359360
$activityPoller = new TranscodeAssetActivity(
@@ -366,5 +367,7 @@ function check_activity_arguments()
366367
$cpeLogger);
367368

368369
// Initiate the polling loop and will call your `process` function upon trigger
369-
// The process will exit after 24 hours (1440 minutes)
370-
$activityPoller->doActivity(1440);
370+
$loops = $activityPoller->calculateLoops();
371+
$cpeLogger->logOut("INFO", basename(__FILE__),
372+
"\033[1mLoops calculated\033[0m: $loops");
373+
$activityPoller->doActivity($loops);

src/activities/ValidateAssetActivity.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ function check_activity_arguments()
268268
$cpeLogger = new SA\CpeSdk\CpeLogger($name, $logPath);
269269
$cpeLogger->logOut("INFO", basename(__FILE__),
270270
"\033[1mStarting activity\033[0m: $name");
271+
file_put_contents('/tmp/heartbeat', (string) time());
271272

272273
// We instanciate the Activity 'ValidateAsset' and give it a name for Snf
273274
$activityPoller = new ValidateAssetActivity(
@@ -280,5 +281,7 @@ function check_activity_arguments()
280281
$cpeLogger);
281282

282283
// Initiate the polling loop and will call your `process` function upon trigger
283-
// The process will exit after 24 hours (1440 minutes)
284-
$activityPoller->doActivity(1440);
284+
$loops = $activityPoller->calculateLoops();
285+
$cpeLogger->logOut("INFO", basename(__FILE__),
286+
"\033[1mLoops calculated\033[0m: $loops");
287+
$activityPoller->doActivity($loops);

src/utils/CommandExecuter.php

Lines changed: 71 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public function execute(
3333
$progressCallbackParams = null,
3434
$showProgress = false,
3535
$callbackTurns = 0,
36-
$logKey = null)
36+
$logKey = null,
37+
$maxSeconds = 3600)
3738
{
3839
if ($logKey)
3940
$this->logKey = $logKey;
@@ -66,81 +67,89 @@ public function execute(
6667
$allOut = "";
6768
$allOutErr = "";
6869

69-
// Check process status at every turn
70-
do {
71-
sleep($sleep);
70+
$startTime = time();
71+
try {
72+
// Check process status at every turn
73+
do {
74+
if ($maxSeconds > 0 && (time() - $startTime) > $maxSeconds) {
75+
proc_terminate($process, 9);
76+
throw new CpeSdk\CpeException(
77+
"Command timed out after {$maxSeconds}s: $cmd",
78+
self::EXEC_FAILED
79+
);
80+
}
81+
sleep($sleep);
82+
83+
// If callback only after N turns
84+
if ( !$callbackTurns || in_array($i, array(0, $callbackTurns)) )
85+
{
86+
if ($showProgress) {
87+
echo ".\n";
88+
}
89+
90+
// Call user provided callback.
91+
// Callback should be an array as per doc here:
92+
// http://www.php.net/manual/en/language.types.callable.php
93+
// Type 3: Object method call
94+
if (isset($progressCallback) && $progressCallback) {
95+
call_user_func($progressCallback, $progressCallbackParams,
96+
$allOut, $allOutErr);
97+
}
98+
99+
$i = 0;
100+
}
72101

73-
// If callback only after N turns
74-
if ( !$callbackTurns || in_array($i, array(0, $callbackTurns)) )
75-
{
102+
// Get latest status
103+
$procStatus = proc_get_status($process);
76104
if ($showProgress) {
77-
echo ".\n";
105+
echo ".";
106+
flush();
78107
}
79108

80-
// Call user provided callback.
81-
// Callback should be an array as per doc here:
82-
// http://www.php.net/manual/en/language.types.callable.php
83-
// Type 3: Object method call
84-
if (isset($progressCallback) && $progressCallback) {
85-
call_user_func($progressCallback, $progressCallbackParams,
86-
$allOut, $allOutErr);
109+
// Read prog output
110+
if (isset($pipes[1]) && $pipes[1]) {
111+
$out = stream_get_contents($pipes[1], -1);
112+
$allOut .= $out;
87113
}
88114

89-
$i = 0;
90-
}
91-
92-
// Get latest status
93-
$procStatus = proc_get_status($process);
94-
if ($showProgress) {
95-
echo ".";
96-
flush();
97-
}
98-
99-
// Read prog output
100-
if (isset($pipes[1]) && $pipes[1]) {
101-
$out = stream_get_contents($pipes[1], -1);
102-
$allOut .= $out;
103-
}
104-
105-
// Read prog errors
106-
if (isset($pipes[2]) && $pipes[2]) {
107-
$outErr = stream_get_contents($pipes[2], -1);
108-
$allOutErr .= $outErr;
109-
}
110-
111-
$i++;
112-
} while ($procStatus['running']);
115+
// Read prog errors
116+
if (isset($pipes[2]) && $pipes[2]) {
117+
$outErr = stream_get_contents($pipes[2], -1);
118+
$allOutErr .= $outErr;
119+
}
113120

114-
if (isset($pipes[1]))
115-
fclose($pipes[1]);
116-
if (isset($pipes[2]))
117-
fclose($pipes[2]);
121+
$i++;
122+
} while ($procStatus['running']);
118123

119-
if ($procStatus['exitcode'] > 0)
120-
{
121-
$this->cpeLogger->logOut("ERROR",
122-
basename(__FILE__),
123-
"Can't execute: $cmd. Exit Code: ".$procStatus['exitcode'],
124-
$this->logKey);
125-
if ($allOut) {
124+
if ($procStatus['exitcode'] > 0)
125+
{
126126
$this->cpeLogger->logOut("ERROR",
127-
basename(__FILE__), "COMMAND STDOUT: ".$allOut,
127+
basename(__FILE__),
128+
"Can't execute: $cmd. Exit Code: ".$procStatus['exitcode'],
128129
$this->logKey);
129-
$allOut = null;
130+
if ($allOut) {
131+
$this->cpeLogger->logOut("ERROR",
132+
basename(__FILE__), "COMMAND STDOUT: ".$allOut,
133+
$this->logKey);
134+
$allOut = null;
135+
}
136+
if ($allOutErr)
137+
$this->cpeLogger->logOut("ERROR",
138+
basename(__FILE__), "COMMAND STDERR: ".$allOutErr,
139+
$this->logKey);
130140
}
131-
if ($allOutErr)
132-
$this->cpeLogger->logOut("ERROR",
133-
basename(__FILE__), "COMMAND STDERR: ".$allOutErr,
134-
$this->logKey);
135-
}
136141

137-
if ($showProgress) {
138-
echo "\n";
142+
if ($showProgress) {
143+
echo "\n";
144+
}
145+
} finally {
146+
if (isset($pipes[1]) && is_resource($pipes[1]))
147+
fclose($pipes[1]);
148+
if (isset($pipes[2]) && is_resource($pipes[2]))
149+
fclose($pipes[2]);
150+
proc_close($process);
139151
}
140152

141-
// Process is over
142-
proc_close($process);
143-
144153
return array('out' => $allOut, 'outErr' => $allOutErr);
145154
}
146155
}

0 commit comments

Comments
 (0)