Skip to content

Commit 3191d20

Browse files
samikshya-db authored and claude committed
[PECOBLR-1381] Implement telemetry Phase 6: Metric collection & aggregation
This commit implements Phase 6 (metric collection and aggregation) for the telemetry system. Phase 6: Metric Collection & Aggregation - Implement error classification (errors.go) - isTerminalError() for identifying non-retryable errors - classifyError() for categorizing errors for telemetry - HTTP error handling utilities - Implement telemetry interceptor (interceptor.go) - beforeExecute() / afterExecute() hooks for statement execution - Context-based metric tracking with metricContext - Latency measurement and tag collection - Connection event recording - Error swallowing with panic recovery - Implement metrics aggregator (aggregator.go) - Statement-level metric aggregation - Batch size and flush interval logic - Background flush goroutine with ticker - Thread-safe metric recording with mutex protection - Immediate flush for connection and terminal errors - Aggregated counts (chunks, bytes, polls) - Update telemetryClient (client.go) - Wire up aggregator with exporter - Automatic aggregator start in constructor - Graceful shutdown with 5s timeout - getInterceptor() for per-connection interceptors Architecture: - Each connection gets its own interceptor instance - All interceptors share the same aggregator (per host) - Aggregator batches metrics and flushes periodically - Exporter sends batched metrics to Databricks - Circuit breaker protects against endpoint failures Testing: - All 70+ existing tests continue to pass - Compilation verified, no breaking changes Note: Phase 7 (driver integration) will be completed separately to allow careful review and testing of hooks in connection.go and statement.go. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 38e41de commit 3191d20

File tree

4 files changed

+523
-9
lines changed

4 files changed

+523
-9
lines changed

