Skip to content

Commit 8f5c16a

Browse files
Report send-phase errors through onError callback
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent 6f4b9f0 commit 8f5c16a

File tree

6 files changed

+137
-14
lines changed

6 files changed

+137
-14
lines changed

docs/tasks/streams.mdx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,10 @@ The default payload sent to your task is a rich, typed object that includes:
617617
- `onError` callback to observe non-fatal transport issues
618618
- headers passed through transport can be object, `Headers`, or tuple arrays
619619

620+
`onError` receives phase-aware details (`payloadMapper`, `triggerOptions`, `triggerTask`,
621+
`onTriggeredRun`, `consumeTrackingStream`, `reconnect`) plus `chatId`, optional `runId`,
622+
and the underlying `error`.
623+
620624
```ts
621625
import type { TriggerChatRunState, TriggerChatRunStore } from "@trigger.dev/ai";
622626

packages/ai/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,9 @@ You can optionally provide `onError` to observe non-fatal transport errors
135135

136136
The callback receives:
137137

138-
- `phase`: `"onTriggeredRun" | "consumeTrackingStream" | "reconnect"`
138+
- `phase`: `"payloadMapper" | "triggerOptions" | "triggerTask" | "onTriggeredRun" | "consumeTrackingStream" | "reconnect"`
139139
- `chatId`
140-
- `runId`
140+
- `runId` (may be `undefined` before a run is created)
141141
- `error`
142142

143143
## Reconnect semantics

packages/ai/src/chatTransport.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,7 @@ describe("TriggerChatTransport", function () {
732732

733733
it("surfaces payload mapper errors and does not trigger runs", async function () {
734734
let triggerCalls = 0;
735+
const errors: TriggerChatTransportError[] = [];
735736

736737
const server = await startServer(function (req, res) {
737738
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
@@ -755,6 +756,9 @@ describe("TriggerChatTransport", function () {
755756
payloadMapper: async function payloadMapper() {
756757
throw new Error("mapper failed");
757758
},
759+
onError: function onError(error) {
760+
errors.push(error);
761+
},
758762
});
759763

760764
await expect(
@@ -768,10 +772,18 @@ describe("TriggerChatTransport", function () {
768772
).rejects.toThrowError("mapper failed");
769773

770774
expect(triggerCalls).toBe(0);
775+
expect(errors).toHaveLength(1);
776+
expect(errors[0]).toMatchObject({
777+
phase: "payloadMapper",
778+
chatId: "chat-mapper-failure",
779+
runId: undefined,
780+
});
781+
expect(errors[0]?.error.message).toBe("mapper failed");
771782
});
772783

773784
it("surfaces trigger options resolver errors and does not trigger runs", async function () {
774785
let triggerCalls = 0;
786+
const errors: TriggerChatTransportError[] = [];
775787

776788
const server = await startServer(function (req, res) {
777789
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
@@ -792,6 +804,9 @@ describe("TriggerChatTransport", function () {
792804
triggerOptions: async function triggerOptions() {
793805
throw new Error("trigger options failed");
794806
},
807+
onError: function onError(error) {
808+
errors.push(error);
809+
},
795810
});
796811

797812
await expect(
@@ -805,6 +820,59 @@ describe("TriggerChatTransport", function () {
805820
).rejects.toThrowError("trigger options failed");
806821

807822
expect(triggerCalls).toBe(0);
823+
expect(errors).toHaveLength(1);
824+
expect(errors[0]).toMatchObject({
825+
phase: "triggerOptions",
826+
chatId: "chat-trigger-failure",
827+
runId: undefined,
828+
});
829+
expect(errors[0]?.error.message).toBe("trigger options failed");
830+
});
831+
832+
it("reports trigger task request failures through onError", async function () {
833+
const errors: TriggerChatTransportError[] = [];
834+
const server = await startServer(function (_req, res) {
835+
res.writeHead(500, {
836+
"content-type": "application/json",
837+
});
838+
res.end(JSON.stringify({ error: "task trigger failed" }));
839+
});
840+
841+
const transport = new TriggerChatTransport({
842+
task: "chat-task",
843+
stream: "chat-stream",
844+
accessToken: "pk_trigger",
845+
baseURL: server.url,
846+
requestOptions: {
847+
retry: {
848+
maxAttempts: 1,
849+
minTimeoutInMs: 1,
850+
maxTimeoutInMs: 1,
851+
factor: 1,
852+
randomize: false,
853+
},
854+
},
855+
onError: function onError(error) {
856+
errors.push(error);
857+
},
858+
});
859+
860+
await expect(
861+
transport.sendMessages({
862+
trigger: "submit-message",
863+
chatId: "chat-trigger-request-failure",
864+
messageId: undefined,
865+
messages: [],
866+
abortSignal: undefined,
867+
})
868+
).rejects.toThrowError("task trigger failed");
869+
870+
expect(errors).toHaveLength(1);
871+
expect(errors[0]).toMatchObject({
872+
phase: "triggerTask",
873+
chatId: "chat-trigger-request-failure",
874+
runId: undefined,
875+
});
808876
});
809877

810878
it("supports creating transport with factory function", async function () {

packages/ai/src/chatTransport.ts

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,47 @@ export class TriggerChatTransport<
161161
options: TriggerChatSendMessagesOptions<UI_MESSAGE>
162162
): Promise<ReadableStream<UIMessageChunk>> {
163163
const transportRequest = createTransportRequest<UI_MESSAGE>(options);
164-
const payload = await this.payloadMapper(transportRequest);
165-
const triggerOptions = await resolveTriggerOptions<UI_MESSAGE>(
166-
this.triggerOptions,
167-
transportRequest
168-
);
169-
const run = await this.triggerTask(payload, triggerOptions);
164+
let payload: PAYLOAD;
165+
try {
166+
payload = await this.payloadMapper(transportRequest);
167+
} catch (error) {
168+
await this.reportError({
169+
phase: "payloadMapper",
170+
chatId: options.chatId,
171+
runId: undefined,
172+
error: normalizeError(error),
173+
});
174+
throw error;
175+
}
176+
177+
let triggerOptions: TriggerOptions | undefined;
178+
try {
179+
triggerOptions = await resolveTriggerOptions<UI_MESSAGE>(
180+
this.triggerOptions,
181+
transportRequest
182+
);
183+
} catch (error) {
184+
await this.reportError({
185+
phase: "triggerOptions",
186+
chatId: options.chatId,
187+
runId: undefined,
188+
error: normalizeError(error),
189+
});
190+
throw error;
191+
}
192+
193+
let run: TriggerTaskResponse;
194+
try {
195+
run = await this.triggerTask(payload, triggerOptions);
196+
} catch (error) {
197+
await this.reportError({
198+
phase: "triggerTask",
199+
chatId: options.chatId,
200+
runId: undefined,
201+
error: normalizeError(error),
202+
});
203+
throw error;
204+
}
170205

171206
const runState: TriggerChatRunState = {
172207
chatId: options.chatId,
@@ -339,9 +374,15 @@ export class TriggerChatTransport<
339374

340375
private async reportError(
341376
event: {
342-
phase: "onTriggeredRun" | "consumeTrackingStream" | "reconnect";
377+
phase:
378+
| "payloadMapper"
379+
| "triggerOptions"
380+
| "triggerTask"
381+
| "onTriggeredRun"
382+
| "consumeTrackingStream"
383+
| "reconnect";
343384
chatId: string;
344-
runId: string;
385+
runId: string | undefined;
345386
error: Error;
346387
}
347388
) {

packages/ai/src/chatTransport.types.test.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,16 @@ it("exposes strongly typed onError callback payloads", function () {
108108

109109
function createTypedOnErrorCallback(): TriggerChatOnError {
110110
async function onError(error: TriggerChatTransportError) {
111-
expectTypeOf(error.phase).toEqualTypeOf<"onTriggeredRun" | "consumeTrackingStream" | "reconnect">();
111+
expectTypeOf(error.phase).toEqualTypeOf<
112+
| "payloadMapper"
113+
| "triggerOptions"
114+
| "triggerTask"
115+
| "onTriggeredRun"
116+
| "consumeTrackingStream"
117+
| "reconnect"
118+
>();
112119
expectTypeOf(error.chatId).toEqualTypeOf<string>();
113-
expectTypeOf(error.runId).toEqualTypeOf<string>();
120+
expectTypeOf(error.runId).toEqualTypeOf<string | undefined>();
114121
expectTypeOf(error.error).toEqualTypeOf<Error>();
115122
}
116123

@@ -196,7 +203,7 @@ it("accepts custom onError callbacks via options typing", function () {
196203
accessToken: "pk_test",
197204
onError: function onError(error) {
198205
expectTypeOf(error.chatId).toEqualTypeOf<string>();
199-
expectTypeOf(error.runId).toEqualTypeOf<string>();
206+
expectTypeOf(error.runId).toEqualTypeOf<string | undefined>();
200207
},
201208
});
202209

packages/ai/src/types.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,17 @@ export type TriggerChatOnTriggeredRun = (
7171
) => MaybePromise<void>;
7272

7373
export type TriggerChatTransportErrorPhase =
74+
| "payloadMapper"
75+
| "triggerOptions"
76+
| "triggerTask"
7477
| "onTriggeredRun"
7578
| "consumeTrackingStream"
7679
| "reconnect";
7780

7881
export type TriggerChatTransportError = {
7982
phase: TriggerChatTransportErrorPhase;
8083
chatId: string;
81-
runId: string;
84+
runId: string | undefined;
8285
error: Error;
8386
};
8487

0 commit comments

Comments
 (0)