-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtracePubSub.server.ts
More file actions
94 lines (76 loc) · 2.6 KB
/
tracePubSub.server.ts
File metadata and controls
94 lines (76 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
import { EventEmitter } from "node:events";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { Gauge } from "prom-client";
import { metricsRegister } from "~/metrics.server";
export type TracePubSubOptions = {
redis: RedisWithClusterOptions;
};
export class TracePubSub {
private _publisher: RedisClient;
private _subscriberCount = 0;
constructor(private _options: TracePubSubOptions) {
this._publisher = createRedisClient("trigger:eventRepoPublisher", this._options.redis);
}
get subscriberCount() {
return this._subscriberCount;
}
async publish(traceIds: string[]) {
if (traceIds.length === 0) return;
const uniqueTraces = new Set(traceIds.map((e) => `events:${e}`));
await Promise.allSettled(
Array.from(uniqueTraces).map((traceId) =>
this._publisher.publish(traceId, new Date().toISOString())
)
);
}
async subscribeToTrace(traceId: string) {
const redis = createRedisClient("trigger:eventRepoSubscriber", this._options.redis);
const channel = `events:${traceId}`;
// Subscribe to the channel.
await redis.subscribe(channel);
// Increment the subscriber count.
this._subscriberCount++;
const eventEmitter = new EventEmitter();
// Define the message handler - store reference so we can remove it later.
const messageHandler = (_: string, message: string) => {
eventEmitter.emit("message", message);
};
redis.on("message", messageHandler);
// Return a function that can be used to unsubscribe.
const unsubscribe = async () => {
// Remove the message listener before closing the connection
redis.off("message", messageHandler);
await redis.unsubscribe(channel);
await redis.quit();
this._subscriberCount--;
};
return {
unsubscribe,
eventEmitter,
};
}
}
export const tracePubSub = singleton("tracePubSub", initializeTracePubSub);
function initializeTracePubSub() {
const pubSub = new TracePubSub({
redis: {
port: env.PUBSUB_REDIS_PORT,
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
},
});
new Gauge({
name: "trace_pub_sub_subscribers",
help: "Number of trace pub sub subscribers",
collect() {
this.set(pubSub.subscriberCount);
},
registers: [metricsRegister],
});
return pubSub;
}