telemetry/aggregator.go

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package telemetry
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
)
8+
9+
// metricsAggregator aggregates metrics by statement and batches for export.
//
// recordMetric and completeStatement fold incoming metrics into per-statement
// accumulators; finished metrics are appended to batch, which is handed to
// the exporter when it reaches batchSize or when the background flush loop
// ticks every flushInterval.
type metricsAggregator struct {
	// mu guards statements and batch. NOTE(review): only exclusive Lock is
	// used in this file; if no RLock callers exist elsewhere, a plain
	// sync.Mutex would be cheaper — confirm before changing.
	mu sync.RWMutex

	statements map[string]*statementMetrics // in-flight accumulators, keyed by statement ID
	batch      []*telemetryMetric           // completed metrics awaiting export
	exporter   *telemetryExporter           // destination for flushed batches

	batchSize     int           // flush when batch reaches this size
	flushInterval time.Duration // period of the background flush ticker
	stopCh        chan struct{} // closed by close() to stop flushLoop
	// flushTimer is created and written from inside the flushLoop goroutine.
	// NOTE(review): that write is unsynchronized — confirm nothing outside
	// flushLoop ever reads this field.
	flushTimer *time.Ticker
}
22+
23+
// statementMetrics holds aggregated metrics for a statement, accumulated
// between its first recorded metric and the call to completeStatement.
type statementMetrics struct {
	statementID     string        // statement this accumulator belongs to
	sessionID       string        // owning session, copied from the first metric seen
	totalLatency    time.Duration // sum of per-metric latencies
	chunkCount      int           // running sum of "chunk_count" tag values
	bytesDownloaded int64         // running sum of "bytes_downloaded" tag values
	pollCount       int           // running sum of "poll_count" tag values
	errors          []string      // error classifications recorded for this statement
	// tags is the union of all tags seen for this statement; later values
	// overwrite earlier ones for the same key.
	tags map[string]interface{}
}
34+
35+
// newMetricsAggregator creates a new metrics aggregator.
36+
func newMetricsAggregator(exporter *telemetryExporter, cfg *Config) *metricsAggregator {
37+
agg := &metricsAggregator{
38+
statements: make(map[string]*statementMetrics),
39+
batch: make([]*telemetryMetric, 0, cfg.BatchSize),
40+
exporter: exporter,
41+
batchSize: cfg.BatchSize,
42+
flushInterval: cfg.FlushInterval,
43+
stopCh: make(chan struct{}),
44+
}
45+
46+
// Start background flush timer
47+
go agg.flushLoop()
48+
49+
return agg
50+
}
51+
52+
// recordMetric records a metric for aggregation.
//
// Behavior by metric type:
//   - "connection": appended to the outgoing batch immediately; the batch is
//     flushed once it reaches batchSize.
//   - "statement":  merged into the per-statement accumulator (latency, chunk
//     count, bytes, poll count, errors, tags); nothing is emitted until
//     completeStatement is called for that statement ID.
//   - "error":      terminal errors are batched and flushed immediately;
//     non-terminal errors are attached to the statement accumulator if one
//     exists, and silently dropped otherwise.
//
// Telemetry must never break the driver, so any panic raised here is
// swallowed by the deferred recover.
func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetryMetric) {
	// Swallow all errors
	defer func() {
		if r := recover(); r != nil {
			// Log at trace level only
			// logger.Trace().Msgf("telemetry: recordMetric panic: %v", r)
		}
	}()

	agg.mu.Lock()
	defer agg.mu.Unlock()

	switch metric.metricType {
	case "connection":
		// Emit connection events immediately
		agg.batch = append(agg.batch, metric)
		if len(agg.batch) >= agg.batchSize {
			agg.flushUnlocked(ctx)
		}

	case "statement":
		// Aggregate by statement ID; lazily create the accumulator on the
		// first metric seen for this statement.
		stmt, exists := agg.statements[metric.statementID]
		if !exists {
			stmt = &statementMetrics{
				statementID: metric.statementID,
				sessionID:   metric.sessionID,
				tags:        make(map[string]interface{}),
			}
			agg.statements[metric.statementID] = stmt
		}

		// Update aggregated values. The count tags are only accumulated when
		// they carry the exact expected type (int / int64); mismatched types
		// are silently ignored by the type assertions.
		stmt.totalLatency += time.Duration(metric.latencyMs) * time.Millisecond
		if chunkCount, ok := metric.tags["chunk_count"].(int); ok {
			stmt.chunkCount += chunkCount
		}
		if bytes, ok := metric.tags["bytes_downloaded"].(int64); ok {
			stmt.bytesDownloaded += bytes
		}
		if pollCount, ok := metric.tags["poll_count"].(int); ok {
			stmt.pollCount += pollCount
		}

		// Store error if present
		if metric.errorType != "" {
			stmt.errors = append(stmt.errors, metric.errorType)
		}

		// Merge tags; later metrics overwrite earlier values for the same key.
		for k, v := range metric.tags {
			stmt.tags[k] = v
		}

	case "error":
		// NOTE(review): errorType is a classification string that gets
		// re-wrapped in simpleError solely so isTerminalError can inspect
		// it — confirm isTerminalError matches on the message text.
		if metric.errorType != "" && isTerminalError(&simpleError{msg: metric.errorType}) {
			// Flush terminal errors immediately
			agg.batch = append(agg.batch, metric)
			agg.flushUnlocked(ctx)
		} else {
			// Buffer non-terminal errors with statement
			if stmt, exists := agg.statements[metric.statementID]; exists {
				stmt.errors = append(stmt.errors, metric.errorType)
			}
		}
	}
}
121+
122+
// completeStatement marks a statement as complete and emits aggregated metric.
123+
func (agg *metricsAggregator) completeStatement(ctx context.Context, statementID string, failed bool) {
124+
defer func() {
125+
if r := recover(); r != nil {
126+
// Log at trace level only
127+
}
128+
}()
129+
130+
agg.mu.Lock()
131+
defer agg.mu.Unlock()
132+
133+
stmt, exists := agg.statements[statementID]
134+
if !exists {
135+
return
136+
}
137+
delete(agg.statements, statementID)
138+
139+
// Create aggregated metric
140+
metric := &telemetryMetric{
141+
metricType: "statement",
142+
timestamp: time.Now(),
143+
statementID: stmt.statementID,
144+
sessionID: stmt.sessionID,
145+
latencyMs: stmt.totalLatency.Milliseconds(),
146+
tags: stmt.tags,
147+
}
148+
149+
// Add aggregated counts
150+
metric.tags["chunk_count"] = stmt.chunkCount
151+
metric.tags["bytes_downloaded"] = stmt.bytesDownloaded
152+
metric.tags["poll_count"] = stmt.pollCount
153+
154+
// Add error information if failed
155+
if failed && len(stmt.errors) > 0 {
156+
// Use the first error as the primary error type
157+
metric.errorType = stmt.errors[0]
158+
}
159+
160+
agg.batch = append(agg.batch, metric)
161+
162+
// Flush if batch full
163+
if len(agg.batch) >= agg.batchSize {
164+
agg.flushUnlocked(ctx)
165+
}
166+
}
167+
168+
// flushLoop runs periodic flush in background.
169+
func (agg *metricsAggregator) flushLoop() {
170+
agg.flushTimer = time.NewTicker(agg.flushInterval)
171+
defer agg.flushTimer.Stop()
172+
173+
for {
174+
select {
175+
case <-agg.flushTimer.C:
176+
agg.flush(context.Background())
177+
case <-agg.stopCh:
178+
return
179+
}
180+
}
181+
}
182+
183+
// flush acquires the aggregator lock and flushes any pending metrics to the
// exporter. Safe to call from any goroutine; the actual export happens
// asynchronously inside flushUnlocked.
func (agg *metricsAggregator) flush(ctx context.Context) {
	agg.mu.Lock()
	defer agg.mu.Unlock()
	agg.flushUnlocked(ctx)
}
189+
190+
// flushUnlocked flushes without locking (caller must hold lock).
191+
func (agg *metricsAggregator) flushUnlocked(ctx context.Context) {
192+
if len(agg.batch) == 0 {
193+
return
194+
}
195+
196+
// Copy batch and clear
197+
metrics := make([]*telemetryMetric, len(agg.batch))
198+
copy(metrics, agg.batch)
199+
agg.batch = agg.batch[:0]
200+
201+
// Export asynchronously
202+
go func() {
203+
defer func() {
204+
if r := recover(); r != nil {
205+
// Log at trace level only
206+
}
207+
}()
208+
agg.exporter.export(ctx, metrics)
209+
}()
210+
}
211+
212+
// close stops the aggregator and flushes pending metrics.
213+
func (agg *metricsAggregator) close(ctx context.Context) error {
214+
close(agg.stopCh)
215+
agg.flush(ctx)
216+
return nil
217+
}
218+
219+
// simpleError wraps a plain message string in the error interface. Despite
// the original "for testing" note, it is also used on the production path:
// recordMetric wraps an error-classification string in a simpleError so it
// can be passed to isTerminalError.
type simpleError struct {
	msg string
}

