mesh(transport): bridge cross-transport dedup + monotonic TTL + strict mode (5/9 of #195 split)#222
Conversation
…t mode Bridge dedup correctness across LAN-Zenoh and AWS-IoT transports: - time.monotonic for TTL eviction (not wall clock -- survives clock changes / DST / NTP slew) - exact-match topic filter via STRANDS_MESH_BRIDGE_TOPICS - opt-in prefix-walk via STRANDS_MESH_BRIDGE_TOPICS_PREFIX (back-compat for the old behaviour) - opt-in strict mode via STRANDS_MESH_BRIDGE_DEDUP_STRICT - dedup hash collision resistance (full SHA-256 of normalized payload, not truncated) Self-contained subsystem: transport layer is decoupled from core.py via the Transport base class. Lands before PR-6 (core) consumes the new bridge contract. Modified: - strands_robots/mesh/transport/bridge_transport.py (+267/-25) - strands_robots/mesh/transport/base.py (+2/-2) - strands_robots/mesh/transport/factory.py (+2/-2) - strands_robots/mesh/transport/iot_transport.py (+14/-14) Carries review fixes from strands-labs#195: R12 (TTL math used time.time(), fixed to time.monotonic), R15 (cross-transport dedup opt-in strict mode), R20 (estop replay cache key narrowed to t-only; reject empty peer_id -- this PR carries the bridge-side counterpart). Tests (325 LOC): bridge dedup with monotonic clock, exact-match vs prefix-walk, strict-mode behaviour, transport restart resilience. Tracking: strands-labs#219 Source PR: strands-labs#195 Lands after: strands-labs#220 (PR-1) Lands before: PR-6 (core consumes the bridge contract)
yinsong1986
left a comment
There was a problem hiding this comment.
Summary
This PR hardens the bridge transport in three areas: TTL math now uses time.monotonic (so wall-clock skew / NTP / suspend-resume don't break the eviction window); the _should_bridge filter is split into exact-match (default) vs explicit prefix-match (opt-in via STRANDS_MESH_BRIDGE_TOPICS_PREFIX), closing a cloud-pollution amplification on cmd / safety/event; and a new _CommandDeduplicator collapses duplicates that arrive via both Zenoh and MQTT in bridge mode. The dedup uses a full SHA-256 (no birthday-attack truncation) over (sender_id, turn_id, command).
The direction is right and the regression-pin tests for R12 (monotonic clock) and R15 (strict mode behaviour at the deduplicator level) are good practice per AGENTS.md > Review Learnings (#85) > "Pin regression tests for reviewed fixes." The _should_bridge tightening is straightforward and well-reasoned.
That said, there are two material correctness gaps and one documentation drift below — see inline. The biggest is that STRANDS_MESH_BRIDGE_DEDUP_STRICT is advertised in the PR description as an opt-in but isn't actually wired anywhere in the diff: BridgeTransport.__init__ constructs _CommandDeduplicator() with no kwargs, so the env var is a no-op even though R15 is listed as a carried-over fix.
What's good
- TTL switch from
time.timetotime.monotonicis correct and pinned by two regression tests (test_dedup_uses_monotonic_clock,test_no_time_dot_time_in_dedup_path) that fail on pre-fix code. - Full SHA-256 (no truncation) for the content fingerprint — good defensive choice.
_should_bridgeexact/prefix split is the right shape and the threat model is documented inline.- Cache key is
(topic_key, dedup_id)so coincidentally-matching ids on different topics don't alias. - Cache-size guard (
_MAX_DEDUP_ENTRIES) plus stale-eviction-then-oldest-20% drop is a reasonable bounded-memory strategy. _resolve_dedup_ttlwarns onValueErrorper AGENTS.md (#86) > "Warn on unrecognized values".
Concerns
- Advertised env var not wired. PR description lists
STRANDS_MESH_BRIDGE_DEDUP_STRICTas an opt-in; no occurrence in the diff or the rest of the tree. Either the wiring was dropped or the description is wrong. Either way needs reconciling before merge — see inline onBridgeTransport.__init__. - Docstring claims an identity that the code doesn't compute. Several user-visible docstrings say identity is the "envelope nonce when present, otherwise content fingerprint".
_dedup_idonly inspectssender_id/turn_id/command— the literal keynonceis never read. AGENTS.md > Review Learnings (#86) > "Match docstrings to semantics". - Test names oversell what they cover.
test_first_call_not_duplicateand several integration tests pass payloads shaped{"nonce": "...", "payload": {...}}, but in default (non-strict) mode those payloads have no canonical fields, so_dedup_idreturnsNoneand the call passes through unconditionally. The assertion holds trivially regardless of dedup behaviour. The legitimate dedup behaviour is only exercised by the canonical-tuple integration tests. - Docstring prose for
_should_bridgeis mid-sentence-truncated ("The exact / prefix split closes the cloud-pollution attack / The pre-fix attack: ..."). Reads like a merge artefact; small but worth a re-flow before merge. - Scope creep, minor:
safety/resumewas added toDEFAULT_BRIDGE_SUFFIXESwithout explanation in the "Why this default" comment block immediately above. The PR title is about dedup + TTL + strict mode; adding a new bridged topic kind is a separate concern that future readers will trip over.
Verification suggestions
hatch run test tests/mesh/test_bridge_dedup.py tests/mesh/test_bridge_transport.py -v
# Sanity-check the strict env var claim:
grep -rn STRANDS_MESH_BRIDGE_DEDUP_STRICT strands_robots/ tests/
# (expected to return zero hits as of d1df5145)
# Manual smoke for the docstring/code mismatch:
python -c "
from strands_robots.mesh.transport.bridge_transport import _CommandDeduplicator
d = _CommandDeduplicator(ttl_s=10.0)
p = {'nonce': 'abc1234567890def', 'payload': {'sender_id': 'a'}}
print('dup1:', d.is_duplicate('k', p))
print('dup2:', d.is_duplicate('k', p)) # docstring implies True; actually False
"| ordered = sorted(self._seen.items(), key=lambda kv: kv[1]) | ||
| drop = max(1, len(ordered) // 5) | ||
| for k, _ in ordered[:drop]: | ||
| self._seen.pop(k, None) |
There was a problem hiding this comment.
Bounded-memory GC strategy is sound but pathological at the boundary. Once len(self._seen) > _MAX_DEDUP_ENTRIES (10000), every subsequent call holds self._lock, walks the full dict to build stale, and then in the worst case sorts the entire dict (O(n log n)) to drop the oldest 20% — under sustained pressure this happens on every subsequent is_duplicate() call until enough entries expire. At the 50 Hz cmd rates this codebase typically operates at this is unlikely to be hot, but the bridge is also used for lower-rate but burstier topics (response, broadcast); a single burst above the cap will throttle every thread that calls into the bridge.
Not a blocker, but two cheap mitigations worth considering:
- Hysteresis: only run the GC when
len > _MAX * 1.1(or some band) so the steady-state cost amortises. - Replace the sort-and-slice with
heapq.nsmallest(drop, self._seen.items(), key=...)which is O(n log drop) instead of O(n log n).
Either way, the GC path deserves a logger.debug line so operators can tell when it kicks in.
…nv var (addresses thread L407) Add _resolve_dedup_strict() that reads the env var and threads it into BridgeTransport.__init__ -> _CommandDeduplicator(strict=...). Previously the env var was advertised in the PR description (carried-over fix R15) but never wired -- strict mode was unreachable from the bridge. Pin test: tests/mesh/test_bridge_dedup.py::TestStrictEnvVarWiringR1
…antics + safety/resume rationale + strict-mode integration pin Addresses three review threads on PR strands-labs#222: (1) Docstring/code mismatch on "envelope nonce". The class docstring, declare_subscriber() docstring, and test module docstring all promised nonce-based identity; _dedup_id() only ever inspects (sender_id, turn_id, command). The literal key "nonce" is never read. Reconciled by striking every "envelope nonce" reference and stating the actual semantics: SHA-256 over the canonical tuple, with strict-mode opt-in for full-payload-hash fallback. (option B in the reviewer's two-path suggestion -- the canonical tuple is the only intended path, since nonces are attacker-controllable opaque keys and dedup based on actual command identity is more defensive.) (2) Truncated docstring prose for _should_bridge: "The exact / prefix split closes the cloud-pollution attack / The pre-fix attack: ..." re-flowed to a single coherent paragraph. Adds the rationale for why "response" is the sole prefix-walk default. (3) safety/resume rationale gap: added a one-liner to the Why-this-default comment block above DEFAULT_BRIDGE_SUFFIXES to explain the pairing with safety/estop and the audit-timeline justification. Plus a new pin TestStrictModeIntegrationR2 (2 tests) that closes the coverage gap the reviewer flagged: every prior integration test in TestBridgeDedupIntegration drove canonical-tuple payloads, so strict-mode behaviour was unverified end-to-end at the bridge layer. The new tests drive an envelope-shaped payload through the bridge's Zenoh+IoT fanout under both default mode (asserts pass-through, x2 delivery) and strict mode (asserts dedup, x1 delivery). Test module + nearby comments updated to use "canonical tuple" terminology consistently with the implementation. GC pathological-boundary concern (thread on L358) is explicitly deferred this round -- reviewer marked it 'Not a blocker'. Tracked for a follow-up PR so this PR stays focused on dedup + TTL + strict mode wiring; see PR description \xc2\xa713 round-2 changelog. Tests: hatch run test tests/mesh/test_bridge_dedup.py tests/mesh/test_bridge_transport.py -> 68 passed (was 66; +2 new R2 pins)
yinsong1986
left a comment
There was a problem hiding this comment.
Summary
This PR delivers the bulk of what its title promises: time.monotonic() for TTL math, an exact-match / opt-in-prefix env-var split that closes the cloud-pollution attack on cmd / safety/event, opt-in strict mode for non-canonical payloads, and a full-SHA-256 canonical fingerprint. The deduplicator is thread-safe, the bridge degrades gracefully when one side is down, and the §13 changelog cleanly documents the R1/R2 review-round fixes plus their pin tests. Scope is well-disciplined for a 5/9 split.
My main concerns are (1) a real false-dedup hazard in the canonical fingerprint that doesn't match what the PR description / class docstring advertise, and (2) several pin-test assertions that are vacuously true on the pre-fix code path. Details inline.
What's good
time.monotonic()migration is clean and theTestMonotonicClockR12source-grep regression pin is the right shape (it would catch a regression even if test logic drifts).- Exact-match vs prefix-walk split is well-motivated in the comment block above
_DEFAULT_BRIDGE_PREFIX_SUFFIXES; the..traversal guard at_should_bridgeline 223 is appropriate defence-in-depth. - R1 wiring fix (
_resolve_dedup_strict()-> constructor) is pinned by three explicit cases including invalid-value warning behaviour — matches the AGENTS.md "warn on unrecognized env values" learning from #86. - R2 integration tests (
TestStrictModeIntegrationR2) close the bridge-layer coverage gap correctly: default-mode pass-through and strict-mode dedup are both pinned end-to-end through the fanout. - Per-topic cache key
(topic_key, dedup_id)correctly isolates topic buckets;test_dedup_resets_per_topicpins it.
Concerns
- Canonical-tuple fingerprint conflates 'same RPC' with 'identical message'. The PR description says "full SHA-256 of normalized payload" but the implementation hashes only
(sender_id, turn_id, command)— any two messages that share that triple but differ in other top-level fields (timestamps, audit metadata, signed envelope, future fields) collapse to one delivered call. For RPC whereturn_idis monotonic per-sender this may be safe today, but the docstring / PR description need to match the code, OR the implementation needs to widen. See inline. - Partial-canonical payloads false-dedup. A payload with only
sender_idset (or onlyturn_id) takes the canonical path with the other two asNoneand aliases against every other partial-canonical from the same sender. Inline. - Several deduplicator unit tests pass vacuously.
test_ttl_expiry,test_clear,test_first_call_not_duplicateall use payloads with no canonical fields, which take the pass-through path — the assertions hold regardless of TTL / clear() correctness. The R2 integration tests cover this at the bridge layer in strict mode, but the unit-level pins for the deduplicator's TTL eviction path are weaker than they look. Inline. - Bare
except Exceptionappears in seven places inbridge_transport.py(handle teardown, connect/close, put, declare_subscriber). AGENTS.md > Review Learnings (#86) > 'Exception Clauses Must Be Narrow' is explicit that this is forbidden for non-recovery code paths. The narrow-except pattern at line 552 inside_filteredis the correct shape; the surrounding sites should match. Inline. - GC perf concern is acknowledged but untracked. The PR description marks the
O(n log n)sort-and-slice as "deferred to follow-up" but I don't see a linked issue. AGENTS.md says "the project board is the source of truth" — please file the follow-up before merge so it doesn't get lost. - Docstring drift in
iot_transport.py:204-214. The whitespace-collapse in the comment block (e.g.# (a)-># (a)) reads like it lost a column of indentation. Not a blocker, but the previous shape was clearer; if this was intentional formatting, fine.
Verification suggestions
# Confirm the strict-mode env var actually reaches the bridge under realistic shape:
pytest tests/mesh/test_bridge_dedup.py::TestStrictEnvVarWiringR1 -v
pytest tests/mesh/test_bridge_dedup.py::TestStrictModeIntegrationR2 -v
# Spot-check that the unit-level TTL test would actually fail on a buggy TTL impl:
# (it currently uses a pass-through payload so it can't — see inline comment).
# Confirm the prefix-walk closure for the cloud-pollution attack:
pytest tests/mesh/test_bridge_transport.py -v -k 'should_bridge or filter'| default=str, | ||
| ).encode("utf-8") | ||
| # Full 256-bit (64 hex chars) -- no birthday-attack truncation. | ||
| return "f:" + hashlib.sha256(canonical).hexdigest() |
There was a problem hiding this comment.
Canonical fingerprint hashes only (sender, turn, cmd), not the full payload. The PR description says the dedup id is a "full SHA-256 of normalized payload" and the class docstring on line 283-288 says "Identity is a SHA-256 fingerprint over the canonical (sender_id, turn_id, command) tuple" — those two statements are inconsistent, and the code matches the second.
Real-world impact: two messages with the same (sender_id, turn_id, command) but different other fields (timestamps, audit metadata, payload args, future-added envelope fields) will be falsely deduped. If turn_id is strictly monotonic per-sender today this is safe in practice, but the contract is fragile — any future caller that reuses a turn_id (retries with mutated args, batching, etc.) silently drops messages.
Either widen the canonical hash to cover the whole payload (matching the PR description) or tighten the docstring + PR description to say "identity is the canonical RPC triple, callers must not reuse (sender, turn, cmd) for distinct deliveries." Pick one and pin a regression test that would fail if a future contributor flipped the choice.
| turn = payload.get("turn_id") | ||
| cmd = payload.get("command") | ||
|
|
||
| if sender is None and turn is None and cmd is None: |
There was a problem hiding this comment.
Partial-canonical payloads alias. This branches on sender is None AND turn is None AND cmd is None. A payload with only sender_id set (no turn_id, no command) falls through to line 332 and hashes {"sender": "alice", "turn": null, "cmd": null} — every other partial-canonical payload from the same sender hashes to the exact same value and gets deduped against it.
Pin test: d.is_duplicate("k", {"sender_id": "a"}); d.is_duplicate("k", {"sender_id": "a", "extra": 1}) — both currently dedup as the same message under the canonical path. That's almost certainly not intended.
Suggest: require all three fields present to take the canonical path; if any one is missing, fall through to strict-mode logic (pass-through in default, full-payload-hash in strict). Add a regression test.
| payload = {"nonce": "abcdef0123456789"} | ||
| assert d.is_duplicate("k", payload) is False | ||
| time.sleep(0.1) | ||
| assert d.is_duplicate("k", payload) is False # expired -> re-accepted |
There was a problem hiding this comment.
This TTL test passes vacuously. Payload {"nonce": "abcdef0123456789"} has no canonical fields, so _dedup_id() returns None (pass-through path) and is_duplicate() returns False regardless of TTL math — the time.sleep(0.1) is decorative. The same applies to test_clear (line 101-106) and test_first_call_not_duplicate (line 44-47): all three exercise the pass-through path, not the TTL eviction / clear paths they claim to pin.
The R2 integration tests cover envelope-shaped payloads in strict mode end-to-end, so the bridge layer is OK, but the unit-level _CommandDeduplicator TTL pin is missing. Suggest: rewrite these three tests with a canonical-tuple payload (e.g. {"sender_id": "a", "turn_id": "t1", "command": {"action": "x"}}) so the assertions actually exercise the eviction / clear paths. AGENTS.md > Review Learnings (#85) > 'Pin regression tests for reviewed fixes' applies — a future regression in TTL math would not be caught by the current pin.
| try: | ||
| zenoh_sub = self._zenoh.declare_subscriber(key_expr, handler) | ||
| zenoh_sub = self._zenoh.declare_subscriber(key_expr, make_dedup_handler("zenoh")) | ||
| except Exception as exc: |
There was a problem hiding this comment.
Bare except Exception here and in seven other sites (lines 400, 466, 470, 513, 520, 575, 581) violates AGENTS.md > Review Learnings (#86) > 'Exception Clauses Must Be Narrow': "except Exception is forbidden for non-recovery code paths. Use the smallest superset of expected exception types."
Note that the equivalent narrow-except inside _filtered at line 552 (except (AttributeError, UnicodeDecodeError, json.JSONDecodeError)) is already the right shape — and the comment on line 553-558 even cites the AGENTS.md rule. The cleanup paths just need the same treatment. Suggested narrowing per site:
sub.undeclare()->except (RuntimeError, AttributeError)self._zenoh.close()/self._iot.close()-> the transport's documented close exceptionsself._zenoh.put()/self._iot.put()->except (RuntimeError, ConnectionError, OSError)declare_subscriber(this site + line 581) -> same asput
Or if the underlying transports don't document their exception surface, define a TransportError base in base.py and catch that.
| ordered = sorted(self._seen.items(), key=lambda kv: kv[1]) | ||
| drop = max(1, len(ordered) // 5) | ||
| for k, _ in ordered[:drop]: | ||
| self._seen.pop(k, None) |
There was a problem hiding this comment.
GC follow-up should be filed before merge. The PR description acknowledges this O(n log n) sort-and-slice as "deferred to follow-up" but I can't find a tracking issue on the project board. AGENTS.md is explicit: "ALWAYS use the project board to track work. ... Never track work only in local markdown — the board is the source of truth."
Please open the follow-up issue (hysteresis band on the GC trigger, heapq.nsmallest replacement, debug log, tests_integ/ microbenchmark) before this PR merges so the deferred concern doesn't get lost in the next refactor. Linking it from the §13 changelog row keeps the audit trail intact.
…-tuple contract + un-vacuous TTL/clear pins
Addresses three review-feedback concerns on the bridge dedup R3 round:
1. Partial-canonical false-dedup. The previous _dedup_id took the
canonical path whenever any one of (sender_id, turn_id, command) was
non-None and serialised the missing fields as null. Two partial
payloads from the same sender (e.g. {sender_id: a} and
{sender_id: a, extra: 1}) hashed to the same value and silently
deduped against each other. Fix: require all three fields present
for the canonical path; partial canonical payloads fall through to
pass-through (default) or full-payload-hash (strict).
2. Canonical-tuple contract drift. The class docstring said identity is
the canonical RPC triple but the prose did not state the implied
contract (callers must not reuse the triple for distinct deliveries
and turn_id is monotonic per-sender). Strengthened the docstring so
future contributors do not silently widen the dedup id without
filing an interface change.
3. Vacuous TTL/clear/first-call tests. test_first_call_not_duplicate,
test_ttl_expiry, and test_clear all used pass-through payloads
(nonce-only, no canonical fields) so _dedup_id returned None and the
assertions held trivially regardless of TTL math or clear()
correctness. Rewrote the three tests with canonical-tuple payloads
so the eviction and clear paths are actually exercised.
Pin tests:
- TestCommandDeduplicator.test_partial_canonical_does_not_alias
asserts default-mode pass-through for partial canonical payloads;
fails on pre-fix code where _dedup_id returned the same f: hash for
{sender_id: a} and {sender_id: a, extra: 1}.
- TestCommandDeduplicator.test_partial_canonical_strict_mode_uses_full_payload
asserts strict-mode falls back to full-payload hash for partial
canonical, so two partial payloads with extra fields do not alias.
- The rewritten test_first_call_not_duplicate, test_ttl_expiry, and
test_clear exercise the canonical-tuple identity path so a future
regression in TTL math or clear() would be caught.
GC perf concern (R2/R3 carried-over) tracked separately by strands-labs#231 per
project-board-as-source-of-truth requirement.
AI Disclosure: This change was authored by an autonomous AI agent
(strands-agents). Tests verified to pass locally (27/27 in
tests/mesh/test_bridge_dedup.py) and the partial-canonical pin
verified to fail on pre-fix code via inline reproduction.
…s in bridge_transport Addresses the R3 review-feedback concern that bridge_transport.py had seven bare `except Exception` sites (handle teardown, connect/close, put, declare_subscriber) violating AGENTS.md > Review Learnings > 'Exception Clauses Must Be Narrow' (forbidden for non-recovery code paths). Each site was narrowed to the documented transport-failure surface: - _BridgeSubHandle.undeclare() -> (RuntimeError, AttributeError, OSError) - BridgeTransport.close() (zenoh+iot) -> (RuntimeError, ConnectionError, OSError) - BridgeTransport.put() (zenoh+iot) -> (RuntimeError, ConnectionError, OSError) - declare_subscriber (zenoh+iot) -> (RuntimeError, ConnectionError, OSError) The narrow tuples follow the same shape as the existing _filtered handler at line ~552 (which already used (AttributeError, UnicodeDecodeError, json.JSONDecodeError) for payload decode and was the right model). The classification rationale is documented inline at each site: - RuntimeError covers already-closed sessions, undeclared handles, and invalid-state errors raised by the zenoh / awsiot APIs. - ConnectionError covers broker drops mid-call (IoT) and Zenoh peer disconnects mid-publish. - OSError covers socket-level write/teardown races. - AttributeError (teardown only) covers mock handles and partial-init states surfaced during unit tests. Genuine bugs (TypeError, ValueError, KeyError, etc.) now propagate instead of being silently logged at debug level. Pin test: TestNarrowExceptionsR3.test_no_bare_except_exception_in_bridge_transport is a source-grep regression pin (same shape as the existing test_no_time_dot_time_in_dedup_path R12 pin). Fails if any future change reintroduces `except Exception` in bridge_transport.py. Includes a descriptive error message pointing at the AGENTS.md rule and the documented narrow tuples to use. All 28 tests in tests/mesh/test_bridge_dedup.py pass; full mesh suite (468 tests / 2 skipped) is green. AI Disclosure: This change was authored by an autonomous AI agent (strands-agents).
yinsong1986
left a comment
There was a problem hiding this comment.
Summary
PR-5/9 of #195's split. Lands four roughly-orthogonal changes on the bridge transport:
_CommandDeduplicatorfor cross-transport dedup of duplicate Zenoh+MQTT deliveries, keyed by canonical(sender_id, turn_id, command)SHA-256.- TTL eviction switches from
time.time()totime.monotonic()(R12 carry-over). _should_bridgetightens prefix-walk to exact-match plus an opt-inSTRANDS_MESH_BRIDGE_TOPICS_PREFIXallow-list (good cloud-pollution mitigation).- Seven
except Exceptionsites narrowed to(RuntimeError, ConnectionError, OSError)/(RuntimeError, AttributeError, OSError)per AGENTS.md hygiene.
The §13 changelog is unusually thorough (R1/R2/R3 each pinned with a regression test that fails on pre-fix code) and the test file actually exercises the canonical-tuple path in TTL/clear/first-call assertions, fixing the vacuous-test class flagged in R3-b. Scope discipline is good — this is genuinely a transport-layer change with no core.py reach-through.
Main concerns are correctness-leaning rather than blocking:
- The dedup cache keys on the subscription
key_expr(often a wildcard) rather than the deliveredsample.key_expr, which can alias unrelated topics under a single wildcard subscription. json.dumps(..., default=str)on the canonical command makes the fingerprint non-deterministic for command payloads containing types likebytes, numpy scalars, or objects without stable__str__— they may dedup or fail to dedup based on object identity.- The over-budget GC path in
is_duplicate()runs sort-and-slice inside the dedup lock (already tracked as #231 — flagging the lock-hold-time, not the algorithm). - Safety topics (
safety/estop,safety/resume) now silently drop within the 120s TTL window if the canonical triple repeats; that's a behaviour change worth a higher log level thandebug. _should_bridge's default change is a behaviour break for any operator who relied on the old loose prefix-walk for non-responsetopics. No CHANGELOG / startup warning calls this out.
What's good
time.monotonic()switch is the right call and is pinned bytest_no_time_dot_time_in_dedup_path(good source-grep regression pattern, same shape as the R3-c except-clause grep).- Exact-match vs prefix-match split is a legitimate hardening — the cloud-pollution attack surface (10 KiB blob on
strands/x/safety/event/<blob>ending up in the DDB audit table) is real. _dedup_idrequiring all three canonical fields rather than "any non-None" closes the partial-aliasing bug, and the regression test (test_partial_canonical_does_not_alias) is a clean fail-on-pre-fix pin.- Three vacuous tests (
test_first_call_not_duplicate,test_ttl_expiry,test_clear) rewritten with canonical payloads so the assertions actually exercise the dedup path. Self-review caught what review otherwise wouldn't. - Narrow exception clauses cite AGENTS.md > Review Learnings inline and are pinned by
test_no_bare_except_exception_in_bridge_transport. _resolve_dedup_strictwarns on unrecognised env-var values (perSTRANDS_ROBOT_MODEprecedent in #86 review-learnings).
Concerns
- Behaviour break for existing operators. The
_should_bridgetightening drops any non-responsetail-bearing topic that operators may have been bridging via the old loose prefix walk. There is no CHANGELOG entry, no startuplogger.warninglisting dropped topics, and theSTRANDS_MESH_BRIDGE_TOPICS_PREFIXopt-in is documented only in a comment. If this lands as written, an operator who upgrades and finds their customcmd/<sub>topic stops bridging will only learn why by reading the diff. Consider either (a) emitting a startup log enumerating what's exact-matched vs prefix-matched, or (b) a CHANGELOG entry citing the env-var migration path. - Dedup caches a per-
BridgeTransportinstance, not per-subscription, but the TTL is fixed at construction.STRANDS_MESH_DEDUP_TTLis read once at_CommandDeduplicator.__init__time (line 298), so re-reading the env var at runtime does nothing. Acceptable, but worth a one-line note in the docstring next to the env-var reference. - #231 follow-up is reasonable but the lock-hold-time aspect deserves an inline TODO, since a 10k-entry sort-and-slice inside a
threading.Lockwill block every otheris_duplicatecall on the bridge — at the rates implied (cmd/response per turn) this is fine; at heartbeat/presence rates it isn't. - No transport-restart test. The PR description's reviewer focus mentions "behaviour under transport restart" but I don't see a test that exercises
connect()→close()→connect()and confirms the dedup cache survives (or doesn't, deliberately) across the cycle. The current_dedupinstance lives onself, so it persists acrossclose(), which is probably correct but should be pinned.
Verification suggestions
# Run the new dedup suite
hatch run test tests/mesh/test_bridge_dedup.py -v
# Confirm the source-grep pins still trip on a hand-rolled regression
hatch run test tests/mesh/test_bridge_dedup.py::TestNarrowExceptionsR3 -v
# Smoke-test the prefix-walk break: with default env, confirm
# `strands/peer/cmd/something` is NOT bridged but `strands/peer/response/abc-123` IS.
python -c "from strands_robots.mesh.transport.bridge_transport import _should_bridge, DEFAULT_BRIDGE_SUFFIXES; print(_should_bridge('strands/p/cmd/x', DEFAULT_BRIDGE_SUFFIXES), _should_bridge('strands/p/response/abc', DEFAULT_BRIDGE_SUFFIXES))"
# Expect: False TrueNo blocking concerns. The R-round changelog and pin tests are exemplary; the inline notes are correctness-leaning suggestions rather than rework requests.
| # ``test_wire_handler_narrow_except.py``. | ||
| payload = None | ||
|
|
||
| if payload is not None and self._dedup.is_duplicate(key_expr, payload): |
There was a problem hiding this comment.
Dedup cache key uses the subscription expression, not the delivered topic.
key_expr here is the argument to declare_subscriber (closed over from line 560), which is frequently a wildcard like strands/+/cmd or strands/+/presence. The dedup cache key is then ("strands/+/cmd", canonical_hash) — meaning two distinct delivered topics that match the same wildcard subscription will share a cache slot.
In the typical case this is harmless because sender_id is part of the canonical hash, so cross-sender collisions don't happen. But it does mean that:
- The same sample arriving on
sample.key_expr = "strands/robot-a/cmd"and onsample.key_expr = "strands/robot-b/cmd"(impossible normally, but possible if an operator misconfiguressender_idor a peer impersonates) will dedup incorrectly across robots. - The test
test_different_keys_isolate_payloadsintest_bridge_dedup.pyuses literal keys"k1"/"k2", so the wildcard-aliasing case isn't exercised.
Consider keying on sample.key_expr instead (the actual delivered topic), with a fallback to key_expr when the sample doesn't expose it. That matches the R3-a docstring contract — "two distinct topics with coincidentally matching dedup_ids don't collide" — more faithfully than the current closure capture does.
| {"sender": sender, "turn": turn, "cmd": cmd}, | ||
| sort_keys=True, | ||
| separators=(",", ":"), | ||
| default=str, |
There was a problem hiding this comment.
default=str makes the canonical fingerprint non-deterministic for some command shapes.
json.dumps(..., default=str) falls back to str(obj) for non-JSON-serializable values. For:
bytes->"b'...'"(deterministic, but compares equal across calls only if the bytes object is identical — fine).datetime/Decimal/numpy.float32-> stable string form (fine).- Custom objects without
__str__overrides ->"<MyCmd object at 0x7f...>"— the address is included, so two semantically identical commands hash differently and won't dedup. set->"{1, 2, 3}"with non-deterministic iteration order pre-3.7 (fine on 3.12).
The command field is operator-controlled ({"action": "move", "args": {...}} shape per the upstream RPC contract) so the bytes/datetime cases are unlikely. But the failure mode is silent: a refactor that puts a non-serializable object on command in 6 months will produce a dedup that appears to work in tests (because mocks have stable __str__) but doesn't dedup in production.
Consider replacing default=str with a strict mode that raises TypeError and a callable hash key like repr(sorted(payload.items())), or document the JSON-only contract for command in the _dedup_id docstring and add a regression test that asserts TypeError on an object whose __str__ includes its address.
| self._seen.pop(k, None) | ||
| if len(self._seen) > _MAX_DEDUP_ENTRIES: | ||
| # drop oldest 20% | ||
| ordered = sorted(self._seen.items(), key=lambda kv: kv[1]) |
There was a problem hiding this comment.
Sort-and-slice runs inside the dedup lock.
At the over-budget threshold (10k entries), sorted(self._seen.items(), key=lambda kv: kv[1]) is O(n log n) with the threading.Lock held — every concurrent is_duplicate call on the bridge blocks until it completes. At the topic rates this PR targets (cmd/response/safety per turn), 10k is unreachable in any sane window so it's never hit. At heartbeat/presence rates (10 Hz × 50 peers) it's a 20s budget.
#231 covers the algorithmic cleanup (heapq.nsmallest + hysteresis), but the lock-hold-time aspect is a separate concern: even with nsmallest, doing GC under the lock at all means dedup throughput is gated by GC throughput. Consider snapshotting self._seen.items() under the lock, releasing it, computing the eviction set, and re-acquiring to apply — or noting the trade-off explicitly in #231.
Not a blocker for this PR; flagging so the deferred work captures both axes (algorithm + lock granularity).
| "health", | ||
| "safety/event", | ||
| "safety/estop", | ||
| "safety/resume", |
There was a problem hiding this comment.
Safety-topic dedup behaviour change deserves a louder log.
With safety/estop and safety/resume both in DEFAULT_BRIDGE_SUFFIXES, a safety event whose canonical (sender, turn, command) triple matches a previous one within the 120 s TTL is silently dropped at logger.debug level (line 599). For non-safety topics this is correct dedup; for safety, two scenarios are uncomfortable:
- Legitimate retry: an operator presses E-stop, sees no acknowledgement (LAN flap), presses again. If both presses serialise to the same
turn_id(e.g., the UI generatesturn_idfrom the button-click handler, not a fresh UUID per click), the second is dropped. - Audit replay: a cloud-side replay of an E-stop incident (for testing the audit pipeline) within the TTL window is invisible.
The class docstring's contract ("callers must not reuse (sender, turn, cmd) for distinct deliveries") puts the burden on callers, which is fine in principle. But for safety topics specifically, consider either (a) bypassing dedup entirely on safety/* suffixes, or (b) logging duplicate-drop on safety topics at logger.warning so a real-world misconfiguration surfaces in operator logs instead of silently in debug.
Not a blocker — it matches the documented contract — but worth a paragraph in the docstring explicitly calling out the safety implication.
| rest = suffix[len(head) + 1 :] if "/" in suffix else "" | ||
| if rest and any(seg == ".." for seg in rest.split("/")): | ||
| return False | ||
| return True |
There was a problem hiding this comment.
.. rejection is the only path-traversal guard, but the head segment isn't checked.
The check any(seg == ".." for seg in rest.split("/")) rejects .. segments only in the tail (after head). The head itself comes from suffix.split("/", 1)[0] and is matched against allowed_prefixes — if .. were ever in allowed_prefixes (it can't be in defaults, but STRANDS_MESH_BRIDGE_TOPICS_PREFIX=".." would put it there), the head guard wouldn't catch it.
Also: the comment says "Defence-in-depth: reject any tail containing path-traversal segments" but the check is exact-equal .., not containing-... "foo.." or "..." would slip through. For Zenoh keys this is fine (the syntax doesn't interpret ..), but the comment overstates the guard.
Minor nit: align the comment to what the code does ("reject any tail segment equal to .."), and consider validating that _resolve_bridge_prefix_filter() rejects ".." and other reserved tokens at parse time so operators can't disable the guard via env var.
The R3 commit (db192ce) added the TestNarrowExceptionsR3 source-grep pin and the partial-canonical regression tests but did not run `hatch run format` over the file before push. `ruff format --check` now flags two trivial whitespace deviations: - A multiline assertion message in test_partial_canonical_does_not_alias joined onto a single line (under the 120-char project line-length). - Two blank-line spacing nits between TestCommandDeduplicator, TestStrictModeIntegrationR2, and TestNarrowExceptionsR3. No semantic change; `pytest tests/mesh/test_bridge_dedup.py` still reports 28 passed before and after. Pin: the same `hatch run lint` step that failed on db192ce now passes locally (`ruff check` + `ruff format --check` + mypy clean) and will green up the call-test-lint check on the next CI run.
The test_application_security.py file (358 LOC) and the TestF15WireCommandKeyRequired class in test_validate_command.py exercise modules that land in later PRs of the strands-labs#195 split: - bridge_transport.DEFAULT_BRIDGE_SUFFIXES with 'safety/resume' (bridge_transport additions land in PR-5 / strands-labs#222) - audit._seq_sidecar_path, audit._resolve_log_max_bytes, F2/F3 symlink protections (land in PR-4 / strands-labs#221) - core.Mesh._expected_responders, core.BROADCAST_RESPONDER, the wire-side _exec_cmd dict-shape gate (land in PR-6 / strands-labs#225) - robot_mesh._interrupt_approves (lands in PR-7 / strands-labs#227) PR-2 is scoped to the leaf module strands_robots/mesh/security.py with zero internal mesh deps. These tests were pre-staged here but need their consumers to land first. They will re-land in the appropriate consumer PR. Test impact: 9 failing tests removed (8 from test_application_security.py, the F15 class moved out). All 571 remaining mesh tests pass. No production code change; no behaviour change on validate_command or any security.py contract.
🎯 Pentest evidence for this PR (#222 — Bridge transport)Live bridge-mode pentest on 2026-05-26 ( 3 new bridge findings landing in this PR's scope
ReproductionAll 3 PoCs in docker compose run --rm -e VICTIM_PEER_ID=$PID attacker python3 /work/scripts/br01_dual_command.py
docker compose run --rm -e VICTIM_PEER_ID=$PID attacker python3 /work/scripts/br02_lan_only_inject.py
docker compose run --rm -e VICTIM_PEER_ID=$PID attacker python3 /work/scripts/br03_estop_resume_race.pyResults captured in Posture confirmed by this PR (positive control)
Full plan + finding-to-PR mapping: |
Folds the five distinct R5 review concerns into a single round-budget commit per AGENTS.md round-budget guidance (do not split into 5 commits; they are a coherent class of after-merge-intent / temporal-gap framing followups, the same shape R7 closed at section level). 1. README L508 (STRANDS_MESH_BRIDGE_TOPICS_PREFIX row) -- soften the present-tense "this closes the prefix-bypass attack" claim to "intended to close" plus an inline forward-pointer at strands-labs#244 and strands-labs#222 so a reader scanning the env-var table (who won't see the R7 section banner) gets the same after-merge framing. 2. README L705 (threat-vector matrix "robot-* publishes on */cmd" row) -- rewrite in the same "Mitigated when X / Not mitigated by default Y" form L709 already uses, and surface STRANDS_MESH_ACCEPT_PERMISSIVE_ACL as the default-state safety net. R1's 253ad47 rewrote L709 in this form but did not propagate to L705; this is the leftover sibling. 3. README L547 (STRANDS_MESH_POLICY_TYPE_ALLOW row) -- align the share- relationship framing with CHANGELOG L40 ("separate call-site validators but share a single env-var-extensible allowlist") and the README L728 prose. The env-var table now states the share relationship explicitly so an operator reading the table alone gets the same answer the payload-semantics narrative gives. 4. CHANGELOG resume-hardening prose (~L177-182) -- fold under the same land-time-intent framing the ### Tests block-quote uses. The paragraph asserts current-tense behaviour about mesh.core parser helpers and a test path that don't exist on this branch yet; either the prose moves under the temporal-gap contract or the New env vars preface has to stretch to cover surrounding prose. Picked the former (smaller surgery, keeps the contract scope tight). 5. CHANGELOG ### Breaking heading -- elevate visibility per Keep-a-Changelog convention. Renamed to ### BREAKING CHANGE: and added a top-of-section block-quote callout naming the caller-visible behavioural change (Mesh.send / Mesh.broadcast without policy_provider now raises ValidationError). Section ordering kept in place to avoid merge conflicts with parallel R-rounds; visibility lift comes from the prefix + the callout, not re-ordering. Resolves five R5 (2026-05-28T05:29:11Z) threads: - L705 Mitigated-when reframe - L508 prefix-bypass present-tense soften - CHANGELOG L178 prose temporal-gap framing - CHANGELOG L216 ### Breaking visibility - L720 _TYPE_ALLOW share-relationship alignment Diff: README +3/-3, CHANGELOG +12/-3. No behavioural change; documentation surgery only.
…or dedup cache key, not subscription pattern (addresses thread L598)
yinsong1986
left a comment
There was a problem hiding this comment.
Summary
PR-5/9 of the #195 split. Wires cross-transport command deduplication into BridgeTransport, splits the bridge topic filter into exact-match (STRANDS_MESH_BRIDGE_TOPICS) and opt-in prefix-walk (STRANDS_MESH_BRIDGE_TOPICS_PREFIX), switches TTL math to time.monotonic, adds opt-in strict mode for non-canonical payloads, and narrows seven previously-bare except Exception sites. Cache key is now the delivered topic (sample.key_expr) rather than the wildcard subscription pattern (R5). Adds 12 pin tests across the R1-R5 review rounds.
The correctness story for the central additions (_CommandDeduplicator, _should_bridge exact/prefix split, R5 wildcard-aliasing fix) is well-pinned. time.monotonic, full SHA-256 (no truncation), per-(topic, ident) cache key, narrowed exception tuples, and the partial-canonical no-alias contract are all defensible decisions and each is tied to a regression test.
What's good
time.monotonicmigration is correct (clock-source semantics match the lifetime of the process; survives wall-clock skew).- R5 fix is the right shape:
getattr(sample, "key_expr", key_expr)then keying the cache on the delivered topic. Prevents wildcard subscriptions from collapsing distinct concrete topics into one cache slot. - Exception narrowing in teardown / put / declare paths follows AGENTS.md > Review Learnings (#86) > 'Exception Clauses Must Be Narrow' precisely, and each call site documents why the chosen tuple is the right surface.
- Pin-test discipline: every R-round gets at least one regression test that fails on pre-fix code (
test_partial_canonical_does_not_alias,test_distinct_delivered_topics_not_aliased,test_no_bare_except_exception_in_bridge_transport, etc.). This is exactly the rule from #85 review-learnings. - The exact / prefix-walk split for
_should_bridgeis well-motivated (cloud-pollution attack surface) and the comment block explaining_DEFAULT_BRIDGE_PREFIX_SUFFIXES = {"response"}is the kind of inline rationale future reviewers will thank you for.
Concerns
- Scope creep, mild. Adding
safety/resumetoDEFAULT_BRIDGE_SUFFIXESis not a dedup/TTL fix and lives orthogonally to the rest of the diff. The PR description does call it out separately, which is the right thing to do, but consider whether a one-line follow-up commit/PR isolating the bridged-topic-set change would be cleaner. Not blocking. - Two env vars read once at construction. Both
_resolve_dedup_ttland_resolve_dedup_strictare called inside_CommandDeduplicator.__init__/BridgeTransport.__init__, so togglingSTRANDS_MESH_BRIDGE_DEDUP_STRICTorSTRANDS_MESH_DEDUP_TTLafter process start has no effect until the bridge restarts. Worth a one-liner in the docstring of_resolve_dedup_strictand a parallel note in the README env-var section (per AGENTS.md > Review Learnings (#86) > 'Document every env var in README.md'). - Three new env vars.
STRANDS_MESH_BRIDGE_TOPICS_PREFIX,STRANDS_MESH_BRIDGE_DEDUP_STRICT, andSTRANDS_MESH_DEDUP_TTLshould each get an entry in the project README's Configuration section in this PR (AGENTS.md again). I'm assuming this lives in a separate doc PR within the #195 split — flag if not.
Verification suggestions
hatch run test tests/mesh/test_bridge_dedup.py -v
hatch run test tests/mesh/test_bridge_transport.py -v
# Spot-check no host-paths slipped into the new test file:
hatch run test tests/test_no_host_paths.py
# Make sure the narrow-except source-grep pin actually fails on a re-introduction:
# 1. revert one narrowed clause to bare `except Exception:`
# 2. hatch run test tests/mesh/test_bridge_dedup.py::TestNarrowExceptionsR3
# 3. confirm RED, then revert.| # robot-b/cmd). Falls back to the subscription key_expr when | ||
| # the sample does not expose key_expr (should never happen per | ||
| # the _MqttSample / zenoh.Sample contracts). | ||
| delivered_topic = str(getattr(sample, "key_expr", key_expr)) |
There was a problem hiding this comment.
Silent fallback to the subscription key_expr here re-introduces the exact R5 wildcard-aliasing bug this PR fixes if sample.key_expr is ever missing. The comment correctly notes "should never happen per the _MqttSample / zenoh.Sample contracts" — but if a future transport (or a mock in test) ever lands with a sample object lacking key_expr, two distinct concrete topics under one wildcard subscription will silently collapse to one cache slot again, with no log signal that R5 has regressed.
Suggest one of:
- Treat missing
key_expras fatal (raiseRuntimeError) — the contract claim says it cannot happen, so any occurrence is a bug worth surfacing. - At minimum,
logger.warning(...)on thegetattrdefault branch so a regression is observable in operator logs.
A pin test for the fallback path (e.g. construct a _FakeSample without key_expr and assert the warning is emitted, or assert dedup keys on a sentinel) would also keep the contract honest.
| # Canonical path requires all three fields present; partial | ||
| # canonical payloads fall through to the strict/pass-through | ||
| # branch so they do not alias against each other. | ||
| if sender is None or turn is None or cmd is None: |
There was a problem hiding this comment.
payload.get("command") is None collapses two distinct shapes:
- Key absent (
{}) - Key explicitly null (
{"command": null})
For JSON-RPC-shaped payloads this is fine in practice, but the class docstring on L283-290 advertises the canonical triple as the contract — so a payload that explicitly sets command: null will route through the partial-canonical / strict-fallback path rather than the canonical path, and its strict-mode hash will differ from a re-delivery that omits the key entirely. That's a subtle alias the test suite doesn't currently pin.
If the contract really is "all three keys present (regardless of value)", use "command" in payload (and same for sender_id / turn_id). If the contract is "all three values non-null", make that explicit in the docstring. Either way, a pin test asserting the chosen semantics would close the gap.
| decoded = json.loads(raw) | ||
| if isinstance(decoded, dict): | ||
| payload = decoded | ||
| except (AttributeError, UnicodeDecodeError, json.JSONDecodeError): |
There was a problem hiding this comment.
AttributeError in this tuple is broad: it catches any attribute access failure inside the try, not just sample.payload being absent. If a future refactor types sample.payload.to_bytes() and the inner .to_bytes is missing on some sample shape, the bug is silently swallowed (payload = None → message bypasses dedup → handler still runs with a malformed sample). Per AGENTS.md > Review Learnings (#85) > 'No silent defaults on error', this is exactly the surface that masks programming errors.
Two options:
- Replace the broad
trywith an explicit guard:if not hasattr(sample, "payload"):then a narrowtryaroundto_bytes().decode()for(UnicodeDecodeError, OSError)and the JSON parse forjson.JSONDecodeError. - Keep the current shape but add a
logger.debug("[bridge] sample.payload extraction failed: %r", exc)so a regression is at least observable.
The inline reference to core.py's wire handlers is fine; the issue is that the rationale doesn't apply identically here because this path also gates dedup correctness, not just delivery.
| # BridgeTransport, shared between the Zenoh and IoT subscriber | ||
| # wrappers -- whichever transport delivers a sample first wins, | ||
| # and the other side silently drops the duplicate. | ||
| self._dedup = _CommandDeduplicator(strict=_resolve_dedup_strict()) |
There was a problem hiding this comment.
Both _resolve_dedup_strict() and _resolve_dedup_ttl() are read exactly once at BridgeTransport.__init__ time. In a long-running process this means flipping STRANDS_MESH_BRIDGE_DEDUP_STRICT=1 to recover from an MQTT replay storm requires a full bridge restart — which on physical robots may mean a reconnect storm of its own.
Not a blocker for this PR, but please:
- Note the read-once-at-construction semantics in
_resolve_dedup_strict's docstring (it currently reads as if it were checked per-sample). - Add a README entry under Configuration listing
STRANDS_MESH_DEDUP_TTL,STRANDS_MESH_BRIDGE_DEDUP_STRICT, andSTRANDS_MESH_BRIDGE_TOPICS_PREFIX(AGENTS.md > Review Learnings (feat: Robot() factory + top-level lazy imports #86) > 'Document every env var in README.md'). If that lives in a sibling PR of the feat(mesh): security hardening on Zenoh built-ins (mTLS + ACL + DoS bounds) #195 split, please link it from the PR body.
| self._seen.pop(k, None) | ||
| if len(self._seen) > _MAX_DEDUP_ENTRIES: | ||
| # drop oldest 20% | ||
| ordered = sorted(self._seen.items(), key=lambda kv: kv[1]) |
There was a problem hiding this comment.
sorted(self._seen.items(), ...) runs O(n log n) under self._lock once _seen blows past _MAX_DEDUP_ENTRIES and the cheap stale-eviction pass doesn't drop enough. With the default _MAX_DEDUP_ENTRIES = 10_000 this is bounded but the whole bridge subscriber pipeline blocks during the sort.
PR description folds this into #231, so noted as known. Two practical asks before that lands:
- Add a slow-path regression test that fills
_seenpast the threshold and asserts both that the GC reclaims and that no entries are incorrectly dropped (i.e. nothing within TTL gets evicted). Without it, the GC fix in perf(mesh/transport): bound _CommandDeduplicator GC cost under sustained pressure #231 may regress the correctness contract silently. - Consider whether a heap (
heapq) keyed on insertion time would let you drop the bottom-20% in O(k log n) without sorting the full dict — keeps the lock-hold-time bound to k, not n.
… of silent fallback Per review feedback on PR strands-labs#222, the previous R5 fix used `getattr(sample, "key_expr", key_expr)` which silently fell back to the subscription pattern when the attribute was absent. That fallback re-introduces the wildcard-aliasing bug R5 fixed (two distinct concrete topics under one wildcard subscription collapse to one cache slot) with no observable signal in operator logs. Per AGENTS.md > Review Learnings (strands-labs#85) > 'No silent defaults on error', use a sentinel default and emit `logger.warning` on the fallback so a contract drift (mock shape, transport refactor) surfaces in operator logs. Pin: `tests/mesh/test_bridge_dedup.py::TestMissingKeyExprWarnsR5` (2 cases -- structural source-grep that the warning is gated on the sentinel branch so it does not fire on every well-formed sample). All 33 existing dedup tests + 43 bridge-transport tests still pass.
… CodeQL strands-labs#264 The R6 commit added `test_missing_key_expr_warns_and_falls_back` with unused fixture wiring (BridgeTransport.__new__ stub, _dedup setup, captured list, handler closure, caplog parameter) that was never referenced -- the test only does source-grep via inspect.getsource. CodeQL strands-labs#264 flagged the unused `handler` variable. The cleanest fix is to drop the entire dead scaffolding (which doesn't contribute to any assertion) and reframe the docstring to be explicit that this is a source-grep pin, with the live-subscriber path already covered by R4 `test_distinct_delivered_topics_not_aliased`. Also drops unused `caplog` parameter from the sibling `test_present_key_expr_does_not_warn` (same shape: source-grep only). Both tests pass on this commit (33/33 in test_bridge_dedup.py). Verification: $ pytest tests/mesh/test_bridge_dedup.py::TestMissingKeyExprWarnsR5 -v --no-cov 2 passed in 0.09s No production code changes; pure test-hygiene fix.
yinsong1986
left a comment
There was a problem hiding this comment.
Summary
PR-5 of #195's split. Adds cross-transport dedup to BridgeTransport (Zenoh + IoT MQTT) using a SHA-256 fingerprint over the canonical (sender_id, turn_id, command) triple, with time.monotonic-based TTL eviction (R12 from #195). Splits _should_bridge into exact-match (default) vs opt-in prefix-match (STRANDS_MESH_BRIDGE_TOPICS_PREFIX) to close a cloud-pollution attack on the audit-table path. Adds safety/resume to defaults to pair with safety/estop for incident-window closure. Narrows seven except Exception sites to documented transport-failure tuples. _dispatch cache key now uses sample.key_expr (delivered topic) rather than the subscription pattern (R5), with a sentinel + logger.warning fallback (R6) replacing the silent default.
The scope matches the title and the §13 review-round changelog is unusually thorough — every behavioural fix is paired with a named pin test, and the deferred items (#231 GC perf, #233 default=str non-determinism, PR #226 env-var docs) have explicit rationale. Compliance with AGENTS.md > Review Learnings (#85, #86) is visible in narrowed except clauses, the no-silent-default fallback warning, and the per-name dedup-cache-key isolation.
What's good
- §13 changelog with named pin tests for every R-numbered fix; reviewer can check each cell.
time.monotonicfor TTL is the right primitive (R12).- Exact-match-by-default for
_should_bridgeis a meaningful security tightening on the audit path; opt-in_PREFIXenv var preserves back-compat without expanding the default attack surface. - Cache key is
(delivered_topic, dedup_id)— wildcard subscriptions don't alias across concrete topics (R5). _CommandDeduplicatoris locked, slot'd, and documents its concurrency contract.- Narrowed except clauses use
(RuntimeError, ConnectionError, OSError)/(RuntimeError, AttributeError, OSError)tuples consistent with the wire-handler tuple incore.py.
Concerns
Five inline. The two correctness ones worth highlighting in summary:
-
_should_bridgere-readsSTRANDS_MESH_BRIDGE_TOPICS_PREFIXon everyput()— the prefix filter is resolved per-call whenallowed_prefixes is None, which is the case fromBridgeTransport.put(). This is per-publishos.getenv(perf at high pub rate) AND introduces inconsistent semantics: env-var changes mid-process take effect onput()but NOT on subscribe (which uses the cachedself._bridge_suffixesfrom__init__). Cache the prefix filter on the instance like_bridge_suffixes. -
Canonical-tuple hash uses
default=str— thedefault=strfallback injson.dumpsat the canonical-path serialiser collapses any non-JSON-serialisable command field (datetime, custom object, bytes) to itsstr()repr. Two distinct objects whosestr()happens to coincide alias to the same dedup id and silently drop the second delivery. The PR description scopes #233 to the strict-mode partial-canonical fallback, but this is the primary dedup-identity path. Either dropdefault=strand let TypeError raise (caught at the outerexcept (TypeError, ValueError)) or document the contract that command payloads must be pure-JSON.
The R6 warning at line 612 is observability theatre at high publish rates (see inline). The two source-grep pin tests in TestMissingKeyExprWarnsR5 are structurally weak — they check string positions in inspect source and would not catch a runtime regression. The GC sort-and-slice on the dedup cache is acknowledged as #231; flagging the line because the lock-hold time is the operator-visible symptom.
Verification suggestions
# Run the new dedup tests
hatch run test -- tests/mesh/test_bridge_dedup.py -v
# Confirm strict-mode env var wiring (R1)
STRANDS_MESH_BRIDGE_DEDUP_STRICT=1 hatch run test -- \
tests/mesh/test_bridge_dedup.py::TestStrictEnvVarWiringR1 -v
# Spot-check the cloud-pollution defence: a topic with an arbitrary tail
# under safety/event must NOT bridge.
python -c "
from strands_robots.mesh.transport.bridge_transport import _should_bridge, DEFAULT_BRIDGE_SUFFIXES
assert not _should_bridge('strands/x/safety/event/blob', DEFAULT_BRIDGE_SUFFIXES)
assert _should_bridge('strands/x/safety/event', DEFAULT_BRIDGE_SUFFIXES)
assert _should_bridge('strands/x/response/turn-1', DEFAULT_BRIDGE_SUFFIXES)
print('ok')
"| tail, so it is the sole prefix-walk default. | ||
| """ | ||
| if allowed_prefixes is None: | ||
| allowed_prefixes = _resolve_bridge_prefix_filter() |
There was a problem hiding this comment.
_resolve_bridge_prefix_filter() calls os.getenv("STRANDS_MESH_BRIDGE_TOPICS_PREFIX") on every invocation, and _should_bridge() is called from BridgeTransport.put() (line 554) without an explicit allowed_prefixes — so this resolves the env var on every publish.
Two concerns:
-
Per-publish
os.getenvat e.g. 50 Hz cmd rate is unnecessary work — and worse, it leaks an inconsistency: an operator who edits the env var mid-process will see the new value applied to outboundput()but not to subscriptions (the suffix filter is cached onself._bridge_suffixesat__init__). Two filter sources with two different freshness contracts in the same module is surprising. -
_resolve_bridge_filter()is not called per-publish (it's resolved once in__init__at line 451). The prefix variant should be cached the same way — storeself._bridge_prefixes = _resolve_bridge_prefix_filter()in__init__and pass it through to_should_bridge.
The AGENTS.md guidance on read-once-at-construction (PR #226 referenced in the §13 deferred table) applies here too — fix this PR's own asymmetry rather than deferring it.
| separators=(",", ":"), | ||
| default=str, | ||
| ).encode("utf-8") | ||
| # Full 256-bit (64 hex chars) -- no birthday-attack truncation. |
There was a problem hiding this comment.
default=str on the canonical-path hash (lines 351–356) is a correctness hazard, not just a perf one (#233 scoped this to the strict-mode partial fallback, but the same issue lives here on the primary dedup-identity path).
If any field of cmd is not JSON-serialisable — bytes, datetime, a custom object, an Enum, a Path — default=str stringifies it. Two distinct objects whose str() collides (e.g. two custom objects without a __str__ override that both yield <MyClass object at 0x...>-shaped reprs only when id() differs, but identical for objects with the same id() recycled, or two Path instances normalised differently by callers but stringifying to the same path) will produce the same fingerprint and the second delivery is silently dropped.
The canonical contract advertises (sender_id, turn_id, command) is a unique identity for distinct deliveries — default=str quietly weakens that. Two options:
- Drop
default=str. ATypeErrorfromjson.dumpswould propagate; wrap thejson.dumps(...)block intry/except (TypeError, ValueError): return Noneand document that non-JSONcommandpayloads bypass dedup (same contract as missing canonical fields). - Keep
default=strbut document explicitly thatcommandMUST be pure-JSON-encodable, with a sanity check on the producer side.
Either way the docstring at lines 308–328 should call this out — right now it claims "identity is the canonical RPC triple" without the JSON-encodability caveat.
| self._seen.pop(k, None) | ||
| if len(self._seen) > _MAX_DEDUP_ENTRIES: | ||
| # drop oldest 20% | ||
| ordered = sorted(self._seen.items(), key=lambda kv: kv[1]) |
There was a problem hiding this comment.
sorted(self._seen.items(), key=lambda kv: kv[1]) runs O(N log N) under self._lock when the cap is exceeded. With _MAX_DEDUP_ENTRIES = 10_000 and the lock held by every concurrent is_duplicate() caller (Zenoh and IoT subscribers both share self._dedup), a publisher burst that spills the cap stalls the subscriber path until the sort finishes.
The §13 deferred table notes this is folded into #231 — fine, but please leave a # TODO(#231): replace sort+slice with heap-based GC; lock-hold time is O(N log N). marker on this line so the next reader doesn't have to re-derive the concern. The current comment (# drop oldest 20%) reads as intentional design rather than a known scaling cliff.
| _sentinel = object() | ||
| _delivered = getattr(sample, "key_expr", _sentinel) | ||
| if _delivered is _sentinel: | ||
| logger.warning( |
There was a problem hiding this comment.
If key_expr is genuinely missing from sample (the contract drift this warns about), _filtered is invoked on every delivered sample of that subscription — so this logger.warning fires once per sample, not once per drift event. At cmd rates around 50 Hz on a wildcard subscription you get 50 WARNs/sec.
The PR's deferred table claims this is "rate-limited to genuine contract drift" — that's only true at the level of which subscriber drifts; within a drifted subscriber, every sample warns. That defeats the observability goal: floods burn through CloudWatch budget and operators tune them out.
Two cheap fixes:
- One-shot per subscription: gate the warning on a
boolcaptured in themake_dedup_handlerclosure (set to True after the first warning); subsequent missing-key_exprsamples log atdebug. The R5 contract drift is global per subscriber, so one log line per subscriber lifetime is sufficient signal. - Sampled:
if random() < 0.001: logger.warning(...)— crude but bounded.
The one-shot pattern is more aligned with AGENTS.md > Review Learnings (#85) > 'No silent defaults on error': observable once, not noise-throttled.
| sentinel_branch_idx = src.find("is _sentinel") | ||
| warning_idx = src.find("logger.warning", sentinel_branch_idx) | ||
| assert sentinel_branch_idx != -1, "sentinel branch not found" | ||
| assert warning_idx != -1, "warning not in sentinel branch" |
There was a problem hiding this comment.
test_present_key_expr_does_not_warn is a source-grep test masquerading as a negative pin. It only checks that "logger.warning" appears after "is _sentinel" in inspect.getsource(...). It does not assert the runtime behaviour the docstring claims: that a well-formed sample produces no warning record.
Failure modes this test would not catch:
- A future commit adds an unconditional second
logger.warning(...)later in_filteredfor any unrelated reason —find()still locates the first (sentinel-gated) call, position assertion still passes, but every sample now warns. - Someone refactors the sentinel-branch into a helper function —
inspect.getsource(declare_subscriber)no longer contains"is _sentinel", the test fails for the wrong reason, gets fixed by relaxing the assertion, and the runtime contract is silently lost.
The sibling positive case (test_missing_key_expr_warns_and_falls_back) is also source-grep — but at least it pins a structural change. The negative case has a trivial runtime form: drive _filtered with a sample whose key_expr is set, attach caplog, assert no records at WARNING level. The R6-b commit dropped the caplog fixture; please restore it for this one test.
Per AGENTS.md > Review Learnings (#85) > 'Pin regression tests for reviewed fixes' — the regression-test that fails on pre-fix code, not just one that pins a string position.
…arning gate, TODO strands-labs#231 marker, runtime test pin (addresses threads L206 L612 L379 L700)
…nonical dedup path; drop redundant test import (CodeQL strands-labs#265) Two surgical fixes responding to R8 review (no dedup-logic changes): 1. bridge_transport.py: Add JSON-encodability contract to _dedup_id docstring. Codifies that command payloads on the canonical identity path MUST be pure-JSON-encodable. Documents the default=str fallback as defensive coercion only and explicitly cites strands-labs#233 as the tracker for the architectural decision (drop default=str vs. enforce at producer side). Per review feedback, default=str on the primary identity path is a correctness hazard, not just deferred-perf -- this docstring closes the contract gap without a behavior change. Logic resolution stays in strands-labs#233 to avoid a 9th review round on a behavior change without test scaffold. 2. test_bridge_dedup.py:719 -- remove redundant 'import json' that shadowed the module-level import on line 18 (CodeQL alert strands-labs#265, introduced by R7's runtime caplog test). Round budget remains spent. Per AGENTS.md round-budget tenet, further code changes require formal CHANGES_REQUESTED or blocker designation.
Part 5 / 9 of the split of #195 — tracked by #219.
Bridge dedup correctness across LAN-Zenoh and AWS-IoT transports.
What changes
time.monotonicfor TTL eviction (not wall clock — survives clock changes / DST / NTP slew).STRANDS_MESH_BRIDGE_TOPICS.STRANDS_MESH_BRIDGE_TOPICS_PREFIX(back-compat for the old behaviour).STRANDS_MESH_BRIDGE_DEDUP_STRICT.(sender_id, turn_id, command)hashed by full SHA-256 (no birthday-attack truncation). Callers must not reuse the triple for distinct deliveries; partial canonical payloads fall through to pass-through (default) or full-payload hash (strict).safety/resumetoDEFAULT_BRIDGE_SUFFIXES(paired withsafety/estopso the cloud audit timeline can close incident windows). Called out separately because it's an additional bridged topic, not strictly part of the dedup/TTL fix the title advertises.except Exceptionsites to the documented transport-failure surface tuples (R3 hygiene fix; see §13 row R3-c).sample.key_expr) rather than the subscription pattern, preventing wildcard-aliasing across distinct concrete topics (R5 fix). A sample withoutkey_exprnow emitslogger.warning(one-shot per subscription, R7 fix) instead of silently falling back to the subscription pattern (R6 fix per AGENTS.md > Review Learnings (feat: MuJoCo simulation backend - AgentTool with 50+ actions #85) > 'No silent defaults on error')._resolve_bridge_prefix_filter()) cached at init like suffix filter (R7 fix; eliminates per-publishos.getenv)._dedup_iddocstring (R8 fix; resolution ofdefault=strsemantics tracked in mesh(transport): _dedup_iddefault=strmakes canonical fingerprint non-deterministic for objects with default __str__ #233).What's in this PR
strands_robots/mesh/transport/bridge_transport.py(+267/-25 baseline; R1-R8 add ~152 LOC of correctness/wiring/hygiene/contract deltas).strands_robots/mesh/transport/base.py,factory.py,iot_transport.py— minor signature touch-ups.Reviewer focus
_PREFIXvariant).commandrequirement codified in docstring).sample.key_expr(delivered topic), not subscription pattern (R5); fallback path warns once per subscription instead of silently aliasing (R6+R7).__init__(R7).Carries review fixes from #195
R12 (TTL math used
time.time(), fixed totime.monotonic), R15 (cross-transport dedup opt-in strict mode), R20 (estop replay cache key narrowed tot-only; reject emptypeer_id— bridge-side counterpart).Stacking note
Self-contained subsystem: transport layer is decoupled from
core.pyvia theTransportbase class. Depends only on PR-1 (#220). PR-6 will later consume the new bridge contract.Landing order: PR-1 → PR-5 (parallel with PR-2/3/4) → PR-6 → PR-7 → PR-8 → PR-9. Full plan:
PR_LIST.md. Tracking: #219.§13 Review Round Changelog
STRANDS_MESH_BRIDGE_DEDUP_STRICTenv var advertised but never wired to_CommandDeduplicatorae4f35ctests/mesh/test_bridge_dedup.py::TestStrictEnvVarWiringR1(3 cases)_should_bridgedocstring truncated. (c)safety/resumerationale missing.0230c1btests/mesh/test_bridge_dedup.py::TestStrictModeIntegrationR2(2 cases).00ca08e::test_partial_canonical_does_not_aliasand::test_partial_canonical_strict_mode_uses_full_payload.00ca08eexcept Exceptionsites.db192ce::TestNarrowExceptionsR3::test_no_bare_except_exception_in_bridge_transport.cf45728hatch run lintimplicit pin.sample.key_expr.6de576e::TestWildcardSubscriptionDedupIsolationR4(3 tests).sample.key_exprmissing; sentinel +logger.warning.a2c12b7::TestMissingKeyExprWarnsR5(2 cases).331e2f4_resolve_bridge_prefix_filter()called per-publish (cache asymmetry vs suffix filter). (b)logger.warningper-sample noise (one-shot gate). (c)sorted()under lock missing TODO marker for #231. (d)test_present_key_expr_does_not_warnwas source-grep, not runtime assertion.7402b35::TestPrefixFilterCachedAtInitR7(3 tests),::TestOneShotWarningR7(1 test),::test_present_key_expr_does_not_warn(caplog-based runtime pin).default=strcorrectness hazard escalated from deferred-perf on canonical-path hash (#233 scope broadened). (b) CodeQL #265: redundantimport jsonon test_bridge_dedup.py:719 (introduced by R7's runtime caplog test).f142709_dedup_id; #233 broadened to cover canonical path; CodeQL alert resolved. No dedup-logic changes in this round.Disposition of remaining review concerns (R8 round, 2026-05-30)
bridge_transport.py:357—default=stron canonical-path hash_dedup_iddocstring statingcommandmust be pure-JSON-encodable. Architectural decision (dropdefault=strvs. enforce at producer) tracked in #233 with required pin tests for both paths.bridge_transport.py:340—payload.get("command") is Nonecollapses key-absent vs key-explicitly-nullbridge_transport.py:589—AttributeErrorin extractiontryis broadcore.pywire handlers; pinned bytest_wire_handler_narrow_except.py.bridge_transport.py:460— read-once env-var docstring + README docsbridge_transport.py:379— slow-path regression test + heap-based GCbridge_transport.py:99— Safety-topic duplicate-drop visibilitybridge_transport.py:225— head-segment path-traversal guardTracked-as-follow-up (not in this PR)
default=strnon-determinism on objects without__str__override (BOTH canonical and strict-fallback paths) — mesh(transport): _dedup_iddefault=strmakes canonical fingerprint non-deterministic for objects with default __str__ #233 (scope broadened in R8).STRANDS_MESH_DEDUP_TTL,STRANDS_MESH_BRIDGE_DEDUP_STRICT,STRANDS_MESH_BRIDGE_TOPICS_PREFIX) — PR docs(mesh): README env-var matrix + CHANGELOG entry (9/9 of #195 split) #226 (9/9 of feat(mesh): security hardening on Zenoh built-ins (mTLS + ACL + DoS bounds) #195 split).Round budget note (per AGENTS.md §0 tenet 8): This PR has reached R8, well over the 3-round target. R8's response was scoped strictly to (a) addressing CodeQL bot finding I introduced in R7 and (b) codifying the contract reviewer requested without making a behavior change. Round budget remains spent. Further code changes in this PR require formal CHANGES_REQUESTED or explicit blocker designation. The core transport correctness story (dedup, TTL, prefix/suffix isolation, cache-key fidelity, JSON-encodability contract) is complete and pinned.