Skip to content

Commit b0e4cd5

Browse files
committed
fix(anam): make AnamAvatarPublisher.start() idempotent
`Agent._apply("start")` calls subsystem starts sequentially. Anam's `start()` blocks for ~3-5 seconds (REST session creation + WebRTC handshake + first video frame), which sits in the critical path of `agent.join()` and makes the avatar appear ~5-7 seconds after the user dials in. Callers can move that latency out of the join path by kicking off `avatar.start()` as a background task before `agent.join()` — but only if `start()` is safe to call twice (once eagerly, once from the framework's `_apply`). Guard `start()` with an `asyncio.Lock` and a `_started` flag: the first call runs `_connect()`, concurrent callers wait on the lock, and a successful start short-circuits subsequent calls. A failed connect leaves `_started=False` so retries still work.
1 parent 33dbd3c commit b0e4cd5

2 files changed

Lines changed: 95 additions & 2 deletions

File tree

plugins/anam/tests/test_anam_plugin.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import pytest
24
from getstream.video.rtc import audio_track
35
from vision_agents.core.events import EventManager
@@ -77,3 +79,78 @@ async def test_attach_agent_subscribes_to_events(self):
7779
assert agent.events.has_subscribers(RealtimeAudioOutputEvent)
7880
assert agent.events.has_subscribers(RealtimeAudioOutputDoneEvent)
7981
assert agent.events.has_subscribers(TurnStartedEvent)
82+
83+
async def test_start_is_idempotent(self):
84+
"""Repeated `start()` runs `_connect()` once and short-circuits after.
85+
86+
Callers warm up the publisher before `agent.join()` to keep Anam's
87+
~3s session creation off the join critical path. The framework
88+
then calls `start()` again from `_apply("start")` — that second
89+
call must be a no-op.
90+
"""
91+
pub = _make_publisher()
92+
93+
call_count = 0
94+
95+
async def fake_connect():
96+
nonlocal call_count
97+
call_count += 1
98+
99+
pub._connect = fake_connect # type: ignore[method-assign]
100+
101+
await pub.start()
102+
await pub.start()
103+
await pub.start()
104+
105+
assert call_count == 1
106+
assert pub._started is True
107+
108+
async def test_start_serializes_concurrent_callers(self):
109+
"""Two tasks calling `start()` at the same time produce one connect.
110+
111+
Without the lock, both would see `_started is False` and race into
112+
`_connect()`, double-opening the Anam session.
113+
"""
114+
pub = _make_publisher()
115+
116+
call_count = 0
117+
proceed = asyncio.Event()
118+
119+
async def slow_connect():
120+
nonlocal call_count
121+
call_count += 1
122+
await proceed.wait()
123+
124+
pub._connect = slow_connect # type: ignore[method-assign]
125+
126+
task_a = asyncio.create_task(pub.start())
127+
task_b = asyncio.create_task(pub.start())
128+
await asyncio.sleep(0) # let both tasks acquire/queue on the lock
129+
proceed.set()
130+
await asyncio.gather(task_a, task_b)
131+
132+
assert call_count == 1
133+
134+
async def test_start_failure_allows_retry(self):
135+
"""A failing first `start()` leaves `_started=False` so the next
136+
attempt can retry instead of silently no-op'ing on a half-connected
137+
publisher."""
138+
pub = _make_publisher()
139+
140+
attempts = 0
141+
142+
async def flaky_connect():
143+
nonlocal attempts
144+
attempts += 1
145+
if attempts == 1:
146+
raise RuntimeError("anam unreachable")
147+
148+
pub._connect = flaky_connect # type: ignore[method-assign]
149+
150+
with pytest.raises(RuntimeError, match="anam unreachable"):
151+
await pub.start()
152+
assert pub._started is False
153+
154+
await pub.start()
155+
assert pub._started is True
156+
assert attempts == 2

plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ def __init__(
111111
self._send_lock = asyncio.Lock()
112112
self._audio_receiver_task: asyncio.Task[None] | None = None
113113
self._video_receiver_task: asyncio.Task[None] | None = None
114+
self._start_lock = asyncio.Lock()
115+
self._started = False
114116

115117
def publish_video_track(self) -> QueuedVideoTrack:
116118
"""Return the video track that receives avatar video frames."""
@@ -132,8 +134,22 @@ def attach_agent(self, agent: Agent) -> None:
132134
self._subscribe_to_audio_events()
133135

134136
async def start(self) -> None:
135-
"""Connect to Anam. Called by Agent via _apply("start") during join()."""
136-
await self._connect()
137+
"""Connect to Anam.
138+
139+
Idempotent. The Anam REST session creation + WebRTC handshake takes
140+
~3-5 seconds, which would otherwise sit in the critical path of
141+
`agent.join()` (Agent.`_apply("start")` runs subsystem starts
142+
sequentially). Callers can kick this off as a background task before
143+
join and the framework's eventual call from `_apply` will see the
144+
already-started publisher and return immediately. Concurrent calls
145+
wait on the same in-flight connect via `_start_lock` rather than
146+
racing into `_connect`.
147+
"""
148+
async with self._start_lock:
149+
if self._started:
150+
return
151+
await self._connect()
152+
self._started = True
137153

138154
async def close(self) -> None:
139155
"""

0 commit comments

Comments
 (0)