-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStreamingRequestDeduplicator.ts
More file actions
124 lines (106 loc) · 3.85 KB
/
StreamingRequestDeduplicator.ts
File metadata and controls
124 lines (106 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/** Key identifying a logical request; calls made with an equal signature share one in-flight stream. */
type RequestSignature = string;
/**
 * Receives each chunk produced by the shared stream. A returned promise is
 * awaited before the next handler is invoked. When the stream fails, the
 * handler is also called with an `{ error, type: 'error' }` object.
 */
type StreamHandler = (chunk: any) => void | Promise<void>;
/** Called with no arguments once the stream has been fully consumed without throwing. */
type OnComplete = () => void;
/** Bookkeeping for a single in-flight stream, stored per request signature. */
interface PendingRequest {
/** Handlers that are invoked for every chunk of this stream. */
handlers: Set<StreamHandler>;
/** Callbacks run after the stream has been consumed to the end. */
onComplete: Set<OnComplete>;
/** Its signal is handed to the stream factory; `abort()` triggers it. */
controller: AbortController;
}
/** Optional tuning knobs for the deduplicator. */
interface DedupeConfig {
/**
 * Duration in milliseconds. Used both as how long a successfully completed
 * signature stays in the completed set and as the wait limit for callers
 * that join an in-flight request. Defaults to 30000.
 */
ttl?: number;
/**
 * Maximum number of distinct signatures that may be in flight at once;
 * starting another one throws. Defaults to 50.
 */
maxConcurrent?: number;
}
/** Final result of a shared stream, broadcast to callers that joined it. */
type StreamOutcome = { ok: true } | { ok: false; error: unknown };

/**
 * Tells whether a thrown value represents a cancellation rather than a failure.
 * Accepts any object whose `name` is `'AbortError'` (what `fetch` and other
 * AbortSignal-aware APIs throw) as well as the bare string `'AbortError'`.
 */
function isAbortError(error: unknown): boolean {
  if (error === 'AbortError') return true;
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { name?: unknown }).name === 'AbortError'
  );
}

/** Builds the error used when a stream is cut short by `abort()` without throwing itself. */
function createAbortError(): Error {
  const error = new Error('Request aborted');
  error.name = 'AbortError';
  return error;
}

/**
 * Coalesces concurrent streaming requests that share a signature.
 *
 * The first caller for a signature starts the stream and drives it; later
 * callers with the same signature attach their handler to that stream instead
 * of starting a new one. A caller that joins late only receives chunks
 * produced after it joined (nothing is buffered or replayed).
 */
class StreamingRequestDeduplicator {
  /** In-flight streams keyed by signature. */
  private readonly pending = new Map<RequestSignature, PendingRequest>();
  /** Signatures that finished successfully within the last `ttl` ms. */
  private readonly completed = new Map<RequestSignature, { status: 'success' }>();
  /** Expiry timer per completed signature, so a re-completion restarts the clock. */
  private readonly expiryTimers = new Map<RequestSignature, ReturnType<typeof setTimeout>>();
  /** Promise of the final outcome of each in-flight stream; it never rejects. */
  private readonly outcomes = new WeakMap<PendingRequest, Promise<StreamOutcome>>();
  private readonly config: Required<DedupeConfig>;

  /**
   * @param config Optional settings; `ttl` defaults to 30000 ms and
   *   `maxConcurrent` to 50.
   */
  constructor(config: DedupeConfig = {}) {
    this.config = {
      ttl: config.ttl ?? 30000,
      maxConcurrent: config.maxConcurrent ?? 50
    };
  }

