-
Notifications
You must be signed in to change notification settings - Fork 185
Expand file tree
/
Copy pathproject_context.py
More file actions
733 lines (614 loc) · 29.7 KB
/
project_context.py
File metadata and controls
733 lines (614 loc) · 29.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
"""Project context utilities for Basic Memory MCP server.
Provides project lookup utilities for MCP tools.
Handles project validation and context management in one place.
Note: This module uses ProjectResolver for unified project resolution.
The resolve_project_parameter function is a thin wrapper for backwards
compatibility with existing MCP tools.
"""
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, Optional, List, Tuple
from httpx import AsyncClient
from httpx._types import (
HeaderTypes,
)
from loguru import logger
from fastmcp import Context
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory import telemetry
from basic_memory.config import BasicMemoryConfig, ConfigManager, ProjectMode
from basic_memory.project_resolver import ProjectResolver
from basic_memory.schemas.cloud import WorkspaceInfo, WorkspaceListResponse
from basic_memory.schemas.project_info import ProjectItem, ProjectList
from basic_memory.schemas.v2 import ProjectResolveResponse
from basic_memory.schemas.memory import memory_url_path
from basic_memory.utils import generate_permalink, normalize_project_reference
# --- Workspace provider injection ---
# Mirrors the set_client_factory() pattern in async_client.py.
# The cloud MCP server sets a provider that queries its own database directly,
# avoiding the control-plane HTTP round-trip that requires local credentials.
_workspace_provider: Optional[Callable[[], Awaitable[list[WorkspaceInfo]]]] = None
def set_workspace_provider(provider: Callable[[], Awaitable[list[WorkspaceInfo]]]) -> None:
"""Override workspace discovery (for cloud app, testing, etc)."""
global _workspace_provider
_workspace_provider = provider
async def _resolve_default_project_from_api() -> Optional[str]:
"""Query the projects API for the default project.
Used as a fallback when ConfigManager has no local config (cloud mode).
"""
from basic_memory.mcp.async_client import get_client
try:
async with get_client() as client:
response = await client.get("/v2/projects/")
if response.status_code == 200:
project_list = ProjectList.model_validate(response.json())
if project_list.default_project:
return project_list.default_project
# Fallback: find project with is_default=True
for p in project_list.projects:
if p.is_default:
return p.name
except Exception:
pass
return None
async def _get_cached_active_project(context: Optional[Context]) -> Optional[ProjectItem]:
"""Return the cached active project from context when available."""
if not context:
return None
cached_raw = await context.get_state("active_project")
if isinstance(cached_raw, dict):
return ProjectItem.model_validate(cached_raw)
return None
async def _set_cached_active_project(
context: Optional[Context],
active_project: ProjectItem,
) -> None:
"""Persist the active project and known default-project metadata in context."""
if not context:
return
await context.set_state("active_project", active_project.model_dump())
if active_project.is_default:
await context.set_state("default_project_name", active_project.name)
async def _get_cached_default_project(context: Optional[Context]) -> Optional[str]:
"""Return the cached default project name from context when available."""
if not context:
return None
cached_default = await context.get_state("default_project_name")
if isinstance(cached_default, str):
return cached_default
return None
def _canonicalize_project_name(
project_name: Optional[str],
config: BasicMemoryConfig,
) -> Optional[str]:
"""Return the configured project name when the identifier matches by permalink.
Project routing happens before API validation, so we normalize explicit inputs
here to keep local/cloud routing aligned with the database's case-insensitive
project resolver.
"""
if project_name is None:
return None
requested_permalink = generate_permalink(project_name)
for configured_name in config.projects:
if generate_permalink(configured_name) == requested_permalink:
return configured_name
return project_name
def _project_matches_identifier(project_item: ProjectItem, identifier: Optional[str]) -> bool:
"""Return True when the identifier refers to the cached project."""
if identifier is None:
return True
normalized_identifier = generate_permalink(identifier)
return normalized_identifier in {
generate_permalink(project_item.name),
project_item.permalink,
}
async def resolve_project_parameter(
project: Optional[str] = None,
allow_discovery: bool = False,
default_project: Optional[str] = None,
context: Optional[Context] = None,
) -> Optional[str]:
"""Resolve project parameter using unified linear priority chain.
This is a thin wrapper around ProjectResolver for backwards compatibility.
New code should consider using ProjectResolver directly for more detailed
resolution information.
Resolution order:
1. ENV_CONSTRAINT: BASIC_MEMORY_MCP_PROJECT env var (highest priority)
2. EXPLICIT: project parameter passed directly
3. DEFAULT: default_project from config (if set)
4. Fallback: discovery (if allowed) → NONE
Args:
project: Optional explicit project parameter
allow_discovery: If True, allows returning None for discovery mode
(used by tools like recent_activity that can operate across all projects)
default_project: Optional explicit default project. If not provided, reads from ConfigManager.
Returns:
Resolved project name or None if no resolution possible
"""
with telemetry.span(
"routing.resolve_project",
requested_project=project,
allow_discovery=allow_discovery,
):
config = ConfigManager().config
# Trigger: project already resolved earlier in the same MCP request
# Why: the active project is request-constant, so re-discovering the
# default project via /v2/projects/ just repeats work
# Outcome: reuse the cached project name as the explicit candidate
if project is None:
cached_project = await _get_cached_active_project(context)
if cached_project is not None:
project = cached_project.name
# Trigger: there is no explicit project after env/context normalization
# Why: default-project discovery is only needed as a fallback; doing it
# for explicit requests adds an avoidable /v2/projects/ round-trip
# Outcome: skip default lookup when the active project is already known
if default_project is None and project is None:
# Load config for any values not explicitly provided.
# ConfigManager reads from the local config file, which doesn't exist in cloud mode.
# When it returns None, fall back to querying the projects API for the is_default flag.
default_project = config.default_project
if default_project is None:
default_project = await _get_cached_default_project(context)
if default_project is None:
default_project = await _resolve_default_project_from_api()
if default_project and context:
await context.set_state("default_project_name", default_project)
# Create resolver with configuration and resolve
resolver = ProjectResolver.from_env(
default_project=default_project,
)
result = resolver.resolve(project=project, allow_discovery=allow_discovery)
return _canonicalize_project_name(result.project, config)
async def get_project_names(client: AsyncClient, headers: HeaderTypes | None = None) -> List[str]:
# Deferred import to avoid circular dependency with tools
from basic_memory.mcp.tools.utils import call_get
response = await call_get(client, "/v2/projects/", headers=headers)
project_list = ProjectList.model_validate(response.json())
return [project.name for project in project_list.projects]
def _workspace_matches_identifier(workspace: WorkspaceInfo, identifier: str) -> bool:
"""Return True when identifier matches workspace tenant_id or name."""
if workspace.tenant_id == identifier:
return True
return workspace.name.lower() == identifier.lower()
def _workspace_choices(workspaces: list[WorkspaceInfo]) -> str:
"""Format deterministic workspace choices for prompt-style errors."""
return "\n".join(
[
(
f"- {item.name} "
f"(type={item.workspace_type}, role={item.role}, tenant_id={item.tenant_id})"
)
for item in workspaces
]
)
async def get_available_workspaces(context: Optional[Context] = None) -> list[WorkspaceInfo]:
"""Load available cloud workspaces for the current authenticated user."""
if context:
cached_raw = await context.get_state("available_workspaces")
if isinstance(cached_raw, list):
return [WorkspaceInfo.model_validate(item) for item in cached_raw]
# Trigger: workspace provider was injected (e.g., by cloud MCP server)
# Why: the cloud server IS the cloud — it can query its own database
# directly instead of making an HTTP round-trip that requires local credentials
# Outcome: use provider result, cache in context, skip control-plane client
if _workspace_provider is not None:
workspaces = await _workspace_provider()
if context:
await context.set_state(
"available_workspaces",
[ws.model_dump() for ws in workspaces],
)
return workspaces
from basic_memory.mcp.async_client import get_cloud_control_plane_client
from basic_memory.mcp.tools.utils import call_get
async with get_cloud_control_plane_client() as client:
response = await call_get(client, "/workspaces/")
workspace_list = WorkspaceListResponse.model_validate(response.json())
if context:
await context.set_state(
"available_workspaces",
[ws.model_dump() for ws in workspace_list.workspaces],
)
return workspace_list.workspaces
async def resolve_workspace_parameter(
workspace: Optional[str] = None,
context: Optional[Context] = None,
) -> WorkspaceInfo:
"""Resolve workspace using explicit input, session cache, and cloud discovery."""
with telemetry.scope(
"routing.resolve_workspace",
workspace_requested=workspace is not None,
has_context=context is not None,
):
if context:
cached_raw = await context.get_state("active_workspace")
if isinstance(cached_raw, dict):
cached_workspace = WorkspaceInfo.model_validate(cached_raw)
if workspace is None or _workspace_matches_identifier(cached_workspace, workspace):
logger.debug(
f"Using cached workspace from context: {cached_workspace.tenant_id}"
)
return cached_workspace
workspaces = await get_available_workspaces(context=context)
if not workspaces:
raise ValueError(
"No accessible workspaces found for this account. "
"Ensure you have an active subscription and tenant access."
)
selected_workspace: WorkspaceInfo | None = None
if workspace:
matches = [
item for item in workspaces if _workspace_matches_identifier(item, workspace)
]
if not matches:
raise ValueError(
f"Workspace '{workspace}' was not found.\n"
f"Available workspaces:\n{_workspace_choices(workspaces)}"
)
if len(matches) > 1:
raise ValueError(
f"Workspace name '{workspace}' matches multiple workspaces. "
"Use tenant_id instead.\n"
f"Available workspaces:\n{_workspace_choices(workspaces)}"
)
selected_workspace = matches[0]
elif len(workspaces) == 1:
selected_workspace = workspaces[0]
else:
raise ValueError(
"Multiple workspaces are available. Ask the user which workspace to use, then retry "
"with the 'workspace' argument set to the tenant_id or unique name.\n"
f"Available workspaces:\n{_workspace_choices(workspaces)}"
)
if context:
await context.set_state("active_workspace", selected_workspace.model_dump())
logger.debug(f"Cached workspace in context: {selected_workspace.tenant_id}")
return selected_workspace
async def get_active_project(
client: AsyncClient,
project: Optional[str] = None,
context: Optional[Context] = None,
headers: HeaderTypes | None = None,
) -> ProjectItem:
"""Get and validate project, setting it in context if available.
Args:
client: HTTP client for API calls
project: Optional project name (resolved using hierarchy)
context: Optional FastMCP context to cache the result
Returns:
The validated project item
Raises:
ValueError: If no project can be resolved
HTTPError: If project doesn't exist or is inaccessible
"""
with telemetry.scope(
"routing.validate_project",
requested_project=project,
has_context=context is not None,
):
# Deferred import to avoid circular dependency with tools
from basic_memory.mcp.tools.utils import call_post
cached_project = await _get_cached_active_project(context)
if cached_project and _project_matches_identifier(cached_project, project):
logger.debug(f"Using cached project from context: {cached_project.name}")
return cached_project
resolved_project = await resolve_project_parameter(project, context=context)
if not resolved_project:
project_names = await get_project_names(client, headers)
raise ValueError(
"No project specified. "
"Either set 'default_project' in config, or use 'project' argument.\n"
f"Available projects: {project_names}"
)
project = resolved_project
if cached_project and _project_matches_identifier(cached_project, project):
logger.debug(f"Using cached project from context: {cached_project.name}")
return cached_project
# Validate project exists by calling API
logger.debug(f"Validating project: {project}")
response = await call_post(
client,
"/v2/projects/resolve",
json={"identifier": project},
headers=headers,
)
resolved = ProjectResolveResponse.model_validate(response.json())
active_project = ProjectItem(
id=resolved.project_id,
external_id=resolved.external_id,
name=resolved.name,
path=resolved.path,
is_default=resolved.is_default,
)
# Cache in context if available
await _set_cached_active_project(context, active_project)
if context:
logger.debug(f"Cached project in context: {project}")
logger.debug(f"Validated project: {active_project.name}")
return active_project
def _split_project_prefix(path: str) -> tuple[Optional[str], str]:
"""Split a possible project prefix from a memory URL path."""
if "/" not in path:
return None, path
project_prefix, remainder = path.split("/", 1)
if not project_prefix or not remainder:
return None, path
if "*" in project_prefix:
return None, path
return project_prefix, remainder
async def resolve_project_and_path(
client: AsyncClient,
identifier: str,
project: Optional[str] = None,
context: Optional[Context] = None,
headers: HeaderTypes | None = None,
) -> tuple[ProjectItem, str, bool]:
"""Resolve project and normalized path for memory:// identifiers.
Returns:
Tuple of (active_project, normalized_path, is_memory_url)
"""
is_memory_url = identifier.strip().startswith("memory://")
config = ConfigManager().config
include_project = config.permalinks_include_project if is_memory_url else None
with telemetry.scope(
"routing.resolve_memory_url",
is_memory_url=is_memory_url,
requested_project=project,
include_project_prefix=include_project,
):
if not is_memory_url:
active_project = await get_active_project(client, project, context, headers)
return active_project, identifier, False
normalized_path = normalize_project_reference(memory_url_path(identifier))
project_prefix, remainder = _split_project_prefix(normalized_path)
include_project = config.permalinks_include_project
# Trigger: memory URL begins with a potential project segment
# Why: allow project-scoped memory URLs without requiring a separate project parameter
# Outcome: attempt to resolve the prefix as a project and route to it
if project_prefix:
cached_project = await _get_cached_active_project(context)
if cached_project and _project_matches_identifier(cached_project, project_prefix):
resolved_project = await resolve_project_parameter(project_prefix, context=context)
if resolved_project and generate_permalink(resolved_project) != generate_permalink(
project_prefix
):
raise ValueError(
f"Project is constrained to '{resolved_project}', cannot use '{project_prefix}'."
)
resolved_path = (
f"{cached_project.permalink}/{remainder}" if include_project else remainder
)
return cached_project, resolved_path, True
try:
from basic_memory.mcp.tools.utils import call_post
response = await call_post(
client,
"/v2/projects/resolve",
json={"identifier": project_prefix},
headers=headers,
)
resolved = ProjectResolveResponse.model_validate(response.json())
except ToolError as exc:
if "project not found" not in str(exc).lower():
raise
else:
resolved_project = await resolve_project_parameter(project_prefix, context=context)
if resolved_project and generate_permalink(resolved_project) != generate_permalink(
project_prefix
):
raise ValueError(
f"Project is constrained to '{resolved_project}', cannot use '{project_prefix}'."
)
active_project = ProjectItem(
id=resolved.project_id,
external_id=resolved.external_id,
name=resolved.name,
path=resolved.path,
is_default=resolved.is_default,
)
await _set_cached_active_project(context, active_project)
resolved_path = (
f"{resolved.permalink}/{remainder}" if include_project else remainder
)
return active_project, resolved_path, True
# Trigger: no resolvable project prefix in the memory URL
# Why: preserve existing memory URL behavior within the active project
# Outcome: use the active project and normalize the path for lookup
active_project = await get_active_project(client, project, context, headers)
resolved_path = normalized_path
if include_project:
# Trigger: project-prefixed permalinks are enabled and the path lacks a prefix
# Why: ensure memory URL lookups align with canonical permalinks
# Outcome: prefix the path with the active project's permalink
project_prefix = active_project.permalink
if resolved_path != project_prefix and not resolved_path.startswith(
f"{project_prefix}/"
):
resolved_path = f"{project_prefix}/{resolved_path}"
return active_project, resolved_path, True
def add_project_metadata(result: str, project_name: str) -> str:
"""Add project context as metadata footer for assistant session tracking.
Provides clear project context to help the assistant remember which
project is being used throughout the conversation session.
Args:
result: The tool result string
project_name: The project name that was used
Returns:
Result with project session tracking metadata
"""
return f"{result}\n\n[Session: Using project '{project_name}']"
def detect_project_from_url_prefix(identifier: str, config: BasicMemoryConfig) -> Optional[str]:
"""Check if a memory URL's first path segment matches a known project in config.
This enables automatic project routing from memory URLs like
``memory://specs/in-progress`` without requiring the caller to pass
an explicit ``project`` parameter.
Uses local config only — no network calls.
Args:
identifier: Raw identifier string (may or may not start with ``memory://``).
config: Current BasicMemoryConfig with project entries.
Returns:
Matching project name from config, or None if no match.
"""
path = memory_url_path(identifier) if identifier.strip().startswith("memory://") else identifier
normalized = normalize_project_reference(path)
prefix, _ = _split_project_prefix(normalized)
if prefix is None:
return None
prefix_permalink = generate_permalink(prefix)
for project_name in config.projects:
if generate_permalink(project_name) == prefix_permalink:
return project_name
return None
@asynccontextmanager
async def get_project_client(
project: Optional[str] = None,
workspace: Optional[str] = None,
context: Optional[Context] = None,
) -> AsyncIterator[Tuple[AsyncClient, ProjectItem]]:
"""Resolve project, create correctly-routed client, and validate project.
Solves the bootstrap problem: we need to know the project name to choose
the right client (local vs cloud), but we need the client to validate
the project. This helper resolves the project from config first (no
network), creates the correctly-routed client, then validates via API.
Routing decision order:
1. Explicit --local/--cloud flags → skip workspace, use flag routing
2. Cloud routing (explicit --cloud OR project mode CLOUD) →
resolve workspace via priority chain, create cloud client
3. Otherwise → local ASGI client
Workspace resolution priority (when cloud routing):
1. Explicit ``workspace`` parameter
2. Per-project ``workspace_id`` from config
3. Global ``default_workspace`` from config
4. MCP session cache (context)
5. Auto-select if single workspace
6. Error listing choices
Args:
project: Optional explicit project parameter
workspace: Optional cloud workspace selector (tenant_id or unique name)
context: Optional FastMCP context for caching
Yields:
Tuple of (client, active_project)
Raises:
ValueError: If no project can be resolved
RuntimeError: If cloud project but no API key configured
"""
# Deferred imports to avoid circular dependency
from basic_memory.mcp.async_client import (
_explicit_routing,
_force_local_mode,
get_client,
is_factory_mode,
)
# Step 1: Resolve project name from config (no network call)
resolved_project = await resolve_project_parameter(project, context=context)
if not resolved_project:
# Fall back to local client to discover projects and raise helpful error
async with get_client() as client:
project_names = await get_project_names(client)
raise ValueError(
"No project specified. "
"Either set 'default_project' in config, or use 'project' argument.\n"
f"Available projects: {project_names}"
)
# Step 1b: Factory injection (in-process cloud server)
# Trigger: set_client_factory() was called (e.g., by cloud MCP server)
# Why: the factory's transport layer handles auth and tenant resolution;
# we pass workspace through so the transport can route to the correct
# workspace when the tool specifies one different from the connection default
# Outcome: factory client with optional workspace override via inner request headers
if is_factory_mode():
route_mode = "factory"
with telemetry.scope(
"routing.client_session",
project_name=resolved_project,
route_mode=route_mode,
workspace_id=workspace,
):
logger.debug("Using injected client factory for project routing")
async with get_client(workspace=workspace) as client:
active_project = await get_active_project(client, resolved_project, context)
yield client, active_project
return
# Step 2: Check explicit routing BEFORE workspace resolution
# Trigger: CLI passed --local or --cloud
# Why: explicit flags must be deterministic — skip workspace entirely for --local
# Outcome: route strictly based on explicit flag, no workspace network calls
if _explicit_routing() and _force_local_mode():
route_mode = "explicit_local"
with telemetry.scope(
"routing.client_session",
project_name=resolved_project,
route_mode=route_mode,
):
logger.debug("Explicit local routing selected for project client")
async with get_client(project_name=resolved_project) as client:
active_project = await get_active_project(client, resolved_project, context)
yield client, active_project
return
# Step 3: Determine if cloud routing is needed
config = ConfigManager().config
project_entry = config.projects.get(resolved_project)
project_mode = config.get_project_mode(resolved_project)
# Trigger: workspace provided for a local project (without explicit --cloud)
# Why: workspace selection is a cloud routing concern only
# Outcome: fail fast with a deterministic guidance message
if project_mode != ProjectMode.CLOUD and workspace is not None and not _explicit_routing():
raise ValueError(
f"Workspace '{workspace}' cannot be used with local project '{resolved_project}'. "
"Workspace selection is only supported for cloud-mode projects."
)
if project_mode == ProjectMode.CLOUD or (_explicit_routing() and not _force_local_mode()):
# --- Cloud routing: resolve workspace with priority chain ---
effective_workspace = workspace
# Priority 2: per-project workspace_id from config
if effective_workspace is None and project_entry and project_entry.workspace_id:
effective_workspace = project_entry.workspace_id
# Priority 3: global default_workspace from config
if effective_workspace is None and config.default_workspace:
effective_workspace = config.default_workspace
route_mode = "cloud_proxy"
# Priorities 4-6: if still unresolved, fall back to resolve_workspace_parameter
# which checks context cache, auto-selects single workspace, or errors
if effective_workspace is not None:
# Config-resolved workspace — pass directly to get_client, skip network lookup
with telemetry.scope(
"routing.client_session",
project_name=resolved_project,
route_mode=route_mode,
workspace_id=effective_workspace,
):
logger.debug("Using configured workspace for cloud project routing")
async with get_client(
project_name=resolved_project,
workspace=effective_workspace,
) as client:
active_project = await get_active_project(client, resolved_project, context)
yield client, active_project
else:
# No config-based workspace — use resolve_workspace_parameter for discovery
active_ws = await resolve_workspace_parameter(workspace=None, context=context)
with telemetry.scope(
"routing.client_session",
project_name=resolved_project,
route_mode=route_mode,
workspace_id=active_ws.tenant_id,
):
logger.debug("Resolved workspace dynamically for cloud project routing")
async with get_client(
project_name=resolved_project,
workspace=active_ws.tenant_id,
) as client:
active_project = await get_active_project(client, resolved_project, context)
yield client, active_project
return
# Step 4: Local routing (default)
route_mode = "local_asgi"
with telemetry.scope(
"routing.client_session",
project_name=resolved_project,
route_mode=route_mode,
):
logger.debug("Using default local ASGI routing for project client")
async with get_client(project_name=resolved_project) as client:
active_project = await get_active_project(client, resolved_project, context)
yield client, active_project