-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathtraceEvents.server.ts
More file actions
125 lines (119 loc) · 4.11 KB
/
traceEvents.server.ts
File metadata and controls
125 lines (119 loc) · 4.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import { EventRepository } from "~/v3/eventRepository.server";
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
import { TaskRun } from "@trigger.dev/database";
/**
 * {@link TraceEventConcern} implementation that delegates to an
 * {@link EventRepository}: each method opens a trace event for the task being
 * triggered and runs the caller's callback with a {@link TracedEventSpan}
 * built from that event.
 */
export class DefaultTraceEventsConcern implements TraceEventConcern {
  /**
   * @param eventRepository Repository used to create the trace events and to
   *   record the additional log event for idempotent (cached) runs.
   */
  constructor(private readonly eventRepository: EventRepository) {}

  /**
   * Opens a trace event named after `request.taskId` and invokes `callback`
   * with a span wrapper around it.
   *
   * @param request The trigger request; its `options` supply the trace
   *   context, parent-link settings, custom icon, batch id, idempotency key
   *   and an optional creation-time override.
   * @param callback Work to run with the span; its result is returned as-is.
   * @returns Whatever `callback` resolves to.
   */
  async traceRun<T>(
    request: TriggerTaskRequest,
    callback: (span: TracedEventSpan) => Promise<T>
  ): Promise<T> {
    const triggerOptions = request.options;

    const rawBatchId = triggerOptions?.batchId;
    const friendlyBatchId = rawBatchId ? BatchId.toFriendlyId(rawBatchId) : undefined;

    // `getTime()` yields epoch milliseconds; multiplying by 1,000,000 turns
    // that into nanoseconds for the event start time.
    const createdAtOverride = triggerOptions?.overrideCreatedAt;
    const startTime = createdAtOverride
      ? BigInt(createdAtOverride.getTime()) * BigInt(1000000)
      : undefined;

    const icon = triggerOptions?.customIcon ?? "task";
    const runIsTest = request.body.options?.test ?? false;

    return await this.eventRepository.traceEvent(
      request.taskId,
      {
        context: triggerOptions?.traceContext,
        spanParentAsLink: triggerOptions?.spanParentAsLink,
        parentAsLinkType: triggerOptions?.parentAsLinkType,
        kind: "SERVER",
        environment: request.environment,
        taskSlug: request.taskId,
        attributes: {
          properties: {
            [SemanticInternalAttributes.SHOW_ACTIONS]: true,
          },
          style: { icon },
          runIsTest,
          batchId: friendlyBatchId,
          idempotencyKey: triggerOptions?.idempotencyKey,
        },
        incomplete: true,
        immediate: true,
        startTime,
      },
      async (event, traceContext, traceparent) => {
        // Expose only the span surface the callback needs, forwarding
        // attribute updates and failures to the underlying event.
        const span: TracedEventSpan = {
          traceId: event.traceId,
          spanId: event.spanId,
          traceContext,
          traceparent,
          setAttribute: (key, value) => event.setAttribute(key as any, value),
          failWithError: event.failWithError.bind(event),
        };

        return await callback(span);
      }
    );
  }

  /**
   * Opens a trace event named `"<taskId> (cached)"` for a trigger request that
   * matched an existing run via its idempotency key, records a child log event
   * mentioning that key, and then invokes `callback` with a span wrapper.
   *
   * @param request The trigger request that matched an existing run.
   * @param options.existingRun The run that already exists for the key; its
   *   `friendlyId` is attached to the events.
   * @param options.idempotencyKey The key that matched.
   * @param options.incomplete Forwarded to the trace event as `incomplete`.
   * @param options.isError Forwarded to the trace event as `isError`.
   * @param callback Work to run with the span; its result is returned as-is.
   * @returns Whatever `callback` resolves to.
   */
  async traceIdempotentRun<T>(
    request: TriggerTaskRequest,
    options: {
      existingRun: TaskRun;
      idempotencyKey: string;
      incomplete: boolean;
      isError: boolean;
    },
    callback: (span: TracedEventSpan) => Promise<T>
  ): Promise<T> {
    const { idempotencyKey, incomplete, isError } = options;
    const originalRunId = options.existingRun.friendlyId;

    const triggerOptions = request.options;
    const rawBatchId = triggerOptions?.batchId;
    const friendlyBatchId = rawBatchId ? BatchId.toFriendlyId(rawBatchId) : undefined;
    const runIsTest = request.body.options?.test ?? false;

    return await this.eventRepository.traceEvent(
      `${request.taskId} (cached)`,
      {
        context: triggerOptions?.traceContext,
        spanParentAsLink: triggerOptions?.spanParentAsLink,
        parentAsLinkType: triggerOptions?.parentAsLinkType,
        kind: "SERVER",
        environment: request.environment,
        taskSlug: request.taskId,
        attributes: {
          properties: {
            [SemanticInternalAttributes.SHOW_ACTIONS]: true,
            [SemanticInternalAttributes.ORIGINAL_RUN_ID]: originalRunId,
          },
          style: { icon: "task-cached" },
          runIsTest,
          batchId: friendlyBatchId,
          idempotencyKey,
          runId: originalRunId,
        },
        incomplete,
        isError,
        immediate: true,
      },
      async (event, traceContext, traceparent) => {
        // Record a log event, parented to the span just opened, noting that
        // the idempotency key already has a run.
        await this.eventRepository.recordEvent(
          `There's an existing run for idempotencyKey: ${idempotencyKey}`,
          {
            taskSlug: request.taskId,
            environment: request.environment,
            attributes: {
              runId: originalRunId,
            },
            context: triggerOptions?.traceContext,
            parentId: event.spanId,
          }
        );

        const span: TracedEventSpan = {
          traceId: event.traceId,
          spanId: event.spanId,
          traceContext,
          traceparent,
          setAttribute: (key, value) => event.setAttribute(key as any, value),
          failWithError: event.failWithError.bind(event),
        };

        return await callback(span);
      }
    );
  }
}