Skip to content

Commit e3603a6

Browse files
author
Shailesh Jagannath Padave
committed
waitForWorflowCompletion
1 parent 4d4b08a commit e3603a6

2 files changed

Lines changed: 37 additions & 2 deletions

File tree

src/core/__test__/utils/test-util.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as fs from 'fs';
22
import * as path from 'path';
33
import {WorkflowDef} from "../../../common";
44
import {MetadataClient} from "../../metadataClient";
5+
import {WorkflowExecutor} from "../../executor";
56

67
export class TestUtil {
78
private static metadataClient: MetadataClient;
@@ -35,6 +36,39 @@ export class TestUtil {
3536
}
3637
}
3738

39+
// Helper function to wait for workflow completion
40+
public static async waitForWorkflowCompletion(
41+
executor: WorkflowExecutor,
42+
workflowId: string,
43+
maxWaitMs: number = 300000, // 5 minutes default
44+
pollIntervalMs: number = 100 // 100ms default
45+
) {
46+
const startTime = Date.now();
47+
48+
while (Date.now() - startTime < maxWaitMs) {
49+
try {
50+
const workflowStatus = await executor.getWorkflow(workflowId, true);
51+
52+
// Check if workflow is in a terminal state
53+
if (['COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'].includes(workflowStatus.status!)) {
54+
console.debug(`Workflow ${workflowId} reached terminal state: ${workflowStatus.status}`);
55+
return workflowStatus;
56+
}
57+
58+
console.debug(`Workflow ${workflowId} status: ${workflowStatus.status}, waiting...`);
59+
60+
// Wait before next poll
61+
await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
62+
63+
} catch (error) {
64+
console.warn(`Error checking workflow status for ${workflowId}:`, error);
65+
await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
66+
}
67+
}
68+
69+
throw new Error(`Workflow ${workflowId} did not complete within ${maxWaitMs}ms`);
70+
}
71+
3872
/**
3973
* Unregister a workflow
4074
*/

src/task/__tests__/TaskManager.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { simpleTask, WorkflowExecutor } from "../../core";
33
import { orkesConductorClient } from "../../orkes";
44
import { TaskManager, ConductorWorker } from "../index";
55
import { mockLogger } from "./mockLogger";
6+
import {TestUtil} from "../../core/__test__/utils/test-util";
67

78

89
const BASE_TIME = 500;
@@ -169,9 +170,9 @@ describe("TaskManager", () => {
169170
// decrease speed again
170171
manager.updatePollingOptions({ pollInterval: BASE_TIME, concurrency: 1 });
171172

172-
const workflowStatus = await executor.getWorkflow(executionId!, true);
173+
const workflowStatus = TestUtil.waitForWorkflowCompletion(executor, executionId!, 30000);
173174

174-
expect(workflowStatus.status).toEqual("COMPLETED");
175+
expect(workflowStatus).toEqual("COMPLETED");
175176
await manager.stopPolling();
176177

177178
expect(manager.isPolling).toBeFalsy();

0 commit comments

Comments
 (0)