Skip to content

Commit d244944

Browse files
author
Eugene Leonovich
committed
Refactor DefaultConfigFactory and bump minimum PHP version to 7.1
1 parent 9191c5f commit d244944

33 files changed

Lines changed: 171 additions & 109 deletions

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
}
1212
],
1313
"require": {
14-
"php": "^7.0",
14+
"php": "^7.1",
1515
"amphp/parallel": "^0.1.0@dev",
1616
"monolog/monolog": "^1.22",
1717
"psr/log": "^1.0",

src/Console/Command/Command.php

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class Command extends BaseCommand
1919

2020
private $configFactory;
2121

22-
protected function configure()
22+
protected function configure(): void
2323
{
2424
$this
2525
->addArgument('queue', InputArgument::REQUIRED)
@@ -30,22 +30,20 @@ protected function configure()
3030
;
3131
}
3232

33-
protected function initialize(InputInterface $input, OutputInterface $output)
33+
protected function initialize(InputInterface $input, OutputInterface $output): void
3434
{
3535
$customConfigPath = $input->getOption('config');
3636
if (null !== $customConfigPath && !is_readable($customConfigPath)) {
3737
throw new \RuntimeException("The given configuration file '$customConfigPath' does not exist or it's not readable.");
3838
}
3939

4040
$this->configFactory = $customConfigPath ? include $customConfigPath : new DefaultConfigFactory();
41-
}
42-
43-
protected function createQueue(InputInterface $input, OutputInterface $output)
44-
{
45-
$factory = $this->getConfigFactory();
4641

4742
$uri = sprintf('tcp://%s:%s', $input->getOption('host'), $input->getOption('port'));
48-
$client = $factory->createClient($uri);
43+
$this->configFactory->setConnectionUri($uri);
44+
45+
$queueName = $input->getArgument('queue');
46+
$this->configFactory->setQueueName($queueName);
4947

5048
$user = $input->getOption('user') ?: getenv(self::ENV_USER);
5149

@@ -58,12 +56,8 @@ protected function createQueue(InputInterface $input, OutputInterface $output)
5856
$password = $helper->ask($input, $output, $question);
5957
}
6058

61-
$client->authenticate($user, $password);
59+
$this->configFactory->setCredentials($user, $password);
6260
}
63-
64-
$queueName = $input->getArgument('queue');
65-
66-
return $factory->createQueue($queueName, $client);
6761
}
6862

6963
protected function getConfigFactory(): DefaultConfigFactory

