Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/InformaticsGateway/Services/Export/ExportServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService, IDispos
{
protected static readonly object SyncRoot = new();

protected static string BuildExportKey(string workflowInstanceId, string exportTaskId) => $"{workflowInstanceId}/{exportTaskId}";

internal event EventHandler? ReportActionCompleted;

private readonly CancellationTokenSource _cancellationTokenSource;
Expand Down Expand Up @@ -373,7 +375,7 @@ private static void HandleStatus(ExportRequestDataMessage exportRequestData, Exp

private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
{
var exportRequest = ExportRequests[exportRequestData.ExportTaskId];
var exportRequest = ExportRequests[BuildExportKey(exportRequestData.WorkflowInstanceId, exportRequestData.ExportTaskId)];
HandleStatus(exportRequestData, exportRequest);
if (!exportRequest.IsCompleted)
{
Expand All @@ -391,7 +393,7 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)

lock (SyncRoot)
{
ExportRequests.Remove(exportRequestData.ExportTaskId);
ExportRequests.Remove(BuildExportKey(exportRequestData.WorkflowInstanceId, exportRequestData.ExportTaskId));
}

if (ReportActionCompleted != null)
Expand Down Expand Up @@ -588,7 +590,8 @@ protected async Task BaseProcessMessage(MessageReceivedEventArgs eventArgs)
lock (SyncRoot)
{
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestEvent>();
if (ExportRequests.ContainsKey(exportRequest.ExportTaskId))
string exportKey = BuildExportKey(exportRequest.WorkflowInstanceId, exportRequest.ExportTaskId);
if (ExportRequests.ContainsKey(exportKey))
{
_logger.ExportRequestAlreadyQueued(exportRequest.CorrelationId, exportRequest.ExportTaskId);
return;
Expand All @@ -599,7 +602,7 @@ protected async Task BaseProcessMessage(MessageReceivedEventArgs eventArgs)

var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest);

ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails);
ExportRequests.Add(exportKey, exportRequestWithDetails);
if (!exportFlow.Post(exportRequestWithDetails))
{
_logger.ErrorPostingExportJobToQueue(exportRequest.CorrelationId, exportRequest.ExportTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
lock (SyncRoot)
{
var externalAppRequest = eventArgs.Message.ConvertTo<ExternalAppRequestEvent>();
if (ExportRequests.ContainsKey(externalAppRequest.ExportTaskId))
string exportKey = BuildExportKey(externalAppRequest.WorkflowInstanceId, externalAppRequest.ExportTaskId);
if (ExportRequests.ContainsKey(exportKey))
{
_logger.ExportRequestAlreadyQueued(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId);
return;
Expand All @@ -78,7 +79,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)

var exportRequestWithDetails = new ExportRequestEventDetails(externalAppRequest);

ExportRequests.Add(externalAppRequest.ExportTaskId, exportRequestWithDetails);
ExportRequests.Add(exportKey, exportRequestWithDetails);
if (!exportFlow.Post(exportRequestWithDetails))
{
_logger.ErrorPostingExportJobToQueue(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
lock (SyncRoot)
{
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestEvent>();
if (ExportRequests.ContainsKey(exportRequest.ExportTaskId))
string exportKey = BuildExportKey(exportRequest.WorkflowInstanceId, exportRequest.ExportTaskId);
if (ExportRequests.ContainsKey(exportKey))
{
return;
}
Expand All @@ -91,7 +92,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)

var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest);

ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails);
ExportRequests.Add(exportKey, exportRequestWithDetails);
if (!exportFlow.Post(exportRequestWithDetails))
{
MessageSubscriber.Reject(eventArgs.Message);
Expand Down Expand Up @@ -342,6 +343,62 @@ public async Task DataflowTest_EndToEnd()
_outputDataPlugInEngine.Verify(p => p.ExecutePlugInsAsync(It.IsAny<ExportRequestDataMessage>()), Times.Exactly(5 * 2));
}

[RetryFact(1, 10, DisplayName = "Data flow test - same ExportTaskId different WorkflowInstanceId are not deduplicated")]
public async Task DataflowTest_SameExportTaskId_DifferentWorkflowInstanceId_BothProcessed()
{
var sharedExportTaskId = Guid.NewGuid().ToString();
var completedCount = 0;

_messagePublisherService.Setup(p => p.Publish(It.IsAny<string>(), It.IsAny<Message>()));
_messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny<MessageBase>()));
_messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny<MessageBase>()));
_messageSubscriberService.Setup(
p => p.SubscribeAsync(It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<Func<MessageReceivedEventArgs, Task>>(),
It.IsAny<ushort>()))
.Callback<string, string, Func<MessageReceivedEventArgs, Task>, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) =>
{
// Same ExportTaskId, two different WorkflowInstanceIds - must NOT be deduplicated
await messageReceivedCallback(CreateMessageReceivedEventArgs(Guid.NewGuid().ToString(), sharedExportTaskId));
await messageReceivedCallback(CreateMessageReceivedEventArgs(Guid.NewGuid().ToString(), sharedExportTaskId));
});

_storageService.Setup(p => p.GetObjectAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new MemoryStream(Encoding.UTF8.GetBytes("test")));

var countdownEvent = new CountdownEvent(2);
var service = new TestExportService(_logger.Object, _configuration, _serviceScopeFactory.Object, _dicomToolkit.Object);
service.ReportActionCompleted += (sender, e) =>
{
Interlocked.Increment(ref completedCount);
countdownEvent.Signal();
};
await service.StartAsync(_cancellationTokenSource.Token);
Assert.True(countdownEvent.Wait(60000), $"Expected 2 exports to complete but only {completedCount} completed - second request was incorrectly deduplicated");
await StopAndVerify(service);

_messagePublisherService.Verify(
p => p.Publish(It.IsAny<string>(),
It.Is<Message>(match => (match.ConvertTo<ExportCompleteEvent>()).Status == ExportStatus.Success)), Times.Exactly(2));
_messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny<MessageBase>()), Times.Exactly(2));
}

internal static MessageReceivedEventArgs CreateMessageReceivedEventArgs(string workflowInstanceId, string exportTaskId)
{
var exportRequestEvent = new ExportRequestEvent
{
ExportTaskId = exportTaskId,
CorrelationId = Guid.NewGuid().ToString(),
Destinations = new[] { "destination" },
Files = new[] { "file1", "file2" },
MessageId = Guid.NewGuid().ToString(),
WorkflowInstanceId = workflowInstanceId,
};
var jsonMessage = new JsonMessage<ExportRequestEvent>(exportRequestEvent, MessageBrokerConfiguration.InformaticsGatewayApplicationId, exportRequestEvent.CorrelationId, exportRequestEvent.DeliveryTag);
return new MessageReceivedEventArgs(jsonMessage.ToMessage(), CancellationToken.None);
}

internal static MessageReceivedEventArgs CreateMessageReceivedEventArgs()
{
var exportRequestEvent = new ExportRequestEvent
Expand Down
Loading