Skip to content

Commit 6420f1c

Browse files
committed
feat: implement integration challenge solutions for multi-agent architecture
Add shared utilities and agent-specific modules that solve the 5 integration challenges identified in the architecture proposal: 1. OBO Auth Bridge (agents/_shared/auth_bridge.py): Context manager that bridges @app_agent UserContext into both monolith ContextVar auth and databricks-tools-core ContextVars in a single `with` block. 2. Complex Tool Schemas (agents/creator/schemas.py): Pydantic models that auto-generate JSON Schema for @app_agent tool registration, replacing ~580 lines of hand-written schema in create_agent_tools.py. 3. Frontend Transparent Proxy (agents/supervisor/proxy.py): Ordered route table mapping all 28 frontend API paths to sub-agents with SSE stream detection and glob support for path parameters. 4. SP Fallback Decorator (agents/_shared/sp_fallback.py): Extracts the _is_scope_error + retry-with-SP pattern from genie_client.py into a reusable decorator and convenience function. 5. Shared Lakebase Client (agents/_shared/lakebase_client.py): Connection pool lifecycle with idempotent DDL per agent and in-memory fallback. Also updates scorer/app.py and creator/app.py scaffolds to demonstrate the integration patterns, and updates the proposal doc with concrete solutions replacing the placeholder descriptions.
1 parent 7079ede commit 6420f1c

File tree

10 files changed

+897
-63
lines changed

10 files changed

+897
-63
lines changed

