Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion dlslime-ctrl/src/redis_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,14 @@ impl RedisRepo {
}
}

// 5. Delete inbox
// 5. Delete mailbox streams/inbox
let stream_key = self.scoped_key(scope, &["stream", agent_name]);
redis::cmd("DEL")
.arg(&stream_key)
.query_async::<()>(&mut *conn)
.await
.ok();

let inbox_key = self.scoped_key(scope, &["inbox", agent_name]);
redis::cmd("DEL")
.arg(&inbox_key)
Expand Down
2 changes: 2 additions & 0 deletions dlslime/dlslime/peer_agent/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def __init__(

# Build Redis key prefix from scope parameter
self._redis_key_prefix = scope or ""
self._session_started_at = time.time()

# NanoCtrl HTTP client
self._client = NanoCtrlClient(ctrl_url, scope=self._redis_key_prefix or None)
Expand Down Expand Up @@ -670,6 +671,7 @@ def _cleanup_stale_exchange_keys(self) -> None:
patterns = [
f"{prefix}exchange:{self.alias}:*",
f"{prefix}exchange:*:{self.alias}",
f"{prefix}exchange:*:{self.alias}:*",
]
try:
keys_to_delete: list[str] = []
Expand Down
9 changes: 4 additions & 5 deletions dlslime/dlslime/peer_agent/_mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ def start(self) -> None:
# so losing the message stalls the handshake forever.
#
# Instead, _listen_loop reads from "0-0" and processes every message
# on the stream. Truly stale messages from a prior crashed session
# with the same alias fail endpoint.connect at handshake time — the
# listen loop catches that and continues, so later in-flight qp_ready
# from the current peer session still completes the handshake.
# on the stream. Stale messages must be removed when the previous
# agent incarnation is cleaned up. Do not use local wall-clock
# timestamps here: a valid early qp_ready can predate this listener,
# and sender/receiver clocks may drift across nodes.
self._thread = threading.Thread(target=self._listen_loop, daemon=True)
self._thread.start()
logger.info(
Expand Down Expand Up @@ -316,7 +316,6 @@ def _try_connect_peer_inner(
"peer": self._agent.alias,
"qp_info": json.dumps(my_qp_info, default=str),
"conn_meta": json.dumps(my_conn_meta, default=str),
"timestamp": str(time.time()),
},
maxlen=1000,
approximate=True,
Expand Down