perf: Replace Dictionary+lock with ConcurrentDictionary in ModuleResultRegistry #1726
Conversation
…ltRegistry (#1540) Use ConcurrentDictionary for lock-free reads and thread-safe writes, reducing lock contention during parallel module execution. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

Replaces lock-based synchronization with ConcurrentDictionary in ModuleResultRegistry for improved performance.

Critical Issues

Race condition in RegisterResult methods - The refactored code has a race condition between writing to …

    public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
    {
        _results[moduleType] = result; // Write 1
        if (_completionSources.TryGetValue(moduleType, out var tcs)) // Read 2
        {
            tcs.TrySetResult(result); // Write 3
        }
    }

Problem: If a module calls …

Why this matters: While unlikely in the current codebase flow, this violates the semantic contract of the registry - registering a result should always signal completion if there's a waiter, regardless of call ordering.

Fix: Use …

    public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
    {
        _results[moduleType] = result;
        var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
        tcs.TrySetResult(result);
    }

Apply the same fix to both …

Note: The …

Suggestions

None - the lock-free approach is a good performance improvement once the race condition is fixed.

Verdict
Pull request overview
This PR replaces the lock-based synchronization in ModuleResultRegistry with ConcurrentDictionary to reduce lock contention during parallel module execution. The change removes the _lock object and lock statements, converting _results and _completionSources to concurrent dictionaries for lock-free read operations.
Key changes:
- Replaced Dictionary<Type, object> with ConcurrentDictionary<Type, object> for both _results and _completionSources
- Removed all lock statements and the _lock object
- Updated RegisterModule to use GetOrAdd for atomic module registration
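For illustration, the atomic registration described in the last bullet could look roughly like the sketch below. The class name RegistrySketch and the body and return type of RegisterModule are assumptions made for this example, not the PR's actual code; only GetOrAdd and the TaskCompletionSource<object?> value type come from this conversation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for ModuleResultRegistry; not the PR's actual code.
public sealed class RegistrySketch
{
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    // Assumed shape of RegisterModule: every caller asking for the same
    // module type receives the same completion task, without taking a lock.
    public Task<object?> RegisterModule(Type moduleType)
    {
        // The factory delegate may run more than once under contention,
        // but only one TaskCompletionSource is stored and returned to all callers.
        var tcs = _completionSources.GetOrAdd(
            moduleType,
            _ => new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously));
        return tcs.Task;
    }
}
```

Because the factory can run more than once when several threads race on the same key, it should stay free of side effects; creating a fresh TaskCompletionSource that may be discarded is harmless.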
     public void RegisterResult(Type moduleType, IModuleResult result)
     {
    -    lock (_lock)
    -    {
    -        _results[moduleType] = result;
    +    _results[moduleType] = result;

    -        if (_completionSources.TryGetValue(moduleType, out var tcs))
    -        {
    -            tcs.TrySetResult(result);
    -        }
    +    if (_completionSources.TryGetValue(moduleType, out var tcs))
    +    {
    +        tcs.TrySetResult(result);
    +    }
     }
This RegisterResult method has the same race condition as the generic version. Between storing the result in line 122 and attempting to signal the TaskCompletionSource in lines 124-127, another thread could call RegisterModule and create a new TCS that never gets signaled. This would cause modules waiting on GetCompletionTask to hang indefinitely.
The fix should ensure atomicity between result registration and TCS signaling, similar to what the original lock-based implementation provided.
     public void SetException(Type moduleType, Exception exception)
     {
    -    lock (_lock)
    +    if (_completionSources.TryGetValue(moduleType, out var tcs))
         {
    -        if (_completionSources.TryGetValue(moduleType, out var tcs))
    -        {
    -            tcs.TrySetException(exception);
    -        }
    +        tcs.TrySetException(exception);
         }
     }
The SetException method has a similar race condition issue. If RegisterModule is called after TryGetValue but before or during TrySetException, a new TaskCompletionSource could be created that never gets the exception set. This would cause any code waiting on GetCompletionTask to hang.
Consider using a similar fix as for RegisterResult to ensure atomicity of operations on the TaskCompletionSource.
     public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
     {
    -    lock (_lock)
    -    {
    -        _results[moduleType] = result;
    +    _results[moduleType] = result;

    -        if (_completionSources.TryGetValue(moduleType, out var tcs))
    -        {
    -            tcs.TrySetResult(result);
    -        }
    +    if (_completionSources.TryGetValue(moduleType, out var tcs))
    +    {
    +        tcs.TrySetResult(result);
    +    }
     }
The RegisterResult methods have a race condition. Between storing the result in line 79 (or 122) and attempting to signal the TaskCompletionSource in lines 81-84 (or 124-127), another thread could call RegisterModule and create a new TCS that never gets signaled. This violates the coordination contract where modules waiting on GetCompletionTask would hang indefinitely.
The original lock-based implementation prevented this by atomically performing both operations. With ConcurrentDictionary, you need to ensure the TCS exists before or atomically with storing the result. Consider using GetOrAdd to ensure a TCS exists, or rethink the approach to make result registration and TCS signaling atomic.
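One way to read the GetOrAdd suggestion is sketched below. This is a minimal illustration under assumptions, not the code that was merged: ResultRegistrySketch, GetOrAddSource and GetResult are hypothetical names, and results are stored as plain object? values instead of ModuleResult<T> to keep the example self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for ModuleResultRegistry; names and shapes are assumptions.
public sealed class ResultRegistrySketch
{
    private readonly ConcurrentDictionary<Type, object?> _results = new();
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    // Helper (hypothetical): both registration paths share one TCS per module type.
    private TaskCompletionSource<object?> GetOrAddSource(Type moduleType) =>
        _completionSources.GetOrAdd(
            moduleType,
            _ => new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously));

    public Task<object?> RegisterModule(Type moduleType) => GetOrAddSource(moduleType).Task;

    public void RegisterResult(Type moduleType, object? result)
    {
        // Make sure a TCS exists first, so a RegisterModule call arriving at
        // any point observes the same TCS that gets signaled below.
        var tcs = GetOrAddSource(moduleType);
        _results[moduleType] = result;
        tcs.TrySetResult(result);
    }

    public object? GetResult(Type moduleType) =>
        _results.TryGetValue(moduleType, out var result) ? result : null;
}
```

With this shape the completion task is signaled whether the waiter registers before or after the result arrives, because neither path can create a second TaskCompletionSource for the same module type.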
…nt race condition Ensures TCS is created atomically when registering results/exceptions, preventing race conditions where TryGetValue might miss a not-yet-registered module entry. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

Replaces lock-based synchronization in ModuleResultRegistry with ConcurrentDictionary for lock-free reads and thread-safe writes.

Critical Issues

Race condition in RegisterResult and SetException methods - src/ModularPipelines/Engine/IModuleResultRegistry.cs:81-82, 111-112, 119-120

In the new implementation, there's a problematic pattern:

    _results[moduleType] = result;
    var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
    tcs.TrySetResult(result);

The issue: If a module has never been registered via …

Sequence that causes the race: …

The old lock-based implementation prevented this by ensuring atomic read-modify-write operations.

Fix required: The TaskCompletionSource must be created before or atomically with storing the result. Consider:

    var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
    _results[moduleType] = result;
    tcs.TrySetResult(result);

This ensures the TCS exists before the result is visible, preventing the race condition in …

Suggestions

None - addressing the critical issue is the priority.

Previous Review Status

Unable to retrieve previous comments due to API permissions, but the race condition identified is a blocking issue.

Verdict
Create TCS before storing result to ensure GetCompletionTask never returns null when a result already exists. This prevents a race where:
1. Thread A stores result
2. Thread B calls GetCompletionTask, gets null (TCS doesn't exist)
3. Thread A creates TCS

🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

This PR replaces explicit locking with ConcurrentDictionary for lock-free reads in ModuleResultRegistry.

Critical Issues

1. Memory Visibility Race Condition

The new implementation has a critical race condition in RegisterResult methods at src/ModularPipelines/Engine/IModuleResultRegistry.cs:80-83, 117-120.

The problem: When TrySetResult completes the task, waiting threads wake up and may call GetResult. However, there is no memory barrier guaranteeing they will see the write to _results[moduleType]. Even though ConcurrentDictionary is thread-safe, the ordering between these operations is not guaranteed without proper synchronization.

Scenario: …

Fix needed: Ensure the write to _results happens-before the TCS completion. The original lock-based approach guaranteed this.

Possible solutions: …

2. SetException Has Same Issue

The SetException method has the same pattern and vulnerability at src/ModularPipelines/Engine/IModuleResultRegistry.cs:109-111.

Verdict

REQUEST CHANGES - Critical memory visibility race condition that could cause intermittent bugs where GetResult returns null even after the completion task finishes.
…etion Adds Thread.MemoryBarrier() between writing to _results and calling TrySetResult. This ensures that threads waking up from task completion will see the _results write, preventing a potential memory visibility race condition on weak memory model architectures. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

This PR refactors ModuleResultRegistry from lock-based synchronization to ConcurrentDictionary for lock-free reads and thread-safe writes.

Critical Issues

1. Memory Visibility Race Condition ❌

The memory barrier placement does not guarantee correct visibility. Consider this scenario:

Thread A (RegisterResult):

    var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
    _results[moduleType] = result; // Write to _results
    Thread.MemoryBarrier();
    tcs.TrySetResult(result); // Signal completion

Thread B (waiting consumer):

    var task = GetCompletionTask(moduleType); // Gets TCS.Task
    await task; // Task completes
    var result = GetResult<T>(moduleType); // Reads _results

Problem: ConcurrentDictionary only guarantees atomicity of individual operations, not cross-dictionary visibility. When Thread A signals the TCS, Thread B's await completes, but Thread B may not see the write to _results because: …

Fix: The write to _results must happen before getting/creating the TCS, and you need to rely on TCS's built-in memory barriers:

    public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
    {
        _results[moduleType] = result; // Write first
        var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
        tcs.TrySetResult(result); // TCS provides proper memory barriers
    }

When a consumer awaits the TCS task, the task completion's release semantics will ensure they see the prior write to _results.

2. Inconsistent Null Handling in GetCompletionTask

…
- Remove explicit Thread.MemoryBarrier() - TrySetResult already provides release semantics ensuring _results write is visible to awaiters
- Change GetCompletionTask to use GetOrAdd for consistency with SetException (both now create TCS on-demand)
- Update interface: GetCompletionTask now returns Task instead of Task? since it always returns a valid task

🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

Refactors ModuleResultRegistry to use ConcurrentDictionary instead of explicit locks for improved performance.

Critical Issues

1. Behavioral Change in SetException Method

Location: src/ModularPipelines/Engine/IModuleResultRegistry.cs:113-116

Old behavior:

    lock (_lock)
    {
        if (_completionSources.TryGetValue(moduleType, out var tcs))
        {
            tcs.TrySetException(exception);
        }
    }

New behavior:

    var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
    tcs.TrySetException(exception);

Issue: The old code only set exception if a TaskCompletionSource already existed (module was previously registered). The new code creates a TCS on-demand via GetOrAdd, changing the behavior.

Impact: If SetException is called for a module that was never registered with RegisterModule, the old code would silently do nothing. The new code creates a completion task. This could affect error handling semantics.

Question: Is this behavioral change intentional? Should SetException create completion sources for unregistered modules?

2. Race Condition Between RegisterResult and SetException

Issue: There is a potential race condition where both RegisterResult and SetException could be called for the same module concurrently. Since both use TrySetResult/TrySetException, only one will succeed, but the code does not check return values.

Scenario: …

Current behavior: The TCS state and _results dictionary could be inconsistent (result stored but TCS faulted, or TCS completed successfully but no result stored if exception came first).

Note: This issue also existed in the old lock-based code, so it is not a regression, but the refactoring makes it more likely since operations are not serialized by locks.

3. Using Statement Placement

Location: Line 1

The PR adds using System.Collections.Concurrent; but this import is only needed in the implementation class, not the interface file. Consider verifying the placement is intentional.

Suggestions

None beyond the critical issues above.

Verdict
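The two SetException behaviors contrasted in this review can be compared side by side in a small sketch. Everything here is hypothetical and only meant to illustrate the difference: the class ExceptionRegistrySketch, the method names SetExceptionIfRegistered and SetExceptionOrCreate, and the bool return values (which surface the TrySetException result that the review notes is currently ignored) do not appear in the PR.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for ModuleResultRegistry; names are assumptions.
public sealed class ExceptionRegistrySketch
{
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    public void RegisterModule(Type moduleType) =>
        _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());

    // "Old behavior": a no-op for modules that were never registered.
    // Returns true only if this call moved the task into the faulted state.
    public bool SetExceptionIfRegistered(Type moduleType, Exception exception) =>
        _completionSources.TryGetValue(moduleType, out var tcs) && tcs.TrySetException(exception);

    // "New behavior": creates a completion source on demand, then faults it.
    public bool SetExceptionOrCreate(Type moduleType, Exception exception) =>
        _completionSources
            .GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>())
            .TrySetException(exception);

    // Returns null for module types that have no completion source.
    public Task? GetCompletionTask(Type moduleType) =>
        _completionSources.TryGetValue(moduleType, out var tcs) ? tcs.Task : null;
}
```

For an unregistered module, the first variant leaves the registry untouched while the second leaves a faulted task behind. Returning the TrySetException result would also let a caller detect the lost-race case described in issue 2.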
…ompletionTask

Revert behavioral changes to maintain backward compatibility:
- SetException: Only sets exception if module was previously registered (does nothing for unregistered modules, matching original behavior)
- GetCompletionTask: Returns null for unregistered modules (matching original behavior)

RegisterResult methods still use GetOrAdd since registering a result implies the module should be tracked.

🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

This PR refactors ModuleResultRegistry from lock-based synchronization to ConcurrentDictionary for lock-free reads and improved parallel module execution performance.

Critical Issues

None found ✅

The latest version addresses the previously identified concerns: …

Suggestions

1. Consider Performance Testing

The comment in the PR asks about whether this optimization is measurable. Lock-free does not always mean faster, especially for low-contention scenarios. Since ModuleResultRegistry operations are not typically hot-path (modules execute once during pipeline execution), consider: …

If no performance issue was measured in profiling, the added complexity may not provide meaningful benefits.

2. Document Thread-Safety Guarantees

Consider adding XML doc comments to the class explaining: …

3. Minor: Using Statement Placement

The …

Previous Review Status

Five previous reviews identified critical issues around: …

All critical issues from previous reviews have been addressed in the current implementation.

Verdict

✅ APPROVE - No critical issues

The implementation is now correct and thread-safe. The suggestions are about weighing complexity vs. performance benefits, which is a valid architectural discussion but not blocking.
Summary
- Replace Dictionary<Type, object> + explicit lock with ConcurrentDictionary<Type, object>
- Use GetOrAdd for atomic module registration

Changes
- _results and _completionSources now use ConcurrentDictionary
- Remove _lock object and all lock statements

Fixes #1540

Test plan

🤖 Generated with Claude Code