From 972888d48148dbd422a2d7cd00fd1fad23827e60 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 24 Jun 2026 14:44:06 +0300 Subject: [PATCH 1/5] feat(usage): always-on per-query usage tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a dedicated, provider-agnostic usage-tracking layer that records every query onto the central Organizations graph, independent of the optional LLM conversational-memory feature. Background: the only pre-existing per-query record was a side effect of the opt-in `use_memory` feature, which is also gated to OpenAI/Azure providers and lazily created. As a result the vast majority of users had no recorded query activity, and every recorded query was against the demo DB — usage counts were unusable for measuring adoption. This change: - Adds api/core/usage_tracking.py with record_query_usage_background(), a fire-and-forget recorder mirroring save_memory_background (background task, task-sink support for the SDK, errors logged-not-raised). - Maintains denormalized counters on the User node (query_count/success_count/ error_count/last_active/first_query_at) plus a per-query (:UsageEvent) node linked (User)-[:PERFORMED]-> for time-series/per-DB/success-rate analytics. - Hooks the recorder into run_query and run_confirmed at the completion point, OUTSIDE the use_memory/provider gate, so it runs on every query. - Uses MATCH (not MERGE) on User so an unknown email is a silent no-op rather than creating a phantom user from the query path. Forward-only: historical usage cannot be backfilled. Tests: tests/test_usage_tracking.py (write content, demo flag, ungated design, invalid user_id no-op, swallowed write failure). Cypher validated against a live FalkorDB engine. Co-Authored-By: Claude Opus 4.8 (1M context) --- api/core/text2sql.py | 12 +++ api/core/usage_tracking.py | 138 ++++++++++++++++++++++++++++++++ tests/test_usage_tracking.py | 150 +++++++++++++++++++++++++++++++++++ 3 files changed, 300 insertions(+) create mode 100644 api/core/usage_tracking.py create mode 100644 tests/test_usage_tracking.py diff --git a/api/core/text2sql.py b/api/core/text2sql.py index 3c914aff..cce622bf 100644 --- a/api/core/text2sql.py +++ b/api/core/text2sql.py @@ -31,6 +31,7 @@ from api.agents import AnalysisAgent, RelevancyAgent, FollowUpAgent from api.agents.healer_agent import HealerAgent from api.core.db_resolver import resolve_db +from api.core.usage_tracking import record_query_usage_background from api.core.result_models import QueryAnalysis, QueryMetadata, QueryResult, RefreshResult from api.graph import find, get_db_description, get_user_rules @@ -608,6 +609,12 @@ def _run_sql(sql: str): if not user_readable_response: user_readable_response = f"Error executing SQL query: {execution_error_msg}" + # Always-on usage tracking — independent of ``use_memory``/provider, so it + # records every query (unlike the optional memory write below). + record_query_usage_background( + user_id, namespaced, success=execution_error_msg is None, db=db + ) + if memory_tool is not None: full_response = { "question": queries_history[-1], @@ -764,6 +771,11 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to if not user_readable_response: user_readable_response = execution_error_msg + # Always-on usage tracking — see note in ``run_query``. + record_query_usage_background( + user_id, namespaced, success=execution_error_msg is None, db=db + ) + if memory_tool is not None: save_memory_background( memory_tool=memory_tool, diff --git a/api/core/usage_tracking.py b/api/core/usage_tracking.py new file mode 100644 index 00000000..95b00aa7 --- /dev/null +++ b/api/core/usage_tracking.py @@ -0,0 +1,138 @@ +"""Always-on, provider-agnostic per-query usage tracking. + +This is deliberately independent of the optional LLM conversational-memory +feature (``api/memory/graphiti_tool.py``). Memory writes are opt-in +(``use_memory``), gated to OpenAI/Azure providers, and lazily created — so +they cannot be used to measure adoption. This module records *every* query, +regardless of provider or the ``use_memory`` flag, onto the central +``Organizations`` graph (which already holds ``User``/``Identity``/``Token``). + +For each query we maintain, fire-and-forget: + +* Denormalized counters + activity timestamps on the ``User`` node + (``query_count``/``success_count``/``error_count``/``last_active``/ + ``first_query_at``) for cheap reads. +* A per-query ``(:UsageEvent)`` node linked ``(User)-[:PERFORMED]->`` carrying + ``graph_id``/``is_demo``/``success``/``timestamp`` for time-series, per-DB + and success-rate analytics. + +Writes never block or fail a request: they run as background tasks whose +exceptions are logged and swallowed, mirroring +``api.core.pipeline.save_memory_background``. +""" + +import asyncio +import base64 +import binascii +import logging +from typing import Optional + +from api.core.db_resolver import resolve_db +from api.core.pipeline import background_tasks_var, is_general_graph + +# Central user-management graph (also holds User/Identity/Token). +ORGANIZATIONS_GRAPH = "Organizations" + +# Single round-trip: bump the User counters/timestamps and append a UsageEvent. +# Uses MATCH (not MERGE) on User so an unknown email is a silent no-op rather +# than creating a phantom user from the query path. ``timestamp()`` is FalkorDB +# epoch-millis, matching every other timestamp in the Organizations graph. +_RECORD_USAGE_CYPHER = """ +MATCH (u:User {email: $email}) +SET u.query_count = coalesce(u.query_count, 0) + 1, + u.success_count = coalesce(u.success_count, 0) + (CASE WHEN $success THEN 1 ELSE 0 END), + u.error_count = coalesce(u.error_count, 0) + (CASE WHEN $success THEN 0 ELSE 1 END), + u.last_active = timestamp(), + u.first_query_at = coalesce(u.first_query_at, timestamp()) +CREATE (u)-[:PERFORMED]->(e:UsageEvent { + graph_id: $graph_id, + is_demo: $is_demo, + success: $success, + timestamp: timestamp() +}) +""" + + +def _decode_email(user_id: str) -> Optional[str]: + """Recover the user's email from the base64 ``user_id``. + + Inverse of ``base64.b64encode(email.encode())`` in + ``api/auth/user_management.py``. Returns ``None`` on malformed input so the + caller can skip tracking instead of raising. + """ + if not user_id: + return None + try: + return base64.b64decode(user_id).decode("utf-8") + except (binascii.Error, ValueError, UnicodeDecodeError): + logging.warning("Usage tracking: could not decode user_id to email") + return None + + +async def _write_usage(email: str, graph_id: str, is_demo: bool, success: bool, db) -> None: + """Perform the single Cypher write against the Organizations graph.""" + organizations_graph = resolve_db(db).select_graph(ORGANIZATIONS_GRAPH) + await organizations_graph.query( + _RECORD_USAGE_CYPHER, + { + "email": email, + "graph_id": graph_id, + "is_demo": is_demo, + "success": success, + }, + ) + # Structured-ish log line so usage is visible to log aggregators even + # before any read API exists. + logging.info( + "usage_event email=%s graph_id=%s is_demo=%s success=%s", + email, graph_id, is_demo, success, + ) + + +def record_query_usage_background( + user_id: str, + namespaced: str, + success: bool, + *, + db=None, + task_sink: Optional[set] = None, +) -> None: + """Schedule fire-and-forget usage tracking for one query. + + Returns immediately. The write runs as a background task whose failure is + logged but never propagated, so tracking can never break or delay a query + response. Called unconditionally at pipeline completion — independent of + ``use_memory`` and the LLM provider. + + Args: + user_id: Base64-encoded email (the namespacing id used by the routes). + namespaced: The fully-namespaced graph name the query ran against; + already demo-aware, so it doubles as the recorded ``graph_id``. + success: Whether SQL execution succeeded (no execution error). + db: Optional FalkorDB handle; resolves to the server singleton when None. + task_sink: Optional set the scheduled task is added to (and auto-removed + from on completion) so SDK ``QueryWeaver.close()`` can await it. + """ + email = _decode_email(user_id) + if email is None: + return + + is_demo = is_general_graph(namespaced) + sink = task_sink if task_sink is not None else background_tasks_var.get() + + task = asyncio.create_task( + _write_usage(email, namespaced, is_demo, success, db) + ) + + if sink is not None: + sink.add(task) + task.add_done_callback(sink.discard) + + def _log_done(t: "asyncio.Task") -> None: + if t.cancelled(): + return + exc = t.exception() + if exc is not None: + logging.error("Usage tracking save failed: %s", exc) # nosemgrep + + task.add_done_callback(_log_done) diff --git a/tests/test_usage_tracking.py b/tests/test_usage_tracking.py new file mode 100644 index 00000000..c422dbe7 --- /dev/null +++ b/tests/test_usage_tracking.py @@ -0,0 +1,150 @@ +"""Tests for always-on per-query usage tracking. + +Usage tracking (``api/core/usage_tracking.py``) records every query onto the +``Organizations`` graph, independent of the optional ``use_memory`` feature and +the LLM provider. These tests assert the write content, the ungated design, +and that failures never propagate to the caller. +""" + +import asyncio +import base64 +import inspect +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from api.core import usage_tracking +from api.core.usage_tracking import ( + _decode_email, + record_query_usage_background, +) + +pytestmark = [pytest.mark.unit] + +EMAIL = "gal.shubeli@falkordb.com" +USER_ID = base64.b64encode(EMAIL.encode()).decode() + + +def _mock_db(): + """A FalkorDB-like mock whose ``select_graph(...).query`` is awaitable.""" + graph = MagicMock() + graph.query = AsyncMock(return_value=MagicMock(result_set=[])) + db = MagicMock() + db.select_graph.return_value = graph + return db, graph + + +async def _drain(sink): + """Await every background task the recorder scheduled into ``sink``.""" + await asyncio.gather(*list(sink), return_exceptions=True) + + +class TestDecodeEmail: + def test_decodes_valid_user_id(self): + assert _decode_email(USER_ID) == EMAIL + + def test_returns_none_for_empty(self): + assert _decode_email("") is None + + def test_returns_none_for_garbage(self): + # Malformed base64 must yield None, not raise. + assert _decode_email("!!!not-base64!!!") is None + + +class TestRecordQueryUsage: + @pytest.mark.asyncio + async def test_records_successful_query_event(self): + db, graph = _mock_db() + sink: set = set() + with patch.object(usage_tracking, "resolve_db", return_value=db), \ + patch.object(usage_tracking, "is_general_graph", return_value=False): + record_query_usage_background( + USER_ID, f"{USER_ID}_mydb", success=True, db=db, task_sink=sink + ) + await _drain(sink) + + db.select_graph.assert_called_once_with("Organizations") + graph.query.assert_awaited_once() + cypher, params = graph.query.await_args.args + assert "MATCH (u:User {email: $email})" in cypher + assert ":UsageEvent" in cypher + assert params == { + "email": EMAIL, + "graph_id": f"{USER_ID}_mydb", + "is_demo": False, + "success": True, + } + + @pytest.mark.asyncio + async def test_records_failed_query_event(self): + db, graph = _mock_db() + sink: set = set() + with patch.object(usage_tracking, "resolve_db", return_value=db), \ + patch.object(usage_tracking, "is_general_graph", return_value=False): + record_query_usage_background( + USER_ID, f"{USER_ID}_mydb", success=False, db=db, task_sink=sink + ) + await _drain(sink) + + _cypher, params = graph.query.await_args.args + assert params["success"] is False + + @pytest.mark.asyncio + async def test_demo_graph_is_flagged(self): + db, graph = _mock_db() + sink: set = set() + with patch.object(usage_tracking, "resolve_db", return_value=db), \ + patch.object(usage_tracking, "is_general_graph", return_value=True): + record_query_usage_background( + USER_ID, "DEMO_CRM", success=True, db=db, task_sink=sink + ) + await _drain(sink) + + _cypher, params = graph.query.await_args.args + assert params["is_demo"] is True + assert params["graph_id"] == "DEMO_CRM" + + @pytest.mark.asyncio + async def test_invalid_user_id_skips_write(self): + db, graph = _mock_db() + sink: set = set() + with patch.object(usage_tracking, "resolve_db", return_value=db): + record_query_usage_background( + "!!!bad!!!", "x_y", success=True, db=db, task_sink=sink + ) + await _drain(sink) + + # No task scheduled, no graph touched. + assert not sink + graph.query.assert_not_awaited() + db.select_graph.assert_not_called() + + @pytest.mark.asyncio + async def test_write_failure_is_swallowed(self): + db, graph = _mock_db() + graph.query.side_effect = RuntimeError("falkordb down") + sink: set = set() + with patch.object(usage_tracking, "resolve_db", return_value=db), \ + patch.object(usage_tracking, "is_general_graph", return_value=False), \ + patch.object(usage_tracking.logging, "error") as mock_log_error: + # The synchronous call must not raise despite the write failing. + record_query_usage_background( + USER_ID, f"{USER_ID}_mydb", success=True, db=db, task_sink=sink + ) + await _drain(sink) + + # Failure was logged by the done-callback, not propagated. + assert any( + "Usage tracking save failed" in str(call.args[0]) + for call in mock_log_error.call_args_list + ) + + +class TestUngatedDesign: + def test_recorder_has_no_memory_or_provider_parameter(self): + """Tracking cannot be gated by ``use_memory`` or the LLM provider: + the recorder simply has no such inputs.""" + params = set(inspect.signature(record_query_usage_background).parameters) + assert params == {"user_id", "namespaced", "success", "db", "task_sink"} + assert "use_memory" not in params + assert "provider" not in params From c12dbcb6b149413a327153876f5e9e9af1ba0584 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 24 Jun 2026 15:47:58 +0300 Subject: [PATCH 2/5] =?UTF-8?q?fix(usage):=20address=20review=20=E2=80=94?= =?UTF-8?q?=20track=20all=20paths,=20drop=20PII,=20validate=20user=5Fid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - text2sql: record usage on every terminal path of run_query/run_confirmed (off-topic, not-translatable, demo-blocked, cancelled, no-loader), via a local _track_usage helper. The destructive-confirmation prompt is the deliberate exception — run_confirmed records that query's outcome, so tracking it twice would double-count. - usage_tracking: omit email (PII) from the per-query log line and strip CR/LF from the user-influenced graph_id (CodeQL log-injection). - usage_tracking: base64 decode with validate=True + email-shape check so a malformed user_id is skipped instead of triggering a phantom write. Co-Authored-By: Claude Opus 4.8 (1M context) --- api/core/text2sql.py | 35 +++++++++++++++++++++++++++-------- api/core/usage_tracking.py | 15 +++++++++++---- tests/test_usage_tracking.py | 5 +++++ 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/api/core/text2sql.py b/api/core/text2sql.py index cce622bf..f461da20 100644 --- a/api/core/text2sql.py +++ b/api/core/text2sql.py @@ -337,6 +337,14 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma use_memory = getattr(chat_data, "use_memory", False) validate_custom_model(custom_model) + def _track_usage(success: bool) -> None: + """Record one usage event for this query attempt (fire-and-forget). + + Called on every terminal path so attempts that fail or bail early are + counted too — see the module docstring's "every query" goal. + """ + record_query_usage_background(user_id, namespaced, success=success, db=db) + logging.info("User Query: %s", sanitize_query(queries_history[-1])) # Memory tool created concurrently with relevancy/find work — small perf @@ -368,6 +376,7 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma execution_time=time.perf_counter() - overall_start, error_message="Unable to determine database type", )) + _track_usage(False) return # Concurrent: relevancy check + table-finding @@ -395,6 +404,7 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma is_valid=False, execution_time=time.perf_counter() - overall_start, )) + _track_usage(False) return tables = await find_task @@ -450,6 +460,7 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma ambiguities=answer_an.get("ambiguities", ""), explanation=answer_an.get("explanation", ""), )) + _track_usage(False) return # Auto-quote identifiers using the table set we already loaded. @@ -481,6 +492,7 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma execution_time=time.perf_counter() - overall_start, error_message="Destructive operation not allowed on demo graphs", )) + _track_usage(False) return if is_destructive: @@ -498,6 +510,9 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma is_valid=True, is_destructive=True, requires_confirmation=True, execution_time=time.perf_counter() - overall_start, )) + # No usage event here: the query has no outcome yet. run_confirmed + # records it once the user confirms or cancels — recording now would + # double-count the confirmed case. return yield { @@ -609,11 +624,9 @@ def _run_sql(sql: str): if not user_readable_response: user_readable_response = f"Error executing SQL query: {execution_error_msg}" - # Always-on usage tracking — independent of ``use_memory``/provider, so it - # records every query (unlike the optional memory write below). - record_query_usage_background( - user_id, namespaced, success=execution_error_msg is None, db=db - ) + # Always-on usage tracking — independent of ``use_memory``/provider (unlike + # the optional memory write below). + _track_usage(execution_error_msg is None) if memory_tool is not None: full_response = { @@ -664,9 +677,14 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to overall_start = time.perf_counter() namespaced = graph_name(user_id, graph_id) + def _track_usage(success: bool) -> None: + """Record one usage event for this confirmation attempt (fire-and-forget).""" + record_query_usage_background(user_id, namespaced, success=success, db=db) + if is_general_graph(namespaced): # Match streaming refusal: even an explicit CONFIRM cannot run writes # on a demo graph. + _track_usage(False) raise InvalidArgumentError( "Destructive operations are not allowed on demo graphs" ) @@ -679,6 +697,7 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to validate_custom_model(custom_model) if not sql_query: + _track_usage(False) raise InvalidArgumentError("No SQL query provided") question = ( @@ -698,6 +717,7 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to is_valid=True, is_destructive=True, execution_time=time.perf_counter() - overall_start, )) + _track_usage(False) return use_memory = bool(getattr(confirm_data, "use_memory", False)) @@ -724,6 +744,7 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to execution_time=time.perf_counter() - overall_start, error_message="Unable to determine database type", )) + _track_usage(False) return yield {"type": "reasoning_step", @@ -772,9 +793,7 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to user_readable_response = execution_error_msg # Always-on usage tracking — see note in ``run_query``. - record_query_usage_background( - user_id, namespaced, success=execution_error_msg is None, db=db - ) + _track_usage(execution_error_msg is None) if memory_tool is not None: save_memory_background( diff --git a/api/core/usage_tracking.py b/api/core/usage_tracking.py index 95b00aa7..d0ba7a5a 100644 --- a/api/core/usage_tracking.py +++ b/api/core/usage_tracking.py @@ -63,10 +63,16 @@ def _decode_email(user_id: str) -> Optional[str]: if not user_id: return None try: - return base64.b64decode(user_id).decode("utf-8") + email = base64.b64decode(user_id, validate=True).decode("utf-8") except (binascii.Error, ValueError, UnicodeDecodeError): logging.warning("Usage tracking: could not decode user_id to email") return None + # b64decode is lenient about padding/length; require an email-shaped result + # so a malformed id can't trigger a phantom DB write (matches the docstring). + if "@" not in email: + logging.warning("Usage tracking: decoded user_id is not a valid email") + return None + return email async def _write_usage(email: str, graph_id: str, is_demo: bool, success: bool, db) -> None: @@ -82,10 +88,11 @@ async def _write_usage(email: str, graph_id: str, is_demo: bool, success: bool, }, ) # Structured-ish log line so usage is visible to log aggregators even - # before any read API exists. + # before any read API exists. Email (PII) is intentionally omitted; the + # user-influenced graph_id has CR/LF stripped to prevent log forging. logging.info( - "usage_event email=%s graph_id=%s is_demo=%s success=%s", - email, graph_id, is_demo, success, + "usage_event graph_id=%s is_demo=%s success=%s", + graph_id.replace("\r", " ").replace("\n", " "), is_demo, success, ) diff --git a/tests/test_usage_tracking.py b/tests/test_usage_tracking.py index c422dbe7..a5a9428a 100644 --- a/tests/test_usage_tracking.py +++ b/tests/test_usage_tracking.py @@ -50,6 +50,11 @@ def test_returns_none_for_garbage(self): # Malformed base64 must yield None, not raise. assert _decode_email("!!!not-base64!!!") is None + def test_returns_none_for_valid_base64_that_is_not_an_email(self): + # Decodes cleanly but isn't email-shaped -> skip (no phantom write). + not_email = base64.b64encode(b"notanemail").decode() + assert _decode_email(not_email) is None + class TestRecordQueryUsage: @pytest.mark.asyncio From eed65d10700f4a62705daf18352a4df750bb8756 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 24 Jun 2026 16:20:54 +0300 Subject: [PATCH 3/5] refactor(usage): config-driven graph name + keep tracking out of the SDK MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two design fixes raised in review: 1. Single source of truth for the central graph name. Add config.ORGANIZATIONS_GRAPH (env var ORGANIZATIONS_GRAPH, default "Organizations") and use it everywhere the graph was hardcoded — auth/user_management, routes/auth, routes/tokens, and usage tracking. 2. Don't ship hosted-app telemetry in the PyPI SDK. Move usage_tracking from api/core (shipped) to api/routes (excluded from the wheel) and invoke it from the route layer (_serialize_pipeline) instead of inside run_query/run_confirmed in api/core/text2sql.py. The route records exactly once from the final QueryResult and skips the destructive- confirmation prompt (the /confirm call records that query). text2sql.py is back to its pristine, tracking-free state. Verified: SDK wheel no longer contains usage_tracking and text2sql has no tracking refs; unit suite 138 passed, pylint 10/10. Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 4 ++++ api/auth/user_management.py | 9 ++++---- api/config.py | 5 +++++ api/core/text2sql.py | 31 -------------------------- api/routes/auth.py | 7 +++--- api/routes/graphs.py | 25 ++++++++++++++++----- api/routes/tokens.py | 5 +++-- api/{core => routes}/usage_tracking.py | 4 +--- tests/test_usage_tracking.py | 6 ++--- 9 files changed, 45 insertions(+), 51 deletions(-) rename api/{core => routes}/usage_tracking.py (98%) diff --git a/.env.example b/.env.example index c37f9772..eeecd753 100644 --- a/.env.example +++ b/.env.example @@ -37,6 +37,10 @@ FASTAPI_SECRET_KEY=your_super_secret_key_here # Example: redis://localhost:6379/0 FALKORDB_URL=redis://localhost:6379/0 # REQUIRED - change to your FalkorDB URL +# Optional: name of the central user-management graph (User/Identity/Token/UsageEvent). +# Defaults to "Organizations"; override to share or isolate that graph. +# ORGANIZATIONS_GRAPH=Organizations + # Optional: separate host/port settings for local testing (only used if FALKORDB_URL is not set) # FALKORDB_HOST=localhost # FALKORDB_PORT=6379 diff --git a/api/auth/user_management.py b/api/auth/user_management.py index 3b5cd00f..7272b071 100644 --- a/api/auth/user_management.py +++ b/api/auth/user_management.py @@ -9,6 +9,7 @@ from fastapi import Request, HTTPException, status from pydantic import BaseModel +from api.config import ORGANIZATIONS_GRAPH from api.extensions import db # Get secret key for sessions @@ -45,7 +46,7 @@ async def _get_user_info(api_token: str) -> Optional[Dict[str, Any]]: try: # Select the Organizations graph - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) result = await organizations_graph.query( query, @@ -83,7 +84,7 @@ async def delete_user_token(api_token: str): """ try: # Select the Organizations graph - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) await organizations_graph.query( query, @@ -117,7 +118,7 @@ async def ensure_user_in_organizations( # pylint: disable=too-many-arguments, d return validation_result try: - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) first_name, last_name = _extract_name_parts(name) merge_query = _build_user_merge_query() @@ -166,7 +167,7 @@ async def update_identity_last_login(provider, provider_user_id): return try: - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) update_query = """ MATCH (identity:Identity {provider: $provider, provider_user_id: $provider_user_id}) SET identity.last_login = timestamp() diff --git a/api/config.py b/api/config.py index b029970c..c0c695cf 100644 --- a/api/config.py +++ b/api/config.py @@ -14,6 +14,11 @@ # Ensure .env is loaded before Config reads os.getenv() at class definition time load_dotenv() +# Central user-management graph holding User/Identity/Token (and UsageEvent) +# nodes. Single source of truth for the name used across auth, tokens, and +# usage tracking — override with the ORGANIZATIONS_GRAPH env var. +ORGANIZATIONS_GRAPH = os.getenv("ORGANIZATIONS_GRAPH", "Organizations") + # Configure litellm logging to prevent sensitive data leakage def configure_litellm_logging(): """Configure litellm to suppress completion logs.""" diff --git a/api/core/text2sql.py b/api/core/text2sql.py index f461da20..3c914aff 100644 --- a/api/core/text2sql.py +++ b/api/core/text2sql.py @@ -31,7 +31,6 @@ from api.agents import AnalysisAgent, RelevancyAgent, FollowUpAgent from api.agents.healer_agent import HealerAgent from api.core.db_resolver import resolve_db -from api.core.usage_tracking import record_query_usage_background from api.core.result_models import QueryAnalysis, QueryMetadata, QueryResult, RefreshResult from api.graph import find, get_db_description, get_user_rules @@ -337,14 +336,6 @@ async def run_query( # pylint: disable=too-many-locals,too-many-branches,too-ma use_memory = getattr(chat_data, "use_memory", False) validate_custom_model(custom_model) - def _track_usage(success: bool) -> None: - """Record one usage event for this query attempt (fire-and-forget). - - Called on every terminal path so attempts that fail or bail early are - counted too — see the module docstring's "every query" goal. - """ - record_query_usage_background(user_id, namespaced, success=success, db=db) - logging.info("User Query: %s", sanitize_query(queries_history[-1])) # Memory tool created concurrently with relevancy/find work — small perf @@ -376,7 +367,6 @@ def _track_usage(success: bool) -> None: execution_time=time.perf_counter() - overall_start, error_message="Unable to determine database type", )) - _track_usage(False) return # Concurrent: relevancy check + table-finding @@ -404,7 +394,6 @@ def _track_usage(success: bool) -> None: is_valid=False, execution_time=time.perf_counter() - overall_start, )) - _track_usage(False) return tables = await find_task @@ -460,7 +449,6 @@ def _track_usage(success: bool) -> None: ambiguities=answer_an.get("ambiguities", ""), explanation=answer_an.get("explanation", ""), )) - _track_usage(False) return # Auto-quote identifiers using the table set we already loaded. @@ -492,7 +480,6 @@ def _track_usage(success: bool) -> None: execution_time=time.perf_counter() - overall_start, error_message="Destructive operation not allowed on demo graphs", )) - _track_usage(False) return if is_destructive: @@ -510,9 +497,6 @@ def _track_usage(success: bool) -> None: is_valid=True, is_destructive=True, requires_confirmation=True, execution_time=time.perf_counter() - overall_start, )) - # No usage event here: the query has no outcome yet. run_confirmed - # records it once the user confirms or cancels — recording now would - # double-count the confirmed case. return yield { @@ -624,10 +608,6 @@ def _run_sql(sql: str): if not user_readable_response: user_readable_response = f"Error executing SQL query: {execution_error_msg}" - # Always-on usage tracking — independent of ``use_memory``/provider (unlike - # the optional memory write below). - _track_usage(execution_error_msg is None) - if memory_tool is not None: full_response = { "question": queries_history[-1], @@ -677,14 +657,9 @@ async def run_confirmed( # pylint: disable=too-many-locals,too-many-branches,to overall_start = time.perf_counter() namespaced = graph_name(user_id, graph_id) - def _track_usage(success: bool) -> None: - """Record one usage event for this confirmation attempt (fire-and-forget).""" - record_query_usage_background(user_id, namespaced, success=success, db=db) - if is_general_graph(namespaced): # Match streaming refusal: even an explicit CONFIRM cannot run writes # on a demo graph. - _track_usage(False) raise InvalidArgumentError( "Destructive operations are not allowed on demo graphs" ) @@ -697,7 +672,6 @@ def _track_usage(success: bool) -> None: validate_custom_model(custom_model) if not sql_query: - _track_usage(False) raise InvalidArgumentError("No SQL query provided") question = ( @@ -717,7 +691,6 @@ def _track_usage(success: bool) -> None: is_valid=True, is_destructive=True, execution_time=time.perf_counter() - overall_start, )) - _track_usage(False) return use_memory = bool(getattr(confirm_data, "use_memory", False)) @@ -744,7 +717,6 @@ def _track_usage(success: bool) -> None: execution_time=time.perf_counter() - overall_start, error_message="Unable to determine database type", )) - _track_usage(False) return yield {"type": "reasoning_step", @@ -792,9 +764,6 @@ def _track_usage(success: bool) -> None: if not user_readable_response: user_readable_response = execution_error_msg - # Always-on usage tracking — see note in ``run_query``. - _track_usage(execution_error_msg is None) - if memory_tool is not None: save_memory_background( memory_tool=memory_tool, diff --git a/api/routes/auth.py b/api/routes/auth.py index 680a0c59..e55e61b3 100644 --- a/api/routes/auth.py +++ b/api/routes/auth.py @@ -21,6 +21,7 @@ from pydantic import BaseModel from api.auth.user_management import delete_user_token, ensure_user_in_organizations, validate_user +from api.config import ORGANIZATIONS_GRAPH from api.extensions import db # Import GENERAL_PREFIX from graphs route @@ -136,7 +137,7 @@ def _validate_email(email: str) -> bool: async def _set_mail_hash(email: str, password_hash: str) -> bool: """Set email hash for the user in the database.""" try: - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) # Sanitize inputs for logging safe_email = _sanitize_for_log(email) @@ -178,7 +179,7 @@ async def _email_account_exists(email: str) -> bool: Exceptions are intentionally not swallowed so callers fail closed (treat the account as existing / abort the signup) rather than issuing a session token. """ - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) # Use a UNION of two label-scoped lookups so each side hits the (label, email) # index and short-circuits with LIMIT 1. This avoids both a full-graph scan and # the Cartesian product that two chained OPTIONAL MATCH clauses would produce. @@ -204,7 +205,7 @@ def _is_request_secure(request: Request) -> bool: async def _authenticate_email_user(email: str, password: str): """Authenticate an email user.""" try: - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) # Find user by email query = """ diff --git a/api/routes/graphs.py b/api/routes/graphs.py index 95b6c08c..7701d1b4 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -29,22 +29,35 @@ from api.graph import get_user_rules, set_user_rules from api.auth.user_management import token_required from api.routes.tokens import UNAUTHORIZED_RESPONSE +from api.routes.usage_tracking import record_query_usage_background graphs_router = APIRouter(tags=["Graphs & Databases"]) -async def _serialize_pipeline(gen): +async def _serialize_pipeline(gen, *, user_id, namespaced): """Serialize pipeline events to the wire format and stop on ``_Final``. Pure encoding loop — no exception handling here. Each route handler wraps iteration in its own ``try/except`` so the broad-except (which emits a generic error event without leaking stack data) lives in the route function CodeQL already accepts, not in a shared helper. + + Always-on usage tracking lives here in the route layer (not in + ``api/core``) so it ships with the hosted app, never the PyPI SDK. Exactly + one event is recorded per query, derived from the final ``QueryResult`` — + skipping the destructive-confirmation prompt, which has no outcome yet (the + ``/confirm`` call records that query). """ + final = None async for event in gen: if isinstance(event, _Final): - return + final = event.value + break yield json.dumps(event) + MESSAGE_DELIMITER + if final is not None and not final.requires_confirmation: + record_query_usage_background( + user_id, namespaced, success=final.error_message is None + ) class GraphData(BaseModel): @@ -170,7 +183,7 @@ async def query_graph( # the StreamingResponse is iterated. Surfacing client errors as HTTP 400 # requires a synchronous check before we hand the stream to the response. try: - graph_name(request.state.user_id, graph_id) + namespaced = graph_name(request.state.user_id, graph_id) validate_and_truncate_chat(chat_data) validate_custom_model(getattr(chat_data, "custom_model", None)) except InvalidArgumentError as iae: @@ -180,7 +193,8 @@ async def query_graph( async def stream(): try: async for chunk in _serialize_pipeline( - run_query(request.state.user_id, graph_id, chat_data) + run_query(request.state.user_id, graph_id, chat_data), + user_id=request.state.user_id, namespaced=namespaced, ): yield chunk except Exception: # pylint: disable=broad-exception-caught @@ -225,7 +239,8 @@ async def confirm_destructive_operation( async def stream(): try: async for chunk in _serialize_pipeline( - run_confirmed(request.state.user_id, graph_id, confirm_data) + run_confirmed(request.state.user_id, graph_id, confirm_data), + user_id=request.state.user_id, namespaced=namespaced, ): yield chunk except Exception: # pylint: disable=broad-exception-caught diff --git a/api/routes/tokens.py b/api/routes/tokens.py index 03fa0f5a..7df4a492 100644 --- a/api/routes/tokens.py +++ b/api/routes/tokens.py @@ -9,6 +9,7 @@ from pydantic import BaseModel from api.auth.user_management import token_required +from api.config import ORGANIZATIONS_GRAPH from api.extensions import db UNAUTHORIZED_RESPONSE = {"description": "Unauthorized - Please log in or provide a valid API token"} @@ -80,7 +81,7 @@ async def list_tokens(request: Request) -> TokenListResponse: user_email = request.state.user_email # Get tokens from Organizations graph - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) # Get user information by API token and then get all associated tokens that connected # to the Identity of provider='api' @@ -120,7 +121,7 @@ async def delete_token(request: Request, token_id: str) -> JSONResponse: user_email = request.state.user_email # Delete token from Organizations graph - organizations_graph = db.select_graph("Organizations") + organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH) # Delete the token delete_query = """ diff --git a/api/core/usage_tracking.py b/api/routes/usage_tracking.py similarity index 98% rename from api/core/usage_tracking.py rename to api/routes/usage_tracking.py index d0ba7a5a..bf160794 100644 --- a/api/core/usage_tracking.py +++ b/api/routes/usage_tracking.py @@ -27,12 +27,10 @@ import logging from typing import Optional +from api.config import ORGANIZATIONS_GRAPH from api.core.db_resolver import resolve_db from api.core.pipeline import background_tasks_var, is_general_graph -# Central user-management graph (also holds User/Identity/Token). -ORGANIZATIONS_GRAPH = "Organizations" - # Single round-trip: bump the User counters/timestamps and append a UsageEvent. # Uses MATCH (not MERGE) on User so an unknown email is a silent no-op rather # than creating a phantom user from the query path. ``timestamp()`` is FalkorDB diff --git a/tests/test_usage_tracking.py b/tests/test_usage_tracking.py index a5a9428a..f90a5fb9 100644 --- a/tests/test_usage_tracking.py +++ b/tests/test_usage_tracking.py @@ -1,6 +1,6 @@ """Tests for always-on per-query usage tracking. -Usage tracking (``api/core/usage_tracking.py``) records every query onto the +Usage tracking (``api/routes/usage_tracking.py``) records every query onto the ``Organizations`` graph, independent of the optional ``use_memory`` feature and the LLM provider. These tests assert the write content, the ungated design, and that failures never propagate to the caller. @@ -13,8 +13,8 @@ import pytest -from api.core import usage_tracking -from api.core.usage_tracking import ( +from api.routes import usage_tracking +from api.routes.usage_tracking import ( _decode_email, record_query_usage_background, ) From e47e5807d0bfdf29113d00157bd70beaf2139863 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 24 Jun 2026 16:35:39 +0300 Subject: [PATCH 4/5] =?UTF-8?q?fix(usage):=20address=20review=20=E2=80=94?= =?UTF-8?q?=20log=20hash=20not=20PII,=20empty-env=20fallback,=20track=20cr?= =?UTF-8?q?ashes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - usage_tracking: the per-query log line logged the namespaced graph_id ({base64(email)}_{db}); base64 email is reversible, so log a short SHA-256 hash instead — keeps identity out of logs and neutralizes log-injection. - config: ORGANIZATIONS_GRAPH falls back to "Organizations" when the env var is empty (not just unset), so a blank value can't target an empty graph. - routes/graphs: record success=False when run_query/run_confirmed raise before the _Final sentinel (e.g. get_db_description), so pipeline crashes are still counted instead of silently bypassing tracking. Co-Authored-By: Claude Opus 4.8 (1M context) --- api/config.py | 2 +- api/routes/graphs.py | 9 +++++++++ api/routes/usage_tracking.py | 12 ++++++++---- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/api/config.py b/api/config.py index c0c695cf..dce8a34c 100644 --- a/api/config.py +++ b/api/config.py @@ -17,7 +17,7 @@ # Central user-management graph holding User/Identity/Token (and UsageEvent) # nodes. Single source of truth for the name used across auth, tokens, and # usage tracking — override with the ORGANIZATIONS_GRAPH env var. -ORGANIZATIONS_GRAPH = os.getenv("ORGANIZATIONS_GRAPH", "Organizations") +ORGANIZATIONS_GRAPH = os.getenv("ORGANIZATIONS_GRAPH") or "Organizations" # Configure litellm logging to prevent sensitive data leakage def configure_litellm_logging(): diff --git a/api/routes/graphs.py b/api/routes/graphs.py index 7701d1b4..24c3b931 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -201,6 +201,11 @@ async def stream(): # Don't leak stack traces (CodeQL: information exposure through # exception). Log internally; emit a generic error event. logging.exception("Streaming query failed") + # Pipeline crashed before _Final, so _serialize_pipeline didn't + # record — count this attempt as a failure here. + record_query_usage_background( + request.state.user_id, namespaced, success=False + ) yield json.dumps({ "type": "error", "final_response": True, @@ -246,6 +251,10 @@ async def stream(): except Exception: # pylint: disable=broad-exception-caught # See note on the query endpoint above (CodeQL). logging.exception("Streaming confirmed-destructive query failed") + # Pipeline crashed before _Final — record the failed attempt here. + record_query_usage_background( + request.state.user_id, namespaced, success=False + ) yield json.dumps({ "type": "error", "final_response": True, diff --git a/api/routes/usage_tracking.py b/api/routes/usage_tracking.py index bf160794..1e2fd0d5 100644 --- a/api/routes/usage_tracking.py +++ b/api/routes/usage_tracking.py @@ -24,6 +24,7 @@ import asyncio import base64 import binascii +import hashlib import logging from typing import Optional @@ -86,11 +87,14 @@ async def _write_usage(email: str, graph_id: str, is_demo: bool, success: bool, }, ) # Structured-ish log line so usage is visible to log aggregators even - # before any read API exists. Email (PII) is intentionally omitted; the - # user-influenced graph_id has CR/LF stripped to prevent log forging. + # before any read API exists. graph_id is the namespaced name + # ({base64(email)}_{db}) and base64 email is reversible, so log a short + # stable hash instead of the raw value — this keeps user identity out of + # logs and also neutralizes the CodeQL log-injection vector. + graph_ref = hashlib.sha256(graph_id.encode()).hexdigest()[:12] logging.info( - "usage_event graph_id=%s is_demo=%s success=%s", - graph_id.replace("\r", " ").replace("\n", " "), is_demo, success, + "usage_event graph=%s is_demo=%s success=%s", + graph_ref, is_demo, success, ) From b4bd4df6f12ccd84af431b167149360ad373bf47 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 24 Jun 2026 16:57:22 +0300 Subject: [PATCH 5/5] fix(usage): correct success semantics, env-agnostic test, doc wording MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address the latest review pass: - routes/graphs: success = is_valid AND no error. error_message-None alone counted off-topic / not-SQL-translatable results (is_valid=False, no error) as successes, inflating success_count. - tests: assert against usage_tracking.ORGANIZATIONS_GRAPH instead of the hardcoded "Organizations" so the suite passes when the env var is set. - usage_tracking: reword the task_sink docstring — it no longer ships in the SDK, so drop the QueryWeaver.close() reference for a generic description. Co-Authored-By: Claude Opus 4.8 (1M context) --- api/routes/graphs.py | 6 +++++- api/routes/usage_tracking.py | 3 ++- tests/test_usage_tracking.py | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/api/routes/graphs.py b/api/routes/graphs.py index 24c3b931..7c1d1c54 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -55,8 +55,12 @@ async def _serialize_pipeline(gen, *, user_id, namespaced): break yield json.dumps(event) + MESSAGE_DELIMITER if final is not None and not final.requires_confirmation: + # "Success" = a valid query that ran without error. error_message is + # None alone isn't enough: off-topic / not-SQL-translatable results + # carry is_valid=False with no error, and must not inflate success_count. record_query_usage_background( - user_id, namespaced, success=final.error_message is None + user_id, namespaced, + success=final.is_valid and final.error_message is None, ) diff --git a/api/routes/usage_tracking.py b/api/routes/usage_tracking.py index 1e2fd0d5..83ded999 100644 --- a/api/routes/usage_tracking.py +++ b/api/routes/usage_tracking.py @@ -120,7 +120,8 @@ def record_query_usage_background( success: Whether SQL execution succeeded (no execution error). db: Optional FalkorDB handle; resolves to the server singleton when None. task_sink: Optional set the scheduled task is added to (and auto-removed - from on completion) so SDK ``QueryWeaver.close()`` can await it. + from on completion) so callers can await any in-flight tracking + writes before shutdown. """ email = _decode_email(user_id) if email is None: diff --git a/tests/test_usage_tracking.py b/tests/test_usage_tracking.py index f90a5fb9..2baf0435 100644 --- a/tests/test_usage_tracking.py +++ b/tests/test_usage_tracking.py @@ -68,7 +68,7 @@ async def test_records_successful_query_event(self): ) await _drain(sink) - db.select_graph.assert_called_once_with("Organizations") + db.select_graph.assert_called_once_with(usage_tracking.ORGANIZATIONS_GRAPH) graph.query.assert_awaited_once() cypher, params = graph.query.await_args.args assert "MATCH (u:User {email: $email})" in cypher