Skip to content

Commit a1f8b0c

Browse files
committed
Rebase onto updated PR #320; remove ForceEnableTelemetry; fix test alignment
- Remove ForceEnableTelemetry from telemetry Config, driver_integration.go, and all call sites (connector.go) - Update feature flag tests to use new connector-service endpoint format ({"flags": [{"name": ..., "value": ...}]} instead of {"flags": {...}}) - Update exporter/integration tests to use new TelemetryRequest payload format - Update config/connector tests to reflect EnableTelemetry=true default - Fix rows_test.go NewRows calls to include telemetryCtx and telemetryUpdate args
1 parent 8c8fef0 commit a1f8b0c

22 files changed

+705
-495
lines changed

connection_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,7 +1037,7 @@ func TestConn_runQuery(t *testing.T) {
10371037
client: testClient,
10381038
cfg: config.WithDefaults(),
10391039
}
1040-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1040+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
10411041
assert.Error(t, err)
10421042
assert.Nil(t, exStmtResp)
10431043
assert.Nil(t, opStatusResp)
@@ -1079,7 +1079,7 @@ func TestConn_runQuery(t *testing.T) {
10791079
client: testClient,
10801080
cfg: config.WithDefaults(),
10811081
}
1082-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1082+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
10831083

10841084
assert.Error(t, err)
10851085
assert.Equal(t, 1, executeStatementCount)
@@ -1125,7 +1125,7 @@ func TestConn_runQuery(t *testing.T) {
11251125
client: testClient,
11261126
cfg: config.WithDefaults(),
11271127
}
1128-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1128+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
11291129

11301130
assert.NoError(t, err)
11311131
assert.Equal(t, 1, executeStatementCount)
@@ -1172,7 +1172,7 @@ func TestConn_runQuery(t *testing.T) {
11721172
client: testClient,
11731173
cfg: config.WithDefaults(),
11741174
}
1175-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1175+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
11761176

11771177
assert.Error(t, err)
11781178
assert.Equal(t, 1, executeStatementCount)
@@ -1225,7 +1225,7 @@ func TestConn_runQuery(t *testing.T) {
12251225
client: testClient,
12261226
cfg: config.WithDefaults(),
12271227
}
1228-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1228+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
12291229

12301230
assert.NoError(t, err)
12311231
assert.Equal(t, 1, executeStatementCount)
@@ -1277,7 +1277,7 @@ func TestConn_runQuery(t *testing.T) {
12771277
client: testClient,
12781278
cfg: config.WithDefaults(),
12791279
}
1280-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1280+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
12811281

12821282
assert.Error(t, err)
12831283
assert.Equal(t, 1, executeStatementCount)
@@ -1330,7 +1330,7 @@ func TestConn_runQuery(t *testing.T) {
13301330
client: testClient,
13311331
cfg: config.WithDefaults(),
13321332
}
1333-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1333+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
13341334

13351335
assert.NoError(t, err)
13361336
assert.Equal(t, 1, executeStatementCount)
@@ -1383,7 +1383,7 @@ func TestConn_runQuery(t *testing.T) {
13831383
client: testClient,
13841384
cfg: config.WithDefaults(),
13851385
}
1386-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1386+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
13871387

13881388
assert.Error(t, err)
13891389
assert.Equal(t, 1, executeStatementCount)

connector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
5555
}
5656

5757
protocolVersion := int64(c.cfg.ThriftProtocolVersion)
58+
59+
sessionStart := time.Now()
5860
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
5961
ClientProtocolI64: &protocolVersion,
6062
Configuration: sessionParams,
@@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
6466
},
6567
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
6668
})
69+
sessionLatencyMs := time.Since(sessionStart).Milliseconds()
70+
6771
if err != nil {
6872
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
6973
}
@@ -80,11 +84,13 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
8084
conn.telemetry = telemetry.InitializeForConnection(
8185
ctx,
8286
c.cfg.Host,
87+
c.cfg.DriverVersion,
8388
c.client,
8489
c.cfg.EnableTelemetry,
8590
)
8691
if conn.telemetry != nil {
8792
log.Debug().Msg("telemetry initialized for connection")
93+
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs)
8894
}
8995

9096
log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)

internal/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ func (ucfg UserConfig) WithDefaults() UserConfig {
184184
ucfg.UseLz4Compression = false
185185
ucfg.CloudFetchConfig = CloudFetchConfig{}.WithDefaults()
186186

187+
// EnableTelemetry defaults to unset (ConfigValue zero value),
188+
// meaning telemetry is controlled by server feature flags.
189+
187190
return ucfg
188191
}
189192

