
Commit a0042ea

fix: openai realtime participants and conversation sync (#258)
1 parent 6ab5238 commit a0042ea

2 files changed: 7 additions & 133 deletions


plugins/openai/tests/test_openai_realtime.py

Lines changed: 0 additions & 105 deletions
@@ -250,19 +250,6 @@ async def on_user_transcript(event: RealtimeUserSpeechTranscriptionEvent):
         )
         await realtime.simple_audio_response(pcm, test_participant)
 
-        # Simulate OpenAI creating the conversation item (this is when we map item_id -> participant)
-        item_created_event = {
-            "type": "conversation.item.created",
-            "event_id": "event_created_123",
-            "item": {
-                "id": "item_test_456",
-                "type": "message",
-                "role": "user",
-                "content": [],
-            },
-        }
-        await realtime._handle_openai_event(item_created_event)
-
         # Now simulate receiving the transcription event from OpenAI
         openai_event = {
             "content_index": 0,
@@ -286,95 +273,3 @@ async def on_user_transcript(event: RealtimeUserSpeechTranscriptionEvent):
 
         # Verify the user_id() helper method works
         assert user_transcripts[0].user_id() == "test_user_123"
-
-    async def test_multi_user_participant_tracking(self, realtime):
-        """Test that participant tracking works correctly when multiple users speak in succession"""
-        user_transcripts = []
-
-        @realtime.events.subscribe
-        async def on_user_transcript(event: RealtimeUserSpeechTranscriptionEvent):
-            user_transcripts.append(event)
-
-        from vision_agents.core.edge.types import Participant
-        from getstream.video.rtc.track_util import PcmData, AudioFormat
-        import numpy as np
-
-        # User A sends audio
-        participant_a = Participant(original=None, user_id="user_a")
-        pcm_a = PcmData(
-            samples=np.zeros(100, dtype=np.int16),
-            sample_rate=48000,
-            format=AudioFormat.S16,
-        )
-        await realtime.simple_audio_response(pcm_a, participant_a)
-
-        # OpenAI creates conversation item for User A
-        item_created_a = {
-            "type": "conversation.item.created",
-            "event_id": "event_created_a",
-            "item": {
-                "id": "item_a_123",
-                "type": "message",
-                "role": "user",
-                "content": [],
-            },
-        }
-        await realtime._handle_openai_event(item_created_a)
-
-        # User B sends audio (before A's transcription arrives)
-        participant_b = Participant(original=None, user_id="user_b")
-        pcm_b = PcmData(
-            samples=np.zeros(100, dtype=np.int16),
-            sample_rate=48000,
-            format=AudioFormat.S16,
-        )
-        await realtime.simple_audio_response(pcm_b, participant_b)
-
-        # OpenAI creates conversation item for User B
-        item_created_b = {
-            "type": "conversation.item.created",
-            "event_id": "event_created_b",
-            "item": {
-                "id": "item_b_456",
-                "type": "message",
-                "role": "user",
-                "content": [],
-            },
-        }
-        await realtime._handle_openai_event(item_created_b)
-
-        # Now transcriptions arrive (A's transcription arrives AFTER B started speaking)
-        transcription_a = {
-            "content_index": 0,
-            "event_id": "event_trans_a",
-            "item_id": "item_a_123",  # References User A's item
-            "transcript": "Hello from User A",
-            "type": "conversation.item.input_audio_transcription.completed",
-            "usage": {"seconds": 1, "type": "duration"},
-        }
-        await realtime._handle_openai_event(transcription_a)
-
-        transcription_b = {
-            "content_index": 0,
-            "event_id": "event_trans_b",
-            "item_id": "item_b_456",  # References User B's item
-            "transcript": "Hello from User B",
-            "type": "conversation.item.input_audio_transcription.completed",
-            "usage": {"seconds": 1, "type": "duration"},
-        }
-        await realtime._handle_openai_event(transcription_b)
-
-        await asyncio.sleep(0.1)
-
-        # Verify both transcriptions are attributed to the correct users
-        assert len(user_transcripts) == 2
-
-        # User A's transcription should be attributed to User A (not B, despite B speaking more recently)
-        assert user_transcripts[0].text == "Hello from User A"
-        assert user_transcripts[0].participant is not None
-        assert user_transcripts[0].participant.user_id == "user_a"
-
-        # User B's transcription should be attributed to User B
-        assert user_transcripts[1].text == "Hello from User B"
-        assert user_transcripts[1].participant is not None
-        assert user_transcripts[1].participant.user_id == "user_b"
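
With the conversation.item.created simulation and the multi-user test removed, attribution now follows whoever spoke most recently. A minimal sketch of what the surviving test still exercises, reusing the Participant, PcmData, np, realtime, and user_transcripts names from the test file above (the event payload mirrors the deleted ones):

    participant = Participant(original=None, user_id="test_user_123")
    pcm = PcmData(
        samples=np.zeros(100, dtype=np.int16),
        sample_rate=48000,
        format=AudioFormat.S16,
    )

    # simple_audio_response records the speaker in _current_participant
    await realtime.simple_audio_response(pcm, participant)

    # The transcription event no longer needs a prior conversation.item.created
    await realtime._handle_openai_event({
        "content_index": 0,
        "event_id": "event_trans_1",
        "item_id": "item_test_456",
        "transcript": "Hello",
        "type": "conversation.item.input_audio_transcription.completed",
        "usage": {"seconds": 1, "type": "duration"},
    })

    # The transcript is attributed to the most recent speaker
    assert user_transcripts[0].user_id() == "test_user_123"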

plugins/openai/vision_agents/plugins/openai/openai_realtime.py

Lines changed: 7 additions & 28 deletions
@@ -102,10 +102,6 @@ def __init__(
         self.realtime_session["audio"]["output"] = RealtimeAudioConfigOutputParam()
         self.realtime_session["audio"]["output"]["voice"] = self.voice
 
-        # Map conversation item_id to participant to handle multi-user scenarios
-        self._item_to_participant: Dict[str, Participant] = {}
-        self._pending_participant: Optional[Participant] = None
-
         # Track pending tool calls: item_id -> {call_id, name, argument_parts: []}
         # We accumulate argument deltas until response.output_item.done
         self._pending_tool_calls: Dict[str, Dict[str, Any]] = {}
@@ -196,8 +192,8 @@ async def simple_audio_response(
             audio: PCM audio frame to forward upstream.
             participant: Optional participant information for the audio source.
         """
-        # Track pending participant for the next conversation item
-        self._pending_participant = participant
+        # Track current participant for user speech transcription events
+        self._current_participant = participant
         await self.rtc.send_audio_pcm(audio)
 
     async def close(self):
@@ -246,15 +242,9 @@ async def _handle_openai_event(self, event: dict) -> None:
                 conversation_item_id=transcript_event.item_id,
             )
         elif et == "conversation.item.created":
-            # When OpenAI creates a conversation item, map it to the participant who sent the audio
-            item = event.get("item", {})
-            if item.get("type") == "message" and item.get("role") == "user":
-                item_id = item.get("id")
-                if item_id and self._pending_participant:
-                    self._item_to_participant[item_id] = self._pending_participant
-                    logger.debug(
-                        f"Mapped item {item_id} to participant {self._pending_participant.user_id if self._pending_participant else 'None'}"
-                    )
+            # Conversation item created - no action needed
+            # Participant tracking is handled via _current_participant in simple_audio_response
+            pass
         elif et == "conversation.item.added":
             # Conversation item was added to the conversation
             pass
@@ -266,22 +256,11 @@ async def _handle_openai_event(self, event: dict) -> None:
             user_transcript_event: ConversationItemInputAudioTranscriptionCompletedEvent = ConversationItemInputAudioTranscriptionCompletedEvent.model_validate(
                 event
            )
-            item_id = user_transcript_event.item_id
-
-            # Look up the correct participant for this transcription
-            participant = self._item_to_participant.get(item_id)
-
-            # Temporarily set the correct participant for this specific transcription
-            original_participant = self._current_participant
-            self._current_participant = participant
+            # _current_participant is kept up-to-date in simple_audio_response
+            # so it will be used by _emit_user_speech_transcription
             self._emit_user_speech_transcription(
                 text=user_transcript_event.transcript, original=event
             )
-            self._current_participant = original_participant
-
-            # Clean up the mapping to avoid memory leaks
-            if item_id:
-                self._item_to_participant.pop(item_id, None)
         elif et == "input_audio_buffer.speech_started":
             # Validate event but don't need to store it
             InputAudioBufferSpeechStartedEvent.model_validate(event)
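
Taken together, the change replaces the per-item bookkeeping with a single field: simple_audio_response records the current speaker, and the transcription handler reads it when emitting. A condensed, self-contained sketch of the resulting flow (names abbreviated; this is an illustration, not the actual class):

    from typing import Any, Optional

    class ParticipantFlowSketch:
        """Condensed model of the post-change flow (illustration only)."""

        def __init__(self) -> None:
            # One field instead of _item_to_participant / _pending_participant
            self._current_participant: Optional[Any] = None

        async def simple_audio_response(self, audio: Any, participant: Any = None) -> None:
            # Track current participant for user speech transcription events
            self._current_participant = participant
            # (forwarding the PCM upstream via self.rtc.send_audio_pcm is elided)

        async def handle_openai_event(self, event: dict) -> None:
            et = event.get("type")
            if et == "conversation.item.created":
                pass  # no item_id -> participant mapping to maintain any more
            elif et == "conversation.item.input_audio_transcription.completed":
                # The transcript is attributed to whoever spoke most recently
                print(event["transcript"], "->",
                      getattr(self._current_participant, "user_id", None))

Note the trade-off the deleted multi-user test made explicit: if a second user starts speaking before the first user's transcription arrives, the transcript appears to be attributed to the most recent speaker. The simpler single-field approach trades that edge case for less state to keep in sync.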
