66use PHPQueue \Interfaces \FifoQueueStore ;
77
88/**
9- * NOTE: The FIFO index is not usable as a key-value selector in this backend.
9+ * Wraps several styles of redis use:
10+ * - If constructed with a "order_key" option, the data will be accessible
11+ * as a key-value store, and will also provide pop and push using
12+ * $data[$order_key] as the FIFO ordering. If the ordering value is a
13+ * timestamp, for example, then the queue will have real-world FIFO
14+ * behavior over time, and even if the data comes in out of order, we will
15+ * always pop the true oldest record.
16+ * If you wish to push to this type of store, you'll also need to provide
17+ * the "correlation_key" option so the random-access key can be
18+ * extracted from data.
19+ * - Pushing scalar data will store it as a queue under queue_name.
20+ * - Setting scalar data will store it under the key.
21+ * - If data is an array, setting will store it as a hash, under the key.
22+ *
23+ * TODO: The different behaviors should be modeled as several backends which
24+ * perhaps inherit from an AbstractPredis.
1025 */
1126class Predis
1227 extends Base
@@ -18,9 +33,15 @@ class Predis
1833 const TYPE_SET ='set ' ;
1934 const TYPE_NONE ='none ' ;
2035
36+ // Internal sub-key to hold the ordering.
37+ const FIFO_INDEX = 'fifo ' ;
38+
2139 public $ servers ;
2240 public $ redis_options = array ();
2341 public $ queue_name ;
42+ public $ expiry ;
43+ public $ order_key ;
44+ public $ correlation_key ;
2445
2546 public function __construct ($ options =array ())
2647 {
@@ -34,11 +55,21 @@ public function __construct($options=array())
3455 if (!empty ($ options ['queue ' ])) {
3556 $ this ->queue_name = $ options ['queue ' ];
3657 }
58+ if (!empty ($ options ['expiry ' ])) {
59+ $ this ->expiry = $ options ['expiry ' ];
60+ }
61+ if (!empty ($ options ['order_key ' ])) {
62+ $ this ->order_key = $ options ['order_key ' ];
63+ $ this ->redis_options ['prefix ' ] = $ this ->queue_name . ': ' ;
64+ }
65+ if (!empty ($ options ['correlation_key ' ])) {
66+ $ this ->correlation_key = $ options ['correlation_key ' ];
67+ }
3768 }
3869
3970 public function connect ()
4071 {
41- if (empty ( $ this ->servers ) ) {
72+ if (! $ this ->servers ) {
4273 throw new BackendException ("No servers specified " );
4374 }
4475 $ this ->connection = new \Predis \Client ($ this ->servers , $ this ->redis_options );
@@ -47,7 +78,7 @@ public function connect()
4778 /** @deprecated */
4879 public function add ($ data =array ())
4980 {
50- if (empty ( $ data) ) {
81+ if (! $ data ) {
5182 throw new BackendException ("No data. " );
5283 }
5384 $ this ->push ($ data );
@@ -61,21 +92,64 @@ public function push($data)
6192 throw new BackendException ("No queue specified. " );
6293 }
6394 $ encoded_data = json_encode ($ data );
64- // Note that we're ignoring the "new length" return value, cos I don't
65- // see how to make it useful.
66- $ this ->getConnection ()->rpush ($ this ->queue_name , $ encoded_data );
95+ if ($ this ->order_key ) {
96+ if (!$ this ->correlation_key ) {
97+ throw new BackendException ("Cannot push to indexed fifo queue without a correlation key. " );
98+ }
99+ $ key = $ data [$ this ->correlation_key ];
100+ if (!$ key ) {
101+ throw new BackendException ("Cannot push to indexed fifo queue without correlation data. " );
102+ }
103+ $ status = $ this ->addToIndexedFifoQueue ($ key , $ data );
104+ if (!$ status ) {
105+ throw new BackendException ("Couldn't push to indexed fifo queue. " );
106+ }
107+ } else {
108+ // Note that we're ignoring the "new length" return value, cos I don't
109+ // see how to make it useful.
110+ $ this ->getConnection ()->rpush ($ this ->queue_name , $ encoded_data );
111+ }
67112 }
68113
69114 /**
70115 * @return array|null
71116 */
72117 public function pop ()
73118 {
119+ $ data = null ;
74120 $ this ->beforeGet ();
75121 if (!$ this ->hasQueue ()) {
76122 throw new BackendException ("No queue specified. " );
77123 }
78- $ data = $ this ->getConnection ()->lpop ($ this ->queue_name );
124+ if ($ this ->order_key ) {
125+ // Pop the first element.
126+ //
127+ // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
128+ $ options = array (
129+ 'cas ' => true ,
130+ 'watch ' => self ::FIFO_INDEX ,
131+ 'retry ' => 3 ,
132+ );
133+ $ order_key = $ this ->order_key ;
134+ $ this ->getConnection ()->transaction ($ options , function ($ tx ) use ($ order_key , &$ data ) {
135+ // Look up the first element in the FIFO ordering.
136+ $ values = $ tx ->zrange (self ::FIFO_INDEX , 0 , 0 );
137+ if ($ values ) {
138+ // Use that value as a key into the key-value block, to get the data.
139+ $ key = $ values [0 ];
140+ $ data = $ tx ->get ($ key );
141+
142+ // Begin transaction.
143+ $ tx ->multi ();
144+
145+ // Remove from both indexes.
146+ $ tx ->zrem (self ::FIFO_INDEX , $ key );
147+ $ tx ->del ($ key );
148+ }
149+ });
150+ } else {
151+ $ data = $ this ->getConnection ()->lpop ($ this ->queue_name );
152+ }
79153 if (!$ data ) {
80154 return null ;
81155 }
@@ -111,24 +185,30 @@ public function setKey($key=null, $data=null)
111185 /**
112186 * @param string $key
113187 * @param array|string $data
114- * @return boolean
115188 * @throws \PHPQueue\Exception
116189 */
117190 public function set ($ key , $ data )
118191 {
119- if (empty ( $ key) && !is_string ($ key )) {
192+ if (! $ key || !is_string ($ key )) {
120193 throw new BackendException ("Key is invalid. " );
121194 }
122- if (empty ( $ data) ) {
195+ if (! $ data ) {
123196 throw new BackendException ("No data. " );
124197 }
125198 $ this ->beforeAdd ();
126199 try {
127200 $ status = false ;
128- if (is_array ($ data )) {
201+ if ($ this ->order_key ) {
202+ $ status = $ this ->addToIndexedFifoQueue ($ key , $ data );
203+ } elseif (is_array ($ data )) {
204+ // FIXME: Assert
129205 $ status = $ this ->getConnection ()->hmset ($ key , $ data );
130206 } elseif (is_string ($ data ) || is_numeric ($ data )) {
131- $ status = $ this ->getConnection ()->set ($ key , $ data );
207+ if ($ this ->expiry ) {
208+ $ status = $ this ->getConnection ()->setex ($ key , $ this ->expiry , $ data );
209+ } else {
210+ $ status = $ this ->getConnection ()->set ($ key , $ data );
211+ }
132212 }
133213 if (!$ status ) {
134214 throw new BackendException ("Unable to save data. " );
@@ -138,6 +218,35 @@ public function set($key, $data)
138218 }
139219 }
140220
221+ /**
222+ * Store the data under its order and correlation keys
223+ *
224+ * @param string $key
225+ * @param array $data
226+ */
227+ protected function addToIndexedFifoQueue ($ key , $ data )
228+ {
229+ $ options = array (
230+ 'cas ' => true ,
231+ 'watch ' => self ::FIFO_INDEX ,
232+ 'retry ' => 3 ,
233+ );
234+ $ score = $ data [$ this ->order_key ];
235+ $ encoded_data = json_encode ($ data );
236+ $ status = false ;
237+ $ expiry = $ this ->expiry ;
238+ $ this ->getConnection ()->transaction ($ options , function ($ tx ) use ($ key , $ score , $ encoded_data , $ expiry , &$ status ) {
239+ $ tx ->multi ();
240+ $ tx ->zadd (self ::FIFO_INDEX , $ score , $ key );
241+ if ($ expiry ) {
242+ $ status = $ tx ->setex ($ key , $ expiry , $ encoded_data );
243+ } else {
244+ $ status = $ tx ->set ($ key , $ encoded_data );
245+ }
246+ });
247+ return $ status ;
248+ }
249+
141250 /** @deprecated */
142251 public function getKey ($ key =null )
143252 {
@@ -159,6 +268,10 @@ public function get($key=null)
159268 return null ;
160269 }
161270 $ this ->beforeGet ($ key );
271+ if ($ this ->order_key ) {
272+ $ data = $ this ->getConnection ()->get ($ key );
273+ return json_decode ($ data , true );
274+ }
162275 $ type = $ this ->getConnection ()->type ($ key );
163276 switch ($ type ) {
164277 case self ::TYPE_STRING :
@@ -193,7 +306,17 @@ public function clearKey($key=null)
193306 public function clear ($ key )
194307 {
195308 $ this ->beforeClear ($ key );
196- $ num_removed = $ this ->getConnection ()->del ($ key );
309+
310+ if ($ this ->order_key ) {
311+ $ result = $ this ->getConnection ()->pipeline ()
312+ ->zrem (self ::FIFO_INDEX , $ key )
313+ ->del ($ key )
314+ ->execute ();
315+
316+ $ num_removed = $ result [1 ];
317+ } else {
318+ $ num_removed = $ this ->getConnection ()->del ($ key );
319+ }
197320
198321 $ this ->afterClearRelease ();
199322
0 commit comments