
zenoh-ext: deduplicate FetchingSubscriber liveliness samples on startup #2564

Open
aki1770-del wants to merge 1 commit into eclipse-zenoh:main from aki1770-del:fix/fetching-subscriber-liveliness-dedup-2523

@aki1770-del commented Apr 11, 2026

Fixes #2523.

Summary

  • Add untimestamped_keys: HashSet<(String, u8)> to MergeQueue in zenoh-ext/src/querying_subscriber.rs to track (key_expr, SampleKind) pairs buffered via the fetch path.
  • When the live subscription delivers the same key+kind with a synthetic timestamp during the fetch window, drop the duplicate before it enters the timestamped BTreeMap.
  • Add regression test test_liveliness_fetching_subscriber_no_duplicates to zenoh-ext/tests/liveliness.rs.

Root Cause (Existing Behavior Audit)

MergeQueue uses two internal stores:

| Path | Storage | Dedup |
| --- | --- | --- |
| Fetch reply (no timestamp) | untimestamped: VecDeque<Sample> | none |
| Live sub during fetch (synthetic SystemTime::now() timestamp) | timestamped: BTreeMap<Timestamp, Sample> | by exact timestamp |

A liveliness token that exists before the subscriber is created produces a fetch reply with no timestamp (→ untimestamped). The same token is also delivered by the live subscription with a freshly-minted synthetic timestamp (→ timestamped). drain() delivers both, so the caller receives two identical SampleKind::Put events for the same key expression with no way to distinguish them.

See sub_callback in FetchingSubscriber::new (lines 852–875): live samples always receive a synthetic timestamp during the fetch window, so they always go to timestamped, never to untimestamped. The cross-queue duplicate can only arise when a fetch reply has no timestamp.
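The duplication described above can be reproduced in a small standalone model. This is only a sketch: `ModelSample` and `UnfixedQueue` are hypothetical stand-ins rather than the zenoh-ext types, timestamps are plain `u64` values, and the drain order (untimestamped first, then timestamped) is an assumption of the model.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical stand-in for a zenoh sample; not the real `Sample` type.
#[derive(Clone, Debug, PartialEq)]
struct ModelSample {
    key_expr: String,
    kind: u8,               // 0 = Put, 1 = Delete in this model
    timestamp: Option<u64>, // None models a fetch reply without a timestamp
}

/// Model of the pre-fix queue: two stores and no cross-store dedup.
#[derive(Default)]
struct UnfixedQueue {
    untimestamped: VecDeque<ModelSample>,
    timestamped: BTreeMap<u64, ModelSample>,
}

impl UnfixedQueue {
    fn push(&mut self, sample: ModelSample) {
        match sample.timestamp {
            // Timestamped samples are deduplicated by exact timestamp only.
            Some(ts) => {
                self.timestamped.entry(ts).or_insert(sample);
            }
            // Untimestamped samples are never deduplicated.
            None => self.untimestamped.push_back(sample),
        }
    }

    /// Empties both stores (assumed order: untimestamped, then timestamped).
    fn drain(&mut self) -> Vec<ModelSample> {
        let mut out: Vec<ModelSample> = self.untimestamped.drain(..).collect();
        out.extend(std::mem::take(&mut self.timestamped).into_values());
        out
    }
}

/// Pushes one fetch reply and one live sample for the same token, then
/// counts how many Put events for that key come out of `drain()`.
fn put_count_before_fix() -> usize {
    let mut queue = UnfixedQueue::default();
    let key = "demo/token";
    queue.push(ModelSample { key_expr: key.into(), kind: 0, timestamp: None });
    queue.push(ModelSample { key_expr: key.into(), kind: 0, timestamp: Some(1) });
    queue
        .drain()
        .iter()
        .filter(|s| s.key_expr == key && s.kind == 0)
        .count()
}
```

In this model `put_count_before_fix()` returns 2, matching the two identical Put events the caller observes.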

Fix

// MergeQueue::push — before this fix
fn push(&mut self, sample: Sample) {
    if let Some(ts) = sample.timestamp() {
        self.timestamped.entry(*ts).or_insert(sample);  // no cross-queue dedup
    } else {
        self.untimestamped.push_back(sample);            // no dedup at all
    }
}
// MergeQueue::push — after
fn push(&mut self, sample: Sample) {
    if let Some(ts) = sample.timestamp() {
        let merge_key = (sample.key_expr().as_str().to_owned(), sample.kind() as u8);
        if self.untimestamped_keys.contains(&merge_key) {
            return;  // already buffered via fetch path; drop duplicate
        }
        self.timestamped.entry(*ts).or_insert(sample);
    } else {
        let merge_key = (sample.key_expr().as_str().to_owned(), sample.kind() as u8);
        self.untimestamped_keys.insert(merge_key);
        self.untimestamped.push_back(sample);
    }
}

untimestamped_keys is cleared in drain() so re-fetches work correctly and post-fetch live deliveries are unaffected (they go directly to the callback when pending_fetches == 0, bypassing MergeQueue entirely).
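The interaction between the guard in push and the clear in drain() can be sketched with simplified types. `ModelSample`, `FixedQueue`, and the helper functions are hypothetical names for illustration, timestamps are plain `u64` values, and the drain order is assumed; the real implementation lives in zenoh-ext/src/querying_subscriber.rs.

```rust
use std::collections::{BTreeMap, HashSet, VecDeque};

/// Hypothetical stand-in for a zenoh sample; not the real `Sample` type.
#[derive(Clone, Debug, PartialEq)]
struct ModelSample {
    key_expr: String,
    kind: u8, // 0 = Put, 1 = Delete in this model
    timestamp: Option<u64>,
}

#[derive(Default)]
struct FixedQueue {
    untimestamped: VecDeque<ModelSample>,
    untimestamped_keys: HashSet<(String, u8)>,
    timestamped: BTreeMap<u64, ModelSample>,
}

impl FixedQueue {
    fn push(&mut self, sample: ModelSample) {
        let merge_key = (sample.key_expr.clone(), sample.kind);
        match sample.timestamp {
            Some(ts) => {
                // Drop a live sample whose key+kind was already fetched.
                if !self.untimestamped_keys.contains(&merge_key) {
                    self.timestamped.entry(ts).or_insert(sample);
                }
            }
            None => {
                self.untimestamped_keys.insert(merge_key);
                self.untimestamped.push_back(sample);
            }
        }
    }

    fn drain(&mut self) -> Vec<ModelSample> {
        // Clearing the guard scopes dedup to a single merge window.
        self.untimestamped_keys.clear();
        let mut out: Vec<ModelSample> = self.untimestamped.drain(..).collect();
        out.extend(std::mem::take(&mut self.timestamped).into_values());
        out
    }
}

fn sample(key: &str, kind: u8, timestamp: Option<u64>) -> ModelSample {
    ModelSample { key_expr: key.to_owned(), kind, timestamp }
}

/// Window 1: fetch reply, then a live duplicate. Window 2: live sample only.
fn deliveries_per_window() -> (usize, usize) {
    let mut queue = FixedQueue::default();
    queue.push(sample("demo/token", 0, None));
    queue.push(sample("demo/token", 0, Some(1)));
    let first = queue.drain().len();
    queue.push(sample("demo/token", 0, Some(2)));
    let second = queue.drain().len();
    (first, second)
}

/// Same window, but the live sample is pushed before the fetch reply.
fn deliveries_when_live_arrives_first() -> usize {
    let mut queue = FixedQueue::default();
    queue.push(sample("demo/token", 0, Some(1)));
    queue.push(sample("demo/token", 0, None));
    queue.drain().len()
}
```

In this model `deliveries_per_window()` returns `(1, 1)`: the duplicate is dropped inside the first window, and a later window is unaffected because the guard was cleared. `deliveries_when_live_arrives_first()` returns 2, since the guard only covers live samples pushed after the matching fetch reply; whether that ordering can occur in practice is not established by this sketch.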

Scope Classification

(a) Bug fix — change is contained to MergeQueue::{push, drain} within zenoh-ext; no API changes.

Prior Art

  1. PR #441, "Liveliness & FetchingSubscriber": introduced FetchingSubscriber + liveliness integration (2023-04-04); the merge path that contains this bug was introduced there without dedup logic.
  2. Issues #1274, "Liveliness token undeclaration duplication in routers", and #1438, "Duplicated liveliness token drops": liveliness duplication bugs at the router and subscriber layers (closed 2024-09-19); they establish the precedent of fixing liveliness duplication at the delivery layer rather than at the source.
  3. Issue #1617, "Bug when declaring 2 liveliness tokens with the same key": duplicate liveliness tokens with the same key (closed 2024-11-29); it confirms that FetchingSubscriber dedup must be scoped to the merge window only (not applied indefinitely), which this fix respects via untimestamped_keys.clear() in drain().

Test Evidence (OPS-RULE-011)

test_liveliness_fetching_subscriber_no_duplicates in zenoh-ext/tests/liveliness.rs:

  1. Peer 2 declares a liveliness token before the subscriber is created.
  2. Peer 1 creates a FetchingSubscriber with liveliness().get() as the fetch source.
  3. After the fetch window flushes, all received samples are collected with a 500 ms timeout.
  4. Asserts the token was delivered exactly once (counts[(key, Put)] == 1).

Test uses UDP peer mode on localhost:47453; no router required.
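The exactly-once assertion in step 4 can be sketched independently of the zenoh API. The names below (`ModelKind`, `count_by_key_and_kind`, `delivered_exactly_once`) are hypothetical and do not appear in the test file; the input is assumed to be the list of (key expression, kind) pairs collected in step 3.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for zenoh's `SampleKind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ModelKind {
    Put,
    Delete,
}

/// Counts how many times each (key, kind) pair was received.
fn count_by_key_and_kind(
    received: &[(String, ModelKind)],
) -> HashMap<(String, ModelKind), usize> {
    let mut counts = HashMap::new();
    for (key, kind) in received {
        *counts.entry((key.clone(), *kind)).or_insert(0) += 1;
    }
    counts
}

/// True when `key` was delivered as a Put exactly once.
fn delivered_exactly_once(received: &[(String, ModelKind)], key: &str) -> bool {
    count_by_key_and_kind(received)
        .get(&(key.to_owned(), ModelKind::Put))
        .copied()
        == Some(1)
}
```

Counting per (key, kind) rather than per key keeps a later Delete for the same token from being mistaken for a duplicate Put.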


AI-assisted — authored with Claude, reviewed by Komada.


🏷️ Label-Based Checklist

Based on the labels applied to this PR, please complete these additional requirements:

Labels: bug

🐛 Bug Fix Requirements

Since this PR is labeled as a bug fix, please ensure:

  • Root cause documented - Explain what caused the bug in the PR description
  • Reproduction test added - Test that fails on main branch without the fix
  • Test passes with fix - The reproduction test passes with your changes
  • Regression prevention - Test will catch if this bug reoccurs in the future
  • Fix is minimal - Changes are focused only on fixing the bug
  • Related bugs checked - Verified no similar bugs exist in related code

Why this matters: Bugs without tests often reoccur.

Instructions:

  1. Check off items as you complete them (change - [ ] to - [x])
  2. The PR checklist CI will verify these are completed

This checklist updates automatically when labels change, but preserves your checked boxes.

…-zenoh#2523)

Add an `untimestamped_keys: HashSet<(String, u8)>` to `MergeQueue` that
tracks (key_expr, SampleKind) pairs already buffered via the fetch path
(no timestamp). When the live subscription delivers the same logical
event with a synthetic timestamp during the fetch window, the duplicate
is dropped before it enters the `timestamped` BTreeMap.

Root cause: the fetch path for liveliness tokens produces samples without
timestamps, which are stored in `untimestamped: VecDeque<Sample>` with no
dedup. The concurrent live subscription assigns a synthetic `SystemTime::now()`
timestamp to every sample during the fetch window, so the same event ends
up in both `untimestamped` and `timestamped`, and `drain()` delivers both.

The fix is contained to `MergeQueue::push` and `MergeQueue::drain`.
The guard is active only while the merge window is open; after `drain()`
clears `untimestamped_keys` all future live deliveries pass through
normally, so legitimate re-declarations are not suppressed.

Adds `test_liveliness_fetching_subscriber_no_duplicates` to
`zenoh-ext/tests/liveliness.rs` to reproduce the race and verify it is
fixed: a token declared before the subscriber is created must be
delivered exactly once.

Co-Authored-By: Claude and aki1770-del <aki1770@gmail.com>

@diogomatsubara added the bug label on Apr 13, 2026

Development

Successfully merging this pull request may close these issues.

Duplicate liveliness samples in FetchingSubscriber (zenoh-ext) under concurrent fetch + live updates