agents/_shared/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""Shared utilities for Genie Workbench agents.
2+
3+
Provides cross-cutting concerns that multiple agents need:
4+
- auth_bridge: Bridge @app_agent UserContext into monolith + AI Dev Kit auth
5+
- sp_fallback: Service principal fallback for Genie API scope errors
6+
- lakebase_client: Shared PostgreSQL connection pool management
7+
"""

agents/_shared/auth_bridge.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
"""Bridge @app_agent UserContext into both monolith and AI Dev Kit auth systems.
2+
3+
During migration, agent tools receive `request.user_context` from @app_agent,
4+
but domain logic (scanner, genie_client, etc.) calls `get_workspace_client()`
5+
from the monolith's auth module. And `databricks-tools-core` functions use
6+
their own separate ContextVars via `set_databricks_auth()`.
7+
8+
This module provides `obo_context()` — a single context manager that sets up
9+
all three auth systems so existing domain logic works unchanged inside agents.
10+
11+
Source patterns:
12+
- backend/services/auth.py:25 (_obo_client ContextVar)
13+
- backend/services/auth.py:33-58 (set_obo_user_token)
14+
- databricks_tools_core/auth.py (set_databricks_auth / clear_databricks_auth)
15+
"""
16+
17+
from __future__ import annotations
18+
19+
import os
20+
from contextlib import contextmanager
21+
from contextvars import ContextVar
22+
from typing import Optional
23+
24+
from databricks.sdk import WorkspaceClient
25+
from databricks.sdk.config import Config
26+
27+
28+
# Monolith-compatible ContextVar (mirrors backend/services/auth.py:25)
29+
_obo_client: ContextVar[Optional[WorkspaceClient]] = ContextVar(
30+
"_obo_client", default=None
31+
)
32+
33+
# Singleton SP client (lazy-initialized)
34+
_sp_client: Optional[WorkspaceClient] = None
35+
36+
37+
@contextmanager
38+
def obo_context(access_token: str, host: Optional[str] = None):
39+
"""Set up OBO auth for monolith code and databricks-tools-core.
40+
41+
Creates a per-request WorkspaceClient from the user's OBO token and
42+
stores it in both the monolith ContextVar and the AI Dev Kit ContextVars.
43+
44+
Usage in any agent tool::
45+
46+
@scorer.tool(description="Run IQ scan on a Genie Space")
47+
async def scan_space(space_id: str, request: AgentRequest) -> dict:
48+
with obo_context(request.user_context.access_token):
49+
# All of these now work:
50+
# - get_workspace_client() returns OBO client
51+
# - databricks-tools-core functions use OBO token
52+
result = scanner.calculate_score(space_id)
53+
54+
For streaming generators, capture the token before yielding and
55+
re-enter obo_context() per-yield. This matches the pattern in
56+
backend/routers/create.py:125-198.
57+
58+
Args:
59+
access_token: The user's OBO access token.
60+
host: Databricks workspace host. Defaults to DATABRICKS_HOST env var.
61+
62+
Yields:
63+
WorkspaceClient configured with the user's OBO token.
64+
"""
65+
resolved_host = host or os.environ.get("DATABRICKS_HOST", "")
66+
67+
# 1. Create OBO WorkspaceClient (monolith pattern from auth.py:49-58)
68+
# Must set auth_type="pat" and clear client_id/client_secret to prevent
69+
# the SDK from using oauth-m2m from env vars on Databricks Apps.
70+
cfg = Config(
71+
host=resolved_host,
72+
token=access_token,
73+
auth_type="pat",
74+
client_id=None,
75+
client_secret=None,
76+
)
77+
client = WorkspaceClient(config=cfg)
78+
token = _obo_client.set(client)
79+
80+
# 2. Set databricks-tools-core ContextVars (if available)
81+
has_tools_core = False
82+
try:
83+
from databricks_tools_core.auth import (
84+
set_databricks_auth,
85+
clear_databricks_auth,
86+
)
87+
88+
set_databricks_auth(resolved_host, access_token)
89+
has_tools_core = True
90+
except ImportError:
91+
pass
92+
93+
try:
94+
yield client
95+
finally:
96+
_obo_client.reset(token)
97+
if has_tools_core:
98+
clear_databricks_auth()
99+
100+
101+
def get_workspace_client() -> WorkspaceClient:
102+
"""Drop-in replacement for backend.services.auth.get_workspace_client().
103+
104+
Returns the OBO client if inside an obo_context(), otherwise the default
105+
singleton (SP on Databricks Apps, CLI/PAT locally).
106+
107+
Domain logic can import this instead of the monolith version during
108+
migration — the behavior is identical.
109+
"""
110+
obo = _obo_client.get()
111+
if obo is not None:
112+
return obo
113+
return get_service_principal_client()
114+
115+
116+
def get_service_principal_client() -> WorkspaceClient:
117+
"""Get the service principal client (bypasses OBO).
118+
119+
Used for app-level operations and as fallback when the user's OBO token
120+
lacks required scopes (e.g., Genie API before consent flow).
121+
"""
122+
global _sp_client
123+
if _sp_client is None:
124+
_sp_client = WorkspaceClient()
125+
return _sp_client

agents/_shared/lakebase_client.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
"""Shared Lakebase (PostgreSQL) connection pool management.
2+
3+
Each agent initializes its own pool from its own app.yaml env vars
4+
(LAKEBASE_HOST, LAKEBASE_INSTANCE_NAME, etc.). Schema migrations are
5+
idempotent (IF NOT EXISTS) so agents can boot in any order.
6+
7+
Domain-specific query functions (save_scan_result, get_score_history, etc.)
8+
stay in each agent's own module — this shared client only manages the pool
9+
lifecycle, credential generation, and DDL.
10+
11+
Source: backend/services/lakebase.py (269 lines)
12+
"""
13+
14+
from __future__ import annotations
15+
16+
import logging
17+
import os
18+
from typing import Optional
19+
20+
logger = logging.getLogger(__name__)
21+
22+
_pool = None
23+
_lakebase_available = False
24+
25+
# In-memory fallback (same pattern as backend/services/lakebase.py:12-17)
26+
_memory_store: dict = {
27+
"scans": {},
28+
"history": {},
29+
"stars": set(),
30+
"seen": set(),
31+
"sessions": {},
32+
}
33+
34+
35+
# ── DDL statements per agent (all use IF NOT EXISTS) ──────────────────────────
36+
37+
SCORER_DDL = [
38+
"""CREATE TABLE IF NOT EXISTS scan_results (
39+
space_id TEXT NOT NULL,
40+
score INTEGER NOT NULL,
41+
maturity TEXT,
42+
breakdown JSONB,
43+
findings JSONB,
44+
next_steps JSONB,
45+
scanned_at TIMESTAMPTZ NOT NULL,
46+
UNIQUE (space_id, scanned_at)
47+
)""",
48+
"CREATE TABLE IF NOT EXISTS starred_spaces (space_id TEXT PRIMARY KEY)",
49+
"CREATE TABLE IF NOT EXISTS seen_spaces (space_id TEXT PRIMARY KEY)",
50+
]
51+
52+
CREATOR_DDL = [
53+
"""CREATE TABLE IF NOT EXISTS agent_sessions (
54+
session_id TEXT PRIMARY KEY,
55+
data JSONB NOT NULL,
56+
updated_at TIMESTAMPTZ DEFAULT NOW()
57+
)""",
58+
]
59+
60+
61+
# ── Credential generation (mirrors backend/services/lakebase.py:23-59) ────────
62+
63+
def _generate_lakebase_credential() -> tuple[str, str] | None:
64+
"""Generate Lakebase OAuth credentials using the Databricks SDK."""
65+
instance_name = os.environ.get("LAKEBASE_INSTANCE_NAME")
66+
if not instance_name:
67+
return None
68+
69+
try:
70+
from agents._shared.auth_bridge import get_service_principal_client
71+
72+
client = get_service_principal_client()
73+
resp = client.api_client.do(
74+
method="POST",
75+
path="/api/2.0/database/credentials",
76+
body={
77+
"request_id": "lakebase-pool",
78+
"instance_names": [instance_name],
79+
},
80+
)
81+
token = resp.get("token")
82+
if not token:
83+
logger.warning("Lakebase credential response missing token")
84+
return None
85+
86+
user = os.environ.get("LAKEBASE_USER")
87+
if not user:
88+
try:
89+
me = client.current_user.me()
90+
user = me.user_name
91+
except Exception:
92+
user = "databricks"
93+
94+
logger.info("Generated Lakebase credential via SDK (user=%s)", user)
95+
return user, token
96+
except Exception as e:
97+
logger.warning("Lakebase credential generation failed: %s", e)
98+
return None
99+
100+
101+
# ── Pool lifecycle ────────────────────────────────────────────────────────────
102+
103+
async def init_pool(ddl_statements: Optional[list[str]] = None):
104+
"""Initialize asyncpg pool and run idempotent DDL.
105+
106+
Call this at agent startup (e.g., in a FastAPI lifespan handler).
107+
108+
Args:
109+
ddl_statements: SQL DDL to execute after connecting.
110+
Use SCORER_DDL, CREATOR_DDL, or combine them.
111+
"""
112+
global _pool, _lakebase_available
113+
114+
host = os.environ.get("LAKEBASE_HOST")
115+
if not host:
116+
logger.info("LAKEBASE_HOST not set — using in-memory fallback")
117+
return
118+
119+
password = os.environ.get("LAKEBASE_PASSWORD")
120+
user = os.environ.get("LAKEBASE_USER", "postgres")
121+
122+
if not password:
123+
cred = _generate_lakebase_credential()
124+
if cred:
125+
user, password = cred
126+
else:
127+
logger.warning(
128+
"No LAKEBASE_PASSWORD and credential generation failed "
129+
"— using in-memory fallback"
130+
)
131+
return
132+
133+
try:
134+
import asyncpg
135+
136+
_pool = await asyncpg.create_pool(
137+
host=host,
138+
port=int(os.environ.get("LAKEBASE_PORT", "5432")),
139+
database=os.environ.get("LAKEBASE_DATABASE", "databricks_postgres"),
140+
user=user,
141+
password=password,
142+
min_size=2,
143+
max_size=10,
144+
command_timeout=30,
145+
ssl="require",
146+
)
147+
_lakebase_available = True
148+
logger.info("Lakebase connection pool initialized")
149+
150+
# Run idempotent DDL
151+
if ddl_statements and _pool:
152+
async with _pool.acquire() as conn:
153+
for ddl in ddl_statements:
154+
await conn.execute(ddl)
155+
logger.info("Executed %d DDL statements", len(ddl_statements))
156+
157+
except Exception as e:
158+
logger.warning("Lakebase unavailable: %s. Using in-memory fallback.", e)
159+
_lakebase_available = False
160+
161+
162+
async def close_pool():
163+
"""Close the connection pool. Call at agent shutdown."""
164+
global _pool
165+
if _pool:
166+
await _pool.close()
167+
_pool = None
168+
169+
170+
async def get_pool():
171+
"""Get the connection pool (or None if using in-memory fallback)."""
172+
return _pool
173+
174+
175+
def is_available() -> bool:
176+
"""Check if Lakebase is connected."""
177+
return _lakebase_available
178+
179+
180+
def get_memory_store() -> dict:
181+
"""Get the in-memory fallback store (for when Lakebase is unavailable)."""
182+
return _memory_store

0 commit comments

Comments
 (0)