From 815eed88f243f38038c151da9221d912259f1754 Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 22 Jun 2026 13:44:12 +0200 Subject: [PATCH 01/21] fix(tests): cancel running workflow executions before delete in integration tests Workflow API now rejects deletion while executions are still running (CDF-28136). Update workflow integration test teardown to cancel running executions first so cleanup does not fail against the new guard. Co-authored-by: Cursor --- .../test_api/test_workflows.py | 51 +++++++++++++++++-- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index f457bb3ea7..912738f363 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -48,6 +48,46 @@ from tests.utils import get_or_raise +def _cancel_running_executions_for_workflow( + client: CogniteClient, + workflow_external_id: str, + *, + timeout: float = 60, +) -> None: + """Cancel running executions so a workflow can be deleted (see jazz-api PR #2424).""" + running = client.workflows.executions.list( + workflow_version_ids=WorkflowVersionId(workflow_external_id), + statuses=["running"], + limit=None, + ) + for execution in running: + client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + running = client.workflows.executions.list( + workflow_version_ids=WorkflowVersionId(workflow_external_id), + statuses=["running"], + limit=1, + ) + if not running: + return + time.sleep(0.5) + raise RuntimeError(f"Workflow {workflow_external_id!r} still has running executions after {timeout}s") + + +def _safe_delete_workflows( + client: CogniteClient, + external_ids: str | list[str], + *, + ignore_unknown_ids: bool = True, +) -> None: + ids = [external_ids] if isinstance(external_ids, str) else external_ids + for workflow_external_id in ids: + _cancel_running_executions_for_workflow(client, workflow_external_id) + client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) + + @pytest.fixture def workflow_simint_routine(cognite_client: CogniteClient) -> str: return ensure_workflow_simint_routine(cognite_client) @@ -93,7 +133,7 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s if wf.last_updated_time < resource_age and not wf.external_id.startswith(permanent_wf_ext_id_prefix) ] if wfs_to_delete: - cognite_client.workflows.delete(wfs_to_delete) + _safe_delete_workflows(cognite_client, wfs_to_delete) def handle(client: CogniteClient, data: dict[str, object]) -> str: @@ -137,7 +177,7 @@ def _new_workflow(cognite_client: CogniteClient, data_set: DataSet) -> Iterator[ data_set_id=data_set.id, ) yield cognite_client.workflows.upsert(workflow) - cognite_client.workflows.delete(workflow.external_id, ignore_unknown_ids=True) + _safe_delete_workflows(cognite_client, workflow.external_id) @pytest.fixture(scope="session") @@ -435,6 +475,7 @@ def test_upsert_preexisting(self, cognite_client: CogniteClient, new_workflow: W assert updated_workflow.max_concurrent_executions == new_workflow.max_concurrent_executions def test_delete_multiple_non_existing_raise(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) with pytest.raises(CogniteAPIError, match="workflows were not found"): cognite_client.workflows.delete( [new_workflow.external_id, "integration_test-non_existing_workflow"], ignore_unknown_ids=False @@ -442,6 +483,7 @@ def test_delete_multiple_non_existing_raise(self, cognite_client: CogniteClient, assert cognite_client.workflows.retrieve(new_workflow.external_id) is not None def test_delete_multiple_non_existing(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) cognite_client.workflows.delete( [new_workflow.external_id, "integration_test-non_existing_workflow"], ignore_unknown_ids=True ) @@ -532,7 +574,7 @@ def test_upsert_run_delete_with_simulation_task( finally: if created_version is not None: cognite_client.workflows.versions.delete(created_version.as_id()) - cognite_client.workflows.delete(created_version.workflow_external_id) + _safe_delete_workflows(cognite_client, created_version.workflow_external_id) def test_upsert_preexisting( self, cognite_client: CogniteClient, new_workflow_version_test_scoped: WorkflowVersion @@ -683,6 +725,7 @@ def test_trigger_cancel_retry_workflow( retried_workflow_execution = cognite_client.workflows.executions.retry(workflow_execution.id) assert retried_workflow_execution.status == "running" + cognite_client.workflows.executions.cancel(id=retried_workflow_execution.id, reason="test cleanup") class TestWorkflowTriggers: @@ -893,7 +936,7 @@ def test_pause_resume_trigger(self, cognite_client: CogniteClient) -> None: except Exception: pass try: - cognite_client.workflows.delete(workflow_external_id) + _safe_delete_workflows(cognite_client, workflow_external_id) except Exception: pass From f843ce0b7d841b67fc5670731871b398b86fde3d Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 22 Jun 2026 13:59:06 +0200 Subject: [PATCH 02/21] fix python doc --- tests/tests_integration/test_api/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 912738f363..f0becb541d 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -54,7 +54,7 @@ def _cancel_running_executions_for_workflow( *, timeout: float = 60, ) -> None: - """Cancel running executions so a workflow can be deleted (see jazz-api PR #2424).""" + """Cancel running executions so a workflow can be deleted.""" running = client.workflows.executions.list( workflow_version_ids=WorkflowVersionId(workflow_external_id), statuses=["running"], From 9a6d9aab84dc2828ebf4f782b4e36cde97a42862 Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 22 Jun 2026 14:09:32 +0200 Subject: [PATCH 03/21] fix(tests): isolate test_list_workflow_executions from shared session fixtures Use test-scoped workflow executions so listing is not polluted by executions created by other tests on the same session-scoped version. Co-authored-by: Cursor --- tests/tests_integration/test_api/test_workflows.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index f0becb541d..268416bcab 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -384,6 +384,7 @@ def workflow_execution_list( return _new_workflow_execution_list(cognite_client, new_workflow_version) +@pytest.fixture def workflow_execution_list_test_scoped( cognite_client: CogniteClient, new_workflow_version_test_scoped: WorkflowVersion ) -> WorkflowExecutionList: @@ -648,14 +649,14 @@ class TestWorkflowExecutions: def test_list_workflow_executions( self, cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, + workflow_execution_list_test_scoped: WorkflowExecutionList, ) -> None: listed = cognite_client.workflows.executions.list( - workflow_version_ids=workflow_execution_list[0].as_workflow_id() + workflow_version_ids=workflow_execution_list_test_scoped[0].as_workflow_id() ) # Compare by ID: cancel() can return before fields like end_time are # finalized server-side, so full-object equality is flaky. - assert {e.id for e in listed} == {e.id for e in workflow_execution_list} + assert {e.id for e in listed} == {e.id for e in workflow_execution_list_test_scoped} def test_list_workflow_executions_by_status( self, From 37484b6f7df46abe717de80668b860c4f8a839ae Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 22 Jun 2026 14:15:21 +0200 Subject: [PATCH 04/21] fix(tests): harden workflow teardown helpers against race and missing IDs Ignore CogniteAPIError when cancelling executions that finish between list and cancel, and skip cancellation for unknown workflows when ignore_unknown_ids is enabled. Co-authored-by: Cursor --- tests/tests_integration/test_api/test_workflows.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 268416bcab..76f3e9d3e2 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -61,7 +61,10 @@ def _cancel_running_executions_for_workflow( limit=None, ) for execution in running: - client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + try: + client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + except CogniteAPIError: + pass # execution may have completed/cancelled between list and cancel deadline = time.monotonic() + timeout while time.monotonic() < deadline: @@ -84,7 +87,13 @@ def _safe_delete_workflows( ) -> None: ids = [external_ids] if isinstance(external_ids, str) else external_ids for workflow_external_id in ids: - _cancel_running_executions_for_workflow(client, workflow_external_id) + if ignore_unknown_ids and client.workflows.retrieve(workflow_external_id) is None: + continue + try: + _cancel_running_executions_for_workflow(client, workflow_external_id) + except CogniteAPIError: + if not ignore_unknown_ids: + raise client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) From af975381ba142f42f6a601c52a327d1cba9ea5b3 Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 22 Jun 2026 14:25:11 +0200 Subject: [PATCH 05/21] kick From cce80dcae53ab9efc8e819c00d5cde8847542955 Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 22 Jun 2026 14:35:13 +0200 Subject: [PATCH 06/21] kick From b8a5c2f07e40a91270e64396d169a3f6ec6a7115 Mon Sep 17 00:00:00 2001 From: Vincent Date: Tue, 23 Jun 2026 15:19:31 +0200 Subject: [PATCH 07/21] fix clean up --- .../test_api/test_workflows.py | 58 +++++++++++++++++-- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 76f3e9d3e2..a2473716fa 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -1,6 +1,7 @@ from __future__ import annotations import datetime +import os import time import unittest from collections.abc import Iterator @@ -94,7 +95,16 @@ def _safe_delete_workflows( except CogniteAPIError: if not ignore_unknown_ids: raise - client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) + try: + client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) + except CogniteAPIError as exc: + if "running executions" not in str(exc): + raise + for workflow_external_id in ids: + if client.workflows.retrieve(workflow_external_id) is None: + continue + _cancel_running_executions_for_workflow(client, workflow_external_id, timeout=120) + client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) @pytest.fixture @@ -109,7 +119,9 @@ def permanent_wf_ext_id_prefix(os_and_py_version: str) -> str: @pytest.fixture(scope="session") def permanent_wf_ext_id(permanent_wf_ext_id_prefix: str, sdk_version: tuple[str, str, str]) -> str: - return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}" + # Isolate permanent trigger workflows per xdist worker to avoid cross-worker races. + worker = os.environ.get("PYTEST_XDIST_WORKER", "master") + return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}_{worker}" @pytest.fixture(autouse=True, scope="module") @@ -118,13 +130,22 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s resource_age = timestamp_to_ms("30m-ago") wf_triggers = cognite_client.workflows.triggers.list(limit=None) - wf_triggers_to_delete = [ + wf_triggers_to_delete = { wft.external_id for wft in wf_triggers if wft.last_updated_time < resource_age and not wft.workflow_external_id.startswith(permanent_wf_ext_id_prefix) - ] + } + # Drop orphan per-minute triggers left on permanent workflows by failed trigger tests. + permanent_trigger_keep_prefixes = ("scheduled-trigger_", "data-modeling-trigger_") + wf_triggers_to_delete.update( + wft.external_id + for wft in wf_triggers + if wft.workflow_external_id.startswith(permanent_wf_ext_id_prefix) + and wft.external_id.startswith("test_create_update_scheduled_trigger_") + and not wft.external_id.startswith(permanent_trigger_keep_prefixes) + ) if wf_triggers_to_delete: - cognite_client.workflows.triggers.delete(wf_triggers_to_delete) + cognite_client.workflows.triggers.delete(list(wf_triggers_to_delete)) wf_versions = cognite_client.workflows.versions.list(limit=None) wf_versions_to_delete = [ @@ -237,6 +258,7 @@ def _new_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow) ), ) yield cognite_client.workflows.versions.upsert(version) + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) @@ -273,6 +295,7 @@ def _new_async_workflow_version( ), ) yield cognite_client.workflows.versions.upsert(version) + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) cognite_client.workflows.versions.delete((new_workflow.external_id, version.version), ignore_unknown_ids=True) @@ -344,6 +367,7 @@ def _new_workflow_version_list(cognite_client: CogniteClient, new_workflow: Work upserted_version = cognite_client.workflows.versions.upsert(version) upserted_versions.append(upserted_version) yield upserted_versions + _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) cognite_client.workflows.versions.delete(upserted_versions.as_ids(), ignore_unknown_ids=True) @@ -383,6 +407,26 @@ def _new_workflow_execution_list( metadata={"test": "integration_cancelled"}, ) run_2 = cognite_client.workflows.executions.cancel(id=run_2.id, reason="test cancel") + + for execution in (run_1, run_2): + if execution.status != "running": + continue + try: + cognite_client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + except CogniteAPIError: + pass + + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: + running = cognite_client.workflows.executions.list( + workflow_version_ids=new_workflow_version.as_id(), + statuses=["running"], + limit=1, + ) + if not running: + break + time.sleep(0.5) + return WorkflowExecutionList([run_1, run_2]) @@ -400,7 +444,7 @@ def workflow_execution_list_test_scoped( return _new_workflow_execution_list(cognite_client, new_workflow_version_test_scoped) -@pytest.fixture(scope="session") +@pytest.fixture(scope="class") def permanent_workflow_for_triggers(cognite_client: CogniteClient, permanent_wf_ext_id: str) -> WorkflowVersion: workflow = WorkflowUpsert(external_id=permanent_wf_ext_id, description="Permanent workflow for trigger testing") cognite_client.workflows.upsert(workflow) @@ -421,6 +465,8 @@ def permanent_workflow_for_triggers(cognite_client: CogniteClient, permanent_wf_ ), ) upserted = cognite_client.workflows.versions.upsert(version) + if cognite_client.workflows.versions.retrieve(upserted.as_id()) is None: + raise RuntimeError(f"Failed to ensure permanent workflow version {upserted.as_id()!r}") return upserted From 90ea6f50da3eb2bc3b4c6b0394ced5d608fa0c05 Mon Sep 17 00:00:00 2001 From: Vincent Date: Tue, 23 Jun 2026 15:53:46 +0200 Subject: [PATCH 08/21] kick From c70247f87e264df18d36f87f4da8d7f3ad002baf Mon Sep 17 00:00:00 2001 From: Vincent Date: Tue, 23 Jun 2026 16:09:21 +0200 Subject: [PATCH 09/21] add comment From e51873c09861c8c80f5ac4773bd2edfef7871b61 Mon Sep 17 00:00:00 2001 From: Vincent Date: Tue, 23 Jun 2026 16:14:04 +0200 Subject: [PATCH 10/21] add comment --- tests/tests_integration/test_api/test_workflows.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index a2473716fa..675d2790a4 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -100,6 +100,8 @@ def _safe_delete_workflows( except CogniteAPIError as exc: if "running executions" not in str(exc): raise + # Second pass: delete can still fail if cancellation is not yet visible, a new + # execution started (e.g. scheduled trigger), or the first cancel was skipped. for workflow_external_id in ids: if client.workflows.retrieve(workflow_external_id) is None: continue From 0da3ee674c5e3b62289107cf40948b4c350605b7 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 09:03:51 +0200 Subject: [PATCH 11/21] add comment From 0a3cc7661ce6a701cf72c624028c6d28ad47110e Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 09:15:20 +0200 Subject: [PATCH 12/21] small optim from gemini --- tests/tests_integration/test_api/test_workflows.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 675d2790a4..235f433853 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -87,9 +87,12 @@ def _safe_delete_workflows( ignore_unknown_ids: bool = True, ) -> None: ids = [external_ids] if isinstance(external_ids, str) else external_ids + if ignore_unknown_ids: + retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) + existing_ids = {wf.external_id for wf in retrieved} if retrieved else set() + ids = [id_ for id_ in ids if id_ in existing_ids] + for workflow_external_id in ids: - if ignore_unknown_ids and client.workflows.retrieve(workflow_external_id) is None: - continue try: _cancel_running_executions_for_workflow(client, workflow_external_id) except CogniteAPIError: @@ -102,8 +105,10 @@ def _safe_delete_workflows( raise # Second pass: delete can still fail if cancellation is not yet visible, a new # execution started (e.g. scheduled trigger), or the first cancel was skipped. + retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) + existing_ids = {wf.external_id for wf in retrieved} if retrieved else set() for workflow_external_id in ids: - if client.workflows.retrieve(workflow_external_id) is None: + if workflow_external_id not in existing_ids: continue _cancel_running_executions_for_workflow(client, workflow_external_id, timeout=120) client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) From e861b96338ee4a90197971411218c18d33c35553 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 09:25:05 +0200 Subject: [PATCH 13/21] kick From e2480f88c377031afc9f4cd67596d69739827f7f Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 09:29:18 +0200 Subject: [PATCH 14/21] kick From 0199fd38f6059c59c9d253f740c4daf7f62a7d79 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 09:53:28 +0200 Subject: [PATCH 15/21] kick From 6129365ecce9f1e12e76bf54de9c21457965c3d6 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 10:20:33 +0200 Subject: [PATCH 16/21] all test scoped --- .../test_api/test_workflows.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 235f433853..90793ebda9 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -437,13 +437,6 @@ def _new_workflow_execution_list( return WorkflowExecutionList([run_1, run_2]) -@pytest.fixture(scope="session") -def workflow_execution_list( - cognite_client: CogniteClient, new_workflow_version: WorkflowVersion -) -> WorkflowExecutionList: - return _new_workflow_execution_list(cognite_client, new_workflow_version) - - @pytest.fixture def workflow_execution_list_test_scoped( cognite_client: CogniteClient, new_workflow_version_test_scoped: WorkflowVersion @@ -723,20 +716,25 @@ def test_list_workflow_executions( def test_list_workflow_executions_by_status( self, cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, + workflow_execution_list_test_scoped: WorkflowExecutionList, ) -> None: - listed_completed = cognite_client.workflows.executions.list(statuses=["completed", "terminated"]) + listed_completed = cognite_client.workflows.executions.list( + workflow_version_ids=workflow_execution_list_test_scoped[0].as_workflow_id(), + statuses=["completed", "terminated"], + ) for execution in listed_completed: assert execution.status in ["completed", "terminated"] + expected_ids = {e.id for e in workflow_execution_list_test_scoped if e.status in ("completed", "terminated")} + assert expected_ids <= {e.id for e in listed_completed} def test_retrieve_workflow_execution_detailed( self, cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, + workflow_execution_list_test_scoped: WorkflowExecutionList, ) -> None: - retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id) + retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list_test_scoped[0].id) assert retrieved - assert retrieved.as_execution().dump() == workflow_execution_list[0].dump() + assert retrieved.as_execution().dump() == workflow_execution_list_test_scoped[0].dump() assert retrieved.executed_tasks assert retrieved.metadata == {"test": "integration_completed"} From 3de6288c94d46ab2f9e4d3532c67b6708ed7c2d3 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 10:32:02 +0200 Subject: [PATCH 17/21] kick From 52501fbc68e05d777313cd8a4077e15fa8b99525 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 11:00:02 +0200 Subject: [PATCH 18/21] more concurrent safety --- .../test_api/test_workflows.py | 83 ++++++++++++++++--- 1 file changed, 72 insertions(+), 11 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 90793ebda9..2c4de78f7e 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -4,7 +4,7 @@ import os import time import unittest -from collections.abc import Iterator +from collections.abc import Iterator, Sequence from zoneinfo import ZoneInfo import pytest @@ -86,6 +86,15 @@ def _safe_delete_workflows( *, ignore_unknown_ids: bool = True, ) -> None: + """Delete workflows without failing on races with executions or other test runs. + + "Safe" here means we skip IDs already gone, cancel running executions before + workflows.delete, and retry once with a longer wait if delete still reports + running executions. Needed because integration tests share one CDF project: + another worker or CI job may have deleted the workflow already, cancellation + may not be visible yet, or a trigger can start a new execution between cancel + and delete. + """ ids = [external_ids] if isinstance(external_ids, str) else external_ids if ignore_unknown_ids: retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) @@ -124,9 +133,61 @@ def permanent_wf_ext_id_prefix(os_and_py_version: str) -> str: return f"integ_test_wf_trigger_{os_and_py_version}" +def _delete_triggers_for_versions( + client: CogniteClient, + version_ids: Sequence[tuple[str, str | None]], +) -> None: + """Delete triggers linked to the given workflow versions.""" + if not version_ids: + return + version_id_set = set(version_ids) + triggers = client.workflows.triggers.list(limit=None) + trigger_ids = [ + trigger.external_id + for trigger in triggers + if (trigger.workflow_external_id, trigger.workflow_version) in version_id_set + ] + if trigger_ids: + client.workflows.triggers.delete(trigger_ids) + + +def _safe_delete_workflow_versions( + client: CogniteClient, + version_ids: Sequence[tuple[str, str | None]], + *, + ignore_unknown_ids: bool = True, +) -> None: + """Delete workflow versions without failing on linked triggers. + + "Safe" here means we remove triggers pointing at these versions before calling + versions.delete, and retry once if the API still reports an active trigger. + Needed because integration tests share one CDF project across xdist workers and + concurrent CI jobs: a version can gain a trigger after our cleanup list was built, + or another run may still have a trigger on a version we are tearing down. + """ + if not version_ids: + return + _delete_triggers_for_versions(client, version_ids) + ids_to_delete = [ + WorkflowVersionId(workflow_external_id=wf_ext_id, version=version) + for wf_ext_id, version in version_ids + if version is not None + ] + if not ids_to_delete: + return + try: + client.workflows.versions.delete(ids_to_delete, ignore_unknown_ids=ignore_unknown_ids) + except CogniteAPIError as exc: + if "active trigger" not in str(exc): + raise + _delete_triggers_for_versions(client, version_ids) + client.workflows.versions.delete(ids_to_delete, ignore_unknown_ids=ignore_unknown_ids) + + @pytest.fixture(scope="session") def permanent_wf_ext_id(permanent_wf_ext_id_prefix: str, sdk_version: tuple[str, str, str]) -> str: - # Isolate permanent trigger workflows per xdist worker to avoid cross-worker races. + # "Permanent" means exempt from wf_setup_module stale-resource cleanup (via prefix), + # not a new ID per CI run. Stable across runs; xdist worker suffix avoids cross-worker races. worker = os.environ.get("PYTEST_XDIST_WORKER", "master") return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}_{worker}" @@ -161,7 +222,7 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s if wf.last_updated_time < resource_age and not wf.workflow_external_id.startswith(permanent_wf_ext_id_prefix) ] if wf_versions_to_delete: - cognite_client.workflows.versions.delete(wf_versions_to_delete) + _safe_delete_workflow_versions(cognite_client, wf_versions_to_delete) wfs = cognite_client.workflows.list(limit=None) wfs_to_delete = [ @@ -266,7 +327,7 @@ def _new_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow) ) yield cognite_client.workflows.versions.upsert(version) _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) - cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) + _safe_delete_workflow_versions(cognite_client, [version.as_id().as_tuple()]) @pytest.fixture(scope="session") @@ -303,7 +364,7 @@ def _new_async_workflow_version( ) yield cognite_client.workflows.versions.upsert(version) _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) - cognite_client.workflows.versions.delete((new_workflow.external_id, version.version), ignore_unknown_ids=True) + _safe_delete_workflow_versions(cognite_client, [(new_workflow.external_id, version.version)]) @pytest.fixture(scope="session") @@ -375,7 +436,7 @@ def _new_workflow_version_list(cognite_client: CogniteClient, new_workflow: Work upserted_versions.append(upserted_version) yield upserted_versions _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) - cognite_client.workflows.versions.delete(upserted_versions.as_ids(), ignore_unknown_ids=True) + _safe_delete_workflow_versions(cognite_client, upserted_versions.as_ids().as_tuples()) @pytest.fixture(scope="session") @@ -595,7 +656,7 @@ def test_upsert_run_delete_with_simulation_task( ), ) - cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) + _safe_delete_workflow_versions(cognite_client, [version.as_id().as_tuple()], ignore_unknown_ids=True) created_version: WorkflowVersion | None = None try: @@ -629,7 +690,7 @@ def test_upsert_run_delete_with_simulation_task( finally: if created_version is not None: - cognite_client.workflows.versions.delete(created_version.as_id()) + _safe_delete_workflow_versions(cognite_client, [created_version.as_id().as_tuple()]) _safe_delete_workflows(cognite_client, created_version.workflow_external_id) def test_upsert_preexisting( @@ -793,12 +854,12 @@ class TestWorkflowTriggers: def test_create_update_scheduled_trigger( self, cognite_client: CogniteClient, - permanent_workflow_for_triggers: WorkflowVersion, + new_workflow_version_test_scoped: WorkflowVersion, ) -> None: - version = permanent_workflow_for_triggers + version = new_workflow_version_test_scoped existing = WorkflowTriggerUpsert( external_id=f"test_create_update_scheduled_trigger_{random_string(5)}", - trigger_rule=WorkflowScheduledTriggerRule(cron_expression="* * * * *"), + trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 * * * *"), workflow_external_id=version.workflow_external_id, workflow_version=version.version, input={"a": 1, "b": 2}, From a4e87e5033b28317d4fa833bc71643af58039233 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 11:19:13 +0200 Subject: [PATCH 19/21] kick From 42086555115cbede19f33a6ac7895a9fb89017f7 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 12:29:44 +0200 Subject: [PATCH 20/21] kick From d6d0c383cda72b7c65cf9d164960fd334f5846d9 Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 24 Jun 2026 13:10:56 +0200 Subject: [PATCH 21/21] more concurrent safety --- .../test_api/test_workflows.py | 108 +++++++++--------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 2c4de78f7e..d0fdd01dcd 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -49,35 +49,40 @@ from tests.utils import get_or_raise -def _cancel_running_executions_for_workflow( +def _ensure_no_running_executions( client: CogniteClient, workflow_external_id: str, *, - timeout: float = 60, -) -> None: - """Cancel running executions so a workflow can be deleted.""" - running = client.workflows.executions.list( - workflow_version_ids=WorkflowVersionId(workflow_external_id), - statuses=["running"], - limit=None, - ) - for execution in running: - try: - client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") - except CogniteAPIError: - pass # execution may have completed/cancelled between list and cancel - + timeout: float = 120, +) -> bool: + """Cancel and poll until the workflow has no running executions.""" deadline = time.monotonic() + timeout while time.monotonic() < deadline: running = client.workflows.executions.list( workflow_version_ids=WorkflowVersionId(workflow_external_id), statuses=["running"], - limit=1, + limit=None, ) if not running: - return + return True + for execution in running: + try: + client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup") + except CogniteAPIError: + pass # execution may have completed/cancelled between list and cancel time.sleep(0.5) - raise RuntimeError(f"Workflow {workflow_external_id!r} still has running executions after {timeout}s") + return False + + +def _cancel_running_executions_for_workflow( + client: CogniteClient, + workflow_external_id: str, + *, + timeout: float = 60, +) -> None: + """Cancel running executions so a workflow can be deleted.""" + if not _ensure_no_running_executions(client, workflow_external_id, timeout=timeout): + raise RuntimeError(f"Workflow {workflow_external_id!r} still has running executions after {timeout}s") def _safe_delete_workflows( @@ -89,38 +94,38 @@ def _safe_delete_workflows( """Delete workflows without failing on races with executions or other test runs. "Safe" here means we skip IDs already gone, cancel running executions before - workflows.delete, and retry once with a longer wait if delete still reports + workflows.delete, and retry for up to ~3 minutes if delete still reports running executions. Needed because integration tests share one CDF project: another worker or CI job may have deleted the workflow already, cancellation may not be visible yet, or a trigger can start a new execution between cancel and delete. """ - ids = [external_ids] if isinstance(external_ids, str) else external_ids - if ignore_unknown_ids: - retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) - existing_ids = {wf.external_id for wf in retrieved} if retrieved else set() - ids = [id_ for id_ in ids if id_ in existing_ids] + ids = [external_ids] if isinstance(external_ids, str) else list(external_ids) + deadline = time.monotonic() + 180 + last_exc: CogniteAPIError | None = None + + while time.monotonic() < deadline: + if ignore_unknown_ids: + retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) + existing_ids = {wf.external_id for wf in retrieved} if retrieved else set() + ids = [id_ for id_ in ids if id_ in existing_ids] + if not ids: + return + + for workflow_external_id in ids: + _ensure_no_running_executions(client, workflow_external_id, timeout=30) - for workflow_external_id in ids: try: - _cancel_running_executions_for_workflow(client, workflow_external_id) - except CogniteAPIError: - if not ignore_unknown_ids: + client.workflows.delete(ids, ignore_unknown_ids=ignore_unknown_ids) + return + except CogniteAPIError as exc: + if "running executions" not in str(exc): raise - try: - client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) - except CogniteAPIError as exc: - if "running executions" not in str(exc): - raise - # Second pass: delete can still fail if cancellation is not yet visible, a new - # execution started (e.g. scheduled trigger), or the first cancel was skipped. - retrieved = client.workflows.retrieve(ids, ignore_unknown_ids=True) - existing_ids = {wf.external_id for wf in retrieved} if retrieved else set() - for workflow_external_id in ids: - if workflow_external_id not in existing_ids: - continue - _cancel_running_executions_for_workflow(client, workflow_external_id, timeout=120) - client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids) + last_exc = exc + time.sleep(1) + + if last_exc is not None: + raise last_exc @pytest.fixture @@ -326,7 +331,7 @@ def _new_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow) ), ) yield cognite_client.workflows.versions.upsert(version) - _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) + _ensure_no_running_executions(cognite_client, new_workflow.external_id, timeout=120) _safe_delete_workflow_versions(cognite_client, [version.as_id().as_tuple()]) @@ -363,7 +368,7 @@ def _new_async_workflow_version( ), ) yield cognite_client.workflows.versions.upsert(version) - _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) + _ensure_no_running_executions(cognite_client, new_workflow.external_id, timeout=120) _safe_delete_workflow_versions(cognite_client, [(new_workflow.external_id, version.version)]) @@ -435,7 +440,7 @@ def _new_workflow_version_list(cognite_client: CogniteClient, new_workflow: Work upserted_version = cognite_client.workflows.versions.upsert(version) upserted_versions.append(upserted_version) yield upserted_versions - _cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id) + _ensure_no_running_executions(cognite_client, new_workflow.external_id, timeout=120) _safe_delete_workflow_versions(cognite_client, upserted_versions.as_ids().as_tuples()) @@ -484,16 +489,11 @@ def _new_workflow_execution_list( except CogniteAPIError: pass - deadline = time.monotonic() + 30 - while time.monotonic() < deadline: - running = cognite_client.workflows.executions.list( - workflow_version_ids=new_workflow_version.as_id(), - statuses=["running"], - limit=1, + if not _ensure_no_running_executions(cognite_client, new_workflow_version.workflow_external_id, timeout=120): + raise RuntimeError( + f"Workflow {new_workflow_version.workflow_external_id!r} still has running executions " + "after setup for execution list tests" ) - if not running: - break - time.sleep(0.5) return WorkflowExecutionList([run_1, run_2])