Skip to content

Commit 12c3f2a

Browse files
Add SPOG (Custom URL) routing support via x-databricks-org-id header (#347)
## Summary On SPOG (Custom URL / account-level) workspaces, `httpPath` has the form `/sql/1.0/warehouses/<id>?o=<workspaceId>`. The `?o=` parameter routes Thrift calls correctly via the URL, but other endpoints (telemetry push, feature-flag check) run on separate hosts and need `x-databricks-org-id` as an HTTP header to route to the right workspace. Without it, those requests 404 or get misrouted on SPOG hosts. ## Change All contained in `connector.go`: 1. `extractSpogHeaders(httpPath string) map[string]string` — parses the `?o=` query param using `url.ParseQuery` (stdlib, not regex). Returns `{"x-databricks-org-id": "<workspaceId>"}` or `nil`. Three DEBUG log paths cover: malformed query, missing `?o=`, and successful extraction. 2. `headerInjectingTransport` — a lightweight `http.RoundTripper` wrapper that clones the request per the contract and sets the provided headers if not already set by the caller. 3. `withSpogHeaders(base *http.Client, headers map[string]string) *http.Client` — returns a new client with the same settings but a wrapped transport. 4. In `Connect()`, when `extractSpogHeaders` returns non-nil, the driver passes a wrapped client into `TelemetryInitOptions.HTTPClient`. The wrapped client is used for both the feature-flag check and the telemetry push. The driver's own `c.client` is left alone, so Thrift routing (which uses `?o=` in the URL) is unaffected. ## Why a transport wrapper instead of threading a parameter An earlier revision of this PR threaded an `extraHeaders` parameter through `telemetry.TelemetryInitOptions` → `isTelemetryEnabled` → `featureFlagCache.isTelemetryEnabled` → `fetchFeatureFlag`. That approach: - Required API-surface changes in 3 telemetry files (`config.go`, `featureflag.go`, `driver_integration.go`). - Only covered the feature-flag check; `telemetry/exporter.go` (telemetry push) still sent `Content-Type` as the only header — SPOG routing would 404 at push time. The RoundTripper wrapper: - Keeps `telemetry/*` identical to `origin/main`. Zero API churn. - Automatically applies to every outbound request using the wrapped client — feature-flag check, telemetry push, and any future HTTP paths that reuse it. - Respects caller-set headers (if a request already has `x-databricks-org-id` for some reason, the wrapper does not override). ## Endpoints covered | Endpoint | Uses wrapped client? | Gets `x-databricks-org-id`? | | :---- | :----: | :----: | | Feature flags `/api/2.0/connector-service/feature-flags/GOLANG/{v}` | Yes | ✅ | | Telemetry push | Yes | ✅ | | Thrift | No (uses `c.client` directly; already routes via URL) | — (not needed — URL-routed) | | OAuth token exchange | No (separate client, talks to `login.microsoftonline.com`) | — (not needed) | | CloudFetch / Volume operations | No (presigned URLs) | — (not needed) | ## Verification - `go build ./...` clean. - Debug log output confirms extraction on SPOG URLs: ``` SPOG header extraction: injecting x-databricks-org-id=<id> (extracted from ?o= in httpPath) ``` ## Note on the earlier `auth/oauth/u2m/u2m.go` change The two earliest commits on this branch (`23697e5`, `0ec7e06`) modified `auth/oauth/u2m/u2m.go` to avoid sending an empty `client_secret` on the PKCE public-app flow, documented as a fix for the server's `"Public app should not use a client secret"` rejection. **That change was empirically verified to not be needed** (see commit `3576c92` which reverts it): - **Prod Legacy** (`adb-6436897454825492.12.azuredatabricks.net`): Unpatched `u2m.go` PASSES end-to-end. Server accepts the empty `client_secret`. - **Stg Legacy** (`adb-7064161269814046.2.staging.azuredatabricks.net`): Unpatched `u2m.go` FAILS with `unexpected HTTP status 400 Bad Request` during token exchange. Since production tolerates the current behavior, customers aren't impacted. Keeping this PR minimal; if staging-level strictness later rolls out to prod, we can re-add the u2m fix then. ## Test plan - [ ] Unit tests pass (`go test ./...`) - [ ] Manually verified SPOG header injection appears in `x-databricks-org-id` on feature-flag check and telemetry push against a SPOG workspace (not gated by this PR, but prerequisite for SPOG telemetry to work at all) - [ ] No regressions on non-SPOG workspaces (wrapper is a no-op when `extractSpogHeaders` returns nil) --------- Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com> Signed-off-by: Madhavendra Rathore Co-authored-by: Samikshya Chand <148681192+samikshya-db@users.noreply.github.com>
1 parent 3c0f7e4 commit 12c3f2a

2 files changed

Lines changed: 261 additions & 1 deletion

File tree

connector.go

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"database/sql/driver"
77
"fmt"
88
"net/http"
9+
"net/url"
910
"strings"
1011
"time"
1112

@@ -80,11 +81,21 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
8081
}
8182
log := logger.WithContext(conn.id, driverctx.CorrelationIdFromContext(ctx), "")
8283

