@@ -46,9 +46,12 @@ public abstract class QueueProcessor<T> where T : QueueItem
4646
4747 private readonly QueueConfiguration config ;
4848
49- private readonly Lazy < ConnectionMultiplexer > redis = new Lazy < ConnectionMultiplexer > ( RedisAccess . GetConnection ) ;
49+ private readonly Lazy < IDatabase > redis = new Lazy < IDatabase > ( ( ) => RedisAccess . GetConnection ( ) . GetDatabase ( ) ) ;
5050
51- private IDatabase getRedisDatabase ( ) => redis . Value . GetDatabase ( ) ;
51+ /// <summary>
52+ /// Access redis instance.
53+ /// </summary>
54+ protected IDatabase Redis => redis . Value ;
5255
5356 private long totalProcessed ;
5457
@@ -90,8 +93,6 @@ public void Run(CancellationToken cancellation = default)
9093
9194 using ( var threadPool = new ThreadedTaskScheduler ( Environment . ProcessorCount , "workers" ) )
9295 {
93- IDatabase database = getRedisDatabase ( ) ;
94-
9596 while ( ! cts . Token . IsCancellationRequested )
9697 {
9798 if ( consecutiveErrors > config . ErrorThreshold )
@@ -105,7 +106,7 @@ public void Run(CancellationToken cancellation = default)
105106 continue ;
106107 }
107108
108- var redisItems = database . ListRightPop ( QueueName , config . BatchSize ) ;
109+ var redisItems = Redis . ListRightPop ( QueueName , config . BatchSize ) ;
109110
110111 // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
111112 // queue doesn't exist.
@@ -232,19 +233,19 @@ private void outputStats()
232233 /// </summary>
233234 /// <param name="item"></param>
234235 public void PushToQueue ( T item ) =>
235- getRedisDatabase ( ) . ListLeftPush ( QueueName , JsonConvert . SerializeObject ( item ) ) ;
236+ Redis . ListLeftPush ( QueueName , JsonConvert . SerializeObject ( item ) ) ;
236237
237238 /// <summary>
238239 /// Push multiple items to the queue.
239240 /// </summary>
240241 /// <param name="items"></param>
241242 public void PushToQueue ( IEnumerable < T > items ) =>
242- getRedisDatabase ( ) . ListLeftPush ( QueueName , items . Select ( obj => new RedisValue ( JsonConvert . SerializeObject ( obj ) ) ) . ToArray ( ) ) ;
243+ Redis . ListLeftPush ( QueueName , items . Select ( obj => new RedisValue ( JsonConvert . SerializeObject ( obj ) ) ) . ToArray ( ) ) ;
243244
244245 public long GetQueueSize ( ) =>
245- getRedisDatabase ( ) . ListLength ( QueueName ) ;
246+ Redis . ListLength ( QueueName ) ;
246247
247- public void ClearQueue ( ) => getRedisDatabase ( ) . KeyDelete ( QueueName ) ;
248+ public void ClearQueue ( ) => Redis . KeyDelete ( QueueName ) ;
248249
249250 /// <summary>
250251 /// Publishes a message to a Redis channel with the supplied <paramref name="channelName"/>.
@@ -258,7 +259,7 @@ public long GetQueueSize() =>
258259 /// <typeparam name="TMessage">The type of message to be published.</typeparam>
259260 public void PublishMessage < TMessage > ( string channelName , TMessage message )
260261 {
261- getRedisDatabase ( ) . Publish ( new RedisChannel ( channelName , RedisChannel . PatternMode . Auto ) , JsonConvert . SerializeObject ( message ) ) ;
262+ Redis . Publish ( new RedisChannel ( channelName , RedisChannel . PatternMode . Auto ) , JsonConvert . SerializeObject ( message ) ) ;
262263 DogStatsd . Increment ( "messages_published" , tags : new [ ] { $ "channel:{ channelName } ", $ "type:{ typeof ( TMessage ) . FullName } " } ) ;
263264 }
264265
0 commit comments