|
| 1 | +"""Tier-aware retention sweeper for saved_workspaces (Step 8). |
| 2 | +
|
| 3 | +The sweeper applies per-tier retention to the `saved_workspaces` |
| 4 | +table. Today every user resolves to "free" (the shim still returns |
| 5 | +free for everyone), so in practice the 7-day Free retention is what |
| 6 | +fires; the Pro / Business branches are exercised by the tests against |
| 7 | +a patched `resolve_user_tier` so the wiring is locked in for the |
| 8 | +Stripe cutover. |
| 9 | +
|
| 10 | +Retention table (locked by the brief): |
| 11 | + free 7 days |
| 12 | + pro 30 days |
| 13 | + business unbounded (no deletion on age) |
| 14 | +
|
| 15 | +Implementation notes vs HelpmateAI's `sweep_local_workspace_storage`: |
| 16 | + * HelpmateAI's sweeper also cleans up FileStorage objects, orphan |
| 17 | + upload paths, orphan index dirs, etc. AI Job Agent saved workspaces |
| 18 | + are JSON blobs in a Supabase row -- there are no bucket objects or |
| 19 | + on-disk files to chase, so this sweeper is a pure DELETE pass. |
| 20 | + * We resolve each row's owner via `resolve_user_tier(app_user)` per |
| 21 | + the brief, so a future Stripe-aware resolver doesn't need to be |
| 22 | + revisited. App-user records ride in `aijobagent_app_users`; we |
| 23 | + fetch the row by user_id, then hand it to the resolver. |
| 24 | + * Service-role client only -- the sweeper bypasses RLS because it |
| 25 | + crosses user_id partitions. Mirrors `CachedJobsStore`'s |
| 26 | + service-role pattern. |
| 27 | +
|
| 28 | +CLI entry point at the bottom mirrors HelpmateAI's |
| 29 | +`if __name__ == "__main__": main()`. Operators (or a cron job in the |
| 30 | +VPS docker-compose) invoke this directly. The function returns a |
| 31 | +`SweepSummary` so the cron log carries a structured record of how |
| 32 | +many rows were touched. |
| 33 | +""" |
| 34 | +from __future__ import annotations |
| 35 | + |
| 36 | +import json |
| 37 | +import logging |
| 38 | +import os |
| 39 | +from dataclasses import asdict, dataclass |
| 40 | +from datetime import datetime, timedelta, timezone |
| 41 | +from typing import Any, Optional |
| 42 | + |
| 43 | +from backend.tiers import Tier, resolve_user_tier, retention_days_for_tier |
| 44 | +from src.config import ( |
| 45 | + SUPABASE_SAVED_WORKSPACES_TABLE, |
| 46 | + SUPABASE_SERVICE_ROLE_KEY, |
| 47 | + SUPABASE_URL, |
| 48 | +) |
| 49 | +from src.schemas import AppUserRecord |
| 50 | + |
| 51 | + |
try:  # supabase is an optional dep in some test paths
    from supabase import create_client as _create_supabase_client  # type: ignore
except Exception:  # pragma: no cover - defensive import
    # Leave a sentinel so `_service_role_client` can detect the missing
    # dependency instead of the module failing at import time.
    _create_supabase_client = None  # type: ignore


logger = logging.getLogger(__name__)


