-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathRunCommand.php
More file actions
167 lines (147 loc) · 5.28 KB
/
Copy pathRunCommand.php
File metadata and controls
167 lines (147 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
<?php
namespace Dtc\QueueBundle\Command;
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
use Dtc\QueueBundle\Run\Loop;
use Dtc\QueueBundle\Util\Util;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\Container;
/**
 * Console command "dtc:queue:run".
 *
 * Drives the injected run Loop: either runs one specific job (when --id is
 * given) or hands control to the loop together with the optional worker name,
 * method name, max-count, duration and nano-sleep settings.
 *
 * Collaborators are supplied through setters (setRunLoop(), setLogger(),
 * setContainer()) rather than the constructor.
 */
class RunCommand extends Command
{
    /** @var Loop */
    private $runLoop;

    /** @var LoggerInterface|null Logger injected through setLogger(), if any */
    private $logger;

    /** @var Container|null Only needed to resolve the --logger option */
    private $container;

    /**
     * Declares the command name, description, arguments and options.
     */
    protected function configure(): void
    {
        $options = [
            new InputArgument('worker-name', InputArgument::OPTIONAL, 'Name of worker', null),
            new InputArgument('method', InputArgument::OPTIONAL, 'DI method of worker', null),
            new InputOption(
                'id',
                'i',
                InputOption::VALUE_REQUIRED,
                'Id of Job to run',
                null
            ),
            new InputOption(
                'max-count',
                'm',
                InputOption::VALUE_REQUIRED,
                'Maximum number of jobs to work on before exiting',
                null
            ),
            new InputOption(
                'duration',
                'd',
                InputOption::VALUE_REQUIRED,
                'Duration to run for in seconds',
                null
            ),
            new InputOption(
                'timeout',
                't',
                InputOption::VALUE_REQUIRED,
                'Process timeout in seconds (hard exit of process regardless)',
                3600
            ),
            new InputOption(
                'nano-sleep',
                's',
                InputOption::VALUE_REQUIRED,
                'If using duration, this is the time to sleep when there\'s no jobs in nanoseconds',
                500000000
            ),
            new InputOption(
                'disable-gc',
                null,
                InputOption::VALUE_NONE,
                'Disable garbage collection'
            ),
            new InputOption(
                'logger',
                'l',
                InputOption::VALUE_REQUIRED,
                'Log using the logger service specified. Otherwise if not used will output to console. Logger service must be public, otherwise inject one by overriding the definition for this RunCommand service and calling the setLogger() method instead of using this option.'
            ),
        ];

        $this
            ->setName('dtc:queue:run')
            ->setDefinition($options)
            ->setDescription('Start up a job in queue');
    }

    /**
     * Injects the loop that actually runs the jobs.
     *
     * @param Loop $runLoop
     */
    public function setRunLoop($runLoop)
    {
        $this->runLoop = $runLoop;
    }

    /**
     * Injects a logger that is handed to the run loop when the command executes.
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Injects the service container used to look up the service named by --logger.
     *
     * @param Container $container
     */
    public function setContainer($container)
    {
        $this->container = $container;
    }

    /**
     * Reads the arguments/options, configures the run loop and starts it.
     *
     * @return int Command::SUCCESS once the single job or the loop has finished
     *
     * @throws ClassNotSubclassException when --logger names a service that is not a PSR-3 logger
     * @throws \LogicException           when --logger is used but no container was injected
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $start = microtime(true);
        // @TODO: move this to dependency injection.
        $this->runLoop->setOutput($output);

        $workerName = $input->getArgument('worker-name');
        $methodName = $input->getArgument('method');
        $maxCount = $input->getOption('max-count');
        $duration = $input->getOption('duration');
        $processTimeout = $input->getOption('timeout');
        $nanoSleep = $input->getOption('nano-sleep');
        // Previously hard-coded to null, which made the --logger option a no-op.
        $loggerService = $input->getOption('logger');
        // getOption() takes only the option name; a VALUE_NONE option is false when absent.
        $disableGc = $input->getOption('disable-gc');

        $this->setGc($disableGc);

        // A logger injected via setLogger() is applied first so that an explicit
        // --logger option, resolved below, takes precedence over it.
        if (null !== $this->logger) {
            $this->runLoop->setLogger($this->logger);
        }
        $this->setLoggerService($this->runLoop, $loggerService);

        // NOTE(review): the third argument is presumably a maximum bit width — confirm against Util::validateIntNull().
        $maxCount = Util::validateIntNull('max_count', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nano_sleep', $nanoSleep, 63);
        $processTimeout = Util::validateIntNull('timeout', $processTimeout, 32);
        $this->runLoop->checkMaxCountDuration($maxCount, $duration, $processTimeout);

        set_time_limit($processTimeout); // Set timeout on the process

        $jobId = $input->getOption('id');
        if ($jobId) {
            $this->runLoop->runJobById($start, $jobId); // Run a single job

            return self::SUCCESS;
        }

        $this->runLoop->runLoop($start, $workerName, $methodName, $maxCount, $duration, $nanoSleep);

        return self::SUCCESS;
    }

    /**
     * Switches PHP's cycle garbage collector to the requested state, touching
     * it only when the current state differs.
     *
     * @param bool $disableGc true to turn garbage collection off, false to turn it on
     */
    protected function setGc($disableGc)
    {
        if ($disableGc) {
            if (gc_enabled()) {
                gc_disable();
            }

            return;
        }

        if (!gc_enabled()) {
            gc_enable();
        }
    }

    /**
     * Resolves the named logger service from the container and hands it to the loop.
     *
     * Does nothing when no service name is given.
     *
     * @param Loop        $loop          loop that receives the logger
     * @param string|null $loggerService id of the logger service to fetch
     *
     * @throws ClassNotSubclassException when the service is not a LoggerInterface
     * @throws \LogicException           when no container has been injected
     */
    protected function setLoggerService(Loop $loop, $loggerService)
    {
        if (!$loggerService) {
            return;
        }

        // Fail with an actionable message instead of a fatal "member function on null".
        if (null === $this->container) {
            throw new \LogicException('A container must be injected with setContainer() before the --logger option can be used');
        }

        $logger = $this->container->get($loggerService);
        if (!$logger instanceof LoggerInterface) {
            throw new ClassNotSubclassException("$loggerService must be instance of Psr\\Log\\LoggerInterface");
        }

        $loop->setLogger($logger);
    }
}