Skip to content

Commit 6dd935f

Browse files
samikshya-db and claude
authored
[PECOBLR-1383] Add statement execution hooks for telemetry collection (#321)
## Summary This **stacked PR** builds on #320 and adds statement execution hooks to complete end-to-end telemetry collection. **Stack:** Part 3 of 3 - Base: #319 (PECOBLR-1143 - Phases 4-5) - Previous: #320 (PECOBLR-1381-1382 - Phases 6-7) - This PR: PECOBLR-1383 (Statement execution hooks) --- ## Changes ### Exported Methods for Driver Integration **`telemetry/interceptor.go`** - ✅ Exported `BeforeExecute()` - starts metric tracking for a statement - ✅ Added `BeforeExecuteWithTime()` - starts metric tracking with a caller-supplied start time - ✅ Exported `AfterExecute()` - records metric with timing and error info - ✅ Exported `AddTag()` - adds tags to current metric context - ✅ Exported `CompleteStatement()` - marks statement complete and flushes ### Statement Execution Hooks **`connection.go`** - ✅ Added hooks to `QueryContext()`: - Captures the start time before `runQuery()`, then calls `BeforeExecuteWithTime()` with the statement ID from the operation handle GUID - Uses defer to call `AfterExecute()` and `CompleteStatement()` - ✅ Added hooks to `ExecContext()`: - Captures the start time before `runQuery()`, then calls `BeforeExecuteWithTime()` with the statement ID - Proper error handling (stagingErr takes precedence over err; a CloseOperation error is reported when neither is set) - Uses defer to call `AfterExecute()` and `CompleteStatement()` ### Documentation **`telemetry/DESIGN.md`** - ✅ Updated Phase 6 to mark as completed - ✅ Added statement execution hooks to Phase 7 checklist --- ## Integration Flow ``` Connection.QueryContext() ↓ executeStart := time.Now() ↓ [Statement Execution via runQuery()] ↓ BeforeExecuteWithTime(sessionID, statementID, executeStart) → creates metricContext with startTime ↓ AfterExecute(err) [deferred] → records metric with latency and error ↓ CompleteStatement(statementID, failed) [deferred] → flushes aggregated metrics ``` --- ## Testing **All tests passing** ✅ - ✅ 99 telemetry tests (2.018s) - ✅ All driver tests (58.576s) - ✅ No breaking changes - ✅ Telemetry properly disabled when not configured --- ## End-to-End Telemetry With this PR, the telemetry system is **fully functional end-to-end**: 1. ✅ **Collection** - Metrics collected from QueryContext/ExecContext 2. ✅ **Aggregation** - Statement-level aggregation with batching 3. ✅ **Circuit Breaker** - Protection against failing endpoints
4. ✅ **Export** - HTTP POST with retry and exponential backoff 5. ✅ **Feature Flags** - Server-side control with 5-level priority 6. ✅ **Resource Management** - Per-host clients with reference counting --- ## Related Issues - Builds on: #320 (PECOBLR-1381-1382) - Implements: PECOBLR-1383 (Statement execution hooks) ✅ --- ## Checklist - [x] Export interceptor methods for driver use - [x] Add hooks to QueryContext - [x] Add hooks to ExecContext - [x] Update DESIGN.md checklist - [x] All tests passing - [x] No breaking changes --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent f850b47 commit 6dd935f

File tree

5 files changed

+111
-75
lines changed

5 files changed

+111
-75
lines changed

connection.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,33 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
123123

124124
corrId := driverctx.CorrelationIdFromContext(ctx)
125125

126+
// Capture execution start time for telemetry before running the query
127+
executeStart := time.Now()
126128
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
127129
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
128130
stagingErr := c.execStagingOperation(exStmtResp, ctx)
129131

132+
// Telemetry: track statement execution
133+
var statementID string
134+
var closeOpErr error // Track CloseOperation errors for telemetry
135+
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
136+
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
137+
// Use BeforeExecuteWithTime to set the correct start time (before execution)
138+
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
139+
defer func() {
140+
finalErr := err
141+
if stagingErr != nil {
142+
finalErr = stagingErr
143+
}
144+
// Include CloseOperation error in telemetry if it occurred
145+
if closeOpErr != nil && finalErr == nil {
146+
finalErr = closeOpErr
147+
}
148+
c.telemetry.AfterExecute(ctx, finalErr)
149+
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
150+
}()
151+
}
152+
130153
if exStmtResp != nil && exStmtResp.OperationHandle != nil {
131154
// since we have an operation handle we can close the operation if necessary
132155
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
@@ -137,6 +160,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
137160
})
138161
if err1 != nil {
139162
log.Err(err1).Msg("databricks: failed to close operation after executing statement")
163+
closeOpErr = err1 // Capture for telemetry
140164
}
141165
}
142166
}
@@ -168,10 +192,25 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
168192

