Skip to content

Commit 396abd0

Browse files
committed
feat: bind loguru context to telemetry scopes
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent 1639bff commit 396abd0

8 files changed

Lines changed: 208 additions & 103 deletions

File tree

docs/logfire-instrumentation-strategy.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ Avoid full automatic SQL span firehose by default.
349349
- [x] Phase 2: Root spans for entrypoints and primary operations
350350
- [x] Phase 3: Child spans for sync, search, and routing
351351
- [x] Phase 4: Failure-focused detail and final verification
352+
- [x] Phase 5: Loguru context binding and scoped context inheritance
352353

353354
## Recommended Rollout Phases
354355

@@ -392,6 +393,16 @@ Add selective deeper spans/log enrichment for:
392393

393394
This keeps normal traces clean while improving debuggability.
394395

396+
### Phase 5: Loguru context binding and scoped context inheritance
397+
398+
Add:
399+
400+
- context-local telemetry state in `basic_memory.telemetry`
401+
- a shared `scope(...)` helper that opens a telemetry span and binds the matching logger context fields in a single step
402+
- context inheritance for routing, sync, and search, so downstream `loguru` log records carry the fields of the active operation
403+
404+
This makes the trace view and the log stream tell the same story without forcing logger rewrites across the codebase.
405+
395406
## Validation Checklist
396407

397408
We should consider the integration successful when the following are true:

src/basic_memory/mcp/project_context.py

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ async def resolve_workspace_parameter(
205205
context: Optional[Context] = None,
206206
) -> WorkspaceInfo:
207207
"""Resolve workspace using explicit input, session cache, and cloud discovery."""
208-
with telemetry.span(
208+
with telemetry.scope(
209209
"routing.resolve_workspace",
210210
workspace_requested=workspace is not None,
211211
has_context=context is not None,
@@ -281,7 +281,7 @@ async def get_active_project(
281281
ValueError: If no project can be resolved
282282
HTTPError: If project doesn't exist or is inaccessible
283283
"""
284-
with telemetry.span(
284+
with telemetry.scope(
285285
"routing.validate_project",
286286
requested_project=project,
287287
has_context=context is not None,
@@ -364,7 +364,7 @@ async def resolve_project_and_path(
364364
"""
365365
is_memory_url = identifier.strip().startswith("memory://")
366366
include_project = ConfigManager().config.permalinks_include_project if is_memory_url else None
367-
with telemetry.span(
367+
with telemetry.scope(
368368
"routing.resolve_memory_url",
369369
is_memory_url=is_memory_url,
370370
requested_project=project,
@@ -548,14 +548,13 @@ async def get_project_client(
548548
# Outcome: use the factory client directly, skip workspace resolution
549549
if is_factory_mode():
550550
route_mode = "factory"
551-
with telemetry.contextualize(route_mode=route_mode, workspace_id=workspace):
552-
with telemetry.span(
553-
"routing.resolve_client",
554-
project_name=resolved_project,
555-
route_mode=route_mode,
556-
workspace_id=workspace,
557-
):
558-
logger.debug("Using injected client factory for project routing")
551+
with telemetry.scope(
552+
"routing.resolve_client",
553+
project_name=resolved_project,
554+
route_mode=route_mode,
555+
workspace_id=workspace,
556+
):
557+
logger.debug("Using injected client factory for project routing")
559558
async with get_client() as client:
560559
active_project = await get_active_project(client, resolved_project, context)
561560
yield client, active_project
@@ -567,13 +566,12 @@ async def get_project_client(
567566
# Outcome: route strictly based on explicit flag, no workspace network calls
568567
if _explicit_routing() and _force_local_mode():
569568
route_mode = "explicit_local"
570-
with telemetry.contextualize(route_mode=route_mode, workspace_id=None):
571-
with telemetry.span(
572-
"routing.resolve_client",
573-
project_name=resolved_project,
574-
route_mode=route_mode,
575-
):
576-
logger.debug("Explicit local routing selected for project client")
569+
with telemetry.scope(
570+
"routing.resolve_client",
571+
project_name=resolved_project,
572+
route_mode=route_mode,
573+
):
574+
logger.debug("Explicit local routing selected for project client")
577575
async with get_client(project_name=resolved_project) as client:
578576
active_project = await get_active_project(client, resolved_project, context)
579577
yield client, active_project
@@ -611,17 +609,13 @@ async def get_project_client(
611609
# which checks context cache, auto-selects single workspace, or errors
612610
if effective_workspace is not None:
613611
# Config-resolved workspace — pass directly to get_client, skip network lookup
614-
with telemetry.contextualize(
612+
with telemetry.scope(
613+
"routing.resolve_client",
614+
project_name=resolved_project,
615615
route_mode=route_mode,
616616
workspace_id=effective_workspace,
617617
):
618-
with telemetry.span(
619-
"routing.resolve_client",
620-
project_name=resolved_project,
621-
route_mode=route_mode,
622-
workspace_id=effective_workspace,
623-
):
624-
logger.debug("Using configured workspace for cloud project routing")
618+
logger.debug("Using configured workspace for cloud project routing")
625619
async with get_client(
626620
project_name=resolved_project,
627621
workspace=effective_workspace,
@@ -631,17 +625,13 @@ async def get_project_client(
631625
else:
632626
# No config-based workspace — use resolve_workspace_parameter for discovery
633627
active_ws = await resolve_workspace_parameter(workspace=None, context=context)
634-
with telemetry.contextualize(
628+
with telemetry.scope(
629+
"routing.resolve_client",
630+
project_name=resolved_project,
635631
route_mode=route_mode,
636632
workspace_id=active_ws.tenant_id,
637633
):
638-
with telemetry.span(
639-
"routing.resolve_client",
640-
project_name=resolved_project,
641-
route_mode=route_mode,
642-
workspace_id=active_ws.tenant_id,
643-
):
644-
logger.debug("Resolved workspace dynamically for cloud project routing")
634+
logger.debug("Resolved workspace dynamically for cloud project routing")
645635
async with get_client(
646636
project_name=resolved_project,
647637
workspace=active_ws.tenant_id,
@@ -652,13 +642,12 @@ async def get_project_client(
652642

653643
# Step 4: Local routing (default)
654644
route_mode = "local_asgi"
655-
with telemetry.contextualize(route_mode=route_mode, workspace_id=None):
656-
with telemetry.span(
657-
"routing.resolve_client",
658-
project_name=resolved_project,
659-
route_mode=route_mode,
660-
):
661-
logger.debug("Using default local ASGI routing for project client")
645+
with telemetry.scope(
646+
"routing.resolve_client",
647+
project_name=resolved_project,
648+
route_mode=route_mode,
649+
):
650+
logger.debug("Using default local ASGI routing for project client")
662651
async with get_client(project_name=resolved_project) as client:
663652
active_project = await get_active_project(client, resolved_project, context)
664653
yield client, active_project

src/basic_memory/services/search_service.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,6 @@ async def search(self, query: SearchQuery, limit=10, offset=0) -> List[SearchInd
152152
logger.debug("no criteria passed to query")
153153
return []
154154

155-
logger.trace(f"Searching with query: {query}")
156-
157155
after_date = (
158156
(
159157
query.after_date
@@ -176,7 +174,7 @@ async def search(self, query: SearchQuery, limit=10, offset=0) -> List[SearchInd
176174
retrieval_mode = query.retrieval_mode or SearchRetrievalMode.FTS
177175
strict_search_text = query.text
178176

179-
with telemetry.span(
177+
with telemetry.scope(
180178
"search.execute",
181179
retrieval_mode=retrieval_mode.value,
182180
has_text_query=bool(strict_search_text),
@@ -186,6 +184,7 @@ async def search(self, query: SearchQuery, limit=10, offset=0) -> List[SearchInd
186184
limit=limit,
187185
offset=offset,
188186
):
187+
logger.trace(f"Searching with query: {query}")
189188
# First pass: preserve existing strict search behavior.
190189
results = await self.repository.search(
191190
search_text=strict_search_text,
@@ -219,7 +218,7 @@ async def search(self, query: SearchQuery, limit=10, offset=0) -> List[SearchInd
219218
"Strict FTS returned 0 results; retrying relaxed FTS query "
220219
f"strict='{strict_search_text}' relaxed='{relaxed_search_text}'"
221220
)
222-
with telemetry.span(
221+
with telemetry.scope(
223222
"search.relaxed_fts_retry",
224223
retrieval_mode=retrieval_mode.value,
225224
token_count=len(self._tokenize_fts_text(strict_search_text)),

src/basic_memory/sync/sync_service.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ async def sync(
278278

279279
# initial paths from db to sync
280280
# path -> checksum
281-
with telemetry.span("sync.project.scan", force_full=force_full):
281+
with telemetry.scope("sync.project.scan", force_full=force_full):
282282
report = await self.scan(directory, force_full=force_full)
283283

284284
# order of sync matters to resolve relations effectively
@@ -287,7 +287,7 @@ async def sync(
287287
+ f"deleted_files={len(report.deleted)}, moved_files={len(report.moves)}"
288288
)
289289

290-
with telemetry.span(
290+
with telemetry.scope(
291291
"sync.project.apply_changes",
292292
new_count=len(report.new),
293293
modified_count=len(report.modified),
@@ -350,15 +350,15 @@ async def sync(
350350
# Only resolve relations if there were actual changes
351351
# If no files changed, no new unresolved relations could have been created
352352
if report.total > 0:
353-
with telemetry.span("sync.project.resolve_relations", relation_scope="all_pending"):
353+
with telemetry.scope("sync.project.resolve_relations", relation_scope="all_pending"):
354354
await self.resolve_relations()
355355
else:
356356
logger.info("Skipping relation resolution - no file changes detected")
357357

358358
# Batch-generate vector embeddings for all synced entities
359359
if synced_entity_ids and self.app_config.semantic_search_enabled:
360360
try:
361-
with telemetry.span(
361+
with telemetry.scope(
362362
"sync.project.sync_embeddings",
363363
entity_count=len(synced_entity_ids),
364364
):
@@ -382,7 +382,7 @@ async def sync(
382382
# Update scan watermark after successful sync
383383
# Use the timestamp from sync start (not end) to ensure we catch files
384384
# created during the sync on the next iteration
385-
with telemetry.span("sync.project.update_watermark"):
385+
with telemetry.scope("sync.project.update_watermark"):
386386
current_file_count = await self._quick_count_files(directory)
387387
if self.entity_repository.project_id is not None:
388388
project = await self.project_repository.find_by_id(
@@ -458,7 +458,7 @@ async def scan(self, directory, force_full: bool = False):
458458
if project is None:
459459
raise ValueError(f"Project not found: {self.entity_repository.project_id}")
460460

461-
with telemetry.span("sync.project.select_scan_strategy", force_full=force_full):
461+
with telemetry.scope("sync.project.select_scan_strategy", force_full=force_full):
462462
# Step 1: Quick file count
463463
logger.debug("Counting files in directory")
464464
current_count = await self._quick_count_files(directory)
@@ -502,13 +502,12 @@ async def scan(self, directory, force_full: bool = False):
502502
logger.warning("No scan watermark available, falling back to full scan")
503503
scan_coro = self._scan_directory_full(directory)
504504

505-
with telemetry.contextualize(scan_type=scan_type):
506-
with telemetry.span("sync.project.filesystem_scan", scan_type=scan_type):
507-
file_paths_to_scan = await scan_coro
508-
if scan_type == "incremental":
509-
logger.debug(
510-
f"Incremental scan found {len(file_paths_to_scan)} potentially changed files"
511-
)
505+
with telemetry.scope("sync.project.filesystem_scan", scan_type=scan_type):
506+
file_paths_to_scan = await scan_coro
507+
if scan_type == "incremental":
508+
logger.debug(
509+
f"Incremental scan found {len(file_paths_to_scan)} potentially changed files"
510+
)
512511

513512
# Step 3: Process each file with mtime-based comparison
514513
scanned_paths: Set[str] = set()
@@ -569,7 +568,7 @@ async def scan(self, directory, force_full: bool = False):
569568

570569
# Step 4: Detect moves (for both full and incremental scans)
571570
# Check if any "new" files are actually moves by matching checksums
572-
with telemetry.span("sync.project.detect_moves", new_count=len(report.new)):
571+
with telemetry.scope("sync.project.detect_moves", new_count=len(report.new)):
573572
for new_path in list(
574573
report.new
575574
): # Use list() to allow modification during iteration
@@ -604,7 +603,7 @@ async def scan(self, directory, force_full: bool = False):
604603
# Step 5: Detect deletions (only for full scans)
605604
# Incremental scans can't reliably detect deletions since they only see modified files
606605
if scan_type in ("full_initial", "full_deletions", "full_fallback", "full_forced"):
607-
with telemetry.span("sync.project.detect_deletions", scan_type=scan_type):
606+
with telemetry.scope("sync.project.detect_deletions", scan_type=scan_type):
608607
# Use optimized query for just file paths (not full entities)
609608
db_file_paths = await self.entity_repository.get_all_file_paths()
610609
logger.debug(f"Found {len(db_file_paths)} db paths for deletion detection")
@@ -693,7 +692,7 @@ async def sync_file(
693692
except FileNotFoundError:
694693
# File exists in database but not on filesystem
695694
# This indicates a database/filesystem inconsistency - treat as deletion
696-
with telemetry.span(
695+
with telemetry.scope(
697696
"sync.file.failure",
698697
failure_type="file_not_found",
699698
path=path,
@@ -715,7 +714,7 @@ async def sync_file(
715714
if isinstance(e, SyncFatalError) or isinstance(
716715
e.__cause__, SyncFatalError
717716
): # pragma: no cover
718-
with telemetry.span(
717+
with telemetry.scope(
719718
"sync.file.failure",
720719
failure_type=failure_type,
721720
path=path,
@@ -728,7 +727,7 @@ async def sync_file(
728727

729728
# Otherwise treat as recoverable file-level error
730729
error_msg = str(e)
731-
with telemetry.span(
730+
with telemetry.scope(
732731
"sync.file.failure",
733732
failure_type=failure_type,
734733
path=path,
@@ -1133,7 +1132,7 @@ async def resolve_relations(self, entity_id: int | None = None):
11331132
# update search index only on successful resolution
11341133
await self.search_service.index_entity(resolved_entity)
11351134
except IntegrityError:
1136-
with telemetry.span(
1135+
with telemetry.scope(
11371136
"sync.relation.resolve_conflict",
11381137
relation_id=relation.id,
11391138
relation_type=relation.relation_type,
@@ -1154,7 +1153,7 @@ async def resolve_relations(self, entity_id: int | None = None):
11541153
try:
11551154
await self.relation_repository.delete(relation.id)
11561155
except Exception as e:
1157-
with telemetry.span(
1156+
with telemetry.scope(
11581157
"sync.relation.cleanup_failure",
11591158
relation_id=relation.id,
11601159
relation_type=relation.relation_type,

0 commit comments

Comments
 (0)