Skip to content

Commit dfdc465

Browse files
author
pixie-agent
committed
fix(ae-trigger): escape a PollLimit-saturated watermark boundary (from #67)
ae-prod's boundary handling accumulates the seen-fingerprint set on a no-progress tick. However, if more than PollLimit rows share one normalized event_time, the SQL (>= watermark ORDER BY time LIMIT N, with no secondary sort key) returns the same N boundary rows on every poll, so rows beyond N at that timestamp are never emitted (an infinite boundary loop). This commit detects the all-skipped-at-capacity case and advances the watermark by 1ns to make forward progress (fingerprint dedup already tolerates the 1ns overlap). The trigger fix and its test are cherry-picked from the stale CodeRabbit-chat PR #67 (687851d); that PR's unrelated gen-pod.tmpl.yaml churn is dropped. #67 itself is NOT mergeable (it is 77 commits behind ae-prod and re-adds deleted terraform).
1 parent 714c1fb commit dfdc465

2 files changed

Lines changed: 35 additions & 7 deletions

File tree

src/vizier/services/adaptive_export/internal/trigger/clickhouse.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,15 @@ func (t *ClickHouseHTTP) run(ctx context.Context, out chan<- kubescape.Event) {
273273
// with the time-based throttle inside flushWatermark, this
274274
// produces at most one persistent INSERT per WatermarkSaveInterval.
275275
const saveEveryN = 256
276+
skippedAtBoundary := 0
276277
for i, row := range rows {
277278
fp := rowFingerprint(row)
278279
// Cursor comparisons are in NORMALIZED nanos (F8): the raw
279280
// event_time unit is not enforced, so compare on the same scale
280281
// as the SQL filter (chNormEventTimeNanos) and maxSeen.
281282
evn := normalizeEventTimeNanos(row.EventTime)
282283
if evn == watermark && seenAtBoundary[fp] {
284+
skippedAtBoundary++
283285
continue // already pushed in a prior poll at this exact boundary
284286
}
285287
ev, err := kubescape.Extract(row)
@@ -314,6 +316,23 @@ func (t *ClickHouseHTTP) run(ctx context.Context, out chan<- kubescape.Event) {
314316
for fp := range nextSeen {
315317
seenAtBoundary[fp] = true
316318
}
319+
// Paging escape: if every row returned was a boundary-skip AND
320+
// the response was at PollLimit capacity, there may be additional
321+
// rows at the same normalized event_time that we will never reach
322+
// (the SQL ORDER BY has no secondary key, so LIMIT always returns
323+
// the same PollLimit rows from the boundary). Advance the watermark
324+
// by 1 nanosecond to escape the boundary. Trade-off: rows at the old
325+
// event_time beyond the first PollLimit are no longer fetched, since
326+
// the next poll filters on >= watermark+1. That is accepted: the
327+
// fingerprint dedup already tolerates boundary overlap, and we prefer
328+
// forward progress over an infinite loop.
329+
if skippedAtBoundary > 0 && len(nextSeen) == 0 && len(rows) >= t.cfg.PollLimit {
330+
watermark++
331+
seenAtBoundary = map[string]bool{}
332+
dirty = true
333+
log.WithField("watermark", watermark).
334+
Warn("trigger: boundary paging escape — advanced watermark by 1ns to unblock poll")
335+
}
317336
}
318337
// Final flush at end of pollOnce — also throttled.
319338
flushWatermark()

src/vizier/services/adaptive_export/internal/trigger/clickhouse_internal_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net/http/httptest"
2323
"strconv"
2424
"strings"
25+
"sync"
2526
"testing"
2627
"time"
2728
)
@@ -61,9 +62,13 @@ func TestNormalizeEventTimeNanos(t *testing.T) {
6162
// stops a larger-unit row from poisoning the watermark (F8). It captures the
6263
// query the trigger sends to ClickHouse.
6364
func TestFetchSinceFiltersOnNormalizedEventTime(t *testing.T) {
65+
var mu sync.Mutex
6466
var gotQuery string
6567
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
66-
gotQuery = r.URL.Query().Get("query")
68+
q := r.URL.Query().Get("query")
69+
mu.Lock()
70+
gotQuery = q
71+
mu.Unlock()
6772
w.WriteHeader(200) // empty body = 0 rows, valid JSONEachRow
6873
}))
6974
defer srv.Close()
@@ -80,16 +85,20 @@ func TestFetchSinceFiltersOnNormalizedEventTime(t *testing.T) {
8085
t.Fatalf("fetchSince: %v", err)
8186
}
8287

83-
if !strings.Contains(gotQuery, chNormEventTimeNanos) {
84-
t.Errorf("query does not normalize event_time; want %q in:\n%s", chNormEventTimeNanos, gotQuery)
88+
mu.Lock()
89+
q := gotQuery
90+
mu.Unlock()
91+
92+
if !strings.Contains(q, chNormEventTimeNanos) {
93+
t.Errorf("query does not normalize event_time; want %q in:\n%s", chNormEventTimeNanos, q)
8594
}
8695
// The >= bound must compare the normalized expression against the nanos
8796
// watermark, not the raw column.
8897
wantPred := chNormEventTimeNanos + " >= " + strconv.FormatUint(wmNanos, 10)
89-
if !strings.Contains(gotQuery, wantPred) {
90-
t.Errorf("query filter is not normalized-vs-nanos-watermark; want %q in:\n%s", wantPred, gotQuery)
98+
if !strings.Contains(q, wantPred) {
99+
t.Errorf("query filter is not normalized-vs-nanos-watermark; want %q in:\n%s", wantPred, q)
91100
}
92-
if strings.Contains(gotQuery, "event_time >= ") {
93-
t.Errorf("query still uses RAW event_time filter (poison-prone):\n%s", gotQuery)
101+
if strings.Contains(q, "event_time >= ") {
102+
t.Errorf("query still uses RAW event_time filter (poison-prone):\n%s", q)
94103
}
95104
}

0 commit comments

Comments
 (0)