diff --git a/CLAUDE.md b/CLAUDE.md index 8630b30..35cd56e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,132 +1,41 @@ # Genie Workbench -Databricks App for creating, scoring, and optimizing Genie Spaces. FastAPI backend + React/Vite frontend deployed together on Databricks Apps. +## Project Overview -## Commands +Genie Workbench is a Databricks App that acts as a quality control and optimization platform for Genie Space administrators. It helps builders understand why their Genie Space isn't performing well and fix it. -```bash -# Backend (from project root) -uv pip install -e . # Install Python deps -uvicorn backend.main:app --host 0.0.0.0 --port 8000 --reload # Dev server - -# Frontend (from frontend/) -cd frontend && npm install && npm run build # Build for production -cd frontend && npm run dev # Vite dev server (port 5173, proxies /api to :8000) -cd frontend && npm run lint # ESLint - -# Full build (what Databricks Apps runs) -npm install # Triggers postinstall -> cd frontend && npm install -npm run build # Triggers cd frontend && npm run build - -# Deploy -databricks sync --watch . 
/Workspace/Users//genie-workbench -databricks apps deploy --source-code-path /Workspace/Users//genie-workbench - -# Tests (require running backend at localhost:8000) -python tests/test_e2e_local.py # E2E create agent tests -python tests/test_full_schema.py # Schema validation -# Deployed E2E tests require: pip install playwright && playwright install chromium -python tests/test_e2e_deployed.py -``` - -## Architecture - -``` -backend/ - main.py # FastAPI app entry point, OBO middleware, static file serving - models.py # All Pydantic models (shared between routers/services) - routers/ - analysis.py # /api/space/*, /api/analyze/*, /api/optimize, /api/genie/*, /api/sql/* - spaces.py # /api/spaces/* (list, scan, history, star, fix) - admin.py # /api/admin/* (dashboard, leaderboard, alerts) - auth.py # /api/auth/me - create.py # /api/create/* (agent chat, UC discovery, wizard) - services/ - auth.py # OBO auth (ContextVar), SP fallback, WorkspaceClient mgmt - genie_client.py # Databricks Genie API (fetch space, list spaces, query for SQL) - scanner.py # Rule-based IQ scoring engine (0-100, 4 dimensions) - analyzer.py # LLM-based deep analysis against best-practices checklist - optimizer.py # LLM-based optimization from benchmark feedback - fix_agent.py # LLM agent that generates JSON patches and applies via Genie API - create_agent.py # Multi-turn LLM agent for creating new Genie Spaces - create_agent_session.py # Session persistence for create agent (Lakebase) - create_agent_tools.py # Tool definitions for create agent (UC discovery, SQL, etc.) 
- lakebase.py # PostgreSQL persistence (asyncpg pool, in-memory fallback) - llm_utils.py # OpenAI-compatible LLM client via Databricks serving endpoints - uc_client.py # Unity Catalog browsing (catalogs, schemas, tables) - prompts/ # Prompt templates for analysis - prompts_create/ # Prompt templates for create agent (multi-file, modular) - references/schema.md # Genie Space JSON schema reference -frontend/ - src/ - App.tsx # Root: SpaceList | SpaceDetail | AdminDashboard | CreateAgentChat - lib/api.ts # All API calls (fetch, SSE streaming helpers) - types/index.ts # TypeScript types mirroring backend Pydantic models - components/ # UI components (analysis, optimization, fix agent, etc.) - pages/ # SpaceList, SpaceDetail, AdminDashboard, HistoryTab, IQScoreTab - hooks/ # useAnalysis, useTheme - vite.config.ts # Vite config with /api proxy to localhost:8000 -``` - -## Key Patterns - -### Authentication (OBO) -On Databricks Apps, user identity flows via `x-forwarded-access-token` header. `OBOAuthMiddleware` in `main.py` stores the token in a `ContextVar`. All services call `get_workspace_client()` which returns the OBO client if set, otherwise the SP singleton. Some Genie API calls require SP auth (missing `genie` OAuth scope) — see `_is_scope_error()` fallback in `genie_client.py`. - -### SSE Streaming -Multiple endpoints use `StreamingResponse` with `text/event-stream`: -- `/api/analyze/stream` — analysis progress -- `/api/optimize` — optimization with heartbeat keepalives (15s) -- `/api/spaces/{id}/fix` — fix agent patches -- `/api/create/agent/chat` — multi-turn agent with typed events (session, step, thinking, tool_call, tool_result, message_delta, message, created, error, done) - -Frontend consumes these via manual `fetch` + `ReadableStream` in `lib/api.ts` (not EventSource). Buffer splitting on `\n\n`. 
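The buffer-splitting approach described above is language-agnostic; a minimal Python sketch of the same `\n\n`-delimited event parsing (function and variable names are illustrative, not from this codebase):

```python
def parse_sse_events(chunks):
    """Accumulate stream chunks and split out complete SSE events.

    Illustrative only: complete events are delimited by a blank line,
    and only the "data: " lines of each event are kept.
    """
    buffer = ""
    events = []
    for chunk in chunks:
        buffer += chunk
        # A complete event ends at the first blank-line boundary.
        while "\n\n" in buffer:
            raw, buffer = buffer.split("\n\n", 1)
            data = [
                line[len("data: "):]
                for line in raw.splitlines()
                if line.startswith("data: ")
            ]
            if data:
                events.append("\n".join(data))
    return events
```

Events split across chunk boundaries are handled naturally, since incomplete trailing data simply stays in the buffer until the next chunk arrives.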
+- **Backend:** Python (FastAPI), deployed as a Databricks App +- **Frontend:** React/TypeScript (Vite) +- **Storage:** Lakebase (with in-memory fallback for local dev) +- **Tracing:** Optional MLflow integration -### Lakebase Persistence -`services/lakebase.py` uses asyncpg with graceful fallback to in-memory dicts when `LAKEBASE_HOST` is not set. Credentials auto-generated via Databricks SDK (`/api/2.0/database/credentials`). Schema defined in `sql/setup_lakebase.sql`. +## GenieRX Specification -### LLM Calls -All LLM calls go through Databricks model serving endpoints using OpenAI-compatible API. Model configured via `LLM_MODEL` env var (default: `databricks-claude-sonnet-4-6`). MLflow tracing is optional — controlled by `MLFLOW_EXPERIMENT_ID`. +The GenieRX spec (`docs/genierx-spec.md`) defines the core analysis and recommendation framework used throughout this project. **Always consult it when working on analysis, scoring, or recommendation features.** -## Environment Variables +Key concepts from the spec: -Defined in `app.yaml`. Key ones: -- `SQL_WAREHOUSE_ID` — from app resource `sql-warehouse` -- `LLM_MODEL` — serving endpoint name -- `LAKEBASE_HOST`, `LAKEBASE_PORT`, `LAKEBASE_DATABASE`, `LAKEBASE_INSTANCE_NAME` — Lakebase config -- `MLFLOW_EXPERIMENT_ID` — enables MLflow tracing (validated at startup, cleared if invalid) -- `GENIE_TARGET_DIRECTORY` — where new spaces are created (default `/Shared/`) -- `DEV_USER_EMAIL` — local dev only +- **Authoritative Facts** — raw data from systems of record, safe to surface directly +- **Canonical Metrics** — governed KPIs with stable definitions and cross-team agreement +- **Heuristic Signals** — derived fields with subjective thresholds; must always carry caveats -Local dev uses `.env.local` (loaded first with override) then `.env`. +When implementing or modifying any analyzer, scorer, or recommender logic, ensure field classifications align with this taxonomy. 
Heuristic signals must never be presented as authoritative facts in Genie answers. -## Dev/Test Workflow +## Key Documentation -There is no local dev server — all testing is done by syncing code to Databricks and redeploying: +- `docs/genierx-spec.md` — GenieRX analyzer/recommender specification +- `docs/genie-space-schema.md` — Genie space schema reference +- `docs/checklist-by-schema.md` — Analysis checklist organized by schema section +- `CUJ.md` — Core user journeys and product analysis -1. Edit code locally -2. `databricks sync --watch . /Workspace/Users//genie-workbench` picks up changes automatically -3. Re-run `databricks apps deploy --source-code-path /Workspace/Users//genie-workbench` to trigger a new deployment -4. Test in the deployed Databricks App +## Development -Do NOT suggest running `uvicorn` or `npm run dev` locally. The app depends on Databricks-managed resources (OBO auth, Lakebase, serving endpoints) that aren't available outside a Databricks App environment. - -## Gotchas - -- **frontend/dist/ is gitignored but NOT databricksignored** — the built React app must be synced to workspace for deployment. Build before `databricks sync`. -- **`.databricksignore` excludes `*.md`** but explicitly includes `backend/references/schema.md` (needed at runtime by the analyzer). -- **OBO ContextVar and streaming** — for SSE endpoints, the ContextVar is NOT cleared after `call_next` because the response streams lazily. Streaming handlers stash the token on `request.state` and re-set it inside the generator. -- **Two separate "analysis" paths** — IQ Scan (`scanner.py`, rule-based, instant) and Deep Analysis (`analyzer.py`, LLM-based, streaming). They produce different outputs and don't cross-reference. -- **Two separate "fix" paths** — Fix Agent (from scan findings, auto-applies patches) and Optimize flow (from benchmark labeling, produces suggestions for a new space). They're independent. -- **Vite proxy** — dev frontend at :5173 proxies `/api` to :8000. 
In production, FastAPI serves static files from `frontend/dist/` directly. -- **Python 3.11+** required (`pyproject.toml`). Uses `uv` for dependency management (`uv.lock` present). -- **Root `package.json`** exists solely as a build hook for Databricks Apps — `postinstall` chains to `frontend/npm install`, `build` chains to `frontend/npm run build`. +```bash +# Backend (from repo root) +uv run start-server -## Code Style +# Frontend +cd frontend && npm run dev +``` -- Backend: Python, Pydantic models, FastAPI routers, no class-based views -- Frontend: React 19 + TypeScript + Tailwind CSS v4 + Vite 7, functional components only -- UI primitives in `frontend/src/components/ui/` (button, card, badge, etc.) using `class-variance-authority` -- Path alias `@` maps to `frontend/src/` (configured in `vite.config.ts` and `tsconfig.app.json`) -- All API routes prefixed with `/api` -- Pydantic models in `backend/models.py`, TypeScript mirrors in `frontend/src/types/index.ts` — keep in sync +Frontend runs at `localhost:5173`, proxies API calls to backend at `localhost:8000`. diff --git a/agents.yaml b/agents.yaml new file mode 100644 index 0000000..5eecdbc --- /dev/null +++ b/agents.yaml @@ -0,0 +1,49 @@ +# Multi-agent deployment configuration for Genie Workbench. 
+# +# Deploy all agents: +# dbx-agent-app deploy --config agents.yaml +# +# Deploy a single agent: +# dbx-agent-app deploy --config agents.yaml --agent scorer +# +# Each agent is a standalone Databricks App that exposes: +# - /invocations (Responses Agent protocol) +# - /.well-known/agent.json (A2A discovery) +# - /health (liveness probe) +# - MCP server (tool integration) + +project: + name: genie-workbench + workspace_path: /Workspace/Shared/apps + +agents: + - name: scorer + source: ./agents/scorer + description: "IQ scoring for Genie Spaces — scan, history, stars" + + - name: analyzer + source: ./agents/analyzer + description: "LLM-powered deep analysis of Genie Space configurations" + + - name: creator + source: ./agents/creator + description: "Conversational wizard for building new Genie Spaces" + + - name: optimizer + source: ./agents/optimizer + description: "Optimization suggestions from benchmark labeling feedback" + + - name: fixer + source: ./agents/fixer + description: "AI fix agent — generates and applies config patches" + + - name: supervisor + source: . + description: "Supervisor — serves React SPA and routes to sub-agents" + depends_on: [scorer, analyzer, creator, optimizer, fixer] + url_env_map: + scorer: SCORER_URL + analyzer: ANALYZER_URL + creator: CREATOR_URL + optimizer: OPTIMIZER_URL + fixer: FIXER_URL diff --git a/agents/_shared/__init__.py b/agents/_shared/__init__.py new file mode 100644 index 0000000..04942cf --- /dev/null +++ b/agents/_shared/__init__.py @@ -0,0 +1,7 @@ +"""Shared utilities for Genie Workbench agents. 
+ +Provides cross-cutting concerns that multiple agents need: +- auth_bridge: Bridge @app_agent UserContext into monolith + AI Dev Kit auth +- sp_fallback: Service principal fallback for Genie API scope errors +- lakebase_client: Shared PostgreSQL connection pool management +""" diff --git a/agents/_shared/auth_bridge.py b/agents/_shared/auth_bridge.py new file mode 100644 index 0000000..e6a8071 --- /dev/null +++ b/agents/_shared/auth_bridge.py @@ -0,0 +1,125 @@ +"""Bridge @app_agent UserContext into both monolith and AI Dev Kit auth systems. + +During migration, agent tools receive `request.user_context` from @app_agent, +but domain logic (scanner, genie_client, etc.) calls `get_workspace_client()` +from the monolith's auth module. And `databricks-tools-core` functions use +their own separate ContextVars via `set_databricks_auth()`. + +This module provides `obo_context()` — a single context manager that sets up +all three auth systems so existing domain logic works unchanged inside agents. + +Source patterns: + - backend/services/auth.py:25 (_obo_client ContextVar) + - backend/services/auth.py:33-58 (set_obo_user_token) + - databricks_tools_core/auth.py (set_databricks_auth / clear_databricks_auth) +""" + +from __future__ import annotations + +import os +from contextlib import contextmanager +from contextvars import ContextVar +from typing import Optional + +from databricks.sdk import WorkspaceClient +from databricks.sdk.config import Config + + +# Monolith-compatible ContextVar (mirrors backend/services/auth.py:25) +_obo_client: ContextVar[Optional[WorkspaceClient]] = ContextVar( + "_obo_client", default=None +) + +# Singleton SP client (lazy-initialized) +_sp_client: Optional[WorkspaceClient] = None + + +@contextmanager +def obo_context(access_token: str, host: Optional[str] = None): + """Set up OBO auth for monolith code and databricks-tools-core. 
+ + Creates a per-request WorkspaceClient from the user's OBO token and + stores it in both the monolith ContextVar and the AI Dev Kit ContextVars. + + Usage in any agent tool:: + + @scorer.tool(description="Run IQ scan on a Genie Space") + async def scan_space(space_id: str, request: AgentRequest) -> dict: + with obo_context(request.user_context.access_token): + # All of these now work: + # - get_workspace_client() returns OBO client + # - databricks-tools-core functions use OBO token + result = scanner.calculate_score(space_id) + + For streaming generators, capture the token before yielding and + re-enter obo_context() per-yield. This matches the pattern in + backend/routers/create.py:125-198. + + Args: + access_token: The user's OBO access token. + host: Databricks workspace host. Defaults to DATABRICKS_HOST env var. + + Yields: + WorkspaceClient configured with the user's OBO token. + """ + resolved_host = host or os.environ.get("DATABRICKS_HOST", "") + + # 1. Create OBO WorkspaceClient (monolith pattern from auth.py:49-58) + # Must set auth_type="pat" and clear client_id/client_secret to prevent + # the SDK from using oauth-m2m from env vars on Databricks Apps. + cfg = Config( + host=resolved_host, + token=access_token, + auth_type="pat", + client_id=None, + client_secret=None, + ) + client = WorkspaceClient(config=cfg) + token = _obo_client.set(client) + + # 2. Set databricks-tools-core ContextVars (if available) + has_tools_core = False + try: + from databricks_tools_core.auth import ( + set_databricks_auth, + clear_databricks_auth, + ) + + set_databricks_auth(resolved_host, access_token) + has_tools_core = True + except ImportError: + pass + + try: + yield client + finally: + _obo_client.reset(token) + if has_tools_core: + clear_databricks_auth() + + +def get_workspace_client() -> WorkspaceClient: + """Drop-in replacement for backend.services.auth.get_workspace_client(). 
+ + Returns the OBO client if inside an obo_context(), otherwise the default + singleton (SP on Databricks Apps, CLI/PAT locally). + + Domain logic can import this instead of the monolith version during + migration — the behavior is identical. + """ + obo = _obo_client.get() + if obo is not None: + return obo + return get_service_principal_client() + + +def get_service_principal_client() -> WorkspaceClient: + """Get the service principal client (bypasses OBO). + + Used for app-level operations and as fallback when the user's OBO token + lacks required scopes (e.g., Genie API before consent flow). + """ + global _sp_client + if _sp_client is None: + _sp_client = WorkspaceClient() + return _sp_client diff --git a/agents/_shared/lakebase_client.py b/agents/_shared/lakebase_client.py new file mode 100644 index 0000000..e850585 --- /dev/null +++ b/agents/_shared/lakebase_client.py @@ -0,0 +1,182 @@ +"""Shared Lakebase (PostgreSQL) connection pool management. + +Each agent initializes its own pool from its own app.yaml env vars +(LAKEBASE_HOST, LAKEBASE_INSTANCE_NAME, etc.). Schema migrations are +idempotent (IF NOT EXISTS) so agents can boot in any order. + +Domain-specific query functions (save_scan_result, get_score_history, etc.) +stay in each agent's own module — this shared client only manages the pool +lifecycle, credential generation, and DDL. 
+ +Source: backend/services/lakebase.py (269 lines) +""" + +from __future__ import annotations + +import logging +import os +from typing import Optional + +logger = logging.getLogger(__name__) + +_pool = None +_lakebase_available = False + +# In-memory fallback (same pattern as backend/services/lakebase.py:12-17) +_memory_store: dict = { + "scans": {}, + "history": {}, + "stars": set(), + "seen": set(), + "sessions": {}, +} + + +# ── DDL statements per agent (all use IF NOT EXISTS) ────────────────────────── + +SCORER_DDL = [ + """CREATE TABLE IF NOT EXISTS scan_results ( + space_id TEXT NOT NULL, + score INTEGER NOT NULL, + maturity TEXT, + breakdown JSONB, + findings JSONB, + next_steps JSONB, + scanned_at TIMESTAMPTZ NOT NULL, + UNIQUE (space_id, scanned_at) + )""", + "CREATE TABLE IF NOT EXISTS starred_spaces (space_id TEXT PRIMARY KEY)", + "CREATE TABLE IF NOT EXISTS seen_spaces (space_id TEXT PRIMARY KEY)", +] + +CREATOR_DDL = [ + """CREATE TABLE IF NOT EXISTS agent_sessions ( + session_id TEXT PRIMARY KEY, + data JSONB NOT NULL, + updated_at TIMESTAMPTZ DEFAULT NOW() + )""", +] + + +# ── Credential generation (mirrors backend/services/lakebase.py:23-59) ──────── + +def _generate_lakebase_credential() -> tuple[str, str] | None: + """Generate Lakebase OAuth credentials using the Databricks SDK.""" + instance_name = os.environ.get("LAKEBASE_INSTANCE_NAME") + if not instance_name: + return None + + try: + from agents._shared.auth_bridge import get_service_principal_client + + client = get_service_principal_client() + resp = client.api_client.do( + method="POST", + path="/api/2.0/database/credentials", + body={ + "request_id": "lakebase-pool", + "instance_names": [instance_name], + }, + ) + token = resp.get("token") + if not token: + logger.warning("Lakebase credential response missing token") + return None + + user = os.environ.get("LAKEBASE_USER") + if not user: + try: + me = client.current_user.me() + user = me.user_name + except Exception: + user = 
"databricks" + + logger.info("Generated Lakebase credential via SDK (user=%s)", user) + return user, token + except Exception as e: + logger.warning("Lakebase credential generation failed: %s", e) + return None + + +# ── Pool lifecycle ──────────────────────────────────────────────────────────── + +async def init_pool(ddl_statements: Optional[list[str]] = None): + """Initialize asyncpg pool and run idempotent DDL. + + Call this at agent startup (e.g., in a FastAPI lifespan handler). + + Args: + ddl_statements: SQL DDL to execute after connecting. + Use SCORER_DDL, CREATOR_DDL, or combine them. + """ + global _pool, _lakebase_available + + host = os.environ.get("LAKEBASE_HOST") + if not host: + logger.info("LAKEBASE_HOST not set — using in-memory fallback") + return + + password = os.environ.get("LAKEBASE_PASSWORD") + user = os.environ.get("LAKEBASE_USER", "postgres") + + if not password: + cred = _generate_lakebase_credential() + if cred: + user, password = cred + else: + logger.warning( + "No LAKEBASE_PASSWORD and credential generation failed " + "— using in-memory fallback" + ) + return + + try: + import asyncpg + + _pool = await asyncpg.create_pool( + host=host, + port=int(os.environ.get("LAKEBASE_PORT", "5432")), + database=os.environ.get("LAKEBASE_DATABASE", "databricks_postgres"), + user=user, + password=password, + min_size=2, + max_size=10, + command_timeout=30, + ssl="require", + ) + _lakebase_available = True + logger.info("Lakebase connection pool initialized") + + # Run idempotent DDL + if ddl_statements and _pool: + async with _pool.acquire() as conn: + for ddl in ddl_statements: + await conn.execute(ddl) + logger.info("Executed %d DDL statements", len(ddl_statements)) + + except Exception as e: + logger.warning("Lakebase unavailable: %s. Using in-memory fallback.", e) + _lakebase_available = False + + +async def close_pool(): + """Close the connection pool. 
Call at agent shutdown.""" + global _pool + if _pool: + await _pool.close() + _pool = None + + +async def get_pool(): + """Get the connection pool (or None if using in-memory fallback).""" + return _pool + + +def is_available() -> bool: + """Check if Lakebase is connected.""" + return _lakebase_available + + +def get_memory_store() -> dict: + """Get the in-memory fallback store (for when Lakebase is unavailable).""" + return _memory_store diff --git a/agents/_shared/sp_fallback.py b/agents/_shared/sp_fallback.py new file mode 100644 index 0000000..b0bcfad --- /dev/null +++ b/agents/_shared/sp_fallback.py @@ -0,0 +1,93 @@ +"""Service Principal fallback for Genie API scope errors. + +When OBO tokens lack the 'genie' scope (before the user consent flow is +triggered), the Genie API returns scope errors. This module extracts the +retry-with-SP pattern from the monolith into a reusable decorator and +convenience function. + +Source pattern: backend/services/genie_client.py:22-68 +""" + +from __future__ import annotations + +import functools +import logging +from typing import Callable, TypeVar + +from agents._shared.auth_bridge import ( + get_workspace_client, + get_service_principal_client, +) + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +def _is_scope_error(e: Exception) -> bool: + """Check if exception is a missing OAuth scope error. + + Matches the same check in backend/services/genie_client.py:22-25. + """ + msg = str(e).lower() + return "scope" in msg or "insufficient_scope" in msg + + +def with_sp_fallback(func: Callable[..., T]) -> Callable[..., T]: + """Decorator: retry with SP client if OBO token lacks Genie scope. + + The decorated function must accept a ``client`` keyword argument + (a WorkspaceClient). On scope error, the function is retried with + the service principal client. 
+ + Usage:: + + @with_sp_fallback + def get_genie_space(space_id: str, *, client=None): + client = client or get_workspace_client() + return client.api_client.do( + "GET", f"/api/2.0/genie/spaces/{space_id}" + ) + """ + + @functools.wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + if _is_scope_error(e): + logger.info( + "%s: OBO scope error, retrying with service principal", + func.__name__, + ) + kwargs["client"] = get_service_principal_client() + return func(*args, **kwargs) + raise + + return wrapper + + +def genie_api_call(method: str, path: str, **kwargs): + """Make a Genie API call with automatic SP fallback. + + Convenience function for simple one-off API calls that don't need + the full decorator pattern. + + Args: + method: HTTP method (GET, POST, etc.) + path: API path (e.g., "/api/2.0/genie/spaces/{id}") + **kwargs: Forwarded to ``client.api_client.do()``. + + Returns: + API response dict. + """ + client = get_workspace_client() + try: + return client.api_client.do(method=method, path=path, **kwargs) + except Exception as e: + if _is_scope_error(e): + logger.info("Genie API %s: scope error, retrying with SP", path) + sp = get_service_principal_client() + if sp is not client: + return sp.api_client.do(method=method, path=path, **kwargs) + raise diff --git a/agents/analyzer/app.py b/agents/analyzer/app.py new file mode 100644 index 0000000..9ffb5f3 --- /dev/null +++ b/agents/analyzer/app.py @@ -0,0 +1,111 @@ +"""genie-analyzer — LLM-powered deep analysis of Genie Space configurations. 
+ +Extracted from: + - backend/routers/analysis.py (fetch, analyze, stream, query, SQL endpoints) + - backend/services/analyzer.py (GenieSpaceAnalyzer, section analysis, synthesis) + - backend/synthesizer.py (cross-section synthesis) + - backend/services/genie_client.py (space fetching) + +Streaming: Yes (SSE for multi-section analysis) +LLM: Yes (section analysis + synthesis) +""" + +from __future__ import annotations + +from dbx_agent_app import AgentRequest, AgentResponse, app_agent + + +@app_agent( + name="genie-analyzer", + description=( + "Deep LLM-powered analysis of Genie Space configurations. Evaluates " + "each section (tables, instructions, SQL snippets, etc.) against a " + "checklist, synthesizes findings, and provides actionable recommendations." + ), +) +async def analyzer(request: AgentRequest) -> AgentResponse: + """Route incoming agent requests to analysis tools.""" + # TODO: Parse intent from request.messages and dispatch to tools + ... + + +# ── Tools ──────────────────────────────────────────────────────────────────── + + +@analyzer.tool( + description=( + "Fetch and parse a Genie Space by ID. Returns the space data " + "and list of sections with their data." + ), +) +async def fetch_space(genie_space_id: str) -> dict: + """Source: backend/routers/analysis.py::fetch_space""" + # Uses genie_client.get_serialized_space (moved as-is) + raise NotImplementedError("Phase 4: move genie_client + analyzer here") + + +@analyzer.tool( + description=( + "Analyze a single section of a Genie Space configuration. " + "Returns findings, score, and recommendations for that section." + ), +) +async def analyze_section( + section_name: str, + section_data: dict | list | None, + full_space: dict, +) -> dict: + """Source: backend/services/analyzer.py::GenieSpaceAnalyzer.analyze_section""" + raise NotImplementedError("Phase 4: move analyzer.py here") + + +@analyzer.tool( + description=( + "Analyze all sections with cross-sectional synthesis. 
Returns " + "section analyses plus a synthesis result for full analysis." + ), +) +async def analyze_all( + sections: list[dict], + full_space: dict, +) -> dict: + """Source: backend/routers/analysis.py::analyze_all_sections""" + raise NotImplementedError("Phase 4: move analyzer.py + synthesizer.py here") + + +@analyzer.tool( + description=( + "Query a Genie Space with a natural language question. " + "Returns the generated SQL if successful." + ), +) +async def query_genie(genie_space_id: str, question: str) -> dict: + """Source: backend/services/genie_client.py::query_genie_for_sql""" + raise NotImplementedError("Phase 4: move genie_client.query_genie_for_sql here") + + +@analyzer.tool( + description=( + "Execute a read-only SQL query on a Databricks SQL Warehouse. " + "Returns tabular results limited to 1000 rows." + ), +) +async def execute_sql(sql: str, warehouse_id: str | None = None) -> dict: + """Source: backend/sql_executor.py::execute_sql + + Phase 8: Replace with databricks_tools_core.sql.execute_sql + """ + raise NotImplementedError("Phase 4: move sql_executor.py (then Phase 8: replace with AI Dev Kit)") + + +@analyzer.tool( + description="Parse pasted Genie Space JSON from the API response.", +) +async def parse_space_json(json_content: str) -> dict: + """Source: backend/routers/analysis.py::parse_space_json""" + raise NotImplementedError("Phase 4: move parse logic here") + + +# ── Standalone entry point ─────────────────────────────────────────────────── + +app = analyzer.app diff --git a/agents/analyzer/app.yaml b/agents/analyzer/app.yaml new file mode 100644 index 0000000..b3c8e76 --- /dev/null +++ b/agents/analyzer/app.yaml @@ -0,0 +1,27 @@ +# Databricks Apps configuration for genie-analyzer agent. +# +# LLM-powered deep analysis — needs LLM endpoint + SQL warehouse. +# No Lakebase dependency (stateless analysis). 
+ +command: + - "uvicorn" + - "app:app" + - "--host" + - "0.0.0.0" + - "--port" + - "8000" + +env: + # LLM serving endpoint + - name: LLM_MODEL + value: "databricks-claude-sonnet-4-6" + + # SQL Warehouse for execute_sql tool + - name: SQL_WAREHOUSE_ID + valueFrom: sql-warehouse + + # MLflow Tracing + - name: MLFLOW_TRACKING_URI + value: "databricks" + - name: MLFLOW_REGISTRY_URI + value: "databricks-uc" diff --git a/agents/creator/app.py b/agents/creator/app.py new file mode 100644 index 0000000..966e493 --- /dev/null +++ b/agents/creator/app.py @@ -0,0 +1,213 @@ +"""genie-creator — Conversational wizard for building new Genie Spaces. + +Extracted from: + - backend/routers/create.py (UC discovery, validation, agent chat, sessions) + - backend/services/create_agent.py (CreateGenieAgent tool-calling loop) + - backend/services/create_agent_tools.py (16 tool definitions + implementations) + - backend/services/create_agent_session.py (two-tier session persistence) + - backend/services/uc_client.py (UC browsing — replaced by AI Dev Kit) + - backend/prompts_create/ (dynamic prompt assembly, 9 modules) + - backend/references/ (schema.md reference) + - backend/genie_creator.py (Genie API write operations) + +This is the MOST COMPLEX agent (Phase 6 extraction). The tool-calling loop, +message compaction, session persistence, and dynamic prompting are all +irreplaceable domain logic that moves as-is. 
+ +What gets replaced: + - 580 lines of JSON tool schemas → auto-generated from @creator.tool() signatures + - 40-line handle_tool_call() dispatcher → auto-routing + - uc_client.py (60 lines) → databricks_tools_core.unity_catalog + - sql_executor.py (220 lines) → databricks_tools_core.sql + +Streaming: Yes (SSE for agent chat) +LLM: Yes (tool-calling loop with Claude) +""" + +from __future__ import annotations + +from dbx_agent_app import AgentRequest, AgentResponse, app_agent + +from agents.creator.schemas import GenerateConfigArgs + + +@app_agent( + name="genie-creator", + description=( + "Conversational wizard for building new Genie Spaces. Guides users " + "through requirements gathering, data source discovery, table " + "inspection, plan presentation, config generation, and space creation." + ), +) +async def creator(request: AgentRequest) -> AgentResponse: + """Route incoming agent requests to the creator workflow. + + The core tool-calling loop (CreateGenieAgent.chat) moves here as-is. + It handles: step detection, LLM streaming, tool dispatch, message + compaction, JSON repair, and session management. + + Source: backend/services/create_agent.py::CreateGenieAgent.chat + """ + # TODO: Phase 6 — move CreateGenieAgent.chat here + ... 
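The loop being moved here is substantial; as orientation only, here is a minimal sketch of the dispatch shape of a tool-calling loop. Every name in it (`run_tool_loop`, `llm_step`, the action dict protocol) is an assumption for illustration, not the real `CreateGenieAgent` or `dbx_agent_app` API:

```python
# Sketch only — the real CreateGenieAgent.chat also handles streaming,
# message compaction, JSON repair, and session persistence.
def run_tool_loop(llm_step, tools, messages, max_turns=8):
    """Drive an LLM until it answers instead of requesting a tool.

    llm_step(messages) returns {"tool": name, "args": {...}} to request
    a tool call, or {"message": text} to finish; `tools` maps tool names
    to callables (illustrative protocol).
    """
    for _ in range(max_turns):
        action = llm_step(messages)
        if "tool" in action:
            # Dispatch the requested tool and feed the result back
            # into the conversation for the next LLM step.
            result = tools[action["tool"]](**action["args"])
            messages.append(
                {"role": "tool", "name": action["tool"], "content": result}
            )
        else:
            return action["message"]
    raise RuntimeError("tool-calling loop exceeded max_turns")
```

With `@creator.tool()` registration, the `tools` mapping and the per-tool JSON schemas are derived from function signatures, which is what replaces the hand-written schemas and dispatcher noted above.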
+ + +# ── UC Discovery Tools ────────────────────────────────────────────────────── +# Phase 8: Replace implementations with databricks_tools_core + + +@creator.tool(description="List all Unity Catalog catalogs the user has access to.") +async def discover_catalogs() -> dict: + """Source: backend/services/uc_client.py::list_catalogs + + Phase 8: from databricks_tools_core.unity_catalog import list_catalogs + """ + raise NotImplementedError("Phase 6/8") + + +@creator.tool(description="List schemas within a catalog.") +async def discover_schemas(catalog: str) -> dict: + """Source: backend/services/uc_client.py::list_schemas""" + raise NotImplementedError("Phase 6/8") + + +@creator.tool(description="List tables within a catalog.schema.") +async def discover_tables(catalog: str, schema: str) -> dict: + """Source: backend/services/uc_client.py::list_tables""" + raise NotImplementedError("Phase 6/8") + + +# ── Table Inspection Tools ─────────────────────────────────────────────────── + + +@creator.tool( + description="Get detailed table metadata: columns, types, descriptions, row count, sample rows.", +) +async def describe_table(table: str) -> dict: + """Source: backend/services/create_agent_tools.py::_describe_table (lines ~860-960)""" + raise NotImplementedError("Phase 6") + + +@creator.tool( + description=( + "Profile selected columns: distinct values, null percentage, " + "min/max, data type distribution." 
+ ), +) +async def profile_columns(table: str, columns: list[str] | None = None) -> dict: + """Source: backend/services/create_agent_tools.py::_profile_columns""" + raise NotImplementedError("Phase 6") + + +@creator.tool( + description="Assess data quality: null rates, duplicate rates, freshness, anomalies.", +) +async def assess_data_quality(tables: list[str]) -> dict: + """Source: backend/services/create_agent_tools.py::_assess_data_quality""" + raise NotImplementedError("Phase 6") + + +@creator.tool( + description="Profile table usage patterns: query frequency, common joins, active users.", +) +async def profile_table_usage(tables: list[str]) -> dict: + """Source: backend/services/create_agent_tools.py::_profile_table_usage""" + raise NotImplementedError("Phase 6") + + +@creator.tool(description="Execute a test SQL query and return results (read-only, max 5 rows).") +async def test_sql(sql: str) -> dict: + """Source: backend/services/create_agent_tools.py::_test_sql + + Phase 8: Replace with databricks_tools_core.sql.execute_sql + """ + raise NotImplementedError("Phase 6/8") + + +@creator.tool(description="List available SQL warehouses for the user.") +async def discover_warehouses() -> dict: + """Source: backend/services/create_agent_tools.py::_discover_warehouses""" + raise NotImplementedError("Phase 6") + + +# ── Config Generation Tools ────────────────────────────────────────────────── + + +@creator.tool(description="Get the Genie Space configuration JSON schema reference.") +async def get_config_schema() -> dict: + """Source: backend/services/create_agent_tools.py::_get_config_schema""" + raise NotImplementedError("Phase 6") + + +@creator.tool( + description=( + "Generate a complete Genie Space configuration from discovered " + "tables, inspection data, and user requirements." 
+ ), + parameters=GenerateConfigArgs.model_json_schema(), +) +async def generate_config(**kwargs) -> dict: + """Source: backend/services/create_agent_tools.py::_generate_config (~lines 245-650) + + This is the largest tool implementation. The LLM provides content; + this tool handles all structural formatting (JSON schema compliance, + column config normalization, instruction budget enforcement). + + Integration pattern (Challenge 2): + Pydantic model auto-generates the JSON Schema for @app_agent + registration, replacing ~580 lines of hand-written schema. + Runtime validation catches malformed LLM output early. + """ + args = GenerateConfigArgs(**kwargs) + # TODO Phase 6: move _generate_config implementation here + # args.tables, args.sample_questions, etc. are all validated + raise NotImplementedError("Phase 6") + + +@creator.tool( + description="Present the space creation plan to the user for review before generating config.", + parameters=GenerateConfigArgs.model_json_schema(), +) +async def present_plan(**kwargs) -> dict: + """Source: backend/services/create_agent_tools.py::_present_plan""" + args = GenerateConfigArgs(**kwargs) + # TODO Phase 6: move _present_plan implementation here + raise NotImplementedError("Phase 6") + + +@creator.tool(description="Validate a generated configuration against the Genie Space schema.") +async def validate_config(config: dict) -> dict: + """Source: backend/services/create_agent_tools.py::_validate_config""" + raise NotImplementedError("Phase 6") + + +@creator.tool(description="Apply incremental updates to an existing generated configuration.") +async def update_config(config: dict, updates: dict) -> dict: + """Source: backend/services/create_agent_tools.py::_update_config""" + raise NotImplementedError("Phase 6") + + +@creator.tool( + description="Create a new Genie Space in the workspace with the generated configuration.", +) +async def create_space( + display_name: str, + config: dict, + parent_path: str | None = None, + 
warehouse_id: str | None = None, +) -> dict: + """Source: backend/services/create_agent_tools.py::_create_space + backend/genie_creator.py""" + raise NotImplementedError("Phase 6") + + +@creator.tool( + description="Update an existing Genie Space with a modified configuration.", +) +async def update_space(space_id: str, config: dict) -> dict: + """Source: backend/services/create_agent_tools.py::_update_space""" + raise NotImplementedError("Phase 6") + + +# ── Standalone entry point ─────────────────────────────────────────────────── + +app = creator.app diff --git a/agents/creator/app.yaml b/agents/creator/app.yaml new file mode 100644 index 0000000..abab50f --- /dev/null +++ b/agents/creator/app.yaml @@ -0,0 +1,41 @@ +# Databricks Apps configuration for genie-creator agent. +# +# Most complex agent: LLM tool-calling loop, session persistence, +# UC discovery, SQL execution, Genie API writes. + +command: + - "uvicorn" + - "app:app" + - "--host" + - "0.0.0.0" + - "--port" + - "8000" + +env: + # LLM serving endpoint (for tool-calling loop) + - name: LLM_MODEL + value: "databricks-claude-sonnet-4-6" + + # SQL Warehouse for test_sql tool + - name: SQL_WAREHOUSE_ID + valueFrom: sql-warehouse + + # Genie Space target directory (where new spaces are created) + - name: GENIE_TARGET_DIRECTORY + value: "/Shared/" + + # Lakebase PostgreSQL — for session persistence + - name: LAKEBASE_HOST + value: "" + - name: LAKEBASE_PORT + value: "5432" + - name: LAKEBASE_DATABASE + value: "databricks_postgres" + - name: LAKEBASE_INSTANCE_NAME + value: "" + + # MLflow Tracing + - name: MLFLOW_TRACKING_URI + value: "databricks" + - name: MLFLOW_REGISTRY_URI + value: "databricks-uc" diff --git a/agents/creator/schemas.py b/agents/creator/schemas.py new file mode 100644 index 0000000..0e45db1 --- /dev/null +++ b/agents/creator/schemas.py @@ -0,0 +1,166 @@ +"""Pydantic models for creator agent tool schemas. 
+ +These replace ~580 lines of hand-written JSON Schema in +backend/services/create_agent_tools.py. The models serve double duty: + +1. Generate JSON Schema for @app_agent tool registration via + ``GenerateConfigArgs.model_json_schema()`` +2. Validate + parse incoming tool arguments at runtime via + ``GenerateConfigArgs(**kwargs)`` + +Usage in agents/creator/app.py:: + + from agents.creator.schemas import GenerateConfigArgs + + @creator.tool( + description="Generate a complete Genie Space configuration", + parameters=GenerateConfigArgs.model_json_schema(), + ) + async def generate_config(**kwargs) -> dict: + args = GenerateConfigArgs(**kwargs) # Validates at runtime + ... +""" + +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel, Field + + +# ── Shared nested types ────────────────────────────────────────────────────── + + +class ColumnConfig(BaseModel): + """Column-level configuration within a table.""" + + column_name: str + description: Optional[str] = None + synonyms: Optional[list[str]] = None + exclude: Optional[bool] = None + enable_matching: Optional[bool] = None + + +class SqlParameter(BaseModel): + """Parameter definition for parameterized example SQL.""" + + name: str + type_hint: str = Field( + ..., pattern="^(STRING|NUMBER|DATE|BOOLEAN)$" + ) + description: Optional[str] = None + default_value: str + + +class ExampleSql(BaseModel): + """Example SQL with natural-language question mapping.""" + + question: str + sql: str + usage_guidance: Optional[str] = None + parameters: Optional[list[SqlParameter]] = None + + +class Measure(BaseModel): + """Aggregate measure definition (SUM, AVG, COUNT, etc.).""" + + alias: str + sql: str + display_name: Optional[str] = None + synonyms: Optional[list[str]] = None + instruction: Optional[str] = None + comment: Optional[str] = None + + +class Filter(BaseModel): + """Pre-defined filter (WHERE clause snippet).""" + + display_name: str + sql: str + synonyms: 
Optional[list[str]] = None + instruction: Optional[str] = None + comment: Optional[str] = None + + +class Expression(BaseModel): + """Computed expression (derived column).""" + + alias: str + sql: str + display_name: Optional[str] = None + synonyms: Optional[list[str]] = None + instruction: Optional[str] = None + comment: Optional[str] = None + + +class JoinSpec(BaseModel): + """Join specification between two tables.""" + + left_table: str + left_alias: str + right_table: str + right_alias: str + left_column: str + right_column: str + relationship: str = Field( + ..., + pattern="^(MANY_TO_ONE|ONE_TO_MANY|ONE_TO_ONE|MANY_TO_MANY)$", + ) + instruction: Optional[str] = None + comment: Optional[str] = None + + +class Benchmark(BaseModel): + """Question/SQL pair for evaluation benchmarks.""" + + question: str + expected_sql: str + + +class MetricViewColumnConfig(BaseModel): + """Column configuration within a metric view.""" + + column_name: str + description: Optional[str] = None + enable_format_assistance: Optional[bool] = None + + +class MetricView(BaseModel): + """Metric view definition (curated data view).""" + + identifier: str + description: Optional[str] = None + column_configs: Optional[list[MetricViewColumnConfig]] = None + + +class TableConfig(BaseModel): + """Table-level configuration with optional column configs.""" + + identifier: str + description: Optional[str] = None + column_configs: Optional[list[ColumnConfig]] = None + + +# ── Top-level tool argument models ─────────────────────────────────────────── + + +class GenerateConfigArgs(BaseModel): + """Arguments for ``generate_config`` and ``present_plan`` tools. + + These tools share the same schema — present_plan previews what + generate_config will produce. 
+ """ + + tables: list[TableConfig] + sample_questions: Optional[list[str]] = None + text_instructions: Optional[list[str]] = None + example_sqls: Optional[list[ExampleSql]] = Field( + None, min_length=3 + ) + measures: Optional[list[Measure]] = None + filters: Optional[list[Filter]] = None + expressions: Optional[list[Expression]] = None + join_specs: Optional[list[JoinSpec]] = None + benchmarks: Optional[list[Benchmark]] = None + generate_benchmarks: Optional[bool] = None + metric_views: Optional[list[MetricView]] = None diff --git a/agents/fixer/app.py b/agents/fixer/app.py new file mode 100644 index 0000000..771205f --- /dev/null +++ b/agents/fixer/app.py @@ -0,0 +1,92 @@ +"""genie-fixer — AI fix agent for Genie Space configurations. + +Extracted from: + - backend/routers/spaces.py (fix endpoint) + - backend/services/fix_agent.py (FixAgent, patch generation + application) + - backend/prompts.py (get_fix_agent_prompt) + +Streaming: Yes (SSE with thinking → patch → applying → complete events) +LLM: Yes (fix plan generation) +""" + +from __future__ import annotations + +from dbx_agent_app import AgentRequest, AgentResponse, app_agent + + +@app_agent( + name="genie-fixer", + description=( + "AI fix agent for Genie Spaces. Takes IQ scan findings and the " + "current space configuration, uses an LLM to generate targeted " + "config patches, and applies them via the Genie API." + ), +) +async def fixer(request: AgentRequest) -> AgentResponse: + """Route incoming agent requests to fix tools. + + The streaming fix workflow: + 1. thinking — "Analyzing findings..." + 2. patch — individual patches with field_path, old/new values, rationale + 3. applying — "Applying N fix(es)..." + 4. complete — summary with patches_applied count and diff + + Source: backend/services/fix_agent.py::FixAgent.run + """ + # TODO: Phase 3 — move FixAgent.run here as streaming handler + ... 
+ + +# ── Tools ──────────────────────────────────────────────────────────────────── + + +@fixer.tool( + description=( + "Generate a fix plan from IQ scan findings. Uses LLM to reason " + "about findings, prioritize fixes, and produce specific config " + "patch operations (field_path + new_value + rationale)." + ), +) +async def generate_fixes( + space_id: str, + findings: list[str], + space_config: dict, +) -> dict: + """Source: backend/services/fix_agent.py::FixAgent.run (first half — plan generation) + + Returns: + - patches: list of {field_path, new_value, rationale} + - summary: human-readable summary of the fix plan + """ + raise NotImplementedError("Phase 3: move fix_agent.py here") + + +@fixer.tool( + description=( + "Apply a list of config patches to a Genie Space via the " + "Databricks API. Returns before/after diff." + ), +) +async def apply_patches( + space_id: str, + patches: list[dict], + space_config: dict, +) -> dict: + """Source: backend/services/fix_agent.py::FixAgent.run (second half — patch application) + + Each patch has: + - field_path: Dot-notation path (e.g., "instructions.text_instructions[0].content") + - new_value: The value to set + - rationale: Why this fix helps + + Returns: + - patches_applied: int + - summary: str + - diff: {patches, original_config, updated_config} + """ + raise NotImplementedError("Phase 3: move fix_agent.py patch application here") + + +# ── Standalone entry point ─────────────────────────────────────────────────── + +app = fixer.app diff --git a/agents/fixer/app.yaml b/agents/fixer/app.yaml new file mode 100644 index 0000000..f18c141 --- /dev/null +++ b/agents/fixer/app.yaml @@ -0,0 +1,23 @@ +# Databricks Apps configuration for genie-fixer agent. +# +# AI fix agent — needs LLM endpoint for plan generation. +# No Lakebase dependency (reads/writes space config via Genie API). 
+ +command: + - "uvicorn" + - "app:app" + - "--host" + - "0.0.0.0" + - "--port" + - "8000" + +env: + # LLM serving endpoint + - name: LLM_MODEL + value: "databricks-claude-sonnet-4-6" + + # MLflow Tracing + - name: MLFLOW_TRACKING_URI + value: "databricks" + - name: MLFLOW_REGISTRY_URI + value: "databricks-uc" diff --git a/agents/optimizer/app.py b/agents/optimizer/app.py new file mode 100644 index 0000000..ab12c44 --- /dev/null +++ b/agents/optimizer/app.py @@ -0,0 +1,96 @@ +"""genie-optimizer — Optimization suggestions from benchmark labeling feedback. + +Extracted from: + - backend/routers/analysis.py (optimize, merge_config endpoints) + - backend/services/optimizer.py (GenieSpaceOptimizer) + - backend/prompts.py (get_optimization_prompt) + +Streaming: Heartbeat SSE only (long-running LLM call with keepalives) +LLM: Yes (optimization suggestion generation) +""" + +from __future__ import annotations + +from dbx_agent_app import AgentRequest, AgentResponse, app_agent + + +@app_agent( + name="genie-optimizer", + description=( + "Generates optimization suggestions for Genie Space configurations " + "based on benchmark labeling feedback. Analyzes incorrect answers " + "and suggests config changes (new instructions, SQL snippets, " + "column descriptions) to improve accuracy." + ), +) +async def optimizer(request: AgentRequest) -> AgentResponse: + """Route incoming agent requests to optimization tools.""" + # TODO: Parse intent from request.messages and dispatch to tools + ... + + +# ── Tools ──────────────────────────────────────────────────────────────────── + + +@optimizer.tool( + description=( + "Generate optimization suggestions based on benchmark labeling feedback. " + "Analyzes incorrect/correct Genie answers and suggests specific config " + "changes to improve accuracy. Returns a list of suggestions with " + "field paths, current values, and suggested replacements." 
+ ), +) +async def generate_suggestions( + space_data: dict, + labeling_feedback: list[dict], +) -> dict: + """Source: backend/services/optimizer.py::GenieSpaceOptimizer.generate_optimizations + + Each feedback item has: + - question_text: The benchmark question + - is_correct: Whether Genie answered correctly + - feedback_text: User's notes on what went wrong + """ + raise NotImplementedError("Phase 5: move optimizer.py here") + + +@optimizer.tool( + description=( + "Merge optimization suggestions into a Genie Space configuration. " + "Applies field-level changes without LLM calls — fast, deterministic." + ), +) +async def merge_config( + space_data: dict, + suggestions: list[dict], +) -> dict: + """Source: backend/services/optimizer.py::GenieSpaceOptimizer.merge_config + + Each suggestion has: + - field_path: Dot-notation path (e.g., "instructions.text_instructions[0].content") + - suggested_value: The new value to set + """ + raise NotImplementedError("Phase 5: move optimizer.merge_config here") + + +@optimizer.tool( + description=( + "Label a benchmark question as correct or incorrect with feedback. " + "Stores the labeling result for later optimization." + ), +) +async def label_benchmark( + question_text: str, + is_correct: bool, + feedback_text: str = "", +) -> dict: + """New tool — currently labeling is handled purely in frontend state. + + This tool would persist labeling decisions for cross-session use. + """ + raise NotImplementedError("Phase 5: implement labeling persistence") + + +# ── Standalone entry point ─────────────────────────────────────────────────── + +app = optimizer.app diff --git a/agents/optimizer/app.yaml b/agents/optimizer/app.yaml new file mode 100644 index 0000000..114fb3f --- /dev/null +++ b/agents/optimizer/app.yaml @@ -0,0 +1,23 @@ +# Databricks Apps configuration for genie-optimizer agent. +# +# Optimization suggestions from benchmark feedback — needs LLM endpoint. +# No Lakebase dependency (stateless). 
+ +command: + - "uvicorn" + - "app:app" + - "--host" + - "0.0.0.0" + - "--port" + - "8000" + +env: + # LLM serving endpoint + - name: LLM_MODEL + value: "databricks-claude-sonnet-4-6" + + # MLflow Tracing + - name: MLFLOW_TRACKING_URI + value: "databricks" + - name: MLFLOW_REGISTRY_URI + value: "databricks-uc" diff --git a/agents/scorer/app.py b/agents/scorer/app.py new file mode 100644 index 0000000..2789160 --- /dev/null +++ b/agents/scorer/app.py @@ -0,0 +1,134 @@ +"""genie-scorer — IQ scoring agent for Genie Spaces. + +Extracted from: + - backend/routers/spaces.py (scan, history, star, list endpoints) + - backend/services/scanner.py (rule-based scoring engine) + - backend/services/lakebase.py (score persistence) + +This agent has NO LLM dependency — it's pure rule-based scoring. +Lowest-risk extraction target; validates the @app_agent pattern. + +Integration patterns used: + - Challenge 1 (OBO auth): obo_context() bridges @app_agent → monolith auth + - Challenge 4 (SP fallback): genie_api_call() retries with SP on scope errors + - Challenge 5 (Lakebase): init_pool(SCORER_DDL) for idempotent schema setup +""" + +from __future__ import annotations + +from dbx_agent_app import AgentRequest, AgentResponse, app_agent + +from agents._shared.auth_bridge import obo_context +from agents._shared.sp_fallback import genie_api_call +from agents._shared.lakebase_client import init_pool, SCORER_DDL + + +@app_agent( + name="genie-scorer", + description=( + "IQ scoring for Genie Spaces. Scans space configurations against a " + "rule-based scoring rubric (foundation, data setup, SQL assets, " + "optimization), persists scores to Lakebase, and tracks score history." + ), +) +async def scorer(request: AgentRequest) -> AgentResponse: + """Route incoming agent requests to the appropriate scoring tool.""" + # TODO: Parse intent from request.messages and dispatch to tools + ... 
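The rule-based scoring these tools wrap combines four dimension scores into one 0-100 IQ score. A minimal sketch of that combination, assuming illustrative weights and maturity cutoffs (the real rubric lives in `backend/services/scanner.py`):

```python
# Illustrative weights for the four scoring dimensions; the actual values
# are defined by the rubric in backend/services/scanner.py.
WEIGHTS = {
    "foundation": 0.30,
    "data_setup": 0.30,
    "sql_assets": 0.25,
    "optimization": 0.15,
}


def combine_score(dimension_scores: dict[str, float]) -> dict:
    """Combine per-dimension scores (each 0-100) into a single IQ score.

    Maturity cutoffs here are assumptions for illustration.
    """
    total = sum(WEIGHTS[d] * dimension_scores.get(d, 0.0) for d in WEIGHTS)
    score = round(total)
    maturity = (
        "advanced" if score >= 80
        else "intermediate" if score >= 50
        else "basic"
    )
    return {"score": score, "maturity": maturity, "dimensions": dimension_scores}


result = combine_score(
    {"foundation": 90, "data_setup": 70, "sql_assets": 60, "optimization": 40}
)
# 0.30*90 + 0.30*70 + 0.25*60 + 0.15*40 = 27 + 21 + 15 + 6 = 69
```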
+ + +# ── Lifecycle ──────────────────────────────────────────────────────────────── + + +async def on_startup(): + """Initialize Lakebase pool with scorer-specific DDL.""" + await init_pool(SCORER_DDL) + + +# ── Tools ──────────────────────────────────────────────────────────────────── +# Each tool maps to a current REST endpoint in backend/routers/spaces.py. +# Domain logic lives in scanner.py (moved as-is from backend/services/). + + +@scorer.tool( + description=( + "Run an IQ scan on a Genie Space. Fetches the space configuration, " + "calculates a score (0-100) across four dimensions (foundation, data " + "setup, SQL assets, optimization), and persists the result to Lakebase." + ), +) +async def scan_space(space_id: str, request: AgentRequest) -> dict: + """Source: backend/services/scanner.py::scan_space + backend/routers/spaces.py::trigger_scan + + Integration pattern: + obo_context() sets up both monolith ContextVar and tools-core auth. + genie_api_call() auto-retries with SP on scope errors. + Domain logic (scanner.calculate_score) works unchanged. + """ + with obo_context(request.user_context.access_token): + # Fetch space config (with automatic SP fallback for scope errors) + space_data = genie_api_call( + "GET", + f"/api/2.0/genie/spaces/{space_id}", + query={"include_serialized_space": "true"}, + ) + # TODO Phase 2: scanner.calculate_score(space_data) + # TODO Phase 2: save_scan_result(space_id, score) + raise NotImplementedError("Phase 2: move scanner.py + lakebase.py here") + + +@scorer.tool( + description=( + "Get score history for a Genie Space over the last N days. " + "Returns a list of {score, maturity, scanned_at} entries." 
+ ), +) +async def get_history(space_id: str, days: int = 30) -> list[dict]: + """Source: backend/services/lakebase.py::get_score_history""" + raise NotImplementedError("Phase 2: move lakebase.get_score_history here") + + +@scorer.tool( + description="Toggle the star (bookmark) status of a Genie Space.", +) +async def toggle_star(space_id: str, starred: bool) -> dict: + """Source: backend/services/lakebase.py::star_space""" + raise NotImplementedError("Phase 2: move lakebase.star_space here") + + +@scorer.tool( + description=( + "List all Genie Spaces the user has access to, enriched with IQ " + "scores. Supports filtering by name, star status, and score range." + ), +) +async def list_spaces( + search: str | None = None, + starred_only: bool = False, + min_score: int | None = None, + max_score: int | None = None, +) -> list[dict]: + """Source: backend/routers/spaces.py::list_spaces + + Note (PR #6-#8): API response uses `space_id`/`title` fields (not `id`/`display_name`). + Returns `space_url` per item (host + /genie/rooms/{space_id}). + Uses SP fallback via get_service_principal_client() when OBO token lacks genie scope. + """ + raise NotImplementedError("Phase 2: move list_spaces logic here") + + +@scorer.tool( + description="Get detailed space metadata with latest scan result and star status.", +) +async def get_space_detail(space_id: str) -> dict: + """Source: backend/routers/spaces.py::get_space_detail + + Note (PR #7): Includes SP fallback (_is_scope_error check) for Genie API calls. + """ + raise NotImplementedError("Phase 2: move get_space_detail logic here") + + +# ── Standalone entry point ─────────────────────────────────────────────────── +# For local development: uvicorn agents.scorer.app:app --port 8001 + +app = scorer.app diff --git a/agents/scorer/app.yaml b/agents/scorer/app.yaml new file mode 100644 index 0000000..0f9b5b9 --- /dev/null +++ b/agents/scorer/app.yaml @@ -0,0 +1,23 @@ +# Databricks Apps configuration for genie-scorer agent. 
+# +# IQ scoring for Genie Spaces — no LLM dependency, pure rule-based. +# Needs Lakebase for score persistence and star tracking. + +command: + - "uvicorn" + - "app:app" + - "--host" + - "0.0.0.0" + - "--port" + - "8000" + +env: + # Lakebase PostgreSQL — for score persistence and star tracking + - name: LAKEBASE_HOST + value: "" + - name: LAKEBASE_PORT + value: "5432" + - name: LAKEBASE_DATABASE + value: "databricks_postgres" + - name: LAKEBASE_INSTANCE_NAME + value: "" diff --git a/agents/supervisor/__init__.py b/agents/supervisor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/agents/supervisor/proxy.py b/agents/supervisor/proxy.py new file mode 100644 index 0000000..62107de --- /dev/null +++ b/agents/supervisor/proxy.py @@ -0,0 +1,179 @@ +"""Transparent proxy from supervisor to sub-agents. + +Maps current /api/* paths to sub-agent URLs so the React SPA needs +zero changes. Handles JSON responses and SSE streaming. + +The route table is ordered — more specific paths match before general +prefixes. Each entry maps a path prefix to an environment variable +containing the sub-agent's base URL (set via agents.yaml url_env_map). + +Path → agent mapping derived from frontend/src/lib/api.ts (28 API calls). +""" + +from __future__ import annotations + +import os +import re +from typing import Optional + +import httpx +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse, StreamingResponse + + +# Ordered route table: (pattern, env_var_or_None) +# More specific patterns MUST come before general prefixes. +# None means the supervisor handles the route directly (no proxy). 
+ROUTE_TABLE: list[tuple[str, Optional[str]]] = [ + # Specific sub-paths that override their parent prefix + ("/api/spaces/*/fix", "FIXER_URL"), # fix agent (SSE) + ("/api/genie/create", "CREATOR_URL"), # create Genie Space via API + + # General prefixes + ("/api/spaces", "SCORER_URL"), # list, scan, history, star + ("/api/space", "ANALYZER_URL"), # fetch, parse + ("/api/analyze", "ANALYZER_URL"), # section, all, stream (SSE) + ("/api/genie", "ANALYZER_URL"), # query + ("/api/sql", "ANALYZER_URL"), # execute + ("/api/optimize", "OPTIMIZER_URL"), # stream optimization (SSE) + ("/api/config", "OPTIMIZER_URL"), # merge + ("/api/create", "CREATOR_URL"), # agent chat (SSE), discover, validate, create + ("/api/checklist", "ANALYZER_URL"), # static content + ("/api/sections", "ANALYZER_URL"), # section list + + # Supervisor-owned (no proxy) + ("/api/settings", None), + ("/api/auth", None), + ("/api/admin", None), +] + +# Pre-compile glob patterns (only "/api/spaces/*/fix" currently) +_COMPILED_ROUTES: list[tuple[re.Pattern, Optional[str]]] = [] + +for pattern, env_var in ROUTE_TABLE: + if "*" in pattern: + # Convert glob "*" to regex "[^/]+" + regex = "^" + re.escape(pattern).replace(r"\*", "[^/]+") + _COMPILED_ROUTES.append((re.compile(regex), env_var)) + else: + # Simple prefix match + _COMPILED_ROUTES.append((re.compile("^" + re.escape(pattern)), env_var)) + + +def _resolve_upstream(path: str) -> Optional[str]: + """Find the upstream agent URL for a given API path. + + Returns: + Base URL string if the path should be proxied. + None if the supervisor handles it directly. + + Raises: + KeyError: If no route matches the path. 
+ """ + for compiled_pattern, env_var in _COMPILED_ROUTES: + if compiled_pattern.match(path): + if env_var is None: + return None + url = os.environ.get(env_var) + if not url: + return None + return url.rstrip("/") + + raise KeyError(f"No route for {path}") + + +# Hop-by-hop headers that should not be forwarded +_HOP_HEADERS = frozenset({"host", "content-length", "transfer-encoding"}) + + +def mount_proxy(app: FastAPI): + """Mount the catch-all proxy route on a FastAPI app. + + This should be mounted AFTER any supervisor-owned routes + (settings, auth, admin) so they take priority. + """ + + @app.api_route( + "/api/{path:path}", + methods=["GET", "POST", "PUT", "DELETE"], + ) + async def proxy(request: Request, path: str): + full_path = f"/api/{path}" + + try: + upstream_base = _resolve_upstream(full_path) + except KeyError: + return JSONResponse( + status_code=404, + content={"detail": f"No upstream agent for {full_path}"}, + ) + + if upstream_base is None: + # Supervisor-owned route that wasn't caught by an explicit handler. 
+            return JSONResponse(
+                status_code=404,
+                content={"detail": f"Not found: {full_path}"},
+            )
+
+        # Forward all headers except hop-by-hop
+        headers = {
+            k: v
+            for k, v in request.headers.items()
+            if k.lower() not in _HOP_HEADERS
+        }
+
+        upstream_url = f"{upstream_base}{full_path}"
+        if request.url.query:
+            upstream_url += f"?{request.url.query}"
+
+        body = await request.body()
+
+        # Issue a single streaming request so the upstream endpoint runs
+        # exactly once. Response headers arrive before the body, so the
+        # content type can be inspected without consuming the stream.
+        # (Requesting twice, once buffered and once streamed, would double
+        # POST side effects and hang on SSE endpoints that never close.)
+        client = httpx.AsyncClient(timeout=300.0)
+        upstream_req = client.build_request(
+            method=request.method,
+            url=upstream_url,
+            headers=headers,
+            content=body,
+        )
+        upstream_resp = await client.send(
+            upstream_req, stream=True, follow_redirects=True
+        )
+
+        content_type = upstream_resp.headers.get("content-type", "")
+
+        # SSE: forward chunks as they arrive, closing the upstream
+        # connection when the stream ends or the client disconnects
+        if "text/event-stream" in content_type:
+
+            async def stream():
+                try:
+                    async for chunk in upstream_resp.aiter_bytes():
+                        yield chunk
+                finally:
+                    await upstream_resp.aclose()
+                    await client.aclose()
+
+            return StreamingResponse(
+                stream(),
+                media_type="text/event-stream",
+                headers={
+                    "Cache-Control": "no-cache",
+                    "X-Accel-Buffering": "no",
+                },
+            )
+
+        # Non-streaming: read the full body, then release the connection
+        await upstream_resp.aread()
+        await upstream_resp.aclose()
+        await client.aclose()
+
+        # JSON: pass through with status code
+        if content_type.startswith("application/json"):
+            return JSONResponse(
+                status_code=upstream_resp.status_code,
+                content=upstream_resp.json(),
+                headers={"X-Upstream-Agent": upstream_base},
+            )
+
+        # Other content types: pass through as-is
+        return JSONResponse(
+            status_code=upstream_resp.status_code,
+            content={"raw": upstream_resp.text},
+            headers={"X-Upstream-Agent": upstream_base},
+        )
diff --git a/docs/architecture-proposal.md b/docs/architecture-proposal.md
new file mode 100644
index 0000000..be47223
--- /dev/null
+++ b/docs/architecture-proposal.md
@@ -0,0 +1,609 @@
+# Genie Workbench → Multi-Agent Architecture
+
+> **Status:** Proposal
+> **Author:** Stuart Gano
+> **Audience:** Sean Zhang (Workbench
maintainer) +> **Date:** 2026-03-10 + +--- + +## Executive Summary + +The Genie Workbench is a monolithic Databricks App (~10,200 lines backend) that hand-rolls OBO auth, tool-calling loops, SSE streaming, and SDK wrappers. Two FE-built libraries solve these exact problems: + +- **AI Dev Kit** (`databricks-tools-core`) — pre-built Python functions for SQL execution, Unity Catalog browsing, and warehouse management +- **dbx-agent-app** — `@app_agent` decorator that auto-generates `/invocations` endpoints, agent cards, MCP servers, health checks, and handles OBO auth + +This proposal refactors the Workbench into a **multi-agent system** where each capability is a separate, discoverable `@app_agent` app. The result: ~30% less code, free MCP servers, A2A discovery, and `mlflow.genai.evaluate()` support — with zero changes to the React frontend. + +--- + +## Current Architecture (Monolith) + +``` +┌─────────────────────────────────────────────────┐ +│ backend/main.py (FastAPI) │ +│ │ +│ ┌──────────────────────────────────────────┐ │ +│ │ OBOAuthMiddleware │ │ +│ │ (hand-rolled ContextVar + x-forwarded- │ │ +│ │ access-token extraction) │ │ +│ └──────────────────────────────────────────┘ │ +│ │ +│ ┌──────────┐ ┌──────────┐ ┌───────────────┐ │ +│ │ routers/ │ │ routers/ │ │ routers/ │ │ +│ │ spaces │ │ analysis │ │ create │ │ +│ │ (scan, │ │ (analyze,│ │ (UC discovery │ │ +│ │ history, │ │ stream, │ │ agent chat, │ │ +│ │ star, │ │ query, │ │ validate, │ │ +│ │ fix) │ │ optimize│ │ create) │ │ +│ └──────────┘ └──────────┘ └───────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────┐ │ +│ │ services/ │ │ +│ │ scanner.py analyzer.py optimizer.py │ │ +│ │ fix_agent.py create_agent.py │ │ +│ │ create_agent_tools.py (2,717 lines!) 
│ │ +│ │ create_agent_session.py │ │ +│ │ uc_client.py sql_executor.py │ │ +│ │ genie_client.py lakebase.py auth.py │ │ +│ └──────────────────────────────────────────┘ │ +│ │ +│ frontend/dist/ (React SPA, static files) │ +└─────────────────────────────────────────────────┘ +``` + +### Pain points + +| Issue | Impact | +|-------|--------| +| `create_agent_tools.py` is 2,717 lines of hand-coded tool definitions + JSON schemas + dispatch table | Every new tool requires ~80 lines of boilerplate | +| OBO auth in `services/auth.py` (136 lines) uses ContextVar + middleware — breaks in streaming generators | Streaming endpoints need manual `set_obo_user_token()` re-establishment. Recent fix added `get_service_principal_client()` fallback for missing OAuth scopes | +| `genie_client.py` (244 lines) duplicates SP-fallback pattern (`_is_scope_error`) in every API call | Each new Genie API function must remember to add scope-error retry logic | +| `sql_executor.py` (220 lines) reimplements what `databricks-tools-core.sql` provides | Maintenance burden, no warehouse auto-detection improvements | +| `uc_client.py` (60 lines) reimplements what `databricks-tools-core.unity_catalog` provides | Duplicated effort | +| No agent discovery — other workspace apps can't call Workbench capabilities | Siloed functionality | +| No eval support — testing requires manual curl/browser interaction | No regression testing pipeline | +| Monolithic deployment — any change redeploys everything | Slow iteration on individual capabilities | + +--- + +## Proposed Architecture (Multi-Agent) + +``` +┌─────────────────────────────────────────────────────────┐ +│ genie-workbench (supervisor) │ +│ React SPA + FastAPI shell │ +│ Routes frontend API calls → sub-agent /invocations │ +├─────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ genie- │ │ genie- │ │ genie- │ │ +│ │ scorer │ │ analyzer │ │ creator │ │ +│ │ │ │ │ │ │ │ +│ │ IQ scan │ │ LLM deep 
│ │ Space │ │ +│ │ scoring │ │ analysis │ │ creation │ │ +│ │ history │ │ synthesis│ │ wizard │ │ +│ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +│ ┌──────────┐ ┌──────────┐ │ +│ │ genie- │ │ genie- │ │ +│ │ optimizer│ │ fixer │ │ +│ │ │ │ │ │ +│ │ Benchmark│ │ AI fix │ │ +│ │ labeling │ │ agent │ │ +│ │ suggest │ │ patches │ │ +│ └──────────┘ └──────────┘ │ +└─────────────────────────────────────────────────────────┘ +``` + +Each sub-agent is a standalone Databricks App with: +- **`@app_agent` decorator** — auto-generates `/invocations`, `/.well-known/agent.json`, `/health`, MCP server +- **OBO auth** — handled by `request.user_context` (replaces ContextVar middleware) +- **Tool definitions** — auto-generated from `@agent.tool()` decorated functions (replaces JSON schemas) +- **Eval support** — `app_predict_fn()` bridge to `mlflow.genai.evaluate()` + +--- + +## Agent Decomposition + +### Agent Boundaries + +| Agent | Source | Tools | Needs Lakebase? | Streaming? | LLM? | +|-------|--------|-------|-----------------|------------|------| +| **genie-scorer** | `agents/scorer/` | `scan_space`, `get_history`, `toggle_star`, `list_spaces` | Yes (scores, stars) | No | No | +| **genie-analyzer** | `agents/analyzer/` | `fetch_space`, `analyze_section`, `analyze_all`, `query_genie`, `execute_sql` | No | Yes (SSE) | Yes | +| **genie-creator** | `agents/creator/` | All 16 current tools (discover_*, describe_*, profile_*, generate_config, etc.) 
| Yes (sessions) | Yes (SSE) | Yes | +| **genie-optimizer** | `agents/optimizer/` | `generate_suggestions`, `merge_config`, `label_benchmark` | No | No (heartbeat SSE) | Yes | +| **genie-fixer** | `agents/fixer/` | `generate_fixes`, `apply_patch` | No | Yes (SSE) | Yes | +| **supervisor** | root `app.py` | Routes to sub-agents, serves React SPA, `/api/settings`, `/api/auth` | Yes (starred) | Proxy | No | + +### What moves where + +``` +backend/services/scanner.py → agents/scorer/scanner.py (as-is, domain logic) +backend/services/analyzer.py → agents/analyzer/analyzer.py (as-is, domain logic) +backend/services/optimizer.py → agents/optimizer/optimizer.py (as-is, domain logic) +backend/services/fix_agent.py → agents/fixer/fix_agent.py (as-is, domain logic) +backend/services/create_agent.py → agents/creator/agent.py (as-is, domain logic) +backend/services/create_agent_session.py → agents/creator/session.py (as-is) +backend/prompts_create/ → agents/creator/prompts/ (as-is) +backend/references/ → agents/creator/references/ (as-is) + +backend/services/uc_client.py → DELETED (replaced by databricks-tools-core) +backend/sql_executor.py → DELETED (replaced by databricks-tools-core) +backend/routers/spaces.py → DISSOLVED (endpoints become scorer/supervisor tools) +backend/routers/analysis.py → DISSOLVED (endpoints become analyzer/optimizer tools) +backend/routers/create.py → DISSOLVED (endpoints become creator tools) +``` + +### What stays custom (irreplaceable domain logic) + +These files contain business logic specific to GenieIQ/GenieRx and move to their respective agents unchanged: + +- `scanner.py` — Rule-based IQ scoring (maturity levels, dimension weights) +- `analyzer.py` — LLM checklist evaluation with session management +- `optimizer.py` — Optimization suggestion generation from labeling feedback +- `fix_agent.py` — Patch generation + application via Genie API +- `create_agent.py` — Tool-calling loop with message compaction, JSON repair, session recovery +- 
`create_agent_session.py` — Two-tier session persistence (memory + Lakebase) +- `prompts_create/` — Dynamic prompt assembly (9 modules: core, data_sources, requirements, plan, etc.) +- `references/schema.md` — Genie Space schema reference +- `genie_creator.py` — Genie API write operations +- `genie_client.py` — Genie API read operations (including SP-fallback for missing OAuth scopes, added in PR #7) +- `lakebase.py` — PostgreSQL persistence with in-memory fallback + +--- + +## What Gets Replaced + +### 1. Tool Definition Boilerplate → `@agent.tool()` Decorators + +**Before** (create_agent_tools.py, ~80 lines per tool): +```python +TOOL_DEFINITIONS = [ + { + "type": "function", + "function": { + "name": "discover_catalogs", + "description": "List all Unity Catalog catalogs the user has access to.", + "parameters": {"type": "object", "properties": {}, "required": []}, + }, + }, + # ... 15 more tool definitions with nested JSON schemas ... +] + +def handle_tool_call(name: str, arguments: dict, session_config=None) -> dict: + handlers = { + "discover_catalogs": _discover_catalogs, + "discover_schemas": _discover_schemas, + # ... 14 more entries ... + } + handler = handlers.get(name) + # ... dispatch logic ... +``` + +**After** (auto-generated from function signatures): +```python +@creator.tool(description="List all Unity Catalog catalogs the user has access to.") +async def discover_catalogs() -> dict: + from databricks_tools_core.unity_catalog import list_catalogs + return {"catalogs": list_catalogs()} + +@creator.tool(description="List schemas within a catalog.") +async def discover_schemas(catalog: str) -> dict: + from databricks_tools_core.unity_catalog import list_schemas + return {"schemas": list_schemas(catalog)} +``` + +**Impact:** ~580 lines of JSON schemas + 40-line dispatch table → auto-generated. + +### 2. 
OBO Auth Middleware + SP Fallback → `request.user_context` + +**Before** (main.py + auth.py + genie_client.py): +```python +# main.py — ContextVar middleware +class OBOAuthMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + token = request.headers.get("x-forwarded-access-token", "") + if token: + set_obo_user_token(token) # ContextVar + request.state.user_token = token + response = await call_next(request) + if not is_streaming: + clear_obo_user_token() + return response + +# auth.py — SP fallback for scope errors (added in PR #7) +def get_service_principal_client() -> WorkspaceClient: + """Bypass OBO for ops requiring scopes the user token lacks.""" + return _get_default_client() + +# genie_client.py — every API function repeats this pattern +try: + return _get_space_with_client(client, genie_space_id) +except Exception as e: + if _is_scope_error(e): + sp_client = get_service_principal_client() + return _get_space_with_client(sp_client, genie_space_id) + +# In streaming generators: +if user_token: + set_obo_user_token(user_token) # Must re-establish in generator! +``` + +**After** (`@app_agent` handles it): +```python +@app_agent(name="genie-scorer", ...) +async def scorer(request: AgentRequest) -> AgentResponse: + # request.user_context.access_token is automatically available + # No ContextVar management, no SP fallback boilerplate + ... +``` + +**Impact:** ~30 lines of middleware + SP fallback pattern duplicated across every API call → zero. + +### 3. 
UC Client + SQL Executor → `databricks-tools-core` + +| Current | Lines | Replacement | +|---------|-------|-------------| +| `backend/services/uc_client.py` | 60 | `from databricks_tools_core.unity_catalog import list_catalogs, list_schemas, list_tables` | +| `backend/sql_executor.py` | 220 | `from databricks_tools_core.sql import execute_sql, get_best_warehouse` | +| Warehouse auto-detection | 30 | `get_best_warehouse()` | + +**Impact:** 310 lines deleted, replaced by maintained library functions. + +--- + +## Deployment Topology + +### agents.yaml + +```yaml +project: + name: genie-workbench + workspace_path: /Workspace/Shared/apps + +agents: + - name: scorer + source: ./agents/scorer + - name: analyzer + source: ./agents/analyzer + - name: creator + source: ./agents/creator + - name: optimizer + source: ./agents/optimizer + - name: fixer + source: ./agents/fixer + - name: supervisor + source: . + depends_on: [scorer, analyzer, creator, optimizer, fixer] + url_env_map: + scorer: SCORER_URL + analyzer: ANALYZER_URL + creator: CREATOR_URL + optimizer: OPTIMIZER_URL + fixer: FIXER_URL +``` + +Each agent deploys as its own Databricks App with: +- Its own `app.yaml` defining env vars and resource bindings +- Its own service principal (for Lakebase, LLM endpoint access) +- Auto-generated `/.well-known/agent.json` for A2A discovery +- Auto-generated MCP server for tool integration + +### Per-Agent app.yaml Example (scorer) + +```yaml +command: ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] +env: + - name: LAKEBASE_HOST + value: "" + - name: LAKEBASE_INSTANCE_NAME + value: "" +``` + +--- + +## Wire Protocol + +### Frontend → Supervisor → Sub-Agents + +The React SPA continues to hit the same API paths (`/api/spaces/*`, `/api/analyze/*`, `/api/create/*`). 
The supervisor proxies requests to sub-agents: + +``` +Browser → /api/spaces/scan → supervisor → genie-scorer /invocations +Browser → /api/analyze/stream → supervisor → genie-analyzer /invocations +Browser → /api/create/agent/chat → supervisor → genie-creator /invocations +Browser → /api/optimize → supervisor → genie-optimizer /invocations +Browser → /api/spaces/{id}/fix → supervisor → genie-fixer /invocations +``` + +The supervisor uses the Responses Agent protocol (or simple HTTP proxying) to forward requests. For streaming endpoints, the supervisor proxies SSE responses transparently. + +### Agent-to-Agent (A2A) Discovery + +After deployment, each agent exposes `/.well-known/agent.json`: + +```json +{ + "name": "genie-scorer", + "description": "IQ scoring for Genie Spaces", + "url": "https://genie-workbench-scorer.cloud.databricks.com", + "tools": [ + {"name": "scan_space", "description": "Run IQ scan on a Genie Space"}, + {"name": "get_history", "description": "Get score history"}, + {"name": "toggle_star", "description": "Toggle star on a space"} + ] +} +``` + +Other workspace apps can discover and call these agents using `AgentDiscovery`. + +--- + +## Migration Path (Phased, Backwards-Compatible) + +### Phase 1: Scaffolding + Architecture Doc ← **This PR** + +- Architecture proposal for review +- `agents.yaml` deployment config +- Skeleton `app.py` + `app.yaml` for each agent +- No behavior changes to existing monolith + +### Phase 2: Extract genie-scorer (lowest risk) + +**Why first:** No LLM calls, no streaming, no sessions — pure rule-based scoring. Validates the `@app_agent` pattern with minimal risk. 
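The registration pattern this phase validates can be sketched in plain Python. This is a simplified stand-in, not the real `dbx-agent-app` API: the `Agent` class and the schema derivation below are illustrative only, showing the core idea that tool schemas and dispatch fall out of function signatures instead of hand-written JSON.

```python
import inspect


class Agent:
    """Illustrative stand-in for the @app_agent framework (not the real API)."""

    def __init__(self, name: str):
        self.name = name
        self.tools: dict = {}

    def tool(self, description: str = ""):
        """Register a function as a tool; its schema comes from the signature."""
        def wrap(fn):
            params = {
                p.name: {"type": "string"}  # the real framework would map type hints
                for p in inspect.signature(fn).parameters.values()
            }
            self.tools[fn.__name__] = {
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": params,
                    "required": list(params),
                },
                "fn": fn,
            }
            return fn
        return wrap

    def dispatch(self, name: str, **kwargs):
        """Replaces a hand-written handle_tool_call() dispatch table."""
        return self.tools[name]["fn"](**kwargs)


scorer = Agent("genie-scorer")


@scorer.tool(description="Run IQ scan on a Genie Space")
def scan_space(space_id: str) -> dict:
    # Stubbed result; the real tool would call scanner.calculate_score()
    return {"space_id": space_id, "score": 72}
```

The property being validated: adding a tool is one decorated function, and both the JSON schema and the dispatch entry are derived automatically.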
+ +Files moved: +- `backend/services/scanner.py` → `agents/scorer/scanner.py` (as-is) +- Relevant Lakebase functions → `agents/scorer/lakebase.py` + +What gets deleted from monolith: +- Scan/history/star endpoints from `backend/routers/spaces.py` (~80 lines) + +### Phase 3: Extract genie-fixer (streaming + LLM, medium complexity) + +**Why second:** Streaming SSE + LLM calls, but simpler than creator (no sessions, no 16 tools). + +Files moved: +- `backend/services/fix_agent.py` → `agents/fixer/fix_agent.py` +- Fix prompt → `agents/fixer/prompts.py` + +Validates: Streaming via async generator → SSE (auto-handled by `@app_agent`) + +### Phase 4: Extract genie-analyzer (streaming + LLM, high complexity) + +Files moved: +- `backend/services/analyzer.py` → `agents/analyzer/analyzer.py` +- Analysis prompts → `agents/analyzer/prompts/` + +Tools: `fetch_space`, `analyze_section`, `analyze_all`, `query_genie`, `execute_sql` + +### Phase 5: Extract genie-optimizer + +Files moved: +- `backend/services/optimizer.py` → `agents/optimizer/optimizer.py` +- Benchmark labeling logic → `agents/optimizer/labeling.py` + +Tools: `generate_suggestions`, `merge_config`, `label_benchmark` + +### Phase 6: Extract genie-creator (most complex, last) + +**Why last:** 16 tools, session persistence, complex tool-calling loop with message compaction. Hardest extraction. + +Key change: 16 hand-coded tool definitions become `@creator.tool()` decorators: +```python +@creator.tool(description="List Unity Catalog catalogs") +async def discover_catalogs() -> dict: + from databricks_tools_core.unity_catalog import list_catalogs + return {"catalogs": list_catalogs()} +``` + +What stays custom: Dynamic prompt assembly, session persistence, message compaction, config generation/validation. These are domain logic. 
+ +What gets replaced: +- Tool definition boilerplate (~580 lines of JSON schemas → auto-generated from function signatures) +- `handle_tool_call()` dispatcher (~40 lines → auto-routing) +- OBO middleware → `request.user_context` + +### Phase 7: Supervisor + Frontend + +The supervisor becomes a thin shell that: +1. Serves the React SPA (static files) +2. Routes API calls to sub-agents +3. Handles settings and auth endpoints + +Frontend changes: **Minimal.** API client (`frontend/src/lib/api.ts`) keeps hitting the same paths. The supervisor proxies to sub-agents transparently. + +### Phase 8: AI Dev Kit Integration + +Replace hand-rolled utilities with `databricks-tools-core` across all agents: + +| Current | Lines | Replacement | +|---------|-------|-------------| +| `backend/services/uc_client.py` | 60 | `databricks_tools_core.unity_catalog` | +| `backend/sql_executor.py` | 220 | `databricks_tools_core.sql` | +| Warehouse auto-detection in sql_executor | 30 | `get_best_warehouse()` | + +--- + +## Eval Story + +Each agent becomes independently evaluatable via the `dbx-agent-app` bridge: + +```python +from dbx_agent_app.bridge import app_predict_fn +import mlflow + +predict = app_predict_fn("https://genie-workbench-scorer.cloud.databricks.com") +results = mlflow.genai.evaluate( + data=eval_dataset, + predict_fn=predict, + scorers=[correctness_scorer, latency_scorer], +) +``` + +This replaces the current "manual curl and check" testing with automated, repeatable evaluation pipelines for each agent independently. 
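The `eval_dataset` above is left undefined; here is a sketch of what it might contain for the scorer agent. The `inputs`/`expectations` record shape follows the convention `mlflow.genai.evaluate()` accepts, while the space IDs, expectation fields, and the custom check are hypothetical:

```python
# Hypothetical eval rows for genie-scorer: each row names a known Genie Space
# and the properties a correct scan result should satisfy.
eval_dataset = [
    {
        "inputs": {"space_id": "01ef-sales-demo"},  # hypothetical space ID
        "expectations": {"min_score": 40, "dimensions": 4},
    },
    {
        "inputs": {"space_id": "01ef-finance-kpis"},
        "expectations": {"min_score": 60, "dimensions": 4},
    },
]


def scored_in_range(outputs: dict, expectations: dict) -> bool:
    """Example custom check: the scan score meets the floor and covers all
    four IQ dimensions. A real scorer would wrap this for mlflow."""
    return (
        outputs.get("score", 0) >= expectations["min_score"]
        and len(outputs.get("dimensions", [])) == expectations["dimensions"]
    )
```

Checks like this become regression gates once they run per-agent in CI instead of via manual curl.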
+ +--- + +## Integration Challenges — Concrete Solutions + +The three auth systems that need bridging: +- **Monolith auth** (`backend/services/auth.py:25`): `_obo_client` ContextVar → `WorkspaceClient` +- **`@app_agent`** (`dbx_agent_app/core/types.py:32`): `request.user_context` → `UserContext` with `.access_token` +- **`databricks-tools-core`** (`databricks_tools_core/auth.py:35-36`): `_host_ctx`/`_token_ctx` ContextVars via `set_databricks_auth()` + +### 1. OBO Auth Bridge → `agents/_shared/auth_bridge.py` + +**Problem:** Each agent receives `request.user_context` from `@app_agent`, but domain logic calls `get_workspace_client()` from the monolith's auth module. During migration, both patterns need to work. And `databricks-tools-core` functions use their own separate ContextVars. + +**Solution:** `obo_context()` context manager that sets up all three auth systems in one `with` block: + +```python +from agents._shared.auth_bridge import obo_context + +@scorer.tool(description="Run IQ scan on a Genie Space") +async def scan_space(space_id: str, request: AgentRequest) -> dict: + with obo_context(request.user_context.access_token): + # All of these now work: + # - monolith's get_workspace_client() returns OBO client + # - databricks-tools-core functions use OBO token + result = scanner.calculate_score(space_id) +``` + +For streaming generators, capture the token before the generator starts and re-enter `obo_context()` per-yield (same pattern as `backend/routers/create.py:125-198`). + +### 2. Complex Tool Schemas → `agents/creator/schemas.py` + +**Problem:** `generate_config` has 11 parameters with 4-5 nesting levels (tables → column configs, example SQLs → parameters, etc.). `@app_agent`'s schema generator only handles primitives. The monolith defines these schemas as **~580 lines of hand-written JSON** in `create_agent_tools.py` — brittle, hard to maintain, and easy to get out of sync with the runtime code. 
+ +**Solution:** **~80 lines of Pydantic models** that auto-generate the equivalent JSON Schema via `.model_json_schema()` and double as runtime validation: + +```python +from agents.creator.schemas import GenerateConfigArgs + +@creator.tool( + description="Generate a Genie Space configuration", + parameters=GenerateConfigArgs.model_json_schema(), +) +async def generate_config(**kwargs) -> dict: + args = GenerateConfigArgs(**kwargs) # Validate at runtime +``` + +580 lines of hand-maintained JSON → 80 lines of Pydantic models. Schema and validation are always in sync because they come from the same source. + +### 3. Frontend Transparency → `agents/supervisor/proxy.py` + +**Problem:** The React SPA makes 28 API calls to `/api/*` that route to 5 different sub-agents after decomposition. The frontend should not change. + +**Solution:** Ordered route table with prefix matching, glob support for path parameters, and SSE stream detection: + +```python +ROUTE_TABLE = [ + ("/api/spaces/*/fix", "FIXER_URL"), # specific before general + ("/api/genie/create", "CREATOR_URL"), + ("/api/spaces", "SCORER_URL"), + ("/api/analyze", "ANALYZER_URL"), + ("/api/create", "CREATOR_URL"), + # ... etc +] +``` + +SSE streams are detected by `content-type: text/event-stream` and forwarded as chunked bytes. OBO headers pass through automatically. + +### 4. SP Fallback Decorator → `agents/_shared/sp_fallback.py` + +**Problem:** The `_is_scope_error()` + retry-with-SP pattern is duplicated across `genie_client.py` and `spaces.py`. Each agent that calls Genie APIs needs this pattern. + +**Solution:** `@with_sp_fallback` decorator and `genie_api_call()` convenience function: + +```python +from agents._shared.sp_fallback import genie_api_call + +# One-liner with automatic SP fallback +space = genie_api_call("GET", f"/api/2.0/genie/spaces/{space_id}", + query={"include_serialized_space": "true"}) +``` + +### 5. 
Shared Lakebase Pool → `agents/_shared/lakebase_client.py` + +**Problem:** Multiple agents need Lakebase (scorer for scores/stars, creator for sessions). Each runs as a separate Databricks App with its own credentials. + +**Solution:** Shared pool lifecycle + idempotent DDL per agent: + +```python +from agents._shared.lakebase_client import init_pool, SCORER_DDL + +# At startup — creates tables if they don't exist +await init_pool(SCORER_DDL) +``` + +Each agent initializes its own pool from its own env vars. Domain-specific query functions stay in each agent's module. The shared client manages pool lifecycle, credential generation, and DDL only. + +--- + +## Estimated Impact + +| Metric | Before | After | +|--------|--------|-------| +| Backend Python lines | ~10,178 | ~7,100 (30% reduction from eliminating boilerplate) | +| Files deleted | 0 | 5 (routers + utility wrappers replaced by libraries) | +| Tool definition boilerplate | ~580 lines JSON schemas | 0 (auto-generated from type hints) | +| Dispatch table code | ~40 lines | 0 (auto-routing by `@app_agent`) | +| OBO auth code | ~30 lines middleware | 0 (handled by framework) | +| Auto-generated endpoints | 0 | 30+ (5 agents × 6 endpoints each: /invocations, /health, agent.json, MCP, etc.) | +| MCP servers | 0 | 5 (one per agent, free) | +| Agent discovery | None | A2A protocol, workspace-wide | +| Eval support | Manual testing | `mlflow.genai.evaluate()` via bridge | +| Deployment | Single `databricks apps deploy` | `dbx-agent-app deploy --config agents.yaml` (per-agent or all) | + +--- + +## Verification Plan + +1. **Unit tests:** Each agent's tools can be tested independently via `agent(AgentRequest(...))` — the `@app_agent` decorator makes the handler directly callable. + +2. **Integration tests:** Deploy all agents locally (`uvicorn agents.scorer.app:app --port 8001`, etc.), configure supervisor with local URLs, run existing E2E tests. + +3.
**A2A discovery:** After deploying to Databricks Apps, verify `/.well-known/agent.json` returns correct agent cards. Use `AgentDiscovery` to scan workspace. + +4. **Eval bridge:** Run `mlflow.genai.evaluate()` against each deployed agent using `app_predict_fn()`. + +5. **Frontend smoke test:** Verify React SPA still works end-to-end through the supervisor proxy. + +--- + +## Files in This PR + +### New files — scaffolds + deployment +- `docs/architecture-proposal.md` — this document +- `agents.yaml` — multi-agent deployment config +- `agents/scorer/app.py` — scorer agent scaffold +- `agents/scorer/app.yaml` — scorer Databricks Apps config +- `agents/analyzer/app.py` — analyzer agent scaffold +- `agents/analyzer/app.yaml` — analyzer Databricks Apps config +- `agents/creator/app.py` — creator agent scaffold +- `agents/creator/app.yaml` — creator Databricks Apps config +- `agents/optimizer/app.py` — optimizer agent scaffold +- `agents/optimizer/app.yaml` — optimizer Databricks Apps config +- `agents/fixer/app.py` — fixer agent scaffold +- `agents/fixer/app.yaml` — fixer Databricks Apps config + +### New files — integration challenge solutions +- `agents/_shared/__init__.py` — shared utilities package +- `agents/_shared/auth_bridge.py` — Challenge 1: OBO auth context manager bridging all 3 auth systems +- `agents/_shared/sp_fallback.py` — Challenge 4: SP fallback decorator for Genie API scope errors +- `agents/_shared/lakebase_client.py` — Challenge 5: Shared Lakebase pool with idempotent DDL +- `agents/creator/schemas.py` — Challenge 2: Pydantic models replacing ~580 lines of JSON schemas +- `agents/supervisor/__init__.py` — supervisor package +- `agents/supervisor/proxy.py` — Challenge 3: Frontend-transparent proxy with SSE support + +### Modified files +- `agents/scorer/app.py` — wired up auth_bridge, sp_fallback, and lakebase_client imports +- `agents/creator/app.py` — uses Pydantic schema override for generate_config/present_plan +- `docs/architecture-proposal.md` 
— replaced placeholder challenge descriptions with concrete solutions + +### No changes to existing monolith +The existing `backend/` code is untouched. All new files are additive. diff --git a/docs/genierx-spec.md b/docs/genierx-spec.md new file mode 100644 index 0000000..3c5c23f --- /dev/null +++ b/docs/genierx-spec.md @@ -0,0 +1,287 @@ +# GenieRX Specification + +## Purpose + +GenieRX is an analyzer and recommender for Genie spaces and their underlying semantic models. Its job is to: + +- Inspect how data and metrics are modeled for Genie (tables, views, metric views, knowledge store expressions, instructions). +- Classify fields into authoritative facts, canonical metrics, and heuristic signals. +- Recommend changes that align with Databricks best practices for Genie, Unity Catalog metric views, and the Genie knowledge store. + +GenieRX must never change data or semantics itself; it produces a structured review and recommendation set that humans can apply (or that other automation can implement safely). + +--- + +## 1. Core Concepts and Taxonomy + +GenieRX must reason about every field, metric, and score using the following taxonomy: + +### 1.1 Authoritative Facts + +**Definition:** +- Directly sourced from a system of record (billing, CRM, product telemetry, etc.). +- No business logic applied beyond basic cleaning (type casting, null handling). + +**Examples:** +- Transaction amounts, usage measures, timestamps from logs. +- Pipeline stages from CRM. +- Owner/segment assignments from master data. + +**GenieRX behavior:** +- Treat these as safe for Genie to query directly (tables or metric-view sources). +- Recommend surfacing them as columns, dimensions, or base measures without caveats, as long as upstream data quality is acceptable. + +### 1.2 Canonical Metrics + +**Definition:** +- Derived metrics with: + - A clear, stable SQL definition. + - Cross-team agreement (e.g., analytics, finance, ops). + - An owner who is accountable for changes. 
+- Examples: revenue, active users, funnel conversion, churn rate, cost per order. + +**GenieRX behavior:** +- Prefer to implement as metric view measures or knowledge-store measures/filters/dimensions, not as ad hoc SQL in Genie instructions. +- Encourage: + - Centralized definition in Unity Catalog metric views where possible. + - Short, precise names plus documentation (description + semantic metadata). +- Mark these as safe to present as "facts" in Genie answers (subject to the usual "data as of & filters" context). + +### 1.3 Heuristic Signals + +**Definition:** +- Derived fields that depend on subjective thresholds, incomplete joins, fragile text features, or evolving business rules. +- Examples: + - Coverage / gap flags based on keyword lists and spend thresholds. + - "Is_X" tags inferred via heuristic classification. + - Composite opportunity or risk scores with arbitrary buckets/weights. + - Buckets that encode assumptions about missing data or multi-tenant joins. + +**GenieRX behavior:** +- Always treat these as heuristic signals, not authoritative facts. +- Recommend: + - Implementing them as measures or filters with explicit caveats in the description and/or semantic metadata (for example, "heuristic", "approximate", "experimental"). + - Avoiding column names that imply certainty (prefer `potential_*`, `*_score`, `*_heuristic_flag`). +- When these are currently modeled as bare columns, GenieRX should: + - Flag them as high risk for misinterpretation in Genie answers. + - Suggest converting them into modeled measures/filters with clear labels and descriptions. + +--- + +## 2. Modeling Guidelines with Metric Views + +When the workspace uses Unity Catalog metric views as the semantic layer for Genie, GenieRX must evaluate and recommend according to the following patterns. 
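Before the metric-view patterns, the Section 1 taxonomy can be made concrete with a small classification sketch. The attribute names, rules, and ordering here are illustrative only, not GenieRX's actual heuristics:

```python
from dataclasses import dataclass


@dataclass
class Field:
    name: str
    from_system_of_record: bool  # sourced directly from billing/CRM/telemetry
    has_agreed_definition: bool  # stable SQL definition with a named owner
    uses_thresholds: bool        # keyword lists, buckets, arbitrary weights


def classify(field: Field) -> str:
    """Map a field onto the GenieRX taxonomy (illustrative rules only)."""
    if field.uses_thresholds:
        # Caveat in descriptions; prefer names like potential_* or *_score
        return "heuristic_signal"
    if field.from_system_of_record and not field.has_agreed_definition:
        # Safe for Genie to query directly
        return "authoritative_fact"
    if field.has_agreed_definition:
        # Belongs in a metric view or knowledge-store measure
        return "canonical_metric"
    # Unknown provenance: treat conservatively
    return "heuristic_signal"
```

In practice the classification draws on lineage, ownership metadata, and the expression SQL itself, but the output buckets are the same three.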
+ +### 2.1 Use Metric Views as the Primary Semantic Layer + +**Best practice:** +- For governed KPIs and complex aggregations, define them once as metric views and use those in: + - Genie spaces. + - Dashboards and alerts. + - SQL clients and downstream tools. + +**GenieRX should:** +- Prefer metric views over ad hoc SQL in Genie instructions when: + - Metrics are reused in many questions or dashboards. + - Correct rollup is non-trivial (ratios, distinct counts, windowed metrics, etc.). + +### 2.2 Organize Semantics into Dimensions, Measures, and Filters + +Metric views express semantics as: +- **Dimensions:** group-by attributes (e.g., account, segment, product, region, time grain). +- **Measures:** aggregated values (sum, avg, distinct count, ratios, scores). +- **Filters:** structured conditions used often for WHERE / HAVING. + +**GenieRX should:** +- Check that: + - Group-by attributes are modeled as dimensions, not repeated ad hoc in SQL. + - Key KPIs are measures, not free-floating columns. + - Common conditions ("active customers", "large orders", "priority accounts") are modeled as filters or boolean measures where appropriate. +- Recommend refactors such as: + - "Promote this repeated WHERE condition into a named filter `active_customers`." + - "Move this ratio calculation into a metric-view measure instead of recomputing it in instructions." + +### 2.3 Implement Heuristic Logic as Measures/Filters, Not Core Columns + +For heuristic signals: +- Prefer to keep raw inputs (spend, text features, joins) as authoritative columns, and encode heuristic logic as measures/filters in the metric view: + - **Measures:** scores or counts indicating likelihood, risk, or opportunity. + - **Filters:** boolean expressions such as `has_potential_gap`, `is_priority_account_heuristic`. + +**GenieRX should recommend:** +- Use descriptions and semantic metadata to mark: + - Purpose (e.g., "heuristic score to prioritize follow-up"). 
+ - Known limitations (e.g., "sensitive to join failures; may over-count"). +- Avoid surfacing these measures as "the number of X" without caveats; instead, position them as signals. + +### 2.4 Enforce Metric-View Querying Best Practices + +Because metric views require explicit measure references: +- Queries must use the `MEASURE()` aggregate function for measures; `SELECT *` is not supported. + +**GenieRX should:** +- Check whether Genie SQL examples and instructions correctly reference measures using `MEASURE()` and: + - Flag places where raw measure columns are referenced without `MEASURE()`. + - Suggest corrected SQL patterns. + +--- + +## 3. Modeling Guidelines with the Genie Knowledge Store + +When the workspace uses Genie knowledge store features (space-level metadata, SQL expressions, entity/value mapping), GenieRX must evaluate and recommend according to these patterns. + +### 3.1 Use SQL Expressions for Structured Semantics + +The knowledge store lets authors define: +- **Measures:** KPIs and metrics with explicit SQL expressions. +- **Filters:** reusable boolean conditions. +- **Dimensions:** computed attributes for grouping or bucketing. + +**GenieRX should:** +- Encourage using SQL expressions for: + - Non-trivial metrics (ratios, distinct counts, window functions). + - Business-rule-based flags (e.g., "strategic customers", "at-risk contracts"). + - Time-derived dimensions (e.g., fiscal period, week buckets). +- Flag situations where: + - The same logic is duplicated across multiple Genie SQL examples/instructions. + - Important metrics only exist inside long-form instructions or user prompts. + +### 3.2 Align Table/Column Metadata with Business Terms + +**Best practice from Genie docs:** +- Keep spaces topic-specific and domain-focused. +- Use clear table and column descriptions and hide irrelevant or duplicate columns. + +**GenieRX should:** +- Evaluate: + - Whether key business terms are reflected in table/column descriptions and synonyms. 
+ - Whether noisy or unused columns remain exposed to Genie. +- Recommend: + - Adding or refining descriptions to explain what measures/dimensions represent. + - Adding synonyms where business language differs from schema names. + - Hiding columns that are raw, deprecated, or confusing for business users. + +### 3.3 Distinguish Canonical vs Heuristic in Descriptions + +For each SQL expression in the knowledge store, GenieRX should: +- Classify as canonical metric or heuristic signal. +- Recommend description patterns, for example: + - **Canonical:** "Primary KPI for [domain]. Defined as ... and reviewed by [team]." + - **Heuristic:** "Heuristic score that approximates [concept]. Based on thresholds X/Y/Z and subject to misclassification. Use as prioritization signal, not as exact count." +- Suggest adding explicit notes for Genie: + - "When answering questions with this metric, briefly explain that it is a heuristic estimate." + +--- + +## 4. Genie Space Best Practices to Enforce + +GenieRX must anchor its recommendations in the official Genie best practices and internal field guidance. + +### 4.1 Scope and Data Model + +- Spaces should be topic-specific (single domain, business area, or workflow), not "kitchen sink" collections of tables. +- Use a small number of core tables or metric views with: + - Clear relationships (defined either in metric views or in knowledge store join metadata). + - Cleaned and de-duplicated columns. + +**GenieRX should:** +- Flag spaces that: + - Include many loosely related tables. + - Depend heavily on raw staging tables instead of curated or metric views. +- Recommend: + - Splitting domains into separate spaces. + - Using curated views / metric views to simplify the model. + +### 4.2 Instructions and Examples + +**Best practices include:** +- Keep instructions concise and focused on business rules and semantics, not low-level SQL formatting. +- Provide example SQL that demonstrates: + - Correct use of metric views and measures. 
+ - Preferred filters and joins. +- Use benchmarks and validation questions to evaluate Genie performance over time. + +**GenieRX should:** +- Assess whether instructions: + - Explain how core metrics are defined and when to use them. + - Avoid unnecessary repetition and token-heavy prose. +- Recommend: + - Extracting embedded business rules from instructions into metric views and knowledge-store expressions. + - Adding or refining benchmark question sets for critical KPIs. + +--- + +## 5. GenieRX Review Workflow + +When GenieRX analyzes a space or semantic model, it should follow this high-level workflow: + +### Step 1: Inventory Sources and Semantics + +- List all data sources used by the space: + - Tables, views, metric views. + - Knowledge-store SQL expressions (measures, filters, dimensions). +- Identify all exposed fields and measures used in example SQL or benchmarks. + +### Step 2: Classify Fields Using the Taxonomy + +- For each column/measure, determine if it's an **authoritative fact**, **canonical metric**, or **heuristic signal** based on: + - Upstream SoT (billing, CRM, product, etc.). + - Presence in metric views or knowledge store. + - Use of thresholds, keyword lists, or ad hoc scoring logic. + +### Step 3: Check Alignment with Databricks Best Practices + +- **Data model:** Topic-focused, few core tables/metric views, clean joins. +- **Semantics:** Canonical metrics in metric views or knowledge-store measures/filters. +- **Instructions:** Clear, concise, oriented around business questions and metrics. +- **Evals:** Benchmarks or validation questions exist for key metrics. + +### Step 4: Generate Recommendations in Three Buckets + +**Safety & Clarity:** +- Where might Genie misrepresent heuristic signals as facts? +- Which metrics need stronger descriptions or caveats? + +**Semantic Modeling:** +- Which repeated logic should be moved into metric views or SQL expressions? +- Which filters or dimensions should be promoted into named entities? 
+ +**Space Design:** +- Should tables/views be swapped for metric views? +- Are there irrelevant columns/tables that should be hidden? +- Are there missing joins, synonyms, or value dictionaries that would improve answer quality? + +### Step 5: Summarize in a User-Friendly Report + +For each analyzed space/model, output: + +1. **Overview** - 1-2 paragraph summary of main findings and risk level (low/medium/high). +2. **Semantic Model Assessment** - Table of key metrics/signals with: Name, type (authoritative/canonical/heuristic), grain, and notes. +3. **Recommended Changes** - Ranked list of concrete actions (e.g., "Create metric view for X", "Convert Y to heuristic measure with description", "Hide columns A/B/C"). +4. **Optional** - Suggestions for benchmarks or validation questions. + +--- + +## 6. Design Principles for GenieRX + +GenieRX should always adhere to these principles: + +- **Do not fabricate** underlying data or definitions; base assessments only on the actual space configuration, metric views, and knowledge store content. +- **Bias toward explicit semantics:** Prefer named measures/filters/dimensions over ad hoc SQL or fragile instructions. +- **Respect governance and ownership:** Highlight when changes would affect canonical metrics owned by other teams; recommend collaboration, not unilateral changes. +- **Aim for explainability:** Recommendations should be understandable to data and business owners. "Move this heuristic from a column to a measure with caveats" is better than opaque tuning. + +--- + +## Sources + +- Unity Catalog metric views | Databricks on AWS +- Build a knowledge store for more reliable Genie spaces | Databricks on AWS +- Genie Best Practices +- [Field Apps] GenieRX: a Genie analyzer / recommender +- Product Analytics (go/product-analytics) +- DAIS 2025 - UC Metrics - Discovery - Genie +- Genie Guidelines +- Genie Space - Field Engineering Guide +- Writing Effective Databricks Genie Instructions +- Genie + Metrics (FEIP-818)