169193
// first we try to get the results synchronously.
170194
// at any point in time that the context is done we must cancel and return
195+
196+
// Capture execution start time for telemetry before running the query
197+
executeStart := time.Now()
171198
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
172199
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
173200
defer log.Duration(msg, start)
174201

202+
// Telemetry: track statement execution
203+
var statementID string
204+
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
205+
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
206+
// Use BeforeExecuteWithTime to set the correct start time (before execution)
207+
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
208+
defer func() {
209+
c.telemetry.AfterExecute(ctx, err)
210+
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
211+
}()
212+
}
213+
175214
if err != nil {
176215
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
177216
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)

telemetry/DESIGN.md

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,34 +2116,34 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
21162116
- [x] Test server error handling
21172117
- [x] Test unreachable server scenarios
21182118

2119-
### Phase 6: Collection & Aggregation (PECOBLR-1381)
2120-
- [ ] Implement `interceptor.go` for metric collection
2121-
- [ ] Implement beforeExecute() and afterExecute() hooks
2122-
- [ ] Implement context-based metric tracking with metricContext
2123-
- [ ] Implement latency measurement (startTime, latencyMs calculation)
2124-
- [ ] Add tag collection methods (addTag)
2125-
- [ ] Implement error swallowing with panic recovery
2126-
- [ ] Implement `aggregator.go` for batching
2127-
- [ ] Implement statement-level aggregation (statementMetrics)
2128-
- [ ] Implement batch size and flush interval logic
2129-
- [ ] Implement background flush goroutine (flushLoop)
2130-
- [ ] Add thread-safe metric recording
2131-
- [ ] Implement completeStatement() for final aggregation
2132-
- [ ] Implement error classification in `errors.go`
2133-
- [ ] Implement error type classification (terminal vs retryable)
2134-
- [ ] Implement HTTP status code classification
2135-
- [ ] Add error pattern matching
2136-
- [ ] Implement isTerminalError() function
2137-
- [ ] Update `client.go` to integrate aggregator
2138-
- [ ] Wire up aggregator with exporter
2139-
- [ ] Implement background flush timer
2140-
- [ ] Update start() and close() methods
2141-
- [ ] Add unit tests for collection and aggregation
2142-
- [ ] Test interceptor metric collection and latency tracking
2143-
- [ ] Test aggregation logic
2144-
- [ ] Test batch flushing (size-based and time-based)
2145-
- [ ] Test error classification
2146-
- [ ] Test client with aggregator integration
2119+
### Phase 6: Collection & Aggregation (PECOBLR-1381) ✅ COMPLETED
2120+
- [x] Implement `interceptor.go` for metric collection
2121+
- [x] Implement beforeExecute() and afterExecute() hooks
2122+
- [x] Implement context-based metric tracking with metricContext
2123+
- [x] Implement latency measurement (startTime, latencyMs calculation)
2124+
- [x] Add tag collection methods (addTag)
2125+
- [x] Implement error swallowing with panic recovery
2126+
- [x] Implement `aggregator.go` for batching
2127+
- [x] Implement statement-level aggregation (statementMetrics)
2128+
- [x] Implement batch size and flush interval logic
2129+
- [x] Implement background flush goroutine (flushLoop)
2130+
- [x] Add thread-safe metric recording
2131+
- [x] Implement completeStatement() for final aggregation
2132+
- [x] Implement error classification in `errors.go`
2133+
- [x] Implement error type classification (terminal vs retryable)
2134+
- [x] Implement HTTP status code classification
2135+
- [x] Add error pattern matching
2136+
- [x] Implement isTerminalError() function
2137+
- [x] Update `client.go` to integrate aggregator
2138+
- [x] Wire up aggregator with exporter
2139+
- [x] Implement background flush timer
2140+
- [x] Update start() and close() methods
2141+
- [x] Add unit tests for collection and aggregation
2142+
- [x] Test interceptor metric collection and latency tracking
2143+
- [x] Test aggregation logic
2144+
- [x] Test batch flushing (size-based and time-based)
2145+
- [x] Test error classification
2146+
- [x] Test client with aggregator integration
21472147

