Skip to content

Commit 07450de

Browse files
committed
fixes and use of task update v2
1 parent 5f97f22 commit 07450de

11 files changed

Lines changed: 698 additions & 47 deletions

File tree

AGENTS.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,32 @@ ORKES_BACKEND_VERSION=5 \
5757
npm run test:integration:orkes-v5
5858
```
5959

60+
## Post-Change Verification (Required)
61+
62+
After every code change, you **must** run the following before considering the work complete:
63+
64+
1. **Unit tests** — all must pass:
65+
```bash
66+
npm test
67+
```
68+
69+
2. **All examples** — each must complete successfully (requires a running Conductor server and `.env` credentials):
70+
```bash
71+
export $(cat .env | xargs)
72+
npx ts-node -P tsconfig.json --transpile-only examples/helloworld.ts
73+
npx ts-node -P tsconfig.json --transpile-only examples/quickstart.ts
74+
npx ts-node -P tsconfig.json --transpile-only examples/kitchensink.ts
75+
npx ts-node -P tsconfig.json --transpile-only examples/dynamic-workflow.ts
76+
npx ts-node -P tsconfig.json --transpile-only examples/task-configure.ts
77+
npx ts-node -P tsconfig.json --transpile-only examples/task-context.ts
78+
npx ts-node -P tsconfig.json --transpile-only examples/worker-configuration.ts
79+
npx ts-node -P tsconfig.json --transpile-only examples/workflow-ops.ts
80+
npx ts-node -P tsconfig.json --transpile-only examples/workers-e2e.ts
81+
npx ts-node -P tsconfig.json --transpile-only examples/perf-test.ts
82+
```
83+
84+
Do not skip any example. If an example fails for reasons unrelated to your change (e.g., server down), note it explicitly.
85+
6086
## Critical Pitfalls
6187

6288
These are real bugs that caused test failures during SDK development. Read before writing any code.

examples/perf-test.ts

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/**
2+
* Performance Test — Measure task processing pipeline latency and throughput
3+
*
4+
* Runs N workflows through a no-op worker and collects timing for every phase:
5+
* - Poll latency (server → SDK)
6+
* - Execute latency (worker function)
7+
* - Update latency (SDK → server, the V2 endpoint)
8+
* - End-to-end per-task latency
9+
* - Throughput (tasks/sec)
10+
*
11+
* Run:
12+
* CONDUCTOR_SERVER_URL=http://localhost:8080 npx ts-node examples/perf-test.ts
13+
* CONDUCTOR_SERVER_URL=http://localhost:8080 PERF_WORKFLOWS=100 npx ts-node examples/perf-test.ts
14+
*/
15+
import {
16+
OrkesClients,
17+
ConductorWorkflow,
18+
TaskHandler,
19+
worker,
20+
simpleTask,
21+
} from "../src/sdk";
22+
import type {
23+
TaskRunnerEventsListener,
24+
PollCompleted,
25+
TaskExecutionStarted,
26+
TaskExecutionCompleted,
27+
TaskUpdateCompleted,
28+
} from "../src/sdk/clients/worker/events";
29+
import type { Task } from "../src/open-api";
30+
31+
// ── Configuration ────────────────────────────────────────────────
32+
const WORKFLOW_COUNT = parseInt(process.env.PERF_WORKFLOWS ?? "20", 10);
33+
const CONCURRENCY = parseInt(process.env.PERF_CONCURRENCY ?? "5", 10);
34+
const BATCH_SIZE = parseInt(process.env.PERF_BATCH_SIZE ?? "5", 10);
35+
36+
// ── No-op worker (near-zero execution time) ──────────────────────
37+
const _perfTask = worker({
38+
taskDefName: "perf_noop_task",
39+
registerTaskDef: true,
40+
concurrency: CONCURRENCY,
41+
})(async (_task: Task) => ({
42+
status: "COMPLETED" as const,
43+
outputData: { ts: Date.now() },
44+
}));
45+
46+
// ── Timing collector ─────────────────────────────────────────────
47+
class PerfCollector implements TaskRunnerEventsListener {
48+
pollDurations: number[] = [];
49+
executeDurations: number[] = [];
50+
updateDurations: number[] = [];
51+
// Track per-task e2e: execution start → update complete
52+
private execStartTimes = new Map<string, number>();
53+
e2eDurations: number[] = [];
54+
errors = 0;
55+
56+
onPollCompleted(event: PollCompleted): void {
57+
this.pollDurations.push(event.durationMs);
58+
}
59+
60+
onTaskExecutionStarted(event: TaskExecutionStarted): void {
61+
this.execStartTimes.set(event.taskId, Date.now());
62+
}
63+
64+
onTaskExecutionCompleted(event: TaskExecutionCompleted): void {
65+
this.executeDurations.push(event.durationMs);
66+
}
67+
68+
onTaskUpdateCompleted(event: TaskUpdateCompleted): void {
69+
this.updateDurations.push(event.durationMs);
70+
const start = this.execStartTimes.get(event.taskId);
71+
if (start !== undefined) {
72+
this.e2eDurations.push(Date.now() - start);
73+
this.execStartTimes.delete(event.taskId);
74+
}
75+
}
76+
}
77+
78+
// ── Helpers ──────────────────────────────────────────────────────
79+
function percentile(sorted: number[], p: number): number {
80+
if (sorted.length === 0) return 0;
81+
const idx = p * (sorted.length - 1);
82+
const lo = Math.floor(idx);
83+
const hi = Math.ceil(idx);
84+
if (lo === hi) return sorted[lo];
85+
return sorted[lo] + (idx - lo) * (sorted[hi] - sorted[lo]);
86+
}
87+
88+
function fmt(n: number): string {
89+
return n.toFixed(1).padStart(8);
90+
}
91+
92+
function printStats(label: string, values: number[]): void {
93+
if (values.length === 0) {
94+
console.log(` ${label.padEnd(22)} (no data)`);
95+
return;
96+
}
97+
const sorted = [...values].sort((a, b) => a - b);
98+
const p50 = percentile(sorted, 0.5);
99+
const p95 = percentile(sorted, 0.95);
100+
const p99 = percentile(sorted, 0.99);
101+
const avg = sorted.reduce((a, b) => a + b, 0) / sorted.length;
102+
console.log(
103+
` ${label.padEnd(22)} p50=${fmt(p50)}ms p95=${fmt(p95)}ms p99=${fmt(p99)}ms avg=${fmt(avg)}ms n=${sorted.length}`
104+
);
105+
}
106+
107+
// ── Main ─────────────────────────────────────────────────────────
108+
async function main() {
109+
const clients = await OrkesClients.from();
110+
const workflowClient = clients.getWorkflowClient();
111+
const client = clients.getClient();
112+
113+
// Register workflow
114+
const wf = new ConductorWorkflow(workflowClient, "perf_test_workflow")
115+
.description("Performance test — single no-op task")
116+
.add(simpleTask("perf_ref", "perf_noop_task", {}))
117+
.outputParameters({ ts: "${perf_ref.output.ts}" });
118+
119+
await wf.register(true);
120+
121+
// Set up collector
122+
const collector = new PerfCollector();
123+
124+
// Start workers
125+
const handler = new TaskHandler({
126+
client,
127+
scanForDecorated: true,
128+
eventListeners: [collector],
129+
});
130+
await handler.startWorkers();
131+
132+
// Execute workflows in batches to avoid overwhelming the server
133+
console.log(
134+
`\nRunning ${WORKFLOW_COUNT} workflows (batch=${BATCH_SIZE}, worker concurrency=${CONCURRENCY})...\n`
135+
);
136+
const wallStart = Date.now();
137+
138+
const results: (Awaited<ReturnType<typeof wf.execute>> | null)[] = [];
139+
for (let i = 0; i < WORKFLOW_COUNT; i += BATCH_SIZE) {
140+
const batch = Array.from(
141+
{ length: Math.min(BATCH_SIZE, WORKFLOW_COUNT - i) },
142+
(_, j) =>
143+
wf.execute({ iteration: i + j }).catch((err) => {
144+
collector.errors++;
145+
console.error(` Workflow ${i + j} failed:`, err.message);
146+
return null;
147+
})
148+
);
149+
results.push(...(await Promise.all(batch)));
150+
}
151+
const wallTimeMs = Date.now() - wallStart;
152+
153+
const completed = results.filter((r) => r?.status === "COMPLETED").length;
154+
const failed = WORKFLOW_COUNT - completed;
155+
156+
// Print results
157+
console.log(
158+
`--- Performance Test Results (${WORKFLOW_COUNT} workflows, batch=${BATCH_SIZE}, concurrency=${CONCURRENCY}) ---\n`
159+
);
160+
printStats("Poll latency", collector.pollDurations);
161+
printStats("Execute latency", collector.executeDurations);
162+
printStats("Update latency", collector.updateDurations);
163+
printStats("E2E task latency", collector.e2eDurations);
164+
165+
const tasksCompleted = collector.updateDurations.length;
166+
const throughput =
167+
wallTimeMs > 0 ? (tasksCompleted / wallTimeMs) * 1000 : 0;
168+
console.log(`\n Throughput: ${throughput.toFixed(1)} tasks/sec`);
169+
console.log(` Wall time: ${(wallTimeMs / 1000).toFixed(2)}s`);
170+
console.log(` Workflows completed: ${completed}/${WORKFLOW_COUNT}`);
171+
if (failed > 0) console.log(` Workflows failed: ${failed}`);
172+
if (collector.errors > 0)
173+
console.log(` Errors: ${collector.errors}`);
174+
175+
await handler.stopWorkers();
176+
process.exit(0);
177+
}
178+
179+
main().catch((err) => {
180+
console.error(err);
181+
process.exit(1);
182+
});

0 commit comments

Comments
 (0)