Skip to content

Commit 0fdb44d

Browse files
committed
Merge pull request #38 from adamwight/STOMP
STOMP backend
2 parents e20314a + 6a81158 commit 0fdb44d

5 files changed

Lines changed: 260 additions & 6 deletions

File tree

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ PHP-Queue hopes to serve as an abstract layer between your application code and
3939

4040
You can also include our core library files into your application and do some powerful heavy lifting.
4141

42-
We also added a bunch of convenience Backend libraries such as Memcache, Redis, MongoDB, CSV to let you tap on ancillary data sources outside of your queue Job.
42+
Several backend drivers are bundled:
43+
* Memcache
44+
* Redis
45+
* MongoDB
46+
* CSV
47+
These can be used as the primary job queue server, or for abstract FIFO or key-value data access.
4348

4449
---
4550
## Installation ##

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
"clio/clio": "Support for daemonizing PHP CLI runner",
4141
"iron-io/iron_mq": "For IronMQ backend support",
4242
"microsoft/windowsazure": "For Windows Azure Service Bus backend support",
43-
"Respect/Rest": "For a REST server to post job data"
43+
"Respect/Rest": "For a REST server to post job data",
44+
"fusesource/stomp-php": "For the STOMP backend"
4445
},
4546
"autoload": {
4647
"psr-0": {"PHPQueue": "src/"}

src/PHPQueue/Backend/Stomp.php

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
<?php
2+
namespace PHPQueue\Backend;
3+
4+
use FuseSource\Stomp\Stomp as FuseStomp;
5+
6+
use PHPQueue\Exception\BackendException;
7+
use PHPQueue\Exception\JobNotFoundException;
8+
use PHPQueue\Interfaces\FifoQueueStore;
9+
use PHPQueue\Interfaces\KeyValueStore;
10+
11+
/**
12+
* Wrap a STOMP queue
13+
*
14+
* This has been tested against the ActiveMQ and ApolloMQ implementations.
15+
*
16+
* @author awight@wikimedia.org
17+
*/
18+
class Stomp
19+
extends Base
20+
implements FifoQueueStore, KeyValueStore
21+
{
22+
public $queue_name;
23+
public $uri;
24+
public $connection;
25+
public $merge_headers;
26+
public $read_timeout;
27+
public $ack = true;
28+
29+
public function __construct($options=array())
30+
{
31+
parent::__construct();
32+
if (!empty($options['queue'])) {
33+
$this->queue_name = $options['queue'];
34+
}
35+
if (!empty($options['uri'])) {
36+
$this->uri = $options['uri'];
37+
}
38+
if (isset($options['merge_headers'])) {
39+
$this->merge_headers = (bool)$options['merge_headers'];
40+
}
41+
if (isset($options['read_timeout'])) {
42+
$this->read_timeout = (int)$options['read_timeout'];
43+
}
44+
if (isset($options['ack'])) {
45+
$this->ack = (bool)$options['ack'];
46+
}
47+
}
48+
49+
public function __destruct()
50+
{
51+
if ( $this->connection ) {
52+
$this->connection->disconnect();
53+
$this->connection = null;
54+
}
55+
}
56+
57+
public function connect()
58+
{
59+
$this->connection = new FuseStomp($this->uri);
60+
$this->connection->connect();
61+
}
62+
63+
/**
64+
* @param array $data
65+
* @param array $properties Optional headers.
66+
*
67+
* @throws \PHPQueue\Exception\BackendException
68+
*/
69+
public function push($data=array(), $properties=array())
70+
{
71+
$this->beforeAdd();
72+
73+
$body = json_encode($data);
74+
$result = $this->getConnection()->send($this->queue_name, $body, $properties);
75+
if (!$result) {
76+
throw new BackendException("Couldn't send a message!");
77+
}
78+
}
79+
80+
/**
81+
* @return array|null
82+
* @throws \PHPQueue\Exception\BackendException
83+
*/
84+
public function pop()
85+
{
86+
return $this->readFrame();
87+
}
88+
89+
public function set($key, $data=array(), $properties=array())
90+
{
91+
$properties['correlation-id'] = $key;
92+
$this->push($data, $properties);
93+
}
94+
95+
/**
96+
* @return array|null
97+
*/
98+
public function get($key)
99+
{
100+
$properties = array(
101+
'ack' => 'client',
102+
'selector' => "JMSCorrelationID='{$key}' OR JMSMessageID='{$key}'",
103+
);
104+
return $this->readFrame($properties);
105+
}
106+
107+
/**
108+
* @param array|null $properties Optional selectors.
109+
*/
110+
protected function readFrame($properties = null)
111+
{
112+
$this->beforeGet();
113+
if ($properties === null) {
114+
$properties = array('ack' => 'client');
115+
}
116+
$this->getConnection()->subscribe($this->queue_name, $properties);
117+
if ($this->read_timeout) {
118+
$this->getConnection()->setReadTimeout($this->read_timeout);
119+
}
120+
$response = $this->getConnection()->readFrame();
121+
$this->afterGet();
122+
123+
if ($response && $this->ack) {
124+
$this->getConnection()->ack($response);
125+
}
126+
127+
$this->getConnection()->unsubscribe($this->queue_name);
128+
129+
if (!$response) {
130+
return null;
131+
}
132+
133+
$message = json_decode($response->body, true);
134+
if ($this->merge_headers) {
135+
$message = array_merge($response->headers, $message);
136+
}
137+
return $message;
138+
}
139+
140+
public function clear($key=null)
141+
{
142+
throw new Exception('Not implemented.');
143+
}
144+
}

test/PHPQueue/Backend/PDOTest.php

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,8 @@ public function testPop()
103103
$this->assertEquals($data, $this->object->pop());
104104
}
105105

