Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agentex/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ services:
- RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200}
- RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20}
- RETENTION_CLEANUP_DRY_RUN=${RETENTION_CLEANUP_DRY_RUN:-true}
- RETENTION_CLEANUP_STALE_RUNNING_DAYS=${RETENTION_CLEANUP_STALE_RUNNING_DAYS:-0}
ports:
- "5003:5003"
volumes:
Expand Down Expand Up @@ -242,6 +243,7 @@ services:
- RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200}
- RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20}
- RETENTION_CLEANUP_DRY_RUN=${RETENTION_CLEANUP_DRY_RUN:-true}
- RETENTION_CLEANUP_STALE_RUNNING_DAYS=${RETENTION_CLEANUP_STALE_RUNNING_DAYS:-0}
volumes:
- .:/app:cached
depends_on:
Expand Down
45 changes: 38 additions & 7 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class EnvVarKeys(str, Enum):
RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE"
RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT"
RETENTION_CLEANUP_DRY_RUN = "RETENTION_CLEANUP_DRY_RUN"
RETENTION_CLEANUP_STALE_RUNNING_DAYS = "RETENTION_CLEANUP_STALE_RUNNING_DAYS"


class Environment(str, Enum):
Expand All @@ -76,6 +77,30 @@ class Environment(str, Enum):
refreshed_environment_variables = None


def _parse_bool_env(key: EnvVarKeys, default: bool) -> bool:
"""
Strict boolean env parsing: accepts true/false/1/0 case-insensitively,
raises on anything else.

The previous pattern (`os.environ.get(key, ...) == "true"`) silently
coerced any unrecognized value to False. For RETENTION_CLEANUP_DRY_RUN
that failure mode is destructive: `DRY_RUN=True` (capital T, as YAML
tooling tends to render booleans) meant dry_run=False, i.e. live
deletion. Fail loud instead so a misconfigured worker refuses to run.
"""
raw = os.environ.get(key)
if raw is None:
return default
normalized = raw.strip().lower()
if normalized in ("true", "1"):
return True
if normalized in ("false", "0"):
return False
raise ValueError(
f"Invalid boolean for {key.value}: {raw!r} (expected true/false/1/0)"
)


class EnvironmentVariables(BaseModel):
ENVIRONMENT: str | None = Environment.DEV
OPENAI_API_KEY: str | None
Expand Down Expand Up @@ -128,6 +153,10 @@ class EnvironmentVariables(BaseModel):
RETENTION_CLEANUP_PAGE_SIZE: int = 200
RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20
RETENTION_CLEANUP_DRY_RUN: bool = True
# When > 0, tasks stuck in RUNNING with no interaction for this many days
# are treated as abandoned and become eligible for cleanup. 0 disables the
# override (RUNNING tasks are never cleaned), preserving prior behavior.
RETENTION_CLEANUP_STALE_RUNNING_DAYS: int = 0

