Skip to content

Commit db9c40d

Browse files
josephjclarkjeremi
andcommitted
Release: support multiple local adaptor repos (#1438)
* feat(ws-worker): support multi-root @Local adaptor resolution (#1397) * feat(ws-worker): support multi-root @Local adaptor resolution OPENFN_ADAPTORS_REPO (and the --monorepo-dir / -m flag) now accept a colon-separated list of monorepo roots. When a job pins an adaptor to @Local, the worker walks the configured roots in order and resolves to the first root whose `packages/<shortName>/package.json` exists. This matches Lightning's AdaptorRegistry precedence so the registry view and the worker's execution path agree on which root supplies a given adaptor. Single-path values keep behaving exactly as before. When no root contains the adaptor the worker still surfaces a candidate path under the first root, so the runtime emits a clean "missing adaptor" error rather than crashing on a malformed colon-joined string. This unblocks the multi-root flow on the Lightning side, where the AdaptorRegistry already accepts the colon-separated form but the worker was rejecting it with ENOENT on @Local execution. * fix(ws-worker): use comma to separate multi-root adaptor paths Colon collides with Windows drive letters (c:/repo); comma matches Lightning's parsing of OPENFN_ADAPTORS_REPO. Single-path callers are unchanged. * simplify changeset * update cli * versions --------- Co-authored-by: Jeremi Joslin <jeremi@newlogic.com>
1 parent 02d780a commit db9c40d

14 files changed

Lines changed: 347 additions & 55 deletions

packages/cli/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# @openfn/cli
22

3+
## 1.37.0
4+
5+
### Minor Changes
6+
7+
- ff1b1b6: `OPENFN_ADAPTORS_REPO` now supports multiple comma-separated paths.
8+
39
## 1.36.3
410

511
### Patch Changes

packages/cli/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/cli",
3-
"version": "1.36.3",
3+
"version": "1.37.0",
44
"description": "CLI devtools for the OpenFn toolchain",
55
"engines": {
66
"node": ">=18",

packages/cli/src/commands.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ const parse = async (options: Opts, log?: Logger) => {
9797
const { monorepoPath } = options;
9898
if (monorepoPath) {
9999
// TODO how does this occur?
100-
if (monorepoPath === 'ERR') {
100+
if (monorepoPath[0] === 'ERR') {
101101
logger.error(
102102
'ERROR: --use-adaptors-monorepo was passed, but OPENFN_ADAPTORS_REPO env var is undefined'
103103
);

packages/cli/src/options.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export type Opts = {
4646
keepUnsupported?: boolean;
4747
log?: Record<string, LogLevel>;
4848
logJson?: boolean;
49-
monorepoPath?: string;
49+
monorepoPath?: string[];
5050
only?: string; // only run this workflow node
5151
operation?: string;
5252
outputPath?: string;
@@ -587,7 +587,16 @@ export const useAdaptorsMonorepo: CLIOption = {
587587
},
588588
ensure: (opts) => {
589589
if (opts.useAdaptorsMonorepo) {
590-
opts.monorepoPath = process.env.OPENFN_ADAPTORS_REPO || 'ERR';
590+
const repo = process.env.OPENFN_ADAPTORS_REPO;
591+
// OPENFN_ADAPTORS_REPO is a comma-separated list of monorepo roots
592+
// (a single path is just a one-element list)
593+
opts.monorepoPath = repo
594+
? repo
595+
.split(',')
596+
.map((p) => p.trim())
597+
.filter((p) => p.length > 0)
598+
.map((p) => nodePath.resolve(p))
599+
: ['ERR'];
591600
}
592601
},
593602
};

packages/cli/src/util/map-adaptors-to-monorepo.ts

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { readFile } from 'node:fs/promises';
1+
import { existsSync } from 'node:fs';
22
import path from 'node:path';
3-
import assert from 'node:assert';
43
import { Logger } from '@openfn/logger';
54
import {
65
getNameAndVersion,
@@ -10,19 +9,21 @@ import {
109

1110
import type { Opts } from '../options';
1211

13-
export const validateMonoRepo = async (repoPath: string, log: Logger) => {
14-
try {
15-
const raw = await readFile(`${repoPath}/package.json`, 'utf8');
16-
const pkg = JSON.parse(raw);
17-
assert(pkg.name === 'adaptors');
18-
} catch (e) {
19-
log.error(`ERROR: Adaptors Monorepo not found at ${repoPath}`);
20-
process.exit(9);
12+
export const validateMonoRepo = async (repoPaths: string[], log: Logger) => {
13+
for (const repoPath of repoPaths) {
14+
if (!existsSync(path.resolve(repoPath, 'packages'))) {
15+
log.error(`ERROR: Adaptors Monorepo not found at ${repoPath}`);
16+
process.exit(9);
17+
}
2118
}
2219
};
2320

2421
// Convert an adaptor name into a path to the adaptor in the monorepo
25-
export const updatePath = (adaptor: string, repoPath: string, log: Logger) => {
22+
export const updatePath = (
23+
adaptor: string,
24+
repoPaths: string[],
25+
log: Logger
26+
) => {
2627
if (adaptor.match('=')) {
2728
// Should do nothing if a path is already provided
2829
return adaptor;
@@ -36,7 +37,22 @@ export const updatePath = (adaptor: string, repoPath: string, log: Logger) => {
3637
);
3738
}
3839
const shortName = name.replace('@openfn/language-', '');
39-
const abspath = path.resolve(repoPath, 'packages', shortName);
40+
41+
// Find the first root in the monorepo list that contains the adaptor
42+
// (order is precedence, so an earlier root overrides a later one)
43+
const abspath = repoPaths
44+
.map((repoPath) => path.join(repoPath, 'packages', shortName))
45+
.find((candidate) => existsSync(candidate));
46+
47+
if (!abspath) {
48+
if (repoPaths.length > 1) {
49+
throw new Error(
50+
`Adaptor ${name} not found in any provided adaptors monorepo`
51+
);
52+
} else {
53+
throw new Error(`Adaptor ${name} not found in the adaptors monorepo`);
54+
}
55+
}
4056

4157
log.info(`Mapped adaptor ${name} to monorepo: ${abspath}`);
4258
return `${name}=${abspath}`;
@@ -48,11 +64,11 @@ export type MapAdaptorsToMonorepoOptions = Pick<
4864
>;
4965

5066
const mapAdaptorsToMonorepo = (
51-
monorepoPath: string = '',
67+
monorepoPath: string[] = [],
5268
input: string[] | ExecutionPlan = [],
5369
log: Logger
5470
): string[] | ExecutionPlan => {
55-
if (monorepoPath) {
71+
if (monorepoPath.length) {
5672
if (Array.isArray(input)) {
5773
const adaptors = input as string[];
5874
return adaptors.map((a) => updatePath(a, monorepoPath, log));

packages/cli/test/options/ensure/useAdaptorsMonorepo.test.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import path from 'node:path';
12
import test from 'ava';
23
import { useAdaptorsMonorepo, Opts } from '../../../src/options';
34

@@ -41,7 +42,19 @@ test('monorepoPath is set with a value from OPENFN_ADAPTORS_REPO', (t) => {
4142

4243
useAdaptorsMonorepo.ensure!(opts);
4344

44-
t.is(opts.monorepoPath, 'a/b/c');
45+
t.deepEqual(opts.monorepoPath, [path.resolve('a/b/c')]);
46+
delete process.env.OPENFN_ADAPTORS_REPO;
47+
});
48+
49+
test('monorepoPath is set with multiple comma-separated paths', (t) => {
50+
process.env.OPENFN_ADAPTORS_REPO = 'a/b/c, d/e/f';
51+
const opts = {
52+
useAdaptorsMonorepo: true,
53+
} as Opts;
54+
55+
useAdaptorsMonorepo.ensure!(opts);
56+
57+
t.deepEqual(opts.monorepoPath, [path.resolve('a/b/c'), path.resolve('d/e/f')]);
4558
delete process.env.OPENFN_ADAPTORS_REPO;
4659
});
4760

@@ -54,5 +67,5 @@ test('monorepoPath is set to an error value if OPENFN_ADAPTORS_REPO is not set',
5467

5568
useAdaptorsMonorepo.ensure!(opts);
5669

57-
t.is(opts.monorepoPath, 'ERR');
70+
t.deepEqual(opts.monorepoPath, ['ERR']);
5871
});

packages/cli/test/util/load-plan.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ test.serial('xplan: map to monorepo', async (t) => {
331331
workflowPath: 'test/wf.json',
332332
expandAdaptors: true,
333333
plan: {},
334-
monorepoPath: '/repo/',
334+
monorepoPath: ['/repo/'],
335335
} as Partial<Opts>;
336336

337337
const plan = createPlan([
@@ -344,6 +344,7 @@ test.serial('xplan: map to monorepo', async (t) => {
344344

345345
mock({
346346
'test/wf.json': JSON.stringify(plan),
347+
'/repo/packages/common': {},
347348
});
348349

349350
const result = await loadPlan(opts as Opts, logger);

packages/cli/test/util/map-adaptors-to-monorepo.test.ts

Lines changed: 111 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,77 +9,166 @@ import mapAdaptorsToMonorepo, {
99
} from '../../src/util/map-adaptors-to-monorepo';
1010
import { ExecutionPlan } from '@openfn/runtime';
1111

12-
const REPO_PATH = 'a/b/c';
13-
const ABS_REPO_PATH = path.resolve(REPO_PATH);
12+
// Paths are resolved to absolute in the option's ensure block, so the util
13+
// always receives absolute roots
14+
const REPO_PATH = path.resolve('a/b/c');
15+
const REPO_PATH_2 = path.resolve('d/e/f');
1416

1517
const logger = createMockLogger();
1618

1719
test.afterEach(() => {
1820
logger._reset();
21+
mock.restore();
1922
});
2023

21-
test('updatePath: common', (t) => {
22-
const result = updatePath('common', REPO_PATH, logger);
24+
test.serial('updatePath: common', (t) => {
25+
mock({
26+
[`${REPO_PATH}/packages/common`]: {},
27+
});
28+
29+
const result = updatePath('common', [REPO_PATH], logger);
2330

24-
t.is(result, `common=${ABS_REPO_PATH}/packages/common`);
31+
t.is(result, `common=${REPO_PATH}/packages/common`);
2532
});
2633

27-
test('updatePath: @openfn/language-common', (t) => {
28-
const result = updatePath('@openfn/language-common', REPO_PATH, logger);
34+
test.serial('updatePath: @openfn/language-common', (t) => {
35+
mock({
36+
[`${REPO_PATH}/packages/common`]: {},
37+
});
38+
39+
const result = updatePath('@openfn/language-common', [REPO_PATH], logger);
2940

30-
t.is(result, `@openfn/language-common=${ABS_REPO_PATH}/packages/common`);
41+
t.is(result, `@openfn/language-common=${REPO_PATH}/packages/common`);
3142
});
3243

33-
test('updatePath: common@1.2.3 (with warning)', (t) => {
34-
const result = updatePath('common@1.2.3', REPO_PATH, logger);
44+
test.serial('updatePath: common@1.2.3 (with warning)', (t) => {
45+
mock({
46+
[`${REPO_PATH}/packages/common`]: {},
47+
});
3548

36-
t.is(result, `common=${ABS_REPO_PATH}/packages/common`);
49+
const result = updatePath('common@1.2.3', [REPO_PATH], logger);
50+
51+
t.is(result, `common=${REPO_PATH}/packages/common`);
3752

3853
const { level, message } = logger._parse(logger._last);
3954
t.is(level, 'warn');
4055
t.regex(message as string, /ignoring version specifier/i);
4156
});
4257

4358
test('updatePath: common=x/y/z', (t) => {
44-
const result = updatePath('common=x/y/z', REPO_PATH, logger);
59+
const result = updatePath('common=x/y/z', [REPO_PATH], logger);
4560

4661
t.is(result, `common=x/y/z`);
4762
});
4863

64+
test.serial('updatePath: prefer the root which has the adaptor', (t) => {
65+
mock({
66+
[`${REPO_PATH_2}/packages/common`]: {},
67+
});
68+
69+
// common only exists in the second root, so that path should be used
70+
const result = updatePath('common', [REPO_PATH, REPO_PATH_2], logger);
71+
72+
t.is(result, `common=${REPO_PATH_2}/packages/common`);
73+
});
74+
75+
test.serial('updatePath: earlier root wins when both have the adaptor', (t) => {
76+
mock({
77+
[`${REPO_PATH}/packages/common`]: {},
78+
[`${REPO_PATH_2}/packages/common`]: {},
79+
});
80+
81+
const result = updatePath('common', [REPO_PATH, REPO_PATH_2], logger);
82+
83+
t.is(result, `common=${REPO_PATH}/packages/common`);
84+
});
85+
86+
test.serial('updatePath: throw if not found in the single root', (t) => {
87+
mock({
88+
[`${REPO_PATH}/packages`]: {},
89+
});
90+
91+
t.throws(() => updatePath('common', [REPO_PATH], logger), {
92+
message: /not found in the adaptors monorepo/,
93+
});
94+
});
95+
96+
test.serial('updatePath: throw if not found in any root', (t) => {
97+
mock({
98+
[`${REPO_PATH}/packages`]: {},
99+
[`${REPO_PATH_2}/packages`]: {},
100+
});
101+
102+
t.throws(() => updatePath('common', [REPO_PATH, REPO_PATH_2], logger), {
103+
message: /not found in any provided adaptors monorepo/,
104+
});
105+
});
106+
49107
// TODO can't test this in ava, have to use an integration test
50108
test.skip('validate monorepo: log and exit early if repo not found', async (t) => {
51109
mock({
52110
a: {},
53111
});
54112

55-
await t.throwsAsync(async () => validateMonoRepo(REPO_PATH, logger), {
113+
await t.throwsAsync(async () => validateMonoRepo([REPO_PATH], logger), {
56114
message: 'Monorepo not found',
57115
});
58116
const { level, message } = logger._parse(logger._last);
59117
t.is(level, 'error');
60118
t.is(message, `ERROR: Monorepo not found at ${REPO_PATH}`);
61119
});
62120

63-
test('validate monorepo: all OK', async (t) => {
121+
test.serial('validate monorepo: all OK', async (t) => {
64122
mock({
65-
[`${REPO_PATH}/package.json`]: '{ "name": "adaptors" }',
123+
[`${REPO_PATH}/packages`]: {},
66124
});
67125

68-
await t.notThrowsAsync(async () => validateMonoRepo(REPO_PATH, logger));
126+
await t.notThrowsAsync(async () => validateMonoRepo([REPO_PATH], logger));
127+
});
128+
129+
test.serial('validate monorepo: all OK with multiple paths', async (t) => {
130+
mock({
131+
[`${REPO_PATH}/packages`]: {},
132+
[`${REPO_PATH_2}/packages`]: {},
133+
});
134+
135+
await t.notThrowsAsync(async () =>
136+
validateMonoRepo([REPO_PATH, REPO_PATH_2], logger)
137+
);
69138
});
70139

71140
test.serial('mapAdaptorsToMonorepo: map adaptors', async (t) => {
72141
mock({
73-
[`${REPO_PATH}/package.json`]: '{ "name": "adaptors" }',
142+
[`${REPO_PATH}/packages/common`]: {},
74143
});
75144

76-
const result = await mapAdaptorsToMonorepo(REPO_PATH, ['common'], logger);
77-
t.deepEqual(result, [`common=${ABS_REPO_PATH}/packages/common`]);
145+
const result = await mapAdaptorsToMonorepo([REPO_PATH], ['common'], logger);
146+
t.deepEqual(result, [`common=${REPO_PATH}/packages/common`]);
78147
});
79148

149+
test.serial(
150+
'mapAdaptorsToMonorepo: map adaptors across multiple roots',
151+
async (t) => {
152+
mock({
153+
[`${REPO_PATH}/packages/http`]: {},
154+
[`${REPO_PATH_2}/packages/common`]: {},
155+
});
156+
157+
const result = await mapAdaptorsToMonorepo(
158+
[REPO_PATH, REPO_PATH_2],
159+
['http', 'common'],
160+
logger
161+
);
162+
t.deepEqual(result, [
163+
`http=${REPO_PATH}/packages/http`,
164+
`common=${REPO_PATH_2}/packages/common`,
165+
]);
166+
}
167+
);
168+
80169
test.serial('mapAdaptorsToMonorepo: map workflow', async (t) => {
81170
mock({
82-
[`${REPO_PATH}/package.json`]: '{ "name": "adaptors" }',
171+
[`${REPO_PATH}/packages/common`]: {},
83172
});
84173

85174
const plan: ExecutionPlan = {
@@ -94,12 +183,12 @@ test.serial('mapAdaptorsToMonorepo: map workflow', async (t) => {
94183
options: {},
95184
};
96185

97-
await mapAdaptorsToMonorepo(REPO_PATH, plan, logger);
186+
await mapAdaptorsToMonorepo([REPO_PATH], plan, logger);
98187
t.deepEqual(plan.workflow, {
99188
steps: [
100189
{
101190
expression: '.',
102-
adaptors: [`common=${ABS_REPO_PATH}/packages/common`],
191+
adaptors: [`common=${REPO_PATH}/packages/common`],
103192
},
104193
],
105194
});

0 commit comments

Comments
 (0)