Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions plugins/anam/tests/test_anam_plugin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

import pytest
from getstream.video.rtc import audio_track
from vision_agents.core.events import EventManager
Expand Down Expand Up @@ -77,3 +79,78 @@ async def test_attach_agent_subscribes_to_events(self):
assert agent.events.has_subscribers(RealtimeAudioOutputEvent)
assert agent.events.has_subscribers(RealtimeAudioOutputDoneEvent)
assert agent.events.has_subscribers(TurnStartedEvent)

async def test_start_is_idempotent(self):
"""Repeated `start()` runs `_connect()` once and short-circuits after.

Callers warm up the publisher before `agent.join()` to keep Anam's
~3s session creation off the join critical path. The framework
then calls `start()` again from `_apply("start")` — that second
call must be a no-op.
"""
pub = _make_publisher()

call_count = 0

async def fake_connect():
nonlocal call_count
call_count += 1

pub._connect = fake_connect # type: ignore[method-assign]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove # type: ignore[method-assign] from tests.

Please avoid # type: ignore here; use a typed test double approach (fixture/class) so the assignment is type-safe without suppressions.

As per coding guidelines: "Avoid # type: ignore comments."

Also applies to: 124-124, 148-148


await pub.start()
await pub.start()
await pub.start()

assert call_count == 1
assert pub._started is True
Comment on lines +95 to +106
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Replace call-path spying with behavior/state assertions.

These tests validate _connect() invocation count by overriding a private method and asserting call_count, which is call-path testing and effectively mocking. Rework them to assert externally observable behavior/state transitions of start() without spying on private method invocations.

As per coding guidelines: "Never mock in tests" and "ALWAYS test behavior, not calling a path. Assert on outputs and state, not method calls."

Also applies to: 119-133, 142-157


async def test_start_serializes_concurrent_callers(self):
"""Two tasks calling `start()` at the same time produce one connect.

Without the lock, both would see `_started is False` and race into
`_connect()`, double-opening the Anam session.
"""
pub = _make_publisher()

call_count = 0
proceed = asyncio.Event()

async def slow_connect():
nonlocal call_count
call_count += 1
await proceed.wait()

pub._connect = slow_connect # type: ignore[method-assign]

task_a = asyncio.create_task(pub.start())
task_b = asyncio.create_task(pub.start())
await asyncio.sleep(0) # let both tasks acquire/queue on the lock
proceed.set()
await asyncio.gather(task_a, task_b)

assert call_count == 1

async def test_start_failure_allows_retry(self):
"""A failing first `start()` leaves `_started=False` so the next
attempt can retry instead of silently no-op'ing on a half-connected
publisher."""
pub = _make_publisher()

attempts = 0

async def flaky_connect():
nonlocal attempts
attempts += 1
if attempts == 1:
raise RuntimeError("anam unreachable")

pub._connect = flaky_connect # type: ignore[method-assign]

with pytest.raises(RuntimeError, match="anam unreachable"):
await pub.start()
assert pub._started is False

await pub.start()
assert pub._started is True
assert attempts == 2
20 changes: 18 additions & 2 deletions plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def __init__(
self._send_lock = asyncio.Lock()
self._audio_receiver_task: asyncio.Task[None] | None = None
self._video_receiver_task: asyncio.Task[None] | None = None
self._start_lock = asyncio.Lock()
self._started = False

def publish_video_track(self) -> QueuedVideoTrack:
"""Return the video track that receives avatar video frames."""
Expand All @@ -132,8 +134,22 @@ def attach_agent(self, agent: Agent) -> None:
self._subscribe_to_audio_events()

async def start(self) -> None:
"""Connect to Anam. Called by Agent via _apply("start") during join()."""
await self._connect()
"""Connect to Anam.

Idempotent. The Anam REST session creation + WebRTC handshake takes
~3-5 seconds, which would otherwise sit in the critical path of
`agent.join()` (Agent.`_apply("start")` runs subsystem starts
sequentially). Callers can kick this off as a background task before
join and the framework's eventual call from `_apply` will see the
already-started publisher and return immediately. Concurrent calls
wait on the same in-flight connect via `_start_lock` rather than
racing into `_connect`.
"""
async with self._start_lock:
if self._started:
return
await self._connect()
self._started = True

async def close(self) -> None:
"""
Expand Down