# Name of the app-users (auth) table. The same constant lives in
# `src.app_user_store`; it is recomputed here so the sweeper has no
# runtime coupling to the auth module (which pulls in supabase as well).
#
# Resolution order:
#   1. SUPABASE_APP_USERS_TABLE from the environment, whitespace-stripped;
#   2. the built-in default when the variable is unset OR blank.
#
# The blank case matters: an empty table name would make every app-user
# lookup fail, and a failed lookup resolves the owner to Free retention.
# NOTE(review): the module docstring mentions `aijobagent_app_users`
# while the default here is `app_users` -- confirm which is current.
_DEFAULT_APP_USERS_TABLE = "app_users"
_APP_USERS_TABLE = (
    os.getenv("SUPABASE_APP_USERS_TABLE", "").strip()
    or _DEFAULT_APP_USERS_TABLE
)
| 67 | + |
| 68 | + |
@dataclass
class SweepSummary:
    """Counters describing the outcome of a single sweeper run.

    * `expired_workspaces_deleted` -- saved_workspaces rows whose
      `updated_at` fell outside the owner's tier retention and that
      were actually deleted.
    * `business_workspaces_skipped` -- rows whose owner resolved to a
      tier with no retention limit (Business) and were therefore left
      alone; reported separately so operators can sanity-check that the
      exemption is firing.
    * `rows_inspected` -- rows returned by the listing query that the
      sweep looked at.
    * `errors` -- rows (or the listing call itself) that could not be
      processed. A per-row failure never aborts the run; the sweep keeps
      going so the rest of the table still makes progress.
    """

    expired_workspaces_deleted: int = 0
    business_workspaces_skipped: int = 0
    rows_inspected: int = 0
    errors: int = 0

    def to_dict(self) -> dict[str, int]:
        """Return the counters as a plain dict keyed by field name."""
        counters = asdict(self)
        return counters
| 93 | + |
| 94 | + |
def _parse_timestamp(value: Any) -> Optional[datetime]:
    """Parse a row's `updated_at` into a tz-aware UTC datetime.

    Supabase returns timestamps as ISO 8601 strings; some test paths
    hand over a real `datetime` instead. Both are normalized to UTC so
    the cutoff math at the call site is uniform.

    String inputs are cleaned up before parsing because
    `datetime.fromisoformat` is strict on Python versions before 3.11:

    * surrounding whitespace is stripped;
    * a trailing "Z"/"z" is rewritten to "+00:00";
    * fractional seconds are padded or truncated to exactly six digits
      (older interpreters only accept 3 or 6), so a value such as
      "...12:00:00.12345+00:00" still parses.

    Args:
        value: ISO 8601 string, `datetime`, or anything else.

    Returns:
        A tz-aware UTC `datetime`. Naive inputs are assumed to already
        be UTC. Returns None for None, blank or unparseable strings,
        unsupported types, and values that cannot be represented in
        UTC -- the caller skips such rows.
    """
    if value is None:
        return None

    if isinstance(value, datetime):
        moment = value
    elif isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        if text[-1] in "Zz":
            text = text[:-1] + "+00:00"
        # Normalize the fractional-seconds run (if any) to six digits.
        dot = text.find(".")
        if dot != -1:
            end = dot + 1
            while end < len(text) and text[end] in "0123456789":
                end += 1
            digits = text[dot + 1:end]
            if digits:
                text = text[:dot + 1] + digits[:6].ljust(6, "0") + text[end:]
        try:
            moment = datetime.fromisoformat(text)
        except ValueError:
            return None
    else:
        return None

    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    try:
        return moment.astimezone(timezone.utc)
    except (OverflowError, ValueError):
        # Shifting an extreme date to UTC can fall outside the range
        # `datetime` supports; treat that as unparseable, not fatal.
        return None
| 120 | + |
| 121 | + |
def _service_role_client():
    """Return a service-role Supabase client, or None if unavailable.

    The sweep spans every user's rows, so it must bypass RLS, which
    only the service role can do. None is returned when either the URL
    or the service-role key is not configured, or when the supabase
    package could not be imported; the caller logs and exits cleanly in
    that case so a misconfigured cron does not crash.
    """
    credentials_present = bool(SUPABASE_URL) and bool(SUPABASE_SERVICE_ROLE_KEY)
    if credentials_present and _create_supabase_client is not None:
        return _create_supabase_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
    return None
