Skip to content

Commit 6b7d565

Browse files
samikshya-dbclaude
andcommitted
Populate driver_connection_params in telemetry
Added comprehensive connection parameters to telemetry exports: - host and port - http_path (warehouse/cluster identifier) - enable_arrow (Arrow batches setting) - socket_timeout (client timeout in seconds) - rows_fetched_per_block (MaxRows setting) - enable_metric_view_metadata Updated initialization chain to pass connection parameters through exporter -> client -> manager -> driver integration. All telemetry tests passing with enriched connection metadata. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 5e33162 commit 6b7d565

File tree

10 files changed

+95
-55
lines changed

10 files changed

+95
-55
lines changed

connector.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,26 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
8989
}
9090
// else: leave nil to check server feature flag
9191

92+
// Build connection parameters for telemetry
93+
connParams := &telemetry.DriverConnectionParameters{
94+
Host: c.cfg.Host,
95+
Port: c.cfg.Port,
96+
HTTPPath: c.cfg.HTTPPath,
97+
EnableArrow: c.cfg.UseArrowBatches,
98+
EnableMetricViewMetadata: c.cfg.EnableMetricViewMetadata,
99+
SocketTimeoutSeconds: int64(c.cfg.ClientTimeout.Seconds()),
100+
RowsFetchedPerBlock: int64(c.cfg.MaxRows),
101+
}
102+
92103
conn.telemetry = telemetry.InitializeForConnection(
93104
ctx,
94105
c.cfg.Host,
106+
c.cfg.Port,
107+
c.cfg.HTTPPath,
95108
c.cfg.DriverVersion,
96109
c.client,
97110
enableTelemetry,
111+
connParams,
98112
)
99113
if conn.telemetry != nil {
100114
log.Debug().Msg("telemetry initialized for connection")

telemetry/benchmark_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) {
2020
}))
2121
defer server.Close()
2222

23-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
23+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
2424
aggregator := newMetricsAggregator(exporter, cfg)
2525
defer aggregator.close(context.Background())
2626

@@ -48,7 +48,7 @@ func BenchmarkInterceptor_Overhead_Disabled(b *testing.B) {
4848
}))
4949
defer server.Close()
5050

51-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
51+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
5252
aggregator := newMetricsAggregator(exporter, cfg)
5353
defer aggregator.close(context.Background())
5454

@@ -75,7 +75,7 @@ func BenchmarkAggregator_RecordMetric(b *testing.B) {
7575
}))
7676
defer server.Close()
7777

78-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
78+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
7979
aggregator := newMetricsAggregator(exporter, cfg)
8080
defer aggregator.close(context.Background())
8181

@@ -105,7 +105,7 @@ func BenchmarkExporter_Export(b *testing.B) {
105105
}))
106106
defer server.Close()
107107

108-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
108+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
109109

