diff --git a/src/InformaticsGateway/Services/Export/ExportServiceBase.cs b/src/InformaticsGateway/Services/Export/ExportServiceBase.cs index 00b5f4b0..d7f36fdc 100644 --- a/src/InformaticsGateway/Services/Export/ExportServiceBase.cs +++ b/src/InformaticsGateway/Services/Export/ExportServiceBase.cs @@ -53,6 +53,8 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService, IDispos { protected static readonly object SyncRoot = new(); + protected static string BuildExportKey(string workflowInstanceId, string exportTaskId) => $"{workflowInstanceId}/{exportTaskId}"; + internal event EventHandler? ReportActionCompleted; private readonly CancellationTokenSource _cancellationTokenSource; @@ -373,7 +375,7 @@ private static void HandleStatus(ExportRequestDataMessage exportRequestData, Exp private void ReportingActionBlock(ExportRequestDataMessage exportRequestData) { - var exportRequest = ExportRequests[exportRequestData.ExportTaskId]; + var exportRequest = ExportRequests[BuildExportKey(exportRequestData.WorkflowInstanceId, exportRequestData.ExportTaskId)]; HandleStatus(exportRequestData, exportRequest); if (!exportRequest.IsCompleted) { @@ -391,7 +393,7 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData) lock (SyncRoot) { - ExportRequests.Remove(exportRequestData.ExportTaskId); + ExportRequests.Remove(BuildExportKey(exportRequestData.WorkflowInstanceId, exportRequestData.ExportTaskId)); } if (ReportActionCompleted != null) @@ -588,7 +590,8 @@ protected async Task BaseProcessMessage(MessageReceivedEventArgs eventArgs) lock (SyncRoot) { var exportRequest = eventArgs.Message.ConvertTo(); - if (ExportRequests.ContainsKey(exportRequest.ExportTaskId)) + string exportKey = BuildExportKey(exportRequest.WorkflowInstanceId, exportRequest.ExportTaskId); + if (ExportRequests.ContainsKey(exportKey)) { _logger.ExportRequestAlreadyQueued(exportRequest.CorrelationId, exportRequest.ExportTaskId); return; @@ -599,7 +602,7 @@ protected async Task BaseProcessMessage(MessageReceivedEventArgs eventArgs) var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest); - ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails); + ExportRequests.Add(exportKey, exportRequestWithDetails); if (!exportFlow.Post(exportRequestWithDetails)) { _logger.ErrorPostingExportJobToQueue(exportRequest.CorrelationId, exportRequest.ExportTaskId); diff --git a/src/InformaticsGateway/Services/Export/ExtAppScuExportService.cs b/src/InformaticsGateway/Services/Export/ExtAppScuExportService.cs index fb602be5..552df302 100755 --- a/src/InformaticsGateway/Services/Export/ExtAppScuExportService.cs +++ b/src/InformaticsGateway/Services/Export/ExtAppScuExportService.cs @@ -67,7 +67,8 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs) lock (SyncRoot) { var externalAppRequest = eventArgs.Message.ConvertTo(); - if (ExportRequests.ContainsKey(externalAppRequest.ExportTaskId)) + string exportKey = BuildExportKey(externalAppRequest.WorkflowInstanceId, externalAppRequest.ExportTaskId); + if (ExportRequests.ContainsKey(exportKey)) { _logger.ExportRequestAlreadyQueued(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId); return; @@ -78,7 +79,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs) var exportRequestWithDetails = new ExportRequestEventDetails(externalAppRequest); - ExportRequests.Add(externalAppRequest.ExportTaskId, exportRequestWithDetails); + ExportRequests.Add(exportKey, exportRequestWithDetails); if (!exportFlow.Post(exportRequestWithDetails)) { _logger.ErrorPostingExportJobToQueue(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId); diff --git a/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs b/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs index afdb3e6f..717326a2 100755 --- a/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs +++ b/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs @@ -81,7 +81,8 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs) lock (SyncRoot) { var exportRequest = eventArgs.Message.ConvertTo(); - if (ExportRequests.ContainsKey(exportRequest.ExportTaskId)) + string exportKey = BuildExportKey(exportRequest.WorkflowInstanceId, exportRequest.ExportTaskId); + if (ExportRequests.ContainsKey(exportKey)) { return; } @@ -91,7 +92,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs) var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest); - ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails); + ExportRequests.Add(exportKey, exportRequestWithDetails); if (!exportFlow.Post(exportRequestWithDetails)) { MessageSubscriber.Reject(eventArgs.Message); @@ -342,6 +343,62 @@ public async Task DataflowTest_EndToEnd() _outputDataPlugInEngine.Verify(p => p.ExecutePlugInsAsync(It.IsAny()), Times.Exactly(5 * 2)); } + [RetryFact(1, 10, DisplayName = "Data flow test - same ExportTaskId different WorkflowInstanceId are not deduplicated")] + public async Task DataflowTest_SameExportTaskId_DifferentWorkflowInstanceId_BothProcessed() + { + var sharedExportTaskId = Guid.NewGuid().ToString(); + var completedCount = 0; + + _messagePublisherService.Setup(p => p.Publish(It.IsAny(), It.IsAny())); + _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); + _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); + _messageSubscriberService.Setup( + p => p.SubscribeAsync(It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny())) + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => + { + // Same ExportTaskId, two different WorkflowInstanceIds - must NOT be deduplicated + await messageReceivedCallback(CreateMessageReceivedEventArgs(Guid.NewGuid().ToString(), sharedExportTaskId)); + await messageReceivedCallback(CreateMessageReceivedEventArgs(Guid.NewGuid().ToString(), sharedExportTaskId)); + }); + + _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new MemoryStream(Encoding.UTF8.GetBytes("test"))); + + var countdownEvent = new CountdownEvent(2); + var service = new TestExportService(_logger.Object, _configuration, _serviceScopeFactory.Object, _dicomToolkit.Object); + service.ReportActionCompleted += (sender, e) => + { + Interlocked.Increment(ref completedCount); + countdownEvent.Signal(); + }; + await service.StartAsync(_cancellationTokenSource.Token); + Assert.True(countdownEvent.Wait(60000), $"Expected 2 exports to complete but only {completedCount} completed - second request was incorrectly deduplicated"); + await StopAndVerify(service); + + _messagePublisherService.Verify( + p => p.Publish(It.IsAny(), + It.Is(match => (match.ConvertTo()).Status == ExportStatus.Success)), Times.Exactly(2)); + _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Exactly(2)); + } + + internal static MessageReceivedEventArgs CreateMessageReceivedEventArgs(string workflowInstanceId, string exportTaskId) + { + var exportRequestEvent = new ExportRequestEvent + { + ExportTaskId = exportTaskId, + CorrelationId = Guid.NewGuid().ToString(), + Destinations = new[] { "destination" }, + Files = new[] { "file1", "file2" }, + MessageId = Guid.NewGuid().ToString(), + WorkflowInstanceId = workflowInstanceId, + }; + var jsonMessage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.InformaticsGatewayApplicationId, exportRequestEvent.CorrelationId, exportRequestEvent.DeliveryTag); + return new MessageReceivedEventArgs(jsonMessage.ToMessage(), CancellationToken.None); + } + internal static MessageReceivedEventArgs CreateMessageReceivedEventArgs() { var exportRequestEvent = new ExportRequestEvent