Skip to content

Commit 8493212

Browse files
author
Eugene Leonovich
committed
Add option constants
1 parent 8ab7900 commit 8493212

13 files changed

Lines changed: 57 additions & 25 deletions

File tree

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"rybakit/msgpack": "^0.2.2",
1919
"symfony/console": "^3.2",
2020
"tarantool/client": "^0.3.0",
21-
"tarantool/queue": "^0.4.0"
21+
"tarantool/queue": "^0.5.0"
2222
},
2323
"require-dev": {
2424
"phpunit/phpunit": "^6.0",

examples/client/put_cb.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
<?php
22

3+
use Tarantool\JobQueue\JobOptions;
4+
35
require __DIR__.'/../../vendor/autoload.php';
46

57
/** @var Tarantool\Queue\Queue $queue */
68
$queue = require __DIR__.'/queue.php';
79

810
$task = $queue->put([
9-
'payload' => date('r'),
11+
JobOptions::PAYLOAD => 'Hello world!',
1012
]);
1113

1214
printf("Added #%d: %s\n", $task->getId(), json_encode($task->getData()));

examples/client/put_dic.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
<?php
22

3+
use Tarantool\JobQueue\JobOptions;
4+
35
require __DIR__.'/../../vendor/autoload.php';
46

57
/** @var Tarantool\Queue\Queue $queue */
68
$queue = require __DIR__.'/queue.php';
79

810
$task = $queue->put([
9-
'payload' => [
10-
'service' => 'greet',
11-
'args' => ['World'],
11+
JobOptions::PAYLOAD => [
12+
JobOptions::PAYLOAD_SERVICE => 'greet',
13+
JobOptions::PAYLOAD_ARGS => ['World'],
1214
],
1315
]);
1416

examples/client/put_proc.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
<?php
22

3+
use Tarantool\JobQueue\JobOptions;
4+
35
require __DIR__.'/../../vendor/autoload.php';
46

57
/** @var Tarantool\Queue\Queue $queue */
68
$queue = require __DIR__.'/queue.php';
79

810
$task = $queue->put([
9-
'payload' => 'sleep 5; date >> '.__DIR__.'/jobqueue_process.log',
11+
JobOptions::PAYLOAD => 'sleep 5; date >> '.__DIR__.'/jobqueue_process.log',
1012
]);
1113

1214
printf("Added #%d: %s\n", $task->getId(), json_encode($task->getData()));

examples/executors.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@
33
namespace Tarantool\JobQueue\Executor;
44

55
use Tarantool\JobQueue\Executor\CallbackResolver\ContainerCallbackResolver;
6+
use Tarantool\JobQueue\Executor\CallbackResolver\DirectCallbackResolver;
67

78
$container = require __DIR__.'/container.php';
89

10+
$callback = function ($payload) use ($container) {
11+
$container['logger']->info(strrev($payload));
12+
};
13+
914
return [
15+
new CallbackExecutor(new DirectCallbackResolver($callback)),
1016
new CallbackExecutor(new ContainerCallbackResolver($container, 'job.'), $container['autowiring.job_args']),
1117
new ProcessExecutor(),
1218
];

src/Console/Command/Command.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ protected function configure(): void
2525
->addArgument('queue', InputArgument::REQUIRED)
2626
->addOption('host', 'H', InputOption::VALUE_REQUIRED, '', self::DEFAULT_HOST)
2727
->addOption('port', 'p', InputOption::VALUE_REQUIRED, '', self::DEFAULT_PORT)
28-
->addOption('user', 'u', InputOption::VALUE_REQUIRED, '')
29-
->addOption('config', 'c', InputOption::VALUE_REQUIRED, '')
28+
->addOption('user', 'u', InputOption::VALUE_REQUIRED)
29+
->addOption('config', 'c', InputOption::VALUE_REQUIRED)
3030
;
3131
}
3232

@@ -46,7 +46,6 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
4646
$this->configFactory->setQueueName($queueName);
4747

4848
$user = $input->getOption('user') ?: getenv(self::ENV_USER);
49-
5049
if ($user) {
5150
if (!$password = getenv(self::ENV_PASSWORD)) {
5251
$helper = $this->getHelper('question');

src/Executor/CallbackExecutor.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use ArgumentsResolver\InDepthArgumentsResolver;
66
use Tarantool\JobQueue\Executor\CallbackResolver\CallbackResolver;
7+
use Tarantool\JobQueue\JobOptions;
78
use Tarantool\Queue\Queue;
89

910
class CallbackExecutor implements Executor
@@ -20,7 +21,7 @@ public function __construct(CallbackResolver $callbackResolver, array $autowired
2021
public function execute($payload, Queue $queue): void
2122
{
2223
$callback = $this->callbackResolver->resolve($payload);
23-
$args = $payload['args'] ?? [];
24+
$args = $payload[JobOptions::PAYLOAD_ARGS] ?? [];
2425

2526
$args = array_merge($args, [
2627
$payload,

src/Executor/CallbackResolver/ContainerCallbackResolver.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Psr\Container\ContainerInterface as Container;
66
use Tarantool\JobQueue\Exception\BadPayloadException;
7+
use Tarantool\JobQueue\JobOptions;
78

89
class ContainerCallbackResolver implements CallbackResolver
910
{
@@ -18,10 +19,10 @@ public function __construct(Container $container, string $idPrefix = '')
1819

1920
public function resolve($payload): callable
2021
{
21-
if (!empty($payload['service'])) {
22-
return $this->container->get($this->idPrefix.$payload['service']);
22+
if (!empty($payload[JobOptions::PAYLOAD_SERVICE])) {
23+
return $this->container->get($this->idPrefix.$payload[JobOptions::PAYLOAD_SERVICE]);
2324
}
2425

25-
throw BadPayloadException::missingOrEmptyKeyValue($payload, 'service', 'string', __CLASS__);
26+
throw BadPayloadException::missingOrEmptyKeyValue($payload, JobOptions::PAYLOAD_SERVICE, 'string', __CLASS__);
2627
}
2728
}

src/Handler/RecurrenceHandler.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
namespace Tarantool\JobQueue\Handler;
44

5+
use Tarantool\JobQueue\JobOptions;
56
use Tarantool\Queue\Queue;
67
use Tarantool\Queue\Task;
8+
use Tarantool\Queue\TtlOptions;
79

810
class RecurrenceHandler implements Handler
911
{
@@ -18,12 +20,12 @@ public function handle(Task $task, Queue $queue): void
1820
{
1921
$data = $task->getData();
2022

21-
if (empty($data['recurrence'])) {
23+
if (empty($data[JobOptions::RECURRENCE])) {
2224
$this->handler->handle($task, $queue);
2325

2426
return;
2527
}
2628

27-
$queue->release($task->getId(), ['delay' => $data['recurrence']]);
29+
$queue->release($task->getId(), [TtlOptions::DELAY => $data[JobOptions::RECURRENCE]]);
2830
}
2931
}

src/Handler/RetryHandler.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@
44

55
use Tarantool\JobQueue\Handler\RetryStrategy\LimitedRetryStrategy;
66
use Tarantool\JobQueue\Handler\RetryStrategy\RetryStrategyFactory;
7+
use Tarantool\JobQueue\JobOptions;
78
use Tarantool\Queue\Queue;
89
use Tarantool\Queue\Task;
10+
use Tarantool\Queue\TtlOptions;
911

1012
class RetryHandler implements Handler
1113
{
1214
private $handler;
1315
private $retryStrategyFactory;
1416

1517
private static $defaults = [
16-
'retry_limit' => 2,
17-
'retry_attempt' => 1,
18-
'retry_strategy' => RetryStrategyFactory::LINEAR,
18+
JobOptions::RETRY_LIMIT => 2,
19+
JobOptions::RETRY_ATTEMPT => 1,
20+
JobOptions::RETRY_STRATEGY => RetryStrategyFactory::LINEAR,
1921
];
2022

2123
public function __construct(Handler $handler, RetryStrategyFactory $retryStrategyFactory)
@@ -27,10 +29,10 @@ public function __construct(Handler $handler, RetryStrategyFactory $retryStrateg
2729
public function handle(Task $task, Queue $queue): void
2830
{
2931
$data = $task->getData() + self::$defaults;
30-
$attempt = $data['retry_attempt'];
32+
$attempt = $data[JobOptions::RETRY_ATTEMPT];
3133

32-
$strategy = $this->retryStrategyFactory->create($data['retry_strategy']);
33-
$strategy = new LimitedRetryStrategy($strategy, $data['retry_limit']);
34+
$strategy = $this->retryStrategyFactory->create($data[JobOptions::RETRY_STRATEGY]);
35+
$strategy = new LimitedRetryStrategy($strategy, $data[JobOptions::RETRY_LIMIT]);
3436

3537
if (null === $delay = $strategy->getDelay($attempt)) {
3638
$this->handler->handle($task, $queue);
@@ -39,7 +41,7 @@ public function handle(Task $task, Queue $queue): void
3941
}
4042

4143
// TODO replace these 2 calls with an atomic one
42-
$queue->put(['retry_attempt' => $attempt + 1] + $data, ['delay' => $delay]);
44+
$queue->put([JobOptions::RETRY_ATTEMPT => $attempt + 1] + $data, [TtlOptions::DELAY => $delay]);
4345
$queue->delete($task->getId());
4446
}
4547
}

0 commit comments

Comments
 (0)