Skip to content

Commit b80b030

Browse files
committed
add callback logic for the app function
1 parent a027be0 commit b80b030

1 file changed

Lines changed: 150 additions & 0 deletions

File tree

jobs/knative-job-fn/src/index.ts

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
import express from 'express';
22
import bodyParser from 'body-parser';
3+
import http from 'node:http';
4+
import https from 'node:https';
5+
import { URL } from 'node:url';
6+
7+
type JobCallbackStatus = 'success' | 'error';
8+
9+
type JobContext = {
10+
callbackUrl: string | undefined;
11+
workerId: string | undefined;
12+
jobId: string | undefined;
13+
databaseId: string | undefined;
14+
};
315

416
const app: any = express();
17+
518
app.use(bodyParser.json());
19+
20+
// Echo job headers back on responses for debugging/traceability.
621
app.use((req: any, res: any, next: any) => {
722
res.set({
823
'Content-Type': 'application/json',
@@ -13,6 +28,128 @@ app.use((req: any, res: any, next: any) => {
1328
next();
1429
});
1530

31+
// Normalize callback URL so it always points at the /callback endpoint.
32+
const normalizeCallbackUrl = (rawUrl: string): string => {
33+
try {
34+
const url = new URL(rawUrl);
35+
if (!url.pathname || url.pathname === '/') {
36+
url.pathname = '/callback';
37+
}
38+
return url.toString();
39+
} catch {
40+
return rawUrl;
41+
}
42+
};
43+
44+
const postJson = (
45+
urlStr: string,
46+
headers: Record<string, string>,
47+
body: Record<string, unknown>
48+
): Promise<void> => {
49+
return new Promise((resolve, reject) => {
50+
let url: URL;
51+
try {
52+
url = new URL(urlStr);
53+
} catch (e) {
54+
return reject(e);
55+
}
56+
57+
const isHttps = url.protocol === 'https:';
58+
const client = isHttps ? https : http;
59+
60+
const req = client.request(
61+
{
62+
hostname: url.hostname,
63+
port: url.port || (isHttps ? 443 : 80),
64+
path: url.pathname + url.search,
65+
method: 'POST',
66+
headers: {
67+
'Content-Type': 'application/json',
68+
...headers
69+
}
70+
},
71+
(res) => {
72+
// Drain response data but ignore contents; callback server
73+
// only uses status for debugging.
74+
res.on('data', () => {});
75+
res.on('end', () => resolve());
76+
}
77+
);
78+
79+
req.on('error', (err) => reject(err));
80+
req.write(JSON.stringify(body));
81+
req.end();
82+
});
83+
};
84+
85+
const sendJobCallback = async (
86+
ctx: JobContext,
87+
status: JobCallbackStatus,
88+
errorMessage?: string
89+
) => {
90+
const { callbackUrl, workerId, jobId, databaseId } = ctx;
91+
if (!callbackUrl || !workerId || !jobId) {
92+
return;
93+
}
94+
95+
const target = normalizeCallbackUrl(callbackUrl);
96+
97+
const headers: Record<string, string> = {
98+
'X-Worker-Id': workerId,
99+
'X-Job-Id': jobId
100+
};
101+
102+
if (databaseId) {
103+
headers['X-Database-Id'] = databaseId;
104+
}
105+
106+
const body: Record<string, unknown> = {
107+
status
108+
};
109+
110+
if (status === 'error') {
111+
headers['X-Job-Error'] = 'true';
112+
body.error = errorMessage || 'ERROR';
113+
}
114+
115+
try {
116+
await postJson(target, headers, body);
117+
} catch (err) {
118+
// eslint-disable-next-line no-console
119+
console.error('[knative-job-fn] Failed to POST job callback', {
120+
target,
121+
status,
122+
err
123+
});
124+
}
125+
};
126+
127+
// Attach per-request context and a finish hook to send success callbacks.
128+
app.use((req: any, res: any, next: any) => {
129+
const ctx: JobContext = {
130+
callbackUrl: req.get('X-Callback-Url'),
131+
workerId: req.get('X-Worker-Id'),
132+
jobId: req.get('X-Job-Id'),
133+
databaseId: req.get('X-Database-Id')
134+
};
135+
136+
// Store on res.locals so the error middleware can also mark callbacks as sent.
137+
res.locals = res.locals || {};
138+
res.locals.jobContext = ctx;
139+
res.locals.jobCallbackSent = false;
140+
141+
if (ctx.callbackUrl && ctx.workerId && ctx.jobId) {
142+
res.on('finish', () => {
143+
// If an error handler already sent a callback, skip.
144+
if (res.locals.jobCallbackSent) return;
145+
res.locals.jobCallbackSent = true;
146+
void sendJobCallback(ctx, 'success');
147+
});
148+
}
149+
150+
next();
151+
});
152+
16153
export default {
17154
post: function (...args: any[]) {
18155
return app.post.apply(app, args as any);
@@ -27,6 +164,19 @@ export default {
27164
'Content-Type': 'application/json',
28165
'X-Job-Error': true
29166
});
167+
168+
// Mark job as having errored via callback, if available.
169+
try {
170+
const ctx: JobContext | undefined = res.locals?.jobContext;
171+
if (ctx && !res.locals.jobCallbackSent) {
172+
res.locals.jobCallbackSent = true;
173+
await sendJobCallback(ctx, 'error', error?.message);
174+
}
175+
} catch (err) {
176+
// eslint-disable-next-line no-console
177+
console.error('[knative-job-fn] Failed to send error callback', err);
178+
}
179+
30180
res.status(200).json({ message: error.message });
31181
});
32182
app.listen(port, cb);

0 commit comments

Comments
 (0)