  /**
   * Runs `fn` for `signature`, or joins the stream already running for it.
   *
   * @param signature Key identifying the request.
   * @param fn Stream factory; only invoked when no stream is in flight for
   *   `signature`. It receives the signal that `abort()` triggers.
   * @param handler Called for every chunk, and with `{ error, type: 'error' }`
   *   when the stream fails for a reason other than an abort.
   * @param onComplete Called once the stream has been consumed successfully.
   * @returns A promise that settles when the shared stream ends: resolved on
   *   success, rejected with the stream's error on failure or abort.
   * @throws Error when starting a new stream would exceed `maxConcurrent`.
   * @throws Error `'Request timeout'` when a joined stream does not end within `ttl` ms.
   */
  async deduplicate(
    signature: RequestSignature,
    fn: (signal: AbortSignal) => AsyncGenerator<any>,
    handler: StreamHandler,
    onComplete?: OnComplete
  ): Promise<void> {
    const existing = this.pending.get(signature);
    if (existing) {
      existing.handlers.add(handler);
      if (onComplete) existing.onComplete.add(onComplete);
      return this.attachToInFlight(existing);
    }

    if (this.pending.size >= this.config.maxConcurrent) {
      throw new Error(`Max concurrent requests (${this.config.maxConcurrent}) reached`);
    }

    const controller = new AbortController();
    const request: PendingRequest = {
      handlers: new Set([handler]),
      onComplete: onComplete ? new Set([onComplete]) : new Set(),
      controller
    };

    // The outcome promise only ever resolves, so it cannot surface as an
    // unhandled rejection when nobody joins the stream.
    let publish: (outcome: StreamOutcome) => void = () => undefined;
    this.outcomes.set(
      request,
      new Promise<StreamOutcome>((resolve) => {
        publish = resolve;
      })
    );
    this.pending.set(signature, request);

    try {
      for await (const chunk of fn(controller.signal)) {
        // A producer that ignores the signal must not keep feeding handlers
        // after abort(); leaving the loop also finalizes the generator.
        if (controller.signal.aborted) break;
        for (const h of request.handlers) {
          await h(chunk);
        }
      }
      if (controller.signal.aborted) throw createAbortError();

      for (const cb of request.onComplete) {
        cb();
      }
      this.recordSuccess(signature);
      publish({ ok: true });
    } catch (error) {
      if (!isAbortError(error)) {
        for (const h of request.handlers) {
          try {
            await h({ error, type: 'error' });
          } catch {
            // A handler failing on the error event must neither hide the
            // original error nor stop the remaining handlers from hearing it.
          }
        }
      }
      publish({ ok: false, error });
      throw error;
    } finally {
      // Only remove our own entry: after abort() a newer request may already
      // be registered under the same signature.
      if (this.pending.get(signature) === request) {
        this.pending.delete(signature);
      }
    }
  }

  /**
   * Waits for the outcome of an in-flight stream on behalf of a joined caller.
   * Rejects with `'Request timeout'` if the stream has not ended within `ttl`
   * ms; the timeout only bounds the wait and does not detach the handler.
   */
  private async attachToInFlight(request: PendingRequest): Promise<void> {
    const outcome = this.outcomes.get(request);
    if (!outcome) return;

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => reject(new Error('Request timeout')), this.config.ttl);
    });

    try {
      const result = await Promise.race([outcome, timeout]);
      if (!result.ok) throw result.error;
    } finally {
      // Always release the timer so it cannot outlive the wait.
      clearTimeout(timer);
    }
  }

  /** Marks `signature` as completed and schedules its removal after `ttl` ms. */
  private recordSuccess(signature: RequestSignature): void {
    const previous = this.expiryTimers.get(signature);
    if (previous !== undefined) clearTimeout(previous);

    this.completed.set(signature, { status: 'success' });
    const timer = setTimeout(() => {
      this.completed.delete(signature);
      this.expiryTimers.delete(signature);
    }, this.config.ttl);
    this.expiryTimers.set(signature, timer);
  }

  /**
   * Cancels the in-flight stream for `signature`, if any. The signature is
   * freed immediately so a new request can be started for it.
   */
  abort(signature: RequestSignature): void {
    const request = this.pending.get(signature);
    if (!request) return;
    // Unregister first so abort listeners that re-request do not join a dead stream.
    this.pending.delete(signature);
    request.controller.abort();
  }

  /** Returns the number of in-flight and recently completed signatures. */
  status(): { pending: number; completed: number } {
    return {
      pending: this.pending.size,
      completed: this.completed.size
    };
  }
}
export { StreamingRequestDeduplicator, type DedupeConfig };
/*
================================================================================
EXPLANATION
StreamingRequestDeduplicator prevents duplicate API calls when multiple handlers subscribe to the same request. It was built because LLM streaming APIs charge per request, not per consumer, so duplicate calls waste budget. Use it when users retry, when the network is flaky, or when multiple UI components request the same data stream. The trick is signature-based deduplication with request coalescing: multiple handlers attach to a single in-flight stream. Drop it into real-time AI apps, collaborative editors, or any system where concurrent requests to expensive APIs need consolidation. It supports aborting in-flight requests and reports in-flight and completed counts for observability.
================================================================================
*/