Skip to content

Commit bc4e923

Browse files
committed
Refactor SPOG header injection: use transport wrapper, drop telemetry API churn
Replace the per-function extraHeaders parameter threading (added during the previous merge with main) with a minimal http.Client wrapper in the top-level dbsql package. Before (5 files, including 3 in telemetry/*): connector.go extracted ?o=<workspaceId>, then passed it as an ExtraHeaders field through telemetry.TelemetryInitOptions → isTelemetryEnabled → featureFlagCache.isTelemetryEnabled → fetchFeatureFlag, where it was applied to the outbound request. The telemetry push path (telemetry/exporter.go) was NOT covered. After (2 files — connector.go and auth/oauth/u2m/u2m.go): connector.go extracts ?o= as before, but now wraps the driver's *http.Client with a headerInjectingTransport that sets the SPOG header on every outbound request through that client. Passes the wrapped client (not c.client) into TelemetryInitOptions.HTTPClient. Advantages: - telemetry/*.go files revert to identical-to-main. No API churn. - Both feature-flag and telemetry-push paths automatically get the SPOG header (previously only feature-flag did). - Future HTTP paths that reuse the telemetry http.Client inherit SPOG routing for free. Thrift is unaffected: it uses c.client directly (not the wrapper) and routes via ?o= in the URL path. The transport wrapper is only applied to the HTTP client handed to telemetry. Signed-off-by: Madhavendra Rathore Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 32c376e commit bc4e923

4 files changed

Lines changed: 61 additions & 23 deletions

File tree

connector.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,26 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
8181
}
8282
log := logger.WithContext(conn.id, driverctx.CorrelationIdFromContext(ctx), "")
8383

84-
// Extract SPOG routing headers from ?o= in HTTPPath
85-
spogHeaders := extractSpogHeaders(c.cfg.HTTPPath)
84+
// Extract SPOG routing headers from ?o= in HTTPPath. When ?o=<workspaceId>
85+
// is present (Custom URL / SPOG hosts), wrap the HTTP client used for
86+
// telemetry + feature-flag calls with a transport that injects
87+
// x-databricks-org-id. Thrift routes via the URL so its own c.client
88+
// doesn't need wrapping.
89+
telemetryClient := c.client
90+
if spogHeaders := extractSpogHeaders(c.cfg.HTTPPath); len(spogHeaders) > 0 {
91+
telemetryClient = withSpogHeaders(c.client, spogHeaders)
92+
}
8693

8794
// Initialize telemetry: client config overlay decides; if unset, feature flags decide
8895
conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{
8996
Host: c.cfg.Host,
9097
DriverVersion: c.cfg.DriverVersion,
91-
HTTPClient: c.client,
98+
HTTPClient: telemetryClient,
9299
EnableTelemetry: c.cfg.EnableTelemetry,
93100
BatchSize: c.cfg.TelemetryBatchSize,
94101
FlushInterval: c.cfg.TelemetryFlushInterval,
95102
RetryCount: c.cfg.TelemetryRetryCount,
96103
RetryDelay: c.cfg.TelemetryRetryDelay,
97-
ExtraHeaders: spogHeaders,
98104
})
99105
if conn.telemetry != nil {
100106
log.Debug().Msg("telemetry initialized for connection")
@@ -171,6 +177,51 @@ func extractSpogHeaders(httpPath string) map[string]string {
171177
return map[string]string{"x-databricks-org-id": orgID}
172178
}
173179

180+
// withSpogHeaders returns a new *http.Client that reuses the transport of the
181+
// provided client, wrapped to inject the given SPOG headers on every outbound
182+
// request. The original client is left unchanged. If a request already has a
183+
// given header set (e.g., the caller set it explicitly), the wrapper does not
184+
// override it.
185+
//
186+
// This is how the driver gets x-databricks-org-id onto both the feature-flag
187+
// check and the telemetry push without touching the telemetry package's
188+
// signatures.
189+
func withSpogHeaders(base *http.Client, headers map[string]string) *http.Client {
190+
baseTransport := base.Transport
191+
if baseTransport == nil {
192+
baseTransport = http.DefaultTransport
193+
}
194+
return &http.Client{
195+
Transport: &headerInjectingTransport{
196+
base: baseTransport,
197+
headers: headers,
198+
},
199+
CheckRedirect: base.CheckRedirect,
200+
Jar: base.Jar,
201+
Timeout: base.Timeout,
202+
}
203+
}
204+
205+
// headerInjectingTransport wraps an http.RoundTripper and sets a fixed set of
206+
// headers on every outbound request. Caller-supplied headers with the same
207+
// name are not overridden.
208+
type headerInjectingTransport struct {
209+
base http.RoundTripper
210+
headers map[string]string
211+
}
212+
213+
// RoundTrip implements http.RoundTripper.
214+
func (t *headerInjectingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
215+
// Clone per RoundTripper contract — must not mutate the caller's request.
216+
req2 := req.Clone(req.Context())
217+
for k, v := range t.headers {
218+
if req2.Header.Get(k) == "" {
219+
req2.Header.Set(k, v)
220+
}
221+
}
222+
return t.base.RoundTrip(req2)
223+
}
224+
174225
func withUserConfig(ucfg config.UserConfig) ConnOption {
175226
return func(c *config.Config) {
176227
c.UserConfig = ucfg

telemetry/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ func ParseTelemetryConfig(params map[string]string) *Config {
126126
// (databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver).
127127
//
128128
// In all other cases — explicit opt-out or server flag absent/unreachable — returns false.
129-
func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, httpClient *http.Client, extraHeaders map[string]string) bool {
129+
func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, httpClient *http.Client) bool {
130130
if cfg.EnableTelemetry != nil {
131131
return *cfg.EnableTelemetry
132132
}
133133

134-
serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, httpClient, extraHeaders)
134+
serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, httpClient)
135135
if err != nil {
136136
return false
137137
}

telemetry/driver_integration.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,6 @@ type TelemetryInitOptions struct {
4141

4242
// RetryDelay is the base delay between retries (0 = use default 100ms).
4343
RetryDelay time.Duration
44-
45-
// ExtraHeaders are additional HTTP headers to attach to feature-flag
46-
// check requests and telemetry-push requests. Primarily used to carry
47-
// x-databricks-org-id for SPOG (Custom URL) workspace routing — see
48-
// extractSpogHeaders in the top-level dbsql package.
49-
//
50-
// May be nil.
51-
ExtraHeaders map[string]string
5244
}
5345

5446
// InitializeForConnection initializes telemetry for a database connection.
@@ -86,7 +78,7 @@ func InitializeForConnection(ctx context.Context, opts TelemetryInitOptions) *In
8678
flagCache.getOrCreateContext(opts.Host)
8779

8880
// Check if telemetry should be enabled
89-
enabled := isTelemetryEnabled(ctx, cfg, opts.Host, opts.DriverVersion, opts.HTTPClient, opts.ExtraHeaders)
81+
enabled := isTelemetryEnabled(ctx, cfg, opts.Host, opts.DriverVersion, opts.HTTPClient)
9082
if !enabled {
9183
flagCache.releaseContext(opts.Host)
9284
return nil

telemetry/featureflag.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (c *featureFlagCache) releaseContext(host string) {
8686

8787
// isTelemetryEnabled checks if telemetry is enabled for the host.
8888
// Uses cached value if available and not expired.
89-
func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, driverVersion string, httpClient *http.Client, extraHeaders map[string]string) (bool, error) {
89+
func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, driverVersion string, httpClient *http.Client) (bool, error) {
9090
c.mu.RLock()
9191
flagCtx, exists := c.contexts[host]
9292
c.mu.RUnlock()
@@ -135,7 +135,7 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string,
135135
flagCtx.mu.Unlock()
136136

137137
// Fetch fresh value (outside lock so other readers are not blocked).
138-
enabled, err := fetchFeatureFlag(ctx, host, driverVersion, httpClient, extraHeaders)
138+
enabled, err := fetchFeatureFlag(ctx, host, driverVersion, httpClient)
139139

140140
// Update cache.
141141
flagCtx.mu.Lock()
@@ -166,7 +166,7 @@ func (c *featureFlagContext) isExpired() bool {
166166
}
167167

168168
// fetchFeatureFlag fetches the feature flag value from Databricks.
169-
func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, httpClient *http.Client, extraHeaders map[string]string) (bool, error) {
169+
func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, httpClient *http.Client) (bool, error) {
170170
// Add timeout to context if it doesn't have a deadline
171171
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
172172
var cancel context.CancelFunc
@@ -183,11 +183,6 @@ func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, ht
183183
return false, fmt.Errorf("failed to create feature flag request: %w", err)
184184
}
185185

186-
// Attach extra headers (e.g. x-databricks-org-id for SPOG routing).
187-
for k, v := range extraHeaders {
188-
req.Header.Set(k, v)
189-
}
190-
191186
resp, err := httpClient.Do(req)
192187
if err != nil {
193188
return false, fmt.Errorf("failed to fetch feature flag: %w", err)

0 commit comments

Comments
 (0)