Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/ModularPipelines/Engine/Execution/ModuleRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ private async Task ExecuteModuleWithPipeline(ModuleState moduleState, IServicePr
var moduleContext = new ModuleContext(pipelineContext, module, executionContext, logger);

// Set up logging - use try/finally to ensure cleanup of AsyncLocal context
ModuleLogger.Values.Value = logger;

// Assignment MUST be inside try block to guarantee cleanup even if an exception
// occurs immediately after assignment
try
{
ModuleLogger.Values.Value = logger;
await ExecuteModuleLifecycle(moduleState, scopedServiceProvider, pipelineContext, executionContext, moduleContext, cancellationToken).ConfigureAwait(false);
}
finally
Expand Down
22 changes: 11 additions & 11 deletions src/ModularPipelines/Logging/ModuleLoggerContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ namespace ModularPipelines.Logging;
/// This container tracks all module loggers created during pipeline execution
/// and ensures they are properly flushed and disposed in the correct order
/// (ordered by last log written time) to maintain logical output ordering.
/// Uses ConcurrentBag for lock-free thread-safe operations during high-concurrency
/// module execution.
/// Uses ConcurrentDictionary for O(1) thread-safe lookups by type.
/// </remarks>
internal class ModuleLoggerContainer : IModuleLoggerContainer, IDisposable
{
private readonly ConcurrentBag<ModuleLogger> _loggers = new();
private readonly ConcurrentDictionary<Type, ModuleLogger> _loggers = new();

// Interlocked.Exchange provides both atomicity and full memory barrier,
// making volatile unnecessary for this disposal guard pattern
private int _isDisposed;

public void FlushAndDisposeAll()
Expand All @@ -26,25 +28,23 @@ public void FlushAndDisposeAll()
return;
}

// ToArray() is thread-safe on ConcurrentBag
var snapshot = _loggers.ToArray();

foreach (var logger in snapshot.Where(x => x != null).OrderBy(x => x.LastLogWritten))
// Values provides a snapshot for safe enumeration
foreach (var logger in _loggers.Values.Where(x => x != null).OrderBy(x => x.LastLogWritten))
{
logger.Dispose();
}
}

public IModuleLogger? GetLogger(Type type)
{
// ToArray() creates a snapshot for safe enumeration
return _loggers.ToArray().FirstOrDefault(logger => logger.GetType() == type);
// O(1) lookup via ConcurrentDictionary
return _loggers.TryGetValue(type, out var logger) ? logger : null;
}

public void AddLogger(ModuleLogger logger)
{
// ConcurrentBag.Add is lock-free and thread-safe
_loggers.Add(logger);
// Thread-safe add; each module type should only have one logger instance
_loggers.TryAdd(logger.GetType(), logger);
}

public void Dispose()
Expand Down
Loading