// Compile-time check that *simpleError satisfies error.
var _ error = (*simpleError)(nil)

// Error returns the wrapped message.
func (s *simpleError) Error() string { return s.msg }

telemetry/client.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package telemetry
22

33
import (
4+
"context"
45
"net/http"
56
"sync"
7+
"time"
68
)
79

810
// telemetryClient represents a client for sending telemetry data to Databricks.
@@ -11,44 +13,75 @@ import (
1113
// - One telemetryClient instance is shared across ALL connections to the same host
1214
// - This prevents rate limiting by consolidating telemetry from multiple connections
1315
// - The client MUST be fully thread-safe as it will be accessed concurrently
14-
// - All methods (start, close, and future export methods) must use proper synchronization
16+
// - All methods (start, close, and export methods) use proper synchronization
1517
//
16-
// The mu mutex protects the started and closed flags. Future implementations in Phase 4
17-
// will need to ensure thread-safety for batch operations and flushing.
18-
//
19-
// This is a minimal stub implementation that will be fully implemented in Phase 4.
18+
// The mu mutex protects the started and closed flags.
19+
// The aggregator handles thread-safe metric collection and batching.
2020
type telemetryClient struct {
	host       string       // Databricks workspace host this client reports for
	httpClient *http.Client // HTTP client handed to the exporter
	cfg        *Config      // telemetry configuration (batch size, flush interval, ...)

	exporter   *telemetryExporter // sends batched metrics to Databricks
	aggregator *metricsAggregator // shared per-host aggregator; batches and flushes metrics

	mu      sync.Mutex // Protects started and closed flags
	started bool       // set by start(); start() is idempotent
	closed  bool       // set by close(); close() is idempotent
}
2832

2933
// newTelemetryClient creates a new telemetry client for the given host.
3034
func newTelemetryClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
35+
// Create exporter
36+
exporter := newTelemetryExporter(host, httpClient, cfg)
37+
38+
// Create aggregator with exporter
39+
aggregator := newMetricsAggregator(exporter, cfg)
40+
3141
return &telemetryClient{
3242
host: host,
3343
httpClient: httpClient,
3444
cfg: cfg,
45+
exporter: exporter,
46+
aggregator: aggregator,
3547
}
3648
}
3749

3850
// start starts the telemetry client's background operations.
39-
// This is a stub implementation that will be fully implemented in Phase 4.
51+
// The aggregator starts its background flush timer automatically.
4052
func (c *telemetryClient) start() error {
4153
c.mu.Lock()
4254
defer c.mu.Unlock()
55+
56+
if c.started {
57+
return nil
58+
}
59+
4360
c.started = true
61+
// Aggregator already started in newTelemetryClient
4462
return nil
4563
}
4664

4765
// close stops the telemetry client and flushes any pending data.
48-
// This is a stub implementation that will be fully implemented in Phase 4.
66+
// Provides graceful shutdown with a timeout to flush pending metrics.
4967
func (c *telemetryClient) close() error {
5068
c.mu.Lock()
51-
defer c.mu.Unlock()
69+
if c.closed {
70+
c.mu.Unlock()
71+
return nil
72+
}
5273
c.closed = true
53-
return nil
74+
c.mu.Unlock()
75+
76+
// Flush pending metrics with timeout
77+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
78+
defer cancel()
79+
80+
return c.aggregator.close(ctx)
81+
}
82+
83+
// getInterceptor returns a new interceptor for a connection.
// Each connection gets its own interceptor, but they all share the same
// aggregator, so metrics from every connection to this host are batched
// together. The enabled flag is forwarded to newInterceptor; presumably a
// disabled interceptor records nothing — confirm in interceptor.go.
func (c *telemetryClient) getInterceptor(enabled bool) *interceptor {
	return newInterceptor(c.aggregator, enabled)
}

0 commit comments

Comments
 (0)