fix stream name mismatch inbox => stream#104
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces mailbox stream deletion during cleanup in the Redis repository, tracks the session start time for peer agents, and implements a stale message filtering mechanism in the mailbox. However, a critical issue was identified in the stale message filtering logic: comparing absolute wall-clock timestamps across distributed systems introduces vulnerabilities to clock drift and startup race conditions, which can permanently stall connection handshakes.
| def _is_stale_message(self, fields: Dict[str, str]) -> bool: | ||
| raw_timestamp = fields.get("timestamp") | ||
| if raw_timestamp is None: | ||
| return False | ||
| if isinstance(raw_timestamp, bytes): | ||
| raw_timestamp = raw_timestamp.decode("utf-8", errors="replace") | ||
| try: | ||
| timestamp = float(raw_timestamp) | ||
| except (TypeError, ValueError): | ||
| return False | ||
| return timestamp < getattr(self._agent, "_session_started_at", 0.0) |
There was a problem hiding this comment.
Critical Bug: Clock Drift and Startup Race Condition in Stale Message Filtering
The introduction of _is_stale_message to filter out messages where timestamp < self._session_started_at introduces two severe issues that can permanently stall the connection handshake:
-
Startup Rendezvous Violation:
As documented in the comments forstart()(lines 59–63), a peer whoseconnect_toruns before this agent registers will writeqp_readyto our stream before our listener/session starts. In this valid scenario, the message's timestamp will naturally be less thanself._session_started_at. By classifying these messages as stale and ignoring them, the agent will fail to establish the connection, stalling the handshake forever. -
Distributed Clock Drift:
Thetimestampis generated using the local system clock of the sending peer or the control plane (time.time()), while_session_started_atis generated using the local system clock of the receiving agent. In a distributed environment, even minor clock drift (e.g., if the sender's clock is slightly behind the receiver's clock) will cause newly sent, perfectly valid messages to be incorrectly discarded as stale.
Suggested Remedy:
Instead of using absolute wall-clock timestamps from different machines to detect stale messages, consider using a unique session/run identifier (e.g., a UUID or an incrementing epoch/incarnation number generated during registration) that is passed in the messages. Alternatively, rely on the fact that cleanup_agent already deletes the stream key upon deregistration/restart, or use Redis stream IDs (which are monotonic and generated by the single Redis server clock) to establish a safe boundary.
There was a problem hiding this comment.
Pull request overview
This PR updates mailbox cleanup and stale-message handling for Redis stream-based peer handshakes, addressing stream/inbox naming drift and stale RDMA exchange data.
Changes:
- Adds timestamp-based stale stream message filtering in
StreamMailbox. - Records agent session start time for stale-message comparisons.
- Deletes
stream:{agent}during controller-side agent cleanup and expands Python startup exchange-key cleanup.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
dlslime/dlslime/peer_agent/_mailbox.py |
Adds stale-message detection before handling stream messages. |
dlslime/dlslime/peer_agent/_agent.py |
Tracks session start time and expands stale exchange-key cleanup pattern. |
dlslime-ctrl/src/redis_repo.rs |
Deletes stream mailbox keys alongside legacy inbox keys during cleanup. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| raw_timestamp = fields.get("timestamp") | ||
| if raw_timestamp is None: | ||
| return False | ||
| if isinstance(raw_timestamp, bytes): | ||
| raw_timestamp = raw_timestamp.decode("utf-8", errors="replace") | ||
| try: | ||
| timestamp = float(raw_timestamp) | ||
| except (TypeError, ValueError): | ||
| return False | ||
| return timestamp < getattr(self._agent, "_session_started_at", 0.0) | ||
|
|
| patterns = [ | ||
| f"{prefix}exchange:{self.alias}:*", | ||
| f"{prefix}exchange:*:{self.alias}", | ||
| f"{prefix}exchange:*:{self.alias}:*", |
|
root seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
No description provided.