21482148
### Phase 7: Driver Integration ✅ COMPLETED
21492149
- [x] Add telemetry initialization to `connection.go`
@@ -2167,9 +2167,12 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
21672167
- [x] Test compilation with telemetry
21682168
- [x] Test no breaking changes to existing tests
21692169
- [x] Test graceful handling when disabled
2170-
2171-
Note: Statement execution hooks (beforeExecute/afterExecute in statement.go) for
2172-
actual metric collection can be added as follow-up enhancement.
2170+
- [x] Statement execution hooks
2171+
- [x] Add BeforeExecuteWithTime() hook to QueryContext
2172+
- [x] Add deferred AfterExecute() and CompleteStatement() hooks to QueryContext
2173+
- [x] Add BeforeExecuteWithTime() hook to ExecContext
2174+
- [x] Add deferred AfterExecute() and CompleteStatement() hooks to ExecContext
2175+
- [x] Use operation handle GUID as statement ID
21732176

21742177
### Phase 8: Testing & Validation
21752178
- [ ] Run benchmark tests

telemetry/aggregator.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ type metricsAggregator struct {
2828
}
2929

3030
// statementMetrics holds aggregated metrics for a statement.
31-
//
32-
//nolint:unused // Will be used in Phase 8+
3331
type statementMetrics struct {
3432
statementID string
3533
sessionID string
@@ -63,8 +61,6 @@ func newMetricsAggregator(exporter *telemetryExporter, cfg *Config) *metricsAggr
6361
}
6462

