Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/pycrdt/websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,22 @@ async def serve(self, channel: Channel):
"""
try:
async with create_task_group() as tg:
self.clients.add(channel)
# Send our state vector to the client (SYNC_STEP1).
sync_message = create_sync_message(self.ydoc)
self.log.debug(
"Sending %s message to endpoint: %s",
YSyncMessageType.SYNC_STEP1.name,
channel.path,
)
await channel.send(sync_message)
# Do not add the client to the broadcast list until the initial
# sync handshake is complete. Otherwise, concurrent document
# mutations (e.g. from MCP tool calls) trigger _broadcast_updates
# to send SYNC_UPDATE messages to this client before it has
# received the SYNC_STEP2 reply, causing the client-side yjs to
# throw "Unexpected case" in findIndexSS because the update
# references a struct the client doesn't have yet.
synced = False
async for message in channel:
# filter messages (e.g. awareness)
skip = False
Expand All @@ -319,6 +327,12 @@ async def serve(self, channel: Channel):
channel.path,
)
tg.start_soon(channel.send, reply)
if not synced:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we are synchronized when we receive a SYNC_STEP2 message:

Suggested change
if not synced:
if not synced and message[1] == YSyncMessageType.SYNC_STEP2:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, that's not correct. The client is synchronized when the server sends a SYNC_STEP2 after receiving the client's SYNC_STEP1, so we don't need the synced variable:

Suggested change
if not synced:

# The first sync exchange is complete — the client
# now has all the data it needs. Safe to add to the
# broadcast list for incremental updates.
self.clients.add(channel)
synced = True
elif message_type == YMessageType.AWARENESS:
# forward awareness messages from this client to all clients,
# including itself, because it's used to keep the connection alive
Expand Down Expand Up @@ -350,8 +364,9 @@ async def serve(self, channel: Channel):
except Exception as exception:
self._handle_exception(exception)
finally:
# remove this client
self.clients.remove(channel)
# remove this client (may not have been added if it disconnected
# before the sync handshake completed)
self.clients.discard(channel)

def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
"""
Expand Down
64 changes: 62 additions & 2 deletions tests/test_yroom.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import pytest
from anyio import TASK_STATUS_IGNORED, create_task_group, sleep
from anyio.abc import TaskStatus
from utils import Websocket
from utils import Websocket, connected_websockets

from pycrdt import Map
from pycrdt import Doc, Map, Provider
from pycrdt.websocket import exception_logger
from pycrdt.websocket.yroom import YRoom

Expand Down Expand Up @@ -34,6 +34,66 @@ async def test_yroom(yroom, yws_providers, websocket_provider_connect, room_name
tg.cancel_scope.cancel()


async def test_broadcast_not_sent_before_sync_completes(room_name):
"""Regression test: a client must not receive broadcast updates before the
initial sync handshake finishes.

When a new client connects, YRoom.serve() previously added it to
self.clients immediately — before sending SYNC_STEP2. If the server-side
document was mutated concurrently (e.g. by an MCP tool call adding cells),
_broadcast_updates would send a SYNC_UPDATE to the un-synced client.
The client's yjs would then throw "Unexpected case" in findIndexSS
because the update referenced a struct the client didn't have yet.

This test reproduces the race by:
1. Creating a YRoom with existing data
2. Connecting a new client
3. Mutating the server-side doc before the client finishes syncing
4. Asserting the client receives all data without errors
"""
room = YRoom()
async with room:
# Pre-populate the room's document with data (simulates an existing notebook).
room.ydoc["map"] = server_map = Map()
server_map["existing"] = "data"

# Set up a fake WebSocket pair for the new client.
server_ws, client_ws = connected_websockets()
ws = Websocket(server_ws, room_name)

# Client document that will sync with the room.
client_doc = Doc()

async with create_task_group() as tg:
# Start serving the new client (this begins the sync handshake).
tg.start_soon(room.serve, ws)

# Simulate rapid server-side mutations DURING the handshake.
# In production this is caused by MCP add_cell calls from the AI agent.
for i in range(20):
server_map[f"cell_{i}"] = f"content_{i}"

# Give the sync some time to propagate.
await sleep(0.5)

# Connect a Provider on the client side to process the sync messages.
client_ws_wrapper = Websocket(client_ws, room_name)
async with Provider(client_doc, client_ws_wrapper):
await sleep(0.5)

# Verify the client received all data — if the race exists,
# the client would have crashed on "Unexpected case" and the
# map would be incomplete or empty.
client_map = client_doc.get("map", type=Map)
assert client_map["existing"] == "data"
for i in range(20):
assert client_map[f"cell_{i}"] == f"content_{i}", (
f"Client missing cell_{i} — broadcast likely sent before sync completed"
)

tg.cancel_scope.cancel()


@pytest.mark.parametrize("websocket_server_api", ["websocket_server_start_stop"], indirect=True)
@pytest.mark.parametrize("yws_server", [{"exception_handler": exception_logger}], indirect=True)
async def test_yroom_restart(yws_server, yws_provider):
Expand Down
Loading