-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathtest12RealtimeCacheService.js
More file actions
168 lines (151 loc) · 5.79 KB
/
Copy pathtest12RealtimeCacheService.js
File metadata and controls
168 lines (151 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import { spawn } from 'child_process';
import net from 'net';
import path from 'path';
import { fileURLToPath } from 'url';
import fetch from 'node-fetch';
// Parent directory of the folder that holds this test file; service scripts are resolved against it.
const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), '..');
// Child processes started by this test, in start order; all are signalled in the `finally` below.
const children = [];
// Set to true right before teardown so the exit handler in spawnService does not report the kill.
let shuttingDown = false;

try {
  const port = await getFreePort();
  const baseUrl = `http://127.0.0.1:${port}`;
  const streamId = `realtime-cache-${Date.now()}`;
  // Every per-stream endpoint used below hangs off this URL.
  const streamUrl = `${baseUrl}/streams/${encodeURIComponent(streamId)}`;
  // Fields shared by the state write and every latency sample.
  const origin = { regionCode: 'amer-east', nodeId: 'test-node' };

  // 1/5: start the service with the in-memory backend and wait for /health.
  const service = spawnService('06-realtime-cache/server.js', {
    REALTIME_CACHE_PORT: String(port),
    REALTIME_CACHE_BACKEND: 'memory',
    REALTIME_CACHE_TTL_SECONDS: '60'
  });
  children.push(service);
  const healthReport = await waitForJson(`${baseUrl}/health`, 'realtime cache health');
  assert(
    healthReport.ok === true && healthReport.backend === 'memory',
    'realtime cache did not start in memory mode'
  );
  console.log(`PASS realtime_cache_ready port=${port}`);

  // 2/5: write state, metrics, three latency samples, a summary and an event (strictly in this order).
  await postJson(`${streamUrl}/state`, { state: 'connected', ...origin });
  await postJson(`${streamUrl}/metrics`, {
    metrics: {
      audio_bytes_total: 1200,
      video_bytes_total: 2400
    }
  });
  const latencySamples = [
    ['webhook_ingress_latency_ms', 120],
    ['webhook_ingress_latency_ms', 80],
    ['signaling_ping_rtt_ms', 35]
  ];
  for (const [name, valueMs] of latencySamples) {
    await postJson(`${streamUrl}/latency`, { name, valueMs, source: 'test', ...origin });
  }
  await postJson(`${streamUrl}/summary`, {
    text: 'live summary',
    userName: 'Tester'
  });
  await postJson(`${streamUrl}/events`, {
    type: 'first_packet'
  });
  console.log(`PASS realtime_cache_writes stream=${streamId}`);

  // 3/5: read the stream back and compare with what was written.
  const snapshot = await waitForJson(streamUrl, 'realtime stream read');
  assert(snapshot.state?.state === 'connected', 'state was not stored');
  assert(snapshot.metrics?.audio_bytes_total === 1200, 'metrics were not stored');
  // Two webhook samples were posted (120 and 80), hence min 80, max 120, average 100.
  const webhookLatency = snapshot.latency?.webhook_ingress_latency_ms;
  assert(webhookLatency?.minMs === 80, 'webhook latency min was not stored');
  assert(webhookLatency?.maxMs === 120, 'webhook latency max was not stored');
  assert(webhookLatency?.avgMs === 100, 'webhook latency average was not stored');
  assert(snapshot.latency?.signaling_ping_rtt_ms?.avgMs === 35, 'signaling latency average was not stored');
  assert(snapshot.summary?.text === 'live summary', 'summary was not stored');
  assert(snapshot.events?.[0]?.type === 'first_packet', 'event was not stored');
  console.log('PASS realtime_cache_readback');

  // 4/5: the dashboard page must contain its title and the latency section heading.
  const dashboardHtml = await fetchText(`${baseUrl}/dashboard`);
  const dashboardExpectations = [
    ['RTMS Realtime Cache', 'dashboard html missing title'],
    ['Latency Summary', 'dashboard html missing latency summary']
  ];
  for (const [needle, failure] of dashboardExpectations) {
    assert(dashboardHtml.includes(needle), failure);
  }
  console.log('PASS realtime_cache_dashboard');

  // 5/5: the /metrics text must mention the stream count and the metrics written above.
  const prometheusText = await fetchText(`${baseUrl}/metrics`);
  const prometheusExpectations = [
    ['rtms_realtime_active_streams 1', 'prometheus active stream metric missing'],
    ['metric="audio_bytes_total"', 'prometheus summed metric missing'],
    ['rtms_realtime_latency_avg_ms', 'prometheus latency average metric missing'],
    ['metric="webhook_ingress_latency_ms"', 'prometheus webhook latency metric missing']
  ];
  for (const [needle, failure] of prometheusExpectations) {
    assert(prometheusText.includes(needle), failure);
  }
  console.log('PASS realtime_cache_prometheus_metrics');

  console.log('12 realtime cache tester passed: 5/5');
} finally {
  // Runs on success and on failure: silence exit reporting, then signal children newest-first.
  shuttingDown = true;
  children.reverse().forEach((child) => {
    child.kill('SIGTERM');
  });
}
/**
 * Starts a repository script as a child process running under the current Node binary.
 *
 * The child runs with `repoRoot` as its working directory and inherits this
 * process's environment, with `env` entries taking precedence.
 *
 * @param {string} script - Script path relative to `repoRoot`.
 * @param {Object<string, string>} env - Extra environment variables for the child.
 * @returns {import('child_process').ChildProcess} The spawned child process.
 */
