diff --git a/dlslime-ctrl/src/redis_repo.rs b/dlslime-ctrl/src/redis_repo.rs index 25b04e76..142b8a3e 100644 --- a/dlslime-ctrl/src/redis_repo.rs +++ b/dlslime-ctrl/src/redis_repo.rs @@ -283,7 +283,14 @@ impl RedisRepo { } } - // 5. Delete inbox + // 5. Delete mailbox streams/inbox + let stream_key = self.scoped_key(scope, &["stream", agent_name]); + redis::cmd("DEL") + .arg(&stream_key) + .query_async::<()>(&mut *conn) + .await + .ok(); + let inbox_key = self.scoped_key(scope, &["inbox", agent_name]); redis::cmd("DEL") .arg(&inbox_key) diff --git a/dlslime/dlslime/peer_agent/_agent.py b/dlslime/dlslime/peer_agent/_agent.py index d63792a6..e4abbcb3 100644 --- a/dlslime/dlslime/peer_agent/_agent.py +++ b/dlslime/dlslime/peer_agent/_agent.py @@ -200,6 +200,7 @@ def __init__( # Build Redis key prefix from scope parameter self._redis_key_prefix = scope or "" + self._session_started_at = time.time() # NanoCtrl HTTP client self._client = NanoCtrlClient(ctrl_url, scope=self._redis_key_prefix or None) @@ -670,6 +671,7 @@ def _cleanup_stale_exchange_keys(self) -> None: patterns = [ f"{prefix}exchange:{self.alias}:*", f"{prefix}exchange:*:{self.alias}", + f"{prefix}exchange:*:{self.alias}:*", ] try: keys_to_delete: list[str] = [] diff --git a/dlslime/dlslime/peer_agent/_mailbox.py b/dlslime/dlslime/peer_agent/_mailbox.py index 63caa9d1..accaf216 100644 --- a/dlslime/dlslime/peer_agent/_mailbox.py +++ b/dlslime/dlslime/peer_agent/_mailbox.py @@ -63,10 +63,10 @@ def start(self) -> None: # so losing the message stalls the handshake forever. # # Instead, _listen_loop reads from "0-0" and processes every message - # on the stream. Truly stale messages from a prior crashed session - # with the same alias fail endpoint.connect at handshake time — the - # listen loop catches that and continues, so later in-flight qp_ready - # from the current peer session still completes the handshake. + # on the stream. Stale messages must be removed when the previous + # agent incarnation is cleaned up. Do not use local wall-clock + # timestamps here: a valid early qp_ready can predate this listener, + # and sender/receiver clocks may drift across nodes. self._thread = threading.Thread(target=self._listen_loop, daemon=True) self._thread.start() logger.info( @@ -316,7 +316,6 @@ def _try_connect_peer_inner( "peer": self._agent.alias, "qp_info": json.dumps(my_qp_info, default=str), "conn_meta": json.dumps(my_conn_meta, default=str), - "timestamp": str(time.time()), }, maxlen=1000, approximate=True,