Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/bright-keys-shine.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Expose user-provided idempotency key and scope in task context. `ctx.run.idempotencyKey` now returns the original key passed to `idempotencyKeys.create()` instead of the hash, and `ctx.run.idempotencyKeyScope` shows the scope ("run", "attempt", or "global").
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
logger,
} from "@trigger.dev/core/v3";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
Expand Down Expand Up @@ -38,6 +39,7 @@ const commonRunSelect = {
baseCostInCents: true,
usageDurationMs: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
isTest: true,
depth: true,
scheduleId: true,
Expand Down Expand Up @@ -442,7 +444,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
return {
id: run.friendlyId,
taskIdentifier: run.taskIdentifier,
idempotencyKey: run.idempotencyKey ?? undefined,
idempotencyKey: getUserProvidedIdempotencyKey(run),
version: run.lockedToVersion?.version,
status: ApiRetrieveRunPresenter.apiStatusFromRunStatus(run.status, apiVersion),
createdAt: run.createdAt,
Expand Down
35 changes: 33 additions & 2 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import {
type V3TaskRunContext,
} from "@trigger.dev/core/v3";
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
import {
getIdempotencyKeyScope,
getUserProvidedIdempotencyKey,
} from "@trigger.dev/core/v3/serverOnly";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { logger } from "~/services/logger.server";
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
Expand Down Expand Up @@ -229,8 +233,10 @@ export class SpanPresenter extends BasePresenter {
isTest: run.isTest,
replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId,
environmentId: run.runtimeEnvironment.id,
idempotencyKey: run.idempotencyKey,
idempotencyKey: getUserProvidedIdempotencyKey(run),
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
idempotencyKeyScope: getIdempotencyKeyScope(run),
idempotencyKeyStatus: this.getIdempotencyKeyStatus(run),
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
queue: {
Expand Down Expand Up @@ -276,6 +282,30 @@ export class SpanPresenter extends BasePresenter {
};
}

private getIdempotencyKeyStatus(run: {
idempotencyKey: string | null;
idempotencyKeyExpiresAt: Date | null;
idempotencyKeyOptions: unknown;
}): "active" | "inactive" | "expired" | undefined {
// No idempotency configured if no scope exists
const scope = getIdempotencyKeyScope(run);
if (!scope) {
return undefined;
}

// Check if expired first (takes precedence)
if (run.idempotencyKeyExpiresAt && run.idempotencyKeyExpiresAt < new Date()) {
return "expired";
}

// Check if reset (hash is null but options exist)
if (run.idempotencyKey === null) {
return "inactive";
}

return "active";
}

async resolveSchedule(scheduleId?: string) {
if (!scheduleId) {
return;
Expand Down Expand Up @@ -355,6 +385,7 @@ export class SpanPresenter extends BasePresenter {
//idempotency
idempotencyKey: true,
idempotencyKeyExpiresAt: true,
idempotencyKeyOptions: true,
//debounce
debounce: true,
//delayed
Expand Down Expand Up @@ -644,7 +675,7 @@ export class SpanPresenter extends BasePresenter {
createdAt: run.createdAt,
tags: run.runTags,
isTest: run.isTest,
idempotencyKey: run.idempotencyKey ?? undefined,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
startedAt: run.startedAt ?? run.createdAt,
durationMs: run.usageDurationMs,
costInCents: run.costInCents,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,16 @@
import { parse } from "@conform-to/zod";
import { type ActionFunction, json } from "@remix-run/node";
import { z } from "zod";
import { prisma } from "~/db.server";
import { jsonWithErrorMessage } from "~/models/message.server";
import { jsonWithErrorMessage, jsonWithSuccessMessage } from "~/models/message.server";
import { logger } from "~/services/logger.server";
import { requireUserId } from "~/services/session.server";
import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server";
import { v3RunParamsSchema } from "~/utils/pathBuilder";

export const resetIdempotencyKeySchema = z.object({
taskIdentifier: z.string().min(1, "Task identifier is required"),
});

export const action: ActionFunction = async ({ request, params }) => {
const userId = await requireUserId(request);
const { projectParam, organizationSlug, envParam, runParam } =
v3RunParamsSchema.parse(params);

const formData = await request.formData();
const submission = parse(formData, { schema: resetIdempotencyKeySchema });

if (!submission.value) {
return json(submission);
}
const { projectParam, organizationSlug, envParam, runParam } = v3RunParamsSchema.parse(params);

try {
const { taskIdentifier } = submission.value;

const taskRun = await prisma.taskRun.findFirst({
where: {
friendlyId: runParam,
Expand Down Expand Up @@ -54,21 +38,11 @@ export const action: ActionFunction = async ({ request, params }) => {
});

if (!taskRun) {
submission.error = { runParam: ["Run not found"] };
return json(submission);
return jsonWithErrorMessage({}, request, "Run not found");
}

if (!taskRun.idempotencyKey) {
return jsonWithErrorMessage(
submission,
request,
"This run does not have an idempotency key"
);
}

if (taskRun.taskIdentifier !== taskIdentifier) {
submission.error = { taskIdentifier: ["Task identifier does not match this run"] };
return json(submission);
return jsonWithErrorMessage({}, request, "This run does not have an idempotency key");
}

const environment = await prisma.runtimeEnvironment.findUnique({
Expand All @@ -85,22 +59,18 @@ export const action: ActionFunction = async ({ request, params }) => {
});

if (!environment) {
return jsonWithErrorMessage(
submission,
request,
"Environment not found"
);
return jsonWithErrorMessage({}, request, "Environment not found");
}

const service = new ResetIdempotencyKeyService();

await service.call(taskRun.idempotencyKey, taskIdentifier, {
await service.call(taskRun.idempotencyKey, taskRun.taskIdentifier, {
...environment,
organizationId: environment.project.organizationId,
organization: environment.project.organization,
});

return json({ success: true });
return jsonWithSuccessMessage({}, request, "Idempotency key reset successfully");
} catch (error) {
if (error instanceof Error) {
logger.error("Failed to reset idempotency key", {
Expand All @@ -110,15 +80,11 @@ export const action: ActionFunction = async ({ request, params }) => {
stack: error.stack,
},
});
return jsonWithErrorMessage(
submission,
request,
`Failed to reset idempotency key: ${error.message}`
);
return jsonWithErrorMessage({}, request, `Failed to reset idempotency key: ${error.message}`);
} else {
logger.error("Failed to reset idempotency key", { error });
return jsonWithErrorMessage(
submission,
{},
request,
`Failed to reset idempotency key: ${JSON.stringify(error)}`
);
Expand Down
Loading