Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/five-hands-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Fix a critical issue when processing batch events that could cause runs to be lost
19 changes: 10 additions & 9 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const run = async (attempt) => {
});
};

test('crash: syntax error', async (t) => {
test.serial('crash: syntax error', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -54,7 +54,7 @@ test('crash: syntax error', async (t) => {
});

// https://github.com/OpenFn/kit/issues/1045
test('crash: reference error', async (t) => {
test.serial('crash: reference error', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -78,7 +78,7 @@ test('crash: reference error', async (t) => {
});

// https://github.com/OpenFn/kit/issues/758
test('crash: job not found', async (t) => {
test.serial('crash: job not found', async (t) => {
lightning.addDataclip('x', {});

const attempt = {
Expand All @@ -102,7 +102,7 @@ test('crash: job not found', async (t) => {
t.regex(error_message, /could not find start job: y/i);
});

test('exception: autoinstall error', async (t) => {
test.serial('exception: autoinstall error', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -125,7 +125,7 @@ test('exception: autoinstall error', async (t) => {
);
});

test('exception: bad credential (not found)', async (t) => {
test.serial('exception: bad credential (not found)', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand Down Expand Up @@ -154,7 +154,7 @@ test('exception: bad credential (not found)', async (t) => {
);
});

test('exception: credential timeout', async (t) => {
test.serial('exception: credential timeout', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -177,7 +177,7 @@ test('exception: credential timeout', async (t) => {
);
});

test('kill: oom (small, kill worker)', async (t) => {
test.serial('kill: oom (small, kill worker)', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -201,7 +201,8 @@ test('kill: oom (small, kill worker)', async (t) => {
t.is(error_message, 'Run exceeded maximum memory usage');
});

test('kill: oom (large, kill vm)', async (t) => {
// TODO this is failing locally... is it OK in CI?
test.serial('kill: oom (large, kill vm)', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -225,7 +226,7 @@ test('kill: oom (large, kill vm)', async (t) => {
t.is(error_message, 'Run exceeded maximum memory usage');
});

test('crash: process.exit() triggered by postgres', async (t) => {
test.serial('crash: process.exit() triggered by postgres', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand Down
147 changes: 113 additions & 34 deletions packages/ws-worker/src/api/process-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export type EventProcessorOptions = {
batchInterval?: number;
batchLimit?: number;
timeout_ms?: number;
trace?: boolean;
events?: Record<string, any>;
};

const DEFAULT_BATCH_LIMIT = 10;
Expand Down Expand Up @@ -55,6 +57,20 @@ const allEngineEvents = [
* Queuing ensures events are sent in order while allowing batching to reduce network calls.
* Batching helps with high-volume logs by sending fewer requests with larger payloads,
* reducing websocket latency. Events batch by count or time interval, whichever comes first.
*
* The basic architecture is:
* - events are synchronously and immediately added to a queue
* - items are processed sequentially
* - after an item has been processed, we pull the next item from the queue
*
* If an event is flagged as batchable, we introduce new rules:
* - We flag if a batch is "open"
* - A batch event will wait for the batch interval to expire before process() completes
* - So a batch will usually block the async loop until the batch has naturally expired
* - The batch event can be interrupted early if a limit is hit, or a new event type comes in
*
* The batch exposes a danger of having two loops running async, so it's managed very carefully.
* When a batch is sent early, sendBatch itself shifts the queue and triggers next();
* when the batch expires naturally, the main loop carries on as usual.
*/
export function eventProcessor(
engine: RuntimeEngine,
Expand All @@ -64,26 +80,37 @@ export function eventProcessor(
) {
const { id: planId, logger } = context;
const {
batchLimit: limit = DEFAULT_BATCH_LIMIT,
batchInterval: interval = DEFAULT_BATCH_INTERVAL,
batchLimit = DEFAULT_BATCH_LIMIT,
batchInterval = DEFAULT_BATCH_INTERVAL,
timeout_ms,
events,
} = options;

const queue: any = [];

let activeBatch: string | null = null;
let batch: any = [];
let batchTimeout: NodeJS.Timeout;
let batchTimeout: NodeJS.Timeout | null = null;
let batchSendPromise: Promise<void> | null = null;
let didFinish = false;
let timeoutHandle: NodeJS.Timeout;
let processTimeoutHandle: NodeJS.Timeout;

const trace = (...message: any) => {
if (options.trace) {
console.log(...message);
}
};

const next = async () => {
if (batchSendPromise) {
await batchSendPromise;
}
const evt = queue[0];
if (evt) {
didFinish = false;

const finish = () => {
clearTimeout(timeoutHandle);
clearTimeout(processTimeoutHandle);
if (!didFinish) {
didFinish = true;
queue.shift();
Expand All @@ -92,23 +119,41 @@ export function eventProcessor(
};

if (timeout_ms) {
timeoutHandle = setTimeout(() => {
processTimeoutHandle = setTimeout(() => {
logger.error(`${planId} :: ${evt.name} :: timeout (fallback)`);
finish();
}, timeout_ms);
}

await process(evt.name, evt.event);
trace(`finish ${evt.name}`);
finish();
}
};

const sendBatch = async (name: string) => {
clearTimeout(batchTimeout);
// first clear the batch
activeBatch = null;
await send(name, batch, batch.length);
batch = [];
// If sending the batch early, we break the cycle of the main
// process loop
// So we need to control whether to trigger the next call,
// or whether the calling function will process the next item for us
const sendBatch = async (triggerNext = false) => {
if (activeBatch) {
trace('sending batch', activeBatch, batch.length);
clearTimeout(batchTimeout!);
batchTimeout = null;

// first clear the batch (but leave it truthy)
const name = activeBatch as string;
activeBatch = '--';
await send(name, batch, batch.length);
activeBatch = null;
batch = [];

if (triggerNext) {
clearTimeout(processTimeoutHandle);
queue.shift();
next();
}
}
};

const send = async (name: string, payload: any, batchSize?: number) => {
Expand Down Expand Up @@ -138,7 +183,17 @@ export function eventProcessor(
}
};

const addToBatch = async (event: any) => {
batch.push(event);

if (batch.length >= batchLimit) {
// If we're at the batch limit, return right away
return sendBatch(true);
}
};

const process = async (name: string, event: any) => {
trace('process', name);
// TODO this actually shouldn't be here - should be done separately
if (name !== 'workflow-log') {
Sentry.addBreadcrumb({
Expand All @@ -150,15 +205,8 @@ export function eventProcessor(

if (name === activeBatch) {
// if there's a batch open, just push the event
batch.push(event);

if (batch.length >= limit) {
await sendBatch(name);
}
await addToBatch(event);
return;
} else if (activeBatch) {
// If a different event comes in, send the batch (and carry on processing the event)
await sendBatch(activeBatch);
}

if (name in callbacks) {
Expand All @@ -170,22 +218,32 @@ export function eventProcessor(
batch.push(event);

// Next, peek ahead in the queue for more pending events
while (queue.length > 1 && queue[1].name === name) {
const [nextBatchItem] = queue.splice(1, 1);
batch.push(nextBatchItem.event);
while (queue.length > 1) {
if (queue[1].name === name) {
const [nextBatchItem] = queue.splice(1, 1);
batch.push(nextBatchItem.event);

if (batch.length >= limit) {
// If we're at the batch limit, return right away
return sendBatch(name);
if (batch.length >= batchLimit) {
// If we're at the batch limit, return right away
return sendBatch(true);
}
} else {
// If there's another pending item not a part of this batch,
// just send the batch now
// send the batch early
return sendBatch(true);
}
}

// finally wait for a time before sending the batch
if (!batchTimeout) {
const batchName = activeBatch!;
batchTimeout = setTimeout(async () => {
sendBatch(batchName);
}, interval);
// finally wait for a time before sending the batch
// This is the "natural" batch trigger
clearTimeout(processTimeoutHandle);
return new Promise((resolve) => {
batchTimeout = setTimeout(() => {
sendBatch(false).then(resolve);
}, batchInterval);
});
}
} else {
await send(name, event);
Expand All @@ -196,17 +254,38 @@ export function eventProcessor(
};

const enqueue = (name: string, event: any) => {
trace('queue', name);
if (name === 'workflow-log') {
trace(event.message);
}
queue.push({ name, event });

if (queue.length == 1) {
next();
// If this is the only item in the queue, start executing right away
trace(`[${name}] executing immediately`);
setImmediate(next);
} else if (activeBatch === name) {
addToBatch(event);
queue.pop();
} else if (queue.length == 2 && batchTimeout) {
trace('Sending batch early');
// If this is the second item in the queue, and we have a batch active,
// send the batch early
// (note that this event will still be deferred)
sendBatch(true);
} else {
trace(`[${name}] deffering event`);
}
};

const e = allEngineEvents.reduce(
(obj, e) => Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }),
const e = (events || allEngineEvents).reduce(
(obj: any, e: string) =>
Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }),
{}
);

engine.listen(planId, e);

// return debug state
return { queue };
}
Loading