-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.ts
More file actions
130 lines (111 loc) · 4.71 KB
/
Copy pathserver.ts
File metadata and controls
130 lines (111 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
* Multi-namespace example — one server with three namespaces, each using a
* different overflow policy and delivery mode.
*
* Run: npx tsx examples/multi-namespace/server.ts
*
* Imports directly from src/ for dev-run mode (no build step needed).
* In production, import from 'streamfence-js' instead.
*/
import { StreamFenceServerBuilder } from '../../src/StreamFenceServerBuilder.js';
import { NamespaceSpec } from '../../src/NamespaceSpec.js';
import { DeliveryMode } from '../../src/DeliveryMode.js';
import { OverflowAction } from '../../src/OverflowAction.js';
import { Registry } from 'prom-client';
import { PromServerMetrics } from '../../src/PromServerMetrics.js';
import type { ServerEventListener } from '../../src/ServerEventListener.js';
// ── Event listener for observability ─────────────────────────────────────────
const listener: ServerEventListener = {
onServerStarted(event) {
console.log(`Server started on ${event.host}:${event.port}`);
},
onClientConnected(event) {
console.log(`[${event.namespace}] client connected: ${event.clientId} (${event.transport})`);
},
onClientDisconnected(event) {
console.log(`[${event.namespace}] client disconnected: ${event.clientId}`);
},
onSubscribed(event) {
console.log(`[${event.namespace}] ${event.clientId} subscribed to ${event.topic}`);
},
onQueueOverflow(event) {
console.warn(`[${event.namespace}] overflow on ${event.topic}: ${event.reason}`);
},
onPublishRejected(event) {
console.warn(`[${event.namespace}] rejected for ${event.clientId}: ${event.reasonCode} — ${event.reason}`);
},
};
// ── Namespace 1: /prices — fast ticker with DROP_OLDEST ──────────────────────
const pricesSpec = NamespaceSpec.builder('/prices')
.topics(['bid', 'ask'])
.deliveryMode(DeliveryMode.BEST_EFFORT)
.overflowAction(OverflowAction.DROP_OLDEST)
.maxQueuedMessagesPerClient(16)
.build();
// ── Namespace 2: /snapshots — only the latest value matters ──────────────────
const snapshotsSpec = NamespaceSpec.builder('/snapshots')
.topic('portfolio')
.deliveryMode(DeliveryMode.BEST_EFFORT)
.overflowAction(OverflowAction.SNAPSHOT_ONLY)
.maxQueuedMessagesPerClient(1)
.build();
// ── Namespace 3: /alerts — reliable delivery with retries ────────────────────
const alertsSpec = NamespaceSpec.builder('/alerts')
.topic('critical')
.topic('info')
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.overflowAction(OverflowAction.REJECT_NEW)
.maxQueuedMessagesPerClient(64)
.maxRetries(3)
.ackTimeoutMs(2000)
.maxInFlight(4)
.build();
// ── Build and start ──────────────────────────────────────────────────────────
const registry = new Registry();
const server = new StreamFenceServerBuilder()
.port(3000)
.metrics(new PromServerMetrics(registry))
.listener(listener)
.namespace(pricesSpec)
.namespace(snapshotsSpec)
.namespace(alertsSpec)
.buildServer();
await server.start();
console.log('Namespaces:');
console.log(' /prices — BEST_EFFORT, DROP_OLDEST (bid, ask)');
console.log(' /snapshots — BEST_EFFORT, SNAPSHOT_ONLY (portfolio)');
console.log(' /alerts — AT_LEAST_ONCE, REJECT_NEW (critical, info)');
console.log('Press Ctrl+C to stop.\n');
// ── Simulate publishing ──────────────────────────────────────────────────────
let tick = 0;
// Prices: rapid bid/ask updates every 200ms
const priceInterval = setInterval(() => {
const mid = 100 + Math.sin(tick / 10) * 5;
server.publish('/prices', 'bid', { price: (mid - 0.01).toFixed(2), tick });
server.publish('/prices', 'ask', { price: (mid + 0.01).toFixed(2), tick });
tick++;
}, 200);
// Snapshots: portfolio snapshot every 2s
const snapshotInterval = setInterval(() => {
server.publish('/snapshots', 'portfolio', {
totalValue: (50_000 + Math.random() * 1_000).toFixed(2),
positions: 12,
updatedAt: new Date().toISOString(),
});
}, 2000);
// Alerts: occasional critical alert every 10s
const alertInterval = setInterval(() => {
server.publish('/alerts', 'critical', {
level: 'CRITICAL',
message: 'Margin call threshold reached',
ts: Date.now(),
});
}, 10_000);
process.on('SIGINT', async () => {
clearInterval(priceInterval);
clearInterval(snapshotInterval);
clearInterval(alertInterval);
await server.stop();
console.log('\nServer stopped.');
process.exit(0);
});