Skip to content

Commit 730c9ad

Browse files
committed
feat(console): add usage audit retention config
1 parent ea4e7ea commit 730c9ad

4 files changed

Lines changed: 88 additions & 1 deletion

File tree

openviking/observability/usage_audit/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ Observability Event Bus
6666
"batch_size": 500,
6767
"flush_interval_seconds": 1.0,
6868
"shutdown_flush_timeout_seconds": 3.0,
69+
"usage_retention_days": 14,
70+
"audit_retention_days": 7,
6971
"audit_retention_per_account": 1000,
7072
"timezone": "local",
7173
"inventory_ttl_seconds": 10.0
@@ -86,7 +88,9 @@ Observability Event Bus
8688
| `batch_size` | `500` | 单次批量写入的最大事件数 |
8789
| `flush_interval_seconds` | `1.0` | worker 定时 flush 间隔 |
8890
| `shutdown_flush_timeout_seconds` | `3.0` | 服务关闭时 flush 等待时间 |
89-
| `audit_retention_per_account` | `1000` | 每个 account 保留的最新请求审计条数;`0` 表示不主动裁剪 |
91+
| `usage_retention_days` | `14` | 统计聚合数据保留天数,包含 Token、检索、上下文写入热力图、Agent 活跃;`0` 表示不按天裁剪 |
92+
| `audit_retention_days` | `7` | 请求审计日志保留天数;`0` 表示不按天裁剪 |
93+
| `audit_retention_per_account` | `1000` | 每个 account 保留的最新请求审计条数;`0` 表示不按条数裁剪 |
9094
| `timezone` | `"local"` | 统计日期使用的时区;可填 `"local"``"UTC"` 或 IANA 时区名 |
9195
| `inventory_ttl_seconds` | `10.0` | 上下文当前数据量查询缓存时间 |
9296

openviking/observability/usage_audit/runtime.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ async def init_usage_audit_from_server_config(
6565

6666
store = SQLiteUsageAuditStore(
6767
_resolve_sqlite_path(config),
68+
usage_retention_days=usage_config.usage_retention_days,
69+
audit_retention_days=usage_config.audit_retention_days,
6870
audit_retention_per_account=usage_config.audit_retention_per_account,
6971
timezone_name=usage_config.timezone,
7072
)

openviking/observability/usage_audit/sqlite_store.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@ def __init__(
3333
self,
3434
db_path: Path,
3535
*,
36+
usage_retention_days: int = 14,
37+
audit_retention_days: int = 7,
3638
audit_retention_per_account: int = 1000,
3739
timezone_name: str = "local",
3840
) -> None:
3941
self._db_path = Path(db_path)
42+
self._usage_retention_days = int(usage_retention_days)
43+
self._audit_retention_days = int(audit_retention_days)
4044
self._audit_retention_per_account = int(audit_retention_per_account)
4145
self._tz = resolve_usage_timezone(timezone_name)
4246
self._conn: sqlite3.Connection | None = None
@@ -85,6 +89,7 @@ def _record_projection_sync(self, projection: UsageAuditProjection) -> None:
8589
self._write_context_rows(conn, projection.context_rows, updated_at)
8690
self._write_agent_rows(conn, projection.agent_rows, updated_at)
8791
self._write_audit_rows(conn, projection.audit_rows)
92+
self._trim_usage_rows(conn, self._usage_max_dates(projection))
8893
self._trim_audit_rows(conn, projection.touched_audit_accounts)
8994
conn.execute("COMMIT")
9095
except Exception:
@@ -175,6 +180,19 @@ def _write_audit_rows(conn, rows: list[tuple]) -> None:
175180
)
176181

