Commit 3bc237d

feat: live bytes/sec progress for in-flight requests in Logs view (#410)
## Summary

Adds real-time throughput visibility for in-flight streaming requests in the Request Logs page. Previously, once a request was dispatched to a provider, the Logs UI showed a static pending row with no indication of whether bytes were actually flowing. This change surfaces live byte counts and throughput from the existing `StallInspector` ring buffer via SSE, updated once per second.

## How it works

**Backend:**

- `StallInspector.getStats()`: new read-only method that snapshots current state (bytes received, bytes/sec via sliding window) without touching any stream logic. Throughput is computed from the ring buffer regardless of stall enforcement state, so it works even during the grace period.
- `UsageStorageService`: new in-flight registry (`registerInFlight` / `deregisterInFlight` / `getProgressUpdates`). Keyed by `requestId`, stores a reference to the inspector and the owning API key for scoping.
- `response-handler.ts`: registers the inspector when it enters the pipeline; deregisters in `cleanupDisconnectWiring` (fires on both `end` and `error`), so the registry never leaks.
- `/v0/management/events` SSE endpoint: adds a 1s `setInterval` per connected client that calls `getProgressUpdates()`, filters by the client's scoped API key, and emits `progress` events. Fire-and-forget; write errors are swallowed. Interval is cleared on connection close alongside the existing listeners.

**Frontend:**

- New `progress` SSE event handler updates a `progressMapRef` (`Map<requestId, ProgressUpdate>`) and increments a render-tick counter to trigger re-renders without mutating the `logs` array.
- Progress entries are cleared when a `completed` event arrives for the same request.
- **Desktop Perf column**: for pending rows with live data, shows bytes received + KB/s instead of the empty Duration/TTFT/TPS fields.
- **Mobile Latency card**: same conditional rendering.
- New `formatBytes()` helper in `format.ts`.

## What is not changed

- Stall detection/abort logic is untouched; `getStats()` is purely observational.
- No new DB writes or queries; all data comes from the in-memory ring buffer.
- No new API endpoints.
- Only chat/messages/responses/gemini routes have a `StallInspector` in the pipeline; other request types (embeddings, images, speech, transcriptions) simply have no registry entry and are skipped silently.
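The frontend bookkeeping described under "How it works" can be sketched without React. This is only an illustration of the idea (a map keyed by `requestId`, updated on `progress` and cleared on `completed`); `applySseEvent` and the module-level `progressMap` are hypothetical names, and the real component keeps the map in a ref and bumps a tick counter instead of returning a boolean.

```typescript
interface ProgressUpdate {
  requestId: string;
  bytesReceived: number;
  bytesPerSec: number | null;
}

// Stand-in for progressMapRef.current in the component.
const progressMap = new Map<string, ProgressUpdate>();

// Hypothetical helper: returns true when the UI would need a re-render.
function applySseEvent(eventType: string, eventData: string): boolean {
  let payload: { requestId?: string };
  try {
    payload = JSON.parse(eventData);
  } catch {
    return false; // malformed events are ignored
  }
  if (typeof payload.requestId !== 'string') return false;

  if (eventType === 'progress') {
    // Latest snapshot wins; the logs array itself is never touched.
    progressMap.set(payload.requestId, payload as ProgressUpdate);
    return true;
  }
  if (eventType === 'completed') {
    // Drop the stale entry so the row falls back to Duration/TTFT/TPS.
    return progressMap.delete(payload.requestId);
  }
  return false;
}
```

Keeping the snapshots outside the `logs` state means a once-per-second update does not rebuild the list; only the rows that look up their `requestId` in the map change what they render.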
2 parents ba47fd2 + 267eab3 commit 3bc237d

7 files changed

Lines changed: 233 additions & 25 deletions

README.md

Lines changed: 1 addition & 0 deletions
@@ -204,6 +204,7 @@ When a provider fails, Plexus removes it from rotation using exponential backoff
 - Client disconnects now cancel the upstream provider request, reducing wasted tokens/quota on abandoned streams.
 - Global and per-provider upstream timeouts cut off requests that run too long.
 - Optional stall detection can fail over slow-to-start providers before bytes reach the client, and can abort streams that become too slow mid-flight.
+- The Request Logs page shows **live bytes received and throughput (KB/s)** for in-flight streaming requests, updated every second via SSE, so you can see whether a provider is actively responding before it completes.

 → See [Configuration: Request Timeouts](docs/CONFIGURATION.md#request-timeouts) and [Configuration: Stall Detection](docs/CONFIGURATION.md#stall-detection)

packages/backend/src/routes/management/usage.ts

Lines changed: 22 additions & 0 deletions
@@ -449,7 +449,29 @@ export async function registerUsageRoutes(
     // Also listen for 'created' for backward compatibility
     usageStorage.on('created', completedListener);

+    // Periodic progress updates for in-flight requests (every 1s, fire-and-forget)
+    const progressInterval = setInterval(() => {
+      if (reply.raw.destroyed) return;
+      const updates = usageStorage.getProgressUpdates();
+      for (const update of updates) {
+        if (scopeKey && update.apiKey !== scopeKey) continue;
+        try {
+          reply.raw.write(
+            encode({
+              data: JSON.stringify(update),
+              event: 'progress',
+              id: String(Date.now()),
+            })
+          );
+        } catch {
+          // Fire-and-forget: ignore write errors
+        }
+      }
+    }, 1000);
+    progressInterval.unref?.();
+
     request.raw.on('close', () => {
+      clearInterval(progressInterval);
       usageStorage.off('started', startedListener);
       usageStorage.off('updated', updatedListener);
       usageStorage.off('completed', completedListener);
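The `encode` helper called in this hunk is not part of the diff. As a rough illustration of what a `progress` event looks like on the wire, here is a minimal sketch that assumes the standard SSE text framing (`event:`, `id:`, `data:` fields terminated by a blank line). `encodeSseFrame` and `SseMessage` are hypothetical names, not the project's helper.

```typescript
// Hypothetical stand-in for the `encode` helper used by the route.
interface SseMessage {
  event?: string;
  id?: string;
  data: string;
}

function encodeSseFrame(msg: SseMessage): string {
  const lines: string[] = [];
  if (msg.event) lines.push(`event: ${msg.event}`);
  if (msg.id) lines.push(`id: ${msg.id}`);
  // Every line of the payload gets its own "data:" field.
  for (const line of msg.data.split('\n')) lines.push(`data: ${line}`);
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

const frame = encodeSseFrame({
  event: 'progress',
  id: '1700000000000',
  data: JSON.stringify({ requestId: 'req-1', bytesReceived: 2048, bytesPerSec: 512 }),
});
```

On the client, a frame like this is what arrives with `eventType === 'progress'` and the JSON string as `eventData`.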

packages/backend/src/services/inspectors/stall-inspector.ts

Lines changed: 40 additions & 0 deletions
@@ -257,6 +257,46 @@ export class StallInspector extends PassThrough {
     }
   }

+  // ─── Public stats ─────────────────────────────────────────────────
+
+  /**
+   * Return a snapshot of the current throughput state for live progress reporting.
+   * Safe to call at any time: it only reads existing fields and never mutates stream state.
+   */
+  getStats(): {
+    state: 'DISPATCHED' | 'GRACE_PERIOD' | 'MONITORING' | 'THROUGHPUT_STALLED';
+    bytesReceived: number;
+    bytesPerSec: number | null;
+    elapsedMs: number;
+  } {
+    const now = Date.now();
+    let bytesPerSec: number | null = null;
+
+    if (this.samples.length >= 2) {
+      const windowStart = now - this.config.windowMs;
+      // Baseline: the sample just before the first one that falls inside the window
+      let oldest = this.samples[0]!;
+      for (let i = 1; i < this.samples.length; i++) {
+        if (this.samples[i]!.timestamp >= windowStart) {
+          oldest = this.samples[i - 1]!;
+          break;
+        }
+      }
+      const bytesInWindow = this.totalBytes - oldest.cumulativeBytes;
+      const timeSpanMs = now - oldest.timestamp;
+      if (timeSpanMs > 0) {
+        bytesPerSec = (bytesInWindow / timeSpanMs) * 1000;
+      }
+    }
+
+    return {
+      state: this.state as 'DISPATCHED' | 'GRACE_PERIOD' | 'MONITORING' | 'THROUGHPUT_STALLED',
+      bytesReceived: this.totalBytes,
+      bytesPerSec,
+      elapsedMs: now - this.startTime,
+    };
+  }
+
   // ─── Cleanup ──────────────────────────────────────────────────────

   private cleanup(): void {
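To make the throughput arithmetic in `getStats()` easier to follow, here is a standalone sketch of the same calculation with concrete numbers. `windowedBytesPerSec` and the `Sample` shape are assumptions for illustration; the field names mirror the ones used in the hunk, but the real ring buffer may hold more than this.

```typescript
interface Sample {
  timestamp: number;       // ms
  cumulativeBytes: number; // total bytes seen when the sample was taken
}

// Hypothetical free-function version of the logic in getStats().
function windowedBytesPerSec(
  samples: Sample[],
  totalBytes: number,
  now: number,
  windowMs: number
): number | null {
  if (samples.length < 2) return null;
  const windowStart = now - windowMs;
  // Baseline is the sample just before the first in-window sample.
  let baseline = samples[0]!;
  for (let i = 1; i < samples.length; i++) {
    if (samples[i]!.timestamp >= windowStart) {
      baseline = samples[i - 1]!;
      break;
    }
  }
  const timeSpanMs = now - baseline.timestamp;
  if (timeSpanMs <= 0) return null;
  return ((totalBytes - baseline.cumulativeBytes) / timeSpanMs) * 1000;
}

const samples: Sample[] = [
  { timestamp: 1000, cumulativeBytes: 0 },
  { timestamp: 3000, cumulativeBytes: 1000 },
  { timestamp: 5000, cumulativeBytes: 4000 },
];
// now = 5000, window = 2500 ms, so windowStart = 2500.
// First in-window sample is at t=3000, so the baseline is the t=1000 sample:
// (4000 - 0) bytes over (5000 - 1000) ms = 1000 bytes/sec.
const rate = windowedBytesPerSec(samples, 4000, 5000, 2500);
```

Note that the baseline can sit slightly outside the window, so the measured span may be longer than `windowMs`; the rate is still bytes divided by the actual elapsed time.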

packages/backend/src/services/response-handler.ts

Lines changed: 9 additions & 1 deletion
@@ -197,6 +197,11 @@ export async function handleResponse(
   const stallInspector = stallDetectionResult?.stallInspector ?? null;
   if (stallInspector) {
     stallInspector.setRequestId(usageRecord.requestId!);
+    usageStorage.registerInFlight(
+      usageRecord.requestId!,
+      stallInspector,
+      (usageRecord.apiKey as string | null) ?? null
+    );
   }

   // Pipeline: Source -> StallInspector (if active) -> Usage -> Client
@@ -370,6 +375,9 @@ export async function handleResponse(
       clearInterval(disconnectPoll);
       disconnectPoll = null;
     }
+    if (stallInspector) {
+      usageStorage.deregisterInFlight(usageRecord.requestId!);
+    }
   };

   pipeline.once('end', cleanupDisconnectWiring);
@@ -391,7 +399,7 @@ export async function handleResponse(
     ) {
       onDisconnect(isTimeout ? 'pipeline.error.timeout' : 'pipeline.error.' + code);
     }
-    cleanupDisconnectWiring();
+    cleanupDisconnectWiring(); // also deregisters stallInspector
     // Restore debug mode on error
     if (shouldEstimateTokens && !wasDebugEnabled) {
       debugManager.setEnabled(false);

packages/backend/src/services/usage-storage.ts

Lines changed: 35 additions & 0 deletions
@@ -9,6 +9,16 @@ import { getCurrentKeyName } from './request-context';
 import { estimateKwhUsed } from './inference-energy';
 import { resolveModelParams, DEFAULT_GPU_PARAMS } from '@plexus/shared';
 import type { ModelArchitecture, GpuParams } from '@plexus/shared';
+import type { StallInspector } from './inspectors/stall-inspector';
+
+export interface ProgressUpdate {
+  requestId: string;
+  apiKey: string | null;
+  bytesReceived: number;
+  bytesPerSec: number | null;
+  state: 'DISPATCHED' | 'GRACE_PERIOD' | 'MONITORING' | 'THROUGHPUT_STALLED';
+  elapsedMs: number;
+}

 // ModelArchitecture is now imported from @plexus/shared

@@ -54,6 +64,31 @@ export class UsageStorageService extends EventEmitter {
   private schema: any = null;
   private readonly defaultPerformanceRetentionLimit = 100;
   private telemetryQueue: Promise<void> = Promise.resolve();
+  private inFlightRegistry = new Map<
+    string,
+    { inspector: StallInspector; apiKey: string | null }
+  >();
+
+  registerInFlight(requestId: string, inspector: StallInspector, apiKey: string | null): void {
+    this.inFlightRegistry.set(requestId, { inspector, apiKey });
+  }
+
+  deregisterInFlight(requestId: string): void {
+    this.inFlightRegistry.delete(requestId);
+  }
+
+  getProgressUpdates(): ProgressUpdate[] {
+    const updates: ProgressUpdate[] = [];
+    for (const [requestId, { inspector, apiKey }] of this.inFlightRegistry) {
+      try {
+        const stats = inspector.getStats();
+        updates.push({ requestId, apiKey, ...stats });
+      } catch {
+        // Inspector may have been destroyed; skip it
+      }
+    }
+    return updates;
+  }

   constructor(connectionString?: string) {
     super();
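The register / snapshot / deregister lifecycle, together with the per-key scoping applied by the SSE route, can be exercised in isolation. The sketch below is a simplified stand-in, not the `UsageStorageService` code: `InFlightRegistry`, `StatsSource`, and the `snapshot(scopeKey)` signature are hypothetical, and the scope filter is folded into the registry here only to keep the example short (in the commit it lives in the route).

```typescript
// Minimal stand-in for an object exposing getStats().
interface StatsSource {
  getStats(): { bytesReceived: number; bytesPerSec: number | null };
}

interface Snapshot {
  requestId: string;
  apiKey: string | null;
  bytesReceived: number;
  bytesPerSec: number | null;
}

class InFlightRegistry {
  private entries = new Map<string, { source: StatsSource; apiKey: string | null }>();

  register(requestId: string, source: StatsSource, apiKey: string | null): void {
    this.entries.set(requestId, { source, apiKey });
  }

  deregister(requestId: string): void {
    this.entries.delete(requestId);
  }

  // When scopeKey is set, only entries owned by that key are returned.
  snapshot(scopeKey?: string | null): Snapshot[] {
    const out: Snapshot[] = [];
    this.entries.forEach((entry, requestId) => {
      if (scopeKey && entry.apiKey !== scopeKey) return;
      try {
        out.push({ requestId, apiKey: entry.apiKey, ...entry.source.getStats() });
      } catch {
        // A failing source is skipped rather than breaking the whole snapshot.
      }
    });
    return out;
  }
}

const registry = new InFlightRegistry();
registry.register('req-a', { getStats: () => ({ bytesReceived: 100, bytesPerSec: 50 }) }, 'key-a');
registry.register('req-b', { getStats: () => ({ bytesReceived: 200, bytesPerSec: null }) }, 'key-b');

const scoped = registry.snapshot('key-a'); // only req-a
registry.deregister('req-a');
const afterCleanup = registry.snapshot('key-a'); // empty
```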

packages/frontend/src/lib/format.ts

Lines changed: 9 additions & 0 deletions
@@ -109,6 +109,15 @@ export function formatMs(ms: number): string {
   return `${(ms / 1000).toFixed(1)}s`;
 }

+/**
+ * Format byte counts with B/KB/MB suffixes (e.g., 1536 -> "1.5 KB")
+ */
+export function formatBytes(bytes: number): string {
+  if (bytes < 1024) return `${Math.round(bytes)} B`;
+  if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
+  return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
+}
+
 /**
  * Format tokens per second
  */
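A few sample calls show how the helper behaves at each threshold. The function body below is copied from the hunk above so the snippet runs on its own; the sample inputs are illustrative only.

```typescript
function formatBytes(bytes: number): string {
  if (bytes < 1024) return `${Math.round(bytes)} B`;
  if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
  return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
}

const small = formatBytes(512);              // "512 B"
const kilo = formatBytes(1536);              // "1.5 KB"
const mega = formatBytes(5 * 1024 * 1024);   // "5.0 MB"
// Just under 1 MiB still uses the KB branch, so rounding yields "1024.0 KB".
const edge = formatBytes(1024 * 1024 - 1);   // "1024.0 KB"
```

The UI appends `/s` to the result when formatting `bytesPerSec`, so throughput can display as B/s, KB/s, or MB/s depending on the rate.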

packages/frontend/src/pages/Logs.tsx

Lines changed: 117 additions & 24 deletions
@@ -16,6 +16,7 @@ import {
 } from '../lib/api';
 import {
   KWH_PER_SLICE,
+  formatBytes,
   formatCost,
   formatEnergy,
   formatMs,
@@ -154,6 +155,19 @@ export const Logs = () => {
     filtersRef.current = filters;
   }, [filters]);

+  interface ProgressUpdate {
+    requestId: string;
+    bytesReceived: number;
+    bytesPerSec: number | null;
+    state: 'DISPATCHED' | 'GRACE_PERIOD' | 'MONITORING' | 'THROUGHPUT_STALLED';
+    elapsedMs: number;
+  }
+
+  const progressMapRef = useRef<Map<string, ProgressUpdate>>(new Map());
+  // progressTick is incremented to trigger re-renders when progress data changes.
+  // The value itself is intentionally unused; only the setter is called.
+  const [, setProgressTick] = useState(0);
+
   const loadLogs = async () => {
     setLoading(true);
     try {
@@ -270,6 +284,17 @@ export const Logs = () => {
       }
     }

+    // Handle progress updates for in-flight requests
+    if (eventType === 'progress' && eventData) {
+      try {
+        const update: ProgressUpdate = JSON.parse(eventData);
+        progressMapRef.current.set(update.requestId, update);
+        setProgressTick((t) => t + 1);
+      } catch {
+        // ignore malformed progress events
+      }
+    }
+
     // Handle different event types: started, updated, completed
     if (
       (eventType === 'started' || eventType === 'updated' || eventType === 'completed') &&
@@ -312,6 +337,10 @@ export const Logs = () => {
       }

       if (matches) {
+        // If a completed event arrives, clear any stale progress entry
+        if (eventType === 'completed') {
+          progressMapRef.current.delete(newLog.requestId);
+        }
         setLogs((prev) => {
           const existingIndex = prev.findIndex((l) => l.requestId === newLog.requestId);
           if (existingIndex >= 0) {
@@ -610,7 +639,39 @@ export const Logs = () => {
           <div className="text-[10px] uppercase tracking-wider text-text-muted">
             Latency
           </div>
-          <div className="text-text">{formatMs(log.durationMs)}</div>
+          <div className="text-text">
+            {(() => {
+              const progress =
+                log.responseStatus === 'pending'
+                  ? progressMapRef.current.get(log.requestId)
+                  : undefined;
+              if (progress) {
+                return (
+                  <div
+                    style={{ display: 'flex', flexDirection: 'column', gap: '2px' }}
+                  >
+                    <div
+                      style={{ display: 'flex', alignItems: 'center', gap: '4px' }}
+                    >
+                      <CloudDownload size={11} className="text-yellow-400" />
+                      <span>{formatBytes(progress.bytesReceived)}</span>
+                    </div>
+                    {progress.bytesPerSec != null && (
+                      <span
+                        style={{
+                          color: 'var(--color-text-secondary)',
+                          fontSize: '0.85em',
+                        }}
+                      >
+                        {formatBytes(progress.bytesPerSec)}/s
+                      </span>
+                    )}
+                  </div>
+                );
+              }
+              return formatMs(log.durationMs);
+            })()}
+          </div>
         </div>
         <div className="rounded-md bg-bg-subtle p-2">
           <div className="text-[10px] uppercase tracking-wider text-text-muted">
@@ -1153,29 +1214,61 @@ export const Logs = () => {
           )}
         </td>
         <td className="px-2 py-1.5 text-left border-b border-border-glass text-text align-middle whitespace-nowrap">
-          <div style={{ display: 'flex', flexDirection: 'column' }}>
-            <span>Duration: {formatMs(log.durationMs)}</span>
-            <span
-              style={{
-                color: 'var(--color-text-secondary)',
-                fontSize: '0.85em',
-                whiteSpace: 'nowrap',
-              }}
-            >
-              {log.ttftMs && log.ttftMs > 0 ? `TTFT: ${formatMs(log.ttftMs)}` : ''}
-            </span>
-            <span
-              style={{
-                color: 'var(--color-text-secondary)',
-                fontSize: '0.85em',
-                whiteSpace: 'nowrap',
-              }}
-            >
-              {log.tokensPerSec && log.tokensPerSec > 0
-                ? `TPS: ${formatTPS(log.tokensPerSec)}`
-                : ''}
-            </span>
-          </div>
+          {(() => {
+            const progress =
+              log.responseStatus === 'pending'
+                ? progressMapRef.current.get(log.requestId)
+                : undefined;
+            if (progress) {
+              return (
+                <div style={{ display: 'flex', flexDirection: 'column', gap: '2px' }}>
+                  <div style={{ display: 'flex', alignItems: 'center', gap: '4px' }}>
+                    <CloudDownload size={12} className="text-yellow-400" />
+                    <span style={{ fontSize: '0.9em' }}>
+                      {formatBytes(progress.bytesReceived)}
+                    </span>
+                  </div>
+                  {progress.bytesPerSec != null && (
+                    <span
+                      style={{
+                        color: 'var(--color-text-secondary)',
+                        fontSize: '0.85em',
+                      }}
+                    >
+                      {formatBytes(progress.bytesPerSec)}/s
+                    </span>
+                  )}
+                </div>
+              );
+            }
+            return (
+              <div style={{ display: 'flex', flexDirection: 'column' }}>
+                <span>Duration: {formatMs(log.durationMs)}</span>
+                <span
+                  style={{
+                    color: 'var(--color-text-secondary)',
+                    fontSize: '0.85em',
+                    whiteSpace: 'nowrap',
+                  }}
+                >
+                  {log.ttftMs && log.ttftMs > 0
+                    ? `TTFT: ${formatMs(log.ttftMs)}`
+                    : ''}
+                </span>
+                <span
+                  style={{
+                    color: 'var(--color-text-secondary)',
+                    fontSize: '0.85em',
+                    whiteSpace: 'nowrap',
+                  }}
+                >
+                  {log.tokensPerSec && log.tokensPerSec > 0
+                    ? `TPS: ${formatTPS(log.tokensPerSec)}`
+                    : ''}
+                </span>
+              </div>
+            );
+          })()}
         </td>
         <td
           className="px-2 py-1.5 text-center border-b border-border-glass text-text align-middle"