internal/rows/rows.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ type rows struct {
5757
logger_ *dbsqllog.DBSQLLogger
5858

5959
ctx context.Context
60+
61+
// Telemetry tracking
62+
telemetryCtx context.Context
63+
telemetryUpdate func(chunkCount int, bytesDownloaded int64)
64+
chunkCount int
65+
bytesDownloaded int64
6066
}
6167

6268
var _ driver.Rows = (*rows)(nil)
@@ -72,6 +78,8 @@ func NewRows(
7278
client cli_service.TCLIService,
7379
config *config.Config,
7480
directResults *cli_service.TSparkDirectResults,
81+
telemetryCtx context.Context,
82+
telemetryUpdate func(chunkCount int, bytesDownloaded int64),
7583
) (driver.Rows, dbsqlerr.DBError) {
7684

7785
connId := driverctx.ConnIdFromContext(ctx)
@@ -103,14 +111,18 @@ func NewRows(
103111
logger.Debug().Msgf("databricks: creating Rows, pageSize: %d, location: %v", pageSize, location)
104112

105113
r := &rows{
106-
client: client,
107-
opHandle: opHandle,
108-
connId: connId,
109-
correlationId: correlationId,
110-
location: location,
111-
config: config,
112-
logger_: logger,
113-
ctx: ctx,
114+
client: client,
115+
opHandle: opHandle,
116+
connId: connId,
117+
correlationId: correlationId,
118+
location: location,
119+
config: config,
120+
logger_: logger,
121+
ctx: ctx,
122+
telemetryCtx: telemetryCtx,
123+
telemetryUpdate: telemetryUpdate,
124+
chunkCount: 0,
125+
bytesDownloaded: 0,
114126
}
115127

116128
// if we already have results for the query do some additional initialization
@@ -127,6 +139,17 @@ func NewRows(
127139
if err != nil {
128140
return r, err
129141
}
142+
143+
r.chunkCount++
144+
if directResults.ResultSet != nil && directResults.ResultSet.Results != nil && directResults.ResultSet.Results.ArrowBatches != nil {
145+
for _, batch := range directResults.ResultSet.Results.ArrowBatches {
146+
r.bytesDownloaded += int64(len(batch.Batch))
147+
}
148+
}
149+
150+
if r.telemetryUpdate != nil {
151+
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
152+
}
130153
}
131154

132155
var d rowscanner.Delimiter
@@ -458,6 +481,19 @@ func (r *rows) fetchResultPage() error {
458481
return err1
459482
}
460483

484+
r.chunkCount++
485+
if fetchResult != nil && fetchResult.Results != nil {
486+
if fetchResult.Results.ArrowBatches != nil {
487+
for _, batch := range fetchResult.Results.ArrowBatches {
488+
r.bytesDownloaded += int64(len(batch.Batch))
489+
}
490+
}
491+
}
492+
493+
if r.telemetryUpdate != nil {
494+
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
495+
}
496+
461497
err1 = r.makeRowScanner(fetchResult)
462498
if err1 != nil {
463499
return err1

internal/rows/rows_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ func TestColumnsWithDirectResults(t *testing.T) {
421421
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
422422
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
423423

424-
d, err := NewRows(ctx, nil, client, nil, nil)
424+
d, err := NewRows(ctx, nil, client, nil, nil, nil, nil)
425425
assert.Nil(t, err)
426426

427427
rowSet := d.(*rows)
@@ -720,7 +720,7 @@ func TestRowsCloseOptimization(t *testing.T) {
720720
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
721721
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
722722
opHandle := &cli_service.TOperationHandle{OperationId: &cli_service.THandleIdentifier{GUID: []byte{'f', 'o'}}}
723-
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil)
723+
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil, nil, nil)
724724

725725
// rowSet has no direct results calling Close should result in call to client to close operation
726726
err := rowSet.Close()
@@ -733,7 +733,7 @@ func TestRowsCloseOptimization(t *testing.T) {
733733
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
734734
}
735735
closeCount = 0
736-
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults)
736+
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
737737
err = rowSet.Close()
738738
assert.Nil(t, err, "rows.Close should not throw an error")
739739
assert.Equal(t, 1, closeCount)
@@ -746,7 +746,7 @@ func TestRowsCloseOptimization(t *testing.T) {
746746
ResultSetMetadata: &cli_service.TGetResultSetMetadataResp{Schema: &cli_service.TTableSchema{}},
747747
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
748748
}
749-
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults)
749+
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
750750
err = rowSet.Close()
751751
assert.Nil(t, err, "rows.Close should not throw an error")
752752
assert.Equal(t, 0, closeCount)
@@ -816,7 +816,7 @@ func TestGetArrowBatches(t *testing.T) {
816816

817817
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2})
818818
cfg := config.WithDefaults()
819-
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
819+
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
820820
assert.Nil(t, err)
821821

