Skip to content

Commit c4a95b8

Browse files
thomhurstclaude
andauthored
perf: Replace Dictionary+lock with ConcurrentDictionary in ModuleResultRegistry (#1726)
* perf: Replace Dictionary+lock with ConcurrentDictionary in ModuleResultRegistry (#1540) Use ConcurrentDictionary for lock-free reads and thread-safe writes, reducing lock contention during parallel module execution. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: Use GetOrAdd in all RegisterResult/SetException methods to prevent race condition Ensures TCS is created atomically when registering results/exceptions, preventing race conditions where TryGetValue might miss a not-yet-registered module entry. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: Reorder operations to prevent race condition in GetCompletionTask Create TCS before storing result to ensure GetCompletionTask never returns null when a result already exists. This prevents a race where: 1. Thread A stores result 2. Thread B calls GetCompletionTask, gets null (TCS doesn't exist) 3. Thread A creates TCS 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: Add memory barrier to ensure result visibility before task completion Adds Thread.MemoryBarrier() between writing to _results and calling TrySetResult. This ensures that threads waking up from task completion will see the _results write, preventing a potential memory visibility race condition on weak memory model architectures. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor: Simplify memory ordering and fix GetCompletionTask consistency - Remove explicit Thread.MemoryBarrier() - TrySetResult already provides release semantics ensuring _results write is visible to awaiters - Change GetCompletionTask to use GetOrAdd for consistency with SetException (both now create TCS on-demand) - Update interface: GetCompletionTask now returns Task instead of Task? since it always returns a valid task 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: Preserve original behavioral semantics for SetException and GetCompletionTask Revert behavioral changes to maintain backward compatibility: - SetException: Only sets exception if module was previously registered (does nothing for unregistered modules, matching original behavior) - GetCompletionTask: Returns null for unregistered modules (matching original behavior) RegisterResult methods still use GetOrAdd since registering a result implies the module should be tracked. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 0e51348 commit c4a95b8

1 file changed

Lines changed: 27 additions & 51 deletions

File tree

src/ModularPipelines/Engine/IModuleResultRegistry.cs

Lines changed: 27 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Collections.Concurrent;
12
using ModularPipelines.Models;
23

34
namespace ModularPipelines.Engine;
@@ -61,92 +62,67 @@ internal interface IModuleResultRegistry
6162

6263
/// <summary>
6364
/// Default implementation of the module result registry.
65+
/// Uses ConcurrentDictionary for lock-free reads and thread-safe writes.
6466
/// </summary>
6567
internal class ModuleResultRegistry : IModuleResultRegistry
6668
{
67-
private readonly Dictionary<Type, object> _results = new();
68-
private readonly Dictionary<Type, TaskCompletionSource<object?>> _completionSources = new();
69-
private readonly object _lock = new();
69+
private readonly ConcurrentDictionary<Type, object> _results = new();
70+
private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();
7071

7172
public void RegisterModule(Type moduleType)
7273
{
73-
lock (_lock)
74-
{
75-
if (!_completionSources.ContainsKey(moduleType))
76-
{
77-
_completionSources[moduleType] = new TaskCompletionSource<object?>();
78-
}
79-
}
74+
_completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
8075
}
8176

8277
public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
8378
{
84-
lock (_lock)
85-
{
86-
_results[moduleType] = result;
87-
88-
if (_completionSources.TryGetValue(moduleType, out var tcs))
89-
{
90-
tcs.TrySetResult(result);
91-
}
92-
}
79+
// Store result first, then signal completion
80+
// TrySetResult provides release semantics, ensuring _results write is visible to awaiters
81+
_results[moduleType] = result;
82+
var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
83+
tcs.TrySetResult(result);
9384
}
9485

9586
public ModuleResult<T>? GetResult<T>(Type moduleType)
9687
{
97-
lock (_lock)
88+
if (_results.TryGetValue(moduleType, out var result) && result is ModuleResult<T> typedResult)
9889
{
99-
if (_results.TryGetValue(moduleType, out var result) && result is ModuleResult<T> typedResult)
100-
{
101-
return typedResult;
102-
}
103-
104-
return null;
90+
return typedResult;
10591
}
92+
93+
return null;
10694
}
10795

10896
public IModuleResult? GetResult(Type moduleType)
10997
{
110-
lock (_lock)
98+
if (_results.TryGetValue(moduleType, out var result))
11199
{
112-
if (_results.TryGetValue(moduleType, out var result))
113-
{
114-
return result as IModuleResult;
115-
}
116-
117-
return null;
100+
return result as IModuleResult;
118101
}
102+
103+
return null;
119104
}
120105

121106
public Task? GetCompletionTask(Type moduleType)
122107
{
123-
lock (_lock)
124-
{
125-
return _completionSources.TryGetValue(moduleType, out var tcs) ? tcs.Task : null;
126-
}
108+
return _completionSources.TryGetValue(moduleType, out var tcs) ? tcs.Task : null;
127109
}
128110

129111
public void SetException(Type moduleType, Exception exception)
130112
{
131-
lock (_lock)
113+
// Only set exception if module was previously registered (preserves original behavior)
114+
if (_completionSources.TryGetValue(moduleType, out var tcs))
132115
{
133-
if (_completionSources.TryGetValue(moduleType, out var tcs))
134-
{
135-
tcs.TrySetException(exception);
136-
}
116+
tcs.TrySetException(exception);
137117
}
138118
}
139119

140120
public void RegisterResult(Type moduleType, IModuleResult result)
141121
{
142-
lock (_lock)
143-
{
144-
_results[moduleType] = result;
145-
146-
if (_completionSources.TryGetValue(moduleType, out var tcs))
147-
{
148-
tcs.TrySetResult(result);
149-
}
150-
}
122+
// Store result first, then signal completion
123+
// TrySetResult provides release semantics, ensuring _results write is visible to awaiters
124+
_results[moduleType] = result;
125+
var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
126+
tcs.TrySetResult(result);
151127
}
152128
}

0 commit comments

Comments
 (0)