Skip to content

Commit b8a5c2f

Browse files
fix clean up
1 parent cce80dc commit b8a5c2f

1 file changed

Lines changed: 52 additions & 6 deletions

File tree

tests/tests_integration/test_api/test_workflows.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import datetime
4+
import os
45
import time
56
import unittest
67
from collections.abc import Iterator
@@ -94,7 +95,16 @@ def _safe_delete_workflows(
9495
except CogniteAPIError:
9596
if not ignore_unknown_ids:
9697
raise
97-
client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids)
98+
try:
99+
client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids)
100+
except CogniteAPIError as exc:
101+
if "running executions" not in str(exc):
102+
raise
103+
for workflow_external_id in ids:
104+
if client.workflows.retrieve(workflow_external_id) is None:
105+
continue
106+
_cancel_running_executions_for_workflow(client, workflow_external_id, timeout=120)
107+
client.workflows.delete(external_ids, ignore_unknown_ids=ignore_unknown_ids)
98108

99109

100110
@pytest.fixture
@@ -109,7 +119,9 @@ def permanent_wf_ext_id_prefix(os_and_py_version: str) -> str:
109119

110120
@pytest.fixture(scope="session")
111121
def permanent_wf_ext_id(permanent_wf_ext_id_prefix: str, sdk_version: tuple[str, str, str]) -> str:
112-
return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}"
122+
# Isolate permanent trigger workflows per xdist worker to avoid cross-worker races.
123+
worker = os.environ.get("PYTEST_XDIST_WORKER", "master")
124+
return f"{permanent_wf_ext_id_prefix}_{sdk_version[0]}_{worker}"
113125

114126

115127
@pytest.fixture(autouse=True, scope="module")
@@ -118,13 +130,22 @@ def wf_setup_module(cognite_client: CogniteClient, permanent_wf_ext_id_prefix: s
118130
resource_age = timestamp_to_ms("30m-ago")
119131

120132
wf_triggers = cognite_client.workflows.triggers.list(limit=None)
121-
wf_triggers_to_delete = [
133+
wf_triggers_to_delete = {
122134
wft.external_id
123135
for wft in wf_triggers
124136
if wft.last_updated_time < resource_age and not wft.workflow_external_id.startswith(permanent_wf_ext_id_prefix)
125-
]
137+
}
138+
# Drop orphan per-minute triggers left on permanent workflows by failed trigger tests.
139+
permanent_trigger_keep_prefixes = ("scheduled-trigger_", "data-modeling-trigger_")
140+
wf_triggers_to_delete.update(
141+
wft.external_id
142+
for wft in wf_triggers
143+
if wft.workflow_external_id.startswith(permanent_wf_ext_id_prefix)
144+
and wft.external_id.startswith("test_create_update_scheduled_trigger_")
145+
and not wft.external_id.startswith(permanent_trigger_keep_prefixes)
146+
)
126147
if wf_triggers_to_delete:
127-
cognite_client.workflows.triggers.delete(wf_triggers_to_delete)
148+
cognite_client.workflows.triggers.delete(list(wf_triggers_to_delete))
128149

129150
wf_versions = cognite_client.workflows.versions.list(limit=None)
130151
wf_versions_to_delete = [
@@ -237,6 +258,7 @@ def _new_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow)
237258
),
238259
)
239260
yield cognite_client.workflows.versions.upsert(version)
261+
_cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id)
240262
cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True)
241263

242264

@@ -273,6 +295,7 @@ def _new_async_workflow_version(
273295
),
274296
)
275297
yield cognite_client.workflows.versions.upsert(version)
298+
_cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id)
276299
cognite_client.workflows.versions.delete((new_workflow.external_id, version.version), ignore_unknown_ids=True)
277300

278301

@@ -344,6 +367,7 @@ def _new_workflow_version_list(cognite_client: CogniteClient, new_workflow: Work
344367
upserted_version = cognite_client.workflows.versions.upsert(version)
345368
upserted_versions.append(upserted_version)
346369
yield upserted_versions
370+
_cancel_running_executions_for_workflow(cognite_client, new_workflow.external_id)
347371
cognite_client.workflows.versions.delete(upserted_versions.as_ids(), ignore_unknown_ids=True)
348372

349373

@@ -383,6 +407,26 @@ def _new_workflow_execution_list(
383407
metadata={"test": "integration_cancelled"},
384408
)
385409
run_2 = cognite_client.workflows.executions.cancel(id=run_2.id, reason="test cancel")
410+
411+
for execution in (run_1, run_2):
412+
if execution.status != "running":
413+
continue
414+
try:
415+
cognite_client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup")
416+
except CogniteAPIError:
417+
pass
418+
419+
deadline = time.monotonic() + 30
420+
while time.monotonic() < deadline:
421+
running = cognite_client.workflows.executions.list(
422+
workflow_version_ids=new_workflow_version.as_id(),
423+
statuses=["running"],
424+
limit=1,
425+
)
426+
if not running:
427+
break
428+
time.sleep(0.5)
429+
386430
return WorkflowExecutionList([run_1, run_2])
387431

388432

@@ -400,7 +444,7 @@ def workflow_execution_list_test_scoped(
400444
return _new_workflow_execution_list(cognite_client, new_workflow_version_test_scoped)
401445

402446

403-
@pytest.fixture(scope="session")
447+
@pytest.fixture(scope="class")
404448
def permanent_workflow_for_triggers(cognite_client: CogniteClient, permanent_wf_ext_id: str) -> WorkflowVersion:
405449
workflow = WorkflowUpsert(external_id=permanent_wf_ext_id, description="Permanent workflow for trigger testing")
406450
cognite_client.workflows.upsert(workflow)
@@ -421,6 +465,8 @@ def permanent_workflow_for_triggers(cognite_client: CogniteClient, permanent_wf_
421465
),
422466
)
423467
upserted = cognite_client.workflows.versions.upsert(version)
468+
if cognite_client.workflows.versions.retrieve(upserted.as_id()) is None:
469+
raise RuntimeError(f"Failed to ensure permanent workflow version {upserted.as_id()!r}")
424470
return upserted
425471

426472

0 commit comments

Comments
 (0)