Skip to content

Commit 84022c9

Browse files
committed
refactor: align execution result summary contract
1 parent 9389f3e commit 84022c9

10 files changed

Lines changed: 181 additions & 79 deletions

File tree

agent-service/src/agent/tools/result-formatting.spec.ts

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,15 @@
1919

2020
import { describe, expect, test } from "bun:test";
2121
import { formatOperatorResult } from "./result-formatting";
22-
import type { OperatorExecutionSummary, OperatorState, SampleRow } from "../../types/execution";
22+
import {
23+
ConsoleMessageType,
24+
OperatorState,
25+
OperatorResultMode,
26+
WorkflowFatalErrorType,
27+
type OperatorExecutionSummary,
28+
type WorkflowFatalError,
29+
type SampleRow,
30+
} from "../../types/execution";
2331

2432
// Convert flat test rows (with an optional embedded __row_index__) into the
2533
// structured SampleRow[] the summary now carries.
@@ -36,29 +44,40 @@ interface OpInfoOverrides {
3644
state?: OperatorState;
3745
error?: string;
3846
outputTuples?: number;
39-
totalRowCount?: number;
47+
tuplesCount?: number;
4048
warnings?: string[];
4149
result?: Record<string, any>[];
4250
}
4351

52+
function makeExecutionFailure(message: string): WorkflowFatalError {
53+
return {
54+
type: { name: WorkflowFatalErrorType.EXECUTION_FAILURE },
55+
timestamp: { seconds: 0, nanos: 0 },
56+
message,
57+
details: "",
58+
operatorId: "",
59+
workerId: "",
60+
};
61+
}
62+
4463
function makeOpInfo(overrides: OpInfoOverrides = {}): OperatorExecutionSummary {
4564
const summary: OperatorExecutionSummary = {
46-
state: overrides.state ?? "Completed",
47-
errorMessages: overrides.error ? [{ type: "EXECUTION_FAILURE", message: overrides.error }] : [],
65+
state: overrides.state ?? OperatorState.COMPLETED,
66+
errorMessages: overrides.error ? [makeExecutionFailure(overrides.error)] : [],
4867
};
4968
// The result summary is present only when the operator produced a result.
5069
if (overrides.result !== undefined) {
5170
summary.resultSummary = {
52-
resultMode: "table",
71+
resultMode: OperatorResultMode.TABLE,
5372
// Non-arrays are passed through to exercise the "(no result data)" guard.
5473
sampleTuples: Array.isArray(overrides.result) ? toSampleRows(overrides.result) : (overrides.result as any),
55-
totalRowCount: overrides.totalRowCount ?? overrides.outputTuples ?? 0,
74+
tuplesCount: overrides.tuplesCount ?? overrides.outputTuples ?? 0,
5675
};
5776
}
5877
if (overrides.warnings) {
5978
// Warnings are derived from console messages whose title is "WARNING: ...".
6079
summary.consoleLogsSummary = {
61-
messages: overrides.warnings.map(w => ({ msgType: "PRINT", title: w, message: "" })),
80+
messages: overrides.warnings.map(w => ({ msgType: ConsoleMessageType.PRINT, title: w, message: "" })),
6281
};
6382
}
6483
return summary;
@@ -93,15 +112,15 @@ describe("formatOperatorResult - early returns", () => {
93112
});
94113

95114
describe("formatOperatorResult - table shape and metadata", () => {
96-
test("uses outputTuples for row count when totalRowCount missing", () => {
115+
test("uses outputTuples for row count when tuplesCount missing", () => {
97116
const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, result: [{ a: 1, b: 2 }] }));
98117
expect(out).toContain("Output table shape: (7, 2)");
99118
});
100119

101-
test("totalRowCount overrides outputTuples in output shape", () => {
120+
test("tuplesCount overrides outputTuples in output shape", () => {
102121
const out = formatOperatorResult(
103122
"op1",
104-
makeOpInfo({ outputTuples: 7, totalRowCount: 999, result: [{ a: 1, b: 2 }] })
123+
makeOpInfo({ outputTuples: 7, tuplesCount: 999, result: [{ a: 1, b: 2 }] })
105124
);
106125
expect(out).toContain("Output table shape: (999, 2)");
107126
});

agent-service/src/agent/tools/result-formatting.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export function formatOperatorResult(operatorId: string, opInfo: OperatorExecuti
5454

5555
// Output shape only; input-port shapes are derivable by the agent from the DAG
5656
// links plus each upstream operator's own output shape shown in context.
57-
const outputRows = opInfo.resultSummary?.totalRowCount ?? 0;
57+
const outputRows = opInfo.resultSummary?.tuplesCount ?? 0;
5858
const metadataLines = [`Output table shape: (${outputRows}, ${columns})`, ...getOperatorWarnings(opInfo)].filter(
5959
Boolean
6060
);

agent-service/src/agent/tools/workflow-execution-tools.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,15 @@ import type { WorkflowState } from "../workflow-state";
2929
import { getBackendConfig } from "../../api/backend-api";
3030
import { env } from "../../config/env";
3131
import type { LogicalPlan, LogicalLink } from "../../api/execution-api";
32-
import type { OperatorExecutionSummary, SampleRow, WorkflowExecutionSummary } from "../../types/execution";
32+
import {
33+
OperatorState,
34+
WorkflowFatalErrorType,
35+
WorkflowExecutionState,
36+
type OperatorExecutionSummary,
37+
type SampleRow,
38+
type WorkflowFatalError,
39+
type WorkflowExecutionSummary,
40+
} from "../../types/execution";
3341
import { WorkflowSystemMetadata } from "../util/workflow-system-metadata";
3442
import { DEFAULT_AGENT_SETTINGS } from "../../types/agent";
3543
import { createLogger } from "../../logger";
@@ -325,7 +333,7 @@ async function executeWorkflowHttp(
325333
log.error({ err: error }, "execution failed");
326334
return {
327335
success: false,
328-
state: "Error",
336+
state: WorkflowExecutionState.ERROR,
329337
operators: {},
330338
errors: [error instanceof Error ? error.message : "Unknown error"],
331339
};
@@ -355,6 +363,18 @@ function formatExecutionError(
355363
return lines.join("\n");
356364
}
357365

366+
function makeExecutionFailure(message: string, operatorId: string): WorkflowFatalError {
367+
const now = Date.now();
368+
return {
369+
type: { name: WorkflowFatalErrorType.EXECUTION_FAILURE },
370+
timestamp: { seconds: Math.floor(now / 1000), nanos: (now % 1000) * 1_000_000 },
371+
message,
372+
details: "",
373+
operatorId,
374+
workerId: "",
375+
};
376+
}
377+
358378
function jsonToTableFormat(rows: SampleRow[]): string {
359379
if (!rows || rows.length === 0) return "";
360380

@@ -430,20 +450,21 @@ export async function executeOperatorAndFormat(
430450

431451
if (!result.success) {
432452
const operatorErrors =
433-
result.state === "Failed"
453+
result.state === WorkflowExecutionState.FAILED
434454
? Object.entries(result.operators)
435455
.filter(([_, op]) => op.errorMessages.length)
436456
.map(([opId, op]) => ({ operatorId: opId, error: op.errorMessages.map(e => e.message).join("; ") }))
437457
: undefined;
438458

439-
const generalErrors = result.state === "Killed" ? ["Workflow execution was killed (timeout)."] : result.errors;
459+
const generalErrors =
460+
result.state === WorkflowExecutionState.KILLED ? ["Workflow execution was killed (timeout)."] : result.errors;
440461

441462
const errorText = formatExecutionError(operatorErrors, generalErrors);
442463

443464
if (options.onResult) {
444465
const errorInfo: OperatorExecutionSummary = {
445-
state: "Failed",
446-
errorMessages: [{ type: "EXECUTION_FAILURE", message: errorText, operatorId }],
466+
state: OperatorState.FAILED,
467+
errorMessages: [makeExecutionFailure(errorText, operatorId)],
447468
};
448469
options.onResult(operatorId, errorInfo);
449470
}
@@ -519,7 +540,7 @@ export async function executeOperatorAndFormat(
519540

520541
// Output shape only; the agent derives input-port shapes from the DAG + the
521542
// upstream operators' own output shapes shown in context.
522-
const shapeLine = `Output table shape: (${opInfo.resultSummary?.totalRowCount ?? 0}, ${columns})`;
543+
const shapeLine = `Output table shape: (${opInfo.resultSummary?.tuplesCount ?? 0}, ${columns})`;
523544

524545
const warningLines = getOperatorWarnings(opInfo);
525546

agent-service/src/agent/workflow-result-state.spec.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919

2020
import { describe, expect, test } from "bun:test";
2121
import { WorkflowResultState } from "./workflow-result-state";
22-
import type { OperatorExecutionSummary } from "../types/execution";
22+
import { OperatorResultMode, OperatorState, type OperatorExecutionSummary } from "../types/execution";
2323

24-
function makeInfo(totalRowCount: number): OperatorExecutionSummary {
24+
function makeInfo(tuplesCount: number): OperatorExecutionSummary {
2525
return {
26-
state: "Completed",
26+
state: OperatorState.COMPLETED,
2727
errorMessages: [],
28-
resultSummary: { resultMode: "table", sampleTuples: [], totalRowCount },
28+
resultSummary: { resultMode: OperatorResultMode.TABLE, sampleTuples: [], tuplesCount },
2929
};
3030
}
3131

@@ -39,15 +39,15 @@ describe("WorkflowResultState - ancestor walk", () => {
3939
state.set("op1", "step-C", makeInfo(3));
4040

4141
path = ["step-A", "step-B", "step-C"];
42-
expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(3);
42+
expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(3);
4343

4444
// Rewind to step-B; step-C is no longer an ancestor.
4545
path = ["step-A", "step-B"];
46-
expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(2);
46+
expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(2);
4747

4848
// Rewind further.
4949
path = ["step-A"];
50-
expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(1);
50+
expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(1);
5151
});
5252

5353
test("returns undefined when no ancestor has a result", () => {
@@ -73,8 +73,8 @@ describe("WorkflowResultState - ancestor walk", () => {
7373
path = ["step-A", "step-B"];
7474
const visible = state.getAllVisible();
7575
expect(visible.size).toBe(2);
76-
expect(visible.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(1);
77-
expect(visible.get("op2")?.operatorInfo.resultSummary?.totalRowCount).toBe(7);
76+
expect(visible.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(1);
77+
expect(visible.get("op2")?.operatorInfo.resultSummary?.tuplesCount).toBe(7);
7878
});
7979

8080
test("clear drops all stored results", () => {
@@ -89,6 +89,6 @@ describe("WorkflowResultState - ancestor walk", () => {
8989
const state = new WorkflowResultState(() => ["step-A"]);
9090
state.set("op1", "step-A", makeInfo(1));
9191
state.set("op1", "step-A", makeInfo(42));
92-
expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(42);
92+
expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(42);
9393
});
9494
});

agent-service/src/server.spec.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { beforeEach, describe, expect, spyOn, test } from "bun:test";
2121
import { buildApp, start, _resetAgentStoreForTests, _getAgentForTests } from "./server";
2222
import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata";
2323
import { env } from "./config/env";
24+
import { OperatorResultMode, OperatorState } from "./types/execution";
2425

2526
const API = env.API_PREFIX;
2627
const app = buildApp();
@@ -331,12 +332,12 @@ describe("agent read routes", () => {
331332
"op-1",
332333
{
333334
operatorInfo: {
334-
state: "Completed",
335+
state: OperatorState.COMPLETED,
335336
errorMessages: [],
336337
resultSummary: {
337-
resultMode: "table",
338+
resultMode: OperatorResultMode.TABLE,
338339
sampleTuples: [{ rowIndex: 0, tuple: { a: 1 } }],
339-
totalRowCount: 2,
340+
tuplesCount: 2,
340341
},
341342
},
342343
},
@@ -345,10 +346,10 @@ describe("agent read routes", () => {
345346
});
346347

347348
const body = await readJson<{
348-
results: Record<string, { state: string; resultSummary: { totalRowCount: number; sampleTuples: unknown[] } }>;
349+
results: Record<string, { state: string; resultSummary: { tuplesCount: number; sampleTuples: unknown[] } }>;
349350
}>(await getJson(`${API}/agents/${id}/operator-results`));
350351
expect(body.results["op-1"].state).toBe("Completed");
351-
expect(body.results["op-1"].resultSummary.totalRowCount).toBe(2);
352+
expect(body.results["op-1"].resultSummary.tuplesCount).toBe(2);
352353
expect(body.results["op-1"].resultSummary.sampleTuples).toEqual([{ rowIndex: 0, tuple: { a: 1 } }]);
353354
});
354355
});

agent-service/src/types/execution.ts

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,70 @@
1717
* under the License.
1818
*/
1919

20+
export enum WorkflowFatalErrorType {
21+
COMPILATION_ERROR = "COMPILATION_ERROR",
22+
EXECUTION_FAILURE = "EXECUTION_FAILURE",
23+
}
24+
2025
// A fatal error reported for one operator. Reuses the engine's wire shape
21-
// (workflowruntimestate.proto): `type` is the FatalErrorType enum name. The same
22-
// type the workflow-compiling service returns for compilation errors, so compile
23-
// and execution errors share one shape. Re-exported by api/compile-api.ts.
26+
// (workflowruntimestate.proto). The same type the workflow-compiling service
27+
// returns for compilation errors, so compile and execution errors share one
28+
// shape. Re-exported by api/compile-api.ts.
2429
export interface WorkflowFatalError {
25-
// FatalErrorType enum name, e.g. "COMPILATION_ERROR" | "EXECUTION_FAILURE".
26-
type: string;
30+
type: { name: WorkflowFatalErrorType };
31+
timestamp: { seconds: number; nanos: number };
2732
message: string;
28-
details?: string;
29-
operatorId?: string;
30-
workerId?: string;
31-
timestamp?: { seconds: number; nanos: number };
33+
details: string;
34+
operatorId: string;
35+
workerId: string;
3236
}
3337

3438
// Lifecycle state of a single operator, as reported by the engine
3539
// (mirrors the backend's WorkflowAggregatedState string mapping).
36-
export type OperatorState =
37-
| "Uninitialized"
38-
| "Ready"
39-
| "Running"
40-
| "Pausing"
41-
| "Paused"
42-
| "Resuming"
43-
| "Completed"
44-
| "Failed"
45-
| "Killed"
46-
| "Terminated"
47-
| "Unknown";
40+
export enum OperatorState {
41+
UNINITIALIZED = "Uninitialized",
42+
READY = "Ready",
43+
RUNNING = "Running",
44+
PAUSING = "Pausing",
45+
PAUSED = "Paused",
46+
RESUMING = "Resuming",
47+
COMPLETED = "Completed",
48+
FAILED = "Failed",
49+
KILLED = "Killed",
50+
TERMINATED = "Terminated",
51+
UNKNOWN = "Unknown",
52+
}
4853

4954
// Aggregated state of a whole workflow execution: the OperatorState values the
5055
// engine reports, plus the synthetic outcomes the sync-execution endpoint adds.
51-
export type WorkflowExecutionState = OperatorState | "Error" | "CompilationFailed";
56+
export enum WorkflowExecutionState {
57+
UNINITIALIZED = "Uninitialized",
58+
READY = "Ready",
59+
RUNNING = "Running",
60+
PAUSING = "Pausing",
61+
PAUSED = "Paused",
62+
RESUMING = "Resuming",
63+
COMPLETED = "Completed",
64+
FAILED = "Failed",
65+
KILLED = "Killed",
66+
TERMINATED = "Terminated",
67+
UNKNOWN = "Unknown",
68+
ERROR = "Error",
69+
COMPILATION_FAILED = "CompilationFailed",
70+
}
71+
72+
export enum ConsoleMessageType {
73+
PRINT = "PRINT",
74+
ERROR = "ERROR",
75+
COMMAND = "COMMAND",
76+
DEBUGGER = "DEBUGGER",
77+
}
5278

5379
// A single console message emitted by an operator during execution.
5480
// `title` is the short header (Scala errors put their text here); `message` is
5581
// the body (Python errors / stack traces).
5682
export interface ConsoleMessage {
57-
msgType: string;
83+
msgType: ConsoleMessageType;
5884
title: string;
5985
message: string;
6086
}
@@ -66,16 +92,20 @@ export interface SampleRow {
6692
tuple: Record<string, unknown>;
6793
}
6894

95+
export enum OperatorResultMode {
96+
TABLE = "table",
97+
VISUALIZATION = "visualization",
98+
}
99+
69100
// An operator's output, summarized for the agent. `sampleTuples` are the
70101
// symmetrically-truncated output rows (the middle is dropped, so `rowIndex`
71102
// values may have gaps). `outputSchema` / per-column statistics are intended
72103
// future additions — the engine does not produce them yet.
73104
export interface OperatorResultSummary {
74-
// "table" or "visualization".
75-
resultMode: string;
105+
resultMode: OperatorResultMode;
76106
sampleTuples: SampleRow[];
77107
// Total output rows before truncation (sampleTuples may hold fewer).
78-
totalRowCount: number;
108+
tuplesCount: number;
79109
}
80110

81111
// An operator's console output. Warnings are not a separate field: they are the

0 commit comments

Comments
 (0)