-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathJobManager.php
More file actions
273 lines (248 loc) · 9.65 KB
/
JobManager.php
File metadata and controls
273 lines (248 loc) · 9.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
<?php
namespace Flowpack\JobQueue\Common\Job;
/*
* This file is part of the Flowpack.JobQueue.Common package.
*
* (c) Contributors to the package
*
* This package is Open Source Software. For the full copyright and license
* information, please view the LICENSE file which was distributed with this
* source code.
*/
use Flowpack\JobQueue\Common\InterruptException;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Cache\Frontend\VariableFrontend;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Core\Booting\Scripts;
use Neos\Flow\Property\PropertyMapper;
use Flowpack\JobQueue\Common\Exception as JobQueueException;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueManager;
/**
 * Job manager
 *
 * Central service for submitting jobs to a queue and for reserving, executing
 * and finishing (or releasing/aborting) queued messages. Every step of a
 * message's life cycle is announced through one of the signals declared at the
 * end of this class.
 *
 * @Flow\Scope("singleton")
 */
class JobManager
{
    /**
     * Number of releases granted to a failing message when the queue settings
     * contain no "maximumNumberOfReleases" entry
     *
     * @var int
     */
    public const DEFAULT_MAXIMUM_NUMBER_RELEASES = 3;

    /**
     * Used to look up queues and their settings by name
     *
     * @Flow\Inject
     * @var QueueManager
     */
    protected $queueManager;

    /**
     * @Flow\Inject
     * @var PropertyMapper
     */
    protected $propertyMapper;

    /**
     * Holds a reserved message while it is executed in a sub process ("executeIsolated")
     *
     * @Flow\Inject
     * @var VariableFrontend
     */
    protected $messageCache;

    /**
     * Settings of this package
     *
     * @Flow\InjectConfiguration
     * @var array
     */
    protected $settings;

    /**
     * Settings of the Neos.Flow package, handed to Scripts::executeCommand() for isolated execution
     *
     * @Flow\InjectConfiguration(package="Neos.Flow")
     * @var array
     */
    protected $flowSettings;

    /**
     * Put a job in the queue
     *
     * The job is serialized with serialize() and the resulting string is submitted
     * as the message payload; afterwards the "messageSubmitted" signal is emitted.
     *
     * @param string $queueName
     * @param JobInterface $job The job to submit to the queue
     * @param array $options Simple key/value array with options that will be passed to the queue for this job (optional)
     * @return void
     * @api
     */
    public function queue(string $queueName, JobInterface $job, array $options = []): void
    {
        $queue = $this->queueManager->getQueue($queueName);

        $payload = serialize($job);
        $messageId = $queue->submit($payload, $options);

        $this->emitMessageSubmitted($queue, $messageId, $payload, $options);
    }

    /**
     * Wait for a job in the given queue and execute it
     * A worker using this method should catch exceptions
     *
     * On success the message is finished and returned. If the execution throws an
     * exception the message is released back to the queue as long as its number of
     * releases is below the configured maximum, otherwise it is aborted; in both
     * cases a JobQueueException wrapping the original exception is thrown.
     *
     * @param string $queueName
     * @param int|null $timeout Passed on unchanged to the queue's waitAndReserve()
     * @return Message|null The message that was processed or NULL if no job was executed and a timeout occurred
     * @throws \Exception
     * @api
     */
    public function waitAndExecute(string $queueName, $timeout = null): ?Message
    {
        $queue = $this->queueManager->getQueue($queueName);
        $message = $queue->waitAndReserve($timeout);
        if ($message === null) {
            // timeout
            $this->emitMessageTimeout($queue);
            return null;
        }
        $this->emitMessageReserved($queue, $message);

        $queueSettings = $this->queueManager->getQueueSettings($queueName);
        // Stays NULL unless the message was put into the cache for isolated execution
        $messageCacheIdentifier = null;
        try {
            if (($queueSettings['executeIsolated'] ?? false) === true) {
                // Hand the message over to the sub process through the cache; the command only receives its identifier
                $messageCacheIdentifier = sha1(serialize($message));
                $this->messageCache->set($messageCacheIdentifier, $message);
                Scripts::executeCommand(
                    'flowpack.jobqueue.common:job:execute',
                    $this->flowSettings,
                    false,
                    ['queue' => $queue->getName(), 'messageCacheIdentifier' => $messageCacheIdentifier]
                );
            } else {
                $this->executeJobForMessage($queue, $message);
            }
        } catch (\Exception $exception) {
            // NOTE(review): only \Exception is caught here; an \Error raised during execution
            // bypasses the release/abort handling below - confirm this is intended
            $maximumNumberOfReleases = (int)($queueSettings['maximumNumberOfReleases'] ?? self::DEFAULT_MAXIMUM_NUMBER_RELEASES);

            if ($message->getNumberOfReleases() < $maximumNumberOfReleases) {
                $releaseOptions = $queueSettings['releaseOptions'] ?? [];
                $queue->release($message->getIdentifier(), $releaseOptions);
                $this->emitMessageReleased($queue, $message, $releaseOptions, $exception);
                throw new JobQueueException(
                    sprintf(
                        'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE',
                        $message->getIdentifier(),
                        $queue->getName(),
                        $message->getNumberOfReleases() + 1,
                        $maximumNumberOfReleases + 1
                    ),
                    1334056583,
                    $exception
                );
            }

            $queue->abort($message->getIdentifier());
            $this->emitMessageFailed($queue, $message, $exception);
            throw new JobQueueException(
                sprintf(
                    'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING',
                    $message->getIdentifier(),
                    $queue->getName(),
                    $message->getNumberOfReleases() + 1,
                    $maximumNumberOfReleases + 1
                ),
                1334056584,
                $exception
            );
        } finally {
            // Runs on success and on failure so that the cached message never outlives the execution attempt
            if ($messageCacheIdentifier !== null) {
                $this->messageCache->remove($messageCacheIdentifier);
            }
        }

        $queue->finish($message->getIdentifier());
        $this->emitMessageFinished($queue, $message);

        return $message;
    }

