Skip to content

Commit ac2d0ac

Browse files
committed
Address PR review feedback: fix critical/high telemetry issues
- Fix double-close panic: add sync.Once to aggregator.close() - Fix Interceptor.Close() closing shared aggregator: only flush, not close - Bound export goroutines with semaphore (cap 8) in flushUnlocked - Use cancellable context in flushLoop so exports stop on shutdown - Change UserConfig.EnableTelemetry from bool to ConfigValue[bool] so explicit false in DSN properly disables telemetry (not treated as unset) - Fix classifyError to use ordered slice for deterministic classification Co-authored-by: samikshya-chand_data
1 parent 75f0779 commit ac2d0ac

File tree

6 files changed

+58
-38
lines changed

6 files changed

+58
-38
lines changed

connector.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,12 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
7676
}
7777
log := logger.WithContext(conn.id, driverctx.CorrelationIdFromContext(ctx), "")
7878

79-
// Initialize telemetry: pass user opt-in flag; if unset, feature flags decide
80-
var enableTelemetry *bool
81-
if c.cfg.EnableTelemetry {
82-
trueVal := true
83-
enableTelemetry = &trueVal
84-
}
85-
79+
// Initialize telemetry: client config overlay decides; if unset, feature flags decide
8680
conn.telemetry = telemetry.InitializeForConnection(
8781
ctx,
8882
c.cfg.Host,
8983
c.client,
90-
enableTelemetry,
84+
c.cfg.EnableTelemetry,
9185
)
9286
if conn.telemetry != nil {
9387
log.Debug().Msg("telemetry initialized for connection")

internal/config/config.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,10 @@ type UserConfig struct {
9999
RetryWaitMax time.Duration
100100
RetryMax int
101101
// Telemetry configuration
102-
EnableTelemetry bool // Opt-in for telemetry; follows client > server > default priority
103-
Transport http.RoundTripper
102+
// Uses config overlay pattern: client > server > default.
103+
// Unset = check server feature flag; explicitly true/false overrides the server.
104+
EnableTelemetry ConfigValue[bool]
105+
Transport http.RoundTripper
104106
UseLz4Compression bool
105107
EnableMetricViewMetadata bool
106108
CloudFetchConfig
@@ -290,7 +292,7 @@ func ParseDSN(dsn string) (UserConfig, error) {
290292
if err != nil {
291293
return UserConfig{}, err
292294
}
293-
ucfg.EnableTelemetry = enableTelemetry
295+
ucfg.EnableTelemetry = NewConfigValue(enableTelemetry)
294296
}
295297

296298
// for timezone we do a case insensitive key match.

telemetry/aggregator.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ type metricsAggregator struct {
2020
flushInterval time.Duration
2121
stopCh chan struct{}
2222
flushTimer *time.Ticker
23+
24+
closeOnce sync.Once
25+
ctx context.Context // Cancellable context for in-flight exports
26+
cancel context.CancelFunc // Cancels ctx on close
27+
exportSem chan struct{} // Bounds concurrent export goroutines
2328
}
2429

2530
// statementMetrics holds aggregated metrics for a statement.
@@ -38,13 +43,17 @@ type statementMetrics struct {
3843

3944
// newMetricsAggregator creates a new metrics aggregator.
4045
func newMetricsAggregator(exporter *telemetryExporter, cfg *Config) *metricsAggregator {
46+
ctx, cancel := context.WithCancel(context.Background())
4147
agg := &metricsAggregator{
4248
statements: make(map[string]*statementMetrics),
4349
batch: make([]*telemetryMetric, 0, cfg.BatchSize),
4450
exporter: exporter,
4551
batchSize: cfg.BatchSize,
4652
flushInterval: cfg.FlushInterval,
4753
stopCh: make(chan struct{}),
54+
ctx: ctx,
55+
cancel: cancel,
56+
exportSem: make(chan struct{}, 8), // Bound to 8 concurrent exports
4857
}
4958

5059
// Start background flush timer
@@ -182,7 +191,7 @@ func (agg *metricsAggregator) flushLoop() {
182191
for {
183192
select {
184193
case <-agg.flushTimer.C:
185-
agg.flush(context.Background())
194+
agg.flush(agg.ctx) // Use cancellable context so exports stop on shutdown
186195
case <-agg.stopCh:
187196
return
188197
}
@@ -207,9 +216,18 @@ func (agg *metricsAggregator) flushUnlocked(ctx context.Context) {
207216
copy(metrics, agg.batch)
208217
agg.batch = agg.batch[:0]
209218

219+
// Acquire semaphore slot; skip export if already at capacity to prevent goroutine leaks
220+
select {
221+
case agg.exportSem <- struct{}{}:
222+
default:
223+
logger.Debug().Msg("telemetry: export semaphore full, dropping metrics batch")
224+
return
225+
}
226+
210227
// Export asynchronously
211228
go func() {
212229
defer func() {
230+
<-agg.exportSem
213231
if r := recover(); r != nil {
214232
logger.Debug().Msgf("telemetry: async export panic: %v", r)
215233
}
@@ -219,8 +237,12 @@ func (agg *metricsAggregator) flushUnlocked(ctx context.Context) {
219237
}
220238

221239
// close stops the aggregator and flushes pending metrics.
240+
// Safe to call multiple times — subsequent calls are no-ops for the stop/cancel step.
222241
func (agg *metricsAggregator) close(ctx context.Context) error {
223-
close(agg.stopCh)
242+
agg.closeOnce.Do(func() {
243+
close(agg.stopCh)
244+
agg.cancel() // Cancel in-flight periodic export goroutines
245+
})
224246
agg.flush(ctx)
225247
return nil
226248
}

telemetry/driver_integration.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,19 @@ import (
1515
// - ctx: Context for the initialization
1616
// - host: Databricks host
1717
// - httpClient: HTTP client for making requests
18-
// - enableTelemetry: User opt-in flag (nil = unset, true = enable, false = disable)
18+
// - enableTelemetry: Client config overlay (unset = check server flag, true/false = override server)
1919
//
2020
// Returns:
2121
// - *Interceptor: Telemetry interceptor if enabled, nil otherwise
2222
func InitializeForConnection(
2323
ctx context.Context,
2424
host string,
2525
httpClient *http.Client,
26-
enableTelemetry *bool,
26+
enableTelemetry config.ConfigValue[bool],
2727
) *Interceptor {
28-
// Create telemetry config
28+
// Create telemetry config and apply client overlay
2929
cfg := DefaultConfig()
30-
31-
// Set EnableTelemetry based on user preference
32-
if enableTelemetry != nil {
33-
cfg.EnableTelemetry = config.NewConfigValue(*enableTelemetry)
34-
}
35-
// else: leave unset (will check server feature flag)
30+
cfg.EnableTelemetry = enableTelemetry
3631

3732
// Check if telemetry should be enabled
3833
if !isTelemetryEnabled(ctx, cfg, host, httpClient) {

telemetry/errors.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,22 +54,25 @@ func classifyError(err error) string {
5454

5555
errMsg := strings.ToLower(err.Error())
5656

57-
// Check for common error patterns
58-
patterns := map[string]string{
59-
"timeout": "timeout",
60-
"context cancel": "cancelled",
61-
"connection": "connection_error",
62-
"authentication": "auth_error",
63-
"unauthorized": "auth_error",
64-
"forbidden": "permission_error",
65-
"not found": "not_found",
66-
"syntax": "syntax_error",
67-
"invalid": "invalid_request",
57+
// Ordered patterns — first match wins, ensuring deterministic classification.
58+
patterns := []struct {
59+
pattern string
60+
errorType string
61+
}{
62+
{"timeout", "timeout"},
63+
{"context cancel", "cancelled"},
64+
{"connection", "connection_error"},
65+
{"authentication", "auth_error"},
66+
{"unauthorized", "auth_error"},
67+
{"forbidden", "permission_error"},
68+
{"not found", "not_found"},
69+
{"syntax", "syntax_error"},
70+
{"invalid", "invalid_request"},
6871
}
6972

70-
for pattern, errorType := range patterns {
71-
if strings.Contains(errMsg, pattern) {
72-
return errorType
73+
for _, p := range patterns {
74+
if strings.Contains(errMsg, p.pattern) {
75+
return p.errorType
7376
}
7477
}
7578

telemetry/interceptor.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,16 @@ func (i *Interceptor) completeStatement(ctx context.Context, statementID string,
157157
i.aggregator.completeStatement(ctx, statementID, failed)
158158
}
159159

160-
// Close shuts down the interceptor and flushes pending metrics.
160+
// Close flushes any pending per-connection metrics.
161+
// Does NOT close the shared aggregator — its lifecycle is managed via
162+
// ReleaseForConnection, which uses reference counting across all connections
163+
// to the same host.
161164
// Exported for use by the driver package.
162165
func (i *Interceptor) Close(ctx context.Context) error {
163166
if !i.enabled {
164167
return nil
165168
}
166169

167-
return i.aggregator.close(ctx)
170+
i.aggregator.flush(ctx)
171+
return nil
168172
}

0 commit comments

Comments
 (0)