-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_audio_auditor.py
More file actions
354 lines (274 loc) · 12.7 KB
/
Copy pathtest_audio_auditor.py
File metadata and controls
354 lines (274 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
"""[FACT] Tests for Live Multimodal Auditing pipeline.
[HYPOTHESIS] Audio chunk ingestion, transcription, and validation
can be tested independently of actual audio hardware.
"""
from __future__ import annotations
import base64
import sys
import time
from pathlib import Path
from unittest.mock import Mock
import pytest
# [FACT] Add helix_code to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from audio_auditor import (
AudioAuditor,
AudioAuditSession,
AudioChunk,
TranscriptionSegment,
create_audio_auditor,
)
class TestAudioChunk:
"""[FACT] Unit tests for AudioChunk dataclass."""
def test_chunk_creation(self) -> None:
"""[FACT] AudioChunk stores PCM data with metadata."""
chunk = AudioChunk(
timestamp=time.time(),
pcm_data=b"\x00\x01\x02\x03",
sequence_num=0,
duration_ms=100.0,
)
assert chunk.sequence_num == 0
assert chunk.duration_ms == 100.0
assert len(chunk.pcm_data) == 4
class TestTranscriptionSegment:
"""[FACT] Unit tests for TranscriptionSegment dataclass."""
def test_segment_creation(self) -> None:
"""[FACT] Segment stores transcription with validation results."""
segment = TranscriptionSegment(
text="Hello world",
start_time=time.time(),
end_time=time.time() + 1.0,
is_final=True,
confidence=0.95,
validation_result={"valid": True, "intervention_required": False},
receipt_id="r_test_123",
)
assert segment.text == "Hello world"
assert segment.confidence == 0.95
assert segment.receipt_id == "r_test_123"
class TestAudioAuditSession:
"""[FACT] Unit tests for AudioAuditSession."""
def test_session_initialization(self) -> None:
"""[FACT] Session initializes with default parameters."""
from datetime import datetime
session = AudioAuditSession(
session_id="test_session",
created_at=datetime.utcnow(),
)
assert session.sample_rate == 16000
assert session.channels == 1
assert len(session.audio_buffer) == 0
assert session.guardian is not None
def test_session_callbacks(self) -> None:
"""[FACT] Session can store callback functions."""
from datetime import datetime
mock_callback = Mock()
session = AudioAuditSession(
session_id="test_session",
created_at=datetime.utcnow(),
on_transcription=mock_callback,
)
assert session.on_transcription is mock_callback
class TestAudioAuditor:
"""[FACT] Integration tests for AudioAuditor."""
@pytest.fixture
def auditor(self) -> AudioAuditor:
"""[FACT] Create fresh auditor for each test."""
auditor = create_audio_auditor(api_key="test_key")
auditor.enable_simulation_fallback = True
return auditor
@pytest.mark.anyio
async def test_create_session(self, auditor: AudioAuditor) -> None:
"""[FACT] Can create and retrieve audit session."""
session = await auditor.create_session("test_123")
assert session.session_id == "test_123"
assert "test_123" in auditor.sessions
@pytest.mark.anyio
async def test_ingest_audio_chunk(self, auditor: AudioAuditor) -> None:
"""[FACT] Can ingest base64-encoded PCM audio."""
await auditor.create_session("test_123")
# [FACT] Create fake PCM data (16-bit samples)
pcm_data = b"\x00\x00\x01\x00\x02\x00\x03\x00" # 4 samples
base64_pcm = base64.b64encode(pcm_data).decode()
result = await auditor.ingest_audio_chunk("test_123", base64_pcm)
assert result["status"] == "accepted"
assert result["chunk_num"] == 0
assert result["buffer_size"] == 1
@pytest.mark.anyio
async def test_ingest_invalid_session(self, auditor: AudioAuditor) -> None:
"""[FACT] Returns error for non-existent session."""
result = await auditor.ingest_audio_chunk("nonexistent", "dGVzdA==")
assert result["status"] == "error"
assert "not found" in result["error"]
@pytest.mark.anyio
async def test_ingest_invalid_base64(self, auditor: AudioAuditor) -> None:
"""[FACT] Handles invalid base64 gracefully."""
await auditor.create_session("test_123")
result = await auditor.ingest_audio_chunk("test_123", "!!!invalid!!!")
assert result["status"] == "error"
@pytest.mark.anyio
async def test_ingest_rejects_oversized_payload(self, auditor: AudioAuditor) -> None:
"""[FACT] Oversized base64 payload is rejected before decode."""
await auditor.create_session("test_oversize")
auditor.max_base64_chars = 12
result = await auditor.ingest_audio_chunk("test_oversize", "A" * 20)
assert result["status"] == "error"
assert result["error_code"] == "PAYLOAD_TOO_LARGE"
@pytest.mark.anyio
async def test_ingest_rate_limited(self, auditor: AudioAuditor) -> None:
"""[FACT] Session ingest is rate-limited to protect service budget."""
await auditor.create_session("test_rate")
auditor.rate_window_seconds = 60.0
auditor.max_chunks_per_window = 2
pcm_data = b"\x00\x00" * 10
base64_pcm = base64.b64encode(pcm_data).decode()
ok1 = await auditor.ingest_audio_chunk("test_rate", base64_pcm)
ok2 = await auditor.ingest_audio_chunk("test_rate", base64_pcm)
blocked = await auditor.ingest_audio_chunk("test_rate", base64_pcm)
assert ok1["status"] == "accepted"
assert ok2["status"] == "accepted"
assert blocked["status"] == "error"
assert blocked["error_code"] == "RATE_LIMITED"
@pytest.mark.anyio
async def test_handle_gemini_response_ignores_non_text(self, auditor: AudioAuditor) -> None:
"""[FACT] Non-text Gemini events are ignored, not stringified into transcripts."""
session = await auditor.create_session("test_non_text")
await auditor._handle_gemini_response(session, {"event": "metadata_only"})
assert len(session.segments) == 0
@pytest.mark.anyio
async def test_handle_gemini_response_uses_input_transcription(
self, auditor: AudioAuditor
) -> None:
"""[FACT] Input transcription events are converted into validated segments."""
session = await auditor.create_session("test_input_tx")
await auditor._handle_gemini_response(
session,
{"server_content": {"input_transcription": {"text": "hello audio world"}}},
)
assert len(session.segments) == 1
assert session.segments[0].text == "hello audio world"
@pytest.mark.anyio
async def test_process_turn_empty_buffer(self, auditor: AudioAuditor) -> None:
"""[FACT] Handles empty buffer gracefully."""
await auditor.create_session("test_123")
result = await auditor.process_turn("test_123")
assert result["status"] == "no_audio"
@pytest.mark.anyio
async def test_process_turn_no_transcript_when_simulation_disabled(
self, auditor: AudioAuditor
) -> None:
"""[FACT] No synthetic transcript is generated when simulation is disabled."""
auditor.enable_simulation_fallback = False
await auditor.create_session("test_no_transcript")
pcm_data = b"\x00\x00" * 1600
base64_pcm = base64.b64encode(pcm_data).decode()
await auditor.ingest_audio_chunk("test_no_transcript", base64_pcm)
result = await auditor.process_turn("test_no_transcript")
assert result["status"] == "no_transcript_available"
assert result["error_code"] == "NO_TRANSCRIPT_AVAILABLE"
@pytest.mark.anyio
async def test_detect_turn_end_threshold(self, auditor: AudioAuditor) -> None:
"""[FACT] Turn detection triggers after ~2 seconds of audio."""
session = await auditor.create_session("test_123")
# [FACT] Add chunks until threshold
for i in range(20):
pcm_data = b"\x00\x00" * 1600 # 100ms @ 16kHz
base64_pcm = base64.b64encode(pcm_data).decode()
result = await auditor.ingest_audio_chunk("test_123", base64_pcm)
assert result["should_process"] is True
@pytest.mark.anyio
async def test_close_session(self, auditor: AudioAuditor) -> None:
"""[FACT] Session cleanup removes from active sessions."""
await auditor.create_session("test_123")
assert "test_123" in auditor.sessions
await auditor.close_session("test_123")
assert "test_123" not in auditor.sessions
@pytest.mark.anyio
async def test_get_session_stats(self, auditor: AudioAuditor) -> None:
"""[FACT] Stats reflect session activity."""
session = await auditor.create_session("test_123")
# Add some audio
pcm_data = b"\x00\x00" * 1600
base64_pcm = base64.b64encode(pcm_data).decode()
await auditor.ingest_audio_chunk("test_123", base64_pcm)
stats = auditor.get_session_stats("test_123")
assert stats["total_chunks"] == 1
assert stats["session_id"] == "test_123"
def test_get_stats_invalid_session(self, auditor: AudioAuditor) -> None:
"""[FACT] Returns error for invalid session stats request."""
stats = auditor.get_session_stats("nonexistent")
assert "error" in stats
class TestAudioAuditorIntegration:
"""[FACT] Integration tests with mocked Gemini API."""
@pytest.mark.anyio
async def test_full_pipeline_simulation(self) -> None:
"""[FACT] End-to-end audio -> transcription -> validation pipeline."""
auditor = create_audio_auditor()
auditor.enable_simulation_fallback = True
callbacks: list[dict] = []
def on_transcription(segment: TranscriptionSegment) -> None:
callbacks.append({"type": "transcription", "text": segment.text})
def on_intervention(text: str, drift_code: str) -> None:
callbacks.append({"type": "intervention", "drift_code": drift_code})
session = await auditor.create_session(
"integration_test",
on_transcription=on_transcription,
on_intervention=on_intervention,
)
# [FACT] Simulate audio input (20 chunks = ~2 seconds)
for _ in range(20):
pcm_data = b"\x00\x00" * 1600
base64_pcm = base64.b64encode(pcm_data).decode()
await auditor.ingest_audio_chunk("integration_test", base64_pcm)
# Process the turn
result = await auditor.process_turn("integration_test")
# [FACT] Should have processed and created a segment
assert result["status"] == "processed"
assert len(session.segments) == 1
# Cleanup
await auditor.close_session("integration_test")
class TestAudioAuditorCallbacks:
"""[FACT] Test callback invocation."""
@pytest.mark.anyio
async def test_transcription_callback(self) -> None:
"""[FACT] Callback invoked on transcription completion."""
received: list[str] = []
def callback(segment: TranscriptionSegment) -> None:
received.append(segment.text)
auditor = create_audio_auditor()
auditor.enable_simulation_fallback = True
await auditor.create_session("cb_test", on_transcription=callback)
# Add audio and process
for _ in range(20):
pcm_data = b"\x00\x00" * 1600
base64_pcm = base64.b64encode(pcm_data).decode()
await auditor.ingest_audio_chunk("cb_test", base64_pcm)
await auditor.process_turn("cb_test")
# [FACT] Callback should have been invoked
assert len(received) == 1
assert isinstance(received[0], str)
class TestEdgeCases:
"""[FACT] Edge case handling."""
@pytest.mark.anyio
async def test_very_small_chunks(self) -> None:
"""[FACT] Handles very small audio chunks."""
auditor = create_audio_auditor()
await auditor.create_session("small_test")
# 1 sample = 2 bytes
pcm_data = b"\x00\x00"
base64_pcm = base64.b64encode(pcm_data).decode()
result = await auditor.ingest_audio_chunk("small_test", base64_pcm)
assert result["status"] == "accepted"
assert result["duration_ms"] < 1 # Very short
@pytest.mark.anyio
async def test_large_base64_payload(self) -> None:
"""[FACT] Handles reasonably large audio payloads."""
auditor = create_audio_auditor()
await auditor.create_session("large_test")
# 1 second of 16kHz audio = 32000 bytes
pcm_data = b"\x00\x00" * 16000
base64_pcm = base64.b64encode(pcm_data).decode()
result = await auditor.ingest_audio_chunk("large_test", base64_pcm)
assert result["status"] == "accepted"
assert result["duration_ms"] == 1000.0