try to fix intermittent test failure in HubSpec #2943
Open
pjfanning wants to merge 7 commits into apache:main from
Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/3b8db3b8-fb93-4089-a882-327922405f8f
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/25fead82-6b9d-4995-9cdd-83a627a9c572
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
The test 'ensure that subsequent consumers see subsequent elements without gap' was flaky because it cancelled the first consumer via a callback that fires INSIDE RegistrationPending processing. The cancel is asynchronous, so UnRegister arrives at the hub AFTER RegistrationPending has already read a stale head value, causing consumer 2 to start from the wrong offset.

Rewrite the test to let consumer 1 complete naturally via take(10). Pekko's GraphStageLogic.internalCompleteStage processes inlets (cancel) before outlets (complete), so the hub's UnRegister is enqueued in the hub actor's mailbox before Sink.seq's postStop resolves the future. Consumer 2's RegistrationPending is only sent after futureValue returns, guaranteeing the hub processes UnRegister → head advanced before RegistrationPending.

Remove now-unused imports: AtomicInteger, AtomicReference, @nowarn. No production code changes.

Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/0ed4c2ab-7f82-4e08-85af-f4b061725c67
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
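The ordering argument in this commit message can be illustrated with a deliberately simplified model. The sketch below is not the hub's actual code: `HubEvent`, `MailboxOrderSketch`, `startOffset`, and the shape of the two messages are hypothetical stand-ins. It assumes only what the message states, namely that processing UnRegister advances head and that RegistrationPending reads head to choose the new consumer's starting offset.

```scala
// Simplified, hypothetical model of the mailbox ordering described above.
// It is NOT Pekko's hub implementation; the message shapes are assumptions.
sealed trait HubEvent
final case class UnRegister(finalOffset: Int) extends HubEvent
case object RegistrationPending extends HubEvent

object MailboxOrderSketch {

  /** Processes events in mailbox order and returns the offset the new consumer would start from. */
  def startOffset(initialHead: Int, mailbox: List[HubEvent]): Option[Int] = {
    var head = initialHead
    var start: Option[Int] = None
    mailbox.foreach {
      // Assumption: the departing consumer's final offset becomes the new head.
      case UnRegister(finalOffset) => head = finalOffset
      // The new consumer starts from whatever head is at the moment it registers.
      case RegistrationPending => if (start.isEmpty) start = Some(head)
    }
    start
  }

  def main(args: Array[String]): Unit = {
    // Racy order: registration is processed before the first consumer's UnRegister.
    val racy = startOffset(0, List(RegistrationPending, UnRegister(10)))
    // Intended order: UnRegister is already in the mailbox when registration arrives.
    val ordered = startOffset(0, List(UnRegister(10), RegistrationPending))
    println(s"racy=$racy ordered=$ordered") // prints "racy=Some(0) ordered=Some(10)"
  }
}
```

In this model the racy order leaves consumer 2 at the stale offset 0, while the intended order starts it at offset 10, directly after the elements consumer 1 already took.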
relates to #2942
HubSpec intermittently failed because the original assertion relied on a timing-sensitive consumer handoff. Depending on scheduling, the second consumer could attach before the first consumer's shutdown had been fully observed, leading to overlapping ranges in the test.

Root cause of flakiness: the old test called cancel() on consumer 1 from inside registrationPendingCallback, which fires while RegistrationPending is being processed. Because cancel() is asynchronous in Pekko Streams, the resulting UnRegister arrives at the hub after RegistrationPending has already read a stale head, so consumer 2 started from the wrong offset.

Fix: let consumer 1 complete naturally via take(10).runWith(Sink.seq).futureValue. The ordering guarantee comes directly from GraphStageLogic.internalCompleteStage, which cancels inlets before completing outlets, so the hub's UnRegister is enqueued in the hub actor's mailbox before Sink.seq's postStop resolves the future. Consumer 2's RegistrationPending is only sent after futureValue returns, ensuring the hub always processes UnRegister (advancing head) first. No synchronized, no ReentrantLock, no production changes.
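A minimal standalone sketch of the sequential handoff the fix relies on follows. It assumes Pekko Streams is on the classpath. The element range, the buffer size of 8, the timeouts, and the names `HubHandoffSketch` and `runHandoff` are illustrative assumptions, not taken from the PR's diff. `Await.result` stands in for ScalaTest's futureValue.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ BroadcastHub, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object HubHandoffSketch {

  /** Runs two consumers one after the other against the same hub and returns what each saw. */
  def runHandoff(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("hub-handoff-sketch")
    try {
      // Materialize the hub once; every run of `source` attaches a new consumer.
      val source = Source(1 to 20).runWith(BroadcastHub.sink[Int](8))

      // Consumer 1 completes on its own via take(10); block until its result is available.
      val first = Await.result(source.take(10).runWith(Sink.seq), 3.seconds)

      // Consumer 2 is only materialized after consumer 1's future has resolved.
      val second = Await.result(source.take(10).runWith(Sink.seq), 3.seconds)

      (first, second)
    } finally Await.result(system.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = runHandoff()
    println(s"consumer 1 saw: $first")
    println(s"consumer 2 saw: $second")
  }
}
```

If the ordering argument above holds, consumer 2 should pick up directly after the last element consumer 1 received, with neither a gap nor an overlap. The sketch prints both sequences so that this can be checked by hand.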