    /**
     * Unserialize the job contained in the given message and execute it
     *
     * @param QueueInterface $queue
     * @param Message $message
     * @return void
     * @throws \RuntimeException if the payload does not unserialize to a JobInterface instance
     * @throws JobQueueException if the job's execute() method returns a falsy value
     * @internal This method has to be public so that it can be run from the command handler (when "executeIsolated" is set). It is not meant to be called from "user land"
     */
    public function executeJobForMessage(QueueInterface $queue, Message $message): void
    {
        // TODO stabilize unserialize() call (maybe using PHPs unserialize_callback_func directive)
        // NOTE(review): unserialize() instantiates whatever classes the payload names, so the
        // content of the queue backend has to be trusted
        $job = unserialize($message->getPayload());
        if (!$job instanceof JobInterface) {
            throw new \RuntimeException(sprintf('The message "%s" in queue "%s" could not be unserialized to a class implementing JobInterface', $message->getIdentifier(), $queue->getName()), 1465901245);
        }

        $jobExecutionSuccess = $job->execute($queue, $message);
        if (!$jobExecutionSuccess) {
            throw new JobQueueException(sprintf('execute() for job "%s" did not return TRUE', $job->getLabel()), 1468927872);
        }
    }

    /**
     * Return the unserialized payloads of up to $limit messages of the given queue
     *
     * The messages are obtained through the queue's peek() method. The unserialized
     * values are returned as they are, i.e. they are not checked against JobInterface
     * here (a payload that fails to unserialize yields FALSE in the result).
     *
     * @param string $queueName
     * @param int $limit
     * @return JobInterface[]
     * @api
     */
    public function peek(string $queueName, int $limit = 1): array
    {
        $queue = $this->queueManager->getQueue($queueName);
        $messages = $queue->peek($limit);

        return array_map(static function (Message $message) {
            return unserialize($message->getPayload());
        }, $messages);
    }

    /**
     * This method is here to be called by queues if they reached local polling timeouts, if this applies
     *
     * Dispatches pending process signals if the pcntl extension is available and
     * does nothing otherwise.
     *
     * @throws InterruptException
     */
    public function interruptMe(): void
    {
        if (function_exists('pcntl_signal_dispatch')) {
            pcntl_signal_dispatch();
        }
    }

    /**
     * Signal that is triggered when a message has been submitted to a queue
     *
     * @param QueueInterface $queue The queue a message was submitted to
     * @param string $messageId The unique id of the message that was submitted (determined by the queue implementation)
     * @param mixed $payload The serialized job that has been added to a queue
     * @param array $options Optional array of options passed to JobManager::queue()
     * @return void
     * @Flow\Signal
     * @api
     */
    protected function emitMessageSubmitted(QueueInterface $queue, $messageId, $payload, array $options = []): void
    {
    }

    /**
     * Signal that is triggered when a message could not be reserved (probably due to a timeout)
     *
     * @param QueueInterface $queue The queue that returned with a timeout
     * @return void
     * @Flow\Signal
     * @api
     */
    protected function emitMessageTimeout(QueueInterface $queue): void
    {
    }

    /**
     * Signal that is triggered when a message was reserved
     *
     * @param QueueInterface $queue The queue the reserved message belongs to
     * @param Message $message The message that was reserved
     * @return void
     * @Flow\Signal
     * @api
     */
    protected function emitMessageReserved(QueueInterface $queue, Message $message): void
    {
    }

    /**
     * Signal that is triggered when a message has been processed successfully
     *
     * @param QueueInterface $queue The queue the finished message belongs to
     * @param Message $message The message that was finished successfully
     * @return void
     * @Flow\Signal
     * @api
     */
    protected function emitMessageFinished(QueueInterface $queue, Message $message): void
    {
    }

    /**
     * Signal that is triggered when a message has been re-released to the queue
     *
     * @param QueueInterface $queue The queue the released message belongs to
     * @param Message $message The message that was released to the queue again
     * @param array $releaseOptions The options that were passed to the release call
     * @param \Exception|null $jobExecutionException The exception (if any) thrown by the job execution
     * @return void
     * @Flow\Signal
     * @api
     */
    protected function emitMessageReleased(QueueInterface $queue, Message $message, array $releaseOptions, ?\Exception $jobExecutionException = null): void
    {
    }

    /**
     * Signal that is triggered when processing of a message failed
     *
     * @param QueueInterface $queue The queue the failed message belongs to
     * @param Message $message The message that could not be executed successfully
     * @param \Exception|null $jobExecutionException The exception (if any) thrown by the job execution
     * @return void
     * @Flow\Signal
     * @api
     */
    protected function emitMessageFailed(QueueInterface $queue, Message $message, ?\Exception $jobExecutionException = null): void
    {
    }
}