Skip to content

Commit e7d33cb

Browse files
committed
more fair queue fixes
1 parent c8c73e8 commit e7d33cb

2 files changed

Lines changed: 45 additions & 7 deletions

File tree

Core.TaskProcessor.Tests/ProcessorTests.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,14 @@ public async Task ExecuteSchedules()
183183
}
184184

185185
[Fact]
186-
public async Task Cleanup()
186+
public async Task Pushback()
187187
{
188188
await _processor.PushbackAsync();
189+
}
190+
191+
[Fact]
192+
public async Task Cleanup()
193+
{
189194
await _processor.CleanUpAsync();
190195
}
191196

Core.TaskProcessor/TaskProcessor.cs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,16 @@ public async Task<bool> AppendBatchAsync(string queue, string tenant, string bat
240240
foreach (var q in push)
241241
{
242242
tra.SortedSetAddAsync(Prefix("queues"), q.Key, DateTimeOffset.UtcNow.ToUnixTimeSeconds());
243-
tra.ListLeftPushAsync(Prefix($"queue:{q.Key}"), q.Value.ToArray());
243+
244+
if (queue.StartsWith("fair_"))
245+
{
246+
tra.HashIncrementAsync(Prefix($"queue:{q}:fairness"), tenant, q.Value.Count);
247+
tra.ListLeftPushAsync(Prefix($"queue:{q.Key}:{tenant}"), q.Value.ToArray());
248+
}
249+
else
250+
tra.ListLeftPushAsync(Prefix($"queue:{q.Key}"), q.Value.ToArray());
251+
252+
244253
tra.PublishAsync(RedisChannel.Literal(Prefix($"queue:{q.Key}:event")), "fetch");
245254
}
246255
#pragma warning restore CS4014
@@ -529,8 +538,16 @@ await db.ScriptEvaluateAsync($@"
529538
local q = ""{Prefix("queue:")}""..v
530539
local r = redis.call('zrange', q.."":pushback"", 0, t, 'BYSCORE', 'LIMIT', 0, 500);
531540
for j, w in ipairs(r) do
532-
redis.call('lpush', q, w);
533541
redis.call('zadd', ""{Prefix("queues")}"", t, v);
542+
543+
if string.find(q, 'fair_') then
544+
local tenant = redis.call('hget', q..':'..w, 'tenant');
545+
redis.call('hincrby', q..':fairness', tenant);
546+
redis.call('lpush', q..':'..tenant, taskId);
547+
else
548+
redis.call('lpush', q, w);
549+
end
550+
534551
redis.call('zrem', q.."":pushback"", w);
535552
redis.call('lrem', q.."":checkout"", 0, w);
536553
end;
@@ -773,8 +790,16 @@ private async Task Process(TaskContext task)
773790
}
774791
else
775792
{
776-
tra.ListLeftPushAsync(Prefix($"queue:{task.Queue}"), task.TaskId,
777-
flags: CommandFlags.FireAndForget);
793+
if (task.Queue!.StartsWith("fair_"))
794+
{
795+
tra.HashIncrementAsync(Prefix($"queue:{task.Queue}:fairness"), task.Tenant);
796+
tra.ListLeftPushAsync(Prefix($"queue:{task.Queue}:{task.Tenant}"), task.TaskId,
797+
flags: CommandFlags.FireAndForget);
798+
}
799+
else
800+
tra.ListLeftPushAsync(Prefix($"queue:{task.Queue}"), task.TaskId,
801+
flags: CommandFlags.FireAndForget);
802+
778803
tra.SortedSetRemoveAsync(Prefix($"queue:{task.Queue}:pushback"), task.TaskId,
779804
CommandFlags.FireAndForget);
780805
}
@@ -796,7 +821,6 @@ private async Task Process(TaskContext task)
796821
if (!_shutdown.IsCancellationRequested)
797822
await FetchAsync();
798823
}
799-
800824
private async Task CompleteBatchAsync(TaskContext task, IDatabase db)
801825
{
802826
if (string.IsNullOrEmpty(task.BatchId))
@@ -818,8 +842,17 @@ private async Task CompleteBatchAsync(TaskContext task, IDatabase db)
818842
819843
for i, taskId in ipairs(continuations) do
820844
local q = redis.call('hget', '{Prefix("task:")}'..taskId, 'queue');
845+
821846
redis.call('zadd', ""{Prefix("queues")}"", t, q);
822-
redis.call('lpush', '{Prefix("queue:")}'..q, taskId);
847+
848+
if string.find(queue, 'fair_') then
849+
local tenant = redis.call('hget', '{Prefix("task:")}'..taskId, 'tenant');
850+
redis.call('hincrby', '{Prefix("queue:")}'..q..':fairness', tenant);
851+
redis.call('lpush', '{Prefix("queue:")}'..q..':'..tenant, taskId);
852+
else
853+
redis.call('lpush', '{Prefix("queue:")}'..q, taskId);
854+
end
855+
823856
redis.call('publish', '{Prefix("queue:")}'..q..':event', 'fetch');
824857
end;
825858

0 commit comments

Comments
 (0)