110110
ctx := context.Background()
111111
metrics := []*telemetryMetric{
@@ -141,7 +141,7 @@ func BenchmarkConcurrentConnections_PerHostSharing(b *testing.B) {
141141
for pb.Next() {
142142
// Simulate getting a client (should share per host)
143143
mgr := getClientManager()
144-
client := mgr.getOrCreateClient(host, "test-version", httpClient, cfg)
144+
client := mgr.getOrCreateClient(host, 443, "", "test-version", httpClient, cfg, nil)
145145
_ = client
146146

147147
// Release client
@@ -198,7 +198,7 @@ func TestLoadTesting_ConcurrentConnections(t *testing.T) {
198198
defer wg.Done()
199199

200200
// Get client (should share)
201-
client := mgr.getOrCreateClient(host, "test-version", httpClient, cfg)
201+
client := mgr.getOrCreateClient(host, 443, "", "test-version", httpClient, cfg, nil)
202202
interceptor := client.GetInterceptor(true)
203203

204204
// Simulate some operations
@@ -237,8 +237,8 @@ func TestGracefulShutdown_ReferenceCountingCleanup(t *testing.T) {
237237
mgr := getClientManager()
238238

239239
// Create multiple references
240-
client1 := mgr.getOrCreateClient(host, "test-version", httpClient, cfg)
241-
client2 := mgr.getOrCreateClient(host, "test-version", httpClient, cfg)
240+
client1 := mgr.getOrCreateClient(host, 443, "", "test-version", httpClient, cfg, nil)
241+
client2 := mgr.getOrCreateClient(host, 443, "", "test-version", httpClient, cfg, nil)
242242

243243
if client1 != client2 {
244244
t.Error("Expected same client instance for same host")
@@ -292,7 +292,7 @@ func TestGracefulShutdown_FinalFlush(t *testing.T) {
292292
}))
293293
defer server.Close()
294294

295-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
295+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
296296
aggregator := newMetricsAggregator(exporter, cfg)
297297

298298
// Record a metric

telemetry/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ type telemetryClient struct {
3131
}
3232

3333
// newTelemetryClient creates a new telemetry client for the given host.
34-
func newTelemetryClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient {
34+
func newTelemetryClient(host string, port int, httpPath string, driverVersion string, httpClient *http.Client, cfg *Config, connParams *DriverConnectionParameters) *telemetryClient {
3535
// Create exporter
36-
exporter := newTelemetryExporter(host, driverVersion, httpClient, cfg)
36+
exporter := newTelemetryExporter(host, port, httpPath, driverVersion, httpClient, cfg, connParams)
3737

3838
// Create aggregator with exporter
3939
aggregator := newMetricsAggregator(exporter, cfg)

telemetry/driver_integration.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@ import (
2323
func InitializeForConnection(
2424
ctx context.Context,
2525
host string,
26+
port int,
27+
httpPath string,
2628
driverVersion string,
2729
httpClient *http.Client,
2830
enableTelemetry *bool,
31+
connParams *DriverConnectionParameters,
2932
) *Interceptor {
3033
// Create telemetry config
3134
cfg := DefaultConfig()
@@ -43,7 +46,7 @@ func InitializeForConnection(
4346

4447
// Get or create telemetry client for this host
4548
clientMgr := getClientManager()
46-
telemetryClient := clientMgr.getOrCreateClient(host, driverVersion, httpClient, cfg)
49+
telemetryClient := clientMgr.getOrCreateClient(host, port, httpPath, driverVersion, httpClient, cfg, connParams)
4750
if telemetryClient == nil {
4851
return nil
4952
}

telemetry/exporter.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@ import (
1313
// telemetryExporter exports metrics to Databricks telemetry service.
1414
type telemetryExporter struct {
1515
host string
16+
port int
17+
httpPath string
1618
driverVersion string
1719
httpClient *http.Client
1820
circuitBreaker *circuitBreaker
1921
cfg *Config
22+
// Connection parameters for telemetry
23+
connParams *DriverConnectionParameters
2024
}
2125

2226
// telemetryMetric represents a metric to export.
@@ -32,13 +36,25 @@ type telemetryMetric struct {
3236
}
3337

3438
// newTelemetryExporter creates a new exporter.
35-
func newTelemetryExporter(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryExporter {
39+
func newTelemetryExporter(host string, port int, httpPath string, driverVersion string, httpClient *http.Client, cfg *Config, connParams *DriverConnectionParameters) *telemetryExporter {
40+
// Build connection parameters if not provided
41+
if connParams == nil {
42+
connParams = &DriverConnectionParameters{
43+
Host: host,
44+
Port: port,
45+
HTTPPath: httpPath,
46+
}
47+
}
48+
3649
return &telemetryExporter{
3750
host: host,
51+
port: port,
52+
httpPath: httpPath,
3853
driverVersion: driverVersion,
3954
httpClient: httpClient,
4055
circuitBreaker: getCircuitBreakerManager().getCircuitBreaker(host),
4156
cfg: cfg,
57+
connParams: connParams,
4258
}
4359
}
4460

@@ -72,7 +88,7 @@ func (e *telemetryExporter) export(ctx context.Context, metrics []*telemetryMetr
7288
// doExport performs the actual export with retries and exponential backoff.
7389
func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMetric) error {
7490
// Create telemetry request with protoLogs format (matches JDBC/Node.js)
75-
payload, err := createTelemetryRequest(metrics, e.driverVersion)
91+
payload, err := createTelemetryRequest(metrics, e.driverVersion, e.connParams)
7692
if err != nil {
7793
return fmt.Errorf("failed to create telemetry request: %w", err)
7894
}

telemetry/exporter_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestNewTelemetryExporter(t *testing.T) {
1616
httpClient := &http.Client{Timeout: 5 * time.Second}
1717
host := "test-host"
1818

19-
exporter := newTelemetryExporter(host, "test-version", httpClient, cfg)
19+
exporter := newTelemetryExporter(host, 443, "", "test-version", httpClient, cfg, nil)
2020

2121
if exporter.host != host {
2222
t.Errorf("Expected host %s, got %s", host, exporter.host)
@@ -73,7 +73,7 @@ func TestExport_Success(t *testing.T) {
7373
httpClient := &http.Client{Timeout: 5 * time.Second}
7474

7575
// Use full server URL for testing
76-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
76+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
7777

7878
metrics := []*telemetryMetric{
7979
{
@@ -113,7 +113,7 @@ func TestExport_RetryOn5xx(t *testing.T) {
113113
httpClient := &http.Client{Timeout: 5 * time.Second}
114114

115115
// Use full server URL for testing
116-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
116+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
117117

118118
metrics := []*telemetryMetric{
119119
{
@@ -145,7 +145,7 @@ func TestExport_NonRetryable4xx(t *testing.T) {
145145
httpClient := &http.Client{Timeout: 5 * time.Second}
146146

147147
// Use full server URL for testing
148-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
148+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
149149

150150
metrics := []*telemetryMetric{
151151
{
@@ -181,7 +181,7 @@ func TestExport_Retry429(t *testing.T) {
181181
httpClient := &http.Client{Timeout: 5 * time.Second}
182182

183183
// Use full server URL for testing
184-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
184+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
185185

186186
metrics := []*telemetryMetric{
187187
{
@@ -211,7 +211,7 @@ func TestExport_CircuitBreakerOpen(t *testing.T) {
211211
httpClient := &http.Client{Timeout: 5 * time.Second}
212212

213213
// Use full server URL for testing
214-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
214+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
215215

216216
// Open the circuit breaker by recording failures
217217
cb := exporter.circuitBreaker
@@ -260,7 +260,7 @@ func TestCreateTelemetryRequest_TagFiltering(t *testing.T) {
260260
},
261261
}
262262

263-
req, err := createTelemetryRequest([]*telemetryMetric{metric}, "1.0.0")
263+
req, err := createTelemetryRequest([]*telemetryMetric{metric}, "1.0.0", &DriverConnectionParameters{Host: "test-host", Port: 443})
264264
if err != nil {
265265
t.Fatalf("Failed to create telemetry request: %v", err)
266266
}
@@ -329,7 +329,7 @@ func TestExport_ErrorSwallowing(t *testing.T) {
329329
httpClient := &http.Client{Timeout: 5 * time.Second}
330330

331331
// Use full server URL for testing
332-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
332+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
333333

334334
metrics := []*telemetryMetric{
335335
{
@@ -365,7 +365,7 @@ func TestExport_ContextCancellation(t *testing.T) {
365365
httpClient := &http.Client{Timeout: 5 * time.Second}
366366

367367
// Use full server URL for testing
368-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
368+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
369369

370370
metrics := []*telemetryMetric{
371371
{
@@ -398,7 +398,7 @@ func TestExport_ExponentialBackoff(t *testing.T) {
398398
httpClient := &http.Client{Timeout: 5 * time.Second}
399399

400400
// Use full server URL for testing
401-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
401+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
402402

403403
metrics := []*telemetryMetric{
404404
{

telemetry/integration_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestIntegration_EndToEnd_WithCircuitBreaker(t *testing.T) {
4848
defer server.Close()
4949

5050
// Create telemetry client
51-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
51+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
5252
aggregator := newMetricsAggregator(exporter, cfg)
5353
defer aggregator.close(context.Background())
5454

@@ -95,7 +95,7 @@ func TestIntegration_CircuitBreakerOpening(t *testing.T) {
9595
}))
9696
defer server.Close()
9797

98-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
98+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
9999
aggregator := newMetricsAggregator(exporter, cfg)
100100
defer aggregator.close(context.Background())
101101

@@ -212,7 +212,7 @@ func TestIntegration_PrivacyCompliance_NoQueryText(t *testing.T) {
212212
}))
213213
defer server.Close()
214214

215-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
215+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
216216
aggregator := newMetricsAggregator(exporter, cfg)
217217
defer aggregator.close(context.Background())
218218

@@ -274,7 +274,7 @@ func TestIntegration_TagFiltering(t *testing.T) {
274274
}))
275275
defer server.Close()
276276

277-
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
277+
exporter := newTelemetryExporter(server.URL, 443, "", "test-version", httpClient, cfg, nil)
278278

279279
// Test metric with mixed tags
280280
metric := &telemetryMetric{

telemetry/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ func getClientManager() *clientManager {
4545

4646
// getOrCreateClient gets or creates a telemetry client for the host.
4747
// Increments reference count.
48-
func (m *clientManager) getOrCreateClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient {
48+
func (m *clientManager) getOrCreateClient(host string, port int, httpPath string, driverVersion string, httpClient *http.Client, cfg *Config, connParams *DriverConnectionParameters) *telemetryClient {
4949
m.mu.Lock()
5050
defer m.mu.Unlock()
5151

5252
holder, exists := m.clients[host]
5353
if !exists {
54-
client := newTelemetryClient(host, driverVersion, httpClient, cfg)
54+
client := newTelemetryClient(host, port, httpPath, driverVersion, httpClient, cfg, connParams)
5555
if err := client.start(); err != nil {
5656
// Failed to start client, don't add to map
5757
logger.Logger.Debug().Str("host", host).Err(err).Msg("failed to start telemetry client")

0 commit comments

Comments
 (0)