-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathTaskRunner.test.ts
More file actions
102 lines (88 loc) · 2.71 KB
/
TaskRunner.test.ts
File metadata and controls
102 lines (88 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import { expect, describe, test, jest, afterAll } from "@jest/globals";
import {
TaskRunner,
WorkflowExecutor,
simpleTask,
orkesConductorClient,
MetadataClient,
} from "../sdk";
import { waitForWorkflowStatus } from "./utils/waitForWorkflowStatus";
/**
 * Integration test for `TaskRunner`.
 *
 * Starts a polling worker for a uniquely named task, registers and executes a
 * one-task workflow that uses it, and verifies that the workflow and its task
 * both reach the `COMPLETED` status. Requires a reachable server, since every
 * step goes through the client returned by `orkesConductorClient()`.
 */
describe("TaskRunner", () => {
  const clientPromise = orkesConductorClient();
  // Workflow definitions registered by the tests; unregistered in afterAll.
  const workflowsToCleanup: { name: string; version: number }[] = [];

  jest.setTimeout(30000);

  // Best-effort cleanup: allSettled so one failed unregister does not prevent
  // the remaining definitions from being removed.
  afterAll(async () => {
    const client = await clientPromise;
    const metadataClient = new MetadataClient(client);
    await Promise.allSettled(
      workflowsToCleanup.map((w) =>
        metadataClient.unregisterWorkflow(w.name, w.version)
      )
    );
  });

  test("worker example ", async () => {
    const client = await clientPromise;
    const executor = new WorkflowExecutor(client);
    // Timestamp suffixes keep names unique across runs.
    const taskName = `jsSdkTest-task-manager-int-test-${Date.now()}`;
    const workflowName = `jsSdkTest-task-manager-int-test-wf-${Date.now()}`;

    const taskRunner = new TaskRunner({
      client: client,
      worker: {
        taskDefName: taskName,
        // Worker under test: completes every task with a fixed output.
        execute: async () => {
          return {
            outputData: {
              hello: "From your worker",
            },
            status: "COMPLETED",
          };
        },
      },
      options: {
        pollInterval: 1000,
        domain: undefined,
        concurrency: 2,
        workerID: "",
      },
    });

    taskRunner.startPolling();
    // Everything after startPolling runs inside try/finally so that a failed
    // assertion or a rejected request cannot leave the runner polling.
    try {
      expect(taskRunner.isPolling).toEqual(true);

      await executor.registerWorkflow(true, {
        name: workflowName,
        version: 1,
        ownerEmail: "developers@orkes.io",
        tasks: [simpleTask(taskName, taskName, {})],
        inputParameters: [],
        outputParameters: {},
        timeoutSeconds: 0,
      });
      workflowsToCleanup.push({ name: workflowName, version: 1 });

      const { workflowId: executionId } = await executor.executeWorkflow(
        {
          name: workflowName,
          version: 1,
        },
        workflowName,
        1,
        `${workflowName}-id`
      );
      expect(executionId).toBeDefined();

      // Change options while the runner is already polling.
      taskRunner.updateOptions({ concurrency: 1, pollInterval: 100 });

      // Narrows executionId to string for the calls below.
      if (!executionId) {
        throw new Error("Execution ID is undefined");
      }

      const workflowStatus = await waitForWorkflowStatus(
        executor,
        executionId,
        "COMPLETED"
      );
      const [firstTask] = workflowStatus.tasks || [];
      expect(firstTask?.taskType).toEqual(taskName);
      expect(workflowStatus.status).toEqual("COMPLETED");

      await taskRunner.stopPolling();
      expect(taskRunner.isPolling).toEqual(false);

      // Fail with a clear message instead of querying the server with "".
      const taskId = firstTask?.taskId;
      if (!taskId) {
        throw new Error("First task has no taskId");
      }
      const taskDetails = await executor.getTask(taskId);
      expect(taskDetails?.status).toEqual("COMPLETED");
    } finally {
      // No-op on the happy path (already stopped above); on failure this
      // stops the runner that would otherwise keep polling.
      if (taskRunner.isPolling) {
        await taskRunner.stopPolling();
      }
    }
  });
});