From acb5b5287e7e56bde651b4e262aa54dfb80121c6 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 27 May 2025 00:45:53 +0100 Subject: [PATCH 1/6] feat: add support for text messages, user activity and contextual updates --- .../conversational_ai/conversation.py | 123 +++++++++++++++++- 1 file changed, 121 insertions(+), 2 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index d5bf8636..afcac59f 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -2,16 +2,71 @@ import base64 import json import threading -from typing import Callable, Optional, Awaitable, Union, Any +from typing import Callable, Optional, Awaitable, Union, Any, Literal import asyncio from concurrent.futures import ThreadPoolExecutor +from enum import StrEnum -from websockets.sync.client import connect +from websockets.sync.client import connect, ClientConnection from websockets.exceptions import ConnectionClosedOK from ..base_client import BaseElevenLabs +class ClientToOrchestratorEvent(StrEnum): + """Event types that can be sent from client to orchestrator.""" + # Response to a ping request. + PONG = "pong" + CLIENT_TOOL_RESULT = "client_tool_result" + CONVERSATION_INITIATION_CLIENT_DATA = "conversation_initiation_client_data" + FEEDBACK = "feedback" + # Non-interrupting content that is sent to the server to update the conversation state. + CONTEXTUAL_UPDATE = "contextual_update" + # User text message. + USER_MESSAGE = "user_message" + USER_ACTIVITY = "user_activity" + + +class UserMessageClientToOrchestratorEvent: + """Event for sending user text messages.""" + + def __init__(self, text: Optional[str] = None): + self.type: Literal[ClientToOrchestratorEvent.USER_MESSAGE] = ClientToOrchestratorEvent.USER_MESSAGE + self.text = text + + def to_dict(self) -> dict: + return { + "type": self.type, + "text": self.text + } + + +class UserActivityClientToOrchestratorEvent: + """Event for registering user activity (ping to prevent timeout).""" + + def __init__(self): + self.type: Literal[ClientToOrchestratorEvent.USER_ACTIVITY] = ClientToOrchestratorEvent.USER_ACTIVITY + + def to_dict(self) -> dict: + return { + "type": self.type + } + + +class ContextualUpdateClientToOrchestratorEvent: + """Event for sending non-interrupting contextual updates to the conversation state.""" + + def __init__(self, content: str): + self.type: Literal[ClientToOrchestratorEvent.CONTEXTUAL_UPDATE] = ClientToOrchestratorEvent.CONTEXTUAL_UPDATE + self.content = content + + def to_dict(self) -> dict: + return { + "type": self.type, + "content": self.content + } + + class AudioInterface(ABC): """AudioInterface provides an abstraction for handling audio input and output.""" @@ -193,6 +248,7 @@ class Conversation: _should_stop: threading.Event _conversation_id: Optional[str] _last_interrupt_id: int + _ws: Optional[ClientConnection] def __init__( self, @@ -243,6 +299,7 @@ def __init__( self._should_stop = threading.Event() self._conversation_id = None self._last_interrupt_id = 0 + self._ws = None def start_session(self): """Starts the conversation session. @@ -271,8 +328,68 @@ def wait_for_session_end(self) -> Optional[str]: self._thread.join() return self._conversation_id + def send_user_message(self, text: str): + """Send a text message from the user to the agent. + + Args: + text: The text message to send to the agent. + + Raises: + RuntimeError: If the session is not active or websocket is not connected. + """ + if not self._ws: + raise RuntimeError("Session not started or websocket not connected.") + + event = UserMessageClientToOrchestratorEvent(text=text) + try: + self._ws.send(json.dumps(event.to_dict())) + except Exception as e: + print(f"Error sending user message: {e}") + raise + + def register_user_activity(self): + """Register user activity to prevent session timeout. + + This sends a ping to the orchestrator to reset the timeout timer. + + Raises: + RuntimeError: If the session is not active or websocket is not connected. + """ + if not self._ws: + raise RuntimeError("Session not started or websocket not connected.") + + event = UserActivityClientToOrchestratorEvent() + try: + self._ws.send(json.dumps(event.to_dict())) + except Exception as e: + print(f"Error registering user activity: {e}") + raise + + def send_contextual_update(self, content: str): + """Send a contextual update to the conversation. + + Contextual updates are non-interrupting content that is sent to the server + to update the conversation state without directly prompting the agent. + + Args: + content: The contextual information to send to the conversation. + + Raises: + RuntimeError: If the session is not active or websocket is not connected. + """ + if not self._ws: + raise RuntimeError("Session not started or websocket not connected.") + + event = ContextualUpdateClientToOrchestratorEvent(content=content) + try: + self._ws.send(json.dumps(event.to_dict())) + except Exception as e: + print(f"Error sending contextual update: {e}") + raise + def _run(self, ws_url: str): with connect(ws_url, max_size=16 * 1024 * 1024) as ws: + self._ws = ws ws.send( json.dumps( { @@ -313,6 +430,8 @@ def input_callback(audio): except Exception as e: print(f"Error receiving message: {e}") self.end_session() + + self._ws = None def _handle_message(self, message, ws): if message["type"] == "conversation_initiation_metadata": From d9b5dfb16ed566868e879bf9481deafebae08916 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 27 May 2025 00:51:38 +0100 Subject: [PATCH 2/6] fix --- src/elevenlabs/conversational_ai/conversation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index afcac59f..4c14ba30 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -5,7 +5,7 @@ from typing import Callable, Optional, Awaitable, Union, Any, Literal import asyncio from concurrent.futures import ThreadPoolExecutor -from enum import StrEnum +from enum import Enum from websockets.sync.client import connect, ClientConnection from websockets.exceptions import ConnectionClosedOK @@ -13,7 +13,7 @@ from ..base_client import BaseElevenLabs -class ClientToOrchestratorEvent(StrEnum): +class ClientToOrchestratorEvent(str, Enum): """Event types that can be sent from client to orchestrator.""" # Response to a ping request. PONG = "pong" From 0cccb2ae31fbd618a9fcb7c2e87905e4a5da4566 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 27 May 2025 08:37:33 +0100 Subject: [PATCH 3/6] add annotations to init methods --- src/elevenlabs/conversational_ai/conversation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 4c14ba30..5f1336b8 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -7,7 +7,7 @@ from concurrent.futures import ThreadPoolExecutor from enum import Enum -from websockets.sync.client import connect, ClientConnection +from websockets.sync.client import connect, Connection from websockets.exceptions import ConnectionClosedOK from ..base_client import BaseElevenLabs @@ -44,7 +44,7 @@ def to_dict(self) -> dict: class UserActivityClientToOrchestratorEvent: """Event for registering user activity (ping to prevent timeout).""" - def __init__(self): + def __init__(self) -> None: self.type: Literal[ClientToOrchestratorEvent.USER_ACTIVITY] = ClientToOrchestratorEvent.USER_ACTIVITY def to_dict(self) -> dict: @@ -118,7 +118,7 @@ class ClientTools: ensuring non-blocking operation of the main conversation thread. """ - def __init__(self): + def __init__(self) -> None: self.tools: dict[str, tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} self.lock = threading.Lock() self._loop = None @@ -248,7 +248,7 @@ class Conversation: _should_stop: threading.Event _conversation_id: Optional[str] _last_interrupt_id: int - _ws: Optional[ClientConnection] + _ws: Optional[Connection] def __init__( self, From 61512ebc968fc82043746cba8bbbc0191f8a2547 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 27 May 2025 10:36:03 +0100 Subject: [PATCH 4/6] fix --- .../conversational_ai/conversation.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index a3e7e68e..de4573ac 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -2,7 +2,7 @@ import base64 import json import threading -from typing import Callable, Optional, Awaitable, Union, Any, Literal +from typing import Callable, Optional, Awaitable, Union, Any, Literal, Dict, Tuple import asyncio from concurrent.futures import ThreadPoolExecutor from enum import Enum @@ -119,7 +119,7 @@ class ClientTools: """ def __init__(self) -> None: - self.tools: dict[str, tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} + self.tools: Dict[str, Tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} self.lock = threading.Lock() self._loop = None self._thread = None @@ -296,11 +296,10 @@ def __init__( self.client_tools.start() self._thread = None - self._ws: Optional[ClientConnection] = None + self._ws: Optional[Connection] = None self._should_stop = threading.Event() self._conversation_id = None self._last_interrupt_id = 0 - self._ws = None def start_session(self): """Starts the conversation session. @@ -491,16 +490,6 @@ def send_response(response): else: pass # Ignore all other message types. - def send_contextual_update(self, text: str): - if not self._ws: - raise RuntimeError("WebSocket is not connected") - - payload = { - "type": "contextual_update", - "text": text, - } - self._ws.send(json.dumps(payload)) - def _get_wss_url(self): base_ws_url = self.client._client_wrapper.get_environment().wss return f"{base_ws_url}/v1/convai/conversation?agent_id={self.agent_id}" From 35a8e9f066db084798dcee6a9c6a0b0dedf6ea6b Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 27 May 2025 10:45:32 +0100 Subject: [PATCH 5/6] use inbuilt types --- src/elevenlabs/conversational_ai/conversation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index de4573ac..9c90f843 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -2,7 +2,7 @@ import base64 import json import threading -from typing import Callable, Optional, Awaitable, Union, Any, Literal, Dict, Tuple +from typing import Callable, Optional, Awaitable, Union, Any, Literal import asyncio from concurrent.futures import ThreadPoolExecutor from enum import Enum @@ -119,7 +119,7 @@ class ClientTools: """ def __init__(self) -> None: - self.tools: Dict[str, Tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} + self.tools: dict[str, tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} self.lock = threading.Lock() self._loop = None self._thread = None From 4dd4d1f698bb5399c6825a05d71639b1635d3958 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 27 May 2025 11:08:03 +0100 Subject: [PATCH 6/6] fix --- src/elevenlabs/conversational_ai/conversation.py | 7 +++++-- tests/test_convai.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 9c90f843..497dc8ea 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -2,7 +2,7 @@ import base64 import json import threading -from typing import Callable, Optional, Awaitable, Union, Any, Literal +from typing import Callable, Optional, Awaitable, Union, Any, Literal, Dict, Tuple import asyncio from concurrent.futures import ThreadPoolExecutor from enum import Enum @@ -119,7 +119,7 @@ class ClientTools: """ def __init__(self) -> None: - self.tools: dict[str, tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} + self.tools: Dict[str, Tuple[Union[Callable[[dict], Any], Callable[[dict], Awaitable[Any]]], bool]] = {} self.lock = threading.Lock() self._loop = None self._thread = None @@ -196,6 +196,9 @@ def execute_tool(self, tool_name: str, parameters: dict, callback: Callable[[dic """ if not self._running.is_set(): raise RuntimeError("ClientTools event loop is not running") + + if self._loop is None: + raise RuntimeError("Event loop is not available") async def _execute_and_callback(): try: diff --git a/tests/test_convai.py b/tests/test_convai.py index c57135f1..65a12cae 100644 --- a/tests/test_convai.py +++ b/tests/test_convai.py @@ -191,5 +191,5 @@ def test_conversation_with_contextual_update(): conversation.wait_for_session_end() # Assertions - expected = json.dumps({"type": "contextual_update", "text": "User appears to be looking at pricing page"}) + expected = json.dumps({"type": "contextual_update", "content": "User appears to be looking at pricing page"}) mock_ws.send.assert_any_call(expected)