Skip to content

Commit 1cd60b8

Browse files
authored
Worker: fix an issue with batch logging (#1353)
* set min-release-age * versions * debugging flaky test * fix an issue where the batch is never cleared * fix a timing issue when sending batch events Big help from claude * logging * add a bunch of more controlled unit tests * test on interrupt * update and fix tests I think this fixes the actual issue - I just want more good focused tests now * tidy logging * more tests * changeset * types * remove only * run tests in serial * worker: tweak event processor and be sure to reset timeout on batch * remove comment * remove more comments
1 parent b724db5 commit 1cd60b8

File tree

4 files changed

+607
-79
lines changed

4 files changed

+607
-79
lines changed

.changeset/five-hands-dance.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openfn/ws-worker': patch
3+
---
4+
5+
Fix a critical issue when processing batch events that could cause runs to be lost

integration-tests/worker/test/exit-reasons.test.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const run = async (attempt) => {
3434
});
3535
};
3636

37-
test('crash: syntax error', async (t) => {
37+
test.serial('crash: syntax error', async (t) => {
3838
const attempt = {
3939
id: crypto.randomUUID(),
4040
jobs: [
@@ -54,7 +54,7 @@ test('crash: syntax error', async (t) => {
5454
});
5555

5656
// https://github.com/OpenFn/kit/issues/1045
57-
test('crash: reference error', async (t) => {
57+
test.serial('crash: reference error', async (t) => {
5858
const attempt = {
5959
id: crypto.randomUUID(),
6060
jobs: [
@@ -78,7 +78,7 @@ test('crash: reference error', async (t) => {
7878
});
7979

8080
// https://github.com/OpenFn/kit/issues/758
81-
test('crash: job not found', async (t) => {
81+
test.serial('crash: job not found', async (t) => {
8282
lightning.addDataclip('x', {});
8383

8484
const attempt = {
@@ -102,7 +102,7 @@ test('crash: job not found', async (t) => {
102102
t.regex(error_message, /could not find start job: y/i);
103103
});
104104

105-
test('exception: autoinstall error', async (t) => {
105+
test.serial('exception: autoinstall error', async (t) => {
106106
const attempt = {
107107
id: crypto.randomUUID(),
108108
jobs: [
@@ -125,7 +125,7 @@ test('exception: autoinstall error', async (t) => {
125125
);
126126
});
127127

128-
test('exception: bad credential (not found)', async (t) => {
128+
test.serial('exception: bad credential (not found)', async (t) => {
129129
const attempt = {
130130
id: crypto.randomUUID(),
131131
jobs: [
@@ -154,7 +154,7 @@ test('exception: bad credential (not found)', async (t) => {
154154
);
155155
});
156156

157-
test('exception: credential timeout', async (t) => {
157+
test.serial('exception: credential timeout', async (t) => {
158158
const attempt = {
159159
id: crypto.randomUUID(),
160160
jobs: [
@@ -177,7 +177,7 @@ test('exception: credential timeout', async (t) => {
177177
);
178178
});
179179

180-
test('kill: oom (small, kill worker)', async (t) => {
180+
test.serial('kill: oom (small, kill worker)', async (t) => {
181181
const attempt = {
182182
id: crypto.randomUUID(),
183183
jobs: [
@@ -201,7 +201,8 @@ test('kill: oom (small, kill worker)', async (t) => {
201201
t.is(error_message, 'Run exceeded maximum memory usage');
202202
});
203203

204-
test('kill: oom (large, kill vm)', async (t) => {
204+
// TODO this is failing locally... is it OK in CI?
205+
test.serial('kill: oom (large, kill vm)', async (t) => {
205206
const attempt = {
206207
id: crypto.randomUUID(),
207208
jobs: [
@@ -225,7 +226,7 @@ test('kill: oom (large, kill vm)', async (t) => {
225226
t.is(error_message, 'Run exceeded maximum memory usage');
226227
});
227228

228-
test('crash: process.exit() triggered by postgres', async (t) => {
229+
test.serial('crash: process.exit() triggered by postgres', async (t) => {
229230
const attempt = {
230231
id: crypto.randomUUID(),
231232
jobs: [

packages/ws-worker/src/api/process-events.ts

Lines changed: 113 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export type EventProcessorOptions = {
2626
batchInterval?: number;
2727
batchLimit?: number;
2828
timeout_ms?: number;
29+
trace?: boolean;
30+
events?: Record<string, any>;
2931
};
3032

3133
const DEFAULT_BATCH_LIMIT = 10;
@@ -55,6 +57,20 @@ const allEngineEvents = [
5557
* Queuing ensures events are sent in order while allowing batching to reduce network calls.
5658
* Batching helps with high-volume logs by sending fewer requests with larger payloads,
5759
* reducing websocket latency. Events batch by count or time interval, whichever comes first.
60+
*
61+
* The basic architecture is:
62+
* - events are synchronously and immediately added to a queue
63+
* - items are processed sequentially
64+
* - after an item has been processed, we pull the next item from the queue
65+
*
66+
* If an event is flagged as batchable, we introduce new rules
67+
* - We flag if a batch is "open"
68+
* - A batch event will wait for the batch interval to expire before process () completes
69+
* - So batch will usually block the async loop until the batch has naturally expired
70+
* - The batch event can be interrupted early if a limit is hit, or a new event type comes in
71+
*
72+
* The batch exposes a danger of having two loops running async, so it's managed very carefully.
73+
* After a batch event is sent, in some cases, the batch will trigger next
5874
*/
5975
export function eventProcessor(
6076
engine: RuntimeEngine,
@@ -64,26 +80,37 @@ export function eventProcessor(
6480
) {
6581
const { id: planId, logger } = context;
6682
const {
67-
batchLimit: limit = DEFAULT_BATCH_LIMIT,
68-
batchInterval: interval = DEFAULT_BATCH_INTERVAL,
83+
batchLimit = DEFAULT_BATCH_LIMIT,
84+
batchInterval = DEFAULT_BATCH_INTERVAL,
6985
timeout_ms,
86+
events,
7087
} = options;
7188

7289
const queue: any = [];
7390

7491
let activeBatch: string | null = null;
7592
let batch: any = [];
76-
let batchTimeout: NodeJS.Timeout;
93+
let batchTimeout: NodeJS.Timeout | null = null;
94+
let batchSendPromise: Promise<void> | null = null;
7795
let didFinish = false;
78-
let timeoutHandle: NodeJS.Timeout;
96+
let processTimeoutHandle: NodeJS.Timeout;
97+
98+
const trace = (...message: any) => {
99+
if (options.trace) {
100+
console.log(...message);
101+
}
102+
};
79103

80104
const next = async () => {
105+
if (batchSendPromise) {
106+
await batchSendPromise;
107+
}
81108
const evt = queue[0];
82109
if (evt) {
83110
didFinish = false;
84111

85112
const finish = () => {
86-
clearTimeout(timeoutHandle);
113+
clearTimeout(processTimeoutHandle);
87114
if (!didFinish) {
88115
didFinish = true;
89116
queue.shift();
@@ -92,23 +119,41 @@ export function eventProcessor(
92119
};
93120

94121
if (timeout_ms) {
95-
timeoutHandle = setTimeout(() => {
122+
processTimeoutHandle = setTimeout(() => {
96123
logger.error(`${planId} :: ${evt.name} :: timeout (fallback)`);
97124
finish();
98125
}, timeout_ms);
99126
}
100127

101128
await process(evt.name, evt.event);
129+
trace(`finish ${evt.name}`);
102130
finish();
103131
}
104132
};
105133

106-
const sendBatch = async (name: string) => {
107-
clearTimeout(batchTimeout);
108-
// first clear the batch
109-
activeBatch = null;
110-
await send(name, batch, batch.length);
111-
batch = [];
134+
// If sending the batch early, we break the cycle of the main
135+
// process loop
136+
// So we need to control whether to trigger the next call,
137+
// or whether the calling function will process the next item for us
138+
const sendBatch = async (triggerNext = false) => {
139+
if (activeBatch) {
140+
trace('sending batch', activeBatch, batch.length);
141+
clearTimeout(batchTimeout!);
142+
batchTimeout = null;
143+
144+
// first clear the batch (but leave it truthy)
145+
const name = activeBatch as string;
146+
activeBatch = '--';
147+
await send(name, batch, batch.length);
148+
activeBatch = null;
149+
batch = [];
150+
151+
if (triggerNext) {
152+
clearTimeout(processTimeoutHandle);
153+
queue.shift();
154+
next();
155+
}
156+
}
112157
};
113158

114159
const send = async (name: string, payload: any, batchSize?: number) => {
@@ -138,7 +183,17 @@ export function eventProcessor(
138183
}
139184
};
140185

186+
const addToBatch = async (event: any) => {
187+
batch.push(event);
188+
189+
if (batch.length >= batchLimit) {
190+
// If we're at the batch limit, return right away
191+
return sendBatch(true);
192+
}
193+
};
194+
141195
const process = async (name: string, event: any) => {
196+
trace('process', name);
142197
// TODO this actually shouldn't be here - should be done separately
143198
if (name !== 'workflow-log') {
144199
Sentry.addBreadcrumb({
@@ -150,15 +205,8 @@ export function eventProcessor(
150205

151206
if (name === activeBatch) {
152207
// if there's a batch open, just push the event
153-
batch.push(event);
154-
155-
if (batch.length >= limit) {
156-
await sendBatch(name);
157-
}
208+
await addToBatch(event);
158209
return;
159-
} else if (activeBatch) {
160-
// If a different event comes in, send the batch (and carry on processing the event)
161-
await sendBatch(activeBatch);
162210
}
163211

164212
if (name in callbacks) {
@@ -170,22 +218,32 @@ export function eventProcessor(
170218
batch.push(event);
171219

172220
// Next, peek ahead in the queue for more pending events
173-
while (queue.length > 1 && queue[1].name === name) {
174-
const [nextBatchItem] = queue.splice(1, 1);
175-
batch.push(nextBatchItem.event);
221+
while (queue.length > 1) {
222+
if (queue[1].name === name) {
223+
const [nextBatchItem] = queue.splice(1, 1);
224+
batch.push(nextBatchItem.event);
176225

177-
if (batch.length >= limit) {
178-
// If we're at the batch limit, return right away
179-
return sendBatch(name);
226+
if (batch.length >= batchLimit) {
227+
// If we're at the batch limit, return right away
228+
return sendBatch(true);
229+
}
230+
} else {
231+
// If there's another pending item not a part of this batch,
232+
// just send the batch now
233+
// send the batch early
234+
return sendBatch(true);
180235
}
181236
}
182237

183-
// finally wait for a time before sending the batch
184238
if (!batchTimeout) {
185-
const batchName = activeBatch!;
186-
batchTimeout = setTimeout(async () => {
187-
sendBatch(batchName);
188-
}, interval);
239+
// finally wait for a time before sending the batch
240+
// This is the "natural" batch trigger
241+
clearTimeout(processTimeoutHandle);
242+
return new Promise((resolve) => {
243+
batchTimeout = setTimeout(() => {
244+
sendBatch(false).then(resolve);
245+
}, batchInterval);
246+
});
189247
}
190248
} else {
191249
await send(name, event);
@@ -196,17 +254,38 @@ export function eventProcessor(
196254
};
197255

198256
const enqueue = (name: string, event: any) => {
257+
trace('queue', name);
258+
if (name === 'workflow-log') {
259+
trace(event.message);
260+
}
199261
queue.push({ name, event });
200262

201263
if (queue.length == 1) {
202-
next();
264+
// If this is the only item in the queue, start executing right away
265+
trace(`[${name}] executing immediately`);
266+
setImmediate(next);
267+
} else if (activeBatch === name) {
268+
addToBatch(event);
269+
queue.pop();
270+
} else if (queue.length == 2 && batchTimeout) {
271+
trace('Sending batch early');
272+
// If this is the second item in the queue, and we have a batch active,
273+
// send the batch early
274+
// (note that this event will still be deferred)
275+
sendBatch(true);
276+
} else {
277+
trace(`[${name}] deffering event`);
203278
}
204279
};
205280

206-
const e = allEngineEvents.reduce(
207-
(obj, e) => Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }),
281+
const e = (events || allEngineEvents).reduce(
282+
(obj: any, e: string) =>
283+
Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }),
208284
{}
209285
);
210286

211287
engine.listen(planId, e);
288+
289+
// return debug state
290+
return { queue };
212291
}

0 commit comments

Comments
 (0)