-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_test.py
More file actions
131 lines (107 loc) · 4.93 KB
/
load_test.py
File metadata and controls
131 lines (107 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import asyncio
import os
import time
from datetime import datetime

import numpy as np
from dotenv import load_dotenv
from livekit import rtc
from livekit.api import (
    AccessToken,
    CreateRoomRequest,
    DeleteRoomRequest,
    LiveKitAPI,
    VideoGrants,
)
# Pull settings from a local .env file into the process environment.
load_dotenv()

# --- LiveKit credentials (all read from the environment) ---
# Host only, without a scheme: the code below prepends "wss://" or "https://".
LIVEKIT_URL = os.environ.get("LIVEKIT_URL_without_wss")
LIVEKIT_API_KEY = os.environ.get("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.environ.get("LIVEKIT_API_SECRET")

# --- Test configuration ---
NUMBER_OF_CALLS = 10  # How many simulated callers run concurrently
TEST_ROOM_PREFIX = "load-test-call-"  # Room i is named f"{TEST_ROOM_PREFIX}{i}"

# Per-call latencies in seconds, appended by each SimulatedCaller.
# This is a plain list with no locking.
latency_results = []
class SimulatedCaller:
    """Simulate one caller and time the agent's first audio frame.

    The caller connects to a LiveKit room, waits (up to 15 seconds) for the
    first frame on a subscribed remote audio track, appends the elapsed time
    to the module-level ``latency_results`` list, and then disconnects.
    """

    def __init__(self, livekit_api: LiveKitAPI, room_name: str, token: str):
        """Store the connection details and register the track handler.

        Args:
            livekit_api: Server API client; stored but not used by this class.
            room_name: Name of the room this caller joins.
            token: Access token (JWT) used to connect to the room.
        """
        self.livekit_api = livekit_api
        self.room_name = room_name
        self.token = token
        self.room = rtc.Room()
        # time.perf_counter() reading taken right after connecting.
        self.start_time = 0
        self.latency_recorded = asyncio.Event()
        # Strong references to the audio-reader tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._stream_tasks = set()
        # Event handlers
        self.room.on("track_subscribed", self.on_track_subscribed)

    def on_track_subscribed(self, track: rtc.Track, publication: rtc.TrackPublication, participant: rtc.RemoteParticipant):
        """Start reading a newly subscribed track if it carries audio."""
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            print(f"π§ Caller in {self.room_name} subscribed to agent's audio track.")
            reader = asyncio.create_task(self.process_audio_stream(track))
            self._stream_tasks.add(reader)
            reader.add_done_callback(self._stream_tasks.discard)

    async def process_audio_stream(self, track: rtc.RemoteAudioTrack):
        """Record the latency of the first audio frame, then stop reading.

        The latency is recorded at most once per caller, even if several
        audio tracks are subscribed.
        """
        audio_stream = rtc.AudioStream(track)
        try:
            async for _frame in audio_stream:
                if not self.latency_recorded.is_set():
                    # Monotonic clock: immune to wall-clock adjustments.
                    latency = time.perf_counter() - self.start_time
                    latency_results.append(latency)
                    print(f"π€ Agent in {self.room_name} responded in {latency:.3f} seconds.")
                    self.latency_recorded.set()
                break  # Stop processing after the first frame
        finally:
            # Release the stream whether we finished, failed, or were cancelled.
            await audio_stream.aclose()

    async def run(self):
        """Connect, wait for the agent's first audio frame, and disconnect.

        Timeouts and other errors are reported on stdout rather than raised,
        so one failing caller does not abort the whole load test.
        """
        try:
            await self.room.connect(f"wss://{LIVEKIT_URL}", self.token)
            print(f"β Caller connected to room: {self.room_name}. Waiting for agent...")
            self.start_time = time.perf_counter()
            await asyncio.wait_for(self.latency_recorded.wait(), timeout=15.0)
        except asyncio.TimeoutError:
            print(f"β° Timeout: Agent in {self.room_name} did not respond within 15 seconds.")
        except Exception as e:
            print(f"β Error in caller for room {self.room_name}: {e}")
        finally:
            # Stop any reader still waiting for a frame (e.g. after a timeout)
            # and let it close its stream before the room goes away.
            pending = list(self._stream_tasks)
            for reader in pending:
                reader.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
            # isconnected is a method; without the call the bound method
            # object is always truthy.
            if self.room.isconnected():
                await self.room.disconnect()
async def main():
    """Orchestrate the load test.

    Creates ``NUMBER_OF_CALLS`` rooms, starts one ``SimulatedCaller`` per
    room (staggered by 0.1 s), waits for all of them, prints a latency
    report, and finally deletes the rooms it created and closes the API
    client. Returns early with an error message if any credential is unset.
    """
    if not all([LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET]):
        print("β Error: Please ensure LIVEKIT_URL_without_wss, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set.")
        return

    livekit_api = LiveKitAPI(f"https://{LIVEKIT_URL}", LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
    created_rooms = []  # Only rooms created by this run are deleted afterwards
    tasks = []
    try:
        print(f"π Starting load test for {NUMBER_OF_CALLS} concurrent calls...")
        for i in range(NUMBER_OF_CALLS):
            room_name = f"{TEST_ROOM_PREFIX}{i}"
            participant_identity = f"simulated-caller-{i}"
            try:
                await livekit_api.room.create_room(CreateRoomRequest(name=room_name))
                created_rooms.append(room_name)
                token = (AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
                         .with_identity(participant_identity)
                         .with_grants(VideoGrants(room_join=True, room=room_name))
                         .to_jwt())
                caller = SimulatedCaller(livekit_api, room_name, token)
                tasks.append(asyncio.create_task(caller.run()))
                await asyncio.sleep(0.1)  # Stagger connections
            except Exception as e:
                print(f"β Failed to create room or token for call {i}: {e}")

        # Collect failures instead of raising, so one broken caller cannot
        # prevent the report for the others.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                print(f"Warning: a simulated caller failed unexpectedly: {outcome!r}")

        if latency_results:
            avg_latency = np.mean(latency_results)
            p95_latency = np.percentile(latency_results, 95)
            p99_latency = np.percentile(latency_results, 99)
            print("\n--- π Latency Report ---")
            print(f"Successful Connections: {len(latency_results)} / {NUMBER_OF_CALLS}")
            print(f"Average Agent Join Time: {avg_latency:.2f} seconds")
            print(f"p95 Latency (95% of calls were faster than): {p95_latency:.2f} seconds")
            print(f"p99 Latency (99% of calls were faster than): {p99_latency:.2f} seconds")
            print("------------------------")
    finally:
        # Runs on errors and on Ctrl-C too, so test rooms are not left behind.
        try:
            print("\nπ§Ή Cleaning up test rooms...")
            for room_name in created_rooms:
                try:
                    # delete_room takes a request object, like create_room.
                    await livekit_api.room.delete_room(DeleteRoomRequest(room=room_name))
                except Exception as e:
                    # Best effort: report and keep deleting the remaining rooms.
                    print(f"Warning: could not delete room {room_name}: {e}")
        finally:
            await livekit_api.aclose()
    print("\nπ Load test finished.")
if __name__ == "__main__":
    # Script entry point: run the load test and exit quietly on Ctrl-C.
    load_test = main()
    try:
        asyncio.run(load_test)
    except KeyboardInterrupt:
        print("\nπ Test interrupted by user.")