Skip to content

Commit a4d4550

Browse files
committed
fix(outbox): don't remove outbox prematurely
Prevent deleting an outbox when there may still be messages to process (batch was full, messages delayed, or redelivery scheduled). Also simplify channel retrieval by removing a redundant CreateUnbounded call.
1 parent 1c6c79b commit a4d4550

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDeliveryService.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
190190
.ConfigureAwait(false);
191191

192192

193+
var outboxHasRemainingMessages = false;
193194
foreach (var message in messages)
194195
{
195196
if (DateTimeOffset.UtcNow > message.DeliveryNotAfter)
@@ -212,6 +213,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
212213
outbox.DeliveryDelayedUntil = message.DeliveryNotBefore;
213214

214215
//Proceed to the next available outbox, since this one is delayed
216+
outboxHasRemainingMessages = true;
215217
break;
216218
}
217219

@@ -270,6 +272,7 @@ await DeliverMessage(message, messageContext, messageHandler, deliveryTimeout.To
270272
outbox.DeliveryDelayedUntil =
271273
DateTimeOffset.UtcNow.Add(redeliverMessageAction.Delay);
272274

275+
outboxHasRemainingMessages = true;
273276
break;
274277
}
275278

@@ -287,11 +290,14 @@ await DeliverMessage(message, messageContext, messageHandler, deliveryTimeout.To
287290
}
288291
}
289292

293+
// If the batch was full, there may be more messages beyond this batch
294+
if (messages.Count >= outboxDeliveryOptions.BatchSize)
295+
outboxHasRemainingMessages = true;
290296

291297
outbox.Lock = null;
292298
dbContext.Update(outbox);
293299

294-
if (messages.Count == 0)
300+
if (!outboxHasRemainingMessages)
295301
{
296302
logger.LogTrace("Removing empty outbox {outboxId}", outbox.Id);
297303
dbContext.Remove(outbox);

src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Internals/OutboxDeliverySignal.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,8 @@ internal static class OutboxDeliverySignal
1010
{
1111
private static readonly ConcurrentDictionary<Type, Channel<string>> Channels = new();
1212

13-
public static Channel<string> GetChannel(Type type)
14-
{
15-
Channel.CreateUnbounded<string>(new UnboundedChannelOptions());
16-
return Channels.GetOrAdd(type, _ => Channel.CreateUnbounded<string>());
17-
}
13+
public static Channel<string> GetChannel(Type type) =>
14+
Channels.GetOrAdd(type, _ => Channel.CreateUnbounded<string>());
1815

1916
public static Channel<string> GetChannel<TType>() => GetChannel(typeof(TType));
2017

0 commit comments

Comments
 (0)