From ffcc7a2393cca1e0985866346ab7804dd4554e7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriele=20Cin=C3=A0?= Date: Fri, 12 Jun 2026 14:02:52 +0100 Subject: [PATCH] feat(retention): strict bool env parsing + stale-RUNNING cleanup override MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two hardening changes to the scheduled task-retention cleanup ahead of the EY rollout: 1. Strict boolean env parsing (_parse_bool_env). The previous pattern (os.environ.get(...) == "true") silently coerced any unrecognized value to False. For RETENTION_CLEANUP_DRY_RUN that failure mode is destructive: DRY_RUN=True (capital T, as YAML/Helm tooling tends to render booleans) meant dry_run=False, i.e. live deletion. Booleans now parse true/false/1/0 case-insensitively and raise on anything else, so a misconfigured worker refuses to run instead of deleting data. Applied to RETENTION_CLEANUP_DRY_RUN, RETENTION_CLEANUP_ENABLED, and ENABLE_HEALTH_CHECK_WORKFLOW (same pattern). 2. RETENTION_CLEANUP_STALE_RUNNING_DAYS (default 0 = disabled). The sweep refuses RUNNING tasks unconditionally, so tasks abandoned mid-run (agent crashed, workflow never reached a terminal state) are exempt from retention forever — in sgp-dev testing one agent had 121 such tasks, all unpurgeable. When this is set > 0, a RUNNING task with no interaction (same last-interaction definition as the idle check) for at least that many days is treated as abandoned and cleaned, with a WARNING-level forensic log per override. The unprocessed-events correctness guard still applies. Wired env -> load_cleanup_config -> sweep -> child workflow -> activity -> use case -> service; the new activity arg defaults to 0 so in-flight workflows are unaffected. --- agentex/docker-compose.yml | 2 + agentex/src/config/environment_variables.py | 45 +++++++-- .../domain/services/task_retention_service.py | 46 +++++++-- .../use_cases/task_retention_use_case.py | 8 ++ .../retention_cleanup_activities.py | 19 +++- .../workflows/retention_cleanup_workflow.py | 10 +- .../unit/config/test_retention_cleanup_env.py | 50 ++++++++++ .../services/test_task_retention_service.py | 93 +++++++++++++++++++ .../test_retention_cleanup_activities.py | 8 +- .../test_retention_cleanup_workflow.py | 78 +++++++++++++++- 10 files changed, 332 insertions(+), 27 deletions(-) diff --git a/agentex/docker-compose.yml b/agentex/docker-compose.yml index a917de42..1f04c5a5 100644 --- a/agentex/docker-compose.yml +++ b/agentex/docker-compose.yml @@ -177,6 +177,7 @@ services: - RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200} - RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20} - RETENTION_CLEANUP_DRY_RUN=${RETENTION_CLEANUP_DRY_RUN:-true} + - RETENTION_CLEANUP_STALE_RUNNING_DAYS=${RETENTION_CLEANUP_STALE_RUNNING_DAYS:-0} ports: - "5003:5003" volumes: @@ -242,6 +243,7 @@ services: - RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200} - RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20} - RETENTION_CLEANUP_DRY_RUN=${RETENTION_CLEANUP_DRY_RUN:-true} + - RETENTION_CLEANUP_STALE_RUNNING_DAYS=${RETENTION_CLEANUP_STALE_RUNNING_DAYS:-0} volumes: - .:/app:cached depends_on: diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 0872c0cf..0071b92f 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -65,6 +65,7 @@ class EnvVarKeys(str, Enum): RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE" RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT" RETENTION_CLEANUP_DRY_RUN = "RETENTION_CLEANUP_DRY_RUN" + RETENTION_CLEANUP_STALE_RUNNING_DAYS = "RETENTION_CLEANUP_STALE_RUNNING_DAYS" class Environment(str, Enum): @@ -76,6 +77,30 @@ class Environment(str, Enum): refreshed_environment_variables = None +def _parse_bool_env(key: EnvVarKeys, default: bool) -> bool: + """ + Strict boolean env parsing: accepts true/false/1/0 case-insensitively, + raises on anything else. + + The previous pattern (`os.environ.get(key, ...) == "true"`) silently + coerced any unrecognized value to False. For RETENTION_CLEANUP_DRY_RUN + that failure mode is destructive: `DRY_RUN=True` (capital T, as YAML + tooling tends to render booleans) meant dry_run=False, i.e. live + deletion. Fail loud instead so a misconfigured worker refuses to run. + """ + raw = os.environ.get(key) + if raw is None: + return default + normalized = raw.strip().lower() + if normalized in ("true", "1"): + return True + if normalized in ("false", "0"): + return False + raise ValueError( + f"Invalid boolean for {key.value}: {raw!r} (expected true/false/1/0)" + ) + + class EnvironmentVariables(BaseModel): ENVIRONMENT: str | None = Environment.DEV OPENAI_API_KEY: str | None @@ -128,6 +153,10 @@ class EnvironmentVariables(BaseModel): RETENTION_CLEANUP_PAGE_SIZE: int = 200 RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20 RETENTION_CLEANUP_DRY_RUN: bool = True + # When > 0, tasks stuck in RUNNING with no interaction for this many days + # are treated as abandoned and become eligible for cleanup. 0 disables the + # override (RUNNING tasks are never cleaned), preserving prior behavior. + RETENTION_CLEANUP_STALE_RUNNING_DAYS: int = 0 @classmethod def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: @@ -210,15 +239,14 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: AGENTEX_SERVER_TASK_QUEUE=os.environ.get( EnvVarKeys.AGENTEX_SERVER_TASK_QUEUE ), - ENABLE_HEALTH_CHECK_WORKFLOW=( - os.environ.get(EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, "false") - == "true" + ENABLE_HEALTH_CHECK_WORKFLOW=_parse_bool_env( + EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, default=False ), WEBHOOK_REQUEST_TIMEOUT=float( os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0") ), - RETENTION_CLEANUP_ENABLED=( - os.environ.get(EnvVarKeys.RETENTION_CLEANUP_ENABLED, "false") == "true" + RETENTION_CLEANUP_ENABLED=_parse_bool_env( + EnvVarKeys.RETENTION_CLEANUP_ENABLED, default=False ), RETENTION_CLEANUP_AGENT_ALLOWLIST=[ name.strip() @@ -239,8 +267,11 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: RETENTION_CLEANUP_MAX_IN_FLIGHT=int( os.environ.get(EnvVarKeys.RETENTION_CLEANUP_MAX_IN_FLIGHT, "20") ), - RETENTION_CLEANUP_DRY_RUN=( - os.environ.get(EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, "true") == "true" + RETENTION_CLEANUP_DRY_RUN=_parse_bool_env( + EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, default=True + ), + RETENTION_CLEANUP_STALE_RUNNING_DAYS=int( + os.environ.get(EnvVarKeys.RETENTION_CLEANUP_STALE_RUNNING_DAYS, "0") ), ) refreshed_environment_variables = environment_variables diff --git a/agentex/src/domain/services/task_retention_service.py b/agentex/src/domain/services/task_retention_service.py index dd7c5a76..6d383cb6 100644 --- a/agentex/src/domain/services/task_retention_service.py +++ b/agentex/src/domain/services/task_retention_service.py @@ -177,6 +177,7 @@ async def clean_task( *, enforce_idle_threshold: bool = True, idle_days: int = 7, + stale_running_days: int = 0, ) -> TaskCleanupResultEntity: """ Delete content-bearing rows for a stale task. Idempotent: re-running on a @@ -189,9 +190,18 @@ async def clean_task( scheduled Temporal sweep always sets True. The admin endpoint accepts a force=true flag that flips this to False. idle_days: Idle threshold in days (when enforce_idle_threshold=True). + stale_running_days: When > 0, a task whose status is RUNNING but + whose last interaction is at least this many days old is treated + as abandoned and may be cleaned. 0 (default) keeps the strict + behavior: RUNNING tasks are never cleaned. Tasks that hang in + RUNNING forever (agent crashed mid-run, workflow never reached a + terminal state) would otherwise be exempt from retention + indefinitely, defeating the policy for exactly the data most + likely to be forgotten. Refuses (raises) if: - - task is currently active (status == RUNNING). + - task is currently active (status == RUNNING) and not stale per + stale_running_days. - enforce_idle_threshold=True and the task is not idle long enough. - unprocessed events exist past agent_task_tracker cursors. @@ -236,10 +246,7 @@ async def clean_task( ) # 2. Status + idle threshold guards. - if task.status == TaskStatus.RUNNING: - raise ClientError( - f"Cannot clean task {task_id}: status is RUNNING (active)" - ) + await self._check_running_guard(task, stale_running_days) if enforce_idle_threshold and not await self._is_task_idle(task, idle_days): raise ClientError( f"Cannot clean task {task_id}: not idle for {idle_days} days " @@ -293,6 +300,7 @@ async def preview_clean_task( *, enforce_idle_threshold: bool = True, idle_days: int = 7, + stale_running_days: int = 0, ) -> TaskCleanupResultEntity: """ Run the same safety checks as clean_task without deleting or updating data. @@ -306,10 +314,7 @@ async def preview_clean_task( if task.cleaned_at is not None: cleaned_at = task.cleaned_at else: - if task.status == TaskStatus.RUNNING: - raise ClientError( - f"Cannot clean task {task_id}: status is RUNNING (active)" - ) + await self._check_running_guard(task, stale_running_days) if enforce_idle_threshold and not await self._is_task_idle(task, idle_days): raise ClientError( f"Cannot clean task {task_id}: not idle for {idle_days} days " @@ -452,6 +457,29 @@ async def rehydrate_task( # ---- internal helpers ---- + async def _check_running_guard(self, task, stale_running_days: int) -> None: + """ + Raise ClientError if `task` is RUNNING, unless the stale-RUNNING override + applies: stale_running_days > 0 and the task has had no interaction for + at least that many days (same last-interaction definition as + _is_task_idle). Overrides are logged at WARNING for forensics — cleaning + a RUNNING task means we are declaring its workflow abandoned. + """ + if task.status != TaskStatus.RUNNING: + return + if stale_running_days > 0 and await self._is_task_idle( + task, stale_running_days + ): + logger.warning( + "task_cleanup_stale_running_override", + extra={ + "task_id": task.id, + "stale_running_days": stale_running_days, + }, + ) + return + raise ClientError(f"Cannot clean task {task.id}: status is RUNNING (active)") + async def _is_task_idle(self, task, idle_days: int) -> bool: """ True iff the task has no interaction within the idle window. diff --git a/agentex/src/domain/use_cases/task_retention_use_case.py b/agentex/src/domain/use_cases/task_retention_use_case.py index 6ac61546..3b0eb920 100644 --- a/agentex/src/domain/use_cases/task_retention_use_case.py +++ b/agentex/src/domain/use_cases/task_retention_use_case.py @@ -46,16 +46,22 @@ async def clean_task( task_id: str, force: bool = False, idle_days: int = 7, + stale_running_days: int = 0, ) -> TaskCleanupResultEntity: """ force=True is the admin escape hatch; it bypasses the idle-threshold check (but NOT the active-workflow / unprocessed-events checks, which protect correctness, not policy). + + stale_running_days > 0 relaxes the active-workflow check for tasks that + have sat in RUNNING with no interaction for at least that many days + (abandoned runs that would otherwise be exempt from retention forever). """ return await self.retention_service.clean_task( task_id=task_id, enforce_idle_threshold=not force, idle_days=idle_days, + stale_running_days=stale_running_days, ) async def preview_clean_task( @@ -63,6 +69,7 @@ async def preview_clean_task( task_id: str, force: bool = False, idle_days: int = 7, + stale_running_days: int = 0, ) -> TaskCleanupResultEntity: """ Dry-run counterpart to clean_task: runs the same safety checks without @@ -72,6 +79,7 @@ async def preview_clean_task( task_id=task_id, enforce_idle_threshold=not force, idle_days=idle_days, + stale_running_days=stale_running_days, ) async def rehydrate_task( diff --git a/agentex/src/temporal/activities/retention_cleanup_activities.py b/agentex/src/temporal/activities/retention_cleanup_activities.py index d42def8b..2fb5906b 100644 --- a/agentex/src/temporal/activities/retention_cleanup_activities.py +++ b/agentex/src/temporal/activities/retention_cleanup_activities.py @@ -73,6 +73,7 @@ async def load_cleanup_config(self) -> dict: "page_size": env.RETENTION_CLEANUP_PAGE_SIZE, "max_in_flight": env.RETENTION_CLEANUP_MAX_IN_FLIGHT, "dry_run": env.RETENTION_CLEANUP_DRY_RUN, + "stale_running_days": env.RETENTION_CLEANUP_STALE_RUNNING_DAYS, } @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY) @@ -129,7 +130,11 @@ async def find_multi_agent_cleanup_candidates( @activity.defn(name=CLEAN_TASK_ACTIVITY) async def clean_task( - self, task_id: str, idle_days: int, dry_run: bool = True + self, + task_id: str, + idle_days: int, + dry_run: bool = True, + stale_running_days: int = 0, ) -> CleanTaskOutcome: """ Delete the stored content (messages, states, events) for a single task. @@ -139,6 +144,8 @@ async def clean_task( idle_days: Passed through to the use case for policy checks. dry_run: When omitted, preview only. Operators must pass False to enable writes. + stale_running_days: When > 0, RUNNING tasks idle at least this many + days are treated as abandoned and cleaned instead of skipped. Returns: CleanTaskOutcome with ``status`` set to ``"cleaned"`` when content was @@ -149,7 +156,10 @@ async def clean_task( try: if dry_run: result = await self.use_case.preview_clean_task( - task_id=task_id, force=False, idle_days=idle_days + task_id=task_id, + force=False, + idle_days=idle_days, + stale_running_days=stale_running_days, ) logger.info( "task_cleanup_dry_run", @@ -164,7 +174,10 @@ async def clean_task( "events_deleted": 0, } result = await self.use_case.clean_task( - task_id=task_id, force=False, idle_days=idle_days + task_id=task_id, + force=False, + idle_days=idle_days, + stale_running_days=stale_running_days, ) return { "task_id": result.task_id, diff --git a/agentex/src/temporal/workflows/retention_cleanup_workflow.py b/agentex/src/temporal/workflows/retention_cleanup_workflow.py index a08cb858..51f128f9 100644 --- a/agentex/src/temporal/workflows/retention_cleanup_workflow.py +++ b/agentex/src/temporal/workflows/retention_cleanup_workflow.py @@ -37,7 +37,12 @@ class RetentionCleanupTaskWorkflow: async def run(self, args: dict) -> dict: return await workflow.execute_activity( CLEAN_TASK_ACTIVITY, - args=[args["task_id"], args["idle_days"], args.get("dry_run", True)], + args=[ + args["task_id"], + args["idle_days"], + args.get("dry_run", True), + args.get("stale_running_days", 0), + ], start_to_close_timeout=timedelta(seconds=60), retry_policy=RetryPolicy( maximum_attempts=3, @@ -73,6 +78,7 @@ async def run(self, args: dict | None = None) -> dict: page_size = args.get("page_size", 200) max_in_flight = args.get("max_in_flight", 20) dry_run = args.get("dry_run", True) + stale_running_days = args.get("stale_running_days", 0) after_id = args.get("after_id") totals = args.get("totals", {"cleaned": 0, "skipped": 0, "failed": 0}) @@ -135,6 +141,7 @@ async def run(self, args: dict | None = None) -> dict: "task_id": task_id, "idle_days": idle_days, "dry_run": dry_run, + "stale_running_days": stale_running_days, }, id=f"retention-cleanup-task-{sweep_run_id}-{task_id}", retry_policy=RetryPolicy(maximum_attempts=1), @@ -158,6 +165,7 @@ async def run(self, args: dict | None = None) -> dict: "page_size": page_size, "max_in_flight": max_in_flight, "dry_run": dry_run, + "stale_running_days": stale_running_days, "after_id": page_last_id, "totals": totals, } diff --git a/agentex/tests/unit/config/test_retention_cleanup_env.py b/agentex/tests/unit/config/test_retention_cleanup_env.py index 67e8c518..d642b484 100644 --- a/agentex/tests/unit/config/test_retention_cleanup_env.py +++ b/agentex/tests/unit/config/test_retention_cleanup_env.py @@ -54,3 +54,53 @@ def test_retention_cleanup_env_allows_explicit_dry_run_false(monkeypatch): env = EnvironmentVariables.refresh(force_refresh=True) assert env.RETENTION_CLEANUP_DRY_RUN is False + + +@pytest.mark.unit +@pytest.mark.parametrize( + ("raw", "expected"), + [ + ("True", True), + ("TRUE", True), + ("1", True), + (" true ", True), + ("False", False), + ("FALSE", False), + ("0", False), + ], +) +def test_retention_cleanup_bool_env_is_case_insensitive(monkeypatch, raw, expected): + # YAML/Helm tooling tends to render booleans as True/False; the old strict + # `== "true"` comparison silently turned DRY_RUN=True into live deletion. + monkeypatch.setenv("RETENTION_CLEANUP_DRY_RUN", raw) + + env = EnvironmentVariables.refresh(force_refresh=True) + + assert env.RETENTION_CLEANUP_DRY_RUN is expected + + +@pytest.mark.unit +@pytest.mark.parametrize("raw", ["yes", "on", "truee", "dry", ""]) +def test_retention_cleanup_bool_env_rejects_garbage(monkeypatch, raw): + monkeypatch.setenv("RETENTION_CLEANUP_DRY_RUN", raw) + + with pytest.raises(ValueError, match="RETENTION_CLEANUP_DRY_RUN"): + EnvironmentVariables.refresh(force_refresh=True) + + +@pytest.mark.unit +def test_retention_cleanup_stale_running_days_defaults_to_disabled(monkeypatch): + monkeypatch.delenv("RETENTION_CLEANUP_STALE_RUNNING_DAYS", raising=False) + + env = EnvironmentVariables.refresh(force_refresh=True) + + assert env.RETENTION_CLEANUP_STALE_RUNNING_DAYS == 0 + + +@pytest.mark.unit +def test_retention_cleanup_stale_running_days_parses(monkeypatch): + monkeypatch.setenv("RETENTION_CLEANUP_STALE_RUNNING_DAYS", "30") + + env = EnvironmentVariables.refresh(force_refresh=True) + + assert env.RETENTION_CLEANUP_STALE_RUNNING_DAYS == 30 diff --git a/agentex/tests/unit/services/test_task_retention_service.py b/agentex/tests/unit/services/test_task_retention_service.py index 3680e9b7..0129d8f1 100644 --- a/agentex/tests/unit/services/test_task_retention_service.py +++ b/agentex/tests/unit/services/test_task_retention_service.py @@ -4,9 +4,44 @@ import pytest from src.domain.entities.tasks import TaskStatus +from src.domain.exceptions import ClientError from src.domain.services.task_retention_service import TaskRetentionService +def _make_service(task, *, messages=None): + task_repository = AsyncMock() + task_repository.get.return_value = task + task_message_service = AsyncMock() + task_message_service.get_messages.return_value = messages or [] + task_message_service.delete_all_messages.return_value = 0 + task_state_repository = AsyncMock() + task_state_repository.delete_by_field.return_value = 0 + event_repository = AsyncMock() + event_repository.delete_by_task_id.return_value = 0 + agent_task_tracker_repository = AsyncMock() + agent_task_tracker_repository.find_by_field.return_value = [] + + return TaskRetentionService( + task_repository=task_repository, + task_message_service=task_message_service, + task_message_repository=AsyncMock(), + task_state_repository=task_state_repository, + event_repository=event_repository, + agent_task_tracker_repository=agent_task_tracker_repository, + temporal_adapter=AsyncMock(), + httpx_client=AsyncMock(), + ) + + +def _running_task(idle_for_days: int): + return SimpleNamespace( + id="t1", + cleaned_at=None, + status=TaskStatus.RUNNING, + updated_at=datetime.now(UTC) - timedelta(days=idle_for_days), + ) + + @pytest.mark.unit @pytest.mark.asyncio async def test_preview_clean_task_validates_without_writes(): @@ -47,3 +82,61 @@ async def test_preview_clean_task_validates_without_writes(): event_repository.delete_by_task_id.assert_not_awaited() agent_task_tracker_repository.reset_cursors_for_task.assert_not_awaited() task_repository.update.assert_not_awaited() + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_refuses_running_task_by_default(): + service = _make_service(_running_task(idle_for_days=90)) + + with pytest.raises(ClientError, match="RUNNING"): + await service.clean_task(task_id="t1", idle_days=7) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_cleans_stale_running_task_with_override(): + service = _make_service(_running_task(idle_for_days=90)) + + result = await service.clean_task(task_id="t1", idle_days=7, stale_running_days=30) + + assert result.task_id == "t1" + service.task_message_service.delete_all_messages.assert_awaited_once_with("t1") + service.task_repository.update.assert_awaited_once() + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_refuses_running_task_not_stale_enough(): + # RUNNING and idle 10 days: past idle_days (7) but short of the + # stale-RUNNING threshold (30) — must still be refused. + service = _make_service(_running_task(idle_for_days=10)) + + with pytest.raises(ClientError, match="RUNNING"): + await service.clean_task(task_id="t1", idle_days=7, stale_running_days=30) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_running_override_respects_recent_messages(): + # Postgres row is stale but a recent Mongo message proves interaction: + # last-interaction = max(updated_at, latest message), so no override. + recent_message = SimpleNamespace(created_at=datetime.now(UTC) - timedelta(days=1)) + service = _make_service(_running_task(idle_for_days=90), messages=[recent_message]) + + with pytest.raises(ClientError, match="RUNNING"): + await service.clean_task(task_id="t1", idle_days=7, stale_running_days=30) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_preview_clean_task_applies_stale_running_override(): + service = _make_service(_running_task(idle_for_days=90)) + + result = await service.preview_clean_task( + task_id="t1", idle_days=7, stale_running_days=30 + ) + + assert result.task_id == "t1" + service.task_message_service.delete_all_messages.assert_not_awaited() + service.task_repository.update.assert_not_awaited() diff --git a/agentex/tests/unit/temporal/test_retention_cleanup_activities.py b/agentex/tests/unit/temporal/test_retention_cleanup_activities.py index f97403bf..019bf5c4 100644 --- a/agentex/tests/unit/temporal/test_retention_cleanup_activities.py +++ b/agentex/tests/unit/temporal/test_retention_cleanup_activities.py @@ -59,7 +59,9 @@ async def test_clean_task_cleaned_outcome(): assert outcome["status"] == "cleaned" assert outcome["task_id"] == "t1" assert outcome["messages_deleted"] == 3 - use_case.clean_task.assert_awaited_once_with(task_id="t1", force=False, idle_days=7) + use_case.clean_task.assert_awaited_once_with( + task_id="t1", force=False, idle_days=7, stale_running_days=0 + ) @pytest.mark.unit @@ -83,7 +85,7 @@ async def test_clean_task_defaults_to_dry_run_and_validates_without_writes(): assert outcome["task_id"] == "t1" assert outcome["reason"] == "would_clean" use_case.preview_clean_task.assert_awaited_once_with( - task_id="t1", force=False, idle_days=7 + task_id="t1", force=False, idle_days=7, stale_running_days=0 ) use_case.clean_task.assert_not_awaited() @@ -128,6 +130,7 @@ async def test_load_cleanup_config_reads_env(monkeypatch): monkeypatch.setenv("RETENTION_CLEANUP_PAGE_SIZE", "33") monkeypatch.setenv("RETENTION_CLEANUP_MAX_IN_FLIGHT", "4") monkeypatch.setenv("RETENTION_CLEANUP_DRY_RUN", "true") + monkeypatch.setenv("RETENTION_CLEANUP_STALE_RUNNING_DAYS", "30") activities = RetentionCleanupActivities( task_repository=AsyncMock(), use_case=AsyncMock() ) @@ -139,4 +142,5 @@ async def test_load_cleanup_config_reads_env(monkeypatch): "page_size": 33, "max_in_flight": 4, "dry_run": True, + "stale_running_days": 30, } diff --git a/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py b/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py index 7451d141..90c11a0e 100644 --- a/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py +++ b/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py @@ -30,7 +30,9 @@ async def fake_find_multi_agent(task_ids: list[str]) -> list[str]: return [] @activity.defn(name=CLEAN_TASK_ACTIVITY) - async def fake_clean(task_id: str, idle_days: int, dry_run: bool) -> dict: + async def fake_clean( + task_id: str, idle_days: int, dry_run: bool, stale_running_days: int = 0 + ) -> dict: assert dry_run is False if task_id == "t2": return { @@ -102,7 +104,9 @@ async def fake_find_multi_agent(task_ids: list[str]) -> list[str]: return [] @activity.defn(name=CLEAN_TASK_ACTIVITY) - async def fake_clean(task_id: str, idle_days: int, dry_run: bool) -> dict: + async def fake_clean( + task_id: str, idle_days: int, dry_run: bool, stale_running_days: int = 0 + ) -> dict: assert dry_run is True return { "task_id": task_id, @@ -152,7 +156,9 @@ async def fake_find_multi_agent(task_ids: list[str]) -> list[str]: raise AssertionError("disabled cleanup should not check multi-agent candidates") @activity.defn(name=CLEAN_TASK_ACTIVITY) - async def fake_clean(task_id: str, idle_days: int, dry_run: bool) -> dict: + async def fake_clean( + task_id: str, idle_days: int, dry_run: bool, stale_running_days: int = 0 + ) -> dict: raise AssertionError("disabled cleanup should not clean tasks") async with await WorkflowEnvironment.start_time_skipping() as env: @@ -188,7 +194,9 @@ async def fake_find_multi_agent(task_ids: list[str]) -> list[str]: return ["t2"] @activity.defn(name=CLEAN_TASK_ACTIVITY) - async def fake_clean(task_id: str, idle_days: int, dry_run: bool) -> dict: + async def fake_clean( + task_id: str, idle_days: int, dry_run: bool, stale_running_days: int = 0 + ) -> dict: cleaned_task_ids.append(task_id) return { "task_id": task_id, @@ -252,7 +260,9 @@ async def fake_find_multi_agent(task_ids: list[str]) -> list[str]: return [] @activity.defn(name=CLEAN_TASK_ACTIVITY) - async def fake_clean(task_id: str, idle_days: int, dry_run: bool) -> dict: + async def fake_clean( + task_id: str, idle_days: int, dry_run: bool, stale_running_days: int = 0 + ) -> dict: assert dry_run is True return { "task_id": task_id, @@ -279,3 +289,61 @@ async def fake_clean(task_id: str, idle_days: int, dry_run: bool) -> dict: assert summary["dry_run"] == 1 assert summary["cleaned"] == 0 + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_sweep_propagates_stale_running_days_to_child_cleanup(): + pages = {None: ["t1"], "t1": []} + seen_stale_running_days: list[int] = [] + + @activity.defn(name=LOAD_CLEANUP_CONFIG_ACTIVITY) + async def fake_load() -> dict: + return { + "enabled": True, + "dry_run": True, + "idle_days": 7, + "agent_names": ["a"], + "page_size": 2, + "max_in_flight": 2, + "stale_running_days": 30, + } + + @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY) + async def fake_find(after_id, limit, idle_days, agent_names) -> list[str]: + return pages[after_id] + + @activity.defn(name=FIND_MULTI_AGENT_CLEANUP_CANDIDATES_ACTIVITY) + async def fake_find_multi_agent(task_ids: list[str]) -> list[str]: + return [] + + @activity.defn(name=CLEAN_TASK_ACTIVITY) + async def fake_clean( + task_id: str, idle_days: int, dry_run: bool, stale_running_days: int = 0 + ) -> dict: + seen_stale_running_days.append(stale_running_days) + return { + "task_id": task_id, + "status": "dry_run", + "reason": "would_clean", + "messages_deleted": 0, + "task_states_deleted": 0, + "events_deleted": 0, + } + + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker( + env.client, + task_queue="test-retention-stale-running", + workflows=[RetentionCleanupSweepWorkflow, RetentionCleanupTaskWorkflow], + activities=[fake_load, fake_find, fake_find_multi_agent, fake_clean], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + summary = await env.client.execute_workflow( + RetentionCleanupSweepWorkflow.run, + id=f"sweep-{uuid.uuid4()}", + task_queue="test-retention-stale-running", + ) + + assert seen_stale_running_days == [30] + assert summary["dry_run"] == 1