Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/tame-oranges-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@trigger.dev/redis-worker": patch
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Adapted the CLI API client to propagate the trigger source via http headers.
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
}

const service = new ReplayTaskRunService();
const newRun = await service.call(taskRun);
const newRun = await service.call(taskRun, { triggerSource: "api" });

if (!newRun) {
return json({ error: "Failed to create new run" }, { status: 400 });
Expand Down
13 changes: 13 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ import {
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";

const ALLOWED_TRIGGER_SOURCES = new Set(["sdk", "cli", "mcp"]);

export function sanitizeTriggerSource(value: string | null | undefined): string | undefined {
if (value && ALLOWED_TRIGGER_SOURCES.has(value)) {
return value;
}
return undefined;
}

const ParamsSchema = z.object({
taskId: z.string(),
});
Expand All @@ -36,6 +45,7 @@ export const HeadersSchema = z.object({
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
"x-trigger-request-idempotency-key": z.string().nullish(),
"x-trigger-realtime-streams-version": z.string().nullish(),
"x-trigger-source": z.string().nullish(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});
Expand Down Expand Up @@ -67,6 +77,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
} = headers;

const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
Expand Down Expand Up @@ -119,6 +130,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
},
engineVersion ?? undefined
);
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
BatchTriggerV3Service,
} from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { HeadersSchema, sanitizeTriggerSource } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";

Expand Down Expand Up @@ -72,6 +72,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -113,6 +114,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader),
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { HeadersSchema, sanitizeTriggerSource } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";

Expand Down Expand Up @@ -62,6 +62,7 @@ const { action, loader } = createActionApiRoute(
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -127,6 +128,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader),
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => {
ttlSeconds: submission.value.ttlSeconds,
version: submission.value.version,
prioritySeconds: submission.value.prioritySeconds,
triggerSource: "dashboard",
});

if (!newRun) {
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

/**
Expand Down Expand Up @@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
batchId: batch.id,
batchIndex: currentIndex,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
},
"V2"
);
Expand Down
13 changes: 13 additions & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
import { Tracer } from "@opentelemetry/api";
import { tryCatch } from "@trigger.dev/core/utils";
import {
RunAnnotations,
TaskRunError,
taskRunErrorEnhancer,
taskRunErrorToString,
Expand Down Expand Up @@ -289,6 +290,17 @@ export class RunEngineTriggerTaskService {

const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
const triggerAction = options.triggerAction ?? "trigger";
const parentAnnotations = RunAnnotations.safeParse(parentRun?.annotations).data;
const annotations = {
triggerSource,
triggerAction,
rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource,
rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined,
};

try {
return await this.traceEventConcern.traceRun(
triggerRequest,
Expand Down Expand Up @@ -369,6 +381,7 @@ export class RunEngineTriggerTaskService {
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
debounce: body.options?.debounce,
annotations,
// When debouncing with triggerAndWait, create a span for the debounced trigger
onDebounced:
body.options?.debounce && body.options?.resumeParentOnCompletion
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() {
batchIndex: itemIndex,
realtimeStreamsVersion: meta.realtimeStreamsVersion,
planType: meta.planType,
triggerSource: meta.parentRunId ? "sdk" : "api",
triggerAction: "trigger",
},
"V2"
);
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ function createScheduleEngine() {
scheduleInstanceId,
queueTimestamp: exactScheduleTime,
overrideCreatedAt: exactScheduleTime,
triggerSource: "schedule",
triggerAction: "trigger",
}
);

Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

type RunItemData = {
Expand Down Expand Up @@ -853,6 +855,8 @@ export class BatchTriggerV3Service extends BaseService {
skipChecks: true,
runFriendlyId: task.runId,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
}
);

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ export class BulkActionService extends BaseService {
const [error, result] = await tryCatch(
replayService.call(run, {
bulkActionId: bulkActionId,
triggerSource: "dashboard",
})
);
if (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class PerformBulkActionService extends BaseService {
switch (item.group.type) {
case "REPLAY": {
const service = new ReplayTaskRunService(this._prisma);
const result = await service.call(item.sourceRun);
const result = await service.call(item.sourceRun, { triggerSource: "dashboard" });

await this._prisma.bulkActionItem.update({
where: { id: item.id },
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type OverrideOptions = {
payload?: string;
metadata?: unknown;
bulkActionId?: string;
triggerSource?: string;
} & RunOptionsData;

export class ReplayTaskRunService extends BaseService {
Expand Down Expand Up @@ -123,6 +124,8 @@ export class ReplayTaskRunService extends BaseService {
realtimeStreamsVersion: determineRealtimeStreamsVersion(
existingTaskRun.realtimeStreamsVersion
),
triggerSource: overrideOptions.triggerSource ?? "api",
triggerAction: "replay",
}
);

Expand Down
51 changes: 29 additions & 22 deletions apps/webapp/app/v3/services/testTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,35 @@ export class TestTaskService extends BaseService {

switch (triggerSource) {
case "STANDARD": {
const result = await triggerTaskService.call(data.taskIdentifier, environment, {
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds ? new Date(Date.now() + data.delaySeconds * 1000) : undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
const result = await triggerTaskService.call(
data.taskIdentifier,
environment,
{
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds
? new Date(Date.now() + data.delaySeconds * 1000)
: undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
},
},
});
{ triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
}
Expand Down Expand Up @@ -72,7 +79,7 @@ export class TestTaskService extends BaseService {
priority: data.prioritySeconds,
},
},
{ customIcon: "scheduled" }
{ customIcon: "scheduled", triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export type TriggerTaskServiceOptions = {
replayedFromTaskRunFriendlyId?: string;
planType?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

export class OutOfEntitlementError extends Error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "annotations" JSONB;
3 changes: 3 additions & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ model TaskRun {
metadataType String @default("application/json")
metadataVersion Int @default(1)

/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
annotations Json?

/// Run output
output String?
outputType String @default("application/json")
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ export class RunEngine {
planType,
realtimeStreamsVersion,
debounce,
annotations,
onDebounced,
}: TriggerParams,
tx?: PrismaClientOrTransaction
Expand Down Expand Up @@ -668,6 +669,7 @@ export class RunEngine {
createdAt: new Date(),
}
: undefined,
annotations,
executionSnapshots: {
create: {
engine: "V2",
Expand Down
Loading
Loading