11import json
22import asyncio
3- from typing import Optional , Dict , Any
3+ from typing import Optional , Dict , Any , Callable , Union , Awaitable
44import httpx
55from livekit .rtc import Room , TrackKind
66
77from .base_connection import BaseConnection
88
99
10+ class WebRTCConnectionConfig :
11+ """Configuration for WebRTC connection."""
12+ def __init__ (
13+ self ,
14+ conversation_token : Optional [str ] = None ,
15+ agent_id : Optional [str ] = None ,
16+ livekit_url : Optional [str ] = None ,
17+ api_origin : Optional [str ] = None ,
18+ overrides : Optional [Dict [str , Any ]] = None ,
19+ on_debug : Optional [Callable [[Dict [str , Any ]], None ]] = None ,
20+ ):
21+ self .conversation_token = conversation_token
22+ self .agent_id = agent_id
23+ self .livekit_url = livekit_url
24+ self .api_origin = api_origin
25+ self .overrides = overrides or {}
26+ self .on_debug = on_debug
27+
28+
1029class WebRTCConnection (BaseConnection ):
1130 """WebRTC-based connection for conversations using LiveKit."""
1231
13- LIVEKIT_WS_URL = "wss://livekit.rtc.elevenlabs.io"
32+ DEFAULT_LIVEKIT_WS_URL = "wss://livekit.rtc.elevenlabs.io"
33+ DEFAULT_API_ORIGIN = "https://api.elevenlabs.io"
1434
15- def __init__ (self , conversation_token : Optional [str ] = None , agent_id : Optional [str ] = None ):
35+ def __init__ (
36+ self ,
37+ conversation_token : Optional [str ] = None ,
38+ agent_id : Optional [str ] = None ,
39+ livekit_url : Optional [str ] = None ,
40+ api_origin : Optional [str ] = None ,
41+ overrides : Optional [Dict [str , Any ]] = None ,
42+ on_debug : Optional [Callable [[Dict [str , Any ]], None ]] = None ,
43+ ):
1644 super ().__init__ ()
1745 self .conversation_token = conversation_token
1846 self .agent_id = agent_id
47+ self .livekit_url = livekit_url or self .DEFAULT_LIVEKIT_WS_URL
48+ self .api_origin = api_origin or self .DEFAULT_API_ORIGIN
49+ self .overrides = overrides or {}
50+ self .on_debug = on_debug
1951 self ._room : Optional [Room ] = None
2052 self ._is_connected = False
2153
54+ @classmethod
55+ async def create (cls , config : WebRTCConnectionConfig ) -> "WebRTCConnection" :
56+ """Create and connect a WebRTC connection."""
57+ connection = cls (
58+ conversation_token = config .conversation_token ,
59+ agent_id = config .agent_id ,
60+ livekit_url = config .livekit_url ,
61+ api_origin = config .api_origin ,
62+ overrides = config .overrides ,
63+ on_debug = config .on_debug ,
64+ )
65+
66+ await connection .connect ()
67+ return connection
68+
2269 async def connect (self ) -> None :
2370 """Establish the WebRTC connection using LiveKit."""
2471 # Get conversation token if not provided
@@ -31,19 +78,26 @@ async def connect(self) -> None:
3178 self ._room = Room ()
3279 self ._setup_room_callbacks ()
3380
34- # Connect to LiveKit room
35- await self ._room .connect (self .LIVEKIT_WS_URL , self .conversation_token )
81+ # Connect to LiveKit room using configurable URL
82+ await self ._room .connect (self .livekit_url , self .conversation_token )
3683 self ._is_connected = True
3784
3885 # Set conversation ID from room name if available
3986 if self ._room .name :
40- self .conversation_id = self ._room .name
87+ # Extract conversation ID from room name if it contains one
88+ import re
89+ match = re .search (r'(conv_[a-zA-Z0-9]+)' , self ._room .name )
90+ self .conversation_id = match .group (0 ) if match else self ._room .name
4191 else :
4292 self .conversation_id = f"webrtc-{ id (self )} "
4393
4494 # Enable microphone
4595 await self ._room .local_participant .set_microphone_enabled (True )
4696
97+ # Send overrides if any
98+ if self .overrides :
99+ await self .send_message (self ._construct_overrides ())
100+
47101 async def close (self ) -> None :
48102 """Close the WebRTC connection."""
49103 if self ._room :
@@ -78,13 +132,24 @@ async def _fetch_conversation_token(self) -> str:
78132 if not self .agent_id :
79133 raise ValueError ("Agent ID is required to fetch conversation token" )
80134
81- url = f"https://api.elevenlabs.io/v1/convai/conversation/token?agent_id={ self .agent_id } "
135+ # Get version and source from overrides or use defaults
136+ version = self .overrides .get ("client" , {}).get ("version" , "2.15.0" ) # From pyproject.toml
137+ source = self .overrides .get ("client" , {}).get ("source" , "python_sdk" )
138+
139+ # Convert WSS origin to HTTPS for API calls
140+ api_origin = self ._convert_wss_to_https (self .api_origin )
141+
142+ url = f"{ api_origin } /v1/convai/conversation/token?agent_id={ self .agent_id } &source={ source } &version={ version } "
82143
83144 async with httpx .AsyncClient () as client :
84145 response = await client .get (url )
85146
86147 if not response .is_success :
87- raise Exception (f"Failed to fetch conversation token for agent { self .agent_id } : { response .status_code } { response .text } " )
148+ error_msg = f"ElevenLabs API returned { response .status_code } { response .reason_phrase } "
149+ if response .status_code == 401 :
150+ error_msg = "Your agent has authentication enabled, but no signed URL or conversation token was provided."
151+
152+ raise Exception (f"Failed to fetch conversation token for agent { self .agent_id } : { error_msg } " )
88153
89154 data = response .json ()
90155 token = data .get ("token" )
@@ -94,19 +159,43 @@ async def _fetch_conversation_token(self) -> str:
94159
95160 return token
96161
162+ def _convert_wss_to_https (self , origin : str ) -> str :
163+ """Convert WSS origin to HTTPS for API calls."""
164+ return origin .replace ("wss://" , "https://" )
165+
166+ def _construct_overrides (self ) -> Dict [str , Any ]:
167+ """Construct overrides message for conversation initiation."""
168+ return {
169+ "type" : "conversation_initiation_client_data" ,
170+ "overrides" : self .overrides
171+ }
172+
173+ def debug (self , info : Dict [str , Any ]) -> None :
174+ """Log debug information."""
175+ if self .on_debug :
176+ self .on_debug (info )
177+
97178 def _setup_room_callbacks (self ) -> None :
98179 """Setup LiveKit room event callbacks."""
99180 if not self ._room :
100181 return
101182
102183 @self ._room .on ("connected" )
103184 def on_connected () -> None :
104- print ("WebRTC room connected" )
185+ self ._is_connected = True
186+ self .debug ({"type" : "webrtc_connected" , "message" : "WebRTC room connected" })
105187
106188 @self ._room .on ("disconnected" )
107189 def on_disconnected (reason : Optional [str ] = None ) -> None :
108190 self ._is_connected = False
109- print (f"WebRTC room disconnected: { reason } " )
191+ self .debug ({"type" : "webrtc_disconnected" , "message" : f"WebRTC room disconnected: { reason } " })
192+
193+ @self ._room .on ("connection_state_changed" )
194+ def on_connection_state_changed (state ) -> None :
195+ self .debug ({"type" : "connection_state_changed" , "state" : str (state )})
196+ # Handle disconnected state
197+ if hasattr (state , 'name' ) and state .name == 'DISCONNECTED' :
198+ self ._is_connected = False
110199
111200 @self ._room .on ("data_received" )
112201 def on_data_received (data : bytes , participant ) -> None :
@@ -119,13 +208,49 @@ def on_data_received(data: bytes, participant) -> None:
119208
120209 self ._handle_message (message )
121210 except (json .JSONDecodeError , UnicodeDecodeError ) as e :
122- print (f"Failed to parse incoming data message: { e } " )
211+ self .debug ({
212+ "type" : "data_parse_error" ,
213+ "error" : str (e ),
214+ "raw_data" : data .decode ('utf-8' , errors = 'replace' )
215+ })
123216
124217 @self ._room .on ("track_subscribed" )
125218 def on_track_subscribed (track , publication , participant ) -> None :
126219 if track .kind == TrackKind .KIND_AUDIO and "agent" in participant .identity :
127- # Handle incoming agent audio
128- print ("Subscribed to agent audio track" )
220+ self .debug ({
221+ "type" : "agent_audio_track_subscribed" ,
222+ "participant" : participant .identity
223+ })
224+
225+ @self ._room .on ("active_speakers_changed" )
226+ def on_active_speakers_changed (speakers ) -> None :
227+ # Update mode based on active speakers
228+ if speakers and len (speakers ) > 0 :
229+ is_agent_speaking = any ("agent" in speaker .identity for speaker in speakers )
230+ mode = "speaking" if is_agent_speaking else "listening"
231+ else :
232+ mode = "listening"
233+
234+ self .debug ({"type" : "mode_changed" , "mode" : mode })
235+
236+ async def set_microphone_enabled (self , enabled : bool ) -> None :
237+ """Enable or disable the microphone."""
238+ if not self ._room or not self ._room .local_participant :
239+ raise RuntimeError ("Room not connected" )
240+
241+ await self ._room .local_participant .set_microphone_enabled (enabled )
242+
243+ async def set_microphone_device (self , device_id : str ) -> None :
244+ """Set the microphone input device."""
245+ if not self ._room or not self ._room .local_participant :
246+ raise RuntimeError ("Room not connected" )
247+
248+ # This would require additional LiveKit functionality for device switching
249+ # For now, we log the request
250+ self .debug ({
251+ "type" : "microphone_device_change_requested" ,
252+ "device_id" : device_id
253+ })
129254
130255 def get_room (self ) -> Optional [Room ]:
131256 """Get the LiveKit room instance for advanced usage."""
0 commit comments