66import numpy as np
77import json
88import websockets
9+ from websockets .exceptions import ConnectionClosed
910from datetime import datetime
1011from collections import defaultdict
1112import base64
@@ -97,29 +98,66 @@ def __init__(self, url=None, api_key=None):
9798 self .ws = None
9899
def is_connected(self):
    """Report whether a websocket is present and appears to be open.

    Different websockets releases expose connection state differently, so
    this probes ``ws.closed`` first and falls back to ``ws.state.name``.

    Returns:
        False when no websocket is set, when ``ws.closed`` is truthy, or
        when the state name is not ``'OPEN'``; True otherwise.
    """
    if self.ws is None:
        return False

    # Some websockets versions don't have a closed attribute.
    try:
        return not self.ws.closed
    except AttributeError:
        pass

    # Fallback: inspect the connection state object if there is one.
    if not hasattr(self.ws, 'state'):
        return False
    try:
        return self.ws.state.name == 'OPEN'
    except AttributeError:
        # Only a missing attribute is tolerated here; the previous bare
        # ``except:`` also swallowed KeyboardInterrupt/SystemExit.
        # Last fallback: assume connected if ws exists.
        return True
101113
def log(self, *args):
    """Write a debug line tagged with ``[Websocket/<UTC timestamp>]``.

    Args:
        *args: Values to log; each is converted with ``str()`` and the
            results are joined with single spaces after the tag.
    """
    prefix = f"[Websocket/{datetime.utcnow().isoformat()}]"
    # Positional arguments to a logger call are format arguments, not extra
    # message parts. The prefix has no placeholders, so handing ``*args`` to
    # the logger never rendered them. Build the full line up front and pass
    # it as the only argument.
    logger.debug(" ".join([prefix, *(str(part) for part in args)]))
104116
async def connect(self, model='gpt-4o-mini-realtime-preview-2024-12-17'):
    """Open the websocket and start the background receive loop.

    Args:
        model: Model identifier appended to the URL as the ``model`` query
            parameter.

    Raises:
        Exception: If a connection is already open, or if
            ``websockets.connect`` rejects both header keyword names.
    """
    if self.is_connected():
        raise Exception("Already connected")

    endpoint = f"{self.url}?model={model}"
    headers = {
        'Authorization': f'Bearer {self.api_key}',
        'OpenAI-Beta': 'realtime=v1'
    }

    # Try different header parameter names for compatibility: newer
    # websockets releases take ``additional_headers``, older ones
    # ``extra_headers``. An unknown keyword surfaces as TypeError.
    try:
        self.ws = await websockets.connect(endpoint, additional_headers=headers)
    except TypeError:
        try:
            self.ws = await websockets.connect(endpoint, extra_headers=headers)
        except TypeError as err:
            # Chain the cause so the underlying TypeError stays visible.
            raise Exception("Websockets library version incompatible. Please update websockets to version 11.0 or higher.") from err

    self.log(f"Connected to {self.url}")
    # The event loop only keeps weak references to tasks; hold a strong one
    # so the receive loop cannot be garbage-collected mid-execution.
    self._receive_task = asyncio.create_task(self._receive_messages())
