Skip to content

Commit 0922bb8

Browse files
authored
Merge pull request #38 from TheDomco/allow-blocking-pop
Add support for blocking pop instead of polling
2 parents 69c30a2 + 1ae0f78 commit 0922bb8

2 files changed

Lines changed: 37 additions & 7 deletions

File tree

osu.Server.QueueProcessor/QueueConfiguration.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,14 @@ public class QueueConfiguration
3434
/// Setting above 1 will allow processing in batches (see <see cref="QueueProcessor{T}.ProcessResults"/>).
3535
/// </summary>
3636
public int BatchSize { get; set; } = 1;
37+
38+
/// <summary>
39+
/// When enabled, uses <c>BRPOP</c> to wait for items instead of polling with <c>RPOP</c>.
40+
/// </summary>
41+
/// <remarks>
42+
/// <see cref="BatchSize"/> is ignored when this is enabled.
43+
/// <see cref="TimeBetweenPolls"/> is still used as a delay for when processor is overloaded.
44+
/// </remarks>
45+
public bool UseBlockingPop { get; set; } = false;
3746
}
3847
}

osu.Server.QueueProcessor/QueueProcessor.cs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ protected QueueProcessor(QueueConfiguration config)
8585
/// <param name="cancellation">An optional cancellation token.</param>
8686
public void Run(CancellationToken cancellation = default)
8787
{
88+
// dedicated redis connection for the BRPOP path, workaround for StackExchange.Redis not offering blocking operations
89+
var blockingRedis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
90+
8891
using (SentrySdk.Init(setupSentry))
8992
using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5)))
9093
using (var cts = new GracefulShutdownSource(cancellation))
@@ -100,20 +103,38 @@ public void Run(CancellationToken cancellation = default)
100103

101104
try
102105
{
103-
if (totalInFlight >= config.MaxInFlightItems || consecutiveErrors > config.ErrorThreshold)
106+
// avoid processing too many items at once
107+
if (totalInFlight >= config.MaxInFlightItems)
104108
{
105109
Thread.Sleep(config.TimeBetweenPolls);
106110
continue;
107111
}
108112

109-
var redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
113+
RedisValue[] redisItems;
110114

111-
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
112-
// queue doesn't exist.
113-
if (redisItems == null)
115+
if (config.UseBlockingPop)
114116
{
115-
Thread.Sleep(config.TimeBetweenPolls);
116-
continue;
117+
// timeout in seconds, can't be higher than StackExchange.Redis timeout (default is 5 seconds)
118+
const string timeout = "1";
119+
120+
RedisResult redisResult = blockingRedis.Value.Execute("BRPOP", QueueName, timeout);
121+
122+
if (redisResult.IsNull)
123+
continue;
124+
125+
redisItems = [(RedisValue)redisResult[1]];
126+
}
127+
else
128+
{
129+
redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
130+
131+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
132+
// queue doesn't exist.
133+
if (redisItems == null)
134+
{
135+
Thread.Sleep(config.TimeBetweenPolls);
136+
continue;
137+
}
117138
}
118139

119140
List<T> items = new List<T>();

0 commit comments

Comments
 (0)