Skip to content
Merged
49 changes: 47 additions & 2 deletions tests/tests_integration/test_api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,49 @@ def test_list_workflows(self, cognite_client: CogniteClient, persisted_workflow_
assert persisted_workflow_list._external_id_to_item.keys() <= listed._external_id_to_item.keys()


def _delete_workflow_after_executions_finish(
cognite_client: CogniteClient,
workflow_external_id: str,
*,
timeout: float = 120,
) -> None:
"""Cancel running executions and delete once the API allows it (jazz-api#2424)."""
cancelled_ids: set[str] = set()
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
running = cognite_client.workflows.executions.list(
workflow_version_ids=WorkflowVersionId(workflow_external_id),
statuses=["running"],
limit=None,
)
for execution in running:
if execution.id in cancelled_ids:
continue
try:
cognite_client.workflows.executions.cancel(id=execution.id, reason="integration test cleanup")
cancelled_ids.add(execution.id)
except CogniteAPIError:
pass
if not running:
break
time.sleep(0.5)
Comment thread
vincent-cognite marked this conversation as resolved.

delete_deadline = time.monotonic() + 60
last_exc: CogniteAPIError | None = None
while time.monotonic() < delete_deadline:
try:
cognite_client.workflows.delete(workflow_external_id, ignore_unknown_ids=True)
return
except CogniteAPIError as exc:
if "running executions" not in str(exc):
raise
last_exc = exc
time.sleep(1)

if last_exc is not None:
raise last_exc


class TestWorkflowVersions:
@pytest.mark.allow_no_semaphore(
"Test setup uses simulator fixtures that exercise worker-facing endpoints "
Expand Down Expand Up @@ -531,8 +574,10 @@ def test_upsert_run_delete_with_simulation_task(

finally:
if created_version is not None:
cognite_client.workflows.versions.delete(created_version.as_id())
cognite_client.workflows.delete(created_version.workflow_external_id)
_delete_workflow_after_executions_finish(
cognite_client,
created_version.workflow_external_id,
)

def test_upsert_preexisting(
self, cognite_client: CogniteClient, new_workflow_version_test_scoped: WorkflowVersion
Expand Down
Loading