114139
115140 async def _receive_messages (self ):
116- async for message in self .ws :
117- event = json .loads (message )
118- if event ['type' ] == "error" :
119- logger .error ("ERROR" , event )
120- self .log ("received:" , event )
121- self .dispatch (f"server.{ event ['type' ]} " , event )
122- self .dispatch ("server.*" , event )
141+ try :
142+ async for message in self .ws :
143+ event = json .loads (message )
144+ if event ['type' ] == "error" :
145+ logger .error (f"OpenAI Realtime API Error: { event } " )
146+ self .log ("received:" , event )
147+ self .dispatch (f"server.{ event ['type' ]} " , event )
148+ self .dispatch ("server.*" , event )
149+ except ConnectionClosed as e :
150+ logger .info (f"WebSocket connection closed normally: { e } " )
151+ # Mark connection as closed
152+ self .ws = None
153+ # Dispatch disconnection event
154+ self .dispatch ("disconnected" , {"reason" : str (e )})
155+ except Exception as e :
156+ logger .warning (f"WebSocket receive loop ended: { e } " )
157+ # Mark connection as closed
158+ self .ws = None
159+ # Dispatch disconnection event
160+ self .dispatch ("disconnected" , {"reason" : str (e )})
123161
124162 async def send (self , event_name , data = None ):
125163 if not self .is_connected ():
@@ -135,16 +173,33 @@ async def send(self, event_name, data=None):
135173 self .dispatch (f"client.{ event_name } " , event )
136174 self .dispatch ("client.*" , event )
137175 self .log ("sent:" , event )
138- await self .ws .send (json .dumps (event ))
176+
177+ try :
178+ await self .ws .send (json .dumps (event ))
179+ except ConnectionClosed as e :
180+ logger .info (f"WebSocket connection closed during send: { e } " )
181+ # Mark connection as closed if send fails
182+ self .ws = None
183+ raise Exception (f"WebSocket connection lost: { e } " )
184+ except Exception as e :
185+ logger .error (f"Failed to send WebSocket message: { e } " )
186+ # Mark connection as closed if send fails
187+ self .ws = None
188+ raise Exception (f"WebSocket connection lost: { e } " )
139189
140190 def _generate_id (self , prefix ):
141191 return f"{ prefix } { int (datetime .utcnow ().timestamp () * 1000 )} "
142192
async def disconnect(self):
    """Close the websocket, if any, and forget it.

    Errors raised while closing are logged as warnings rather than
    propagated; the socket reference is cleared either way.
    """
    if not self.ws:
        return

    try:
        await self.ws.close()
        logger.info(f"Disconnected from {self.url}")
    except Exception as close_error:
        logger.warning(f"Error during WebSocket close: {close_error}")
    finally:
        # Drop the reference whether or not the close succeeded.
        self.ws = None
    self.log("WebSocket connection cleaned up")
148203
149204class RealtimeConversation :
150205 default_frequency = config .features .audio .sample_rate
@@ -341,8 +396,7 @@ def _process_audio_delta(self, event):
341396 return None , None
342397 array_buffer = base64_to_array_buffer (delta )
343398 append_values = array_buffer .tobytes ()
344- # TODO: make it work
345- # item['formatted']['audio'] = merge_int16_arrays(item['formatted']['audio'], append_values)
399+ item ['formatted' ]['audio' ].append (append_values )
346400 return item , {'audio' : append_values }
347401
348402 def _process_text_delta (self , event ):
@@ -381,7 +435,6 @@ def __init__(self, url=None, api_key=None):
381435 "tools" : [],
382436 "tool_choice" : "auto" ,
383437 "temperature" : 0.8 ,
384- "max_response_output_tokens" : 4096 ,
385438 }
386439 self .session_config = {}
387440 self .transcription_models = [{"model" : "whisper-1" }]
@@ -431,8 +484,13 @@ def _log_event(self, event):
431484 self .dispatch ("realtime.event" , realtime_event )
432485
def _on_session_created(self, event):
    """Record that the session exists and log its id and model.

    Args:
        event: The session-created event; the ``session`` entry is read
            for ``id`` and ``model``, each defaulting to ``'unknown'``.
    """
    try:
        session_info = event.get('session', {})
        session_id = session_info.get('id', 'unknown')
        model = session_info.get('model', 'unknown')
        logger.info(f"OpenAI Realtime session created - ID: {session_id}, Model: {model}")
    except Exception as e:
        # A malformed event must not prevent the flag below from being set.
        logger.warning(f"Error processing session created event: {e}")
    logger.debug(f"Session event details: {event}")
    self.session_created = True