@classmethod
def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
Expand Down Expand Up @@ -210,15 +239,14 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
AGENTEX_SERVER_TASK_QUEUE=os.environ.get(
EnvVarKeys.AGENTEX_SERVER_TASK_QUEUE
),
ENABLE_HEALTH_CHECK_WORKFLOW=(
os.environ.get(EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, "false")
== "true"
ENABLE_HEALTH_CHECK_WORKFLOW=_parse_bool_env(
EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, default=False
),
WEBHOOK_REQUEST_TIMEOUT=float(
os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0")
),
RETENTION_CLEANUP_ENABLED=(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_ENABLED, "false") == "true"
RETENTION_CLEANUP_ENABLED=_parse_bool_env(
EnvVarKeys.RETENTION_CLEANUP_ENABLED, default=False
),
RETENTION_CLEANUP_AGENT_ALLOWLIST=[
name.strip()
Expand All @@ -239,8 +267,11 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
RETENTION_CLEANUP_MAX_IN_FLIGHT=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_MAX_IN_FLIGHT, "20")
),
RETENTION_CLEANUP_DRY_RUN=(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, "true") == "true"
RETENTION_CLEANUP_DRY_RUN=_parse_bool_env(
EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, default=True
),
RETENTION_CLEANUP_STALE_RUNNING_DAYS=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_STALE_RUNNING_DAYS, "0")
),
)
refreshed_environment_variables = environment_variables
Expand Down
46 changes: 37 additions & 9 deletions agentex/src/domain/services/task_retention_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ async def clean_task(
*,
enforce_idle_threshold: bool = True,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
Delete content-bearing rows for a stale task. Idempotent: re-running on a
Expand All @@ -189,9 +190,18 @@ async def clean_task(
scheduled Temporal sweep always sets True. The admin endpoint
accepts a force=true flag that flips this to False.
idle_days: Idle threshold in days (when enforce_idle_threshold=True).
stale_running_days: When > 0, a task whose status is RUNNING but
whose last interaction is at least this many days old is treated
as abandoned and may be cleaned. 0 (default) keeps the strict
behavior: RUNNING tasks are never cleaned. Tasks that hang in
RUNNING forever (agent crashed mid-run, workflow never reached a
terminal state) would otherwise be exempt from retention
indefinitely, defeating the policy for exactly the data most
likely to be forgotten.

Refuses (raises) if:
- task is currently active (status == RUNNING).
- task is currently active (status == RUNNING) and not stale per
stale_running_days.
- enforce_idle_threshold=True and the task is not idle long enough.
- unprocessed events exist past agent_task_tracker cursors.

Expand Down Expand Up @@ -236,10 +246,7 @@ async def clean_task(
)

# 2. Status + idle threshold guards.
if task.status == TaskStatus.RUNNING:
raise ClientError(
f"Cannot clean task {task_id}: status is RUNNING (active)"
)
await self._check_running_guard(task, stale_running_days)
if enforce_idle_threshold and not await self._is_task_idle(task, idle_days):
raise ClientError(
f"Cannot clean task {task_id}: not idle for {idle_days} days "
Expand Down Expand Up @@ -293,6 +300,7 @@ async def preview_clean_task(
*,
enforce_idle_threshold: bool = True,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
Run the same safety checks as clean_task without deleting or updating data.
Expand All @@ -306,10 +314,7 @@ async def preview_clean_task(
if task.cleaned_at is not None:
cleaned_at = task.cleaned_at
else:
if task.status == TaskStatus.RUNNING:
raise ClientError(
f"Cannot clean task {task_id}: status is RUNNING (active)"
)
await self._check_running_guard(task, stale_running_days)
if enforce_idle_threshold and not await self._is_task_idle(task, idle_days):
raise ClientError(
f"Cannot clean task {task_id}: not idle for {idle_days} days "
Expand Down Expand Up @@ -452,6 +457,29 @@ async def rehydrate_task(

# ---- internal helpers ----

async def _check_running_guard(self, task, stale_running_days: int) -> None:
"""
Raise ClientError if `task` is RUNNING, unless the stale-RUNNING override
applies: stale_running_days > 0 and the task has had no interaction for
at least that many days (same last-interaction definition as
_is_task_idle). Overrides are logged at WARNING for forensics — cleaning
a RUNNING task means we are declaring its workflow abandoned.
"""
if task.status != TaskStatus.RUNNING:
return
if stale_running_days > 0 and await self._is_task_idle(
task, stale_running_days
):
logger.warning(
"task_cleanup_stale_running_override",
extra={
"task_id": task.id,
"stale_running_days": stale_running_days,
},
)
return
Comment on lines +468 to +480

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Redundant MongoDB round-trip for stale-RUNNING tasks

When a task is RUNNING and stale_running_days > 0, _is_task_idle is called here (inside _check_running_guard) with stale_running_days as the threshold. If the guard passes, clean_task immediately calls _is_task_idle a second time with idle_days. Because stale_running_days is always expected to be ≥ idle_days (e.g. 30 vs 7), the second call's result is always True when the first call returns True, so the second query is always redundant for RUNNING tasks that pass the stale check. Each call issues a MongoDB message-fetch — at scale this doubles the read traffic for every stale-RUNNING task in the sweep.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/services/task_retention_service.py
Line: 468-480

Comment:
**Redundant MongoDB round-trip for stale-RUNNING tasks**

When a task is RUNNING and `stale_running_days > 0`, `_is_task_idle` is called here (inside `_check_running_guard`) with `stale_running_days` as the threshold. If the guard passes, `clean_task` immediately calls `_is_task_idle` a second time with `idle_days`. Because `stale_running_days` is always expected to be ≥ `idle_days` (e.g. 30 vs 7), the second call's result is always `True` when the first call returns `True`, so the second query is always redundant for RUNNING tasks that pass the stale check. Each call issues a MongoDB message-fetch — at scale this doubles the read traffic for every stale-RUNNING task in the sweep.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

raise ClientError(f"Cannot clean task {task.id}: status is RUNNING (active)")
Comment on lines +460 to +481

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Forensic WARNING fires during dry-runs / previews

_check_running_guard is shared between clean_task (real deletion) and preview_clean_task (no writes). The logger.warning("task_cleanup_stale_running_override", ...) is documented as a forensic record that "cleaning a RUNNING task means we are declaring its workflow abandoned" — but it fires on every dry-run sweep as well. An operator running a dry-run to audit what would be cleaned will produce WARNING entries that are indistinguishable from the entries produced by a live deletion run. Forensic tooling or alert rules keyed on this event name would fire false positives on every dry-run invocation.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/services/task_retention_service.py
Line: 460-481

Comment:
**Forensic WARNING fires during dry-runs / previews**

`_check_running_guard` is shared between `clean_task` (real deletion) and `preview_clean_task` (no writes). The `logger.warning("task_cleanup_stale_running_override", ...)` is documented as a forensic record that "cleaning a RUNNING task means we are declaring its workflow abandoned" — but it fires on every dry-run sweep as well. An operator running a dry-run to audit what _would_ be cleaned will produce WARNING entries that are indistinguishable from the entries produced by a live deletion run. Forensic tooling or alert rules keyed on this event name would fire false positives on every dry-run invocation.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex


async def _is_task_idle(self, task, idle_days: int) -> bool:
"""
True iff the task has no interaction within the idle window.
Expand Down
8 changes: 8 additions & 0 deletions agentex/src/domain/use_cases/task_retention_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,30 @@ async def clean_task(
task_id: str,
force: bool = False,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
force=True is the admin escape hatch; it bypasses the idle-threshold
check (but NOT the active-workflow / unprocessed-events checks, which
protect correctness, not policy).

stale_running_days > 0 relaxes the active-workflow check for tasks that
have sat in RUNNING with no interaction for at least that many days
(abandoned runs that would otherwise be exempt from retention forever).
"""
return await self.retention_service.clean_task(
task_id=task_id,
enforce_idle_threshold=not force,
idle_days=idle_days,
stale_running_days=stale_running_days,
)

async def preview_clean_task(
self,
task_id: str,
force: bool = False,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
Dry-run counterpart to clean_task: runs the same safety checks without
Expand All @@ -72,6 +79,7 @@ async def preview_clean_task(
task_id=task_id,
enforce_idle_threshold=not force,
idle_days=idle_days,
stale_running_days=stale_running_days,
)

async def rehydrate_task(
Expand Down
19 changes: 16 additions & 3 deletions agentex/src/temporal/activities/retention_cleanup_activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def load_cleanup_config(self) -> dict:
"page_size": env.RETENTION_CLEANUP_PAGE_SIZE,
"max_in_flight": env.RETENTION_CLEANUP_MAX_IN_FLIGHT,
"dry_run": env.RETENTION_CLEANUP_DRY_RUN,
"stale_running_days": env.RETENTION_CLEANUP_STALE_RUNNING_DAYS,
}

@activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY)
Expand Down Expand Up @@ -129,7 +130,11 @@ async def find_multi_agent_cleanup_candidates(

@activity.defn(name=CLEAN_TASK_ACTIVITY)
async def clean_task(
self, task_id: str, idle_days: int, dry_run: bool = True
self,
task_id: str,
idle_days: int,
dry_run: bool = True,
stale_running_days: int = 0,
) -> CleanTaskOutcome:
"""
Delete the stored content (messages, states, events) for a single task.
Expand All @@ -139,6 +144,8 @@ async def clean_task(
idle_days: Passed through to the use case for policy checks.
dry_run: When omitted, preview only. Operators must pass False to
enable writes.
stale_running_days: When > 0, RUNNING tasks idle at least this many
days are treated as abandoned and cleaned instead of skipped.

Returns:
CleanTaskOutcome with ``status`` set to ``"cleaned"`` when content was
Expand All @@ -149,7 +156,10 @@ async def clean_task(
try:
if dry_run:
result = await self.use_case.preview_clean_task(
task_id=task_id, force=False, idle_days=idle_days
task_id=task_id,
force=False,
idle_days=idle_days,
stale_running_days=stale_running_days,
)
logger.info(
"task_cleanup_dry_run",
Expand All @@ -164,7 +174,10 @@ async def clean_task(
"events_deleted": 0,
}
result = await self.use_case.clean_task(
task_id=task_id, force=False, idle_days=idle_days
task_id=task_id,
force=False,
idle_days=idle_days,
stale_running_days=stale_running_days,
)
return {
"task_id": result.task_id,
Expand Down
10 changes: 9 additions & 1 deletion agentex/src/temporal/workflows/retention_cleanup_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ class RetentionCleanupTaskWorkflow:
async def run(self, args: dict) -> dict:
return await workflow.execute_activity(
CLEAN_TASK_ACTIVITY,
args=[args["task_id"], args["idle_days"], args.get("dry_run", True)],
args=[
args["task_id"],
args["idle_days"],
args.get("dry_run", True),
args.get("stale_running_days", 0),
],
start_to_close_timeout=timedelta(seconds=60),
retry_policy=RetryPolicy(
maximum_attempts=3,
Expand Down Expand Up @@ -73,6 +78,7 @@ async def run(self, args: dict | None = None) -> dict:
page_size = args.get("page_size", 200)
max_in_flight = args.get("max_in_flight", 20)
dry_run = args.get("dry_run", True)
stale_running_days = args.get("stale_running_days", 0)
after_id = args.get("after_id")
totals = args.get("totals", {"cleaned": 0, "skipped": 0, "failed": 0})

Expand Down Expand Up @@ -135,6 +141,7 @@ async def run(self, args: dict | None = None) -> dict:
"task_id": task_id,
"idle_days": idle_days,
"dry_run": dry_run,
"stale_running_days": stale_running_days,
},
id=f"retention-cleanup-task-{sweep_run_id}-{task_id}",
retry_policy=RetryPolicy(maximum_attempts=1),
Expand All @@ -158,6 +165,7 @@ async def run(self, args: dict | None = None) -> dict:
"page_size": page_size,
"max_in_flight": max_in_flight,
"dry_run": dry_run,
"stale_running_days": stale_running_days,
"after_id": page_last_id,
"totals": totals,
}
Expand Down
50 changes: 50 additions & 0 deletions agentex/tests/unit/config/test_retention_cleanup_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,53 @@ def test_retention_cleanup_env_allows_explicit_dry_run_false(monkeypatch):
env = EnvironmentVariables.refresh(force_refresh=True)

assert env.RETENTION_CLEANUP_DRY_RUN is False


@pytest.mark.unit
@pytest.mark.parametrize(
("raw", "expected"),
[
("True", True),
("TRUE", True),
("1", True),
(" true ", True),
("False", False),
("FALSE", False),
("0", False),
],
)
def test_retention_cleanup_bool_env_is_case_insensitive(monkeypatch, raw, expected):
# YAML/Helm tooling tends to render booleans as True/False; the old strict
# `== "true"` comparison silently turned DRY_RUN=True into live deletion.
monkeypatch.setenv("RETENTION_CLEANUP_DRY_RUN", raw)

env = EnvironmentVariables.refresh(force_refresh=True)

assert env.RETENTION_CLEANUP_DRY_RUN is expected


@pytest.mark.unit
@pytest.mark.parametrize("raw", ["yes", "on", "truee", "dry", ""])
def test_retention_cleanup_bool_env_rejects_garbage(monkeypatch, raw):
monkeypatch.setenv("RETENTION_CLEANUP_DRY_RUN", raw)

with pytest.raises(ValueError, match="RETENTION_CLEANUP_DRY_RUN"):
EnvironmentVariables.refresh(force_refresh=True)


@pytest.mark.unit
def test_retention_cleanup_stale_running_days_defaults_to_disabled(monkeypatch):
monkeypatch.delenv("RETENTION_CLEANUP_STALE_RUNNING_DAYS", raising=False)

env = EnvironmentVariables.refresh(force_refresh=True)

assert env.RETENTION_CLEANUP_STALE_RUNNING_DAYS == 0


@pytest.mark.unit
def test_retention_cleanup_stale_running_days_parses(monkeypatch):
monkeypatch.setenv("RETENTION_CLEANUP_STALE_RUNNING_DAYS", "30")

env = EnvironmentVariables.refresh(force_refresh=True)

assert env.RETENTION_CLEANUP_STALE_RUNNING_DAYS == 30
Loading
Loading