177182
def _trim_audit_rows(self, conn, accounts: set[str]) -> None:
183+
if self._audit_retention_days > 0:
184+
max_dates = self._audit_max_dates(conn, accounts)
185+
for account_id, cutoff_date in self._cutoff_dates(
186+
max_dates,
187+
retention_days=self._audit_retention_days,
188+
).items():
189+
conn.execute(
190+
"""
191+
DELETE FROM request_audit
192+
WHERE account_id = ? AND substr(created_at, 1, 10) < ?
193+
""",
194+
(account_id, cutoff_date),
195+
)
178196
if self._audit_retention_per_account <= 0:
179197
return
180198
for account_id in accounts:
@@ -192,6 +210,67 @@ def _trim_audit_rows(self, conn, accounts: set[str]) -> None:
192210
(account_id, account_id, self._audit_retention_per_account),
193211
)
194212

213+
def _trim_usage_rows(self, conn, max_dates_by_account: dict[str, str]) -> None:
214+
cutoff_by_account = self._cutoff_dates(
215+
max_dates_by_account,
216+
retention_days=self._usage_retention_days,
217+
)
218+
for account_id, cutoff_date in cutoff_by_account.items():
219+
for table in (
220+
"usage_token_daily",
221+
"usage_retrieval_daily",
222+
"usage_context_write_bucket",
223+
"usage_agent_activity_daily",
224+
):
225+
conn.execute(
226+
f"DELETE FROM {table} WHERE account_id = ? AND date < ?",
227+
(account_id, cutoff_date),
228+
)
229+
230+
@staticmethod
231+
def _usage_max_dates(projection: UsageAuditProjection) -> dict[str, str]:
232+
max_dates: dict[str, str] = {}
233+
SQLiteUsageAuditStore._merge_max_dates(max_dates, projection.token_rows, date_index=3)
234+
SQLiteUsageAuditStore._merge_max_dates(max_dates, projection.retrieval_rows, date_index=3)
235+
SQLiteUsageAuditStore._merge_max_dates(max_dates, projection.context_rows, date_index=3)
236+
SQLiteUsageAuditStore._merge_max_dates(max_dates, projection.agent_rows, date_index=2)
237+
return max_dates
238+
239+
@staticmethod
240+
def _merge_max_dates(target: dict[str, str], rows: dict[tuple, Any], *, date_index: int) -> None:
241+
for key in rows:
242+
account_id = str(key[0])
243+
event_date = str(key[date_index])
244+
if event_date > target.get(account_id, ""):
245+
target[account_id] = event_date
246+
247+
@staticmethod
248+
def _cutoff_dates(max_dates_by_account: dict[str, str], *, retention_days: int) -> dict[str, str]:
249+
if retention_days <= 0:
250+
return {}
251+
return {
252+
account_id: (
253+
date.fromisoformat(max_date) - timedelta(days=retention_days - 1)
254+
).isoformat()
255+
for account_id, max_date in max_dates_by_account.items()
256+
}
257+
258+
@staticmethod
259+
def _audit_max_dates(conn, accounts: set[str]) -> dict[str, str]:
260+
max_dates: dict[str, str] = {}
261+
for account_id in accounts:
262+
row = conn.execute(
263+
"""
264+
SELECT MAX(substr(created_at, 1, 10)) AS max_date
265+
FROM request_audit
266+
WHERE account_id = ?
267+
""",
268+
(account_id,),
269+
).fetchone()
270+
if row and row["max_date"]:
271+
max_dates[account_id] = str(row["max_date"])
272+
return max_dates
273+
195274
async def get_today_tokens(self, *, account_id: str, date: str) -> dict[str, int]:
196275
async with self._lock:
197276
return await asyncio.to_thread(self._get_today_tokens_sync, account_id, date)

openviking/server/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ class UsageAuditConfig(BaseModel):
9595
batch_size: int = Field(500, gt=0)
9696
flush_interval_seconds: float = Field(1.0, gt=0)
9797
shutdown_flush_timeout_seconds: float = Field(3.0, gt=0)
98+
usage_retention_days: int = Field(14, ge=0)
99+
audit_retention_days: int = Field(7, ge=0)
98100
audit_retention_per_account: int = Field(1000, ge=0)
99101
timezone: str = "local"
100102
inventory_ttl_seconds: float = Field(10.0, ge=0)

0 commit comments

Comments
 (0)