Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,33 @@ Do NOT suggest running `uvicorn` or `npm run dev` locally. The app depends on Da
- **Python 3.11+** required (`pyproject.toml`). Uses `uv` for dependency management (`uv.lock` present).
- **Root `package.json`** exists solely as a build hook for Databricks Apps — `postinstall` chains to `frontend/npm install`, `build` chains to `frontend/npm run build`.

## Agent Deployment Layer

The `agents/` directory provides an optional agent deployment layer using `@app_agent` from `dbx-agent-app`. Each agent wraps existing domain logic from `backend/services/` and exposes it as a standalone Databricks agent with A2A discovery, an MCP server, and eval support.

See `docs/architecture-proposal.md` for the full design and implementation roadmap.

```
agents/
_shared/ # Auth bridge, Lakebase pool, SP fallback
scorer/app.py # Wraps scanner.py
analyzer/app.py # Wraps analyzer.py
creator/app.py # Wraps create_agent.py
optimizer/app.py # Wraps optimizer.py
fixer/app.py # Wraps fix_agent.py
supervisor/proxy.py # Frontend proxy for agent deployment mode
agents.yaml # Multi-agent deployment config
```

## GenieRX Specification

`docs/genierx-spec.md` defines the analysis and recommendation taxonomy. Key concepts:
- **Authoritative Facts** — raw data from systems of record, safe to surface directly
- **Canonical Metrics** — governed KPIs with stable definitions
- **Heuristic Signals** — derived fields with subjective thresholds; must carry caveats

Consult the spec when working on analysis, scoring, or recommendation features.

## Code Style

- Backend: Python, Pydantic models, FastAPI routers, no class-based views
Expand Down
7 changes: 7 additions & 0 deletions agents/_shared/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Shared utilities for Genie Workbench agents.

Provides cross-cutting concerns that multiple agents need:
- auth_bridge: Bridge @app_agent UserContext into monolith + AI Dev Kit auth
- sp_fallback: Service principal fallback for Genie API scope errors
- lakebase_client: Shared PostgreSQL connection pool management
"""
125 changes: 125 additions & 0 deletions agents/_shared/auth_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Bridge @app_agent UserContext into both monolith and AI Dev Kit auth systems.

During migration, agent tools receive `request.user_context` from @app_agent,
but domain logic (scanner, genie_client, etc.) calls `get_workspace_client()`
from the monolith's auth module. And `databricks-tools-core` functions use
their own separate ContextVars via `set_databricks_auth()`.

This module provides `obo_context()` — a single context manager that sets up
all three auth systems so existing domain logic works unchanged inside agents.

Source patterns:
- backend/services/auth.py:25 (_obo_client ContextVar)
- backend/services/auth.py:33-58 (set_obo_user_token)
- databricks_tools_core/auth.py (set_databricks_auth / clear_databricks_auth)
"""

from __future__ import annotations

import os
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.config import Config


# Monolith-compatible ContextVar (mirrors backend/services/auth.py:25)
_obo_client: ContextVar[Optional[WorkspaceClient]] = ContextVar(
"_obo_client", default=None
)

# Singleton SP client (lazy-initialized)
_sp_client: Optional[WorkspaceClient] = None