822822
rows2, ok := rows.(dbsqlrows.Rows)
@@ -889,7 +889,7 @@ func TestGetArrowBatches(t *testing.T) {
889889

890890
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2, fetchResp3})
891891
cfg := config.WithDefaults()
892-
rows, err := NewRows(ctx, nil, client, cfg, nil)
892+
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
893893
assert.Nil(t, err)
894894

895895
rows2, ok := rows.(dbsqlrows.Rows)
@@ -950,7 +950,7 @@ func TestGetArrowBatches(t *testing.T) {
950950

951951
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1})
952952
cfg := config.WithDefaults()
953-
rows, err := NewRows(ctx, nil, client, cfg, nil)
953+
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
954954
assert.Nil(t, err)
955955

956956
rows2, ok := rows.(dbsqlrows.Rows)
@@ -977,7 +977,7 @@ func TestGetArrowBatches(t *testing.T) {
977977

978978
client := getSimpleClient([]cli_service.TFetchResultsResp{})
979979
cfg := config.WithDefaults()
980-
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
980+
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
981981
assert.Nil(t, err)
982982

983983
rows2, ok := rows.(dbsqlrows.Rows)
@@ -1556,7 +1556,7 @@ func TestFetchResultPage_PropagatesGetNextPageError(t *testing.T) {
15561556

15571557
executeStatementResp := cli_service.TExecuteStatementResp{}
15581558
cfg := config.WithDefaults()
1559-
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
1559+
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
15601560
// Call Next and ensure it propagates the error from getNextPage
15611561
actualErr := rows.Next(nil)
15621562

telemetry/aggregator.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,19 @@ func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetr
6565
// Swallow all errors
6666
defer func() {
6767
if r := recover(); r != nil {
68-
logger.Debug().Msgf("telemetry: recordMetric panic: %v", r)
68+
// Log at trace level only
69+
// logger.Trace().Msgf("telemetry: recordMetric panic: %v", r)
6970
}
7071
}()
7172

7273
agg.mu.Lock()
7374
defer agg.mu.Unlock()
7475

7576
switch metric.metricType {
76-
case "connection":
77-
// Emit connection events immediately: connection lifecycle events must be captured
78-
// before the connection closes, as we won't have another opportunity to flush
77+
case "connection", "operation":
78+
// Emit connection and operation events immediately
7979
agg.batch = append(agg.batch, metric)
80-
if len(agg.batch) >= agg.batchSize {
81-
agg.flushUnlocked(ctx)
82-
}
80+
agg.flushUnlocked(ctx)
8381

8482
case "statement":
8583
// Aggregate by statement ID
@@ -118,8 +116,7 @@ func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetr
118116
case "error":
119117
// Check if terminal error
120118
if metric.errorType != "" && isTerminalError(&simpleError{msg: metric.errorType}) {
121-
// Flush terminal errors immediately: terminal errors often lead to connection
122-
// termination. If we wait for the next batch/timer flush, this data may be lost
119+
// Flush terminal errors immediately
123120
agg.batch = append(agg.batch, metric)
124121
agg.flushUnlocked(ctx)
125122
} else {
@@ -135,7 +132,7 @@ func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetr
135132
func (agg *metricsAggregator) completeStatement(ctx context.Context, statementID string, failed bool) {
136133
defer func() {
137134
if r := recover(); r != nil {
138-
logger.Debug().Msgf("telemetry: completeStatement panic: %v", r)
135+
// Log at trace level only
139136
}
140137
}()
141138

@@ -223,7 +220,7 @@ func (agg *metricsAggregator) flushUnlocked(ctx context.Context) {
223220
defer func() {
224221
<-agg.exportSem
225222
if r := recover(); r != nil {
226-
logger.Debug().Msgf("telemetry: async export panic: %v", r)
223+
// Log at trace level only
227224
}
228225
}()
229226
agg.exporter.export(ctx, metrics)

0 commit comments

Comments
 (0)