From 932ed7866d34dd6ba9dc9030fd44215262349b02 Mon Sep 17 00:00:00 2001 From: Kalvin Chau Date: Fri, 26 Jun 2026 09:44:03 -0700 Subject: [PATCH] feat(buzz-agent): support databricks v2 add an explicit databricks_v2 provider that routes ai gateway requests by model family while preserving legacy databricks serving endpoint behavior. reuse databricks auth config for static tokens and oauth, and cover the v2 gateway paths with envelope tests. --- crates/buzz-agent/README.md | 7 +- crates/buzz-agent/src/config.rs | 12 +- crates/buzz-agent/src/lib.rs | 8 +- crates/buzz-agent/src/llm.rs | 126 +++++++++++++++- crates/buzz-agent/tests/databricks_oauth.rs | 150 +++++++++++++++++--- 5 files changed, 269 insertions(+), 34 deletions(-) diff --git a/crates/buzz-agent/README.md b/crates/buzz-agent/README.md index bc1d97d51..55cf6ab50 100644 --- a/crates/buzz-agent/README.md +++ b/crates/buzz-agent/README.md @@ -129,7 +129,7 @@ Everything is environment variables. No flags, no config files. (We are a subpro | Variable | Default | Notes | |---|---|---| -| `BUZZ_AGENT_PROVIDER` | — | `anthropic`, `openai`, or `databricks`. If unset, or if `anthropic`/`openai` is selected but its API key is missing, Databricks is auto-selected when `DATABRICKS_HOST` + `DATABRICKS_MODEL` are set. | +| `BUZZ_AGENT_PROVIDER` | — | `anthropic`, `openai`, `databricks`, or `databricks_v2`. If unset, or if `anthropic`/`openai` is selected but its API key is missing, Databricks is auto-selected when `DATABRICKS_HOST` + `DATABRICKS_MODEL` are set. | | `ANTHROPIC_API_KEY` | — | Required when provider=anthropic unless Databricks fallback is configured. | | `ANTHROPIC_MODEL` | — | Required when provider=anthropic. | | `ANTHROPIC_BASE_URL` | `https://api.anthropic.com` | | @@ -158,7 +158,7 @@ Everything is environment variables. No flags, no config files. (We are a subpro ## Providers -`buzz-agent` speaks two HTTP dialects. Pick with `BUZZ_AGENT_PROVIDER`. +`buzz-agent` speaks a few HTTP dialects. Pick with `BUZZ_AGENT_PROVIDER`. | Provider | `BUZZ_AGENT_PROVIDER` | Endpoint (auto) | Tested with | |---|---|---|---| @@ -170,6 +170,7 @@ Everything is environment variables. No flags, no config files. (We are a subpro | OpenRouter | `openai` | `POST {base}/chat/completions` | anything they route | | Block Gateway | `openai` | `POST {base}/chat/completions` | gpt-5, claude | | Databricks | `databricks` | `POST {host}/serving-endpoints/{model}/invocations` | goose-claude-4-6-sonnet | +| Databricks AI Gateway v2 | `databricks_v2` | `POST {host}/ai-gateway/{provider}/v1/...` | databricks-gpt-5-5, databricks-claude-opus-4-7 | If `BUZZ_AGENT_PROVIDER=anthropic` is selected without `ANTHROPIC_API_KEY`, or `BUZZ_AGENT_PROVIDER=openai` is selected without `OPENAI_COMPAT_API_KEY`, the agent automatically falls back to Databricks OAuth when `DATABRICKS_HOST` and `DATABRICKS_MODEL` are set. The same Databricks fallback applies when `BUZZ_AGENT_PROVIDER` is unset. Explicit Anthropic/OpenAI API keys always win. @@ -177,7 +178,7 @@ If `BUZZ_AGENT_PROVIDER=anthropic` is selected without `ANTHROPIC_API_KEY`, or ` By default (`OPENAI_COMPAT_API=auto`) the agent picks **Responses** when `OPENAI_COMPAT_BASE_URL` points at an `*.openai.com` host and **Chat Completions** everywhere else. Pin the choice explicitly with `OPENAI_COMPAT_API=chat` or `OPENAI_COMPAT_API=responses` for providers that diverge from the default (e.g. a Responses-compatible self-hosted gateway). -`Provider` is a Rust `enum` with one `match` in `Llm::complete`. There is no trait, no `Box`, no async-trait. Adding a third provider is a `match` arm and one `body`/`parse` pair in `llm.rs`. +`Provider` is a Rust `enum` with one `match` in `Llm::complete`. There is no trait, no `Box`, no async-trait. Adding a provider is a `match` arm and one `body`/`parse` pair in `llm.rs`. ## MCP Servers diff --git a/crates/buzz-agent/src/config.rs b/crates/buzz-agent/src/config.rs index 1d5a3df3b..9b6c596df 100644 --- a/crates/buzz-agent/src/config.rs +++ b/crates/buzz-agent/src/config.rs @@ -33,6 +33,9 @@ pub enum Provider { /// with a dynamically-acquired bearer (OAuth 2.0 PKCE, or static `DATABRICKS_TOKEN`). /// Wire format is OpenAI-chat-compatible — reuses the same body builder and parser. Databricks, + /// Databricks AI Gateway v2. Routes by model family through the gateway's + /// OpenAI Responses, Anthropic Messages, or MLflow Chat Completions paths. + DatabricksV2, } /// Which OpenAI-family HTTP API to call. Set via `OPENAI_COMPAT_API` @@ -114,8 +117,8 @@ impl Config { // bad value can't break an Anthropic-only deployment. // // Databricks borrows api_key as the *optional* `DATABRICKS_TOKEN` escape - // hatch — empty means "use OAuth PKCE." The model lives in the URL path, - // not the request body (see `EndpointStrategy::DatabricksServing`). + // hatch — empty means "use OAuth PKCE." Legacy Databricks encodes the + // model in the URL path; Databricks v2 keeps it in the request body. let (api_key, model, base_url, openai_api) = match provider { Provider::Anthropic => ( req("ANTHROPIC_API_KEY")?, @@ -137,12 +140,12 @@ impl Config { env_or("OPENAI_COMPAT_BASE_URL", "https://api.openai.com/v1"), parse_openai_api(env("OPENAI_COMPAT_API").as_deref())?, ), - Provider::Databricks => ( + Provider::Databricks | Provider::DatabricksV2 => ( env("DATABRICKS_TOKEN").unwrap_or_default(), resolve_model(databricks_model.as_deref(), buzz_agent_model.as_deref()) .ok_or_else(|| "config: DATABRICKS_MODEL required".to_string())?, databricks_host.ok_or_else(|| "config: DATABRICKS_HOST required".to_string())?, - OpenAiApi::Chat, // Databricks invocations is chat-shaped + OpenAiApi::Chat, // unused by Databricks v2; legacy Databricks is chat-shaped ), }; let system_prompt = match (env("BUZZ_AGENT_SYSTEM_PROMPT"), env("BUZZ_AGENT_SYSTEM_PROMPT_FILE")) { @@ -318,6 +321,7 @@ fn resolve_provider( "config: OPENAI_COMPAT_API_KEY required (or set DATABRICKS_HOST and DATABRICKS_MODEL for Databricks OAuth fallback)".into(), ), "databricks" => Ok(Provider::Databricks), + "databricks_v2" | "databricks-v2" => Ok(Provider::DatabricksV2), _ => Err(format!( "config: BUZZ_AGENT_PROVIDER={raw} not supported" )), diff --git a/crates/buzz-agent/src/lib.rs b/crates/buzz-agent/src/lib.rs index 953d66f5c..1636ce4c9 100644 --- a/crates/buzz-agent/src/lib.rs +++ b/crates/buzz-agent/src/lib.rs @@ -78,13 +78,13 @@ pub fn run() -> Result<(), Box> { } /// `buzz-agent auth ` — run the interactive auth flow for a -/// provider and persist the result, then exit. Today the only provider is -/// `databricks` (OAuth 2.0 PKCE). Reads `DATABRICKS_HOST` from env; needs -/// a browser on the machine. +/// provider and persist the result, then exit. Today this supports Databricks +/// OAuth 2.0 PKCE. Reads `DATABRICKS_HOST` from env; needs a browser on the +/// machine. async fn auth_subcommand(args: &[String]) -> Result<(), Box> { let provider = args.first().map(String::as_str); match provider { - Some("databricks") => { + Some("databricks" | "databricks_v2" | "databricks-v2") => { let host = std::env::var("DATABRICKS_HOST") .map_err(|_| "auth databricks: DATABRICKS_HOST required")?; let pkce = auth::PkceOAuthConfig { diff --git a/crates/buzz-agent/src/llm.rs b/crates/buzz-agent/src/llm.rs index 7ad068e61..f967f44cd 100644 --- a/crates/buzz-agent/src/llm.rs +++ b/crates/buzz-agent/src/llm.rs @@ -23,6 +23,13 @@ const MAX_LLM_ERROR_BODY_BYTES: usize = 4 * 1024; /// alongside its `_body` serializer. type OpenAiParse = fn(Value) -> Result; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum DatabricksV2Route { + OpenAiResponses, + AnthropicMessages, + MlflowChatCompletions, +} + pub struct Llm { http: Client, /// One-shot sticky flag: set when a Chat Completions request comes @@ -83,6 +90,23 @@ impl Llm { }) .await } + Provider::DatabricksV2 => { + self.databricks_v2_request(cfg, |route| match route { + DatabricksV2Route::OpenAiResponses => ( + responses_body(cfg, system_prompt, history, tools), + parse_responses as OpenAiParse, + ), + DatabricksV2Route::AnthropicMessages => ( + anthropic_body(cfg, system_prompt, history, tools), + parse_anthropic as OpenAiParse, + ), + DatabricksV2Route::MlflowChatCompletions => ( + openai_body(cfg, system_prompt, history, tools), + parse_openai as OpenAiParse, + ), + }) + .await + } } } @@ -137,6 +161,46 @@ impl Llm { .await?; Ok(r.text) } + Provider::DatabricksV2 => { + let r = self + .databricks_v2_request(cfg, |route| match route { + DatabricksV2Route::OpenAiResponses => ( + json!({ + "model": cfg.model, + "max_output_tokens": max_output_tokens, + "instructions": system_prompt, + "input": user_prompt, + }), + parse_responses as OpenAiParse, + ), + DatabricksV2Route::AnthropicMessages => ( + json!({ + "model": cfg.model, + "max_tokens": max_output_tokens, + "system": system_prompt, + "messages": [{ + "role": "user", + "content": [{ "type": "text", "text": user_prompt }], + }], + }), + parse_anthropic as OpenAiParse, + ), + DatabricksV2Route::MlflowChatCompletions => ( + json!({ + "model": cfg.model, + "stream": false, + "max_completion_tokens": max_output_tokens, + "messages": [ + { "role": "system", "content": system_prompt }, + { "role": "user", "content": user_prompt }, + ], + }), + parse_openai as OpenAiParse, + ), + }) + .await?; + Ok(r.text) + } } } @@ -176,6 +240,22 @@ impl Llm { } } + async fn databricks_v2_request( + &self, + cfg: &Config, + mut build: F, + ) -> Result + where + F: FnMut(DatabricksV2Route) -> (Value, OpenAiParse) + Send, + { + let route = databricks_v2_route_for_model(&cfg.model); + let (body, parse) = build(route); + parse( + self.post_openai(cfg, databricks_v2_path(route), &body) + .await?, + ) + } + /// POST to an OpenAI-family endpoint. For OpenAI-compat this is just /// `{base_url}{path}` with the body untouched. For Databricks the URL /// becomes `{base_url}/serving-endpoints/{model}/invocations` and the @@ -517,6 +597,25 @@ fn is_responses_required_error(body: &str) -> bool { || b.contains("use the responses api") } +fn databricks_v2_route_for_model(model: &str) -> DatabricksV2Route { + let lower = model.to_ascii_lowercase(); + if lower.contains("gpt-5") || lower.contains("gpt5") { + DatabricksV2Route::OpenAiResponses + } else if lower.contains("claude") { + DatabricksV2Route::AnthropicMessages + } else { + DatabricksV2Route::MlflowChatCompletions + } +} + +fn databricks_v2_path(route: DatabricksV2Route) -> &'static str { + match route { + DatabricksV2Route::OpenAiResponses => "/ai-gateway/openai/v1/responses", + DatabricksV2Route::AnthropicMessages => "/ai-gateway/anthropic/v1/messages", + DatabricksV2Route::MlflowChatCompletions => "/ai-gateway/mlflow/v1/chat/completions", + } +} + fn parse_responses(v: Value) -> Result { let mut text = String::new(); let mut tool_calls = Vec::new(); @@ -886,7 +985,7 @@ fn build_token_source(cfg: &Config) -> Result, AgentError> Provider::Anthropic | Provider::OpenAi => { Ok(Arc::new(StaticTokenSource::new(cfg.api_key.clone()))) } - Provider::Databricks => { + Provider::Databricks | Provider::DatabricksV2 => { if !cfg.api_key.is_empty() { return Ok(Arc::new(StaticTokenSource::new(cfg.api_key.clone()))); } @@ -1199,6 +1298,31 @@ mod tests { } } + #[test] + fn databricks_v2_routes_by_model_family() { + for (model, route, path) in [ + ( + "databricks-gpt-5-5", + DatabricksV2Route::OpenAiResponses, + "/ai-gateway/openai/v1/responses", + ), + ( + "databricks-claude-opus-4-7", + DatabricksV2Route::AnthropicMessages, + "/ai-gateway/anthropic/v1/messages", + ), + ( + "custom-tool-model", + DatabricksV2Route::MlflowChatCompletions, + "/ai-gateway/mlflow/v1/chat/completions", + ), + ] { + let got = databricks_v2_route_for_model(model); + assert_eq!(got, route, "model={model}"); + assert_eq!(databricks_v2_path(got), path, "model={model}"); + } + } + #[test] fn parse_responses_rejects_malformed_function_arguments() { let v = serde_json::json!({ diff --git a/crates/buzz-agent/tests/databricks_oauth.rs b/crates/buzz-agent/tests/databricks_oauth.rs index 01169b471..6366cbdd7 100644 --- a/crates/buzz-agent/tests/databricks_oauth.rs +++ b/crates/buzz-agent/tests/databricks_oauth.rs @@ -362,7 +362,7 @@ use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpListener; use tokio::sync::Mutex; -#[derive(Debug)] +#[derive(Clone, Debug)] struct CapturedRequest { path: String, authorization: Option, @@ -466,10 +466,10 @@ impl Drop for AgentHarness { } impl AgentHarness { - async fn spawn_databricks(base_url: &str, model: &str) -> Self { + async fn spawn_provider(provider: &str, base_url: &str, model: &str) -> Self { let bin = env!("CARGO_BIN_EXE_buzz-agent"); let mut cmd = tokio::process::Command::new(bin); - cmd.env("BUZZ_AGENT_PROVIDER", "databricks") + cmd.env("BUZZ_AGENT_PROVIDER", provider) .env("DATABRICKS_HOST", base_url) .env("DATABRICKS_MODEL", model) .env("DATABRICKS_TOKEN", "test-bearer") @@ -521,23 +521,8 @@ impl AgentHarness { } } -#[tokio::test] -async fn databricks_envelope_routes_through_serving_endpoints_and_strips_model() { - // One canned chat-completions-shaped response → assistant says "ok" - // with end_turn so the agent loop exits cleanly. - let canned = vec![json!({ - "id": "x", - "object": "chat.completion", - "choices": [{ - "index": 0, - "message": { "role": "assistant", "content": "ok" }, - "finish_reason": "stop" - }] - })]; - let (base, captured) = spawn_capturing_server(canned).await; - - let model = "goose-claude-4-6-sonnet"; - let mut h = AgentHarness::spawn_databricks(&base, model).await; +async fn run_single_prompt(provider: &str, base: &str, model: &str) { + let mut h = AgentHarness::spawn_provider(provider, base, model).await; h.send( "initialize", json!({ "protocolVersion": 1, "clientCapabilities": {} }), @@ -554,13 +539,39 @@ async fn databricks_envelope_routes_through_serving_endpoints_and_strips_model() ) .await; let _ = h.recv_for(3).await; +} + +async fn run_captured_prompt( + provider: &str, + model: &str, + canned: Vec, +) -> CapturedRequest { + let (base, captured) = spawn_capturing_server(canned).await; + run_single_prompt(provider, &base, model).await; let reqs = captured.lock().await; assert_eq!(reqs.len(), 1, "expected exactly one LLM request"); - let req = &reqs[0]; + reqs[0].clone() +} + +#[tokio::test] +async fn databricks_envelope_routes_through_serving_endpoints_and_strips_model() { + // One canned chat-completions-shaped response → assistant says "ok" + // with end_turn so the agent loop exits cleanly. + let canned = vec![json!({ + "id": "x", + "object": "chat.completion", + "choices": [{ + "index": 0, + "message": { "role": "assistant", "content": "ok" }, + "finish_reason": "stop" + }] + })]; + let model = "goose-claude-4-6-sonnet"; + let req = run_captured_prompt("databricks", model, canned).await; assert_eq!( - req.path, + req.path.as_str(), format!("/serving-endpoints/{model}/invocations"), "Databricks must route to serving-endpoints/{{model}}/invocations" ); @@ -583,3 +594,98 @@ async fn databricks_envelope_routes_through_serving_endpoints_and_strips_model() "request body should keep the chat `messages` field" ); } + +#[tokio::test] +async fn databricks_v2_gpt5_routes_through_ai_gateway_responses() { + let canned = vec![json!({ + "status": "completed", + "output": [{ + "type": "message", + "content": [{ "type": "output_text", "text": "ok" }] + }] + })]; + let model = "databricks-gpt-5-5"; + let req = run_captured_prompt("databricks_v2", model, canned).await; + + assert_eq!( + req.path.as_str(), + "/ai-gateway/openai/v1/responses", + "Databricks v2 GPT-5 models must route through AI Gateway Responses" + ); + assert_eq!( + req.authorization.as_deref(), + Some("Bearer test-bearer"), + "Authorization must be the static DATABRICKS_TOKEN as a Bearer" + ); + assert_eq!(req.body["model"], model); + assert!( + req.body.get("input").and_then(|v| v.as_array()).is_some(), + "Responses request body should keep `input`: {:?}", + req.body + ); +} + +#[tokio::test] +async fn databricks_v2_claude_routes_through_ai_gateway_anthropic_messages() { + let canned = vec![json!({ + "stop_reason": "end_turn", + "content": [{ "type": "text", "text": "ok" }] + })]; + let model = "databricks-claude-opus-4-7"; + let req = run_captured_prompt("databricks_v2", model, canned).await; + + assert_eq!( + req.path.as_str(), + "/ai-gateway/anthropic/v1/messages", + "Databricks v2 Claude models must route through AI Gateway Anthropic Messages" + ); + assert_eq!( + req.authorization.as_deref(), + Some("Bearer test-bearer"), + "Authorization must be the static DATABRICKS_TOKEN as a Bearer" + ); + assert_eq!(req.body["model"], model); + assert!( + req.body + .get("messages") + .and_then(|v| v.as_array()) + .is_some(), + "Anthropic request body should keep `messages`: {:?}", + req.body + ); +} + +#[tokio::test] +async fn databricks_v2_other_models_route_through_ai_gateway_mlflow_chat() { + let canned = vec![json!({ + "id": "x", + "object": "chat.completion", + "choices": [{ + "index": 0, + "message": { "role": "assistant", "content": "ok" }, + "finish_reason": "stop" + }] + })]; + let model = "custom-chat-model"; + let req = run_captured_prompt("databricks_v2", model, canned).await; + + assert_eq!( + req.path.as_str(), + "/ai-gateway/mlflow/v1/chat/completions", + "Databricks v2 fallback models must route through AI Gateway MLflow Chat" + ); + assert_eq!( + req.authorization.as_deref(), + Some("Bearer test-bearer"), + "Authorization must be the static DATABRICKS_TOKEN as a Bearer" + ); + assert_eq!(req.body["model"], model); + assert!( + req.body + .get("messages") + .and_then(|v| v.as_array()) + .is_some(), + "Chat request body should keep `messages`: {:?}", + req.body + ); +}