-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathrunsBackfiller.server.ts
More file actions
92 lines (81 loc) · 2.51 KB
/
runsBackfiller.server.ts
File metadata and controls
92 lines (81 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import { Tracer } from "@opentelemetry/api";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import { RunsReplicationService } from "~/services/runsReplicationService.server";
import { startSpan } from "~/v3/tracing.server";
import { FINAL_RUN_STATUSES } from "../v3/taskStatus";
import { Logger } from "@trigger.dev/core/logger";
export class RunsBackfillerService {
private readonly prisma: PrismaClientOrTransaction;
private readonly runsReplicationInstance: RunsReplicationService;
private readonly tracer: Tracer;
private readonly logger: Logger;
constructor(opts: {
prisma: PrismaClientOrTransaction;
runsReplicationInstance: RunsReplicationService;
tracer: Tracer;
logLevel?: "log" | "error" | "warn" | "info" | "debug";
}) {
this.prisma = opts.prisma;
this.runsReplicationInstance = opts.runsReplicationInstance;
this.tracer = opts.tracer;
this.logger = new Logger("RunsBackfillerService", opts.logLevel ?? "debug");
}
public async call({
from,
to,
cursor,
batchSize,
}: {
from: Date;
to: Date;
cursor?: string;
batchSize?: number;
}): Promise<string | undefined> {
return await startSpan(this.tracer, "RunsBackfillerService.call()", async (span) => {
span.setAttribute("from", from.toISOString());
span.setAttribute("to", to.toISOString());
span.setAttribute("cursor", cursor ?? "");
span.setAttribute("batchSize", batchSize ?? 0);
const runs = await this.prisma.taskRun.findMany({
where: {
createdAt: {
gte: from,
lte: to,
},
status: {
in: FINAL_RUN_STATUSES,
},
...(cursor ? { id: { gt: cursor } } : {}),
},
orderBy: {
id: "asc",
},
take: batchSize,
});
if (runs.length === 0) {
this.logger.info("No runs to backfill", { from, to, cursor });
return;
}
this.logger.info("Backfilling runs", {
from,
to,
cursor,
batchSize,
runCount: runs.length,
firstCreatedAt: runs[0].createdAt,
lastCreatedAt: runs[runs.length - 1].createdAt,
});
await this.runsReplicationInstance.backfill(runs);
const lastRun = runs[runs.length - 1];
this.logger.info("Backfilled runs", {
from,
to,
cursor,
batchSize,
lastRunId: lastRun.id,
});
// Return the last run ID to continue from
return lastRun.id;
});
}
}