Skip to content

Commit 37c2013

Browse files
authored
Merge pull request #614 from FalkorDB/feat/usage-tracking
feat(usage): always-on per-query usage tracking
2 parents 65c2c95 + b4bd4df commit 37c2013

8 files changed

Lines changed: 357 additions & 14 deletions

File tree

.env.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ FASTAPI_SECRET_KEY=your_super_secret_key_here
3737
# Example: redis://localhost:6379/0
3838
FALKORDB_URL=redis://localhost:6379/0 # REQUIRED - change to your FalkorDB URL
3939

40+
# Optional: name of the central user-management graph (User/Identity/Token/UsageEvent).
41+
# Defaults to "Organizations"; override to share or isolate that graph.
42+
# ORGANIZATIONS_GRAPH=Organizations
43+
4044
# Optional: separate host/port settings for local testing (only used if FALKORDB_URL is not set)
4145
# FALKORDB_HOST=localhost
4246
# FALKORDB_PORT=6379

api/auth/user_management.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from fastapi import Request, HTTPException, status
1111
from pydantic import BaseModel
12+
from api.config import ORGANIZATIONS_GRAPH
1213
from api.extensions import db
1314

1415
# Get secret key for sessions
@@ -45,7 +46,7 @@ async def _get_user_info(api_token: str) -> Optional[Dict[str, Any]]:
4546

4647
try:
4748
# Select the Organizations graph
48-
organizations_graph = db.select_graph("Organizations")
49+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
4950

