diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index f457bb3ea7..d0fdd01dcd 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -1,9 +1,10 @@ from __future__ import annotations import datetime +import os import time import unittest -from collections.abc import Iterator +from collections.abc import Iterator, Sequence from zoneinfo import ZoneInfo import pytest @@ -48,6 +49,85 @@ from tests.utils import get_or_raise +def _ensure_no_running_executions( + client: CogniteClient, + workflow_external_id: str, + *, + timeout: float = 120, +) -> bool: + """Cancel and poll until the workflow has no running executions.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + running = client.workflows.executions.list( + workflow_version_ids=WorkflowVersionId(workflow_external_id), + statuses=["running"], + limit=None, + ) + if not running: + return True + for execution in running: + try: + client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + except CogniteAPIError: + pass # execution may have completed/cancelled between list and cancel + time.sleep(0.5) + return False + + +def _cancel_running_executions_for_workflow( + client: CogniteClient, + workflow_external_id: str, + *, + timeout: float = 60, +) -> None: + """Cancel running executions so a workflow can be deleted.""" + if not _ensure_no_running_executions(client, workflow_external_id, timeout=timeout): + raise RuntimeError(f"Workflow {workflow_external_id!r} still has running executions after {timeout}s") + + +def _safe_delete_workflows( + client: CogniteClient, + external_ids: str | list[str], + *, + ignore_unknown_ids: bool = True, +) -> None: + """Delete workflows without failing on races with executions or other test runs. + + "Safe" here means we skip IDs already gone, cancel running executions before + workflows.delete, and retry for up to ~3 minutes if delete still reports + running executions. Needed because integration tests share one CDF project: + another worker or CI job may have deleted the workflow already, cancellation + may not be visible yet, or a trigger can start a new execution between cancel + and delete. + """ + ids = [external_ids] if isinstance(external_ids, str) else list(external_ids) + deadline = time.monotonic() + 180 + last_exc: CogniteAPIError | None = None + + while time.monotonic() < deadline: + if ignore_unknown_ids: + retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) + existing_ids = {wf.external_id for wf in retrieved} if retrieved else set() + ids = [id_ for id_ in ids if id_ in existing_ids] + if not ids: + return + + for workflow_external_id in ids: + _ensure_no_running_executions(client, workflow_external_id, timeout=30) + + try: + client.workflows.delete(ids, ignore_unknown_ids=ignore_unknown_ids) + return + except CogniteAPIError as exc: + if "running executions" not in str(exc): + raise + last_exc = exc + time.sleep(1) + + if last_exc is not None: + raise last_exc + + @pytest.fixture def workflow_simint_routine(cognite_client: CogniteClient) -> str: return ensure_workflow_simint_routine(cognite_client) @@ -58,9 +138,63 @@ def permanent_wf_ext_id_prefix(os_and_py_version: str) -> str: return f"integ_test_wf_trigger_{os_and_py_version}" +def _delete_triggers_for_versions( + client: CogniteClient, + version_ids: Sequence[tuple[str, str | None]], +) -> None: + """Delete triggers linked to the given workflow versions.""" + if not version_ids: + return + version_id_set = set(version_ids) + triggers = client.workflows.triggers.list(limit=None) + trigger_ids = [ + trigger.external_id + for trigger in triggers + if (trigger.workflow_external_id, trigger.workflow_version) in version_id_set + ] + if trigger_ids: + client.workflows.triggers.delete(trigger_ids) + + +def _safe_delete_workflow_versions( + client: CogniteClient, + version_ids: Sequence[tuple[str, str | None]], + *, + ignore_unknown_ids: bool = True, +) -> None: + """Delete workflow versions without failing on linked triggers. + + "Safe" here means we remove triggers pointing at these versions before calling + versions.delete, and retry once if the API still reports an active trigger. + Needed because integration tests share one CDF project across xdist workers and + concurrent CI jobs: a version can gain a trigger after our cleanup list was built, + or another run may still have a trigger on a version we are tearing down. + """ + if not version_ids: + return + _delete_triggers_for_versions(client, version_ids) + ids_to_delete = [ + WorkflowVersionId(workflow_external_id=wf_ext_id, version=version) + for wf_ext_id, version in version_ids + if version is not None + ] + if not ids_to_delete: + return + try: + client.workflows.versions.delete(ids_to_delete, ignore_unknown_ids=ignore_unknown_ids) + except CogniteAPIError as exc: + if "active trigger" not in str(exc): + raise + _delete_triggers_for_versions(client, version_ids) + client.workflows.versions.delete(ids_to_delete, ignore_unknown_ids=ignore_unknown_ids) + + @pytest.fixture(scope="session") def permanent_wf_ext_id(permanent_wf_ext_id_prefix: str, sdk_version: tuple[str, str, str]) -> str: - return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}" + # "Permanent" means exempt from wf_setup_module stale-resource cleanup (via prefix), + # not a new ID per CI run. Stable across runs; xdist worker suffix avoids cross-worker races. + worker = os.environ.get("PYTEST_XDIST_WORKER", "master") + return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}_{worker}" @pytest.fixture(autouse=True, scope="module") @@ -69,13 +203,22 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s resource_age = timestamp_to_ms("30m-ago") wf_triggers = cognite_client.workflows.triggers.list(limit=None) - wf_triggers_to_delete = [ + wf_triggers_to_delete = { wft.external_id for wft in wf_triggers if wft.last_updated_time < resource_age and not wft.workflow_external_id.startswith(permanent_wf_ext_id_prefix) - ] + } + # Drop orphan per-minute triggers left on permanent workflows by failed trigger tests. + permanent_trigger_keep_prefixes = ("scheduled-trigger_", "data-modeling-trigger_") + wf_triggers_to_delete.update( + wft.external_id + for wft in wf_triggers + if wft.workflow_external_id.startswith(permanent_wf_ext_id_prefix) + and wft.external_id.startswith("test_create_update_scheduled_trigger_") + and not wft.external_id.startswith(permanent_trigger_keep_prefixes) + ) if wf_triggers_to_delete: - cognite_client.workflows.triggers.delete(wf_triggers_to_delete) + cognite_client.workflows.triggers.delete(list(wf_triggers_to_delete)) wf_versions = cognite_client.workflows.versions.list(limit=None) wf_versions_to_delete = [ @@ -84,7 +227,7 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s if wf.last_updated_time < resource_age and not wf.workflow_external_id.startswith(permanent_wf_ext_id_prefix) ] if wf_versions_to_delete: - cognite_client.workflows.versions.delete(wf_versions_to_delete) + _safe_delete_workflow_versions(cognite_client, wf_versions_to_delete) wfs = cognite_client.workflows.list(limit=None) wfs_to_delete = [ @@ -93,7 +236,7 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s if wf.last_updated_time < resource_age and not wf.external_id.startswith(permanent_wf_ext_id_prefix) ] if wfs_to_delete: - cognite_client.workflows.delete(wfs_to_delete) + _safe_delete_workflows(cognite_client, wfs_to_delete) def handle(client: CogniteClient, data: dict[str, object]) -> str: @@ -137,7 +280,7 @@ def _new_workflow(cognite_client: CogniteClient, data_set: DataSet) -> Iterator[ data_set_id=data_set.id, ) yield cognite_client.workflows.upsert(workflow) - cognite_client.workflows.delete(workflow.external_id, ignore_unknown_ids=True) + _safe_delete_workflows(cognite_client, workflow.external_id) @pytest.fixture(scope="session") @@ -188,7 +331,8 @@ def _new_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow) ), ) yield cognite_client.workflows.versions.upsert(version) - cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) + _ensure_no_running_executions(cognite_client, new_workflow.external_id, timeout=120) + _safe_delete_workflow_versions(cognite_client, [version.as_id().as_tuple()]) @pytest.fixture(scope="session") @@ -224,7 +368,8 @@ def _new_async_workflow_version( ), ) yield cognite_client.workflows.versions.upsert(version) - cognite_client.workflows.versions.delete((new_workflow.external_id, version.version), ignore_unknown_ids=True) + _ensure_no_running_executions(cognite_client, new_workflow.external_id, timeout=120) + _safe_delete_workflow_versions(cognite_client, [(new_workflow.external_id, version.version)]) @pytest.fixture(scope="session") @@ -295,7 +440,8 @@ def _new_workflow_version_list(cognite_client: CogniteClient, new_workflow: Work upserted_version = cognite_client.workflows.versions.upsert(version) upserted_versions.append(upserted_version) yield upserted_versions - cognite_client.workflows.versions.delete(upserted_versions.as_ids(), ignore_unknown_ids=True) + _ensure_no_running_executions(cognite_client, new_workflow.external_id, timeout=120) + _safe_delete_workflow_versions(cognite_client, upserted_versions.as_ids().as_tuples()) @pytest.fixture(scope="session") @@ -334,23 +480,32 @@ def _new_workflow_execution_list( metadata={"test": "integration_cancelled"}, ) run_2 = cognite_client.workflows.executions.cancel(id=run_2.id, reason="test cancel") - return WorkflowExecutionList([run_1, run_2]) + for execution in (run_1, run_2): + if execution.status != "running": + continue + try: + cognite_client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + except CogniteAPIError: + pass + + if not _ensure_no_running_executions(cognite_client, new_workflow_version.workflow_external_id, timeout=120): + raise RuntimeError( + f"Workflow {new_workflow_version.workflow_external_id!r} still has running executions " + "after setup for execution list tests" + ) -@pytest.fixture(scope="session") -def workflow_execution_list( - cognite_client: CogniteClient, new_workflow_version: WorkflowVersion -) -> WorkflowExecutionList: - return _new_workflow_execution_list(cognite_client, new_workflow_version) + return WorkflowExecutionList([run_1, run_2]) +@pytest.fixture def workflow_execution_list_test_scoped( cognite_client: CogniteClient, new_workflow_version_test_scoped: WorkflowVersion ) -> WorkflowExecutionList: return _new_workflow_execution_list(cognite_client, new_workflow_version_test_scoped) -@pytest.fixture(scope="session") +@pytest.fixture(scope="class") def permanent_workflow_for_triggers(cognite_client: CogniteClient, permanent_wf_ext_id: str) -> WorkflowVersion: workflow = WorkflowUpsert(external_id=permanent_wf_ext_id, description="Permanent workflow for trigger testing") cognite_client.workflows.upsert(workflow) @@ -371,6 +526,8 @@ def permanent_workflow_for_triggers(cognite_client: CogniteClient, permanent_wf_ ), ) upserted = cognite_client.workflows.versions.upsert(version) + if cognite_client.workflows.versions.retrieve(upserted.as_id()) is None: + raise RuntimeError(f"Failed to ensure permanent workflow version {upserted.as_id()!r}") return upserted @@ -435,6 +592,7 @@ def test_upsert_preexisting(self, cognite_client: CogniteClient, new_workflow: W assert updated_workflow.max_concurrent_executions == new_workflow.max_concurrent_executions def test_delete_multiple_non_existing_raise(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) with pytest.raises(CogniteAPIError, match="workflows were not found"): cognite_client.workflows.delete( [new_workflow.external_id, "integration_test-non_existing_workflow"], ignore_unknown_ids=False @@ -442,6 +600,7 @@ def test_delete_multiple_non_existing_raise(self, cognite_client: CogniteClient, assert cognite_client.workflows.retrieve(new_workflow.external_id) is not None def test_delete_multiple_non_existing(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) cognite_client.workflows.delete( [new_workflow.external_id, "integration_test-non_existing_workflow"], ignore_unknown_ids=True ) @@ -497,7 +656,7 @@ def test_upsert_run_delete_with_simulation_task( ), ) - cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) + _safe_delete_workflow_versions(cognite_client, [version.as_id().as_tuple()], ignore_unknown_ids=True) created_version: WorkflowVersion | None = None try: @@ -531,8 +690,8 @@ def test_upsert_run_delete_with_simulation_task( finally: if created_version is not None: - cognite_client.workflows.versions.delete(created_version.as_id()) - cognite_client.workflows.delete(created_version.workflow_external_id) + _safe_delete_workflow_versions(cognite_client, [created_version.as_id().as_tuple()]) + _safe_delete_workflows(cognite_client, created_version.workflow_external_id) def test_upsert_preexisting( self, cognite_client: CogniteClient, new_workflow_version_test_scoped: WorkflowVersion @@ -606,32 +765,37 @@ class TestWorkflowExecutions: def test_list_workflow_executions( self, cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, + workflow_execution_list_test_scoped: WorkflowExecutionList, ) -> None: listed = cognite_client.workflows.executions.list( - workflow_version_ids=workflow_execution_list[0].as_workflow_id() + workflow_version_ids=workflow_execution_list_test_scoped[0].as_workflow_id() ) # Compare by ID: cancel() can return before fields like end_time are # finalized server-side, so full-object equality is flaky. - assert {e.id for e in listed} == {e.id for e in workflow_execution_list} + assert {e.id for e in listed} == {e.id for e in workflow_execution_list_test_scoped} def test_list_workflow_executions_by_status( self, cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, + workflow_execution_list_test_scoped: WorkflowExecutionList, ) -> None: - listed_completed = cognite_client.workflows.executions.list(statuses=["completed", "terminated"]) + listed_completed = cognite_client.workflows.executions.list( + workflow_version_ids=workflow_execution_list_test_scoped[0].as_workflow_id(), + statuses=["completed", "terminated"], + ) for execution in listed_completed: assert execution.status in ["completed", "terminated"] + expected_ids = {e.id for e in workflow_execution_list_test_scoped if e.status in ("completed", "terminated")} + assert expected_ids <= {e.id for e in listed_completed} def test_retrieve_workflow_execution_detailed( self, cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, + workflow_execution_list_test_scoped: WorkflowExecutionList, ) -> None: - retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id) + retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list_test_scoped[0].id) assert retrieved - assert retrieved.as_execution().dump() == workflow_execution_list[0].dump() + assert retrieved.as_execution().dump() == workflow_execution_list_test_scoped[0].dump() assert retrieved.executed_tasks assert retrieved.metadata == {"test": "integration_completed"} @@ -683,18 +847,19 @@ def test_trigger_cancel_retry_workflow( retried_workflow_execution = cognite_client.workflows.executions.retry(workflow_execution.id) assert retried_workflow_execution.status == "running" + cognite_client.workflows.executions.cancel(id=retried_workflow_execution.id, reason="test cleanup") class TestWorkflowTriggers: def test_create_update_scheduled_trigger( self, cognite_client: CogniteClient, - permanent_workflow_for_triggers: WorkflowVersion, + new_workflow_version_test_scoped: WorkflowVersion, ) -> None: - version = permanent_workflow_for_triggers + version = new_workflow_version_test_scoped existing = WorkflowTriggerUpsert( external_id=f"test_create_update_scheduled_trigger_{random_string(5)}", - trigger_rule=WorkflowScheduledTriggerRule(cron_expression="* * * * *"), + trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 * * * *"), workflow_external_id=version.workflow_external_id, workflow_version=version.version, input={"a": 1, "b": 2}, @@ -893,7 +1058,7 @@ def test_pause_resume_trigger(self, cognite_client: CogniteClient) -> None: except Exception: pass try: - cognite_client.workflows.delete(workflow_external_id) + _safe_delete_workflows(cognite_client, workflow_external_id) except Exception: pass