6563
// recordMetric records a metric for aggregation.
66-
//
67-
//nolint:unused // Will be used in Phase 8+
6864
func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetryMetric) {
6965
// Swallow all errors
7066
defer func() {
@@ -136,8 +132,6 @@ func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetr
136132
}
137133

138134
// completeStatement marks a statement as complete and emits aggregated metric.
139-
//
140-
//nolint:unused // Will be used in Phase 8+
141135
func (agg *metricsAggregator) completeStatement(ctx context.Context, statementID string, failed bool) {
142136
defer func() {
143137
if r := recover(); r != nil {
@@ -248,13 +242,10 @@ func (agg *metricsAggregator) close(ctx context.Context) error {
248242
}
249243

250244
// simpleError is a simple error implementation for testing.
251-
//
252-
//nolint:unused // Will be used in Phase 8+
253245
type simpleError struct {
254246
msg string
255247
}
256248

257-
//nolint:unused // Will be used in Phase 8+
258249
func (e *simpleError) Error() string {
259250
return e.msg
260251
}

telemetry/errors.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ import (
88
// isTerminalError returns true if error is terminal (non-retryable).
99
// Terminal errors indicate user errors or permanent failures that won't
1010
// be resolved by retrying the operation.
11-
//
12-
//nolint:unused // Will be used in Phase 8+
11+
1312
func isTerminalError(err error) bool {
1413
if err == nil {
1514
return false
@@ -45,8 +44,7 @@ func isTerminalError(err error) bool {
4544

4645
// classifyError classifies an error for telemetry purposes.
4746
// Returns a string representation of the error type.
48-
//
49-
//nolint:unused // Will be used in Phase 8+
47+
5048
func classifyError(err error) string {
5149
if err == nil {
5250
return ""
@@ -89,14 +87,12 @@ func isRetryableError(err error) bool {
8987
}
9088

9189
// httpError represents an HTTP error with status code.
92-
//
93-
//nolint:unused // Will be used in Phase 8+
90+
9491
type httpError struct {
9592
statusCode int
9693
message string
9794
}
9895

99-
//nolint:unused // Will be used in Phase 8+
10096
func (e *httpError) Error() string {
10197
return e.message
10298
}
@@ -112,16 +108,14 @@ func newHTTPError(statusCode int, message string) error {
112108
}
113109

114110
// isTerminalHTTPStatus returns true for non-retryable HTTP status codes.
115-
//
116-
//nolint:unused // Will be used in Phase 8+
111+
117112
func isTerminalHTTPStatus(status int) bool {
118113
// 4xx errors (except 429) are terminal
119114
return status >= 400 && status < 500 && status != 429
120115
}
121116

122117
// extractHTTPError extracts HTTP error information if available.
123-
//
124-
//nolint:unused // Will be used in Phase 8+
118+
125119
func extractHTTPError(err error) (*httpError, bool) {
126120
var httpErr *httpError
127121
if errors.As(err, &httpErr) {

telemetry/interceptor.go

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,15 @@ type Interceptor struct {
1515
}
1616

1717
// metricContext holds metric collection state in context.
18-
//
19-
//nolint:unused // Will be used in Phase 8+
2018
type metricContext struct {
19+
sessionID string
2120
statementID string
2221
startTime time.Time
2322
tags map[string]interface{}
2423
}
2524

26-
//nolint:unused // Will be used in Phase 8+
2725
type contextKey int
2826

29-
//nolint:unused // Will be used in Phase 8+
3027
const metricContextKey contextKey = 0
3128

3229
// newInterceptor creates a new telemetry interceptor.
@@ -38,32 +35,28 @@ func newInterceptor(aggregator *metricsAggregator, enabled bool) *Interceptor {
3835
}
3936

4037
// withMetricContext adds metric context to the context.
41-
//
42-
//nolint:unused // Will be used in Phase 8+
4338
func withMetricContext(ctx context.Context, mc *metricContext) context.Context {
4439
return context.WithValue(ctx, metricContextKey, mc)
4540
}
4641

4742
// getMetricContext retrieves metric context from the context.
48-
//
49-
//nolint:unused // Will be used in Phase 8+
5043
func getMetricContext(ctx context.Context) *metricContext {
5144
if mc, ok := ctx.Value(metricContextKey).(*metricContext); ok {
5245
return mc
5346
}
5447
return nil
5548
}
5649

57-
// beforeExecute is called before statement execution.
50+
// BeforeExecute is called before statement execution.
5851
// Returns a new context with metric tracking attached.
59-
//
60-
//nolint:unused // Will be used in Phase 8+
61-
func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) context.Context {
52+
// Exported for use by the driver package.
53+
func (i *Interceptor) BeforeExecute(ctx context.Context, sessionID string, statementID string) context.Context {
6254
if !i.enabled {
6355
return ctx
6456
}
6557

6658
mc := &metricContext{
59+
sessionID: sessionID,
6760
statementID: statementID,
6861
startTime: time.Now(),
6962
tags: make(map[string]interface{}),
@@ -72,11 +65,28 @@ func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) con
7265
return withMetricContext(ctx, mc)
7366
}
7467

75-
// afterExecute is called after statement execution.
68+
// BeforeExecuteWithTime starts metric tracking for a statement using a caller-supplied start time.
69+
// This is useful when the statement ID is not known until after execution starts.
70+
// Exported for use by the driver package.
71+
func (i *Interceptor) BeforeExecuteWithTime(ctx context.Context, sessionID string, statementID string, startTime time.Time) context.Context {
72+
if !i.enabled {
73+
return ctx
74+
}
75+
76+
mc := &metricContext{
77+
sessionID: sessionID,
78+
statementID: statementID,
79+
startTime: startTime,
80+
tags: make(map[string]interface{}),
81+
}
82+
83+
return withMetricContext(ctx, mc)
84+
}
85+
86+
// AfterExecute is called after statement execution.
7687
// Records the metric with timing and error information.
77-
//
78-
//nolint:unused // Will be used in Phase 8+
79-
func (i *Interceptor) afterExecute(ctx context.Context, err error) {
88+
// Exported for use by the driver package.
89+
func (i *Interceptor) AfterExecute(ctx context.Context, err error) {
8090
if !i.enabled {
8191
return
8292
}
@@ -96,6 +106,7 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) {
96106
metric := &telemetryMetric{
97107
metricType: "statement",
98108
timestamp: mc.startTime,
109+
sessionID: mc.sessionID,
99110
statementID: mc.statementID,
100111
latencyMs: time.Since(mc.startTime).Milliseconds(),
101112
tags: mc.tags,
@@ -109,10 +120,9 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) {
109120
i.aggregator.recordMetric(ctx, metric)
110121
}
111122

112-
// addTag adds a tag to the current metric context.
113-
//
114-
//nolint:unused // Will be used in Phase 8+
115-
func (i *Interceptor) addTag(ctx context.Context, key string, value interface{}) {
123+
// AddTag adds a tag to the current metric context.
124+
// Exported for use by the driver package.
125+
func (i *Interceptor) AddTag(ctx context.Context, key string, value interface{}) {
116126
if !i.enabled {
117127
return
118128
}
@@ -146,10 +156,9 @@ func (i *Interceptor) recordConnection(ctx context.Context, tags map[string]inte
146156
i.aggregator.recordMetric(ctx, metric)
147157
}
148158

149-
// completeStatement marks a statement as complete and flushes aggregated metrics.
150-
//
151-
//nolint:unused // Will be used in Phase 8+
152-
func (i *Interceptor) completeStatement(ctx context.Context, statementID string, failed bool) {
159+
// CompleteStatement marks a statement as complete and flushes aggregated metrics.
160+
// Exported for use by the driver package.
161+
func (i *Interceptor) CompleteStatement(ctx context.Context, statementID string, failed bool) {
153162
if !i.enabled {
154163
return
155164
}

0 commit comments

Comments
 (0)