5051
result = await organizations_graph.query(
5152
query,
@@ -83,7 +84,7 @@ async def delete_user_token(api_token: str):
8384
"""
8485
try:
8586
# Select the Organizations graph
86-
organizations_graph = db.select_graph("Organizations")
87+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
8788

8889
await organizations_graph.query(
8990
query,
@@ -117,7 +118,7 @@ async def ensure_user_in_organizations( # pylint: disable=too-many-arguments, d
117118
return validation_result
118119

119120
try:
120-
organizations_graph = db.select_graph("Organizations")
121+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
121122
first_name, last_name = _extract_name_parts(name)
122123

123124
merge_query = _build_user_merge_query()
@@ -166,7 +167,7 @@ async def update_identity_last_login(provider, provider_user_id):
166167
return
167168

168169
try:
169-
organizations_graph = db.select_graph("Organizations")
170+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
170171
update_query = """
171172
MATCH (identity:Identity {provider: $provider, provider_user_id: $provider_user_id})
172173
SET identity.last_login = timestamp()

api/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
# Ensure .env is loaded before Config reads os.getenv() at class definition time
1515
load_dotenv()
1616

17+
# Central user-management graph holding User/Identity/Token (and UsageEvent)
18+
# nodes. Single source of truth for the name used across auth, tokens, and
19+
# usage tracking — override with the ORGANIZATIONS_GRAPH env var.
20+
ORGANIZATIONS_GRAPH = os.getenv("ORGANIZATIONS_GRAPH") or "Organizations"
21+
1722
# Configure litellm logging to prevent sensitive data leakage
1823
def configure_litellm_logging():
1924
"""Configure litellm to suppress completion logs."""

api/routes/auth.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from pydantic import BaseModel
2222

2323
from api.auth.user_management import delete_user_token, ensure_user_in_organizations, validate_user
24+
from api.config import ORGANIZATIONS_GRAPH
2425
from api.extensions import db
2526

2627
# Import GENERAL_PREFIX from graphs route
@@ -136,7 +137,7 @@ def _validate_email(email: str) -> bool:
136137
async def _set_mail_hash(email: str, password_hash: str) -> bool:
137138
"""Set email hash for the user in the database."""
138139
try:
139-
organizations_graph = db.select_graph("Organizations")
140+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
140141

141142
# Sanitize inputs for logging
142143
safe_email = _sanitize_for_log(email)
@@ -178,7 +179,7 @@ async def _email_account_exists(email: str) -> bool:
178179
Exceptions are intentionally not swallowed so callers fail closed (treat the
179180
account as existing / abort the signup) rather than issuing a session token.
180181
"""
181-
organizations_graph = db.select_graph("Organizations")
182+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
182183
# Use a UNION of two label-scoped lookups so each side hits the (label, email)
183184
# index and short-circuits with LIMIT 1. This avoids both a full-graph scan and
184185
# the Cartesian product that two chained OPTIONAL MATCH clauses would produce.
@@ -204,7 +205,7 @@ def _is_request_secure(request: Request) -> bool:
204205
async def _authenticate_email_user(email: str, password: str):
205206
"""Authenticate an email user."""
206207
try:
207-
organizations_graph = db.select_graph("Organizations")
208+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
208209

209210
# Find user by email
210211
query = """

api/routes/graphs.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,39 @@
2929
from api.graph import get_user_rules, set_user_rules
3030
from api.auth.user_management import token_required
3131
from api.routes.tokens import UNAUTHORIZED_RESPONSE
32+
from api.routes.usage_tracking import record_query_usage_background
3233

3334
graphs_router = APIRouter(tags=["Graphs & Databases"])
3435

3536

36-
async def _serialize_pipeline(gen):
37+
async def _serialize_pipeline(gen, *, user_id, namespaced):
3738
"""Serialize pipeline events to the wire format and stop on ``_Final``.
3839
3940
Pure encoding loop — no exception handling here. Each route handler
4041
wraps iteration in its own ``try/except`` so the broad-except (which
4142
emits a generic error event without leaking stack data) lives in the
4243
route function CodeQL already accepts, not in a shared helper.
44+
45+
Always-on usage tracking lives here in the route layer (not in
46+
``api/core``) so it ships with the hosted app, never the PyPI SDK. Exactly
47+
one event is recorded per query, derived from the final ``QueryResult`` —
48+
skipping the destructive-confirmation prompt, which has no outcome yet (the
49+
``/confirm`` call records that query).
4350
"""
51+
final = None
4452
async for event in gen:
4553
if isinstance(event, _Final):
46-
return
54+
final = event.value
55+
break
4756
yield json.dumps(event) + MESSAGE_DELIMITER
57+
if final is not None and not final.requires_confirmation:
58+
# "Success" = a valid query that ran without error. error_message is
59+
# None alone isn't enough: off-topic / not-SQL-translatable results
60+
# carry is_valid=False with no error, and must not inflate success_count.
61+
record_query_usage_background(
62+
user_id, namespaced,
63+
success=final.is_valid and final.error_message is None,
64+
)
4865

4966

5067
class GraphData(BaseModel):
@@ -170,7 +187,7 @@ async def query_graph(
170187
# the StreamingResponse is iterated. Surfacing client errors as HTTP 400
171188
# requires a synchronous check before we hand the stream to the response.
172189
try:
173-
graph_name(request.state.user_id, graph_id)
190+
namespaced = graph_name(request.state.user_id, graph_id)
174191
validate_and_truncate_chat(chat_data)
175192
validate_custom_model(getattr(chat_data, "custom_model", None))
176193
except InvalidArgumentError as iae:
@@ -180,13 +197,19 @@ async def query_graph(
180197
async def stream():
181198
try:
182199
async for chunk in _serialize_pipeline(
183-
run_query(request.state.user_id, graph_id, chat_data)
200+
run_query(request.state.user_id, graph_id, chat_data),
201+
user_id=request.state.user_id, namespaced=namespaced,
184202
):
185203
yield chunk
186204
except Exception: # pylint: disable=broad-exception-caught
187205
# Don't leak stack traces (CodeQL: information exposure through
188206
# exception). Log internally; emit a generic error event.
189207
logging.exception("Streaming query failed")
208+
# Pipeline crashed before _Final, so _serialize_pipeline didn't
209+
# record — count this attempt as a failure here.
210+
record_query_usage_background(
211+
request.state.user_id, namespaced, success=False
212+
)
190213
yield json.dumps({
191214
"type": "error",
192215
"final_response": True,
@@ -225,12 +248,17 @@ async def confirm_destructive_operation(
225248
async def stream():
226249
try:
227250
async for chunk in _serialize_pipeline(
228-
run_confirmed(request.state.user_id, graph_id, confirm_data)
251+
run_confirmed(request.state.user_id, graph_id, confirm_data),
252+
user_id=request.state.user_id, namespaced=namespaced,
229253
):
230254
yield chunk
231255
except Exception: # pylint: disable=broad-exception-caught
232256
# See note on the query endpoint above (CodeQL).
233257
logging.exception("Streaming confirmed-destructive query failed")
258+
# Pipeline crashed before _Final — record the failed attempt here.
259+
record_query_usage_background(
260+
request.state.user_id, namespaced, success=False
261+
)
234262
yield json.dumps({
235263
"type": "error",
236264
"final_response": True,

api/routes/tokens.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pydantic import BaseModel
1010

1111
from api.auth.user_management import token_required
12+
from api.config import ORGANIZATIONS_GRAPH
1213
from api.extensions import db
1314

1415
UNAUTHORIZED_RESPONSE = {"description": "Unauthorized - Please log in or provide a valid API token"}
@@ -80,7 +81,7 @@ async def list_tokens(request: Request) -> TokenListResponse:
8081
user_email = request.state.user_email
8182

8283
# Get tokens from Organizations graph
83-
organizations_graph = db.select_graph("Organizations")
84+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
8485

8586
# Get user information by API token and then get all associated tokens that connected
8687
# to the Identity of provider='api'
@@ -120,7 +121,7 @@ async def delete_token(request: Request, token_id: str) -> JSONResponse:
120121
user_email = request.state.user_email
121122

122123
# Delete token from Organizations graph
123-
organizations_graph = db.select_graph("Organizations")
124+
organizations_graph = db.select_graph(ORGANIZATIONS_GRAPH)
124125

125126
# Delete the token
126127
delete_query = """

api/routes/usage_tracking.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
"""Always-on, provider-agnostic per-query usage tracking.
2+
3+
This is deliberately independent of the optional LLM conversational-memory
4+
feature (``api/memory/graphiti_tool.py``). Memory writes are opt-in
5+
(``use_memory``), gated to OpenAI/Azure providers, and lazily created — so
6+
they cannot be used to measure adoption. This module records *every* query,
7+
regardless of provider or the ``use_memory`` flag, onto the central
8+
``Organizations`` graph (which already holds ``User``/``Identity``/``Token``).
9+
10+
For each query we maintain, fire-and-forget:
11+
12+
* Denormalized counters + activity timestamps on the ``User`` node
13+
(``query_count``/``success_count``/``error_count``/``last_active``/
14+
``first_query_at``) for cheap reads.
15+
* A per-query ``(:UsageEvent)`` node linked ``(User)-[:PERFORMED]->`` carrying
16+
``graph_id``/``is_demo``/``success``/``timestamp`` for time-series, per-DB
17+
and success-rate analytics.
18+
19+
Writes never block or fail a request: they run as background tasks whose
20+
exceptions are logged and swallowed, mirroring
21+
``api.core.pipeline.save_memory_background``.
22+
"""
23+
24+
import asyncio
25+
import base64
26+
import binascii
27+
import hashlib
28+
import logging
29+
from typing import Optional
30+
31+
from api.config import ORGANIZATIONS_GRAPH
32+
from api.core.db_resolver import resolve_db
33+
from api.core.pipeline import background_tasks_var, is_general_graph
34+
35+
# Single round-trip: bump the User counters/timestamps and append a UsageEvent.
36+
# Uses MATCH (not MERGE) on User so an unknown email is a silent no-op rather
37+
# than creating a phantom user from the query path. ``timestamp()`` is FalkorDB
38+
# epoch-millis, matching every other timestamp in the Organizations graph.
39+
_RECORD_USAGE_CYPHER = """
40+
MATCH (u:User {email: $email})
41+
SET u.query_count = coalesce(u.query_count, 0) + 1,
42+
u.success_count = coalesce(u.success_count, 0) + (CASE WHEN $success THEN 1 ELSE 0 END),
43+
u.error_count = coalesce(u.error_count, 0) + (CASE WHEN $success THEN 0 ELSE 1 END),
44+
u.last_active = timestamp(),
45+
u.first_query_at = coalesce(u.first_query_at, timestamp())
46+
CREATE (u)-[:PERFORMED]->(e:UsageEvent {
47+
graph_id: $graph_id,
48+
is_demo: $is_demo,
49+
success: $success,
50+
timestamp: timestamp()
51+
})
52+
"""
53+
54+
55+
def _decode_email(user_id: str) -> Optional[str]:
56+
"""Recover the user's email from the base64 ``user_id``.
57+
58+
Inverse of ``base64.b64encode(email.encode())`` in
59+
``api/auth/user_management.py``. Returns ``None`` on malformed input so the
60+
caller can skip tracking instead of raising.
61+
"""
62+
if not user_id:
63+
return None
64+
try:
65+
email = base64.b64decode(user_id, validate=True).decode("utf-8")
66+
except (binascii.Error, ValueError, UnicodeDecodeError):
67+
logging.warning("Usage tracking: could not decode user_id to email")
68+
return None
69+
# b64decode is lenient about padding/length; require an email-shaped result
70+
# so a malformed id can't trigger a phantom DB write (matches the docstring).
71+
if "@" not in email:
72+
logging.warning("Usage tracking: decoded user_id is not a valid email")
73+
return None
74+
return email
75+
76+
77+
async def _write_usage(email: str, graph_id: str, is_demo: bool, success: bool, db) -> None:
78+
"""Perform the single Cypher write against the Organizations graph."""
79+
organizations_graph = resolve_db(db).select_graph(ORGANIZATIONS_GRAPH)
80+
await organizations_graph.query(
81+
_RECORD_USAGE_CYPHER,
82+
{
83+
"email": email,
84+
"graph_id": graph_id,
85+
"is_demo": is_demo,
86+
"success": success,
87+
},
88+
)
89+
# Structured-ish log line so usage is visible to log aggregators even
90+
# before any read API exists. graph_id is the namespaced name
91+
# ({base64(email)}_{db}) and base64 email is reversible, so log a short
92+
# stable hash instead of the raw value — this keeps user identity out of
93+
# logs and also neutralizes the CodeQL log-injection vector.
94+
graph_ref = hashlib.sha256(graph_id.encode()).hexdigest()[:12]
95+
logging.info(
96+
"usage_event graph=%s is_demo=%s success=%s",
97+
graph_ref, is_demo, success,
98+
)
99+
100+
101+
def record_query_usage_background(
102+
user_id: str,
103+
namespaced: str,
104+
success: bool,
105+
*,
106+
db=None,
107+
task_sink: Optional[set] = None,
108+
) -> None:
109+
"""Schedule fire-and-forget usage tracking for one query.
110+
111+
Returns immediately. The write runs as a background task whose failure is
112+
logged but never propagated, so tracking can never break or delay a query
113+
response. Called unconditionally at pipeline completion — independent of
114+
``use_memory`` and the LLM provider.
115+
116+
Args:
117+
user_id: Base64-encoded email (the namespacing id used by the routes).
118+
namespaced: The fully-namespaced graph name the query ran against;
119+
already demo-aware, so it doubles as the recorded ``graph_id``.
120+
success: Whether SQL execution succeeded (no execution error).
121+
db: Optional FalkorDB handle; resolves to the server singleton when None.
122+
task_sink: Optional set the scheduled task is added to (and auto-removed
123+
from on completion) so callers can await any in-flight tracking
124+
writes before shutdown.
125+
"""
126+
email = _decode_email(user_id)
127+
if email is None:
128+
return
129+
130+
is_demo = is_general_graph(namespaced)
131+
sink = task_sink if task_sink is not None else background_tasks_var.get()
132+
133+
task = asyncio.create_task(
134+
_write_usage(email, namespaced, is_demo, success, db)
135+
)
136+
137+
if sink is not None:
138+
sink.add(task)
139+
task.add_done_callback(sink.discard)
140+
141+
def _log_done(t: "asyncio.Task") -> None:
142+
if t.cancelled():
143+
return
144+
exc = t.exception()
145+
if exc is not None:
146+
logging.error("Usage tracking save failed: %s", exc) # nosemgrep
147+
148+
task.add_done_callback(_log_done)

0 commit comments

Comments
 (0)