From b0e4cd5d8a2ba36f641106b4d3cdbe5add3a5d7c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 13 May 2026 12:33:53 +0400 Subject: [PATCH] fix(anam): make `AnamAvatarPublisher.start()` idempotent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Agent._apply("start")` calls subsystem starts sequentially. Anam's `start()` blocks for ~3-5 seconds (REST session creation + WebRTC handshake + first video frame), which sits in the critical path of `agent.join()` and makes the avatar appear ~5-7 seconds after the user dials in. Callers can move that latency out of the join path by kicking off `avatar.start()` as a background task before `agent.join()` — but only if `start()` is safe to call twice (once eagerly, once from the framework's `_apply`). Guard `start()` with an `asyncio.Lock` and a `_started` flag: the first call runs `_connect()`, concurrent callers wait on the lock, and a successful start short-circuits subsequent calls. A failed connect leaves `_started=False` so retries still work. --- plugins/anam/tests/test_anam_plugin.py | 77 +++++++++++++++++++ .../plugins/anam/anam_avatar_publisher.py | 20 ++++- 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/plugins/anam/tests/test_anam_plugin.py b/plugins/anam/tests/test_anam_plugin.py index 53c9f79a1..e90bc08aa 100644 --- a/plugins/anam/tests/test_anam_plugin.py +++ b/plugins/anam/tests/test_anam_plugin.py @@ -1,3 +1,5 @@ +import asyncio + import pytest from getstream.video.rtc import audio_track from vision_agents.core.events import EventManager @@ -77,3 +79,78 @@ async def test_attach_agent_subscribes_to_events(self): assert agent.events.has_subscribers(RealtimeAudioOutputEvent) assert agent.events.has_subscribers(RealtimeAudioOutputDoneEvent) assert agent.events.has_subscribers(TurnStartedEvent) + + async def test_start_is_idempotent(self): + """Repeated `start()` runs `_connect()` once and short-circuits after. + + Callers warm up the publisher before `agent.join()` to keep Anam's + ~3s session creation off the join critical path. The framework + then calls `start()` again from `_apply("start")` — that second + call must be a no-op. + """ + pub = _make_publisher() + + call_count = 0 + + async def fake_connect(): + nonlocal call_count + call_count += 1 + + pub._connect = fake_connect # type: ignore[method-assign] + + await pub.start() + await pub.start() + await pub.start() + + assert call_count == 1 + assert pub._started is True + + async def test_start_serializes_concurrent_callers(self): + """Two tasks calling `start()` at the same time produce one connect. + + Without the lock, both would see `_started is False` and race into + `_connect()`, double-opening the Anam session. + """ + pub = _make_publisher() + + call_count = 0 + proceed = asyncio.Event() + + async def slow_connect(): + nonlocal call_count + call_count += 1 + await proceed.wait() + + pub._connect = slow_connect # type: ignore[method-assign] + + task_a = asyncio.create_task(pub.start()) + task_b = asyncio.create_task(pub.start()) + await asyncio.sleep(0) # let both tasks acquire/queue on the lock + proceed.set() + await asyncio.gather(task_a, task_b) + + assert call_count == 1 + + async def test_start_failure_allows_retry(self): + """A failing first `start()` leaves `_started=False` so the next + attempt can retry instead of silently no-op'ing on a half-connected + publisher.""" + pub = _make_publisher() + + attempts = 0 + + async def flaky_connect(): + nonlocal attempts + attempts += 1 + if attempts == 1: + raise RuntimeError("anam unreachable") + + pub._connect = flaky_connect # type: ignore[method-assign] + + with pytest.raises(RuntimeError, match="anam unreachable"): + await pub.start() + assert pub._started is False + + await pub.start() + assert pub._started is True + assert attempts == 2 diff --git a/plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py b/plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py index dd6664b1d..32fe9b721 100644 --- a/plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py +++ b/plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py @@ -111,6 +111,8 @@ def __init__( self._send_lock = asyncio.Lock() self._audio_receiver_task: asyncio.Task[None] | None = None self._video_receiver_task: asyncio.Task[None] | None = None + self._start_lock = asyncio.Lock() + self._started = False def publish_video_track(self) -> QueuedVideoTrack: """Return the video track that receives avatar video frames.""" @@ -132,8 +134,22 @@ def attach_agent(self, agent: Agent) -> None: self._subscribe_to_audio_events() async def start(self) -> None: - """Connect to Anam. Called by Agent via _apply("start") during join().""" - await self._connect() + """Connect to Anam. + + Idempotent. The Anam REST session creation + WebRTC handshake takes + ~3-5 seconds, which would otherwise sit in the critical path of + `agent.join()` (Agent.`_apply("start")` runs subsystem starts + sequentially). Callers can kick this off as a background task before + join and the framework's eventual call from `_apply` will see the + already-started publisher and return immediately. Concurrent calls + wait on the same in-flight connect via `_start_lock` rather than + racing into `_connect`. + """ + async with self._start_lock: + if self._started: + return + await self._connect() + self._started = True async def close(self) -> None: """