84+
// Extract SPOG routing headers from ?o= in HTTPPath. When ?o=<workspaceId>
85+
// is present (Custom URL / SPOG hosts), wrap the HTTP client used for
86+
// telemetry + feature-flag calls with a transport that injects
87+
// x-databricks-org-id. Thrift routes via the URL so its own c.client
88+
// doesn't need wrapping.
89+
telemetryClient := c.client
90+
if spogHeaders := extractSpogHeaders(c.cfg.HTTPPath); len(spogHeaders) > 0 {
91+
telemetryClient = withSpogHeaders(c.client, spogHeaders)
92+
}
93+
8394
// Initialize telemetry: client config overlay decides; if unset, feature flags decide
8495
conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{
8596
Host: c.cfg.Host,
8697
DriverVersion: c.cfg.DriverVersion,
87-
HTTPClient: c.client,
98+
HTTPClient: telemetryClient,
8899
EnableTelemetry: c.cfg.EnableTelemetry,
89100
BatchSize: c.cfg.TelemetryBatchSize,
90101
FlushInterval: c.cfg.TelemetryFlushInterval,
@@ -126,6 +137,91 @@ func NewConnector(options ...ConnOption) (driver.Connector, error) {
126137
return &connector{cfg: cfg, client: client}, nil
127138
}
128139

140+
// extractSpogHeaders extracts ?o=<workspaceId> from httpPath and returns
141+
// an x-databricks-org-id header for SPOG routing.
142+
//
143+
// On SPOG (Custom URL) workspaces, httpPath is of the form
144+
// /sql/1.0/warehouses/<id>?o=<workspaceId>. The ?o= parameter keeps Thrift
145+
// requests routed to the correct workspace via the URL itself, but other
146+
// endpoints (telemetry, feature flags) run on separate hosts and need the
147+
// x-databricks-org-id header. This function extracts ?o= from httpPath once
148+
// and returns it so those paths can inject it as an HTTP header.
149+
//
150+
// Returns nil if:
151+
// - httpPath has no query string ("?"), or
152+
// - the query string is malformed and can't be parsed, or
153+
// - the ?o= parameter is missing or empty.
154+
func extractSpogHeaders(httpPath string) map[string]string {
155+
if !strings.Contains(httpPath, "?") {
156+
return nil
157+
}
158+
// Parse query string from httpPath
159+
parts := strings.SplitN(httpPath, "?", 2)
160+
params, err := url.ParseQuery(parts[1])
161+
if err != nil {
162+
logger.Debug().Msgf(
163+
"SPOG header extraction: malformed query string in httpPath, skipping org-id extraction: %s",
164+
err)
165+
return nil
166+
}
167+
orgID := params.Get("o")
168+
if orgID == "" {
169+
logger.Debug().Msg(
170+
"SPOG header extraction: httpPath has query string but no ?o= param, " +
171+
"skipping x-databricks-org-id injection")
172+
return nil
173+
}
174+
logger.Debug().Msgf(
175+
"SPOG header extraction: injecting x-databricks-org-id=%s (extracted from ?o= in httpPath)",
176+
orgID)
177+
return map[string]string{"x-databricks-org-id": orgID}
178+
}
179+
180+
// withSpogHeaders returns a new *http.Client that reuses the transport of the
181+
// provided client, wrapped to inject the given SPOG headers on every outbound
182+
// request. The original client is left unchanged. If a request already has a
183+
// given header set (e.g., the caller set it explicitly), the wrapper does not
184+
// override it.
185+
//
186+
// This is how the driver gets x-databricks-org-id onto both the feature-flag
187+
// check and the telemetry push without touching the telemetry package's
188+
// signatures.
189+
func withSpogHeaders(base *http.Client, headers map[string]string) *http.Client {
190+
baseTransport := base.Transport
191+
if baseTransport == nil {
192+
baseTransport = http.DefaultTransport
193+
}
194+
return &http.Client{
195+
Transport: &headerInjectingTransport{
196+
base: baseTransport,
197+
headers: headers,
198+
},
199+
CheckRedirect: base.CheckRedirect,
200+
Jar: base.Jar,
201+
Timeout: base.Timeout,
202+
}
203+
}
204+
205+
// headerInjectingTransport wraps an http.RoundTripper and sets a fixed set of
206+
// headers on every outbound request. Caller-supplied headers with the same
207+
// name are not overridden.
208+
type headerInjectingTransport struct {
209+
base http.RoundTripper
210+
headers map[string]string
211+
}
212+
213+
// RoundTrip implements http.RoundTripper.
214+
func (t *headerInjectingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
215+
// Clone per RoundTripper contract — must not mutate the caller's request.
216+
req2 := req.Clone(req.Context())
217+
for k, v := range t.headers {
218+
if req2.Header.Get(k) == "" {
219+
req2.Header.Set(k, v)
220+
}
221+
}
222+
return t.base.RoundTrip(req2)
223+
}
224+
129225
func withUserConfig(ucfg config.UserConfig) ConnOption {
130226
return func(c *config.Config) {
131227
c.UserConfig = ucfg

connector_spog_test.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package dbsql
2+
3+
import (
4+
"io"
5+
"net/http"
6+
"net/http/httptest"
7+
"strings"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestExtractSpogHeaders(t *testing.T) {
15+
tests := []struct {
16+
name string
17+
httpPath string
18+
want map[string]string
19+
}{
20+
{
21+
name: "no query string returns nil",
22+
httpPath: "/sql/1.0/warehouses/abc123",
23+
want: nil,
24+
},
25+
{
26+
name: "empty httpPath returns nil",
27+
httpPath: "",
28+
want: nil,
29+
},
30+
{
31+
name: "query string with o= extracts org id",
32+
httpPath: "/sql/1.0/warehouses/abc123?o=7064161269814046",
33+
want: map[string]string{"x-databricks-org-id": "7064161269814046"},
34+
},
35+
{
36+
name: "query string without o= returns nil",
37+
httpPath: "/sql/1.0/warehouses/abc123?other=val",
38+
want: nil,
39+
},
40+
{
41+
name: "empty o= value returns nil",
42+
httpPath: "/sql/1.0/warehouses/abc123?o=",
43+
want: nil,
44+
},
45+
{
46+
name: "o= among multiple params extracts correctly",
47+
httpPath: "/sql/1.0/warehouses/abc?foo=1&o=12345&bar=2",
48+
want: map[string]string{"x-databricks-org-id": "12345"},
49+
},
50+
{
51+
name: "first o= wins when duplicated",
52+
httpPath: "/sql/1.0/warehouses/abc?o=first&o=second",
53+
want: map[string]string{"x-databricks-org-id": "first"},
54+
},
55+
{
56+
name: "just ? with nothing after returns nil",
57+
httpPath: "/sql/1.0/warehouses/abc?",
58+
want: nil,
59+
},
60+
}
61+
for _, tc := range tests {
62+
t.Run(tc.name, func(t *testing.T) {
63+
got := extractSpogHeaders(tc.httpPath)
64+
assert.Equal(t, tc.want, got)
65+
})
66+
}
67+
}
68+
69+
func TestHeaderInjectingTransport_InjectsHeader(t *testing.T) {
70+
var gotHeader string
71+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
72+
gotHeader = r.Header.Get("x-databricks-org-id")
73+
w.WriteHeader(http.StatusOK)
74+
}))
75+
defer srv.Close()
76+
77+
client := withSpogHeaders(&http.Client{}, map[string]string{
78+
"x-databricks-org-id": "7064161269814046",
79+
})
80+
81+
req, err := http.NewRequest("GET", srv.URL, nil)
82+
require.NoError(t, err)
83+
resp, err := client.Do(req)
84+
require.NoError(t, err)
85+
_, _ = io.Copy(io.Discard, resp.Body)
86+
_ = resp.Body.Close()
87+
88+
assert.Equal(t, "7064161269814046", gotHeader, "SPOG header should be injected")
89+
}
90+
91+
func TestHeaderInjectingTransport_DoesNotOverrideCallerSet(t *testing.T) {
92+
var gotHeader string
93+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
94+
gotHeader = r.Header.Get("x-databricks-org-id")
95+
w.WriteHeader(http.StatusOK)
96+
}))
97+
defer srv.Close()
98+
99+
client := withSpogHeaders(&http.Client{}, map[string]string{
100+
"x-databricks-org-id": "from-wrapper",
101+
})
102+
103+
req, err := http.NewRequest("GET", srv.URL, nil)
104+
require.NoError(t, err)
105+
req.Header.Set("x-databricks-org-id", "from-caller")
106+
resp, err := client.Do(req)
107+
require.NoError(t, err)
108+
_, _ = io.Copy(io.Discard, resp.Body)
109+
_ = resp.Body.Close()
110+
111+
assert.Equal(t, "from-caller", gotHeader, "caller-set header must not be overridden")
112+
}
113+
114+
func TestHeaderInjectingTransport_PreservesOtherHeaders(t *testing.T) {
115+
var gotAuth, gotSpog, gotCustom string
116+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
117+
gotAuth = r.Header.Get("Authorization")
118+
gotSpog = r.Header.Get("x-databricks-org-id")
119+
gotCustom = r.Header.Get("X-Custom")
120+
w.WriteHeader(http.StatusOK)
121+
}))
122+
defer srv.Close()
123+
124+
client := withSpogHeaders(&http.Client{}, map[string]string{
125+
"x-databricks-org-id": "abc",
126+
})
127+
128+
req, err := http.NewRequest("GET", srv.URL, nil)
129+
require.NoError(t, err)
130+
req.Header.Set("Authorization", "Bearer xxx")
131+
req.Header.Set("X-Custom", "hello")
132+
resp, err := client.Do(req)
133+
require.NoError(t, err)
134+
_, _ = io.Copy(io.Discard, resp.Body)
135+
_ = resp.Body.Close()
136+
137+
assert.Equal(t, "Bearer xxx", gotAuth)
138+
assert.Equal(t, "hello", gotCustom)
139+
assert.Equal(t, "abc", gotSpog)
140+
}
141+
142+
func TestWithSpogHeaders_OriginalClientUntouched(t *testing.T) {
143+
originalTransport := &countingTransport{}
144+
original := &http.Client{Transport: originalTransport}
145+
146+
wrapped := withSpogHeaders(original, map[string]string{"x-databricks-org-id": "x"})
147+
148+
// Original client's transport should NOT be the wrapper type.
149+
_, isWrapped := original.Transport.(*headerInjectingTransport)
150+
assert.False(t, isWrapped, "original client's transport must not be mutated")
151+
152+
// Wrapped client MUST have the wrapper.
153+
_, isWrapped = wrapped.Transport.(*headerInjectingTransport)
154+
assert.True(t, isWrapped, "wrapped client must have headerInjectingTransport")
155+
}
156+
157+
type countingTransport struct {
158+
count int
159+
}
160+
161+
func (c *countingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
162+
c.count++
163+
return &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(""))}, nil
164+
}

0 commit comments

Comments
 (0)