437495
438496 def _process_event (self , event , * args ):
@@ -497,10 +555,15 @@ def reset(self):
497555 self ._add_api_event_handlers ()
498556 return True
499557
async def connect(self, model=None):
    """Connect the underlying realtime transport and push the session config.

    Args:
        model: Model identifier to connect with; ``None`` selects
            ``'gpt-4o-mini-realtime-preview-2024-12-17'``.

    Returns:
        True once the transport is connected and the session updated.

    Raises:
        Exception: If the client is already connected.
    """
    if self.is_connected():
        raise Exception("Already connected, use .disconnect() first")

    # Use provided model or default.
    chosen_model = 'gpt-4o-mini-realtime-preview-2024-12-17' if model is None else model

    await self.realtime.connect(chosen_model)
    await self.update_session()
    return True
506569
@@ -516,6 +579,7 @@ async def disconnect(self):
516579 self .conversation .clear ()
517580 if self .realtime .is_connected ():
518581 await self .realtime .disconnect ()
582+ logger .info ("RealtimeClient disconnected" )
519583
def get_turn_detection_type(self):
    """Return the configured turn-detection type, or None if there is none.

    Returns:
        The ``"type"`` entry of ``session_config["turn_detection"]``, or
        None when turn detection is missing, None, or has no type.
    """
    # ``.get(key, {})`` only substitutes the default for a *missing* key; a
    # key that is present with value None would make the chained ``.get``
    # raise AttributeError, so normalise falsy values to an empty dict.
    turn_detection = self.session_config.get("turn_detection") or {}
    return turn_detection.get("type")
@@ -579,11 +643,22 @@ async def send_user_message_content(self, content=[]):
579643 return True
580644
async def append_input_audio(self, array_buffer):
    """Send an audio chunk to the input audio buffer and record it locally.

    Args:
        array_buffer: Sized sequence of audio samples. It is wrapped with
            ``np.array`` and base64-encoded before sending.

    Returns:
        False if the client is not connected or the send fails; True if the
        chunk was sent or was empty.
    """
    if not self.is_connected():
        logger.warning("Cannot append audio: RealtimeClient is not connected")
        return False

    # Explicit length check: truthiness is ambiguous for numpy arrays,
    # which callers may pass here (assumption, verify against callers).
    if len(array_buffer) == 0:
        return True

    try:
        await self.realtime.send("input_audio_buffer.append", {
            "audio": array_buffer_to_base64(np.array(array_buffer)),
        })
        # Only mirror the chunk locally once the send has succeeded.
        self.input_audio_buffer.extend(array_buffer)
    except Exception as e:
        logger.error(f"Failed to append input audio: {e}")
        detail = str(e).lower()
        if "connection" in detail or "closed" in detail:
            # The chunk is not retained anywhere on failure, so say so
            # instead of claiming it will be queued.
            logger.warning("WebSocket connection appears to be lost. Audio chunk was dropped; reconnect before sending more audio.")
        return False
    return True
588663
589664 async def create_response (self ):
@@ -650,4 +725,17 @@ async def _send_chainlit_message(self, item):
650725 logger .debug (f"Unhandled item type:\n { json .dumps (item , indent = 2 )} " )
651726
652727 # Additional debug logging
653- logger .debug (f"Processed Chainlit message for item: { item .get ('id' , 'unknown' )} " )
728+ logger .debug (f"Processed Chainlit message for item: { item .get ('id' , 'unknown' )} " )
729+
async def ensure_connected(self):
    """Check connection health and attempt reconnection if needed.

    Returns:
        True if already connected or the reconnect succeeded; False if the
        reconnect attempt raised.
    """
    if self.is_connected():
        return True

    try:
        logger.info("Attempting to reconnect to OpenAI Realtime API...")
        model = 'gpt-4o-mini-realtime-preview-2024-12-17'
        await self.connect(model)
    except Exception as reconnect_error:
        logger.error(f"Failed to reconnect: {reconnect_error}")
        return False
    return True
0 commit comments