diff --git a/src/core/__test__/ServiceRegistryClient.test.ts b/src/core/__test__/ServiceRegistryClient.test.ts index 9cea0023..ebf8ebc6 100644 --- a/src/core/__test__/ServiceRegistryClient.test.ts +++ b/src/core/__test__/ServiceRegistryClient.test.ts @@ -1,4 +1,4 @@ -import {beforeAll, describe, expect, jest, test} from "@jest/globals"; +import {beforeAll, afterEach, describe, expect, jest, test} from "@jest/globals"; import {ServiceRegistryClient} from "../serviceRegistryClient"; import {orkesConductorClient} from "../../orkes"; import {ServiceType} from "../../common/open-api/models/ServiceRegistryModels"; @@ -8,12 +8,26 @@ import * as path from 'path'; describe("ServiceRegistryClient", () => { const clientPromise = orkesConductorClient({useEnvVars: true}); let serviceRegistryClient: ServiceRegistryClient; + const testServicesToCleanup: string[] = []; beforeAll(async () => { const client = await clientPromise; serviceRegistryClient = new ServiceRegistryClient(client); }); + afterEach(async () => { + // Clean up any services created during tests + for (const serviceName of testServicesToCleanup) { + try { + await serviceRegistryClient.removeService(serviceName); + } catch (e) { + // Ignore cleanup errors - service might already be deleted or not exist + console.debug(`Failed to cleanup service ${serviceName}:`, e); + } + } + testServicesToCleanup.length = 0; + }); + jest.setTimeout(15000); test("Should add and retrieve a service registry", async () => { @@ -21,7 +35,7 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry", type: ServiceType.HTTP, - serviceURI: "http://localhost:8081/api-docs", + serviceURI: "http://httpbin:8081/api-docs", config: { circuitBreakerConfig: { failureRateThreshold: 50.0, @@ -37,6 +51,9 @@ describe("ServiceRegistryClient", () => { } }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry await expect( serviceRegistryClient.addOrUpdateService(testServiceRegistry) @@ -72,9 +89,12 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry_to_remove", type: ServiceType.HTTP, - serviceURI: "http://localhost:8081" + serviceURI: "http://httpbin:8081/api-docs" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry await expect( serviceRegistryClient.addOrUpdateService(testServiceRegistry) @@ -101,9 +121,12 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry_with_method", type: ServiceType.HTTP, - serviceURI: "http://localhost:8082" + serviceURI: "http://httpbin:8081/api-docs" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry await expect( serviceRegistryClient.addOrUpdateService(testServiceRegistry) @@ -145,102 +168,73 @@ describe("ServiceRegistryClient", () => { expect(foundMethod?.methodType).toEqual(testServiceMethod.methodType); expect(foundMethod?.inputType).toEqual(testServiceMethod.inputType); expect(foundMethod?.outputType).toEqual(testServiceMethod.outputType); - - // Clean up - await serviceRegistryClient.removeService(testServiceRegistry.name); }); test("Should discover methods from a http service", async () => { // Create a test service registry for discovery - // Note: This should point to a real service that supports discovery - // For HTTP services, it should point to a service with a Swagger/OpenAPI doc - // For gRPC services, it should point to a running gRPC service with reflection const testServiceRegistry = { name: "test_service_registry_discovery", type: ServiceType.HTTP, - serviceURI: "http://localhost:8081/api-docs" + serviceURI: "http://httpbin:8081/api-docs" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry - await expect( - serviceRegistryClient.addOrUpdateService(testServiceRegistry) - ).resolves.not.toThrowError(); + await serviceRegistryClient.addOrUpdateService(testServiceRegistry); - try { - // Attempt to discover methods without creating them - const discoveredMethods = await serviceRegistryClient.discover( - testServiceRegistry.name, - true - ); - - // Verify that we discovered at least one method - expect(discoveredMethods).toBeDefined(); - expect(Array.isArray(discoveredMethods)).toBe(true); - - // Check that we got at least one method - // If the service URI is valid, this should pass - expect(discoveredMethods.length).toBeGreaterThan(0); - - if (discoveredMethods.length > 0) { - // Check that the discovered methods have the expected properties - const firstMethod = discoveredMethods[0]; - expect(firstMethod.methodName).toBeDefined(); - expect(firstMethod.methodType).toBeDefined(); - } - } catch (error) { - // If the discovery endpoint fails (e.g., if the petstore API is down), - // we'll log the error but not fail the test - console.warn("Discovery test failed, possibly due to external service unavailability:", error); - } finally { - // Clean up - await serviceRegistryClient.removeService(testServiceRegistry.name); + // Attempt to discover methods - this will fail the test if discovery fails + const discoveredMethods = await serviceRegistryClient.discover( + testServiceRegistry.name, + true + ); + + // Verify that we discovered methods + expect(discoveredMethods).toBeDefined(); + expect(Array.isArray(discoveredMethods)).toBe(true); + expect(discoveredMethods.length).toBeGreaterThan(0); + + if (discoveredMethods.length > 0) { + // Check that the discovered methods have the expected properties + const firstMethod = discoveredMethods[0]; + expect(firstMethod.methodName).toBeDefined(); + expect(firstMethod.methodType).toBeDefined(); } }); test("Should discover methods from a gRPC service", async () => { // Create a test service registry for discovery - // Note: This should point to a real service that supports discovery - // For HTTP services, it should point to a service with a Swagger/OpenAPI doc - // For gRPC services, it should point to a running gRPC service with reflection const testServiceRegistry = { name: "test_gRPC_service_registry_discovery", type: ServiceType.gRPC, - serviceURI: "localhost:50051" + serviceURI: "grpcbin:50051" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry - await expect( - serviceRegistryClient.addOrUpdateService(testServiceRegistry) - ).resolves.not.toThrowError(); + await serviceRegistryClient.addOrUpdateService(testServiceRegistry); const filePath = path.join(__dirname, 'metadata', 'compiled.bin'); const fileBuffer = fs.readFileSync(filePath); const blob = new Blob([fileBuffer], {type: 'application/octet-stream'}); - // Register the service registry - await expect( - serviceRegistryClient.setProtoData(testServiceRegistry.name, 'compiled.bin', blob) - ).resolves.not.toThrowError(); + // Set proto data + await serviceRegistryClient.setProtoData(testServiceRegistry.name, 'compiled.bin', blob); - try { - const serviceMethods = await serviceRegistryClient.getService(testServiceRegistry.name).then(); - const methods = serviceMethods.methods; - expect(serviceMethods).toBeDefined(); - expect(methods?.length).toBeGreaterThan(0); - expect(Array.isArray(methods)).toBe(true); - - if (methods) { - const firstMethod = methods[0]; - expect(firstMethod.methodName).toBeDefined(); - expect(firstMethod.methodType).toBeDefined(); - } - } catch (error) { - // If the discovery endpoint fails (e.g., if the petstore API is down), - // we'll log the error but not fail the test - console.warn("Discovery test failed, possibly due to external service unavailability:", error); - } finally { - // Clean up - await serviceRegistryClient.removeService(testServiceRegistry.name); + const serviceMethods = await serviceRegistryClient.getService(testServiceRegistry.name); + const methods = serviceMethods.methods; + + expect(serviceMethods).toBeDefined(); + expect(methods?.length).toBeGreaterThan(0); + expect(Array.isArray(methods)).toBe(true); + + if (methods) { + const firstMethod = methods[0]; + expect(firstMethod.methodName).toBeDefined(); + expect(firstMethod.methodType).toBeDefined(); } }); }); \ No newline at end of file diff --git a/src/core/__test__/executor.test.ts b/src/core/__test__/executor.test.ts index 979c83cb..650cc10e 100644 --- a/src/core/__test__/executor.test.ts +++ b/src/core/__test__/executor.test.ts @@ -1,11 +1,10 @@ -import {expect, describe, test, jest, beforeAll} from "@jest/globals"; +import {expect, describe, test, jest, beforeAll, afterEach, afterAll} from "@jest/globals"; import {Consistency, ReturnStrategy, SetVariableTaskDef, TaskType, WorkflowDef} from "../../common"; import { orkesConductorClient } from "../../orkes"; import { WorkflowExecutor } from "../executor"; import { v4 as uuidv4 } from "uuid"; import {MetadataClient} from "../metadataClient"; import {TestUtil} from "./utils/test-util"; -import {after} from "node:test"; import {TaskResultStatusEnum} from "../../common/open-api/models/TaskResultStatusEnum"; import {SignalResponse} from "../../common/open-api/models/SignalResponse"; @@ -114,7 +113,6 @@ describe("Executor", () => { }); }); - describe("Execute with Return Strategy and Consistency", () => { // Constants specific to this test suite const WORKFLOW_NAMES = { @@ -130,6 +128,8 @@ describe("Execute with Return Strategy and Consistency", () => { let client: any; let executor: WorkflowExecutor; let metadataClient: MetadataClient; + const workflowsToCleanup: {name: string, version: number}[] = []; + const executionsToCleanup: string[] = []; beforeAll(async () => { client = await clientPromise; @@ -139,9 +139,28 @@ describe("Execute with Return Strategy and Consistency", () => { // Register all test workflows await registerAllWorkflows(); - }) + }); + + afterEach(async () => { + // Clean up executions first + for (const executionId of executionsToCleanup) { + try { + const workflowStatus = await executor.getWorkflowStatus(executionId, false, false); + + if (workflowStatus.status && !['COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'].includes(workflowStatus.status)) { + await executor.terminate(executionId, "Test cleanup"); + console.debug(`Terminated running workflow: ${executionId}`); + } else { + console.debug(`Skipping cleanup for ${workflowStatus.status} workflow: ${executionId}`); + } + } catch (e) { + console.debug(`Failed to cleanup execution ${executionId}:`, e); + } + } + executionsToCleanup.length = 0; + }); - after(async () => { + afterAll(async () => { // Cleanup all workflows await cleanupAllWorkflows(); }); @@ -154,6 +173,12 @@ describe("Execute with Return Strategy and Consistency", () => { TestUtil.registerWorkflow(WORKFLOW_NAMES.SUB_WF_2), TestUtil.registerWorkflow(WORKFLOW_NAMES.WAIT_SIGNAL_TEST) ]); + + // Add to cleanup list + Object.values(WORKFLOW_NAMES).forEach(name => { + workflowsToCleanup.push({name, version: 1}); + }); + console.log('✓ All workflows registered successfully'); } catch (error) { throw new Error(`Failed to register workflows: ${error}`); @@ -161,17 +186,14 @@ describe("Execute with Return Strategy and Consistency", () => { } async function cleanupAllWorkflows(): Promise { - const cleanupPromises = [ - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.COMPLEX_WF, 1), - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.SUB_WF_1, 1), - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.SUB_WF_2, 1), - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.WAIT_SIGNAL_TEST, 1) - ]; + const cleanupPromises = workflowsToCleanup.map(({name, version}) => + TestUtil.unregisterWorkflow(name, version) + ); const results = await Promise.allSettled(cleanupPromises); results.forEach((result, index) => { if (result.status === 'rejected') { - console.warn(`Failed to cleanup workflow ${Object.values(WORKFLOW_NAMES)[index]}: ${result.reason}`); + console.warn(`Failed to cleanup workflow ${workflowsToCleanup[index].name}: ${result.reason}`); } }); console.log('✓ Cleanup completed'); @@ -241,7 +263,6 @@ describe("Execute with Return Strategy and Consistency", () => { } ]; - // Let's write one comprehensive test first, then replicate test("Should execute complex workflow with SYNC + TARGET_WORKFLOW and validate all aspects", async () => { const testCase = testCombinations[0]; // SYNC + TARGET_WORKFLOW @@ -262,6 +283,11 @@ describe("Execute with Return Strategy and Consistency", () => { // Convert to SignalResponse instance const result = Object.assign(new SignalResponse(), rawResult); + // Add to cleanup list + if (result.workflowId) { + executionsToCleanup.push(result.workflowId); + } + console.log(`Started workflow with ID: ${result.workflowId} for strategy: ${testCase.name}`); // ========== BASIC VALIDATIONS ========== @@ -285,7 +311,6 @@ describe("Execute with Return Strategy and Consistency", () => { expect(result.status).toBeDefined(); expect(result.createTime).toBeGreaterThan(0); expect(result.updateTime).toBeGreaterThan(0); - //expect(result.updateTime).toBeGreaterThanOrEqual(result.createTime); expect(result.tasks).toBeDefined(); expect(Array.isArray(result.tasks)).toBe(true); expect(result.tasks!.length).toBeGreaterThan(0); @@ -317,7 +342,6 @@ describe("Execute with Return Strategy and Consistency", () => { expect(workflowFromResp.status).toEqual(result.status); expect(workflowFromResp.createTime).toEqual(result.createTime); expect(workflowFromResp.updateTime).toEqual(result.updateTime); - //expect(workflowFromResp.tasks.length).toEqual(result.tasks!.length); // Test that task helper methods throw errors expect(() => result.getBlockingTask()).toThrow('does not contain task details'); @@ -405,6 +429,11 @@ describe("Execute with Return Strategy and Consistency", () => { // Convert to SignalResponse instance const result = Object.assign(new SignalResponse(), rawResult); + // Add to cleanup list + if (result.workflowId) { + executionsToCleanup.push(result.workflowId); + } + // Basic validations expect(result.responseType).toEqual(testCase.returnStrategy); expect(result.workflowId).toBeDefined(); @@ -451,5 +480,4 @@ describe("Execute with Return Strategy and Consistency", () => { }); }); }); - }); \ No newline at end of file diff --git a/src/core/__test__/utils/test-util.ts b/src/core/__test__/utils/test-util.ts index a47f93d0..d0d903a4 100644 --- a/src/core/__test__/utils/test-util.ts +++ b/src/core/__test__/utils/test-util.ts @@ -2,6 +2,7 @@ import * as fs from 'fs'; import * as path from 'path'; import {WorkflowDef} from "../../../common"; import {MetadataClient} from "../../metadataClient"; +import {WorkflowExecutor} from "../../executor"; export class TestUtil { private static metadataClient: MetadataClient; @@ -35,6 +36,39 @@ export class TestUtil { } } + // Helper function to wait for workflow completion + public static async waitForWorkflowCompletion( + executor: WorkflowExecutor, + workflowId: string, + maxWaitMs: number = 300000, // 5 minutes default + pollIntervalMs: number = 100 // 100ms default + ) { + const startTime = Date.now(); + + while (Date.now() - startTime < maxWaitMs) { + try { + const workflowStatus = await executor.getWorkflow(workflowId, true); + + // Check if workflow is in a terminal state + if (['COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'].includes(workflowStatus.status!)) { + console.debug(`Workflow ${workflowId} reached terminal state: ${workflowStatus.status}`); + return workflowStatus; + } + + console.debug(`Workflow ${workflowId} status: ${workflowStatus.status}, waiting...`); + + // Wait before next poll + await new Promise(resolve => setTimeout(resolve, pollIntervalMs)); + + } catch (error) { + console.warn(`Error checking workflow status for ${workflowId}:`, error); + await new Promise(resolve => setTimeout(resolve, pollIntervalMs)); + } + } + + throw new Error(`Workflow ${workflowId} did not complete within ${maxWaitMs}ms`); + } + /** * Unregister a workflow */ diff --git a/src/task/__tests__/TaskManager.test.ts b/src/task/__tests__/TaskManager.test.ts index bbd24839..51b18847 100644 --- a/src/task/__tests__/TaskManager.test.ts +++ b/src/task/__tests__/TaskManager.test.ts @@ -3,6 +3,7 @@ import { simpleTask, WorkflowExecutor } from "../../core"; import { orkesConductorClient } from "../../orkes"; import { TaskManager, ConductorWorker } from "../index"; import { mockLogger } from "./mockLogger"; +import {TestUtil} from "../../core/__test__/utils/test-util"; const BASE_TIME = 500; @@ -169,7 +170,7 @@ describe("TaskManager", () => { // decrease speed again manager.updatePollingOptions({ pollInterval: BASE_TIME, concurrency: 1 }); - const workflowStatus = await executor.getWorkflow(executionId!, true); + const workflowStatus = await TestUtil.waitForWorkflowCompletion(executor, executionId!, 30000); expect(workflowStatus.status).toEqual("COMPLETED"); await manager.stopPolling();