src/Console/Command/KickCommand.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
class KickCommand extends Command
1010
{
11-
protected function configure()
11+
protected function configure(): void
1212
{
1313
parent::configure();
1414

@@ -19,10 +19,10 @@ protected function configure()
1919
;
2020
}
2121

22-
protected function execute(InputInterface $input, OutputInterface $output)
22+
protected function execute(InputInterface $input, OutputInterface $output): void
2323
{
2424
$count = $input->getArgument('count');
25-
$queue = $this->createQueue($input, $output);
25+
$queue = $this->getConfigFactory()->createQueue();
2626

2727
$affected = $queue->kick($count);
2828

src/Console/Command/PutCommand.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
class PutCommand extends Command
1010
{
11-
protected function configure()
11+
protected function configure(): void
1212
{
1313
parent::configure();
1414

@@ -19,16 +19,16 @@ protected function configure()
1919
;
2020
}
2121

22-
protected function execute(InputInterface $input, OutputInterface $output)
22+
protected function execute(InputInterface $input, OutputInterface $output): void
2323
{
2424
$json = $input->getArgument('json-data');
25-
$queue = $this->createQueue($input, $output);
26-
2725
$data = json_decode($json, true);
26+
2827
if (!is_array($data)) {
2928
throw new \InvalidArgumentException('Invalid json data.');
3029
}
3130

31+
$queue = $this->getConfigFactory()->createQueue();
3232
$task = $queue->put($data);
3333

3434
$output->writeln(sprintf(

src/Console/Command/RunCommand.php

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class RunCommand extends Command
1111
{
1212
const DEFAULT_IDLE_TIMEOUT = 1;
1313

14-
protected function configure()
14+
protected function configure(): void
1515
{
1616
parent::configure();
1717

@@ -25,46 +25,26 @@ protected function configure()
2525
;
2626
}
2727

28-
protected function execute(InputInterface $input, OutputInterface $output)
28+
protected function initialize(InputInterface $input, OutputInterface $output): void
2929
{
30-
$queue = $this->createQueue($input, $output);
31-
$logger = $this->createLogger($queue->getName(), $input);
30+
parent::initialize($input, $output);
3231

32+
$configFactory = $this->getConfigFactory();
33+
34+
if ($logFile = $input->getOption('log-file')) {
35+
$configFactory->setLogFile($logFile);
36+
}
37+
if ($logLevel = $input->getOption('log-level')) {
38+
$configFactory->setLogLevel($logLevel);
39+
}
3340
if ($executorsConfigFile = $input->getOption('executors-config')) {
34-
$executorsConfigFile = realpath($executorsConfigFile);
41+
$configFactory->setExecutorsConfigFile(realpath($executorsConfigFile));
3542
}
36-
37-
$runner = $this->getConfigFactory()->createRunner(
38-
$queue,
39-
$logger,
40-
$executorsConfigFile
41-
);
42-
43-
$runner->run($input->getOption('idle-timeout'));
44-
}
45-
46-
private function createLogger(string $queueName, InputInterface $input)
47-
{
48-
$logFile = $input->getOption('log-file');
49-
$logLevel = self::translateLogLevel($input->getOption('log-level'));
50-
51-
return $this->getConfigFactory()->createLogger($queueName, $logFile, $logLevel);
5243
}
5344

54-
private static function translateLogLevel($name)
45+
protected function execute(InputInterface $input, OutputInterface $output): void
5546
{
56-
// level is already translated to logger constant, return as-is
57-
if (is_int($name)) {
58-
return $name;
59-
}
60-
61-
$levels = Logger::getLevels();
62-
$upper = strtoupper($name);
63-
64-
if (!isset($levels[$upper])) {
65-
throw new \InvalidArgumentException("Provided logging level '$name' does not exist. Must be a valid monolog logging level.");
66-
}
67-
68-
return $levels[$upper];
47+
$runner = $this->getConfigFactory()->createRunner();
48+
$runner->run($input->getOption('idle-timeout'));
6949
}
7050
}

src/Console/Command/StatsCommand.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
class StatsCommand extends Command
99
{
10-
protected function configure()
10+
protected function configure(): void
1111
{
1212
parent::configure();
1313

@@ -17,9 +17,9 @@ protected function configure()
1717
;
1818
}
1919

20-
protected function execute(InputInterface $input, OutputInterface $output)
20+
protected function execute(InputInterface $input, OutputInterface $output): void
2121
{
22-
$queue = $this->createQueue($input, $output);
22+
$queue = $this->getConfigFactory()->createQueue();
2323
$stats = $queue->stats();
2424

2525
$output->writeln(sprintf('Queue: <options=bold>%s</>', $queue->getName()));

src/Console/Command/TruncateCommand.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
class TruncateCommand extends Command
99
{
10-
protected function configure()
10+
protected function configure(): void
1111
{
1212
parent::configure();
1313

@@ -17,9 +17,9 @@ protected function configure()
1717
;
1818
}
1919

20-
protected function execute(InputInterface $input, OutputInterface $output)
20+
protected function execute(InputInterface $input, OutputInterface $output): void
2121
{
22-
$queue = $this->createQueue($input, $output);
22+
$queue = $this->getConfigFactory()->createQueue();
2323
$queue->truncate();
2424

2525
$output->writeln(sprintf('<info>%s</info> was successfully truncated.', $queue->getName()));

src/DefaultConfigFactory.php

Lines changed: 101 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,103 @@
2222

2323
class DefaultConfigFactory
2424
{
25-
public function createRunner(Queue $queue, Logger $logger, string $executorsConfigFile = null): Runner
25+
private $queueName;
26+
private $connectionUri;
27+
private $username;
28+
private $password;
29+
private $logFile;
30+
private $logLevel;
31+
private $executorsConfigFile;
32+
33+
public function setQueueName(string $name): self
34+
{
35+
$this->queueName = $name;
36+
37+
return $this;
38+
}
39+
40+
public function setConnectionUri(string $uri): self
41+
{
42+
$this->connectionUri = $uri;
43+
44+
return $this;
45+
}
46+
47+
public function setCredentials(string $username, string $password): self
48+
{
49+
$this->username = $username;
50+
$this->password = $password;
51+
52+
return $this;
53+
}
54+
55+
public function setLogFile(string $logFile): self
56+
{
57+
$this->logFile = $logFile;
58+
59+
return $this;
60+
}
61+
62+
public function setLogLevel($logLevel): self
63+
{
64+
$this->logLevel = self::normalizeLogLevel($logLevel);
65+
66+
return $this;
67+
}
68+
69+
public function setExecutorsConfigFile(string $configFile): self
70+
{
71+
$this->executorsConfigFile = $configFile;
72+
73+
return $this;
74+
}
75+
76+
public function createRunner(string $executorsConfigFile = null): Runner
2677
{
2778
return new ParallelRunner(
28-
$queue,
79+
$this->createQueue(),
2980
$this->createSuccessHandler(),
3081
$this->createFailureHandler(),
31-
$logger,
82+
$this->createLogger(),
3283
$executorsConfigFile
3384
);
3485
}
3586

36-
public function createQueue(string $name, $client): Queue
87+
public function createQueue(): Queue
3788
{
38-
return new Queue($client, $name);
89+
$this->ensureQueueName();
90+
91+
return new Queue($this->createClient(), $this->queueName);
3992
}
4093

41-
public function createClient(string $uri)
94+
public function createClient(): Client
4295
{
43-
$conn = new StreamConnection($uri);
96+
if (!$this->connectionUri) {
97+
throw new \LogicException('Connection URI is not defined.');
98+
}
99+
100+
$conn = new StreamConnection($this->connectionUri);
44101
$conn = new Retryable($conn);
102+
$client = new Client($conn, new PurePacker());
45103

46-
return new Client($conn, new PurePacker());
104+
if ($this->username) {
105+
// TODO make it lazy
106+
$client->authenticate($this->username, $this->password);
107+
}
108+
109+
return $client;
47110
}
48111

49-
public function createLogger(string $queueName, string $logFile = null, int $logLevel = null): Logger
112+
public function createLogger(): Logger
50113
{
51-
if (!$logFile) {
114+
if (!$this->logFile) {
52115
return new NullLogger();
53116
}
54117

55-
return new MonologLogger("$queueName:worker", [new StreamHandler($logFile, $logLevel)]);
118+
$this->ensureQueueName();
119+
$handlers = [new StreamHandler($this->logFile, $this->logLevel)];
120+
121+
return new MonologLogger("$this->queueName:worker", $handlers);
56122
}
57123

58124
public function createSuccessHandler(): Handler
@@ -72,4 +138,28 @@ public function createRetryStrategyFactory(): RetryStrategyFactory
72138
{
73139
return new RetryStrategyFactory();
74140
}
141+
142+
private function ensureQueueName(): void
143+
{
144+
if (!$this->queueName) {
145+
throw new \LogicException('Queue name is not defined.');
146+
}
147+
}
148+
149+
private static function normalizeLogLevel($name): int
150+
{
151+
// level is already translated to logger constant, return as-is
152+
if (is_int($name)) {
153+
return $name;
154+
}
155+
156+
$levels = MonologLogger::getLevels();
157+
$upper = strtoupper($name);
158+
159+
if (!isset($levels[$upper])) {
160+
throw new \InvalidArgumentException("Provided logging level '$name' does not exist. Must be a valid monolog logging level.");
161+
}
162+
163+
return $levels[$upper];
164+
}
75165
}

src/Executor/CallbackExecutor.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public function __construct(CallbackResolver $callbackResolver, array $autowired
1717
$this->autowiredArgs = $autowiredArgs;
1818
}
1919

20-
public function execute($payload, Queue $queue)
20+
public function execute($payload, Queue $queue): void
2121
{
2222
$callback = $this->callbackResolver->resolve($payload);
2323
$args = $payload['args'] ?? [];

src/Executor/Executor.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@
66

77
interface Executor
88
{
9-
public function execute($payload, Queue $queue);
9+
public function execute($payload, Queue $queue): void;
1010
}

0 commit comments

Comments
 (0)