Skip to content

Commit 07861b0

Browse files
authored
Merge pull request #1088 from OpenFn/fix-worker-validation-timeout-1016
Ameliorate crash-loop during worker startup
2 parents f82ee24 + 94f18f8 commit 07861b0

12 files changed

Lines changed: 155 additions & 25 deletions

File tree

packages/engine-multi/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# engine-multi
22

3+
## 1.7.0
4+
5+
### Minor Changes
6+
7+
- cf8d3c2: Add retry with backoff to worker validation
8+
39
## 1.6.14
410

511
### Patch Changes

packages/engine-multi/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/engine-multi",
3-
"version": "1.6.14",
3+
"version": "1.7.0",
44
"description": "Multi-process runtime engine",
55
"main": "dist/index.js",
66
"type": "module",
Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,45 @@
1-
// TODO let me deal with the fallout first
2-
1+
import { Logger } from '@openfn/logger';
32
import { EngineAPI } from '../types';
43

5-
// Simple vaidation function to ensure that a worker is loaded
4+
class WorkerValidationError extends Error {
5+
name = 'WorkerValidationError';
6+
constructor(message: string) {
7+
super(message);
8+
}
9+
}
10+
11+
// Simple validation function to ensure that a worker is loaded
612
// Call a handshake task in a worker thread
7-
// This really jsut validates that the worker path exists
13+
// Validates that the worker
14+
// Timeout and retry logic to help on containerized environments
15+
// (e.g. Kubernetes) with filesystem boot delays
16+
export default async (
17+
api: EngineAPI,
18+
logger: Logger,
19+
options: { timeout?: number; retries?: number } = {}
20+
): Promise<void> => {
21+
const { timeout = Infinity, retries = 1 } = options;
22+
let start = Date.now();
23+
for (let i = 0; i < retries; i++) {
24+
try {
25+
// TODO argument drive this
26+
await api.callWorker('handshake', [], {}, { timeout });
27+
const duration = Date.now() - start;
28+
logger.debug(`Engine worker validated in ${duration}ms`);
29+
return;
30+
} catch (e) {
31+
if (i >= retries - 1) {
32+
const duration = Date.now() - start;
33+
logger.error(`Engine worker validation failed in in ${duration}ms`);
34+
throw new WorkerValidationError(`Failed to validate worker thread. This likely happened because:
35+
1. An invalid worker file was passed to the engine
36+
2. The filesystem was temporarily unavailable`);
37+
}
838

9-
export default async (api: EngineAPI, timeout = 5000) => {
10-
try {
11-
// TODO argument drive this
12-
await api.callWorker('handshake', [], {}, { timeout });
13-
} catch (e) {
14-
throw new Error('Invalid worker path');
39+
// exponential backoff: 1s, 2s, 4s, etc.
40+
const backoffMs = 1000 * Math.pow(2, i);
41+
logger.warn(`Worker validation failed: will retry in ${backoffMs}ms`);
42+
await new Promise((resolve) => setTimeout(resolve, backoffMs));
43+
}
1544
}
1645
};

packages/engine-multi/src/engine.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ export type EngineOptions = {
8282
statePropsToRemove?: string[];
8383
whitelist?: RegExp[];
8484
proxyStdout?: boolean;
85+
86+
workerValidationTimeout?: number;
87+
workerValidationRetries?: number;
8588
};
8689

8790
export type InternalEngine = RuntimeEngine & {
@@ -136,13 +139,11 @@ const createEngine = async (
136139
);
137140
engine.callWorker = callWorker;
138141

139-
await validateWorker(engine);
142+
await validateWorker(engine, options.logger, {
143+
timeout: options.workerValidationTimeout,
144+
retries: options.workerValidationRetries,
145+
});
140146

141-
// TODO I think this needs to be like:
142-
// take a plan
143-
// create, register and return a state object
144-
// should it also load the initial data clip?
145-
// when does that happen? No, that's inside execute
146147
const registerWorkflow = (plan: ExecutionPlan, input: State) => {
147148
// TODO throw if already registered?
148149
const state = createState(plan, input);

packages/engine-multi/test/api/validate-worker.test.ts

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,47 @@ import validateWorker from '../../src/api/validate-worker';
77

88
const logger = createMockLogger();
99

10+
test.afterEach(() => {
11+
logger._reset();
12+
});
13+
1014
test('validate should not throw if the worker path is valid', async (t) => {
1115
const workerPath = path.resolve('dist/test/worker-functions.js');
1216
const api = initWorkers(workerPath, {}, logger);
13-
await t.notThrowsAsync(() => validateWorker(api as any, 500));
17+
18+
await t.notThrowsAsync(() => validateWorker(api as any, logger));
1419
});
1520

1621
test('validate should throw if the worker path is invalid', async (t) => {
1722
const workerPath = 'a/b/c.js';
1823
const api = initWorkers(workerPath, {}, logger);
19-
await t.throwsAsync(() => validateWorker(api as any, 500), {
20-
message: 'Invalid worker path',
24+
25+
await t.throwsAsync(() => validateWorker(api as any, logger), {
26+
name: 'WorkerValidationError',
2127
});
2228
});
2329

2430
test('validate should throw if the worker does not respond to a handshake', async (t) => {
2531
const workerPath = path.resolve('src/test/bad-worker.js');
2632
const api = initWorkers(workerPath, {}, logger);
27-
await t.throwsAsync(() => validateWorker(api as any, 500), {
28-
message: 'Invalid worker path',
33+
const opts = { timeout: 100 };
34+
35+
await t.throwsAsync(() => validateWorker(api as any, logger, opts), {
36+
name: 'WorkerValidationError',
37+
});
38+
});
39+
40+
test('validate should retry with a backoff', async (t) => {
41+
const workerPath = path.resolve('src/test/bad-worker.js');
42+
const api = initWorkers(workerPath, {}, logger);
43+
const opts = { timeout: 10, retries: 3 };
44+
45+
await t.throwsAsync(() => validateWorker(api as any, logger, opts), {
46+
name: 'WorkerValidationError',
47+
});
48+
49+
const warnings = logger._history.filter(([level, _icon, message]) => {
50+
return level === 'warn' && message.match(/will retry/);
2951
});
52+
t.is(warnings.length, 2);
3053
});

packages/lightning-mock/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# @openfn/lightning-mock
22

3+
## 2.3.1
4+
5+
### Patch Changes
6+
7+
- Updated dependencies [cf8d3c2]
8+
- @openfn/engine-multi@1.7.0
9+
310
## 2.3.0
411

512
### Minor Changes

packages/lightning-mock/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/lightning-mock",
3-
"version": "2.3.0",
3+
"version": "2.3.1",
44
"private": true,
55
"description": "A mock Lightning server",
66
"main": "dist/index.js",

packages/ws-worker/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# ws-worker
22

3+
## 1.18.0
4+
5+
### Minor Changes
6+
7+
- cf8d3c2: Add retry with backoff to worker validation
8+
9+
### Patch Changes
10+
11+
- Updated dependencies [cf8d3c2]
12+
- @openfn/engine-multi@1.7.0
13+
314
## 1.17.0
415

516
### Minor Changes

packages/ws-worker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/ws-worker",
3-
"version": "1.17.0",
3+
"version": "1.18.0",
44
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
55
"main": "dist/index.js",
66
"type": "module",

packages/ws-worker/src/start.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ if (args.mock) {
9191
maxWorkers: args.capacity,
9292
statePropsToRemove: args.statePropsToRemove,
9393
runTimeoutMs: args.maxRunDurationSeconds * 1000,
94+
workerValidationTimeout: args.engineValidationTimeoutMs,
95+
workerValidationRetries: args.engineValidationRetries,
9496
};
9597
logger.debug('Creating runtime engine...');
9698
logger.debug('Engine options:', engineOptions);

0 commit comments

Comments
 (0)