106-
/**
107-
* @expectedException \PHPQueue\Exception\JobNotFoundException
108-
*/
109106
public function testPopEmpty()
110107
{
111-
$this->object->pop();
108+
$this->assertNull( $this->object->pop() );
112109
}
113110
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
<?php
2+
namespace PHPQueue\Backend;
3+
4+
use PHPUnit_Framework_TestCase;
5+
use PHPQueue\Exception\JobNotFoundException;
6+
7+
class StompTest extends PHPUnit_Framework_TestCase
8+
{
9+
protected $object;
10+
protected $unique;
11+
protected $unclean;
12+
13+
public function setUp()
14+
{
15+
parent::setUp();
16+
if (!class_exists('\FuseSource\Stomp\Stomp')) {
17+
$this->markTestSkipped('STOMP library not installed');
18+
} else {
19+
$options = array(
20+
'uri' => 'tcp://127.0.0.1:61613',
21+
'queue' => 'test_queue',
22+
'read_timeout' => 1,
23+
);
24+
$this->object = new Stomp($options);
25+
}
26+
27+
$this->unique = mt_rand();
28+
}
29+
30+
public function tearDown()
31+
{
32+
if ($this->unclean) {
33+
// Gross. Clear the queue.
34+
try {
35+
while ($result = $this->object->pop()) {
36+
// pass
37+
}
38+
} catch (JobNotFoundException $ex) {
39+
// pass
40+
}
41+
}
42+
43+
parent::tearDown();
44+
}
45+
46+
/**
47+
* @medium
48+
*/
49+
public function testPushPop()
50+
{
51+
$data = array('unique' => $this->unique);
52+
$this->unclean = true;
53+
$this->object->push($data);
54+
55+
$this->assertEquals($data, $this->object->pop());
56+
$this->unclean = false;
57+
}
58+
59+
/**
60+
* @medium
61+
*/
62+
public function testSetGet()
63+
{
64+
$data = array('unique' => $this->unique);
65+
$this->unclean = true;
66+
$result = $this->object->set($this->unique, $data);
67+
68+
$result = $this->object->get($this->unique);
69+
$this->assertEquals($data, $result);
70+
$this->unclean = false;
71+
}
72+
73+
/**
74+
* @medium
75+
*/
76+
public function testPopEmpty()
77+
{
78+
$this->assertNull($this->object->pop());
79+
}
80+
81+
/**
82+
* @medium
83+
*/
84+
public function testGetNonexistent()
85+
{
86+
$this->assertNull($this->object->get(mt_rand()));
87+
}
88+
89+
/**
90+
* @medium
91+
*/
92+
public function testMergeHeaders()
93+
{
94+
$data = array('unique' => $this->unique);
95+
$this->unclean = true;
96+
$this->object->push($data, array('fooHeader' => 5));
97+
98+
$this->object->merge_headers = true;
99+
$result = $this->object->pop();
100+
101+
$this->assertTrue(array_key_exists('fooHeader', $result));
102+
$this->assertEquals($result['fooHeader'], 5);
103+
$this->assertTrue(array_key_exists('unique', $result));
104+
$this->assertEquals($result['unique'], $this->unique);
105+
$this->unclean = false;
106+
}
107+
}

0 commit comments

Comments
 (0)