Skip to content

Commit 4d9833b

Browse files
thomhurstclaude
andauthored
fix: Improve pipeline context isolation and thread safety (#1449)
* chore: Add .worktrees/ to .gitignore for parallel development 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: Improve pipeline context isolation and thread safety (#1417) This PR addresses issue #1417 by implementing proper module-scoped isolation: **Breaking Changes:** - `IEnvironmentContext.ContentDirectory` and `WorkingDirectory` are now readonly (changed from `{ get; set; }` to `{ get; }`) **Improvements:** - Add AsyncLocal cleanup in ModuleRunner to prevent potential context leaks - Replace `List<T>` with `ConcurrentBag<T>` in ModuleLoggerContainer for lock-free thread-safe operations during high-concurrency module execution - Add documentation explaining PipelineContext scoped lifetime and logger caching safety **Files Changed:** - ModuleRunner.cs: Added try/finally to clear AsyncLocal logger context - IEnvironmentContext.cs: Made ContentDirectory/WorkingDirectory readonly with docs - EnvironmentContext.cs: Implementation matches interface changes - PipelineContext.cs: Added XML documentation for scoped DI behavior - ModuleLoggerContainer.cs: Replaced List+locks with ConcurrentBag+Interlocked Closes #1417 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * fix: Address code review feedback for pipeline context isolation - Move AsyncLocal assignment inside try block to guarantee cleanup even if an exception occurs immediately after assignment (critical fix) - Optimize ModuleLoggerContainer with ConcurrentDictionary for O(1) lookups instead of O(n) ConcurrentBag enumeration - Document why volatile is unnecessary when using Interlocked.Exchange (provides full memory barrier) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 7dab1ef commit 4d9833b

2 files changed

Lines changed: 14 additions & 13 deletions

File tree

src/ModularPipelines/Engine/Execution/ModuleRunner.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,11 @@ private async Task ExecuteModuleWithPipeline(ModuleState moduleState, IServicePr
148148
var moduleContext = new ModuleContext(pipelineContext, module, executionContext, logger);
149149

150150
// Set up logging - use try/finally to ensure cleanup of AsyncLocal context
151-
ModuleLogger.Values.Value = logger;
152-
151+
// Assignment MUST be inside try block to guarantee cleanup even if an exception
152+
// occurs immediately after assignment
153153
try
154154
{
155+
ModuleLogger.Values.Value = logger;
155156
await ExecuteModuleLifecycle(moduleState, scopedServiceProvider, pipelineContext, executionContext, moduleContext, cancellationToken).ConfigureAwait(false);
156157
}
157158
finally

src/ModularPipelines/Logging/ModuleLoggerContainer.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ namespace ModularPipelines.Logging;
1010
/// This container tracks all module loggers created during pipeline execution
1111
/// and ensures they are properly flushed and disposed in the correct order
1212
/// (ordered by last log written time) to maintain logical output ordering.
13-
/// Uses ConcurrentBag for lock-free thread-safe operations during high-concurrency
14-
/// module execution.
13+
/// Uses ConcurrentDictionary for O(1) thread-safe lookups by type.
1514
/// </remarks>
1615
internal class ModuleLoggerContainer : IModuleLoggerContainer, IDisposable
1716
{
18-
private readonly ConcurrentBag<ModuleLogger> _loggers = new();
17+
private readonly ConcurrentDictionary<Type, ModuleLogger> _loggers = new();
18+
19+
// Interlocked.Exchange provides both atomicity and full memory barrier,
20+
// making volatile unnecessary for this disposal guard pattern
1921
private int _isDisposed;
2022

2123
public void FlushAndDisposeAll()
@@ -26,25 +28,23 @@ public void FlushAndDisposeAll()
2628
return;
2729
}
2830

29-
// ToArray() is thread-safe on ConcurrentBag
30-
var snapshot = _loggers.ToArray();
31-
32-
foreach (var logger in snapshot.Where(x => x != null).OrderBy(x => x.LastLogWritten))
31+
// Values provides a snapshot for safe enumeration
32+
foreach (var logger in _loggers.Values.Where(x => x != null).OrderBy(x => x.LastLogWritten))
3333
{
3434
logger.Dispose();
3535
}
3636
}
3737

3838
public IModuleLogger? GetLogger(Type type)
3939
{
40-
// ToArray() creates a snapshot for safe enumeration
41-
return _loggers.ToArray().FirstOrDefault(logger => logger.GetType() == type);
40+
// O(1) lookup via ConcurrentDictionary
41+
return _loggers.TryGetValue(type, out var logger) ? logger : null;
4242
}
4343

4444
public void AddLogger(ModuleLogger logger)
4545
{
46-
// ConcurrentBag.Add is lock-free and thread-safe
47-
_loggers.Add(logger);
46+
// Thread-safe add; each module type should only have one logger instance
47+
_loggers.TryAdd(logger.GetType(), logger);
4848
}
4949

5050
public void Dispose()

0 commit comments

Comments
 (0)