Skip to content

Commit 0756660

Browse files
committed
fix: port-named outputs + durable error propagation for graph execution
- send-email handler returns { result: { complete: true } } matching handler.json output port name 'result' - send-verification-link handler wraps all returns in { result: {...} } - failGraphExecution now tracks node name in error message: '[nodeName] error details' - failGraphExecution sets error_code = 'NODE_EXECUTION_FAILED' and completed_at for proper lifecycle tracking - Race condition handled: if execution already completed/failed, logs a warning instead of swallowing silently - Removed try/catch wrapper around failGraphExecution — errors from the UPDATE propagate to the job queue's handleError - Updated unit tests for new port-named return values
1 parent da00f1c commit 0756660

5 files changed

Lines changed: 33 additions & 25 deletions

File tree

functions/send-email/__tests__/handler.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,18 @@ describe('send-email handler', () => {
4747
{ to: 'a@b.com', subject: 'Hi', text: 'hello' },
4848
createMockContext()
4949
);
50-
expect(result).toEqual({ complete: true });
50+
expect(result).toEqual({ result: { complete: true } });
5151
});
5252
});
5353

5454
describe('sending', () => {
55-
it('returns { complete: true } on valid payload', async () => {
55+
it('returns port-named output on valid payload', async () => {
5656
const handler = loadHandler();
5757
const result = await handler(
5858
{ to: 'a@b.com', subject: 'Hi', html: '<p>hi</p>' },
5959
createMockContext()
6060
);
61-
expect(result).toEqual({ complete: true });
61+
expect(result).toEqual({ result: { complete: true } });
6262
});
6363

6464
it('calls postmaster.send by default (not SMTP)', async () => {
@@ -121,13 +121,13 @@ describe('send-email handler', () => {
121121
process.env.SEND_EMAIL_DRY_RUN = 'true';
122122
});
123123

124-
it('returns { complete: true } without sending', async () => {
124+
it('returns port-named output without sending', async () => {
125125
const handler = loadHandler();
126126
const result = await handler(
127127
{ to: 'a@b.com', subject: 'Hi', html: '<p>hi</p>' },
128128
createMockContext()
129129
);
130-
expect(result).toEqual({ complete: true });
130+
expect(result).toEqual({ result: { complete: true } });
131131
const postmaster = require('@constructive-io/postmaster');
132132
const smtp = require('simple-smtp-server');
133133
expect(postmaster.send).not.toHaveBeenCalled();

functions/send-email/handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ const handler: FunctionHandler<SendEmailPayload> = async (params) => {
8181
logger.info('Sent email', logContext);
8282
}
8383

84-
return { complete: true };
84+
return { result: { complete: true } };
8585
};
8686

8787
export default handler;

functions/send-verification-link/__tests__/handler.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ describe('send-verification-link handler (validation)', () => {
1515
ctx
1616
);
1717
expect(result).toEqual({
18-
error: 'Missing X-Database-Id header or DEFAULT_DATABASE_ID'
18+
result: { error: 'Missing X-Database-Id header or DEFAULT_DATABASE_ID' }
1919
});
2020
});
2121

functions/send-verification-link/handler.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,10 @@ const sendEmailLink = async (
280280
}
281281

282282
return {
283-
complete: true,
284-
...(isDryRun ? { dryRun: true } : null)
283+
result: {
284+
complete: true,
285+
...(isDryRun ? { dryRun: true } : null)
286+
}
285287
};
286288
};
287289

@@ -290,7 +292,7 @@ const handler: FunctionHandler<SendEmailParams> = async (params, context) => {
290292

291293
const databaseId = job.databaseId;
292294
if (!databaseId) {
293-
return { error: 'Missing X-Database-Id header or DEFAULT_DATABASE_ID' };
295+
return { result: { error: 'Missing X-Database-Id header or DEFAULT_DATABASE_ID' } };
294296
}
295297

296298
log.info('[send-verification-link] Processing request', {

job/compute-worker/src/index.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -404,11 +404,7 @@ export default class ComputeWorker {
404404
});
405405

406406
if (graphNode) {
407-
try {
408-
await this.failGraphExecution(payload.execution_id, err.message);
409-
} catch (graphErr) {
410-
log.error('Failed to mark graph execution as failed', graphErr);
411-
}
407+
await this.failGraphExecution(payload.execution_id, payload.node_name, err.message);
412408
// Don't re-throw for graph nodes — execution is already marked failed.
413409
// Re-throwing would cause the job queue to retry with the same permanent error.
414410
return;
@@ -545,11 +541,7 @@ export default class ComputeWorker {
545541

546542
// Graph node failure: mark execution as failed
547543
if (graphNode) {
548-
try {
549-
await this.failGraphExecution(payload.execution_id, err.message);
550-
} catch (graphErr) {
551-
log.error('Failed to mark graph execution as failed', graphErr);
552-
}
544+
await this.failGraphExecution(payload.execution_id, payload.node_name, err.message);
553545
// Don't re-throw for graph nodes — execution is already marked failed.
554546
// Re-throwing would cause the job queue to retry with the same permanent error.
555547
return;
@@ -579,18 +571,32 @@ export default class ComputeWorker {
579571

580572
/**
581573
* Mark a graph execution as failed when a node errors.
574+
* Records both the failing node name and the error message.
575+
* Handles the race where execution may already be completed/failed
576+
* (e.g. graphOutput finished before a late node failure arrives).
582577
*/
583578
private async failGraphExecution(
584579
executionId: string,
580+
nodeName: string,
585581
errorMessage: string
586582
): Promise<void> {
587-
log.debug('failing graph execution', { executionId, error: errorMessage });
588-
await this.pgPool.query(
583+
const nodeError = `[${nodeName}] ${errorMessage}`;
584+
log.error('graph node failed', { executionId, nodeName, error: errorMessage });
585+
const { rowCount } = await this.pgPool.query(
589586
`UPDATE constructive_compute_private.platform_function_graph_executions
590-
SET status = 'failed', error_message = $1
591-
WHERE id = $2`,
592-
[errorMessage, executionId]
587+
SET status = 'failed',
588+
error_message = $1,
589+
error_code = 'NODE_EXECUTION_FAILED',
590+
completed_at = now()
591+
WHERE id = $2 AND status = 'running'`,
592+
[nodeError, executionId]
593593
);
594+
if (rowCount === 0) {
595+
// Execution already completed or failed — log so the error is never invisible
596+
log.warn('graph execution already finished; node failure not recorded in status', {
597+
executionId, nodeName, error: errorMessage,
598+
});
599+
}
594600
}
595601

596602
/**

0 commit comments

Comments
 (0)