77use Illuminate \Contracts \Redis \Factory as Redis ;
88use Illuminate \Queue \Jobs \InspectedJob ;
99use Illuminate \Queue \Jobs \RedisJob ;
10+ use Illuminate \Redis \Connections \Connection ;
1011use Illuminate \Redis \Connections \PhpRedisClusterConnection ;
1112use Illuminate \Redis \Connections \PredisClusterConnection ;
1213use Illuminate \Support \Collection ;
@@ -67,6 +68,13 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue
6768 */
6869 protected $ secondaryQueueHadJob = false ;
6970
71+ /**
72+ * Indicates if the connection is a Redis Cluster connection.
73+ *
74+ * @var bool|null
75+ */
76+ protected $ isCluster = null ;
77+
7078 /**
7179 * Create a new Redis queue instance.
7280 *
@@ -104,7 +112,7 @@ public function __construct(
104112 */
105113 public function size ($ queue = null )
106114 {
107- $ queue = $ this ->getQueue ($ queue );
115+ $ queue = $ this ->getQueueRedisKey ($ queue );
108116
109117 return $ this ->getConnection ()->eval (
110118 LuaScripts::size (), 3 , $ queue , $ queue .':delayed ' , $ queue .':reserved '
@@ -119,7 +127,7 @@ public function size($queue = null)
119127 */
120128 public function pendingSize ($ queue = null )
121129 {
122- return $ this ->getConnection ()->llen ($ this ->getQueue ($ queue ));
130+ return $ this ->getConnection ()->llen ($ this ->getQueueRedisKey ($ queue ));
123131 }
124132
125133 /**
@@ -130,7 +138,7 @@ public function pendingSize($queue = null)
130138 */
131139 public function delayedSize ($ queue = null )
132140 {
133- return $ this ->getConnection ()->zcard ($ this ->getQueue ($ queue ).':delayed ' );
141+ return $ this ->getConnection ()->zcard ($ this ->getQueueRedisKey ($ queue ).':delayed ' );
134142 }
135143
136144 /**
@@ -141,7 +149,7 @@ public function delayedSize($queue = null)
141149 */
142150 public function reservedSize ($ queue = null )
143151 {
144- return $ this ->getConnection ()->zcard ($ this ->getQueue ($ queue ).':reserved ' );
152+ return $ this ->getConnection ()->zcard ($ this ->getQueueRedisKey ($ queue ).':reserved ' );
145153 }
146154
147155 /**
@@ -152,7 +160,7 @@ public function reservedSize($queue = null)
152160 */
153161 public function pendingJobs ($ queue = null ): Collection
154162 {
155- $ queue = $ this ->getQueue ($ queue );
163+ $ queue = $ this ->getQueueRedisKey ($ queue );
156164
157165 return (new Collection ($ this ->getConnection ()->lrange ($ queue , 0 , -1 )))
158166 ->map (fn ($ payload ) => InspectedJob::fromPayload ($ payload ));
@@ -166,7 +174,7 @@ public function pendingJobs($queue = null): Collection
166174 */
167175 public function delayedJobs ($ queue = null ): Collection
168176 {
169- $ queue = $ this ->getQueue ($ queue );
177+ $ queue = $ this ->getQueueRedisKey ($ queue );
170178
171179 return (new Collection ($ this ->getConnection ()->zrange ($ queue .':delayed ' , 0 , -1 )))
172180 ->map (fn ($ payload ) => InspectedJob::fromPayload ($ payload ));
@@ -180,7 +188,7 @@ public function delayedJobs($queue = null): Collection
180188 */
181189 public function reservedJobs ($ queue = null ): Collection
182190 {
183- $ queue = $ this ->getQueue ($ queue );
191+ $ queue = $ this ->getQueueRedisKey ($ queue );
184192
185193 return (new Collection ($ this ->getConnection ()->zrange ($ queue .':reserved ' , 0 , -1 )))
186194 ->map (fn ($ payload ) => InspectedJob::fromPayload ($ payload ));
@@ -194,7 +202,7 @@ public function reservedJobs($queue = null): Collection
194202 */
195203 public function creationTimeOfOldestPendingJob ($ queue = null )
196204 {
197- $ payload = $ this ->getConnection ()->lindex ($ this ->getQueue ($ queue ), 0 );
205+ $ payload = $ this ->getConnection ()->lindex ($ this ->getQueueRedisKey ($ queue ), 0 );
198206
199207 if (! $ payload ) {
200208 return null ;
@@ -267,9 +275,11 @@ function ($payload, $queue) {
267275 */
268276 public function pushRaw ($ payload , $ queue = null , array $ options = [])
269277 {
278+ $ queue = $ this ->getQueueRedisKey ($ queue );
279+
270280 $ this ->getConnection ()->eval (
271- LuaScripts::push (), 2 , $ this -> getQueue ( $ queue) ,
272- $ this -> getQueue ( $ queue) .':notify ' , $ payload
281+ LuaScripts::push (), 2 , $ queue ,
282+ $ queue .':notify ' , $ payload
273283 );
274284
275285 return json_decode ($ payload , true )['id ' ] ?? null ;
@@ -308,7 +318,7 @@ function ($payload, $queue, $delay) {
308318 protected function laterRaw ($ delay , $ payload , $ queue = null )
309319 {
310320 $ this ->getConnection ()->eval (
311- LuaScripts::later (), 1 , $ this ->getQueue ($ queue ).':delayed ' ,
321+ LuaScripts::later (), 1 , $ this ->getQueueRedisKey ($ queue ).':delayed ' ,
312322 $ this ->availableAt ($ delay ), $ payload
313323 );
314324
@@ -340,7 +350,7 @@ protected function createPayloadArray($job, $queue, $data = '')
340350 */
341351 public function pop ($ queue = null , $ index = 0 )
342352 {
343- $ this ->migrate ($ prefixed = $ this ->getQueue ($ queue ));
353+ $ this ->migrate ($ prefixed = $ this ->getQueueRedisKey ($ queue ));
344354
345355 $ block = ! $ this ->secondaryQueueHadJob && $ index == 0 ;
346356
@@ -428,7 +438,7 @@ protected function retrieveNextJob($queue, $block = true)
428438 */
429439 public function deleteReserved ($ queue , $ job )
430440 {
431- $ this ->getConnection ()->zrem ($ this ->getQueue ($ queue ).':reserved ' , $ job ->getReservedJob ());
441+ $ this ->getConnection ()->zrem ($ this ->getQueueRedisKey ($ queue ).':reserved ' , $ job ->getReservedJob ());
432442 }
433443
434444 /**
@@ -441,7 +451,7 @@ public function deleteReserved($queue, $job)
441451 */
442452 public function deleteAndRelease ($ queue , $ job , $ delay )
443453 {
444- $ queue = $ this ->getQueue ($ queue );
454+ $ queue = $ this ->getQueueRedisKey ($ queue );
445455
446456 $ this ->getConnection ()->eval (
447457 LuaScripts::release (), 2 , $ queue .':delayed ' , $ queue .':reserved ' ,
@@ -457,7 +467,7 @@ public function deleteAndRelease($queue, $job, $delay)
457467 */
458468 public function clear ($ queue )
459469 {
460- $ queue = $ this ->getQueue ($ queue );
470+ $ queue = $ this ->getQueueRedisKey ($ queue );
461471
462472 return $ this ->getConnection ()->eval (
463473 LuaScripts::clear (), 4 , $ queue , $ queue .':delayed ' ,
@@ -486,6 +496,21 @@ public function getQueue($queue)
486496 return 'queues: ' .($ queue ?: $ this ->default );
487497 }
488498
499+ /**
500+ * Get the cluster-safe Redis key for the given queue.
501+ *
502+ * @param string|null $queue
503+ * @return string
504+ */
505+ protected function getQueueRedisKey ($ queue = null )
506+ {
507+ $ queue = $ queue ?: $ this ->default ;
508+
509+ return $ this ->isClusterConnection () && ! Connection::hasHashTag ($ queue )
510+ ? $ this ->getQueue ('{ ' .$ queue .'} ' )
511+ : $ this ->getQueue ($ queue );
512+ }
513+
489514 /**
490515 * Get the connection for the queue.
491516 *
@@ -496,6 +521,16 @@ public function getConnection()
496521 return $ this ->redis ->connection ($ this ->connection );
497522 }
498523
524+ /**
525+ * Determine if the connection is a Redis Cluster connection.
526+ *
527+ * @return bool
528+ */
529+ protected function isClusterConnection ()
530+ {
531+ return $ this ->isCluster ??= $ this ->getConnection ()->isCluster ();
532+ }
533+
499534 /**
500535 * Get the underlying Redis instance.
501536 *
0 commit comments