function spawnService(script, env) {
  const entryPoint = path.join(repoRoot, script);
  const childEnv = { ...process.env, ...env };
  const child = spawn(process.execPath, [entryPoint], {
    cwd: repoRoot,
    env: childEnv,
    stdio: ['ignore', 'pipe', 'pipe']
  });

  // Both pipes always get a 'data' listener; output is echoed only when TEST_VERBOSE is set.
  for (const streamName of ['stdout', 'stderr']) {
    child[streamName].on('data', (chunk) => {
      if (process.env.TEST_VERBOSE) {
        process[streamName].write(chunk);
      }
    });
  }

  child.on('exit', (code, signal) => {
    // Stay quiet for clean exits and for exits that happen during teardown.
    if (shuttingDown || code === 0) {
      return;
    }
    console.error(`[test12] ${script} exited code=${code} signal=${signal || ''}`);
  });

  return child;
}
/**
 * Finds a TCP port that is currently free on 127.0.0.1.
 *
 * Listens on port 0 so the operating system assigns a port, reads the assigned
 * number, then closes the listener before resolving. The port is not reserved
 * once the listener is closed.
 *
 * @returns {Promise<number>} The port number the probe listener was given.
 */
async function getFreePort() {
  const probe = net.createServer();
  return new Promise((resolve, reject) => {
    probe.once('error', reject);
    probe.listen(0, '127.0.0.1', () => {
      const assignedPort = probe.address().port;
      probe.close(() => {
        resolve(assignedPort);
      });
    });
  });
}
/**
 * Polls `url` with GET until it answers with an OK status, then resolves with
 * the parsed JSON body.
 *
 * Attempts are separated by a 100 ms pause. A request that rejects or returns a
 * non-OK status counts as a failed attempt and is retried. A failure while
 * parsing the body of an OK response is not retried; it rejects the returned
 * promise directly.
 *
 * @param {string} url - Address to poll.
 * @param {string} label - Human-readable name used in the timeout message.
 * @param {number} [attempts=80] - Maximum number of requests to make.
 * @returns {Promise<*>} The parsed JSON body of the first OK response.
 * @throws {Error} `Timed out waiting for <label>` once all attempts fail; the
 *   last observed failure (network error or non-OK status) is attached as `cause`.
 */
async function waitForJson(url, label, attempts = 80) {
  let lastFailure;
  for (let i = 0; i < attempts; i += 1) {
    // Pause between attempts only: no delay before the first request and none after the last.
    if (i > 0) {
      await sleep(100);
    }
    try {
      const response = await fetch(url);
      if (response.ok) return response.json();
      lastFailure = new Error(`GET ${url} responded status=${response.status}`);
    } catch (error) {
      // Keep the reason so a timeout can say why the endpoint never became ready.
      lastFailure = error;
    }
  }
  throw new Error(
    `Timed out waiting for ${label}`,
    lastFailure === undefined ? undefined : { cause: lastFailure }
  );
}
/**
 * Sends `body` to `url` as a JSON POST request and resolves with the parsed
 * JSON body of the response.
 *
 * @param {string} url - Target address.
 * @param {*} body - Value serialized with JSON.stringify as the request body.
 * @returns {Promise<*>} The parsed JSON response body.
 * @throws {Error} `POST <url> failed status=<code>` when the response is not OK.
 */
async function postJson(url, body) {
  const request = {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(body)
  };
  const reply = await fetch(url, request);
  if (reply.ok) {
    return reply.json();
  }
  throw new Error(`POST ${url} failed status=${reply.status}`);
}
/**
 * Requests `url` with GET and resolves with the response body as text.
 *
 * @param {string} url - Address to request.
 * @returns {Promise<string>} The response body text.
 * @throws {Error} `GET <url> failed status=<code>` when the response is not OK.
 */
async function fetchText(url) {
  const reply = await fetch(url);
  if (reply.ok) {
    return reply.text();
  }
  throw new Error(`GET ${url} failed status=${reply.status}`);
}
function assert(condition, message) {
if (!condition) throw new Error(message);
}
/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of `ms`
 * milliseconds fires.
 *
 * @param {number} ms - Delay passed to setTimeout.
 * @returns {Promise<void>} Resolves after the timer fires.
 */
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}