-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathpoller.ts
More file actions
114 lines (93 loc) · 3.14 KB
/
Copy pathpoller.ts
File metadata and controls
114 lines (93 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
import { RunLogger, SendDebugLogOptions } from "./logger.js";
import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3";
/**
 * Construction options for `RunExecutionSnapshotPoller`.
 */
export type RunExecutionSnapshotPollerOptions = {
/** Run identifier passed to `httpClient.getRunExecutionData` on every poll and attached to debug logs as `runId`. */
runFriendlyId: string;
/** Initial snapshot identifier; only reported in debug logs (as `snapshotId`) and replaceable later via `updateSnapshotId`. */
snapshotFriendlyId: string;
/** Client whose `getRunExecutionData` method is called on each poll. */
httpClient: WorkloadHttpClient;
/** Sink for the poller's debug log entries (`sendDebugLog`). */
logger: RunLogger;
/** Poll period in seconds; the poller multiplies it by 1000 before handing it to the interval service. */
snapshotPollIntervalSeconds: number;
/** Awaited with the fetched execution data after each successful poll, provided the poller is still enabled. */
handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
};
/**
 * Polls the workload API for a run's execution data on a fixed interval and
 * passes every successful result to the configured `handleSnapshotChange`.
 *
 * Scheduling is delegated to an `IntervalService`. This class owns the
 * `enabled` flag, which is consulted both before the request is sent and again
 * once it has resolved, so that a `stop()` issued while a request is in flight
 * prevents the handler from running.
 */
export class RunExecutionSnapshotPoller {
  private runFriendlyId: string;
  private snapshotFriendlyId: string;
  private enabled = false;

  private readonly httpClient: WorkloadHttpClient;
  private readonly logger: RunLogger;
  private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise<void>;
  private readonly poller: IntervalService;

  /**
   * Wires up the interval service without starting it; call `start()` to
   * enable polling.
   *
   * @param opts - Identifiers, collaborators and poll period (in seconds).
   */
  constructor(opts: RunExecutionSnapshotPollerOptions) {
    const {
      runFriendlyId,
      snapshotFriendlyId,
      httpClient,
      logger,
      handleSnapshotChange,
      snapshotPollIntervalSeconds,
    } = opts;

    this.runFriendlyId = runFriendlyId;
    this.snapshotFriendlyId = snapshotFriendlyId;
    this.httpClient = httpClient;
    this.logger = logger;
    this.handleSnapshotChange = handleSnapshotChange;

    this.poller = new IntervalService({
      onInterval: () => this.pollOnce(),
      intervalMs: snapshotPollIntervalSeconds * 1000,
      // NOTE(review): presumably suppresses an immediate first tick — confirm against IntervalService.
      leadingEdge: false,
      onError: async (error) => {
        this.reportPollFailure(error);
      },
    });

    // Must come after `this.poller` is assigned: sendDebugLog reads poller.intervalMs.
    this.sendDebugLog("created");
  }

  /**
   * One polling tick: fetch the run's execution data and forward it to the
   * change handler, bailing out (with a debug log) if the poller is disabled
   * or the request fails.
   */
  private async pollOnce(): Promise<void> {
    if (!this.enabled) {
      this.sendDebugLog("poller disabled, skipping snapshot change handler (pre)");
      return;
    }

    this.sendDebugLog("polling for latest snapshot");

    const result = await this.httpClient.getRunExecutionData(this.runFriendlyId);

    if (!result.success) {
      this.sendDebugLog("failed to get run execution data", { error: result.error });
      return;
    }

    // The flag may have been flipped while the request was in flight.
    if (!this.enabled) {
      this.sendDebugLog("poller disabled, skipping snapshot change handler (post)");
      return;
    }

    await this.handleSnapshotChange(result.data.execution);
  }

  /** Logs an error handed to the interval service's `onError` callback. */
  private reportPollFailure(error: unknown): void {
    const detail = error instanceof Error ? error.message : String(error);
    this.sendDebugLog("failed to poll for snapshot", { error: detail });
  }

  /**
   * Emits a `[poller]`-prefixed debug log. The run id, current snapshot id and
   * current poll interval are always attached and take precedence over
   * same-named keys in `properties`.
   */
  private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) {
    const runId = this.runFriendlyId;
    const mergedProperties: SendDebugLogOptions["properties"] = {
      ...properties,
      runId,
      snapshotId: this.snapshotFriendlyId,
      pollIntervalMs: this.poller.intervalMs,
    };

    this.logger.sendDebugLog({
      runId,
      message: `[poller] ${message}`,
      properties: mergedProperties,
    });
  }

  /** Delegates to the interval service's `resetCurrentInterval`. */
  resetCurrentInterval() {
    this.poller.resetCurrentInterval();
  }

  /** Replaces the snapshot id reported in subsequent debug logs. */
  updateSnapshotId(snapshotFriendlyId: string) {
    this.snapshotFriendlyId = snapshotFriendlyId;
  }

  /** Forwards a new poll period, in milliseconds, to the interval service. */
  updateInterval(intervalMs: number) {
    this.poller.updateInterval(intervalMs);
  }

  /** Enables polling and starts the interval service; logs and does nothing if already started. */
  start() {
    if (!this.enabled) {
      this.enabled = true;
      this.poller.start();
      return;
    }

    this.sendDebugLog("already started");
  }

  /** Disables polling and stops the interval service; logs and does nothing if already stopped. */
  stop() {
    if (this.enabled) {
      this.enabled = false;
      this.poller.stop();
      return;
    }

    this.sendDebugLog("already stopped");
  }
}