Skip to content

Commit ce728f4

Browse files
committed
Add run-scoped PAT renewal for chat transport
1 parent 21fa332 commit ce728f4

File tree

11 files changed

+950
-365
lines changed

11 files changed

+950
-365
lines changed

.changeset/chat-run-pat-renewal.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Add run-scoped PAT renewal for chat transport (`renewRunAccessToken`), fail fast on 401/403 for SSE without retry backoff, and export `isTriggerRealtimeAuthError` for auth-error detection.

packages/core/src/v3/apiClient/errors.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ export class PermissionDeniedError extends ApiError {
128128
override readonly status: 403 = 403;
129129
}
130130

131+
/**
132+
* True when `error` is a 401/403 from the Trigger API (e.g. expired run-scoped PAT on realtime streams).
133+
* Uses structural checks so it works even if multiple copies of `@trigger.dev/core` are bundled (subclass `instanceof` can fail).
134+
*/
135+
export function isTriggerRealtimeAuthError(error: unknown): boolean {
136+
if (error === null || typeof error !== "object") {
137+
return false;
138+
}
139+
const e = error as ApiError;
140+
return e.name === "TriggerApiError" && (e.status === 401 || e.status === 403);
141+
}
142+
131143
export class NotFoundError extends ApiError {
132144
override readonly status: 404 = 404;
133145
}

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
IOPacket,
1515
parsePacket,
1616
} from "../utils/ioSerialization.js";
17-
import { ApiError } from "./errors.js";
17+
import { ApiError, isTriggerRealtimeAuthError } from "./errors.js";
1818
import { ApiClient } from "./index.js";
1919
import { zodShapeStream } from "./stream.js";
2020

@@ -344,6 +344,12 @@ export class SSEStreamSubscription implements StreamSubscription {
344344
return;
345345
}
346346

347+
if (isTriggerRealtimeAuthError(error)) {
348+
this.options.onError?.(error as Error);
349+
controller.error(error as Error);
350+
return;
351+
}
352+
347353
// Retry on error
348354
await this.retryConnection(controller, error as Error);
349355
}

packages/core/test/runStream.test.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import { describe, expect, it } from "vitest";
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
22
import {
33
RunSubscription,
44
SSEStreamPart,
5+
SSEStreamSubscription,
56
StreamSubscription,
67
StreamSubscriptionFactory,
78
} from "../src/v3/apiClient/runStream.js";
@@ -470,6 +471,47 @@ describe("RunSubscription", () => {
470471
});
471472
});
472473

474+
describe("SSEStreamSubscription", () => {
475+
let originalFetch: typeof global.fetch;
476+
477+
beforeEach(() => {
478+
originalFetch = global.fetch;
479+
});
480+
481+
afterEach(() => {
482+
global.fetch = originalFetch;
483+
vi.restoreAllMocks();
484+
});
485+
486+
it("does not retry the initial fetch on 401", async () => {
487+
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 401 }));
488+
global.fetch = fetchMock;
489+
490+
const sub = new SSEStreamSubscription("https://api.test/realtime/v1/streams/run_x/chat", {
491+
headers: { Authorization: "Bearer expired" },
492+
});
493+
494+
const stream = await sub.subscribe();
495+
const reader = stream.getReader();
496+
await expect(reader.read()).rejects.toMatchObject({ status: 401 });
497+
expect(fetchMock).toHaveBeenCalledTimes(1);
498+
});
499+
500+
it("does not retry the initial fetch on 403", async () => {
501+
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 403 }));
502+
global.fetch = fetchMock;
503+
504+
const sub = new SSEStreamSubscription("https://api.test/realtime/v1/streams/run_x/chat", {
505+
headers: { Authorization: "Bearer denied" },
506+
});
507+
508+
const stream = await sub.subscribe();
509+
const reader = stream.getReader();
510+
await expect(reader.read()).rejects.toMatchObject({ status: 403 });
511+
expect(fetchMock).toHaveBeenCalledTimes(1);
512+
});
513+
});
514+
473515
export async function convertAsyncIterableToArray<T>(iterable: AsyncIterable<T>): Promise<T[]> {
474516
const result: T[] = [];
475517
for await (const item of iterable) {

packages/trigger-sdk/src/v3/chat-react.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* function Chat() {
1616
* const transport = useTriggerChatTransport<typeof chat>({
1717
* task: "ai-chat",
18-
* accessToken: () => fetchToken(),
18+
* accessToken: ({ chatId }) => fetchToken(chatId),
1919
* });
2020
*
2121
* const { messages, sendMessage } = useChat({ transport });
@@ -74,7 +74,7 @@ export type { InferChatUIMessage };
7474
* function Chat() {
7575
* const transport = useTriggerChatTransport<typeof chat>({
7676
* task: "ai-chat",
77-
* accessToken: () => fetchToken(),
77+
* accessToken: ({ chatId }) => fetchToken(chatId),
7878
* });
7979
*
8080
* const { messages, sendMessage } = useChat({ transport });
@@ -90,11 +90,15 @@ export function useTriggerChatTransport<TTask extends AnyTask = AnyTask>(
9090
}
9191

9292
// Keep onSessionChange up to date without recreating the transport
93-
const { onSessionChange } = options;
93+
const { onSessionChange, renewRunAccessToken } = options;
9494
useEffect(() => {
9595
ref.current?.setOnSessionChange(onSessionChange);
9696
}, [onSessionChange]);
9797

98+
useEffect(() => {
99+
ref.current?.setRenewRunAccessToken(renewRunAccessToken);
100+
}, [renewRunAccessToken]);
101+
98102
return ref.current;
99103
}
100104

0 commit comments

Comments
 (0)