From db2116496169023ef9959449830c95b9589f78d7 Mon Sep 17 00:00:00 2001 From: Paul Asjes Date: Thu, 28 Aug 2025 17:36:45 +0200 Subject: [PATCH 1/3] Add asyncio event loop --- README.md | 94 ++++++ .../conversational_ai/conversation.py | 58 +++- tests/test_client_tools.py | 312 ++++++++++++++++++ 3 files changed, 447 insertions(+), 17 deletions(-) create mode 100644 tests/test_client_tools.py diff --git a/README.md b/README.md index 6265d707..8fa0564a 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,100 @@ async def print_models() -> None: asyncio.run(print_models()) ``` +## Conversational AI + +Build interactive AI agents with real-time audio capabilities using ElevenLabs Conversational AI. + +### Basic Usage + +```python +from elevenlabs.client import ElevenLabs +from elevenlabs.conversational_ai.conversation import Conversation, ClientTools +from elevenlabs.conversational_ai.default_audio_interface import DefaultAudioInterface + +client = ElevenLabs(api_key="YOUR_API_KEY") + +# Create audio interface for real-time audio input/output +audio_interface = DefaultAudioInterface() + +# Create conversation +conversation = Conversation( + client=client, + agent_id="your-agent-id", + requires_auth=True, + audio_interface=audio_interface, +) + +# Start the conversation +conversation.start_session() + +# The conversation runs in background until you call: +conversation.end_session() +``` + +### Custom Event Loop Support + +For advanced use cases involving context propagation, resource reuse, or specific event loop management, `ClientTools` supports custom asyncio event loops: + +```python +import asyncio +from elevenlabs.conversational_ai.conversation import ClientTools + +async def main(): + # Get the current event loop + custom_loop = asyncio.get_running_loop() + + # Create ClientTools with custom loop to prevent "different event loop" errors + client_tools = ClientTools(loop=custom_loop) + + # Register your tools + async def get_weather(params): + location = params.get("location", "Unknown") + # Your async logic here + return f"Weather in {location}: Sunny, 72°F" + + client_tools.register("get_weather", get_weather, is_async=True) + + # Use with conversation + conversation = Conversation( + client=client, + agent_id="your-agent-id", + requires_auth=True, + audio_interface=audio_interface, + client_tools=client_tools + ) + +asyncio.run(main()) +``` + +**Benefits of Custom Event Loop:** +- **Context Propagation**: Maintain request-scoped state across async operations +- **Resource Reuse**: Share existing async resources like HTTP sessions or database pools +- **Loop Management**: Prevent "Task got Future attached to a different event loop" errors +- **Performance**: Better control over async task scheduling and execution + +### Tool Registration + +Register custom tools that the AI agent can call during conversations: + +```python +client_tools = ClientTools() + +# Sync tool +def calculate_sum(params): + numbers = params.get("numbers", []) + return sum(numbers) + +# Async tool +async def fetch_data(params): + url = params.get("url") + # Your async HTTP request logic + return {"data": "fetched"} + +client_tools.register("calculate_sum", calculate_sum, is_async=False) +client_tools.register("fetch_data", fetch_data, is_async=True) +``` + ## Languages Supported Explore [all models & languages](https://elevenlabs.io/docs/models). diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 62a50ab7..f6b95f89 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -155,11 +155,18 @@ class ClientTools: Supports both synchronous and asynchronous tools running in a dedicated event loop, ensuring non-blocking operation of the main conversation thread. + + Args: + loop: Optional custom asyncio event loop to use for tool execution. If not provided, + a new event loop will be created and run in a separate thread. Using a custom + loop prevents "different event loop" runtime errors and allows for better + context propagation and resource management. """ - def __init__(self) -> None: + def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: self.tools: Dict[str, Tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} self.lock = threading.Lock() + self._custom_loop = loop self._loop = None self._thread = None self._running = threading.Event() @@ -170,27 +177,39 @@ def start(self): if self._running.is_set(): return - def run_event_loop(): - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) + if self._custom_loop is not None: + # Use the provided custom event loop + self._loop = self._custom_loop self._running.set() - try: - self._loop.run_forever() - finally: - self._running.clear() - self._loop.close() - self._loop = None + else: + # Create and run our own event loop in a separate thread + def run_event_loop(): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._running.set() + try: + self._loop.run_forever() + finally: + self._running.clear() + self._loop.close() + self._loop = None - self._thread = threading.Thread(target=run_event_loop, daemon=True, name="ClientTools-EventLoop") - self._thread.start() - # Wait for loop to be ready - self._running.wait() + self._thread = threading.Thread(target=run_event_loop, daemon=True, name="ClientTools-EventLoop") + self._thread.start() + # Wait for loop to be ready + self._running.wait() def stop(self): """Gracefully stop the event loop and clean up resources.""" if self._loop and self._running.is_set(): - self._loop.call_soon_threadsafe(self._loop.stop) - self._thread.join() + if self._custom_loop is not None: + # For custom loops, we don't stop the loop itself, just clear our running flag + self._running.clear() + else: + # For our own loop, stop it and join the thread + self._loop.call_soon_threadsafe(self._loop.stop) + if self._thread: + self._thread.join() self.thread_pool.shutdown(wait=False) def register( @@ -257,7 +276,12 @@ async def _execute_and_callback(): } callback(response) - asyncio.run_coroutine_threadsafe(_execute_and_callback(), self._loop) + if self._custom_loop is not None: + # For custom loops, schedule the task on the custom loop + self._loop.create_task(_execute_and_callback()) + else: + # For our own loop running in a separate thread, use run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe(_execute_and_callback(), self._loop) class ConversationInitiationData: diff --git a/tests/test_client_tools.py b/tests/test_client_tools.py new file mode 100644 index 00000000..5df2efd1 --- /dev/null +++ b/tests/test_client_tools.py @@ -0,0 +1,312 @@ +""" +Tests for ClientTools custom event loop functionality. +""" + +import asyncio +import pytest +import threading +import time +from elevenlabs.conversational_ai.conversation import ClientTools + + +class TestClientTools: + """Test suite for ClientTools functionality.""" + + def test_default_initialization(self): + """Test that ClientTools can be initialized without parameters (backwards compatibility).""" + client_tools = ClientTools() + assert client_tools._custom_loop is None + assert client_tools._loop is None + assert not client_tools._running.is_set() + + def test_custom_loop_initialization(self): + """Test that ClientTools can be initialized with a custom event loop.""" + loop = asyncio.new_event_loop() + try: + client_tools = ClientTools(loop=loop) + assert client_tools._custom_loop is loop + assert client_tools._loop is None + assert not client_tools._running.is_set() + finally: + loop.close() + + def test_start_with_default_loop(self): + """Test starting ClientTools with default behavior (creates own loop).""" + client_tools = ClientTools() + + try: + client_tools.start() + assert client_tools._running.is_set() + assert client_tools._loop is not None + assert client_tools._thread is not None + assert client_tools._thread.is_alive() + finally: + client_tools.stop() + + @pytest.mark.asyncio + async def test_start_with_custom_loop(self): + """Test starting ClientTools with a custom event loop.""" + custom_loop = asyncio.get_running_loop() + client_tools = ClientTools(loop=custom_loop) + + try: + client_tools.start() + assert client_tools._running.is_set() + assert client_tools._loop is custom_loop + assert client_tools._thread is None # No thread created for custom loop + finally: + client_tools.stop() + + def test_tool_registration(self): + """Test that tools can be registered correctly.""" + client_tools = ClientTools() + + def sync_tool(params): + return "sync result" + + async def async_tool(params): + return "async result" + + client_tools.register("sync_tool", sync_tool, is_async=False) + client_tools.register("async_tool", async_tool, is_async=True) + + assert "sync_tool" in client_tools.tools + assert "async_tool" in client_tools.tools + + sync_handler, sync_is_async = client_tools.tools["sync_tool"] + async_handler, async_is_async = client_tools.tools["async_tool"] + + assert sync_handler is sync_tool + assert not sync_is_async + assert async_handler is async_tool + assert async_is_async + + def test_duplicate_tool_registration(self): + """Test that registering a tool with the same name raises an error.""" + client_tools = ClientTools() + + def tool(params): + return "result" + + client_tools.register("test_tool", tool) + + with pytest.raises(ValueError, match="Tool 'test_tool' is already registered"): + client_tools.register("test_tool", tool) + + def test_invalid_handler_registration(self): + """Test that registering a non-callable handler raises an error.""" + client_tools = ClientTools() + + with pytest.raises(ValueError, match="Handler must be callable"): + client_tools.register("invalid_tool", "not_callable") + + @pytest.mark.asyncio + async def test_sync_tool_execution_with_custom_loop(self): + """Test executing a sync tool with a custom event loop.""" + custom_loop = asyncio.get_running_loop() + client_tools = ClientTools(loop=custom_loop) + + def sync_tool(params): + return f"sync result: {params.get('input', 'default')}" + + client_tools.register("sync_tool", sync_tool, is_async=False) + client_tools.start() + + try: + result = await client_tools.handle("sync_tool", {"input": "test"}) + assert result == "sync result: test" + finally: + client_tools.stop() + + @pytest.mark.asyncio + async def test_async_tool_execution_with_custom_loop(self): + """Test executing an async tool with a custom event loop.""" + custom_loop = asyncio.get_running_loop() + client_tools = ClientTools(loop=custom_loop) + + async def async_tool(params): + await asyncio.sleep(0.01) # Simulate async work + return f"async result: {params.get('input', 'default')}" + + client_tools.register("async_tool", async_tool, is_async=True) + client_tools.start() + + try: + result = await client_tools.handle("async_tool", {"input": "test"}) + assert result == "async result: test" + finally: + client_tools.stop() + + @pytest.mark.asyncio + async def test_tool_execution_with_callback_custom_loop(self): + """Test executing a tool via callback mechanism with custom event loop.""" + custom_loop = asyncio.get_running_loop() + client_tools = ClientTools(loop=custom_loop) + + def sync_tool(params): + return f"callback result: {params.get('data', 'none')}" + + client_tools.register("callback_tool", sync_tool, is_async=False) + client_tools.start() + + callback_results = [] + + def test_callback(response): + callback_results.append(response) + + try: + client_tools.execute_tool( + "callback_tool", + {"tool_call_id": "test123", "data": "callback_test"}, + test_callback + ) + + # Wait for callback to be executed + await asyncio.sleep(0.1) + + assert len(callback_results) == 1 + response = callback_results[0] + assert response["type"] == "client_tool_result" + assert response["tool_call_id"] == "test123" + assert "callback result: callback_test" in response["result"] + assert response["is_error"] is False + finally: + client_tools.stop() + + def test_sync_tool_execution_with_default_loop(self): + """Test executing a sync tool with default loop behavior.""" + client_tools = ClientTools() + + def sync_tool(params): + return f"default sync: {params.get('value', 'empty')}" + + client_tools.register("sync_tool", sync_tool, is_async=False) + client_tools.start() + + # Use asyncio.run to test from a fresh event loop + async def test(): + result = await client_tools.handle("sync_tool", {"value": "default_test"}) + return result + + try: + result = asyncio.run(test()) + assert result == "default sync: default_test" + finally: + client_tools.stop() + + @pytest.mark.asyncio + async def test_unregistered_tool_error(self): + """Test that calling an unregistered tool raises an error.""" + custom_loop = asyncio.get_running_loop() + client_tools = ClientTools(loop=custom_loop) + client_tools.start() + + try: + with pytest.raises(ValueError, match="Tool 'nonexistent' is not registered"): + await client_tools.handle("nonexistent", {}) + finally: + client_tools.stop() + + def test_execute_tool_without_start(self): + """Test that execute_tool raises error when not started.""" + client_tools = ClientTools() + + def callback(response): + pass + + with pytest.raises(RuntimeError, match="ClientTools event loop is not running"): + client_tools.execute_tool("any_tool", {}, callback) + + @pytest.mark.asyncio + async def test_tool_exception_handling(self): + """Test that tool exceptions are handled correctly.""" + custom_loop = asyncio.get_running_loop() + client_tools = ClientTools(loop=custom_loop) + + def failing_tool(params): + raise ValueError("Tool failed intentionally") + + client_tools.register("failing_tool", failing_tool, is_async=False) + client_tools.start() + + callback_results = [] + + def error_callback(response): + callback_results.append(response) + + try: + client_tools.execute_tool( + "failing_tool", + {"tool_call_id": "error_test"}, + error_callback + ) + + # Wait for callback + await asyncio.sleep(0.1) + + assert len(callback_results) == 1 + response = callback_results[0] + assert response["type"] == "client_tool_result" + assert response["tool_call_id"] == "error_test" + assert response["is_error"] is True + assert "Tool failed intentionally" in response["result"] + finally: + client_tools.stop() + + @pytest.mark.asyncio + async def test_event_loop_isolation(self): + """Test that custom loop prevents 'different event loop' errors.""" + # This test simulates the scenario mentioned in the GitHub issue + main_loop = asyncio.get_running_loop() + + # Create ClientTools with the current loop + client_tools = ClientTools(loop=main_loop) + + async def loop_aware_tool(params): + # This tool checks which loop it's running on + current_loop = asyncio.get_running_loop() + return f"Running on loop: {id(current_loop)}" + + client_tools.register("loop_tool", loop_aware_tool, is_async=True) + client_tools.start() + + try: + result = await client_tools.handle("loop_tool", {}) + # The result should indicate it's running on the same loop + expected_loop_id = str(id(main_loop)) + assert expected_loop_id in result + finally: + client_tools.stop() + + def test_stop_custom_loop_behavior(self): + """Test that stop() doesn't close custom loops.""" + custom_loop = asyncio.new_event_loop() + client_tools = ClientTools(loop=custom_loop) + + try: + client_tools.start() + assert client_tools._running.is_set() + + client_tools.stop() + assert not client_tools._running.is_set() + + # Custom loop should still be open and usable + assert not custom_loop.is_closed() + finally: + custom_loop.close() + + def test_stop_default_loop_behavior(self): + """Test that stop() properly cleans up default loops.""" + client_tools = ClientTools() + + client_tools.start() + assert client_tools._running.is_set() + assert client_tools._thread is not None + thread = client_tools._thread + + client_tools.stop() + assert not client_tools._running.is_set() + + # Wait for thread to finish + thread.join(timeout=1.0) + assert not thread.is_alive() \ No newline at end of file From 73d9ee4349adf63120e6404661a34c0fafcb32c7 Mon Sep 17 00:00:00 2001 From: Paul Asjes Date: Tue, 9 Sep 2025 10:51:03 +0200 Subject: [PATCH 2/3] address feedback --- README.md | 15 +-- .../conversational_ai/conversation.py | 101 +++++++++--------- 2 files changed, 61 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 8fa0564a..a3616199 100644 --- a/README.md +++ b/README.md @@ -198,22 +198,22 @@ from elevenlabs.conversational_ai.conversation import ClientTools async def main(): # Get the current event loop custom_loop = asyncio.get_running_loop() - + # Create ClientTools with custom loop to prevent "different event loop" errors client_tools = ClientTools(loop=custom_loop) - + # Register your tools async def get_weather(params): location = params.get("location", "Unknown") # Your async logic here return f"Weather in {location}: Sunny, 72°F" - + client_tools.register("get_weather", get_weather, is_async=True) - + # Use with conversation conversation = Conversation( client=client, - agent_id="your-agent-id", + agent_id="your-agent-id", requires_auth=True, audio_interface=audio_interface, client_tools=client_tools @@ -228,6 +228,9 @@ asyncio.run(main()) - **Loop Management**: Prevent "Task got Future attached to a different event loop" errors - **Performance**: Better control over async task scheduling and execution +**Important:** When using a custom loop, you're responsible for its lifecycle +Don't close the loop while ClientTools are still using it. + ### Tool Registration Register custom tools that the AI agent can call during conversations: @@ -240,7 +243,7 @@ def calculate_sum(params): numbers = params.get("numbers", []) return sum(numbers) -# Async tool +# Async tool async def fetch_data(params): url = params.get("url") # Your async HTTP request logic diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index f6b95f89..814d31d2 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -155,7 +155,7 @@ class ClientTools: Supports both synchronous and asynchronous tools running in a dedicated event loop, ensuring non-blocking operation of the main conversation thread. - + Args: loop: Optional custom asyncio event loop to use for tool execution. If not provided, a new event loop will be created and run in a separate thread. Using a custom @@ -276,12 +276,15 @@ async def _execute_and_callback(): } callback(response) + self._schedule_coroutine(_execute_and_callback()) + + + def _schedule_coroutine(self, coro): + """Schedule a coroutine on the appropriate event loop.""" if self._custom_loop is not None: - # For custom loops, schedule the task on the custom loop - self._loop.create_task(_execute_and_callback()) + return self._loop.create_task(coro) else: - # For our own loop running in a separate thread, use run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe(_execute_and_callback(), self._loop) + return asyncio.run_coroutine_threadsafe(coro, self._loop) class ConversationInitiationData: @@ -302,7 +305,7 @@ def __init__( class BaseConversation: """Base class for conversation implementations with shared parameters and logic.""" - + def __init__( self, client: BaseElevenLabs, @@ -319,9 +322,9 @@ def __init__( self.requires_auth = requires_auth self.config = config or ConversationInitiationData() self.client_tools = client_tools or ClientTools() - + self.client_tools.start() - + self._conversation_id = None self._last_interrupt_id = 0 @@ -353,7 +356,7 @@ def _create_initiation_message(self): def _handle_message_core(self, message, message_handler): """Core message handling logic shared between sync and async implementations. - + Args: message: The parsed message dictionary message_handler: Handler object with methods for different operations @@ -369,36 +372,36 @@ def _handle_message_core(self, message, message_handler): return audio = base64.b64decode(event["audio_base_64"]) message_handler.handle_audio_output(audio) - + elif message["type"] == "agent_response": if message_handler.callback_agent_response: event = message["agent_response_event"] message_handler.handle_agent_response(event["agent_response"].strip()) - + elif message["type"] == "agent_response_correction": if message_handler.callback_agent_response_correction: event = message["agent_response_correction_event"] message_handler.handle_agent_response_correction( - event["original_agent_response"].strip(), + event["original_agent_response"].strip(), event["corrected_agent_response"].strip() ) - + elif message["type"] == "user_transcript": if message_handler.callback_user_transcript: event = message["user_transcription_event"] message_handler.handle_user_transcript(event["user_transcript"].strip()) - + elif message["type"] == "interruption": event = message["interruption_event"] self._last_interrupt_id = int(event["event_id"]) message_handler.handle_interruption() - + elif message["type"] == "ping": event = message["ping_event"] message_handler.handle_ping(event) if message_handler.callback_latency_measurement and event["ping_ms"]: message_handler.handle_latency_measurement(int(event["ping_ms"])) - + elif message["type"] == "client_tool_call": tool_call = message.get("client_tool_call", {}) tool_name = tool_call.get("tool_name") @@ -420,36 +423,36 @@ async def _handle_message_core_async(self, message, message_handler): return audio = base64.b64decode(event["audio_base_64"]) await message_handler.handle_audio_output(audio) - + elif message["type"] == "agent_response": if message_handler.callback_agent_response: event = message["agent_response_event"] await message_handler.handle_agent_response(event["agent_response"].strip()) - + elif message["type"] == "agent_response_correction": if message_handler.callback_agent_response_correction: event = message["agent_response_correction_event"] await message_handler.handle_agent_response_correction( - event["original_agent_response"].strip(), + event["original_agent_response"].strip(), event["corrected_agent_response"].strip() ) - + elif message["type"] == "user_transcript": if message_handler.callback_user_transcript: event = message["user_transcription_event"] await message_handler.handle_user_transcript(event["user_transcript"].strip()) - + elif message["type"] == "interruption": event = message["interruption_event"] self._last_interrupt_id = int(event["event_id"]) await message_handler.handle_interruption() - + elif message["type"] == "ping": event = message["ping_event"] await message_handler.handle_ping(event) if message_handler.callback_latency_measurement and event["ping_ms"]: await message_handler.handle_latency_measurement(int(event["ping_ms"])) - + elif message["type"] == "client_tool_call": tool_call = message.get("client_tool_call", {}) tool_name = tool_call.get("tool_name") @@ -514,7 +517,7 @@ def __init__( config=config, client_tools=client_tools, ) - + self.audio_interface = audio_interface self.callback_agent_response = callback_agent_response self.callback_agent_response_correction = callback_agent_response_correction @@ -663,22 +666,22 @@ def __init__(self, conversation, ws): self.callback_agent_response_correction = conversation.callback_agent_response_correction self.callback_user_transcript = conversation.callback_user_transcript self.callback_latency_measurement = conversation.callback_latency_measurement - + def handle_audio_output(self, audio): self.conversation.audio_interface.output(audio) - + def handle_agent_response(self, response): self.conversation.callback_agent_response(response) - + def handle_agent_response_correction(self, original, corrected): self.conversation.callback_agent_response_correction(original, corrected) - + def handle_user_transcript(self, transcript): self.conversation.callback_user_transcript(transcript) - + def handle_interruption(self): self.conversation.audio_interface.interrupt() - + def handle_ping(self, event): self.ws.send( json.dumps( @@ -688,17 +691,17 @@ def handle_ping(self, event): } ) ) - + def handle_latency_measurement(self, latency): self.conversation.callback_latency_measurement(latency) - + def handle_client_tool_call(self, tool_name, parameters): def send_response(response): if not self.conversation._should_stop.is_set(): self.ws.send(json.dumps(response)) - + self.conversation.client_tools.execute_tool(tool_name, parameters, send_response) - + handler = SyncMessageHandler(self, ws) self._handle_message_core(message, handler) @@ -759,7 +762,7 @@ def __init__( config=config, client_tools=client_tools, ) - + self.audio_interface = audio_interface self.callback_agent_response = callback_agent_response self.callback_agent_response_correction = callback_agent_response_correction @@ -777,7 +780,7 @@ async def start_session(self): Will run in background task until `end_session` is called. """ ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() - self._task = asyncio.create_task(self._run(ws_url)) + self._task = self._schedule_coroutine(self._run(ws_url)) async def end_session(self): """Ends the conversation session and cleans up resources.""" @@ -881,7 +884,7 @@ async def input_callback(audio): await self.end_session() await self.audio_interface.start(input_callback) - + try: while not self._should_stop.is_set(): try: @@ -911,22 +914,22 @@ def __init__(self, conversation, ws): self.callback_agent_response_correction = conversation.callback_agent_response_correction self.callback_user_transcript = conversation.callback_user_transcript self.callback_latency_measurement = conversation.callback_latency_measurement - + async def handle_audio_output(self, audio): await self.conversation.audio_interface.output(audio) - + async def handle_agent_response(self, response): await self.conversation.callback_agent_response(response) - + async def handle_agent_response_correction(self, original, corrected): await self.conversation.callback_agent_response_correction(original, corrected) - + async def handle_user_transcript(self, transcript): await self.conversation.callback_user_transcript(transcript) - + async def handle_interruption(self): await self.conversation.audio_interface.interrupt() - + async def handle_ping(self, event): await self.ws.send( json.dumps( @@ -936,18 +939,18 @@ async def handle_ping(self, event): } ) ) - + async def handle_latency_measurement(self, latency): await self.conversation.callback_latency_measurement(latency) - + def handle_client_tool_call(self, tool_name, parameters): def send_response(response): if not self.conversation._should_stop.is_set(): - asyncio.create_task(self.ws.send(json.dumps(response))) - + self.conversation._schedule_coroutine(self.ws.send(json.dumps(response))) + self.conversation.client_tools.execute_tool(tool_name, parameters, send_response) - + handler = AsyncMessageHandler(self, ws) - + # Use the shared core message handling logic with async wrapper await self._handle_message_core_async(message, handler) From 78af1639378c381cf88a0aa877233fadd156bdb3 Mon Sep 17 00:00:00 2001 From: Paul Asjes Date: Tue, 9 Sep 2025 10:55:42 +0200 Subject: [PATCH 3/3] fix --- src/elevenlabs/conversational_ai/conversation.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 814d31d2..3880e112 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -278,7 +278,6 @@ async def _execute_and_callback(): self._schedule_coroutine(_execute_and_callback()) - def _schedule_coroutine(self, coro): """Schedule a coroutine on the appropriate event loop.""" if self._custom_loop is not None: @@ -780,7 +779,7 @@ async def start_session(self): Will run in background task until `end_session` is called. """ ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() - self._task = self._schedule_coroutine(self._run(ws_url)) + self._task = asyncio.create_task(self._run(ws_url)) async def end_session(self): """Ends the conversation session and cleans up resources.""" @@ -946,7 +945,7 @@ async def handle_latency_measurement(self, latency): def handle_client_tool_call(self, tool_name, parameters): def send_response(response): if not self.conversation._should_stop.is_set(): - self.conversation._schedule_coroutine(self.ws.send(json.dumps(response))) + asyncio.create_task(self.ws.send(json.dumps(response))) self.conversation.client_tools.execute_tool(tool_name, parameters, send_response)