Skip to content

Commit 1c7c572

Browse files
committed
Address PR review feedback
- README: remove non-existent forceEnableTelemetry param - aggregator: split connection/operation flush logic — connection flushes immediately, operation batches to avoid goroutine-per-call - config: use strconv.ParseBool for enableTelemetry; clarify default comment - request: align structs with proto schema (error_name/stack_trace, SqlExecutionEvent fields, ChunkDetails, ResultLatency, HostDetails, VolumeOperationEvent, DriverConnectionParameters); use crypto/rand - system_info: cache OS/runtime info with sync.Once to avoid repeated os.ReadFile on every metric - integration_test: fix PrivacyCompliance and TagFiltering tests to validate TelemetryRequest format (assertions previously never ran) Co-authored-by: Isaac
1 parent e23c725 commit 1c7c572

6 files changed

Lines changed: 192 additions & 107 deletions

File tree

README.md

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,6 @@ token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enable
7070
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
7171
```
7272

73-
**Advanced configuration** (for testing/debugging):
74-
```
75-
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?forceEnableTelemetry=true
76-
```
77-
7873
**What data is collected:**
7974
- ✅ Query latency and performance metrics
8075
- ✅ Error codes (not error messages)

telemetry/aggregator.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,20 @@ func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetr
7474
defer agg.mu.Unlock()
7575

7676
switch metric.metricType {
77-
case "connection", "operation":
78-
// Emit connection and operation events immediately
77+
case "connection":
78+
// Connection events flush immediately: lifecycle events must be captured before
79+
// the connection closes, as we won't have another opportunity to flush.
7980
agg.batch = append(agg.batch, metric)
8081
agg.flushUnlocked(ctx)
8182

83+
case "operation":
84+
// Operation events batch normally to avoid a goroutine-per-call if more
85+
// operation types are recorded per connection in the future.
86+
agg.batch = append(agg.batch, metric)
87+
if len(agg.batch) >= agg.batchSize {
88+
agg.flushUnlocked(ctx)
89+
}
90+
8291
case "statement":
8392
// Aggregate by statement ID
8493
stmt, exists := agg.statements[metric.statementID]

telemetry/config.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ type Config struct {
3939
}
4040

4141
// DefaultConfig returns default telemetry configuration.
42-
// Note: Telemetry is disabled by default and requires explicit opt-in.
42+
// Note: Telemetry is disabled by default. The default will remain false until
43+
// server-side feature flags are wired in to control the rollout.
4344
func DefaultConfig() *Config {
4445
return &Config{
45-
Enabled: false, // Disabled by default, requires explicit opt-in
46+
Enabled: false,
4647
EnableTelemetry: false,
4748
BatchSize: 100,
4849
FlushInterval: 5 * time.Second,
@@ -58,12 +59,9 @@ func DefaultConfig() *Config {
5859
func ParseTelemetryConfig(params map[string]string) *Config {
5960
cfg := DefaultConfig()
6061

61-
// Check for enableTelemetry flag (follows client > server > default priority)
6262
if v, ok := params["enableTelemetry"]; ok {
63-
if v == "true" || v == "1" {
64-
cfg.EnableTelemetry = true
65-
} else if v == "false" || v == "0" {
66-
cfg.EnableTelemetry = false
63+
if b, err := strconv.ParseBool(v); err == nil {
64+
cfg.EnableTelemetry = b
6765
}
6866
}
6967

telemetry/integration_test.go

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net/http"
88
"net/http/httptest"
9+
"strings"
910
"sync/atomic"
1011
"testing"
1112
"time"
@@ -183,12 +184,12 @@ func TestIntegration_OptInPriority_ExplicitOptOut(t *testing.T) {
183184
// TestIntegration_PrivacyCompliance verifies no sensitive data is collected.
184185
func TestIntegration_PrivacyCompliance_NoQueryText(t *testing.T) {
185186
cfg := DefaultConfig()
187+
cfg.FlushInterval = 50 * time.Millisecond
186188
httpClient := &http.Client{Timeout: 5 * time.Second}
187189

188-
var capturedPayload telemetryPayload
190+
var capturedBody []byte
189191
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
190-
body, _ := io.ReadAll(r.Body)
191-
json.Unmarshal(body, &capturedPayload)
192+
capturedBody, _ = io.ReadAll(r.Body)
192193
w.WriteHeader(http.StatusOK)
193194
}))
194195
defer server.Close()
@@ -199,93 +200,103 @@ func TestIntegration_PrivacyCompliance_NoQueryText(t *testing.T) {
199200

200201
interceptor := newInterceptor(aggregator, true)
201202

202-
// Simulate execution with sensitive data in tags (should be filtered)
203203
ctx := context.Background()
204204
statementID := "stmt-privacy"
205205
ctx = interceptor.BeforeExecute(ctx, "session-id", statementID)
206206

207-
// Try to add sensitive tags (should be filtered out)
207+
// Add sensitive tags — none of these should appear in the exported telemetry
208208
interceptor.AddTag(ctx, "query.text", "SELECT * FROM users")
209209
interceptor.AddTag(ctx, "user.email", "user@example.com")
210-
interceptor.AddTag(ctx, "workspace.id", "ws-123") // This should be allowed
211210

212211
interceptor.AfterExecute(ctx, nil)
213212
interceptor.CompleteStatement(ctx, statementID, false)
214213

215214
// Wait for flush
216215
time.Sleep(200 * time.Millisecond)
217216

218-
// Verify no sensitive data in captured payload
219-
if len(capturedPayload.Metrics) > 0 {
220-
for _, metric := range capturedPayload.Metrics {
221-
if _, ok := metric.Tags["query.text"]; ok {
222-
t.Error("Query text should not be exported")
223-
}
224-
if _, ok := metric.Tags["user.email"]; ok {
225-
t.Error("User email should not be exported")
226-
}
227-
// workspace.id should be allowed
228-
if _, ok := metric.Tags["workspace.id"]; !ok {
229-
t.Error("workspace.id should be exported")
230-
}
231-
}
217+
if len(capturedBody) == 0 {
218+
t.Fatal("Expected telemetry request to be sent")
232219
}
233220

234-
t.Log("Privacy compliance test passed: sensitive data filtered")
221+
// The exporter sends TelemetryRequest with ProtoLogs (JSON-encoded TelemetryFrontendLog).
222+
// Verify sensitive values are absent from the serialised payload.
223+
bodyStr := string(capturedBody)
224+
if strings.Contains(bodyStr, "SELECT * FROM users") {
225+
t.Error("Query text must not be exported")
226+
}
227+
if strings.Contains(bodyStr, "user@example.com") {
228+
t.Error("User email must not be exported")
229+
}
230+
231+
t.Log("Privacy compliance test passed: sensitive data not present in payload")
235232
}
236233

237-
// TestIntegration_TagFiltering verifies tag filtering works correctly.
238-
func TestIntegration_TagFiltering(t *testing.T) {
234+
// TestIntegration_FieldMapping verifies that only known metric fields are exported
235+
// in the TelemetryRequest format (no generic tag pass-through).
236+
func TestIntegration_FieldMapping(t *testing.T) {
239237
cfg := DefaultConfig()
240238
cfg.FlushInterval = 50 * time.Millisecond
241239
httpClient := &http.Client{Timeout: 5 * time.Second}
242240

243-
var capturedPayload telemetryPayload
241+
var capturedRequest TelemetryRequest
244242
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
245243
body, _ := io.ReadAll(r.Body)
246-
json.Unmarshal(body, &capturedPayload)
244+
json.Unmarshal(body, &capturedRequest)
247245
w.WriteHeader(http.StatusOK)
248246
}))
249247
defer server.Close()
250248

251249
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
252250

253-
// Test metric with mixed tags
254251
metric := &telemetryMetric{
255252
metricType: "connection",
256253
timestamp: time.Now(),
257254
workspaceID: "ws-test",
255+
sessionID: "sess-1",
256+
latencyMs: 42,
258257
tags: map[string]interface{}{
259-
"workspace.id": "ws-123", // Should export
260-
"driver.version": "1.0.0", // Should export
261-
"server.address": "localhost:8080", // Should NOT export (local only)
262-
"unknown.tag": "value", // Should NOT export
258+
"chunk_count": 3,
259+
"bytes_downloaded": int64(1024),
260+
"unknown.tag": "value", // should NOT appear in output
263261
},
264262
}
265263

266264
ctx := context.Background()
267265
exporter.export(ctx, []*telemetryMetric{metric})
268266

269-
// Wait for export
270-
time.Sleep(100 * time.Millisecond)
267+
time.Sleep(150 * time.Millisecond)
271268

272-
// Verify filtering
273-
if len(capturedPayload.Metrics) > 0 {
274-
exported := capturedPayload.Metrics[0]
269+
if len(capturedRequest.ProtoLogs) == 0 {
270+
t.Fatal("Expected at least one ProtoLog entry")
271+
}
275272

276-
if _, ok := exported.Tags["workspace.id"]; !ok {
277-
t.Error("workspace.id should be exported")
278-
}
279-
if _, ok := exported.Tags["driver.version"]; !ok {
280-
t.Error("driver.version should be exported")
281-
}
282-
if _, ok := exported.Tags["server.address"]; ok {
283-
t.Error("server.address should NOT be exported")
284-
}
285-
if _, ok := exported.Tags["unknown.tag"]; ok {
286-
t.Error("unknown.tag should NOT be exported")
273+
// Each ProtoLog entry is a JSON-encoded TelemetryFrontendLog.
274+
var log TelemetryFrontendLog
275+
if err := json.Unmarshal([]byte(capturedRequest.ProtoLogs[0]), &log); err != nil {
276+
t.Fatalf("Failed to unmarshal ProtoLog: %v", err)
277+
}
278+
279+
if log.Entry == nil || log.Entry.SQLDriverLog == nil {
280+
t.Fatal("Expected SQLDriverLog to be populated")
281+
}
282+
283+
entry := log.Entry.SQLDriverLog
284+
if entry.SessionID != "sess-1" {
285+
t.Errorf("Expected session_id=sess-1, got %q", entry.SessionID)
286+
}
287+
if entry.OperationLatencyMs != 42 {
288+
t.Errorf("Expected latency=42, got %d", entry.OperationLatencyMs)
289+
}
290+
if entry.SQLOperation != nil && entry.SQLOperation.ChunkDetails != nil {
291+
if entry.SQLOperation.ChunkDetails.TotalChunksIterated != 3 {
292+
t.Errorf("Expected total_chunks_iterated=3, got %d", entry.SQLOperation.ChunkDetails.TotalChunksIterated)
287293
}
288294
}
289295

290-
t.Log("Tag filtering test passed")
296+
// unknown.tag must not appear anywhere in the serialised output
297+
if strings.Contains(capturedRequest.ProtoLogs[0], "unknown.tag") {
298+
t.Error("unknown.tag must not be exported")
299+
}
300+
301+
t.Log("Field mapping test passed")
291302
}

0 commit comments

Comments
 (0)