zenoh-ext: deduplicate FetchingSubscriber liveliness samples on startup #2564
Open
aki1770-del wants to merge 1 commit into
…-zenoh#2523)

Add an `untimestamped_keys: HashSet<(String, u8)>` to `MergeQueue` that tracks (key_expr, SampleKind) pairs already buffered via the fetch path (no timestamp). When the live subscription delivers the same logical event with a synthetic timestamp during the fetch window, the duplicate is dropped before it enters the `timestamped` BTreeMap.

Root cause: the fetch path for liveliness tokens produces samples without timestamps, which are stored in `untimestamped: VecDeque<Sample>` with no dedup. The concurrent live subscription assigns a synthetic `SystemTime::now()` timestamp to every sample during the fetch window, so the same event ends up in both `untimestamped` and `timestamped`, and `drain()` delivers both.

The fix is contained to `MergeQueue::push` and `MergeQueue::drain`. The guard is active only while the merge window is open; after `drain()` clears `untimestamped_keys` all future live deliveries pass through normally, so legitimate re-declarations are not suppressed.

Adds `test_liveliness_fetching_subscriber_no_duplicates` to `zenoh-ext/tests/liveliness.rs` to reproduce the race and verify it is fixed: a token declared before the subscriber is created must be delivered exactly once.

Co-Authored-By: Claude and aki1770-del <aki1770@gmail.com>
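The guard described in the commit message can be sketched in isolation. This is a minimal model, not the code from the PR: `Sample` is a simplified stand-in for zenoh's type, the timestamp is a plain `u64`, and the `kind` encoding (0 for Put) is an assumption made for illustration. It also only covers the case where the fetch reply is buffered before the live copy arrives.

```rust
use std::collections::{BTreeMap, HashSet, VecDeque};

// Simplified stand-in for zenoh's `Sample` (hypothetical; the real type is richer).
#[derive(Clone, Debug, PartialEq)]
struct Sample {
    key_expr: String,
    kind: u8,               // assumed encoding for this sketch: 0 = Put, 1 = Delete
    timestamp: Option<u64>, // None models a fetch reply without a timestamp
}

#[derive(Default)]
struct MergeQueue {
    untimestamped: VecDeque<Sample>,
    timestamped: BTreeMap<u64, Sample>,
    // (key_expr, kind) pairs already buffered via the untimestamped fetch path.
    untimestamped_keys: HashSet<(String, u8)>,
}

impl MergeQueue {
    fn push(&mut self, sample: Sample) {
        let id = (sample.key_expr.clone(), sample.kind);
        match sample.timestamp {
            None => {
                // Fetch path: remember the logical event, then buffer it.
                self.untimestamped_keys.insert(id);
                self.untimestamped.push_back(sample);
            }
            Some(ts) => {
                // Live path: drop the copy of an event the fetch path already buffered.
                if self.untimestamped_keys.contains(&id) {
                    return;
                }
                self.timestamped.entry(ts).or_insert(sample);
            }
        }
    }

    fn drain(&mut self) -> Vec<Sample> {
        // Untimestamped samples first, then timestamped ones in timestamp order.
        let mut out: Vec<Sample> = self.untimestamped.drain(..).collect();
        out.extend(std::mem::take(&mut self.timestamped).into_values());
        // Closing the merge window: later re-declarations must not be suppressed.
        self.untimestamped_keys.clear();
        out
    }
}
```

Pushing an untimestamped sample and then a timestamped sample with the same key and kind yields a single sample from `drain()`; pushing the same timestamped sample again after `drain()` is delivered normally, because the guard set has been cleared.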
Fixes #2523.
Summary
- Add `untimestamped_keys: HashSet<(String, u8)>` to `MergeQueue` in `zenoh-ext/src/querying_subscriber.rs` to track `(key_expr, SampleKind)` pairs buffered via the fetch path.
- Drop a live-subscription duplicate of an already-buffered fetch sample before it enters the `timestamped` BTreeMap.
- Add `test_liveliness_fetching_subscriber_no_duplicates` to `zenoh-ext/tests/liveliness.rs`.

Root Cause (Existing Behavior Audit)
`MergeQueue` uses two internal stores:

- `untimestamped: VecDeque<Sample>`: fetch replies that carry no timestamp
- `timestamped: BTreeMap<Timestamp, Sample>`: timestamped samples, including live samples stamped with a synthetic `SystemTime::now()` timestamp

A liveliness token that exists before the subscriber is created produces a fetch reply with no timestamp (→ `untimestamped`). The same token is also delivered by the live subscription with a freshly minted synthetic timestamp (→ `timestamped`). `drain()` delivers both, so the caller receives two identical `SampleKind::Put` events for the same key expression with no way to distinguish them.

See `sub_callback` in `FetchingSubscriber::new` (lines 852–875): live samples always receive a synthetic timestamp during the fetch window, so they always go to `timestamped`, never to `untimestamped`. The cross-queue duplicate can only arise when a fetch reply has no timestamp.

Fix
`untimestamped_keys` is cleared in `drain()` so re-fetches work correctly and post-fetch live deliveries are unaffected (they go directly to the callback when `pending_fetches == 0`, bypassing `MergeQueue` entirely).

Scope Classification
(a) Bug fix: the change is contained to `MergeQueue::{push, drain}` within `zenoh-ext`; no API changes.

Prior Art
- `FetchingSubscriber` + liveliness integration (2023-04-04); the merge path that contains this bug was introduced here without dedup logic.
- `FetchingSubscriber` dedup must be scoped to the merge window only (not indefinitely), which this fix respects via `untimestamped_keys.clear()` on `drain()`.

Test Evidence (OPS-RULE-011)
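`test_liveliness_fetching_subscriber_no_duplicates` in `zenoh-ext/tests/liveliness.rs`:

- Declares a liveliness token before the subscriber is created.
- Creates a `FetchingSubscriber` with `liveliness().get()` as the fetch source.
- Asserts that the token is delivered exactly once (`counts[(key, Put)] == 1`).

The test uses UDP peer mode on `localhost:47453`; no router required.

AI-assisted: authored with Claude, reviewed by Komada.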
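The exactly-once check from the test evidence can be illustrated without a zenoh session. The sketch below is an assumption about the shape of the tally, not the test's actual code: `Kind`, `count_deliveries`, and `delivered_exactly_once` are hypothetical names, and the received samples are modelled as plain `(key, kind)` pairs.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for zenoh's `SampleKind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Kind {
    Put,
    Delete,
}

// Tally how many times each (key, kind) pair was delivered to the callback.
fn count_deliveries(events: &[(String, Kind)]) -> HashMap<(String, Kind), usize> {
    let mut counts = HashMap::new();
    for (key, kind) in events {
        *counts.entry((key.clone(), *kind)).or_insert(0) += 1;
    }
    counts
}

// True only when the key saw exactly one Put: neither missing nor duplicated.
fn delivered_exactly_once(counts: &HashMap<(String, Kind), usize>, key: &str) -> bool {
    counts.get(&(key.to_string(), Kind::Put)).copied() == Some(1)
}
```

Comparing against `Some(1)` rather than checking for presence means the assertion fails both when the token is never delivered and when the fetch and live paths each deliver it.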
🏷️ Label-Based Checklist
Based on the labels applied to this PR, please complete these additional requirements:
Labels: bug

🐛 Bug Fix Requirements
Since this PR is labeled as a bug fix, please ensure:
Why this matters: Bugs without tests often reoccur.
Instructions:
- Check off completed items (change `- [ ]` to `- [x]`)

This checklist updates automatically when labels change, but preserves your checked boxes.