Skip to content

Commit af5e15c

Browse files
committed
composite key (WorkflowInstanceId/ExportTaskId) for export deduplication
Signed-off-by: bluna301 <luna.bryanr@gmail.com>
1 parent 89c9218 commit af5e15c

3 files changed

Lines changed: 69 additions & 8 deletions

File tree

src/InformaticsGateway/Services/Export/ExportServiceBase.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService, IDispos
5353
{
5454
protected static readonly object SyncRoot = new();
5555

56+
protected static string BuildExportKey(string workflowInstanceId, string exportTaskId) => $"{workflowInstanceId}/{exportTaskId}";
57+
5658
internal event EventHandler? ReportActionCompleted;
5759

5860
private readonly CancellationTokenSource _cancellationTokenSource;
@@ -373,7 +375,7 @@ private static void HandleStatus(ExportRequestDataMessage exportRequestData, Exp
373375

374376
private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
375377
{
376-
var exportRequest = ExportRequests[exportRequestData.ExportTaskId];
378+
var exportRequest = ExportRequests[BuildExportKey(exportRequestData.WorkflowInstanceId, exportRequestData.ExportTaskId)];
377379
HandleStatus(exportRequestData, exportRequest);
378380
if (!exportRequest.IsCompleted)
379381
{
@@ -391,7 +393,7 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
391393

392394
lock (SyncRoot)
393395
{
394-
ExportRequests.Remove(exportRequestData.ExportTaskId);
396+
ExportRequests.Remove(BuildExportKey(exportRequestData.WorkflowInstanceId, exportRequestData.ExportTaskId));
395397
}
396398

397399
if (ReportActionCompleted != null)
@@ -588,7 +590,8 @@ protected async Task BaseProcessMessage(MessageReceivedEventArgs eventArgs)
588590
lock (SyncRoot)
589591
{
590592
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestEvent>();
591-
if (ExportRequests.ContainsKey(exportRequest.ExportTaskId))
593+
string exportKey = BuildExportKey(exportRequest.WorkflowInstanceId, exportRequest.ExportTaskId);
594+
if (ExportRequests.ContainsKey(exportKey))
592595
{
593596
_logger.ExportRequestAlreadyQueued(exportRequest.CorrelationId, exportRequest.ExportTaskId);
594597
return;
@@ -599,7 +602,7 @@ protected async Task BaseProcessMessage(MessageReceivedEventArgs eventArgs)
599602

600603
var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest);
601604

602-
ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails);
605+
ExportRequests.Add(exportKey, exportRequestWithDetails);
603606
if (!exportFlow.Post(exportRequestWithDetails))
604607
{
605608
_logger.ErrorPostingExportJobToQueue(exportRequest.CorrelationId, exportRequest.ExportTaskId);

src/InformaticsGateway/Services/Export/ExtAppScuExportService.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
6767
lock (SyncRoot)
6868
{
6969
var externalAppRequest = eventArgs.Message.ConvertTo<ExternalAppRequestEvent>();
70-
if (ExportRequests.ContainsKey(externalAppRequest.ExportTaskId))
70+
string exportKey = BuildExportKey(externalAppRequest.WorkflowInstanceId, externalAppRequest.ExportTaskId);
71+
if (ExportRequests.ContainsKey(exportKey))
7172
{
7273
_logger.ExportRequestAlreadyQueued(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId);
7374
return;
@@ -78,7 +79,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
7879

7980
var exportRequestWithDetails = new ExportRequestEventDetails(externalAppRequest);
8081

81-
ExportRequests.Add(externalAppRequest.ExportTaskId, exportRequestWithDetails);
82+
ExportRequests.Add(exportKey, exportRequestWithDetails);
8283
if (!exportFlow.Post(exportRequestWithDetails))
8384
{
8485
_logger.ErrorPostingExportJobToQueue(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId);

src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
8181
lock (SyncRoot)
8282
{
8383
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestEvent>();
84-
if (ExportRequests.ContainsKey(exportRequest.ExportTaskId))
84+
string exportKey = BuildExportKey(exportRequest.WorkflowInstanceId, exportRequest.ExportTaskId);
85+
if (ExportRequests.ContainsKey(exportKey))
8586
{
8687
return;
8788
}
@@ -91,7 +92,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
9192

9293
var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest);
9394

94-
ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails);
95+
ExportRequests.Add(exportKey, exportRequestWithDetails);
9596
if (!exportFlow.Post(exportRequestWithDetails))
9697
{
9798
MessageSubscriber.Reject(eventArgs.Message);
@@ -342,6 +343,62 @@ public async Task DataflowTest_EndToEnd()
342343
_outputDataPlugInEngine.Verify(p => p.ExecutePlugInsAsync(It.IsAny<ExportRequestDataMessage>()), Times.Exactly(5 * 2));
343344
}
344345

346+
[RetryFact(1, 10, DisplayName = "Data flow test - same ExportTaskId different WorkflowInstanceId are not deduplicated")]
347+
public async Task DataflowTest_SameExportTaskId_DifferentWorkflowInstanceId_BothProcessed()
348+
{
349+
var sharedExportTaskId = Guid.NewGuid().ToString();
350+
var completedCount = 0;
351+
352+
_messagePublisherService.Setup(p => p.Publish(It.IsAny<string>(), It.IsAny<Message>()));
353+
_messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny<MessageBase>()));
354+
_messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny<MessageBase>()));
355+
_messageSubscriberService.Setup(
356+
p => p.SubscribeAsync(It.IsAny<string>(),
357+
It.IsAny<string>(),
358+
It.IsAny<Func<MessageReceivedEventArgs, Task>>(),
359+
It.IsAny<ushort>()))
360+
.Callback<string, string, Func<MessageReceivedEventArgs, Task>, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) =>
361+
{
362+
// Same ExportTaskId, two different WorkflowInstanceIds - must NOT be deduplicated
363+
await messageReceivedCallback(CreateMessageReceivedEventArgs(Guid.NewGuid().ToString(), sharedExportTaskId));
364+
await messageReceivedCallback(CreateMessageReceivedEventArgs(Guid.NewGuid().ToString(), sharedExportTaskId));
365+
});
366+
367+
_storageService.Setup(p => p.GetObjectAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
368+
.ReturnsAsync(new MemoryStream(Encoding.UTF8.GetBytes("test")));
369+
370+
var countdownEvent = new CountdownEvent(2);
371+
var service = new TestExportService(_logger.Object, _configuration, _serviceScopeFactory.Object, _dicomToolkit.Object);
372+
service.ReportActionCompleted += (sender, e) =>
373+
{
374+
Interlocked.Increment(ref completedCount);
375+
countdownEvent.Signal();
376+
};
377+
await service.StartAsync(_cancellationTokenSource.Token);
378+
Assert.True(countdownEvent.Wait(60000), $"Expected 2 exports to complete but only {completedCount} completed - second request was incorrectly deduplicated");
379+
await StopAndVerify(service);
380+
381+
_messagePublisherService.Verify(
382+
p => p.Publish(It.IsAny<string>(),
383+
It.Is<Message>(match => (match.ConvertTo<ExportCompleteEvent>()).Status == ExportStatus.Success)), Times.Exactly(2));
384+
_messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny<MessageBase>()), Times.Exactly(2));
385+
}
386+
387+
internal static MessageReceivedEventArgs CreateMessageReceivedEventArgs(string workflowInstanceId, string exportTaskId)
388+
{
389+
var exportRequestEvent = new ExportRequestEvent
390+
{
391+
ExportTaskId = exportTaskId,
392+
CorrelationId = Guid.NewGuid().ToString(),
393+
Destinations = new[] { "destination" },
394+
Files = new[] { "file1", "file2" },
395+
MessageId = Guid.NewGuid().ToString(),
396+
WorkflowInstanceId = workflowInstanceId,
397+
};
398+
var jsonMessage = new JsonMessage<ExportRequestEvent>(exportRequestEvent, MessageBrokerConfiguration.InformaticsGatewayApplicationId, exportRequestEvent.CorrelationId, exportRequestEvent.DeliveryTag);
399+
return new MessageReceivedEventArgs(jsonMessage.ToMessage(), CancellationToken.None);
400+
}
401+
345402
internal static MessageReceivedEventArgs CreateMessageReceivedEventArgs()
346403
{
347404
var exportRequestEvent = new ExportRequestEvent

0 commit comments

Comments
 (0)