| 135 | + |
| 136 | + |
def _load_app_user(client, user_id: str) -> Optional[AppUserRecord]:
    """Fetch the app-users row for `user_id` as an `AppUserRecord`.

    The result feeds `resolve_user_tier`. None is returned when the
    user_id is empty, the query fails, no row matches, the row is not a
    dict, or the record cannot be constructed. The caller treats None
    as "apply Free retention", so a tombstoned auth row can't make a
    workspace immortal. Because that fallback can shorten retention,
    every failure path that is not a plain "no such row" is logged.

    Args:
        client: Supabase client exposing the
            `table().select().eq().limit().execute()` chain.
        user_id: Owner id taken from the saved-workspace row.

    Returns:
        The populated `AppUserRecord`, or None as described above.
    """
    if not user_id:
        return None
    try:
        response = (
            client.table(_APP_USERS_TABLE)
            .select("*")
            .eq("id", user_id)
            .limit(1)
            .execute()
        )
    except Exception as exc:  # noqa: BLE001 - boundary
        logger.warning(
            "sweep_app_user_lookup_failed user_id=%s error=%s",
            user_id,
            exc,
        )
        return None

    rows = getattr(response, "data", None) or []
    if not rows:
        return None
    first = rows[0]
    if not isinstance(first, dict):
        logger.warning(
            "sweep_app_user_row_malformed user_id=%s row_type=%s",
            user_id,
            type(first).__name__,
        )
        return None

    # Reuse the dataclass for a faithful representation of the row.
    # Missing or null columns fall back to "free" / "active" defaults.
    try:
        return AppUserRecord(
            id=str(first.get("id", "") or ""),
            email=str(first.get("email", "") or ""),
            plan_tier=str(first.get("plan_tier", "free") or "free"),
            account_status=str(
                first.get("account_status", "active") or "active"
            ),
        )
    except Exception as exc:  # pragma: no cover - defensive
        # Previously swallowed silently; surface it so a schema drift in
        # AppUserRecord doesn't quietly demote every owner to Free.
        logger.warning(
            "sweep_app_user_record_invalid user_id=%s error=%s",
            user_id,
            exc,
        )
        return None
| 181 | + |
| 182 | + |
def _delete_workspace(client, user_id: str, table_name: str) -> bool:
    """Remove the saved-workspace row owned by `user_id`.

    The store upserts on user_id, so at most one row matches. Any
    exception raised by the client is logged as a warning and reported
    as False; a clean round-trip reports True.
    """
    try:
        deletion = client.table(table_name).delete().eq("user_id", user_id)
        deletion.execute()
    except Exception as exc:  # noqa: BLE001 - boundary
        logger.warning(
            "sweep_workspace_delete_failed user_id=%s error=%s",
            user_id,
            exc,
        )
        return False
    else:
        return True
| 197 | + |
| 198 | + |
def _row_should_be_deleted(
    *,
    tier: Tier,
    updated_at: datetime,
    now: datetime,
) -> bool:
    """Report whether a row has outlived its tier's retention window.

    A tier with no retention limit (`retention_days_for_tier` returns
    None, i.e. Business) is never expired on age. For capped tiers the
    expiry threshold is `now` minus the retention period, and a row is
    expired when its `updated_at` is at or before that threshold.
    """
    window_days = retention_days_for_tier(tier)
    if window_days is not None:
        expiry_threshold = now - timedelta(days=int(window_days))
        return updated_at <= expiry_threshold
    return False
