From 379da6ab43ae9d7a40c6c3038a2d6e19c6df50b1 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 27 May 2026 23:13:16 +0800 Subject: [PATCH 1/2] fix stream name mismatch inbox => stream --- dlslime-ctrl/src/redis_repo.rs | 9 ++++++++- dlslime/dlslime/peer_agent/_agent.py | 2 ++ dlslime/dlslime/peer_agent/_mailbox.py | 21 +++++++++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/dlslime-ctrl/src/redis_repo.rs b/dlslime-ctrl/src/redis_repo.rs index 25b04e76..142b8a3e 100644 --- a/dlslime-ctrl/src/redis_repo.rs +++ b/dlslime-ctrl/src/redis_repo.rs @@ -283,7 +283,14 @@ impl RedisRepo { } } - // 5. Delete inbox + // 5. Delete mailbox streams/inbox + let stream_key = self.scoped_key(scope, &["stream", agent_name]); + redis::cmd("DEL") + .arg(&stream_key) + .query_async::<()>(&mut *conn) + .await + .ok(); + let inbox_key = self.scoped_key(scope, &["inbox", agent_name]); redis::cmd("DEL") .arg(&inbox_key) diff --git a/dlslime/dlslime/peer_agent/_agent.py b/dlslime/dlslime/peer_agent/_agent.py index d63792a6..e4abbcb3 100644 --- a/dlslime/dlslime/peer_agent/_agent.py +++ b/dlslime/dlslime/peer_agent/_agent.py @@ -200,6 +200,7 @@ def __init__( # Build Redis key prefix from scope parameter self._redis_key_prefix = scope or "" + self._session_started_at = time.time() # NanoCtrl HTTP client self._client = NanoCtrlClient(ctrl_url, scope=self._redis_key_prefix or None) @@ -670,6 +671,7 @@ def _cleanup_stale_exchange_keys(self) -> None: patterns = [ f"{prefix}exchange:{self.alias}:*", f"{prefix}exchange:*:{self.alias}", + f"{prefix}exchange:*:{self.alias}:*", ] try: keys_to_delete: list[str] = [] diff --git a/dlslime/dlslime/peer_agent/_mailbox.py b/dlslime/dlslime/peer_agent/_mailbox.py index 63caa9d1..ca9dab5a 100644 --- a/dlslime/dlslime/peer_agent/_mailbox.py +++ b/dlslime/dlslime/peer_agent/_mailbox.py @@ -148,6 +148,15 @@ def _listen_loop(self) -> None: def _handle_message(self, fields: Dict[str, str]) -> None: """Handle incoming stream message.""" + if self._is_stale_message(fields): + logger.info( + "StreamMailbox %s: Ignoring stale %s from %s", + self._agent.alias, + fields.get("type"), + fields.get("peer"), + ) + return + msg_type = fields.get("type") if msg_type == "connect_peer": @@ -223,6 +232,18 @@ def _handle_message(self, fields: Dict[str, str]) -> None: msg_type, ) + def _is_stale_message(self, fields: Dict[str, str]) -> bool: + raw_timestamp = fields.get("timestamp") + if raw_timestamp is None: + return False + if isinstance(raw_timestamp, bytes): + raw_timestamp = raw_timestamp.decode("utf-8", errors="replace") + try: + timestamp = float(raw_timestamp) + except (TypeError, ValueError): + return False + return timestamp < getattr(self._agent, "_session_started_at", 0.0) + def _try_connect_peer( self, peer: str, From 66450ffe6000d99a5abadbd06bc62d346d419861 Mon Sep 17 00:00:00 2001 From: FirwoodLin Date: Wed, 27 May 2026 23:29:30 +0800 Subject: [PATCH 2/2] fix race condition --- dlslime/dlslime/peer_agent/_mailbox.py | 30 ++++---------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/dlslime/dlslime/peer_agent/_mailbox.py b/dlslime/dlslime/peer_agent/_mailbox.py index ca9dab5a..accaf216 100644 --- a/dlslime/dlslime/peer_agent/_mailbox.py +++ b/dlslime/dlslime/peer_agent/_mailbox.py @@ -63,10 +63,10 @@ def start(self) -> None: # so losing the message stalls the handshake forever. # # Instead, _listen_loop reads from "0-0" and processes every message - # on the stream. Truly stale messages from a prior crashed session - # with the same alias fail endpoint.connect at handshake time — the - # listen loop catches that and continues, so later in-flight qp_ready - # from the current peer session still completes the handshake. + # on the stream. Stale messages must be removed when the previous + # agent incarnation is cleaned up. Do not use local wall-clock + # timestamps here: a valid early qp_ready can predate this listener, + # and sender/receiver clocks may drift across nodes. self._thread = threading.Thread(target=self._listen_loop, daemon=True) self._thread.start() logger.info( @@ -148,15 +148,6 @@ def _listen_loop(self) -> None: def _handle_message(self, fields: Dict[str, str]) -> None: """Handle incoming stream message.""" - if self._is_stale_message(fields): - logger.info( - "StreamMailbox %s: Ignoring stale %s from %s", - self._agent.alias, - fields.get("type"), - fields.get("peer"), - ) - return - msg_type = fields.get("type") if msg_type == "connect_peer": @@ -232,18 +223,6 @@ def _handle_message(self, fields: Dict[str, str]) -> None: msg_type, ) - def _is_stale_message(self, fields: Dict[str, str]) -> bool: - raw_timestamp = fields.get("timestamp") - if raw_timestamp is None: - return False - if isinstance(raw_timestamp, bytes): - raw_timestamp = raw_timestamp.decode("utf-8", errors="replace") - try: - timestamp = float(raw_timestamp) - except (TypeError, ValueError): - return False - return timestamp < getattr(self._agent, "_session_started_at", 0.0) - def _try_connect_peer( self, peer: str, @@ -337,7 +316,6 @@ def _try_connect_peer_inner( "peer": self._agent.alias, "qp_info": json.dumps(my_qp_info, default=str), "conn_meta": json.dumps(my_conn_meta, default=str), - "timestamp": str(time.time()), }, maxlen=1000, approximate=True,