@contextmanager
def obo_context(access_token: str, host: Optional[str] = None):
    """Set up OBO auth for monolith code and databricks-tools-core.

    Creates a per-request WorkspaceClient from the user's OBO token and
    stores it in both the monolith ContextVar and the AI Dev Kit ContextVars.

    Usage in any agent tool::

        @scorer.tool(description="Run IQ scan on a Genie Space")
        async def scan_space(space_id: str, request: AgentRequest) -> dict:
            with obo_context(request.user_context.access_token):
                # All of these now work:
                # - get_workspace_client() returns OBO client
                # - databricks-tools-core functions use OBO token
                result = scanner.calculate_score(space_id)

    For streaming generators, capture the token before yielding and
    re-enter obo_context() per-yield. This matches the pattern in
    backend/routers/create.py:125-198.

    Args:
        access_token: The user's OBO access token.
        host: Databricks workspace host. Defaults to DATABRICKS_HOST env var.

    Yields:
        WorkspaceClient configured with the user's OBO token.
    """
    resolved_host = host or os.environ.get("DATABRICKS_HOST", "")

    # 1. Create OBO WorkspaceClient (monolith pattern from auth.py:49-58)
    #    Must set auth_type="pat" and clear client_id/client_secret to prevent
    #    the SDK from using oauth-m2m from env vars on Databricks Apps.
    cfg = Config(
        host=resolved_host,
        token=access_token,
        auth_type="pat",
        client_id=None,
        client_secret=None,
    )
    client = WorkspaceClient(config=cfg)
    token = _obo_client.set(client)

    # 2. Set databricks-tools-core ContextVars (if available).
    #    Bug fix: the whole setup + yield now sits under one try/finally, so
    #    the ContextVar is reset even when set_databricks_auth() itself raises
    #    (previously a non-ImportError there leaked the OBO client into the
    #    context for subsequent requests on the same task).
    has_tools_core = False
    try:
        try:
            from databricks_tools_core.auth import (
                set_databricks_auth,
                clear_databricks_auth,
            )
        except ImportError:
            # AI Dev Kit not installed — monolith ContextVar alone is enough.
            pass
        else:
            set_databricks_auth(resolved_host, access_token)
            has_tools_core = True

        yield client
    finally:
        if has_tools_core:
            clear_databricks_auth()
        _obo_client.reset(token)


def get_workspace_client() -> WorkspaceClient:
    """Drop-in replacement for backend.services.auth.get_workspace_client().

    Returns the OBO client when called inside an obo_context(); otherwise
    falls back to the default singleton (SP on Databricks Apps, CLI/PAT
    locally). Behavior matches the monolith version, so domain logic can
    import this one during migration.
    """
    current = _obo_client.get()
    return current if current is not None else get_service_principal_client()


def get_service_principal_client() -> WorkspaceClient:
    """Return the lazily-created service principal client (bypasses OBO).

    Used for app-level operations and as a fallback when the user's OBO
    token lacks required scopes (e.g., Genie API before consent flow).
    """
    global _sp_client
    if _sp_client is not None:
        return _sp_client
    # Default WorkspaceClient() resolves auth from the environment
    # (SP credentials on Databricks Apps, CLI/PAT locally).
    _sp_client = WorkspaceClient()
    return _sp_client
182 changes: 182 additions & 0 deletions agents/_shared/lakebase_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""Shared Lakebase (PostgreSQL) connection pool management.

Each agent initializes its own pool from its own app.yaml env vars
(LAKEBASE_HOST, LAKEBASE_INSTANCE_NAME, etc.). Schema migrations are
idempotent (IF NOT EXISTS) so agents can boot in any order.

Domain-specific query functions (save_scan_result, get_score_history, etc.)
stay in each agent's own module — this shared client only manages the pool
lifecycle, credential generation, and DDL.

