Skip to content

Commit 831b8ac

Browse files
taylordowns2000josephjclark
authored andcommitted
wip
1 parent 28ebecc commit 831b8ac

9 files changed

Lines changed: 115 additions & 66 deletions

File tree

packages/lexicon/lightning.d.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ export type RunStartReply = {}; // no payload
169169

170170
export type RunCompletePayload = ExitReason & {
171171
timestamp: TimeInMicroSeconds;
172-
final_state?: any; // The aggregated final state from the workflow (handles branching)
172+
final_dataclip_id?: string; // Single-leaf: reuse the step's output dataclip
173+
final_state?: any; // Multi-leaf: aggregated final state keyed by step id
173174
};
174175
export type RunCompleteReply = undefined;
175176

packages/ws-worker/src/events/run-complete.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,28 @@ export default async function onWorkflowComplete(
1414
) {
1515
const { state, onFinish, logger } = context;
1616

17-
// Use the aggregated final state from the runtime
18-
// This handles branching workflows correctly by returning all leaf states
1917
const result = event.state;
2018

2119
const reason = calculateRunExitReason(state);
2220
await logFinalReason(context, reason);
2321

22+
const isSingleLeaf =
23+
state.leafDataclipIds.length === 1 &&
24+
!state.withheldDataclips[state.leafDataclipIds[0]];
25+
26+
const payload: RunCompletePayload = {
27+
timestamp: timeInMicroseconds(event.time),
28+
...reason,
29+
};
30+
31+
if (isSingleLeaf) {
32+
payload.final_dataclip_id = state.leafDataclipIds[0];
33+
} else {
34+
payload.final_state = result;
35+
}
36+
2437
try {
25-
await sendEvent<RunCompletePayload>(context, RUN_COMPLETE, {
26-
final_state: result,
27-
timestamp: timeInMicroseconds(event.time),
28-
...reason,
29-
});
38+
await sendEvent<RunCompletePayload>(context, RUN_COMPLETE, payload);
3039
} catch (e) {
3140
logger.error(
3241
`${state.plan.id} failed to send ${RUN_COMPLETE} event. This run will be lost!`

packages/ws-worker/src/events/step-complete.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,10 @@ export default async function onStepComplete(
3232
delete state.activeStep;
3333
delete state.activeJob;
3434

35-
// TODO right now, the last job to run will be the result for the run
36-
// this may not stand up in the future
37-
// I'd feel happer if the runtime could judge what the final result is
38-
// (taking into account branches and stuff)
39-
// The problem is that the runtime will return the object, not an id,
40-
// so we have a bit of a mapping problem
41-
state.lastDataclipId = dataclipId;
35+
// Track leaf dataclips (steps with no downstream jobs)
36+
if (!event.next?.length) {
37+
state.leafDataclipIds.push(dataclipId);
38+
}
4239

4340
// Set the input dataclip id for downstream jobs
4441
event.next?.forEach((nextJobId) => {

packages/ws-worker/src/types.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ export type RunState = {
1818
withheldDataclips: Record<string, true>;
1919
reasons: Record<string, ExitReason>;
2020

21-
// final dataclip id
22-
lastDataclipId?: string;
21+
// dataclip ids for leaf nodes (steps with no downstream)
22+
leafDataclipIds: string[];
2323
};
2424

2525
export type CancelablePromise = Promise<void> & {

packages/ws-worker/src/util/create-run-state.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { RunState } from '../types';
33

44
export default (plan: ExecutionPlan, input?: Lazy<State>): RunState => {
55
const state = {
6-
lastDataclipId: '',
6+
leafDataclipIds: [],
77
dataclips: {},
88
inputDataclips: {},
99
withheldDataclips: {},

packages/ws-worker/test/events/run-complete.test.ts

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,43 @@ import { RUN_COMPLETE, RUN_LOG } from '../../src/events';
88
import { createRunState } from '../../src/util';
99
import { createPlan } from '../util';
1010

11-
test('should send a run:complete event', async (t) => {
11+
test('should send a run:complete event with final_dataclip_id for single leaf', async (t) => {
1212
const result = { answer: 42 };
1313
const plan = createPlan();
1414

1515
const state = createRunState(plan);
16+
state.leafDataclipIds = ['clip-1'];
17+
18+
const channel = mockChannel({
19+
[RUN_LOG]: () => true,
20+
[RUN_COMPLETE]: (evt) => {
21+
t.is(evt.final_dataclip_id, 'clip-1');
22+
t.falsy(evt.final_state);
23+
t.falsy(evt.time);
24+
},
25+
});
26+
27+
const event: any = { state: result };
28+
29+
const context: any = { channel, state, onFinish: () => {} };
30+
await handleRunComplete(context, event);
31+
});
32+
33+
test('should send final_state when there are multiple leaves', async (t) => {
34+
const result = {
35+
'job-a': { data: { a: true } },
36+
'job-b': { data: { b: true } },
37+
};
38+
const plan = createPlan();
39+
40+
const state = createRunState(plan);
41+
state.leafDataclipIds = ['clip-1', 'clip-2'];
1642

1743
const channel = mockChannel({
1844
[RUN_LOG]: () => true,
1945
[RUN_COMPLETE]: (evt) => {
2046
t.deepEqual(evt.final_state, result);
21-
t.falsy(evt.time); // if no timestamp in the engine event, no timestamp in the worker one
47+
t.falsy(evt.final_dataclip_id);
2248
},
2349
});
2450

@@ -240,9 +266,11 @@ test('should call onFinish even if the lightning event timesout', async (t) => {
240266
await handleRunComplete(context, event);
241267
});
242268

243-
test('should send final_state for a linear workflow', async (t) => {
269+
test('should send final_dataclip_id for a single-leaf workflow', async (t) => {
244270
const plan = createPlan();
245271
const state = createRunState(plan);
272+
state.leafDataclipIds = ['abc-123'];
273+
246274
const finalResult = { data: { count: 100 }, references: [] };
247275

248276
let completeEvent: any;
@@ -264,15 +292,16 @@ test('should send final_state for a linear workflow', async (t) => {
264292

265293
await handleRunComplete(context, event);
266294

267-
t.deepEqual(completeEvent.final_state, finalResult);
295+
t.is(completeEvent.final_dataclip_id, 'abc-123');
296+
t.falsy(completeEvent.final_state);
268297
t.is(completeEvent.reason, 'success');
269298
});
270299

271300
test('should send final_state for a branching workflow with multiple leaf nodes', async (t) => {
272301
const plan = createPlan();
273302
const state = createRunState(plan);
303+
state.leafDataclipIds = ['clip-1', 'clip-2', 'clip-3'];
274304

275-
// Simulate a branching workflow with multiple final states
276305
const branchedResult = {
277306
'job-1': { data: { path: 'A', value: 42 } },
278307
'job-2': { data: { path: 'B', value: 84 } },
@@ -292,7 +321,6 @@ test('should send final_state for a branching workflow with multiple leaf nodes'
292321
channel,
293322
state,
294323
onFinish: ({ state: finalState }: any) => {
295-
// Verify that onFinish receives the branched result
296324
t.deepEqual(finalState, branchedResult);
297325
},
298326
};
@@ -301,33 +329,18 @@ test('should send final_state for a branching workflow with multiple leaf nodes'
301329

302330
await handleRunComplete(context, event);
303331

304-
// Verify the event contains the full branched state structure
305332
t.deepEqual(completeEvent.final_state, branchedResult);
333+
t.falsy(completeEvent.final_dataclip_id);
306334
t.is(completeEvent.reason, 'success');
307-
t.truthy(completeEvent.final_state['job-1']);
308-
t.truthy(completeEvent.final_state['job-2']);
309-
t.truthy(completeEvent.final_state['job-3']);
310335
});
311336

312-
test('should properly serialize final_state as JSON', async (t) => {
337+
test('should send final_state when single leaf dataclip was withheld', async (t) => {
313338
const plan = createPlan();
314339
const state = createRunState(plan);
340+
state.leafDataclipIds = ['clip-1'];
341+
state.withheldDataclips = { 'clip-1': true };
315342

316-
// Test with complex state including nested objects, arrays, and special values
317-
const complexState = {
318-
data: {
319-
users: [
320-
{ id: 1, name: 'Alice' },
321-
{ id: 2, name: 'Bob' },
322-
],
323-
metadata: {
324-
timestamp: new Date('2024-01-01').toISOString(),
325-
nested: { deeply: { value: 42 } },
326-
},
327-
},
328-
configuration: { setting: true },
329-
references: [],
330-
};
343+
const result = { data: { big: 'data' } };
331344

332345
let completeEvent: any;
333346

@@ -344,28 +357,23 @@ test('should properly serialize final_state as JSON', async (t) => {
344357
onFinish: () => {},
345358
};
346359

347-
const event: any = { state: complexState };
360+
const event: any = { state: result };
348361

349362
await handleRunComplete(context, event);
350363

351-
// Verify the state is properly preserved
352-
t.deepEqual(completeEvent.final_state, complexState);
353-
t.deepEqual(completeEvent.final_state.data.users[0], { id: 1, name: 'Alice' });
354-
t.is(completeEvent.final_state.data.metadata.nested.deeply.value, 42);
355-
356-
// Verify it can be stringified (simulating what happens when sent over the wire)
357-
const jsonString = JSON.stringify(completeEvent.final_state);
358-
const parsed = JSON.parse(jsonString);
359-
t.deepEqual(parsed, complexState);
364+
t.deepEqual(completeEvent.final_state, result);
365+
t.falsy(completeEvent.final_dataclip_id);
360366
});
361367

362-
test('should handle Uint8Array in final_state', async (t) => {
368+
test('should send final_state when a single leaf node is reached by two paths', async (t) => {
363369
const plan = createPlan();
364370
const state = createRunState(plan);
371+
// Same node executed twice via different paths produces two leaf dataclips
372+
state.leafDataclipIds = ['clip-1', 'clip-2'];
365373

366-
// Test with Uint8Array which needs special handling
367-
const stateWithBinary = {
368-
data: { buffer: new Uint8Array([1, 2, 3, 4, 5]) },
374+
const result = {
375+
x: { data: { from: 'a' } },
376+
'x-1': { data: { from: 'b' } },
369377
};
370378

371379
let completeEvent: any;
@@ -383,10 +391,10 @@ test('should handle Uint8Array in final_state', async (t) => {
383391
onFinish: () => {},
384392
};
385393

386-
const event: any = { state: stateWithBinary };
394+
const event: any = { state: result };
387395

388396
await handleRunComplete(context, event);
389397

390-
// Verify the Uint8Array is preserved in the event
391-
t.deepEqual(completeEvent.final_state.data.buffer, new Uint8Array([1, 2, 3, 4, 5]));
398+
t.deepEqual(completeEvent.final_state, result);
399+
t.falsy(completeEvent.final_dataclip_id);
392400
});

packages/ws-worker/test/events/run-error.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ test('runError should trigger runComplete with a reason', async (t) => {
1111
const jobId = 'job-1';
1212

1313
const state = createRunState(plan);
14-
state.lastDataclipId = 'x';
14+
1515
state.activeStep = 'b';
1616
state.activeJob = jobId;
1717

@@ -40,7 +40,7 @@ test('workflow error should send reason to onFinish', async (t) => {
4040
const jobId = 'job-1';
4141

4242
const state = createRunState(plan);
43-
state.lastDataclipId = 'x';
43+
4444
state.activeStep = 'b';
4545
state.activeJob = jobId;
4646

@@ -72,7 +72,7 @@ test('workflow error should send reason to onFinish', async (t) => {
7272

7373
test('runError should not call job complete if the job is not active', async (t) => {
7474
const state = createRunState(plan);
75-
state.lastDataclipId = 'x';
75+
7676

7777
const channel = mockChannel({
7878
[RUN_LOG]: () => true,
@@ -113,7 +113,7 @@ test('runError should log the reason', async (t) => {
113113
},
114114
options: {},
115115
});
116-
state.lastDataclipId = 'x';
116+
117117
state.activeStep = 'b';
118118
state.activeJob = jobId;
119119

packages/ws-worker/test/events/step-complete.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,37 @@ test('should include a timestamp', async (t) => {
283283
const context: any = { channel, state, onFinish: () => {} };
284284
await handleStepComplete(context, event);
285285
});
286+
287+
test('track leaf dataclip when step has no downstream jobs', async (t) => {
288+
const plan = createPlan();
289+
290+
const state = createRunState(plan);
291+
state.activeJob = 'job-1';
292+
state.activeStep = 'b';
293+
294+
const channel = mockChannel({
295+
[STEP_COMPLETE]: () => true,
296+
});
297+
298+
const event = { state: { x: 10 }, next: [] } as any;
299+
await handleStepComplete({ channel, state } as any, event);
300+
301+
t.is(state.leafDataclipIds.length, 1);
302+
});
303+
304+
test('do not track leaf dataclip when step has downstream jobs', async (t) => {
305+
const plan = createPlan();
306+
307+
const state = createRunState(plan);
308+
state.activeJob = 'job-1';
309+
state.activeStep = 'b';
310+
311+
const channel = mockChannel({
312+
[STEP_COMPLETE]: () => true,
313+
});
314+
315+
const event = { state: { x: 10 }, next: ['job-2'] } as any;
316+
await handleStepComplete({ channel, state } as any, event);
317+
318+
t.is(state.leafDataclipIds.length, 0);
319+
});

packages/ws-worker/test/util/create-run-state.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ test('create run', (t) => {
1818
const run = createRunState(plan, input);
1919

2020
t.deepEqual(run.plan, plan);
21-
t.deepEqual(run.lastDataclipId, '');
21+
t.deepEqual(run.leafDataclipIds, []);
2222
t.deepEqual(run.dataclips, {});
2323
t.deepEqual(run.inputDataclips, {});
2424
t.deepEqual(run.reasons, {});

0 commit comments

Comments
 (0)