Fix race condition in ExpireAfter when items are removed before expiration fires#1076
Fix race condition in ExpireAfter when items are removed before expiration fires#1076dwcullop wants to merge 1 commit intoreactivemarbles:mainfrom
Conversation
…tion fires The expiration timer callback now checks that the item still exists and still has a pending expiration before attempting removal. Prevents InvalidOperationException when items are concurrently removed or updated between when the expiration was scheduled and when it fires. Also uses |= to accumulate expiration changes correctly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Fixes a concurrency edge case in ExpireAfter.ForSource where an expiration-management callback could assume an item still exists in the source cache, leading to exceptions under concurrent remove/update activity.
Changes:
- Guard expiration removals by verifying the key still exists in the source before dereferencing
Lookup().Value. - Add an additional “still expiring” check before removing an item during expiration management.
- Fix expiration-change aggregation by using
|=when accumulatingTrySetExpiration(...)results across a changeset.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (lookup.HasValue && _timeSelector.Invoke(lookup.Value) is not null) | ||
| { | ||
| _removedItemsBuffer.Add(new( | ||
| key: proposedExpiration.Key, | ||
| value: lookup.Value)); | ||
|
|
||
| updater.RemoveKey(proposedExpiration.Key); | ||
| } |
There was a problem hiding this comment.
_timeSelector is user-provided and can throw. Calling _timeSelector.Invoke(lookup.Value) inside the scheduled expiration-management path (OnEditingSource via ManageExpirations) is currently unguarded (no try/catch here or around _source.Edit(_onEditingSource)), so an exception would escape the scheduler callback instead of being propagated via _observer.OnError (as OnSourceNext does). Consider either avoiding timeSelector evaluation here (rely on internal expiration state), or wrapping the invocation/removal block to cancel further scheduling and forward the error to _observer.OnError consistently.
| if (lookup.HasValue && _timeSelector.Invoke(lookup.Value) is not null) | |
| { | |
| _removedItemsBuffer.Add(new( | |
| key: proposedExpiration.Key, | |
| value: lookup.Value)); | |
| updater.RemoveKey(proposedExpiration.Key); | |
| } | |
| try | |
| { | |
| if (lookup.HasValue && _timeSelector.Invoke(lookup.Value) is not null) | |
| { | |
| _removedItemsBuffer.Add(new( | |
| key: proposedExpiration.Key, | |
| value: lookup.Value)); | |
| updater.RemoveKey(proposedExpiration.Key); | |
| } | |
| } | |
| catch (Exception error) | |
| { | |
| _observer.OnError(error); | |
| return; | |
| } |
Description
Fixes a race condition in
ExpireAfter.ForSourcewhere the expiration timer callback assumes the item still exists in the source cache. When an item is removed or updated by another thread between when the expiration was scheduled and when it fires, the callback would throw InvalidOperationException fromLookup().Valueon a missing key.Root Cause
The timer callback in
OnEditingSourceunconditionally called:Fix
Lookup()is checked forHasValuebefore accessing.ValuehaveExpirationsChanged |= TrySetExpiration(...)uses |= instead of = to correctly accumulate changes across multiple items in a changeset (previously, only the last item's result was kept)Impact
This race was reproducible under moderate concurrent load. A stress test calling
AddOrUpdateandRemoveKeyon aSourceCachewithExpireAfterfrom multiple threads would intermittently throw. The fix is minimal and surgical: two guard checks and one operator change.