Source: backend/services/lakebase.py (269 lines)
"""

from __future__ import annotations

import logging
import os
from typing import Optional

logger = logging.getLogger(__name__)

_pool = None
_lakebase_available = False

# In-memory fallback (same pattern as backend/services/lakebase.py:12-17)
_memory_store: dict = {
"scans": {},
"history": {},
"stars": set(),
"seen": set(),
"sessions": {},
}


# ── DDL statements per agent (all use IF NOT EXISTS) ──────────────────────────

SCORER_DDL = [
"""CREATE TABLE IF NOT EXISTS scan_results (
space_id TEXT NOT NULL,
score INTEGER NOT NULL,
maturity TEXT,
breakdown JSONB,
findings JSONB,
next_steps JSONB,
scanned_at TIMESTAMPTZ NOT NULL,
UNIQUE (space_id, scanned_at)
)""",
"CREATE TABLE IF NOT EXISTS starred_spaces (space_id TEXT PRIMARY KEY)",
"CREATE TABLE IF NOT EXISTS seen_spaces (space_id TEXT PRIMARY KEY)",
]

CREATOR_DDL = [
"""CREATE TABLE IF NOT EXISTS agent_sessions (
session_id TEXT PRIMARY KEY,
data JSONB NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
)""",
]


# ── Credential generation (mirrors backend/services/lakebase.py:23-59) ────────

def _generate_lakebase_credential() -> tuple[str, str] | None:
"""Generate Lakebase OAuth credentials using the Databricks SDK."""
instance_name = os.environ.get("LAKEBASE_INSTANCE_NAME")
if not instance_name:
return None

try:
from agents._shared.auth_bridge import get_service_principal_client

client = get_service_principal_client()
resp = client.api_client.do(
method="POST",
path="/api/2.0/database/credentials",
body={
"request_id": "lakebase-pool",
"instance_names": [instance_name],
},
)
token = resp.get("token")
if not token:
logger.warning("Lakebase credential response missing token")
return None

user = os.environ.get("LAKEBASE_USER")
if not user:
try:
me = client.current_user.me()
user = me.user_name
except Exception:
user = "databricks"

logger.info("Generated Lakebase credential via SDK (user=%s)", user)
return user, token
except Exception as e:
logger.warning("Lakebase credential generation failed: %s", e)
return None


# ── Pool lifecycle ────────────────────────────────────────────────────────────

async def init_pool(ddl_statements: Optional[list[str]] = None):
    """Initialize asyncpg pool and run idempotent DDL.

    Call this at agent startup (e.g., in a FastAPI lifespan handler).
    Never raises: any missing configuration or connection/DDL failure is
    logged and the module falls back to the in-memory store.

    Args:
        ddl_statements: SQL DDL to execute after connecting.
            Use SCORER_DDL, CREATOR_DDL, or combine them.
    """
    global _pool, _lakebase_available

    host = os.environ.get("LAKEBASE_HOST")
    if not host:
        logger.info("LAKEBASE_HOST not set — using in-memory fallback")
        return

    password = os.environ.get("LAKEBASE_PASSWORD")
    user = os.environ.get("LAKEBASE_USER", "postgres")

    # No static password configured: mint short-lived OAuth credentials via
    # the SDK (this may also override the user).
    if not password:
        cred = _generate_lakebase_credential()
        if cred:
            user, password = cred
        else:
            logger.warning(
                "No LAKEBASE_PASSWORD and credential generation failed "
                "— using in-memory fallback"
            )
            return

    try:
        import asyncpg

        _pool = await asyncpg.create_pool(
            host=host,
            port=int(os.environ.get("LAKEBASE_PORT", "5432")),
            database=os.environ.get("LAKEBASE_DATABASE", "databricks_postgres"),
            user=user,
            password=password,
            min_size=2,
            max_size=10,
            command_timeout=30,
            ssl="require",
        )
        _lakebase_available = True
        logger.info("Lakebase connection pool initialized")

        # Run idempotent DDL (all statements use IF NOT EXISTS, so agents
        # can boot in any order).
        if ddl_statements and _pool:
            async with _pool.acquire() as conn:
                for ddl in ddl_statements:
                    await conn.execute(ddl)
            logger.info("Executed %d DDL statements", len(ddl_statements))

    except Exception as e:
        logger.warning("Lakebase unavailable: %s. Using in-memory fallback.", e)
        _lakebase_available = False
        # Bug fix: if the pool connected but DDL failed, the previous code
        # left a live pool assigned while reporting Lakebase unavailable —
        # a leak and an inconsistent state for get_pool()/is_available().
        # Close and clear it so the fallback is clean.
        if _pool is not None:
            try:
                await _pool.close()
            except Exception:
                pass
            _pool = None


async def close_pool():
    """Close the connection pool (no-op when never initialized).

    Call at agent shutdown.
    """
    global _pool
    if _pool is None:
        return
    await _pool.close()
    _pool = None


async def get_pool():
    """Return the active connection pool, or None on the in-memory fallback."""
    return _pool


def is_available() -> bool:
    """Report whether a live Lakebase connection is in use."""
    return _lakebase_available


def get_memory_store() -> dict:
    """Return the shared in-memory fallback store (used when Lakebase is down)."""
    return _memory_store
Loading