feat(usage): always-on per-query usage tracking #614
Merged
7 commits
972888d feat(usage): always-on per-query usage tracking (galshubeli)
16500d8 Merge branch 'staging' into feat/usage-tracking (galshubeli)
c12dbcb fix(usage): address review — track all paths, drop PII, validate user_id (galshubeli)
eed65d1 refactor(usage): config-driven graph name + keep tracking out of the SDK (galshubeli)
31fa28d Merge branch 'staging' into feat/usage-tracking (galshubeli)
e47e580 fix(usage): address review — log hash not PII, empty-env fallback, tr… (galshubeli)
b4bd4df fix(usage): correct success semantics, env-agnostic test, doc wording (galshubeli)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| """Always-on, provider-agnostic per-query usage tracking. | ||
|
|
||
| This is deliberately independent of the optional LLM conversational-memory | ||
| feature (``api/memory/graphiti_tool.py``). Memory writes are opt-in | ||
| (``use_memory``), gated to OpenAI/Azure providers, and lazily created — so | ||
| they cannot be used to measure adoption. This module records *every* query, | ||
| regardless of provider or the ``use_memory`` flag, onto the central | ||
| ``Organizations`` graph (which already holds ``User``/``Identity``/``Token``). | ||
|
|
||
| For each query we maintain, fire-and-forget: | ||
|
|
||
| * Denormalized counters + activity timestamps on the ``User`` node | ||
| (``query_count``/``success_count``/``error_count``/``last_active``/ | ||
| ``first_query_at``) for cheap reads. | ||
| * A per-query ``(:UsageEvent)`` node linked ``(User)-[:PERFORMED]->`` carrying | ||
| ``graph_id``/``is_demo``/``success``/``timestamp`` for time-series, per-DB | ||
| and success-rate analytics. | ||
|
|
||
| Writes never block or fail a request: they run as background tasks whose | ||
| exceptions are logged and swallowed, mirroring | ||
| ``api.core.pipeline.save_memory_background``. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import base64 | ||
| import binascii | ||
| import hashlib | ||
| import logging | ||
| from typing import Optional | ||
|
|
||
| from api.config import ORGANIZATIONS_GRAPH | ||
| from api.core.db_resolver import resolve_db | ||
| from api.core.pipeline import background_tasks_var, is_general_graph | ||
|
|
||
| # Single round-trip: bump the User counters/timestamps and append a UsageEvent. | ||
| # Uses MATCH (not MERGE) on User so an unknown email is a silent no-op rather | ||
| # than creating a phantom user from the query path. ``timestamp()`` is FalkorDB | ||
| # epoch-millis, matching every other timestamp in the Organizations graph. | ||
| _RECORD_USAGE_CYPHER = """ | ||
| MATCH (u:User {email: $email}) | ||
| SET u.query_count = coalesce(u.query_count, 0) + 1, | ||
| u.success_count = coalesce(u.success_count, 0) + (CASE WHEN $success THEN 1 ELSE 0 END), | ||
| u.error_count = coalesce(u.error_count, 0) + (CASE WHEN $success THEN 0 ELSE 1 END), | ||
| u.last_active = timestamp(), | ||
| u.first_query_at = coalesce(u.first_query_at, timestamp()) | ||
| CREATE (u)-[:PERFORMED]->(e:UsageEvent { | ||
| graph_id: $graph_id, | ||
| is_demo: $is_demo, | ||
| success: $success, | ||
| timestamp: timestamp() | ||
| }) | ||
| """ | ||
|
|
||
|
|
||
| def _decode_email(user_id: str) -> Optional[str]: | ||
|     """Recover the user's email from the base64 ``user_id``. | ||
| | ||
|     Inverse of ``base64.b64encode(email.encode())`` in | ||
|     ``api/auth/user_management.py``. Returns ``None`` on malformed input so the | ||
|     caller can skip tracking instead of raising. | ||
|     """ | ||
|     if not user_id: | ||
|         return None | ||
|     try: | ||
|         email = base64.b64decode(user_id, validate=True).decode("utf-8") | ||
|     except (binascii.Error, ValueError, UnicodeDecodeError): | ||
|         logging.warning("Usage tracking: could not decode user_id to email") | ||
|         return None | ||
|     # b64decode is lenient about padding/length; require an email-shaped result | ||
|     # so a malformed id can't trigger a phantom DB write (matches the docstring). | ||
|     if "@" not in email: | ||
|         logging.warning("Usage tracking: decoded user_id is not a valid email") | ||
|         return None | ||
|     return email | ||
|
|
||
|
|
||
| async def _write_usage(email: str, graph_id: str, is_demo: bool, success: bool, db) -> None: | ||
|     """Perform the single Cypher write against the Organizations graph.""" | ||
|     organizations_graph = resolve_db(db).select_graph(ORGANIZATIONS_GRAPH) | ||
|     await organizations_graph.query( | ||
|         _RECORD_USAGE_CYPHER, | ||
|         { | ||
|             "email": email, | ||
|             "graph_id": graph_id, | ||
|             "is_demo": is_demo, | ||
|             "success": success, | ||
|         }, | ||
|     ) | ||
|     # Structured-ish log line so usage is visible to log aggregators even | ||
|     # before any read API exists. graph_id is the namespaced name | ||
|     # ({base64(email)}_{db}) and base64 email is reversible, so log a short | ||
|     # stable hash instead of the raw value — this keeps user identity out of | ||
|     # logs and also neutralizes the CodeQL log-injection vector. | ||
|     graph_ref = hashlib.sha256(graph_id.encode()).hexdigest()[:12] | ||
|     logging.info( | ||
|         "usage_event graph=%s is_demo=%s success=%s", | ||
|         graph_ref, is_demo, success, | ||
|     ) | ||
|
coderabbitai[bot] marked this conversation as resolved.
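The two ideas in the helpers above (rejecting a malformed `user_id` before any write, and logging a short hash rather than the reversible base64 graph name) can be tried in isolation. The following is a minimal standalone sketch, not the module itself: the names `decode_email` and `log_ref`, the sample address, and the `_sales` suffix are illustrative assumptions, and the warning logs from the diff are omitted for brevity.

```python
import base64
import binascii
import hashlib
from typing import Optional


def decode_email(user_id: str) -> Optional[str]:
    """Return the email encoded in ``user_id``, or None if it is malformed."""
    if not user_id:
        return None
    try:
        # validate=True rejects characters outside the base64 alphabet.
        email = base64.b64decode(user_id, validate=True).decode("utf-8")
    except (binascii.Error, ValueError, UnicodeDecodeError):
        return None
    # Require an email-shaped result so junk ids never reach the database.
    return email if "@" in email else None


def log_ref(graph_id: str) -> str:
    """Short, stable reference to a graph name that is safe to put in logs."""
    return hashlib.sha256(graph_id.encode()).hexdigest()[:12]


# Hypothetical sample values, for illustration only.
user_id = base64.b64encode(b"alice@example.com").decode()
graph_id = f"{user_id}_sales"

print(decode_email(user_id))
print(decode_email("not base64!!"))
print(log_ref(graph_id))
```

Because the hash is truncated to 12 hex characters it works as a correlation key across log lines, but it should not be treated as a unique identifier.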
|
||
|
|
||
|
|
||
| def record_query_usage_background( | ||
|     user_id: str, | ||
|     namespaced: str, | ||
|     success: bool, | ||
|     *, | ||
|     db=None, | ||
|     task_sink: Optional[set] = None, | ||
| ) -> None: | ||
|     """Schedule fire-and-forget usage tracking for one query. | ||
| | ||
|     Returns immediately. The write runs as a background task whose failure is | ||
|     logged but never propagated, so tracking can never break or delay a query | ||
|     response. Called unconditionally at pipeline completion — independent of | ||
|     ``use_memory`` and the LLM provider. | ||
| | ||
|     Args: | ||
|         user_id: Base64-encoded email (the namespacing id used by the routes). | ||
|         namespaced: The fully-namespaced graph name the query ran against; | ||
|             already demo-aware, so it doubles as the recorded ``graph_id``. | ||
|         success: Whether SQL execution succeeded (no execution error). | ||
|         db: Optional FalkorDB handle; resolves to the server singleton when None. | ||
|         task_sink: Optional set the scheduled task is added to (and auto-removed | ||
|             from on completion) so callers can await any in-flight tracking | ||
|             writes before shutdown. | ||
|     """ | ||
|     email = _decode_email(user_id) | ||
|     if email is None: | ||
|         return | ||
| | ||
|     is_demo = is_general_graph(namespaced) | ||
|     sink = task_sink if task_sink is not None else background_tasks_var.get() | ||
| | ||
|     task = asyncio.create_task( | ||
|         _write_usage(email, namespaced, is_demo, success, db) | ||
|     ) | ||
| | ||
|     if sink is not None: | ||
|         sink.add(task) | ||
|         task.add_done_callback(sink.discard) | ||
| | ||
|     def _log_done(t: "asyncio.Task") -> None: | ||
|         if t.cancelled(): | ||
|             return | ||
|         exc = t.exception() | ||
|         if exc is not None: | ||
|             logging.error("Usage tracking save failed: %s", exc)  # nosemgrep | ||
| | ||
|     task.add_done_callback(_log_done) | ||
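The scheduling pattern used by `record_query_usage_background` (create the task, park it in a sink set, log any exception in a done-callback) can be sketched without FalkorDB. This is only an illustration under stated assumptions: `write_usage` is a hypothetical stand-in for the real database write, and `schedule_background` and `main` are invented names, not part of the PR.

```python
import asyncio
import logging
from typing import Optional, Set


async def write_usage(succeed: bool) -> None:
    """Hypothetical stand-in for the database write; fails on request."""
    await asyncio.sleep(0)
    if not succeed:
        raise RuntimeError("simulated write failure")


def schedule_background(
    succeed: bool, sink: Optional[Set[asyncio.Task]] = None
) -> asyncio.Task:
    """Schedule the write and return immediately; failures are only logged."""
    task = asyncio.create_task(write_usage(succeed))

    if sink is not None:
        # Holding a reference keeps the task alive and lets callers drain
        # in-flight writes; the callback removes it again on completion.
        sink.add(task)
        task.add_done_callback(sink.discard)

    def _log_done(t: asyncio.Task) -> None:
        if t.cancelled():
            return
        exc = t.exception()  # retrieving it marks the exception as handled
        if exc is not None:
            logging.error("background write failed: %s", exc)

    task.add_done_callback(_log_done)
    return task


async def main() -> int:
    sink: Set[asyncio.Task] = set()
    schedule_background(True, sink)
    schedule_background(False, sink)  # the failure is logged, never raised here
    # Drain before shutdown; copy the set because callbacks mutate it.
    await asyncio.gather(*list(sink), return_exceptions=True)
    await asyncio.sleep(0)  # give any remaining done-callbacks a turn
    return len(sink)


remaining = asyncio.run(main())
print(remaining)
```

The sink matters because the event loop keeps only weak references to tasks: a task that nothing else references can be garbage-collected before it finishes, so the set serves both as a keep-alive and as a handle for draining at shutdown.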