perf: Replace Dictionary+lock with ConcurrentDictionary in ModuleResultRegistry #1726

Merged
thomhurst merged 6 commits into main from fix/1540-concurrent-dictionary
Jan 1, 2026

@thomhurst
Owner

Summary

  • Replaces Dictionary<Type, object> + explicit lock with ConcurrentDictionary<Type, object>
  • Removes lock contention during parallel module execution
  • Uses GetOrAdd for atomic module registration

Changes

  • _results and _completionSources now use ConcurrentDictionary
  • Removed _lock object and all lock statements
  • Read operations are now lock-free
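
For illustration, a minimal sketch of the registration pattern described above. The class name `RegistrationSketch`, the `IsRegistered` helper, and the `Task` return type are assumptions made for the example; this is not the actual `ModuleResultRegistry` code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, simplified registry for illustration; not the actual ModuleResultRegistry.
public sealed class RegistrationSketch
{
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    // GetOrAdd stores at most one value per key. Under contention the factory may run
    // more than once, but every caller receives the single stored instance.
    public Task<object?> RegisterModule(Type moduleType)
    {
        var tcs = _completionSources.GetOrAdd(
            moduleType,
            _ => new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously));
        return tcs.Task;
    }

    // Reads on ConcurrentDictionary do not take a lock.
    public bool IsRegistered(Type moduleType) => _completionSources.ContainsKey(moduleType);
}
```

Registering the same module type twice returns the same task, so repeated or concurrent registration is harmless in this sketch.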

Fixes #1540

Test plan

  • Existing unit tests pass
  • Parallel module execution works correctly

🤖 Generated with Claude Code

…ltRegistry (#1540)

Use ConcurrentDictionary for lock-free reads and thread-safe writes,
reducing lock contention during parallel module execution.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings January 1, 2026 17:47
@thomhurst
Owner Author

Summary

Replaces lock-based synchronization with ConcurrentDictionary in ModuleResultRegistry for improved performance.

Critical Issues

Race condition in RegisterResult methods - src/ModularPipelines/Engine/IModuleResultRegistry.cs:83-91, 140-151

The refactored code has a race condition between writing to _results and setting the TaskCompletionSource:

public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
{
    _results[moduleType] = result;  // Write 1
    
    if (_completionSources.TryGetValue(moduleType, out var tcs))  // Read 2
    {
        tcs.TrySetResult(result);  // Write 3
    }
}

Problem: If RegisterResult runs before RegisterModule (or concurrently with it), the result is stored but no TaskCompletionSource exists yet, so nothing is signaled. The TCS that RegisterModule creates afterwards is never completed, and any code awaiting it through GetCompletionTask would wait forever.

Why this matters: While unlikely in the current codebase flow, this violates the semantic contract of the registry - registering a result should always signal completion if there's a waiter, regardless of call ordering.

Fix: Use GetOrAdd to ensure the TCS exists before setting the result:

public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
{
    _results[moduleType] = result;
    
    var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
    tcs.TrySetResult(result);
}

Apply the same fix to both RegisterResult methods (lines 82-93 and 140-151).


Note: The RegisterModule refactor using GetOrAdd is correct and thread-safe.
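
The ordering problem can be reproduced deterministically, without threads, by calling the methods in the "wrong" order. The sketch below is a hypothetical reduction of the registry to its two dictionaries; the names `LostSignalSketch`, `RegisterResultIfPresent`, and `RegisterResultGetOrAdd` are invented for the example, and results are plain `object?` rather than `ModuleResult<T>`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the registry, reduced to the two dictionaries discussed above.
public sealed class LostSignalSketch
{
    private readonly ConcurrentDictionary<Type, object?> _results = new();
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    public Task<object?> RegisterModule(Type moduleType) =>
        _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>()).Task;

    // Variant under review: signals only if a TCS already exists.
    public void RegisterResultIfPresent(Type moduleType, object? result)
    {
        _results[moduleType] = result;
        if (_completionSources.TryGetValue(moduleType, out var tcs))
        {
            tcs.TrySetResult(result);
        }
    }

    // Suggested variant: creates the TCS on demand, then signals it.
    public void RegisterResultGetOrAdd(Type moduleType, object? result)
    {
        _results[moduleType] = result;
        var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
        tcs.TrySetResult(result);
    }
}
```

With `RegisterResultIfPresent` followed by `RegisterModule`, the returned task stays incomplete; with `RegisterResultGetOrAdd` followed by `RegisterModule`, the returned task is already completed.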

Suggestions

None - the lock-free approach is a good performance improvement once the race condition is fixed.

Verdict

⚠️ REQUEST CHANGES - Critical race condition must be fixed

Copilot AI left a comment

Pull request overview

This PR replaces the lock-based synchronization in ModuleResultRegistry with ConcurrentDictionary to reduce lock contention during parallel module execution. The change removes the _lock object and lock statements, converting _results and _completionSources to concurrent dictionaries for lock-free read operations.

Key changes:

  • Replaced Dictionary<Type, object> with ConcurrentDictionary<Type, object> for both _results and _completionSources
  • Removed all lock statements and the _lock object
  • Updated RegisterModule to use GetOrAdd for atomic module registration

Comment on lines 120 to 128
     public void RegisterResult(Type moduleType, IModuleResult result)
     {
-        lock (_lock)
-        {
-            _results[moduleType] = result;
+        _results[moduleType] = result;
 
-            if (_completionSources.TryGetValue(moduleType, out var tcs))
-            {
-                tcs.TrySetResult(result);
-            }
+        if (_completionSources.TryGetValue(moduleType, out var tcs))
+        {
+            tcs.TrySetResult(result);
         }
     }

Copilot AI Jan 1, 2026

This RegisterResult method has the same race condition as the generic version. Between storing the result in line 122 and attempting to signal the TaskCompletionSource in lines 124-127, another thread could call RegisterModule and create a new TCS that never gets signaled. This would cause modules waiting on GetCompletionTask to hang indefinitely.

The fix should ensure atomicity between result registration and TCS signaling, similar to what the original lock-based implementation provided.

Comment on lines 112 to 118
     public void SetException(Type moduleType, Exception exception)
     {
-        lock (_lock)
+        if (_completionSources.TryGetValue(moduleType, out var tcs))
         {
-            if (_completionSources.TryGetValue(moduleType, out var tcs))
-            {
-                tcs.TrySetException(exception);
-            }
+            tcs.TrySetException(exception);
         }
     }

Copilot AI Jan 1, 2026

The SetException method has a similar race condition issue. If RegisterModule is called after TryGetValue but before or during TrySetException, a new TaskCompletionSource could be created that never gets the exception set. This would cause any code waiting on GetCompletionTask to hang.

Consider using a similar fix as for RegisterResult to ensure atomicity of operations on the TaskCompletionSource.

Comment on lines 77 to 85
     public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
     {
-        lock (_lock)
-        {
-            _results[moduleType] = result;
+        _results[moduleType] = result;
 
-            if (_completionSources.TryGetValue(moduleType, out var tcs))
-            {
-                tcs.TrySetResult(result);
-            }
+        if (_completionSources.TryGetValue(moduleType, out var tcs))
+        {
+            tcs.TrySetResult(result);
         }
     }

Copilot AI Jan 1, 2026

The RegisterResult methods have a race condition. Between storing the result in line 79 (or 122) and attempting to signal the TaskCompletionSource in lines 81-84 (or 124-127), another thread could call RegisterModule and create a new TCS that never gets signaled. This violates the coordination contract where modules waiting on GetCompletionTask would hang indefinitely.

The original lock-based implementation prevented this by atomically performing both operations. With ConcurrentDictionary, you need to ensure the TCS exists before or atomically with storing the result. Consider using GetOrAdd to ensure a TCS exists, or rethink the approach to make result registration and TCS signaling atomic.

…nt race condition

Ensures TCS is created atomically when registering results/exceptions,
preventing race conditions where TryGetValue might miss a not-yet-registered
module entry.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@thomhurst
Owner Author

Summary

Replaces lock-based synchronization in ModuleResultRegistry with ConcurrentDictionary for lock-free reads and thread-safe writes.

Critical Issues

Race condition in RegisterResult and SetException methods - src/ModularPipelines/Engine/IModuleResultRegistry.cs:81-82, 111-112, 119-120

In the new implementation, there's a problematic pattern:

_results[moduleType] = result;
var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
tcs.TrySetResult(result);

The issue: If a module has never been registered via RegisterModule, the TaskCompletionSource is created after the result is stored. Between these two operations, another thread could call GetCompletionTask and get null, even though the result is already available.

Sequence that causes the race:

  1. Thread A: Calls RegisterResult → stores result in _results
  2. Thread B: Calls GetCompletionTask → gets null (TCS doesn't exist yet)
  3. Thread A: Creates TCS via GetOrAdd → sets result
  4. Thread B: Never gets notified because it received null

The old lock-based implementation prevented this by ensuring atomic read-modify-write operations.

Fix required: The TaskCompletionSource must be created before or atomically with storing the result. Consider:

var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
_results[moduleType] = result;
tcs.TrySetResult(result);

This ensures the TCS exists before the result is visible, preventing the race condition in GetCompletionTask.

Suggestions

None - addressing the critical issue is the priority.

Previous Review Status

Unable to retrieve previous comments due to API permissions, but the race condition identified is a blocking issue.

Verdict

⚠️ REQUEST CHANGES - Critical race condition must be fixed

Create TCS before storing result to ensure GetCompletionTask never
returns null when a result already exists. This prevents a race where:
1. Thread A stores result
2. Thread B calls GetCompletionTask, gets null (TCS doesn't exist)
3. Thread A creates TCS

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@thomhurst
Owner Author

Summary

This PR replaces explicit locking with ConcurrentDictionary for lock-free reads in ModuleResultRegistry.

Critical Issues

1. Memory Visibility Race Condition

The new implementation has a critical race condition in RegisterResult methods at src/ModularPipelines/Engine/IModuleResultRegistry.cs:80-83, 117-120.

The problem: When TrySetResult completes the task, waiting threads wake up and may call GetResult. However, there is no memory barrier guaranteeing they will see the write to _results[moduleType]. Even though ConcurrentDictionary is thread-safe, the ordering between these operations is not guaranteed without proper synchronization.

Scenario:

  • Thread A: Writes _results[moduleType] = result
  • Thread A: Calls tcs.TrySetResult(result) which completes the task
  • Thread B: Task completes, immediately calls GetResult(moduleType)
  • Thread B: May not see Thread A write due to CPU cache/memory reordering

Fix needed: Ensure the write to _results happens-before the TCS completion. The original lock-based approach guaranteed this. Possible solutions:

  1. Add Thread.MemoryBarrier() between the write and TrySetResult
  2. Store result in TCS itself and read from completed task
  3. Use proper memory ordering
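
Option 2 can be sketched as follows: the completed task is the only store, so there is no second dictionary whose write could be observed out of order. The type `TcsOnlyRegistrySketch` and its member shapes are hypothetical and use `object?` in place of the real result types; this is one possible shape, not the change made in this PR.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical sketch: the TaskCompletionSource holds the result, so no separate _results map is needed.
public sealed class TcsOnlyRegistrySketch
{
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    private TaskCompletionSource<object?> GetOrCreate(Type moduleType) =>
        _completionSources.GetOrAdd(
            moduleType,
            _ => new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously));

    public Task<object?> GetCompletionTask(Type moduleType) => GetOrCreate(moduleType).Task;

    // Returns false if the task was already completed by an earlier call.
    public bool RegisterResult(Type moduleType, object? result) =>
        GetOrCreate(moduleType).TrySetResult(result);

    // Reads the result from the completed task instead of a second dictionary.
    public bool TryGetResult(Type moduleType, out object? result)
    {
        if (_completionSources.TryGetValue(moduleType, out var tcs) && tcs.Task.IsCompletedSuccessfully)
        {
            result = tcs.Task.Result; // Does not block: the task has already completed.
            return true;
        }

        result = null;
        return false;
    }
}
```

The trade-off in this sketch is that a result is only readable once the task has completed successfully, which ties the two pieces of state together by construction.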

2. SetException Has Same Issue

The SetException method has the same pattern and vulnerability at src/ModularPipelines/Engine/IModuleResultRegistry.cs:109-111.

Verdict

REQUEST CHANGES - Critical memory visibility race condition that could cause intermittent bugs where GetResult returns null even after the completion task finishes.

…etion

Adds Thread.MemoryBarrier() between writing to _results and calling
TrySetResult. This ensures that threads waking up from task completion
will see the _results write, preventing a potential memory visibility
race condition on weak memory model architectures.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@thomhurst
Owner Author

Summary

This PR refactors ModuleResultRegistry from lock-based synchronization to ConcurrentDictionary for lock-free reads and thread-safe writes.

Critical Issues

1. Memory Visibility Race Condition ❌

The memory barrier placement does not guarantee correct visibility. Consider this scenario:

Thread A (RegisterResult):

var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
_results[moduleType] = result;  // Write to _results
Thread.MemoryBarrier();
tcs.TrySetResult(result);       // Signal completion

Thread B (waiting consumer):

var task = GetCompletionTask(moduleType);  // Gets TCS.Task
await task;                                 // Task completes
var result = GetResult<T>(moduleType);     // Reads _results

Problem: ConcurrentDictionary only guarantees atomicity of individual operations, not cross-dictionary visibility. When Thread A signals the TCS, Thread B's await completes, but Thread B may not see the write to _results because:

  1. The memory barrier in Thread A only orders operations within Thread A
  2. ConcurrentDictionary writes to _results do not create acquire/release semantics with reads from _results in another thread
  3. The TCS completion (which does have proper memory barriers) does not synchronize with reads from _results

Fix: The write to _results must happen before getting/creating the TCS, and you need to rely on TCS's built-in memory barriers:

public void RegisterResult<T>(Type moduleType, ModuleResult<T> result)
{
    _results[moduleType] = result;  // Write first
    var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
    tcs.TrySetResult(result);  // TCS provides proper memory barriers
}

When a consumer awaits the TCS task, the task completion's release semantics will ensure they see the prior write to _results.
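
A small, self-contained sketch of the publish-then-signal pattern this comment relies on: the producer writes to a dictionary, then completes the task; the consumer awaits the task, then reads the dictionary. The names (`PublishThenSignalSketch`, the `"answer"` key) are invented for the example and it is not taken from the PR.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo of "write the result, then signal completion".
public static class PublishThenSignalSketch
{
    public static async Task<int> RunAsync()
    {
        var results = new ConcurrentDictionary<string, int>();
        var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Consumer: waits for the signal, then reads the published value.
        var consumer = Task.Run(async () =>
        {
            await tcs.Task;
            return results["answer"];
        });

        // Producer: publish first, signal second.
        results["answer"] = 42;
        tcs.TrySetResult(null);

        return await consumer;
    }
}
```

Because the consumer reads only after the await completes, and the producer signals only after the write, the read in this sketch cannot run before the write.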

2. Inconsistent Null Handling in GetCompletionTask ⚠️

GetCompletionTask returns null if the module is not registered, but SetException creates a TCS on-demand with GetOrAdd. This creates an inconsistency:

  • If SetException is called before RegisterModule, it creates a TCS
  • If GetCompletionTask is called for an unregistered module, it returns null
  • These do not compose well together

Consider what happens if code assumes GetCompletionTask returns null only for unregistered modules, but SetException was already called.

Recommendation: Either:

  1. Make GetCompletionTask also use GetOrAdd to create TCS on-demand (consistent lazy creation), OR
  2. Make SetException check for null and log/throw if the module was not registered (strict registration requirement)
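
As a rough sketch of option 2 (strict registration), assuming a hypothetical `StrictRegistrySketch` type and assuming that throwing `InvalidOperationException` is an acceptable way to surface the error; logging instead would be an equally valid reading of the recommendation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical sketch: only RegisterModule creates completion sources.
public sealed class StrictRegistrySketch
{
    private readonly ConcurrentDictionary<Type, TaskCompletionSource<object?>> _completionSources = new();

    public void RegisterModule(Type moduleType) =>
        _completionSources.TryAdd(moduleType, new TaskCompletionSource<object?>());

    // Null means "never registered", and nothing else can create an entry.
    public Task? GetCompletionTask(Type moduleType) =>
        _completionSources.TryGetValue(moduleType, out var tcs) ? tcs.Task : null;

    public void SetException(Type moduleType, Exception exception)
    {
        if (!_completionSources.TryGetValue(moduleType, out var tcs))
        {
            throw new InvalidOperationException($"Module {moduleType.Name} was never registered.");
        }

        tcs.TrySetException(exception);
    }
}
```

In this shape `GetCompletionTask` and `SetException` agree on what "unregistered" means, at the cost of making an out-of-order `SetException` call a hard failure.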

Suggestions

Remove Unnecessary Memory Barriers

If you fix the ordering issue above, the explicit Thread.MemoryBarrier() calls are unnecessary and should be removed. TaskCompletionSource already provides the necessary memory barriers.

Consider Performance Testing

Lock-free does not always mean faster. For this use case (likely low contention, mostly writes during module execution), the lock might have been fine. Consider:

  • The original lock-based code was simpler and obviously correct
  • ConcurrentDictionary has overhead for its lock-free operations
  • This is not a hot path (modules execute once)

If there is no measured performance problem, the added complexity may not be worth it.

Verdict

⚠️ REQUEST CHANGES - Critical memory visibility race condition must be fixed

- Remove explicit Thread.MemoryBarrier() - TrySetResult already provides
  release semantics ensuring _results write is visible to awaiters
- Change GetCompletionTask to use GetOrAdd for consistency with SetException
  (both now create TCS on-demand)
- Update interface: GetCompletionTask now returns Task instead of Task?
  since it always returns a valid task

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@thomhurst
Owner Author

Summary

Refactors ModuleResultRegistry to use ConcurrentDictionary instead of explicit locks for improved performance.

Critical Issues

1. Behavioral Change in SetException Method

Location: src/ModularPipelines/Engine/IModuleResultRegistry.cs:113-116

Old behavior:

lock (_lock)
{
    if (_completionSources.TryGetValue(moduleType, out var tcs))
    {
        tcs.TrySetException(exception);
    }
}

New behavior:

var tcs = _completionSources.GetOrAdd(moduleType, _ => new TaskCompletionSource<object?>());
tcs.TrySetException(exception);

Issue: The old code only set exception if a TaskCompletionSource already existed (module was previously registered). The new code creates a TCS on-demand via GetOrAdd, changing the behavior.

Impact: If SetException is called for a module that was never registered with RegisterModule, the old code would silently do nothing. The new code creates a completion task. This could affect error handling semantics.

Question: Is this behavioral change intentional? Should SetException create completion sources for unregistered modules?

2. Race Condition Between RegisterResult and SetException

Issue: There is a potential race condition where both RegisterResult and SetException could be called for the same module concurrently. Since both use TrySetResult/TrySetException, only one will succeed, but the code does not check return values.

Scenario:

  1. Thread A calls RegisterResult(moduleType, result)
  2. Thread B calls SetException(moduleType, exception) concurrently
  3. Whichever Try* method executes first wins
  4. The result is stored in _results but the TCS might be in a faulted state (or vice versa)

Current behavior: The TCS state and _results dictionary could be inconsistent (result stored but TCS faulted, or TCS completed successfully but no result stored if exception came first).

Note: This issue also existed in the old lock-based code, so it is not a regression, but the refactoring makes it more likely since operations are not serialized by locks.
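
To make the "first writer wins" behaviour concrete, here is a minimal sketch that inspects the return values of `TrySetResult` and `TrySetException`. The helper name `FirstWriterWinsSketch.Complete` is hypothetical, and the calls are made sequentially rather than concurrently so the outcome is deterministic.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper showing that only the first completion attempt is accepted.
public static class FirstWriterWinsSketch
{
    // Returns which of the two competing completions was accepted.
    public static (bool ResultAccepted, bool ExceptionAccepted) Complete(TaskCompletionSource<object?> tcs)
    {
        bool resultAccepted = tcs.TrySetResult("ok");
        bool exceptionAccepted = tcs.TrySetException(new InvalidOperationException("late failure"));
        return (resultAccepted, exceptionAccepted);
    }
}
```

A registry could use these return values to detect the losing call and, for example, log it or skip the matching write to `_results`; whether that is worthwhile depends on whether both calls can actually happen for the same module.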

3. Using Statement Placement

Location: Line 1

The PR adds using System.Collections.Concurrent; but this import is only needed in the implementation class, not the interface file. Consider verifying the placement is intentional.

Suggestions

None beyond the critical issues above.

Verdict

⚠️ REQUEST CHANGES - Critical behavioral changes need verification:

  1. Confirm the SetException behavior change (creating TCS for unregistered modules) is intentional
  2. Consider if concurrent RegisterResult/SetException calls are possible and document expected behavior
  3. Verify the using statement placement

…ompletionTask

Revert behavioral changes to maintain backward compatibility:
- SetException: Only sets exception if module was previously registered
  (does nothing for unregistered modules, matching original behavior)
- GetCompletionTask: Returns null for unregistered modules
  (matching original behavior)

RegisterResult methods still use GetOrAdd since registering a result
implies the module should be tracked.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@thomhurst
Owner Author

Summary

This PR refactors ModuleResultRegistry from lock-based synchronization to ConcurrentDictionary for lock-free reads and improved parallel module execution performance.

Critical Issues

None found ✅

The latest version addresses the previously identified concerns:

  • RegisterResult methods now use GetOrAdd to ensure TaskCompletionSource exists before signaling, preventing race conditions
  • Memory ordering is correctly handled via TaskCompletionSource's built-in memory barriers (when TrySetResult is called, it provides release semantics that ensure the write to _results is visible to awaiters)
  • SetException preserves original behavior by checking existence before setting exception
  • The comment correctly notes that "TrySetResult provides release semantics, ensuring _results write is visible to awaiters"

Suggestions

1. Consider Performance Testing

The comment in the PR asks whether this optimization is measurable. Lock-free does not always mean faster, especially in low-contention scenarios. Since ModuleResultRegistry operations are not typically on the hot path (modules execute once during pipeline execution), consider:

  • Measuring actual contention and performance impact
  • The original lock-based code was simpler and obviously correct
  • ConcurrentDictionary has overhead for its thread-safety guarantees

If no performance issue was measured in profiling, the added complexity may not provide meaningful benefits.

2. Document Thread-Safety Guarantees

Consider adding XML doc comments to the class explaining:

  • That all methods are thread-safe
  • The memory ordering guarantees (particularly for RegisterResult → GetResult patterns)
  • That GetCompletionTask may return null for unregistered modules
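
One possible shape for such doc comments, shown on a hypothetical, simplified interface (`IDocumentedRegistrySketch`, with `object?` results) rather than the real `IModuleResultRegistry`:

```csharp
using System;
using System.Threading.Tasks;

/// <summary>
/// Hypothetical, simplified registry contract used only to illustrate the doc comments.
/// All members are safe to call concurrently from multiple threads.
/// </summary>
public interface IDocumentedRegistrySketch
{
    /// <summary>Stores the result, then completes the module's completion task.</summary>
    /// <remarks>
    /// The result is written before the task is completed, so code that awaits
    /// <see cref="GetCompletionTask"/> and then calls <see cref="GetResult"/> observes it.
    /// </remarks>
    void RegisterResult(Type moduleType, object? result);

    /// <summary>Returns the stored result, or <c>null</c> if none has been registered.</summary>
    object? GetResult(Type moduleType);

    /// <summary>
    /// Returns the completion task, or <c>null</c> if the module was never registered.
    /// </summary>
    Task? GetCompletionTask(Type moduleType);
}
```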

3. Minor: Using Statement Placement

The using System.Collections.Concurrent; is added at line 1, but it's only used in the implementation class, not the interface. This is fine but worth noting.

Previous Review Status

Five previous reviews identified critical issues around:

  1. Race conditions in RegisterResult (FIXED - now uses GetOrAdd)
  2. Memory visibility concerns (ADDRESSED - relies on TCS memory barriers)
  3. SetException behavioral change (FIXED - preserves original behavior with TryGetValue check)
  4. Operation ordering (FIXED - result stored before TCS creation and signaling)

All critical issues from previous reviews have been addressed in the current implementation.

Verdict

APPROVE - No critical issues

The implementation is now correct and thread-safe. The suggestions are about weighing complexity vs. performance benefits, which is a valid architectural discussion but not blocking.

@thomhurst thomhurst merged commit c4a95b8 into main Jan 1, 2026
11 of 12 checks passed
@thomhurst thomhurst deleted the fix/1540-concurrent-dictionary branch January 1, 2026 21:37
Development

Successfully merging this pull request may close these issues.

Performance: ModuleResultRegistry uses Dictionary with lock instead of ConcurrentDictionary

2 participants