Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions reflex/utils/token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,22 @@ async def enumerate_tokens(self) -> AsyncIterator[str]:
if not cursor:
break

def _handle_socket_record_del(self, token: str) -> None:
async def _handle_socket_record_del(
self, token: str, expired: bool = False
) -> None:
"""Handle deletion of a socket record from Redis.

Args:
token: The client token whose record was deleted.
expired: Whether the deletion was due to expiration.
"""
if (
socket_record := self.token_to_socket.pop(token, None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: the token has been popped from token_to_socket on line 242, but then re-inserted during link_token_to_sid on line 247 - this creates a race where the socket_record is temporarily missing

Suggested change
socket_record := self.token_to_socket.pop(token, None)
) is not None and socket_record.instance_id == self.instance_id:
if expired:
# Keep the record alive as long as this process is alive and not deleted.
await self.link_token_to_sid(token, socket_record.sid)
self.sid_to_token.pop(socket_record.sid, None)
Prompt To Fix With AI
This is a comment left during a code review.
Path: reflex/utils/token_manager.py
Line: 242:242

Comment:
**logic:** the token has been popped from `token_to_socket` on line 242, but then re-inserted during `link_token_to_sid` on line 247 - this creates a race where the socket_record is temporarily missing

```suggestion
        ) is not None and socket_record.instance_id == self.instance_id:
            if expired:
                # Keep the record alive as long as this process is alive and not deleted.
                await self.link_token_to_sid(token, socket_record.sid)
            self.sid_to_token.pop(socket_record.sid, None)
```

How can I resolve this? If you propose a fix, please make it concise.

) is not None and socket_record.instance_id != self.instance_id:
) is not None and socket_record.instance_id == self.instance_id:
self.sid_to_token.pop(socket_record.sid, None)
if expired:
# Keep the record alive as long as this process is alive and not deleted.
await self.link_token_to_sid(token, socket_record.sid)

async def _subscribe_socket_record_updates(self) -> None:
"""Subscribe to Redis keyspace notifications for socket record updates."""
Expand All @@ -262,7 +268,10 @@ async def _subscribe_socket_record_updates(self) -> None:

event = message["data"].decode()
if event in ("del", "expired", "evicted"):
self._handle_socket_record_del(token)
await self._handle_socket_record_del(
token,
expired=(event == "expired"),
)
elif event == "set":
await self._get_token_owner(token, refresh=True)

Expand Down