| 216 | + |
| 217 | + |
def sweep_expired_workspaces(
    *,
    now: Optional[datetime] = None,
    table_name: str = SUPABASE_SAVED_WORKSPACES_TABLE,
    client=None,
) -> SweepSummary:
    """Delete saved-workspace rows older than their owner's tier retention.

    Args:
        now: Reference instant for the age check; defaults to the
            current UTC time. A naive value is interpreted as UTC (the
            same convention `_parse_timestamp` applies to row
            timestamps) rather than as host-local time, so results do
            not depend on the machine's timezone. Mainly for tests.
        table_name: Table to sweep. Exists so a schema migration can
            run the sweep against a shadow table without a code change.
        client: Supabase client to use. When None, a service-role
            client is built; mainly for tests.

    Returns:
        A `SweepSummary` with the run's counters. If no client can be
        built the summary is all zeros; if the listing query fails it
        carries a single error.

    A repeat call right after a sweep deletes nothing further, but
    `rows_inspected` and `business_workspaces_skipped` still count the
    rows that remain. Per-row failures are counted in `errors` and do
    not abort the run.
    """
    summary = SweepSummary()

    # `datetime.astimezone()` treats a naive value as local time, so
    # naive inputs are pinned to UTC explicitly instead.
    if now is None:
        sweep_now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        sweep_now = now.replace(tzinfo=timezone.utc)
    else:
        sweep_now = now.astimezone(timezone.utc)

    sweep_client = client if client is not None else _service_role_client()
    if sweep_client is None:
        logger.warning(
            "sweep_skipped_no_service_role_client "
            "url_configured=%s key_configured=%s table=%s",
            bool(SUPABASE_URL),
            bool(SUPABASE_SERVICE_ROLE_KEY),
            table_name,
        )
        return summary

    # NOTE(review): this is one unpaginated select. If the API caps the
    # rows returned per response, rows past the cap are not seen in
    # this run -- confirm the table fits or add paging.
    try:
        response = (
            sweep_client.table(table_name)
            .select("user_id,updated_at")
            .execute()
        )
    except Exception as exc:  # noqa: BLE001 - boundary
        logger.exception(
            "sweep_list_failed table=%s error=%s", table_name, exc
        )
        summary.errors += 1
        return summary

    rows = getattr(response, "data", None) or []
    for row in rows:
        if not isinstance(row, dict):
            # Malformed entries count as errors but not as inspected.
            summary.errors += 1
            continue
        summary.rows_inspected += 1
        user_id = str(row.get("user_id", "") or "")
        updated_at = _parse_timestamp(row.get("updated_at"))
        if not user_id or updated_at is None:
            summary.errors += 1
            continue

        # Tier resolution per the brief: load the auth row and hand
        # it to the resolver. A None record is passed straight through
        # so the resolver's fallback applies and a missing user record
        # can't make a workspace immortal.
        app_user = _load_app_user(sweep_client, user_id)
        tier = resolve_user_tier(app_user)

        if retention_days_for_tier(tier) is None:
            # Unbounded retention (Business) -- never deleted on age.
            # The row stays until the user explicitly deletes it.
            summary.business_workspaces_skipped += 1
            continue

        if not _row_should_be_deleted(
            tier=tier, updated_at=updated_at, now=sweep_now
        ):
            continue

        if _delete_workspace(sweep_client, user_id, table_name):
            summary.expired_workspaces_deleted += 1
        else:
            summary.errors += 1

    logger.info(
        "sweep_completed expired=%d business_skipped=%d inspected=%d errors=%d",
        summary.expired_workspaces_deleted,
        summary.business_workspaces_skipped,
        summary.rows_inspected,
        summary.errors,
    )
    return summary
| 304 | + |
| 305 | + |
def main() -> None:
    """Run one sweep and print its summary as indented JSON.

    CLI entry point, mirroring HelpmateAI's `main()` in
    `backend/maintenance.py`. A cron job or an operator invokes it with
    `python -m backend.maintenance`; the JSON output lets
    structured-log pipelines ingest the result directly.
    """
    report = sweep_expired_workspaces().to_dict()
    print(json.dumps(report, indent=2))
| 314 | + |
| 315 | + |
# Script entry: run a single sweep when the module is executed directly.
if __name__ == "__main__":
    main()


# Public API of this module; everything else is an internal helper.
__all__ = ["SweepSummary", "main", "sweep_expired_workspaces"]
0 commit comments