66
77use PHPQueue \Exception \BackendException ;
88use PHPQueue \Interfaces \AtomicReadBuffer ;
9- use PHPQueue \Interfaces \KeyValueStore ;
109use PHPQueue \Interfaces \FifoQueueStore ;
1110use PHPQueue \Json ;
1211
1312/**
14- * Wraps several styles of redis use:
15- * - If constructed with a "order_key" option, the data will be accessible
16- * as a key-value store, and will also provide pop and push using
17- * $data[$order_key] as the FIFO ordering. If the ordering value is a
18- * timestamp, for example, then the queue will have real-world FIFO
19- * behavior over time, and even if the data comes in out of order, we will
20- * always pop the true oldest record.
21- * If you wish to push to this type of store, you'll also need to provide
22- * the "correlation_key" option so the random-access key can be
23- * extracted from data.
13+ * Wraps redis use:
2414 * - Pushing scalar data will store it as a queue under queue_name.
2515 * - Setting scalar data will store it under the key.
2616 * - If data is an array, setting will store it as a hash, under the key.
27- *
28- * TODO: The different behaviors should be modeled as several backends which
29- * perhaps inherit from an AbstractPredis.
3017 */
3118class Predis
3219 extends Base
3320 implements
3421 AtomicReadBuffer,
35- FifoQueueStore,
36- KeyValueStore
22+ FifoQueueStore
3723{
3824 const TYPE_STRING ='string ' ;
3925 const TYPE_HASH ='hash ' ;
4026 const TYPE_LIST ='list ' ;
4127 const TYPE_SET ='set ' ;
4228 const TYPE_NONE ='none ' ;
4329
44- // Internal sub-key to hold the ordering.
45- const FIFO_INDEX = 'fifo ' ;
46-
4730 public $ servers ;
4831 public $ redis_options = array ();
4932 public $ queue_name ;
5033 public $ expiry ;
51- public $ order_key ;
52- public $ correlation_key ;
5334
5435 public function __construct ($ options =array ())
5536 {
@@ -66,13 +47,6 @@ public function __construct($options=array())
6647 if (!empty ($ options ['expiry ' ])) {
6748 $ this ->expiry = $ options ['expiry ' ];
6849 }
69- if (!empty ($ options ['order_key ' ])) {
70- $ this ->order_key = $ options ['order_key ' ];
71- $ this ->redis_options ['prefix ' ] = $ this ->queue_name . ': ' ;
72- }
73- if (!empty ($ options ['correlation_key ' ])) {
74- $ this ->correlation_key = $ options ['correlation_key ' ];
75- }
7650 }
7751
7852 public function connect ()
@@ -100,60 +74,10 @@ public function push($data)
10074 throw new BackendException ("No queue specified. " );
10175 }
10276 $ encoded_data = json_encode ($ data );
103- if ($ this ->order_key ) {
104- if (!$ this ->correlation_key ) {
105- throw new BackendException ("Cannot push to indexed fifo queue without a correlation key. " );
106- }
107- $ key = $ data [$ this ->correlation_key ];
108- if (!$ key ) {
109- throw new BackendException ("Cannot push to indexed fifo queue without correlation data. " );
110- }
111- $ status = $ this ->addToIndexedFifoQueue ($ key , $ data );
112- if (!self ::boolStatus ($ status )) {
113- throw new BackendException ('Couldn \'t push to indexed fifo queue: ' . $ status ->getMessage ());
114- }
115- } else {
116- // Note that we're ignoring the "new length" return value, cos I don't
117- // see how to make it useful.
118- $ this ->getConnection ()->rpush ($ this ->queue_name , $ encoded_data );
119- }
120- }
12177
122- /**
123- * Remove stale elements at the top of the queue and return the first real entry
124- *
125- * When data expires, it still leaves a queue entry linking to its
126- * correlation ID. Clear any of these stale entries at the head of
127- * the queue.
128- *
129- * Note that we run this from inside a transaction, to make it less
130- * likely that we'll hit a race condition.
131- *
132- * @param MultiExec $tx transaction we're working within.
133- *
134- * @return string|null Top element's key, or null if the queue is empty.
135- */
136- public function peekWithCleanup (MultiExec $ tx )
137- {
138- for (;;) {
139- // Look up the first element in the FIFO ordering.
140- $ values = $ tx ->zrange (Predis::FIFO_INDEX , 0 , 0 );
141- if ($ values ) {
142- // Use that value as a key into the key-value block.
143- $ key = $ values [0 ];
144- $ exists = $ tx ->exists ($ key );
145-
146- if (!$ exists ) {
147- // If the data is missing, then remove from the FIFO index.
148- $ tx ->zrem (Predis::FIFO_INDEX , $ key );
149- } else {
150- return $ key ;
151- }
152- } else {
153- break ;
154- }
155- }
156- return null ;
78+ // Note that we're ignoring the "new length" return value, cos I don't
79+ // see how to make it useful.
80+ $ this ->getConnection ()->rpush ($ this ->queue_name , $ encoded_data );
15781 }
15882
15983 /**
@@ -166,33 +90,7 @@ public function pop()
16690 if (!$ this ->hasQueue ()) {
16791 throw new BackendException ("No queue specified. " );
16892 }
169- if ($ this ->order_key ) {
170- // Pop the first element.
171- // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
172- $ options = array (
173- 'cas ' => true ,
174- 'watch ' => self ::FIFO_INDEX ,
175- 'retry ' => 3 ,
176- );
177- $ self = $ this ;
178- $ this ->getConnection ()->transaction ($ options , function ($ tx ) use (&$ data , &$ self ) {
179- // Begin transaction.
180- $ tx ->multi ();
181-
182- $ key = $ self ->peekWithCleanup ($ tx );
183-
184- if ($ key ) {
185- // Use that value as a key into the key-value block.
186- $ data = $ tx ->get ($ key );
187-
188- // Remove from both indexes.
189- $ tx ->zrem (Predis::FIFO_INDEX , $ key );
190- $ tx ->del ($ key );
191- }
192- });
193- } else {
194- $ data = $ this ->getConnection ()->lpop ($ this ->queue_name );
195- }
93+ $ data = $ this ->getConnection ()->lpop ($ this ->queue_name );
19694 if (!$ data ) {
19795 return null ;
19896 }
@@ -207,9 +105,6 @@ public function popAtomic($callback) {
207105 if (!$ this ->hasQueue ()) {
208106 throw new BackendException ("No queue specified. " );
209107 }
210- if ($ this ->order_key ) {
211- throw new BackendException ("atomicPop not yet supported for zsets " );
212- }
213108
214109 // Pop and process the first element, erring on the side of
215110 // at-least-once processing where the callback might get the same
@@ -246,33 +141,12 @@ public function peek()
246141 if (!$ this ->hasQueue ()) {
247142 throw new BackendException ("No queue specified. " );
248143 }
249- if ($ this ->order_key ) {
250- // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
251- $ options = array (
252- 'cas ' => true ,
253- 'watch ' => self ::FIFO_INDEX ,
254- 'retry ' => 3 ,
255- );
256- $ self = $ this ;
257- $ this ->getConnection ()->transaction ($ options , function ($ tx ) use (&$ data , &$ self ) {
258- // Begin transaction.
259- $ tx ->multi ();
260-
261- $ key = $ self ->peekWithCleanup ($ tx );
262-
263- if ($ key ) {
264- // Use that value as a key into the key-value block.
265- $ data = $ tx ->get ($ key );
266- }
267- });
144+ $ data_range = $ this ->getConnection ()->lrange ($ this ->queue_name , 0 , 0 );
145+ if (!$ data_range ) {
146+ return null ;
268147 } else {
269- $ data_range = $ this ->getConnection ()->lrange ($ this ->queue_name , 0 , 0 );
270- if (!$ data_range ) {
271- return null ;
272- } else {
273- // Unpack list.
274- $ data = $ data_range [0 ];
275- }
148+ // Unpack list.
149+ $ data = $ data_range [0 ];
276150 }
277151 if (!$ data ) {
278152 return null ;
@@ -323,9 +197,7 @@ public function set($key, $data, $properties=array())
323197 $ this ->beforeAdd ();
324198 try {
325199 $ status = false ;
326- if ($ this ->order_key ) {
327- $ status = $ this ->addToIndexedFifoQueue ($ key , $ data );
328- } elseif (is_array ($ data )) {
200+ if (is_array ($ data )) {
329201 // FIXME: Assert
330202 $ status = $ this ->getConnection ()->hmset ($ key , $ data );
331203 } elseif (is_string ($ data ) || is_numeric ($ data )) {
@@ -343,36 +215,6 @@ public function set($key, $data, $properties=array())
343215 }
344216 }
345217
346- /**
347- * Store the data under its order and correlation keys
348- *
349- * @param string $key
350- * @param array $data
351- * @return Predis\Response\ResponseInterface
352- */
353- protected function addToIndexedFifoQueue ($ key , $ data )
354- {
355- $ options = array (
356- 'cas ' => true ,
357- 'watch ' => self ::FIFO_INDEX ,
358- 'retry ' => 3 ,
359- );
360- $ score = $ data [$ this ->order_key ];
361- $ encoded_data = json_encode ($ data );
362- $ status = false ;
363- $ expiry = $ this ->expiry ;
364- $ this ->getConnection ()->transaction ($ options , function ($ tx ) use ($ key , $ score , $ encoded_data , $ expiry , &$ status ) {
365- $ tx ->multi ();
366- $ tx ->zadd (Predis::FIFO_INDEX , $ score , $ key );
367- if ($ expiry ) {
368- $ status = $ tx ->setex ($ key , $ expiry , $ encoded_data );
369- } else {
370- $ status = $ tx ->set ($ key , $ encoded_data );
371- }
372- });
373- return $ status ;
374- }
375-
376218 /** @deprecated */
377219 public function getKey ($ key =null )
378220 {
@@ -394,10 +236,6 @@ public function get($key=null)
394236 return null ;
395237 }
396238 $ this ->beforeGet ($ key );
397- if ($ this ->order_key ) {
398- $ data = $ this ->getConnection ()->get ($ key );
399- return Json::safe_decode ($ data );
400- }
401239 $ type = $ this ->getConnection ()->type ($ key );
402240 switch ($ type ) {
403241 case self ::TYPE_STRING :
@@ -433,16 +271,7 @@ public function clear($key)
433271 {
434272 $ this ->beforeClear ($ key );
435273
436- if ($ this ->order_key ) {
437- $ result = $ this ->getConnection ()->pipeline ()
438- ->zrem (self ::FIFO_INDEX , $ key )
439- ->del ($ key )
440- ->execute ();
441-
442- $ num_removed = $ result [1 ];
443- } else {
444- $ num_removed = $ this ->getConnection ()->del ($ key );
445- }
274+ $ num_removed = $ this ->getConnection ()->del ($ key );
446275
447276 $ this ->afterClearRelease ();
448277
0 commit comments