diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 552a684ea2..85bcd28bd3 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -431,21 +431,17 @@ type changefeedStatus struct { minSentTs atomic.Uint64 scanInterval atomic.Int64 - lastAdjustTime atomic.Time - lastTrendAdjustTime atomic.Time - usageWindow *memoryUsageWindow - syncPointInterval time.Duration + scanWindowController *adaptiveScanWindowController + syncPointInterval time.Duration } func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { status := &changefeedStatus{ - changefeedID: changefeedID, - usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), - syncPointInterval: syncPointInterval, + changefeedID: changefeedID, + scanWindowController: newAdaptiveScanWindowController(time.Now()), + syncPointInterval: syncPointInterval, } status.scanInterval.Store(int64(defaultScanInterval)) - status.lastAdjustTime.Store(time.Now()) - status.lastTrendAdjustTime.Store(time.Now()) return status } diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..3e74a55a28 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -1142,9 +1142,7 @@ func (c *eventBroker) removeChangefeedStatus(status *changefeedStatus) { } filter.GetSharedFilterStorage().RemoveFilter(changefeedID) - metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String()) - metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String()) - metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String()) + deleteScanWindowMetrics(changefeedID.String()) } func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { @@ -1263,8 +1261,7 @@ func (c *eventBroker) getOrSetChangefeedStatus(info DispatcherInfo) *changefeedS return actual.(*changefeedStatus) } log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID)) - metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0) - metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeedID.String()).Set(defaultScanInterval.Seconds()) + initializeScanWindowMetrics(changefeedID.String()) return status } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 4a274e0c12..e6aa762b9e 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -373,7 +373,6 @@ func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) { status := addChangefeedStatusToBrokerForTest(t, broker, changefeedID, time.Second*10) status.scanInterval.Store(int64(40 * time.Second)) - status.lastAdjustTime.Store(time.Now()) control := event.NewCongestionControlWithVersion(event.CongestionControlVersion2) control.AddAvailableMemoryWithDispatchersAndUsage(changefeedID.ID(), 0, 1, nil) @@ -382,7 +381,7 @@ func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) { require.Equal(t, int64(10*time.Second), status.scanInterval.Load()) } -func TestHandleCongestionControlV2ResetsScanIntervalOnMemoryRelease(t *testing.T) { +func TestHandleCongestionControlV2DoesNotResetScanIntervalOnMemoryRelease(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() defer broker.close() @@ -395,7 +394,7 @@ func TestHandleCongestionControlV2ResetsScanIntervalOnMemoryRelease(t *testing.T control.AddAvailableMemoryWithDispatchersAndUsageAndReleaseCount(changefeedID.ID(), 0, 0.5, nil, 1) broker.handleCongestionControl(node.ID("event-collector-1"), control) - require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) } func TestHandleCongestionControlV1DoesNotAdjustScanInterval(t *testing.T) { @@ -406,7 +405,6 @@ func TestHandleCongestionControlV1DoesNotAdjustScanInterval(t *testing.T) { status := addChangefeedStatusToBrokerForTest(t, broker, changefeedID, time.Second*10) status.scanInterval.Store(int64(40 * time.Second)) - status.lastAdjustTime.Store(time.Now()) control := event.NewCongestionControl() control.AddAvailableMemoryWithDispatchers(changefeedID.ID(), 0, nil) diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 46cb9cffb3..e972b6f7bd 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -43,11 +43,6 @@ const ( // this cooldown and are applied immediately. scanIntervalAdjustCooldown = 30 * time.Second - // scanTrendAdjustCooldown is the minimum time between trend-based interval - // adjustments. This is shorter than the general cooldown because trend - // adjustments need to be more responsive to rising memory pressure. - scanTrendAdjustCooldown = 5 * time.Second - // memoryUsageWindowDuration is the duration of the sliding window for // collecting memory usage samples. Samples older than this duration are // pruned from the window. @@ -58,10 +53,12 @@ const ( memoryUsageHighThreshold = 0.7 // memoryUsageCriticalThreshold (90%) triggers an aggressive reduction of - // the scan interval to 1/4 of its current value when memory usage exceeds - // this level. + // the scan interval once memory usage exceeds this level. memoryUsageCriticalThreshold = 0.9 + // memoryUsageEmergencyThreshold (98%) triggers the strongest emergency brake. + memoryUsageEmergencyThreshold = 0.98 + // memoryUsageLowThreshold (20%) allows the scan interval to be increased // by 25% when both max and average memory usage are below this level. memoryUsageLowThreshold = 0.2 @@ -71,6 +68,50 @@ const ( // increase may exceed the normal sync point interval cap. memoryUsageVeryLowThreshold = 0.1 + // scanWindowModeratePressureThreshold is the smoothed usage threshold that + // starts accumulating pressure score for gradual interval reductions. + scanWindowModeratePressureThreshold = 0.55 + + // scanWindowHighPressureThreshold triggers a stronger but still bounded + // interval reduction when sustained high pressure is observed. + scanWindowHighPressureThreshold = 0.75 + + // scanWindowPressureAdjustCooldown is the minimum time between non-critical + // downward adjustments. It prevents the controller from overreacting before + // previous interval changes have time to take effect. + scanWindowPressureAdjustCooldown = 10 * time.Second + + // scanWindowCriticalBrakeCooldown deduplicates repeated critical brakes + // caused by the same short burst. Without this cooldown, one peak retained + // in the usage window can repeatedly trigger critical_brake on every report. + scanWindowCriticalBrakeCooldown = 10 * time.Second + + // scanWindowReleaseRecoveryCooldown is the minimum time after a downward + // adjustment before the controller is allowed to recover upward again. + scanWindowReleaseRecoveryCooldown = 15 * time.Second + + // scanWindowVeryLowRecoveryCooldown is the minimum time after a recent + // instability event before the controller can re-enter the aggressive + // very_low_recovery path. + scanWindowVeryLowRecoveryCooldown = 90 * time.Second + + // scanWindowFastUsageAlpha controls the responsiveness of the short-term EMA. + scanWindowFastUsageAlpha = 0.4 + + // scanWindowSlowUsageAlpha controls the responsiveness of the long-term EMA. + scanWindowSlowUsageAlpha = 0.2 + + // scanWindowPressureTriggerScore is the score required to trigger a gradual + // downward adjustment under sustained but non-critical pressure. + scanWindowPressureTriggerScore = 3.0 + + // scanWindowPressureScoreCeiling bounds the pressure accumulator. + scanWindowPressureScoreCeiling = 8.0 + + // scanWindowPressureReliefPerRelease is the amount of accumulated pressure + // cleared by one downstream release pulse. + scanWindowPressureReliefPerRelease = 2.0 + // scanWindowStaleDispatcherHeartbeatThreshold is the duration after which a // dispatcher is treated as stale for scan window base ts calculation if it // hasn't sent heartbeat updates. This prevents stale dispatchers (for example, @@ -102,12 +143,67 @@ type memoryUsageStats struct { cnt int } +type scanWindowReport struct { + usageRatio float64 + memoryReleaseCount uint32 +} + +type scanWindowDecisionReason string + +const ( + scanWindowDecisionNone scanWindowDecisionReason = "none" + scanWindowDecisionCriticalBrake scanWindowDecisionReason = "critical_brake" + scanWindowDecisionHighPressure scanWindowDecisionReason = "high_pressure" + scanWindowDecisionSustainedPressure scanWindowDecisionReason = "sustained_pressure" + scanWindowDecisionLowRecovery scanWindowDecisionReason = "low_recovery" + scanWindowDecisionVeryLowRecovery scanWindowDecisionReason = "very_low_recovery" +) + +type scanWindowDecision struct { + newInterval time.Duration + maxInterval time.Duration + reason scanWindowDecisionReason + usage memoryUsageStats + fastUsageEMA float64 + slowUsageEMA float64 + pressureScore float64 +} + +type scanWindowController interface { + OnCongestionReport(now time.Time, currentInterval time.Duration, maxInterval time.Duration, report scanWindowReport) scanWindowDecision +} + +type adaptiveScanWindowController struct { + mu sync.Mutex + + usageWindow *memoryUsageWindow + + lastAdjustTime time.Time + lastDownAdjustTime time.Time + lastCriticalTime time.Time + lastInstabilityTime time.Time + + fastUsageEMA float64 + slowUsageEMA float64 + emaInitialized bool + + pressureScore float64 +} + func newMemoryUsageWindow(window time.Duration) *memoryUsageWindow { return &memoryUsageWindow{ window: window, } } +func newAdaptiveScanWindowController(now time.Time) *adaptiveScanWindowController { + return &adaptiveScanWindowController{ + usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), + lastAdjustTime: now, + lastDownAdjustTime: now, + } +} + func (w *memoryUsageWindow) addSample(now time.Time, ratio float64) { if ratio < 0 { ratio = 0 @@ -166,150 +262,397 @@ func (w *memoryUsageWindow) pruneLocked(now time.Time) { } func (c *changefeedStatus) updateMemoryUsage(now time.Time, usageRatio float64, memoryReleaseCount uint32) { - if c.usageWindow == nil { + if c.scanWindowController == nil { return } - if usageRatio != usageRatio || usageRatio < 0 { - usageRatio = 0 - } - if usageRatio > 1 { - usageRatio = 1 - } - - if memoryReleaseCount > 0 { - c.resetScanIntervalToDefault(now) - c.usageWindow.reset() - c.usageWindow.addSample(now, usageRatio) + normalizedUsageRatio := normalizeUsageRatio(usageRatio) + current := time.Duration(c.scanInterval.Load()) + decision := c.scanWindowController.OnCongestionReport(now, current, c.maxScanInterval(), scanWindowReport{ + usageRatio: normalizedUsageRatio, + memoryReleaseCount: memoryReleaseCount, + }) + c.observeScanWindowControllerMetrics(normalizedUsageRatio, memoryReleaseCount, current, decision) + if decision.newInterval == current { return } - c.usageWindow.addSample(now, usageRatio) - stats := c.usageWindow.stats(now) - c.adjustScanInterval(now, stats) + c.scanInterval.Store(int64(decision.newInterval)) + metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(decision.newInterval.Seconds()) + + log.Info("scan interval adjusted", + zap.Stringer("changefeedID", c.changefeedID), + zap.String("reason", string(decision.reason)), + zap.Duration("oldInterval", current), + zap.Duration("newInterval", decision.newInterval), + zap.Duration("maxInterval", decision.maxInterval), + zap.Float64("avgUsage", decision.usage.avg), + zap.Float64("maxUsage", decision.usage.max), + zap.Float64("firstUsage", decision.usage.first), + zap.Float64("lastUsage", decision.usage.last), + zap.Float64("fastUsageEMA", decision.fastUsageEMA), + zap.Float64("slowUsageEMA", decision.slowUsageEMA), + zap.Float64("pressureScore", decision.pressureScore), + zap.Uint32("memoryReleaseCount", memoryReleaseCount), + zap.Bool("syncPointEnabled", c.isSyncpointEnabled()), + zap.Duration("syncPointInterval", c.syncPointInterval)) } -func (c *changefeedStatus) resetScanIntervalToDefault(now time.Time) { - current := time.Duration(c.scanInterval.Load()) - if current != defaultScanInterval { - c.scanInterval.Store(int64(defaultScanInterval)) - metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(defaultScanInterval.Seconds()) +func initializeScanWindowMetrics(changefeed string) { + metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeed).Set(0) + metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeed).Set(defaultScanInterval.Seconds()) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "report").Set(0) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "avg").Set(0) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "max").Set(0) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "fast").Set(0) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "slow").Set(0) + metrics.EventServiceScanWindowPressureScoreGaugeVec.WithLabelValues(changefeed).Set(0) +} - log.Info("scan interval reset to default", - zap.Stringer("changefeedID", c.changefeedID), - zap.Duration("oldInterval", current), - zap.Duration("newInterval", defaultScanInterval)) +func deleteScanWindowMetrics(changefeed string) { + metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowUsageRatioGaugeVec.DeleteLabelValues(changefeed, "report") + metrics.EventServiceScanWindowUsageRatioGaugeVec.DeleteLabelValues(changefeed, "avg") + metrics.EventServiceScanWindowUsageRatioGaugeVec.DeleteLabelValues(changefeed, "max") + metrics.EventServiceScanWindowUsageEMAGaugeVec.DeleteLabelValues(changefeed, "fast") + metrics.EventServiceScanWindowUsageEMAGaugeVec.DeleteLabelValues(changefeed, "slow") + metrics.EventServiceScanWindowPressureScoreGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowMemoryReleaseCount.DeleteLabelValues(changefeed) + for _, reason := range []scanWindowDecisionReason{ + scanWindowDecisionNone, + scanWindowDecisionCriticalBrake, + scanWindowDecisionHighPressure, + scanWindowDecisionSustainedPressure, + scanWindowDecisionLowRecovery, + scanWindowDecisionVeryLowRecovery, + } { + metrics.EventServiceScanWindowAdjustCount.DeleteLabelValues(changefeed, string(reason)) } +} - c.lastAdjustTime.Store(now) - c.lastTrendAdjustTime.Store(now) +func (c *changefeedStatus) observeScanWindowControllerMetrics( + usageRatio float64, + memoryReleaseCount uint32, + current time.Duration, + decision scanWindowDecision, +) { + changefeed := c.changefeedID.String() + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "report").Set(usageRatio) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "avg").Set(decision.usage.avg) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "max").Set(decision.usage.max) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "fast").Set(decision.fastUsageEMA) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "slow").Set(decision.slowUsageEMA) + metrics.EventServiceScanWindowPressureScoreGaugeVec.WithLabelValues(changefeed).Set(decision.pressureScore) + if memoryReleaseCount > 0 { + metrics.EventServiceScanWindowMemoryReleaseCount.WithLabelValues(changefeed).Add(float64(memoryReleaseCount)) + } + if decision.newInterval != current { + metrics.EventServiceScanWindowAdjustCount.WithLabelValues(changefeed, string(decision.reason)).Inc() + } } -// Constants for trend detection and increase eligibility. const ( - minTrendSamples = 4 // Minimum samples needed to detect a valid trend - increasingTrendEpsilon = 0.02 // Minimum delta to consider as "increasing" - increasingTrendStartRatio = 0.3 // Threshold (30%) above which trend damping kicks in - minIncreaseSamples = 10 // Minimum samples needed before allowing increase minIncreaseSpanNumerator = 4 // Observation span must be at least 4/5 of window minIncreaseSpanDenominator = 5 ) -// adjustScanInterval dynamically adjusts the scan interval based on memory pressure. -// -// Algorithm overview: -// - "Fast brake, slow accelerate": Decreases are applied immediately when memory -// pressure is high, while increases require cooldown periods and stable conditions. -// - Tiered response: Different thresholds trigger different adjustment magnitudes. -// - Trend prediction: Detects rising memory pressure early and proactively reduces -// the interval before hitting critical thresholds. -// -// Thresholds and actions: -// - Critical (>90%): Reduce interval to 1/4 (aggressive) -// - High (>70%): Reduce interval to 1/2 -// - Trend damping (>30% AND rising): Reduce interval by 10% -// - Low (<30% max AND avg): Increase interval by 25% -// - Very low (<10% max AND avg): Increase interval by 50%, may exceed normal cap -func (c *changefeedStatus) adjustScanInterval(now time.Time, usage memoryUsageStats) { - current := time.Duration(c.scanInterval.Load()) +func (c *adaptiveScanWindowController) OnCongestionReport(now time.Time, current time.Duration, maxInterval time.Duration, report scanWindowReport) scanWindowDecision { + c.mu.Lock() + defer c.mu.Unlock() + if current <= 0 { current = defaultScanInterval } - maxInterval := c.maxScanInterval() if maxInterval < minScanInterval { maxInterval = minScanInterval } - // Trend detection: check if memory usage is rising over the observation window. - // This enables proactive intervention before hitting high thresholds. - trendDelta := usage.last - usage.first - isIncreasing := usage.cnt >= minTrendSamples && trendDelta > increasingTrendEpsilon - isAboveTrendStart := usage.last > increasingTrendStartRatio - canAdjustOnTrend := now.Sub(c.lastTrendAdjustTime.Load()) >= scanTrendAdjustCooldown - shouldDampOnTrend := isAboveTrendStart && isIncreasing && canAdjustOnTrend - - // Increase eligibility: conservative conditions to prevent oscillation. - // Requires: cooldown passed, enough samples, sufficient observation span, - // and NOT in an increasing trend situation (to avoid fighting against pressure). + + c.usageWindow.addSample(now, report.usageRatio) + usage := c.usageWindow.stats(now) + c.updateUsageEMALocked(report.usageRatio) + + if decision, ok := c.tryCriticalBrakeLocked(now, current, maxInterval, usage); ok { + return decision + } + + c.updatePressureScoreLocked(usage) + if report.memoryReleaseCount > 0 { + c.relievePressureLocked(report.memoryReleaseCount) + } + + if c.shouldReduceForHighPressureLocked(now, usage) { + newInterval := max(scaleDuration(current, 3, 4), defaultScanInterval) + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionHighPressure, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + + if c.shouldReduceForSustainedPressureLocked(now, usage) { + newInterval := max(scaleDuration(current, 9, 10), defaultScanInterval) + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionSustainedPressure, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + + if !c.allowedToIncreaseLocked(now, usage) { + return scanWindowDecision{ + newInterval: current, + maxInterval: maxInterval, + reason: scanWindowDecisionNone, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + + if c.isVeryLowPressureLocked(usage) && c.allowedToVeryLowRecoverLocked(now) { + effectiveMaxInterval := maxScanInterval + newInterval := min(scaleDuration(current, 3, 2), effectiveMaxInterval) + if newInterval > current { + c.noteAdjustmentLocked(now, false) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: effectiveMaxInterval, + reason: scanWindowDecisionVeryLowRecovery, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + } + + if current < maxInterval && c.isLowPressureLocked(usage) { + newInterval := min(scaleDuration(current, 5, 4), maxInterval) + if newInterval > current { + c.noteAdjustmentLocked(now, false) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionLowRecovery, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + } + + return scanWindowDecision{ + newInterval: current, + maxInterval: maxInterval, + reason: scanWindowDecisionNone, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } +} + +func (c *adaptiveScanWindowController) tryCriticalBrakeLocked( + now time.Time, + current time.Duration, + maxInterval time.Duration, + usage memoryUsageStats, +) (scanWindowDecision, bool) { + if now.Sub(c.lastCriticalTime) < scanWindowCriticalBrakeCooldown { + return scanWindowDecision{}, false + } + + switch { + case usage.last > memoryUsageEmergencyThreshold: + newInterval := max(current/4, minScanInterval) + c.lastCriticalTime = now + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionCriticalBrake, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + }, true + case usage.last > memoryUsageCriticalThreshold: + newInterval := max(current/2, minScanInterval) + c.lastCriticalTime = now + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionCriticalBrake, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + }, true + default: + return scanWindowDecision{}, false + } +} + +func (c *adaptiveScanWindowController) updateUsageEMALocked(value float64) { + if !c.emaInitialized { + c.fastUsageEMA = value + c.slowUsageEMA = value + c.emaInitialized = true + return + } + c.fastUsageEMA = ema(c.fastUsageEMA, value, scanWindowFastUsageAlpha) + c.slowUsageEMA = ema(c.slowUsageEMA, value, scanWindowSlowUsageAlpha) +} + +func (c *adaptiveScanWindowController) updatePressureScoreLocked(usage memoryUsageStats) { + switch { + case c.fastUsageEMA >= scanWindowHighPressureThreshold || + c.slowUsageEMA >= scanWindowHighPressureThreshold || + usage.max >= memoryUsageHighThreshold: + c.pressureScore = min(c.pressureScore+2, scanWindowPressureScoreCeiling) + case c.fastUsageEMA >= scanWindowModeratePressureThreshold || + c.slowUsageEMA >= scanWindowModeratePressureThreshold || + usage.avg >= scanWindowModeratePressureThreshold: + c.pressureScore = min(c.pressureScore+1, scanWindowPressureScoreCeiling) + case c.fastUsageEMA < 0.30 && c.slowUsageEMA < 0.25 && usage.last < 0.30: + c.pressureScore = maxFloat64(0, c.pressureScore-1.5) + default: + c.pressureScore = maxFloat64(0, c.pressureScore-0.5) + } +} + +func (c *adaptiveScanWindowController) relievePressureLocked(memoryReleaseCount uint32) { + relief := min(float64(memoryReleaseCount)*scanWindowPressureReliefPerRelease, scanWindowPressureScoreCeiling) + c.pressureScore = maxFloat64(0, c.pressureScore-relief) +} + +func (c *adaptiveScanWindowController) shouldReduceForHighPressureLocked(now time.Time, usage memoryUsageStats) bool { + if now.Sub(c.lastDownAdjustTime) < scanWindowPressureAdjustCooldown { + return false + } + + return c.fastUsageEMA >= scanWindowHighPressureThreshold || + c.slowUsageEMA >= scanWindowHighPressureThreshold || + usage.max >= memoryUsageHighThreshold +} + +func (c *adaptiveScanWindowController) shouldReduceForSustainedPressureLocked(now time.Time, usage memoryUsageStats) bool { + if now.Sub(c.lastDownAdjustTime) < scanWindowPressureAdjustCooldown { + return false + } + if c.pressureScore < scanWindowPressureTriggerScore { + return false + } + return c.fastUsageEMA >= scanWindowModeratePressureThreshold || + c.slowUsageEMA >= scanWindowModeratePressureThreshold || + usage.avg >= scanWindowModeratePressureThreshold +} + +func (c *adaptiveScanWindowController) allowedToIncreaseLocked(now time.Time, usage memoryUsageStats) bool { minIncreaseSpan := memoryUsageWindowDuration * minIncreaseSpanNumerator / minIncreaseSpanDenominator - allowedToIncrease := now.Sub(c.lastAdjustTime.Load()) >= scanIntervalAdjustCooldown && + return now.Sub(c.lastAdjustTime) >= scanIntervalAdjustCooldown && + now.Sub(c.lastDownAdjustTime) >= scanWindowReleaseRecoveryCooldown && usage.cnt >= minIncreaseSamples && usage.span >= minIncreaseSpan && - !(isAboveTrendStart && isIncreasing) + c.pressureScore < 1 +} - // Determine the new interval based on memory pressure levels. - // Priority order: critical > high > trend damping > very low > low - adjustedOnTrend := false - newInterval := current - switch { - case usage.last > memoryUsageCriticalThreshold || usage.max > memoryUsageCriticalThreshold: - // Critical pressure: aggressive reduction to 1/4 - newInterval = max(current/4, minScanInterval) - case usage.last > memoryUsageHighThreshold || usage.max > memoryUsageHighThreshold: - // High pressure: reduce to 1/2 - newInterval = max(current/2, minScanInterval) - case shouldDampOnTrend: - // Trend damping: pressure is moderate (>30%) but rising. Reduce by 10% to - // preemptively slow down before downstream gets overwhelmed. - newInterval = max(scaleDuration(current, 9, 10), minScanInterval) - adjustedOnTrend = true - case allowedToIncrease && usage.max < memoryUsageVeryLowThreshold && usage.avg < memoryUsageVeryLowThreshold: - // Very low pressure (<20%): increase by 50%, allowed to exceed sync point cap. - maxInterval = maxScanInterval - newInterval = min(scaleDuration(current, 3, 2), maxInterval) - case allowedToIncrease && usage.max < memoryUsageLowThreshold && usage.avg < memoryUsageLowThreshold: - // Low pressure (<40%): increase by 25%, capped by sync point interval. - newInterval = min(scaleDuration(current, 5, 4), maxInterval) - } - - // Anti-oscillation guard: decreases are always applied immediately, - // but increases are blocked if cooldown conditions aren't met. - if newInterval > current && !allowedToIncrease { - return +func (c *adaptiveScanWindowController) allowedToVeryLowRecoverLocked(now time.Time) bool { + if c.lastInstabilityTime.IsZero() { + return true + } + return now.Sub(c.lastInstabilityTime) >= scanWindowVeryLowRecoveryCooldown +} + +func (c *adaptiveScanWindowController) isVeryLowPressureLocked(usage memoryUsageStats) bool { + return usage.max < memoryUsageVeryLowThreshold && + usage.avg < memoryUsageVeryLowThreshold && + c.fastUsageEMA < memoryUsageVeryLowThreshold && + c.slowUsageEMA < memoryUsageVeryLowThreshold +} + +func (c *adaptiveScanWindowController) isLowPressureLocked(usage memoryUsageStats) bool { + return usage.max < memoryUsageLowThreshold && + usage.avg < memoryUsageLowThreshold && + c.fastUsageEMA < memoryUsageLowThreshold+0.03 && + c.slowUsageEMA < memoryUsageLowThreshold+0.02 +} + +func (c *adaptiveScanWindowController) noteAdjustmentLocked(now time.Time, downward bool) { + c.lastAdjustTime = now + if downward { + c.lastDownAdjustTime = now + c.lastInstabilityTime = now } +} - if newInterval != current { - c.scanInterval.Store(int64(newInterval)) - metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(newInterval.Seconds()) - c.lastAdjustTime.Store(now) - if adjustedOnTrend { - c.lastTrendAdjustTime.Store(now) - } +func (c *adaptiveScanWindowController) setLastAdjustTimeForTest(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.lastAdjustTime = now +} + +func (c *adaptiveScanWindowController) setLastDownAdjustTimeForTest(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.lastDownAdjustTime = now +} + +func (c *adaptiveScanWindowController) setPressureScoreForTest(score float64) { + c.mu.Lock() + defer c.mu.Unlock() + c.pressureScore = score +} + +func (c *adaptiveScanWindowController) resetForTest(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.usageWindow.reset() + c.lastAdjustTime = now + c.lastDownAdjustTime = now + c.lastCriticalTime = time.Time{} + c.lastInstabilityTime = time.Time{} + c.fastUsageEMA = 0 + c.slowUsageEMA = 0 + c.emaInitialized = false + c.pressureScore = 0 +} + +func normalizeUsageRatio(usageRatio float64) float64 { + if usageRatio != usageRatio || usageRatio < 0 { + return 0 + } + if usageRatio > 1 { + return 1 + } + return usageRatio +} + +func ema(previous float64, value float64, alpha float64) float64 { + return previous + alpha*(value-previous) +} - log.Info("scan interval adjusted", - zap.Stringer("changefeedID", c.changefeedID), - zap.Duration("oldInterval", current), - zap.Duration("newInterval", newInterval), - zap.Duration("maxInterval", maxInterval), - zap.Float64("avgUsage", usage.avg), - zap.Float64("maxUsage", usage.max), - zap.Float64("firstUsage", usage.first), - zap.Float64("lastUsage", usage.last), - zap.Float64("trendDelta", trendDelta), - zap.Int("usageSamples", usage.cnt), - zap.Bool("syncPointEnabled", c.isSyncpointEnabled()), - zap.Duration("syncPointInterval", c.syncPointInterval)) +func maxFloat64(a float64, b float64) float64 { + if a > b { + return a } + return b } func (c *changefeedStatus) maxScanInterval() time.Duration { diff --git a/pkg/eventservice/scan_window_controller_trace_test.go b/pkg/eventservice/scan_window_controller_trace_test.go new file mode 100644 index 0000000000..5105f03fb0 --- /dev/null +++ b/pkg/eventservice/scan_window_controller_trace_test.go @@ -0,0 +1,126 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventservice + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type scanWindowControllerTracePoint struct { + offset time.Duration + usageRatio float64 + memoryReleaseCount uint32 +} + +type scanWindowControllerTraceResult struct { + finalInterval time.Duration + reasons []scanWindowDecisionReason + intervals []time.Duration +} + +func runScanWindowControllerTrace( + controller scanWindowController, + start time.Time, + startInterval time.Duration, + maxInterval time.Duration, + trace []scanWindowControllerTracePoint, +) scanWindowControllerTraceResult { + currentInterval := startInterval + result := scanWindowControllerTraceResult{ + finalInterval: currentInterval, + reasons: make([]scanWindowDecisionReason, 0, len(trace)), + intervals: make([]time.Duration, 0, len(trace)), + } + + for _, point := range trace { + decision := controller.OnCongestionReport( + start.Add(point.offset), + currentInterval, + maxInterval, + scanWindowReport{ + usageRatio: point.usageRatio, + memoryReleaseCount: point.memoryReleaseCount, + }, + ) + currentInterval = decision.newInterval + result.reasons = append(result.reasons, decision.reason) + result.intervals = append(result.intervals, currentInterval) + } + + result.finalInterval = currentInterval + return result +} + +func countScanWindowDecisionReasons(reasons []scanWindowDecisionReason, target scanWindowDecisionReason) int { + count := 0 + for _, reason := range reasons { + if reason == target { + count++ + } + } + return count +} + +func TestAdaptiveScanWindowControllerLogDerivedBurstDoesNotCascadeCriticalBrake(t *testing.T) { + t.Parallel() + + start := time.Unix(0, 0) + controller := newAdaptiveScanWindowController(start) + + // Derived from the no-reset burst window observed in cdc.log around + // 2026/04/24 07:41:15 ~ 07:41:17 UTC: + // - first sample crosses the critical threshold + // - subsequent samples fall quickly + // - legacy/current behavior keeps re-entering critical_brake only because + // usage.max stays latched in the 30s window + trace := []scanWindowControllerTracePoint{ + {offset: 0, usageRatio: 0.9749945681542158}, + {offset: 1 * time.Second, usageRatio: 0.800051974132657}, + {offset: 2 * time.Second, usageRatio: 0.6189540326595306}, + {offset: 3 * time.Second, usageRatio: 0.4300000000000000}, + } + + result := runScanWindowControllerTrace(controller, start, 8*time.Second, maxScanInterval, trace) + + require.LessOrEqual(t, countScanWindowDecisionReasons(result.reasons, scanWindowDecisionCriticalBrake), 1) + require.Greater(t, result.finalInterval, 2*time.Second) + require.NotEqual(t, minScanInterval, result.finalInterval) +} + +func TestAdaptiveScanWindowControllerBlocksVeryLowRecoveryAfterRecentCriticalBrake(t *testing.T) { + t.Parallel() + + start := time.Unix(0, 0) + controller := newAdaptiveScanWindowController(start) + + trace := make([]scanWindowControllerTracePoint, 0, 96) + trace = append(trace, scanWindowControllerTracePoint{ + offset: 0, + usageRatio: 1, + }) + for second := 1; second <= 70; second++ { + trace = append(trace, scanWindowControllerTracePoint{ + offset: time.Duration(second) * time.Second, + usageRatio: 0, + }) + } + + result := runScanWindowControllerTrace(controller, start, 4*time.Second, maxScanInterval, trace) + + require.Zero(t, countScanWindowDecisionReasons(result.reasons, scanWindowDecisionVeryLowRecovery)) + require.LessOrEqual(t, result.finalInterval, scaleDuration(defaultScanInterval, 5, 4)) +} diff --git a/pkg/eventservice/scan_window_simulator_test.go b/pkg/eventservice/scan_window_simulator_test.go new file mode 100644 index 0000000000..d1903dfb72 --- /dev/null +++ b/pkg/eventservice/scan_window_simulator_test.go @@ -0,0 +1,684 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventservice + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + legacyScanTrendAdjustCooldown = 5 * time.Second + legacyMinTrendSamples = 4 + legacyIncreasingTrendEpsilon = 0.02 + legacyTrendStartRatio = 0.3 +) + +type legacyScanWindowController struct { + usageWindow *memoryUsageWindow + lastAdjustTime time.Time + lastTrendAdjustTime time.Time +} + +func newLegacyScanWindowController(now time.Time) *legacyScanWindowController { + return &legacyScanWindowController{ + usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), + lastAdjustTime: now, + lastTrendAdjustTime: now, + } +} + +func (c *legacyScanWindowController) OnCongestionReport(now time.Time, current time.Duration, maxInterval time.Duration, report scanWindowReport) scanWindowDecision { + if current <= 0 { + current = defaultScanInterval + } + if maxInterval < minScanInterval { + maxInterval = minScanInterval + } + + c.usageWindow.addSample(now, normalizeUsageRatio(report.usageRatio)) + usage := c.usageWindow.stats(now) + + if report.memoryReleaseCount > 0 { + c.usageWindow.reset() + c.usageWindow.addSample(now, normalizeUsageRatio(report.usageRatio)) + c.lastAdjustTime = now + c.lastTrendAdjustTime = now + return scanWindowDecision{ + newInterval: defaultScanInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionVeryLowRecovery, + usage: usage, + } + } + + trendDelta := usage.last - usage.first + isIncreasing := usage.cnt >= legacyMinTrendSamples && trendDelta > legacyIncreasingTrendEpsilon + isAboveTrendStart := usage.last > legacyTrendStartRatio + canAdjustOnTrend := now.Sub(c.lastTrendAdjustTime) >= legacyScanTrendAdjustCooldown + shouldDampOnTrend := isAboveTrendStart && isIncreasing && canAdjustOnTrend + + minIncreaseSpan := memoryUsageWindowDuration * minIncreaseSpanNumerator / minIncreaseSpanDenominator + allowedToIncrease := now.Sub(c.lastAdjustTime) >= scanIntervalAdjustCooldown && + usage.cnt >= minIncreaseSamples && + usage.span >= minIncreaseSpan && + !(isAboveTrendStart && isIncreasing) + + newInterval := current + effectiveMaxInterval := maxInterval + reason := scanWindowDecisionNone + + switch { + case usage.last > memoryUsageCriticalThreshold || usage.max > memoryUsageCriticalThreshold: + newInterval = max(current/4, minScanInterval) + reason = scanWindowDecisionCriticalBrake + case usage.last > memoryUsageHighThreshold || usage.max > memoryUsageHighThreshold: + newInterval = max(current/2, minScanInterval) + reason = scanWindowDecisionHighPressure + case shouldDampOnTrend: + newInterval = max(scaleDuration(current, 9, 10), minScanInterval) + reason = scanWindowDecisionSustainedPressure + case allowedToIncrease && usage.max < memoryUsageVeryLowThreshold && usage.avg < memoryUsageVeryLowThreshold: + effectiveMaxInterval = maxScanInterval + newInterval = min(scaleDuration(current, 3, 2), effectiveMaxInterval) + reason = scanWindowDecisionVeryLowRecovery + case allowedToIncrease && usage.max < memoryUsageLowThreshold && usage.avg < memoryUsageLowThreshold: + newInterval = min(scaleDuration(current, 5, 4), effectiveMaxInterval) + reason = scanWindowDecisionLowRecovery + } + + if newInterval > current && !allowedToIncrease { + newInterval = current + reason = scanWindowDecisionNone + } + + if newInterval != current { + c.lastAdjustTime = now + if shouldDampOnTrend { + c.lastTrendAdjustTime = now + } + } + + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: effectiveMaxInterval, + reason: reason, + usage: usage, + } +} + +type simulationRateSegment struct { + start time.Duration + end time.Duration + bytesPerSec float64 +} + +type simulationNodeConfig struct { + capacityBytes float64 + drainProfile []simulationRateSegment + dispatcherQuota float64 +} + +type simulationDispatcherConfig struct { + nodeIndex int + inputProfile []simulationRateSegment +} + +type scanWindowSimulationScenario struct { + duration time.Duration + tick time.Duration + reportInterval time.Duration + startInterval time.Duration + normalMaxInterval time.Duration + scanRateBytesPerSec float64 + intervalPenaltyPivot time.Duration + minScanEfficiency float64 + releasePulseBytes float64 + releaseArmUsage float64 + nodes []simulationNodeConfig + dispatchers []simulationDispatcherConfig +} + +type simulationNodeState struct { + config simulationNodeConfig + pendingBytes float64 + releaseArmed bool + releasedBytesBuffered float64 + releaseCount uint32 +} + +type simulationDispatcherState struct { + config simulationDispatcherConfig + upstreamPendingBytes float64 +} + +type scanWindowSimulationMetrics struct { + meanThroughputBytesPerSec float64 + throughputCV float64 + minThroughputBytesPerSec float64 + maxThroughputBytesPerSec float64 + minInterval time.Duration + maxInterval time.Duration + adjustCount int + skippedByChangefeedQuota int + skippedByDispatcherQuota int +} + +type scanWindowSimulationTracePoint struct { + elapsed time.Duration + interval time.Duration + throughputBytesPerSec float64 + maxUsageRatio float64 + totalReleaseCount uint32 +} + +type scanWindowSimulationResult struct { + metrics scanWindowSimulationMetrics + trace []scanWindowSimulationTracePoint +} + +func TestScanWindowSimulatorAdaptiveControllerStabilizesRecovery(t *testing.T) { + t.Parallel() + + scenario := scanWindowSimulationScenario{ + duration: 150 * time.Second, + tick: 200 * time.Millisecond, + reportInterval: time.Second, + startInterval: defaultScanInterval, + normalMaxInterval: time.Minute, + scanRateBytesPerSec: 10 * 1024 * 1024, + intervalPenaltyPivot: defaultScanInterval, + minScanEfficiency: 0.15, + releasePulseBytes: 8 * 1024 * 1024, + releaseArmUsage: 0.45, + nodes: []simulationNodeConfig{ + { + capacityBytes: 96 * 1024 * 1024, + dispatcherQuota: 16 * 1024 * 1024, + drainProfile: []simulationRateSegment{ + {start: 0, end: 40 * time.Second, bytesPerSec: 24 * 1024 * 1024}, + {start: 40 * time.Second, end: 60 * time.Second, bytesPerSec: 12 * 1024 * 1024}, + {start: 60 * time.Second, end: 80 * time.Second, bytesPerSec: 24 * 1024 * 1024}, + {start: 80 * time.Second, end: 100 * time.Second, bytesPerSec: 12 * 1024 * 1024}, + {start: 100 * time.Second, end: 120 * time.Second, bytesPerSec: 24 * 1024 * 1024}, + {start: 120 * time.Second, end: 150 * time.Second, bytesPerSec: 24 * 1024 * 1024}, + }, + }, + }, + dispatchers: []simulationDispatcherConfig{ + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 150 * time.Second, bytesPerSec: 8 * 1024 * 1024}}}, + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 150 * time.Second, bytesPerSec: 8 * 1024 * 1024}}}, + }, + } + + start := time.Unix(0, 0) + legacyMetrics := runScanWindowSimulation(newLegacyScanWindowController(start), scenario) + adaptiveMetrics := runScanWindowSimulation(newAdaptiveScanWindowController(start), scenario) + + t.Logf("legacy metrics: %+v", legacyMetrics) + t.Logf("adaptive metrics: %+v", adaptiveMetrics) + + require.Greater(t, legacyMetrics.adjustCount, 0) + require.LessOrEqual(t, adaptiveMetrics.throughputCV, legacyMetrics.throughputCV+1e-9) + require.Less(t, adaptiveMetrics.adjustCount, legacyMetrics.adjustCount) + require.Greater(t, adaptiveMetrics.minInterval, legacyMetrics.minInterval) + require.GreaterOrEqual(t, adaptiveMetrics.meanThroughputBytesPerSec, legacyMetrics.meanThroughputBytesPerSec*0.99) +} + +func TestScanWindowSimulatorObservedWindowScenarios(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + scenario scanWindowSimulationScenario + minLegacyMaxInterval time.Duration + minLegacyFloorCollapses int + maxAdaptiveFloorCollapses int + minLegacyTinyWindowReports int + maxAdaptiveTinyWindowReports int + minLegacyReleaseBurst uint32 + }{ + { + name: "w3_like_big_peak_release_collapse", + scenario: newBigPeakReleaseCollapseScenario(), + minLegacyMaxInterval: 4 * time.Minute, + minLegacyFloorCollapses: 1, + maxAdaptiveFloorCollapses: 0, + minLegacyTinyWindowReports: 1, + maxAdaptiveTinyWindowReports: 0, + minLegacyReleaseBurst: 1, + }, + { + name: "w4_like_plateau_then_collapse", + scenario: newPlateauCollapseScenario(), + minLegacyMaxInterval: 4 * time.Minute, + minLegacyFloorCollapses: 1, + maxAdaptiveFloorCollapses: 0, + minLegacyTinyWindowReports: 1, + maxAdaptiveTinyWindowReports: 0, + minLegacyReleaseBurst: 1, + }, + { + name: "w5_like_reset_dominant_collapse", + scenario: newResetDominantCollapseScenario(), + minLegacyMaxInterval: 90 * time.Second, + minLegacyFloorCollapses: 1, + maxAdaptiveFloorCollapses: 0, + minLegacyTinyWindowReports: 2, + maxAdaptiveTinyWindowReports: 0, + minLegacyReleaseBurst: 2, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + start := time.Unix(0, 0) + legacy := runScanWindowSimulationWithTrace(newLegacyScanWindowController(start), tc.scenario) + adaptive := runScanWindowSimulationWithTrace(newAdaptiveScanWindowController(start), tc.scenario) + + legacyCollapses := countSimulationFloorCollapses(legacy.trace, 30*time.Second, 2*time.Minute, defaultScanInterval, 0.8) + adaptiveCollapses := countSimulationFloorCollapses(adaptive.trace, 30*time.Second, 2*time.Minute, defaultScanInterval, 0.8) + legacyTinyWindowReports := countSimulationIntervalsAtMost(legacy.trace, defaultScanInterval) + adaptiveTinyWindowReports := countSimulationIntervalsAtMost(adaptive.trace, defaultScanInterval) + legacyReleaseBurst := maxSimulationReleaseBurst(legacy.trace) + + t.Logf("legacy metrics: %+v", legacy.metrics) + t.Logf("adaptive metrics: %+v", adaptive.metrics) + t.Logf("legacy collapses=%d adaptive collapses=%d legacy tiny=%d adaptive tiny=%d legacy release=%d", + legacyCollapses, adaptiveCollapses, legacyTinyWindowReports, adaptiveTinyWindowReports, legacyReleaseBurst) + + require.GreaterOrEqual(t, legacy.metrics.maxInterval, tc.minLegacyMaxInterval) + require.GreaterOrEqual(t, legacyCollapses, tc.minLegacyFloorCollapses) + require.LessOrEqual(t, adaptiveCollapses, tc.maxAdaptiveFloorCollapses) + require.GreaterOrEqual(t, legacyTinyWindowReports, tc.minLegacyTinyWindowReports) + require.LessOrEqual(t, adaptiveTinyWindowReports, tc.maxAdaptiveTinyWindowReports) + require.GreaterOrEqual(t, legacyReleaseBurst, tc.minLegacyReleaseBurst) + require.LessOrEqual(t, adaptive.metrics.throughputCV, legacy.metrics.throughputCV+1e-9) + require.GreaterOrEqual(t, adaptive.metrics.meanThroughputBytesPerSec, legacy.metrics.meanThroughputBytesPerSec*0.95) + }) + } +} + +func runScanWindowSimulation(controller scanWindowController, scenario scanWindowSimulationScenario) scanWindowSimulationMetrics { + return runScanWindowSimulationWithTrace(controller, scenario).metrics +} + +func runScanWindowSimulationWithTrace(controller scanWindowController, scenario scanWindowSimulationScenario) scanWindowSimulationResult { + nodes := make([]simulationNodeState, len(scenario.nodes)) + for i, cfg := range scenario.nodes { + nodes[i] = simulationNodeState{config: cfg} + } + + dispatchers := make([]simulationDispatcherState, len(scenario.dispatchers)) + for i, cfg := range scenario.dispatchers { + dispatchers[i] = simulationDispatcherState{config: cfg} + } + + currentInterval := scenario.startInterval + if currentInterval <= 0 { + currentInterval = defaultScanInterval + } + + start := time.Unix(0, 0) + nextReport := scenario.reportInterval + totalDrained := 0.0 + lastReportedDrained := 0.0 + throughputSamples := make([]float64, 0, int(scenario.duration/scenario.reportInterval)+1) + + result := scanWindowSimulationResult{ + metrics: scanWindowSimulationMetrics{ + minInterval: currentInterval, + maxInterval: currentInterval, + }, + trace: make([]scanWindowSimulationTracePoint, 0, int(scenario.duration/scenario.reportInterval)+1), + } + + for elapsed := time.Duration(0); elapsed < scenario.duration; elapsed += scenario.tick { + for i := range dispatchers { + dispatchers[i].upstreamPendingBytes += rateAt(dispatchers[i].config.inputProfile, elapsed) * scenario.tick.Seconds() + } + + scanEfficiency := clampFloat64( + scenario.minScanEfficiency, + 1, + float64(currentInterval)/float64(scenario.intervalPenaltyPivot), + ) + perDispatcherBudget := scenario.scanRateBytesPerSec * scanEfficiency * scenario.tick.Seconds() + + for i := range dispatchers { + dispatcher := &dispatchers[i] + if dispatcher.upstreamPendingBytes <= 0 { + continue + } + + node := &nodes[dispatcher.config.nodeIndex] + available := maxFloat64(0, node.config.capacityBytes-node.pendingBytes) + if available <= 0 { + result.metrics.skippedByChangefeedQuota++ + continue + } + if perDispatcherBudget > node.config.dispatcherQuota { + result.metrics.skippedByDispatcherQuota++ + continue + } + + scanned := min( + perDispatcherBudget, + dispatcher.upstreamPendingBytes, + node.config.dispatcherQuota, + available, + ) + if scanned <= 0 { + continue + } + + dispatcher.upstreamPendingBytes -= scanned + node.pendingBytes += scanned + } + + for i := range nodes { + node := &nodes[i] + usageBeforeDrain := 0.0 + if node.config.capacityBytes > 0 { + usageBeforeDrain = node.pendingBytes / node.config.capacityBytes + } + if usageBeforeDrain >= scenario.releaseArmUsage { + node.releaseArmed = true + } + + drain := min(node.pendingBytes, rateAt(node.config.drainProfile, elapsed)*scenario.tick.Seconds()) + node.pendingBytes -= drain + totalDrained += drain + + if node.releaseArmed && drain > 0 { + node.releasedBytesBuffered += drain + for node.releasedBytesBuffered >= scenario.releasePulseBytes { + node.releaseCount++ + node.releasedBytesBuffered -= scenario.releasePulseBytes + } + } + + if node.releaseArmed && node.pendingBytes/node.config.capacityBytes < scenario.releaseArmUsage/2 { + node.releaseArmed = false + node.releasedBytesBuffered = 0 + } + } + + if elapsed+scenario.tick < nextReport { + continue + } + + throughput := (totalDrained - lastReportedDrained) / scenario.reportInterval.Seconds() + throughputSamples = append(throughputSamples, throughput) + lastReportedDrained = totalDrained + + reportMaxUsage := 0.0 + var reportReleaseCount uint32 + for i := range nodes { + node := &nodes[i] + usageRatio := 0.0 + if node.config.capacityBytes > 0 { + usageRatio = clampFloat64(0, 1, node.pendingBytes/node.config.capacityBytes) + } + reportMaxUsage = maxFloat64(reportMaxUsage, usageRatio) + reportReleaseCount += node.releaseCount + + decision := controller.OnCongestionReport(start.Add(nextReport), currentInterval, scenario.normalMaxInterval, scanWindowReport{ + usageRatio: usageRatio, + memoryReleaseCount: node.releaseCount, + }) + if decision.newInterval != currentInterval { + result.metrics.adjustCount++ + currentInterval = decision.newInterval + if currentInterval < result.metrics.minInterval { + result.metrics.minInterval = currentInterval + } + if currentInterval > result.metrics.maxInterval { + result.metrics.maxInterval = currentInterval + } + } + node.releaseCount = 0 + } + + result.trace = append(result.trace, scanWindowSimulationTracePoint{ + elapsed: nextReport, + interval: currentInterval, + throughputBytesPerSec: throughput, + maxUsageRatio: reportMaxUsage, + totalReleaseCount: reportReleaseCount, + }) + nextReport += scenario.reportInterval + } + + result.metrics.meanThroughputBytesPerSec, result.metrics.throughputCV, result.metrics.minThroughputBytesPerSec, result.metrics.maxThroughputBytesPerSec = summarizeThroughput(throughputSamples) + return result +} + +func summarizeThroughput(samples []float64) (mean float64, cv float64, minValue float64, maxValue float64) { + if len(samples) == 0 { + return 0, 0, 0, 0 + } + + minValue = samples[0] + maxValue = samples[0] + sum := 0.0 + for _, sample := range samples { + sum += sample + if sample < minValue { + minValue = sample + } + if sample > maxValue { + maxValue = sample + } + } + mean = sum / float64(len(samples)) + if mean == 0 { + return mean, 0, minValue, maxValue + } + + variance := 0.0 + for _, sample := range samples { + diff := sample - mean + variance += diff * diff + } + variance /= float64(len(samples)) + cv = math.Sqrt(variance) / mean + return mean, cv, minValue, maxValue +} + +func rateAt(segments []simulationRateSegment, elapsed time.Duration) float64 { + for _, segment := range segments { + if elapsed >= segment.start && elapsed < segment.end { + return segment.bytesPerSec + } + } + return 0 +} + +func clampFloat64(minValue float64, maxValue float64, value float64) float64 { + if value < minValue { + return minValue + } + if value > maxValue { + return maxValue + } + return value +} + +func newBigPeakReleaseCollapseScenario() scanWindowSimulationScenario { + const mib = 1024 * 1024 + + return scanWindowSimulationScenario{ + duration: 6 * time.Minute, + tick: 200 * time.Millisecond, + reportInterval: time.Second, + startInterval: 2 * time.Minute, + normalMaxInterval: 15 * time.Minute, + scanRateBytesPerSec: 160 * mib, + intervalPenaltyPivot: 2 * time.Minute, + minScanEfficiency: 0.05, + releasePulseBytes: 24 * mib, + releaseArmUsage: 0.45, + nodes: []simulationNodeConfig{ + { + capacityBytes: 512 * mib, + dispatcherQuota: 32 * mib, + drainProfile: []simulationRateSegment{ + {start: 0, end: 2 * time.Minute, bytesPerSec: 12 * mib}, + {start: 2 * time.Minute, end: 210 * time.Second, bytesPerSec: 7 * mib}, + {start: 210 * time.Second, end: 4 * time.Minute, bytesPerSec: 10 * mib}, + {start: 4 * time.Minute, end: 5 * time.Minute, bytesPerSec: 18 * mib}, + {start: 5 * time.Minute, end: 6 * time.Minute, bytesPerSec: 10 * mib}, + }, + }, + }, + dispatchers: []simulationDispatcherConfig{ + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 6 * time.Minute, bytesPerSec: 5 * mib}}}, + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 6 * time.Minute, bytesPerSec: 5 * mib}}}, + }, + } +} + +func newPlateauCollapseScenario() scanWindowSimulationScenario { + const mib = 1024 * 1024 + + return scanWindowSimulationScenario{ + duration: 6 * time.Minute, + tick: 200 * time.Millisecond, + reportInterval: time.Second, + startInterval: 4 * time.Minute, + normalMaxInterval: 15 * time.Minute, + scanRateBytesPerSec: 160 * mib, + intervalPenaltyPivot: 2 * time.Minute, + minScanEfficiency: 0.05, + releasePulseBytes: 20 * mib, + releaseArmUsage: 0.40, + nodes: []simulationNodeConfig{ + { + capacityBytes: 512 * mib, + dispatcherQuota: 32 * mib, + drainProfile: []simulationRateSegment{ + {start: 0, end: 90 * time.Second, bytesPerSec: 12 * mib}, + {start: 90 * time.Second, end: 210 * time.Second, bytesPerSec: 8.5 * mib}, + {start: 210 * time.Second, end: 240 * time.Second, bytesPerSec: 6 * mib}, + {start: 240 * time.Second, end: 300 * time.Second, bytesPerSec: 18 * mib}, + {start: 300 * time.Second, end: 6 * time.Minute, bytesPerSec: 11 * mib}, + }, + }, + }, + dispatchers: []simulationDispatcherConfig{ + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 6 * time.Minute, bytesPerSec: 5 * mib}}}, + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 6 * time.Minute, bytesPerSec: 5 * mib}}}, + }, + } +} + +func newResetDominantCollapseScenario() scanWindowSimulationScenario { + const mib = 1024 * 1024 + + return scanWindowSimulationScenario{ + duration: 4 * time.Minute, + tick: 200 * time.Millisecond, + reportInterval: time.Second, + startInterval: 90 * time.Second, + normalMaxInterval: 10 * time.Minute, + scanRateBytesPerSec: 128 * mib, + intervalPenaltyPivot: time.Minute, + minScanEfficiency: 0.05, + releasePulseBytes: 8 * mib, + releaseArmUsage: 0.25, + nodes: []simulationNodeConfig{ + { + capacityBytes: 256 * mib, + dispatcherQuota: 32 * mib, + drainProfile: []simulationRateSegment{ + {start: 0, end: 30 * time.Second, bytesPerSec: 10 * mib}, + {start: 30 * time.Second, end: 75 * time.Second, bytesPerSec: 5 * mib}, + {start: 75 * time.Second, end: 105 * time.Second, bytesPerSec: 14 * mib}, + {start: 105 * time.Second, end: 150 * time.Second, bytesPerSec: 5 * mib}, + {start: 150 * time.Second, end: 180 * time.Second, bytesPerSec: 14 * mib}, + {start: 180 * time.Second, end: 240 * time.Second, bytesPerSec: 10 * mib}, + }, + }, + }, + dispatchers: []simulationDispatcherConfig{ + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 4 * time.Minute, bytesPerSec: 4 * mib}}}, + {nodeIndex: 0, inputProfile: []simulationRateSegment{{start: 0, end: 4 * time.Minute, bytesPerSec: 4 * mib}}}, + }, + } +} + +func countSimulationFloorCollapses( + trace []scanWindowSimulationTracePoint, + minPeak time.Duration, + lookback time.Duration, + floorInterval time.Duration, + minDropRatio float64, +) int { + if len(trace) == 0 { + return 0 + } + + collapseCount := 0 + for i := 1; i < len(trace); i++ { + if trace[i].interval > floorInterval || trace[i-1].interval <= floorInterval { + continue + } + + peakInterval := time.Duration(0) + for j := i - 1; j >= 0; j-- { + if trace[i].elapsed-trace[j].elapsed > lookback { + break + } + if trace[j].interval > peakInterval { + peakInterval = trace[j].interval + } + } + if peakInterval < minPeak { + continue + } + + dropRatio := 1 - float64(trace[i].interval)/float64(peakInterval) + if dropRatio >= minDropRatio { + collapseCount++ + } + } + return collapseCount +} + +func countSimulationIntervalsAtMost(trace []scanWindowSimulationTracePoint, threshold time.Duration) int { + count := 0 + for _, point := range trace { + if point.interval <= threshold { + count++ + } + } + return count +} + +func maxSimulationReleaseBurst(trace []scanWindowSimulationTracePoint) uint32 { + var maxBurst uint32 + for _, point := range trace { + if point.totalReleaseCount > maxBurst { + maxBurst = point.totalReleaseCount + } + } + return maxBurst +} diff --git a/pkg/eventservice/scan_window_test.go b/pkg/eventservice/scan_window_test.go index 01ee458e70..02eab5b98e 100644 --- a/pkg/eventservice/scan_window_test.go +++ b/pkg/eventservice/scan_window_test.go @@ -18,18 +18,29 @@ import ( "time" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" ) +func markScanWindowReadyForIncrease(status *changefeedStatus, now time.Time) { + status.scanWindowController.setLastAdjustTimeForTest(now.Add(-scanIntervalAdjustCooldown - time.Second)) + status.scanWindowController.setLastDownAdjustTimeForTest(now.Add(-scanWindowReleaseRecoveryCooldown - time.Second)) +} + +func markScanWindowReadyForDecrease(status *changefeedStatus, now time.Time) { + status.scanWindowController.setLastDownAdjustTimeForTest(now.Add(-scanWindowPressureAdjustCooldown - time.Second)) +} + func TestAdjustScanIntervalVeryLowBypassesSyncPointCap(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now.Add(-scanIntervalAdjustCooldown - time.Second)) + markScanWindowReadyForIncrease(status, now) // Start from the sync point capped max interval, then allow it to grow slowly. status.scanInterval.Store(int64(1 * time.Minute)) @@ -47,7 +58,7 @@ func TestAdjustScanIntervalLowRespectsSyncPointCap(t *testing.T) { status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now.Add(-scanIntervalAdjustCooldown - time.Second)) + markScanWindowReadyForIncrease(status, now) status.scanInterval.Store(int64(40 * time.Second)) @@ -57,32 +68,28 @@ func TestAdjustScanIntervalLowRespectsSyncPointCap(t *testing.T) { require.Equal(t, int64(50*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalDecreaseIgnoresCooldown(t *testing.T) { +func TestAdjustScanIntervalHighPressureUsesBoundedReduction(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) + markScanWindowReadyForDecrease(status, now) status.scanInterval.Store(int64(40 * time.Second)) status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 0.8, 0) - require.Equal(t, int64(20*time.Second), status.scanInterval.Load()) + require.Equal(t, int64(30*time.Second), status.scanInterval.Load()) } func TestAdjustScanIntervalCriticalPressure(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) - now := time.Now() - status.lastAdjustTime.Store(now) - status.scanInterval.Store(int64(40 * time.Second)) - - status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 1, 0) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(10*time.Second), status.scanInterval.Load()) } -func TestUpdateMemoryUsageResetsScanIntervalOnMemoryRelease(t *testing.T) { +func TestUpdateMemoryUsageDoesNotResetScanIntervalOnMemoryRelease(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) @@ -90,7 +97,45 @@ func TestUpdateMemoryUsageResetsScanIntervalOnMemoryRelease(t *testing.T) { status.scanInterval.Store(int64(40 * time.Second)) status.updateMemoryUsage(now, 0.5, 1) - require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) +} + +func TestUpdateMemoryUsageRecordsScanWindowObservationMetrics(t *testing.T) { + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + changefeed := status.changefeedID.String() + t.Cleanup(func() { + deleteScanWindowMetrics(changefeed) + }) + + now := time.Now() + status.scanInterval.Store(int64(40 * time.Second)) + + status.updateMemoryUsage(now, 0.6, 1) + + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "report")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "avg")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "max")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "fast")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "slow")), 1e-9) + require.InDelta(t, 0, testutil.ToFloat64(metrics.EventServiceScanWindowPressureScoreGaugeVec.WithLabelValues(changefeed)), 1e-9) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowMemoryReleaseCount.WithLabelValues(changefeed)), 1e-9) +} + +func TestUpdateMemoryUsageRecordsScanWindowAdjustCount(t *testing.T) { + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + changefeed := status.changefeedID.String() + t.Cleanup(func() { + deleteScanWindowMetrics(changefeed) + }) + + now := time.Now() + markScanWindowReadyForDecrease(status, now) + status.scanInterval.Store(int64(40 * time.Second)) + + status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 0.8, 0) + + require.Equal(t, int64(30*time.Second), status.scanInterval.Load()) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowAdjustCount.WithLabelValues(changefeed, string(scanWindowDecisionHighPressure))), 1e-9) } func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { @@ -99,7 +144,7 @@ func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) start := time.Now() - status.lastAdjustTime.Store(start.Add(-scanIntervalAdjustCooldown - time.Second)) + markScanWindowReadyForIncrease(status, start) status.scanInterval.Store(int64(40 * time.Second)) @@ -112,37 +157,32 @@ func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { require.Equal(t, int64(50*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalDecreasesWhenUsageIncreasing(t *testing.T) { +func TestAdjustScanIntervalReducesOnSustainedPressure(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) + markScanWindowReadyForDecrease(status, now) status.scanInterval.Store(int64(40 * time.Second)) - status.updateMemoryUsage(now, 0.10, 0) - status.updateMemoryUsage(now.Add(1*time.Second), 0.11, 0) - status.updateMemoryUsage(now.Add(2*time.Second), 0.12, 0) - status.updateMemoryUsage(now.Add(3*time.Second), 0.13, 0) - require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) + status.updateMemoryUsage(now, 0.60, 0) + status.updateMemoryUsage(now.Add(1*time.Second), 0.60, 0) + status.updateMemoryUsage(now.Add(2*time.Second), 0.60, 0) + require.Equal(t, int64(36*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalDecreasesWhenUsageIncreasingAboveThirtyPercent(t *testing.T) { +func TestAdjustScanIntervalDoesNotIncreaseBeforeCooldown(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) - status.lastTrendAdjustTime.Store(now.Add(-scanTrendAdjustCooldown - time.Second)) - status.scanInterval.Store(int64(40 * time.Second)) - status.updateMemoryUsage(now, 0.31, 0) - status.updateMemoryUsage(now.Add(1*time.Second), 0.32, 0) - status.updateMemoryUsage(now.Add(2*time.Second), 0.33, 0) - status.updateMemoryUsage(now.Add(3*time.Second), 0.34, 0) - require.Equal(t, int64(36*time.Second), status.scanInterval.Load()) + for i := 0; i < 10; i++ { + status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), 0.05, 0) + } + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) } func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { diff --git a/pkg/eventservice/scan_window_trace_fixture_test.go b/pkg/eventservice/scan_window_trace_fixture_test.go new file mode 100644 index 0000000000..8d20c7dbee --- /dev/null +++ b/pkg/eventservice/scan_window_trace_fixture_test.go @@ -0,0 +1,411 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventservice + +import ( + "bufio" + "encoding/json" + "math" + "os" + "path/filepath" + "runtime" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + observedScanWindowTraceFixtureFile = "kafka_test_observed_trace_20260423.jsonl" + observedScanWindowTraceTimeLayout = "2006-01-02 15:04:05" +) + +type observedScanWindowTracePoint struct { + ts time.Time + offsetSec int + observedScanIntervalSec float64 + observedResetDispatcherPerSec float64 + observedSinkRowsPerSec *float64 + observedMemoryQuotaBytes float64 +} + +type observedScanWindowTraceRow struct { + Time string `json:"time"` + OffsetSec int `json:"offsetSec"` + ObservedScanIntervalSec *float64 `json:"observedScanIntervalSec"` + ObservedResetDispatcherPerSec *float64 `json:"observedResetDispatcherPerSec"` + ObservedSinkRowsPerSec *float64 `json:"observedSinkRowsPerSec"` + ObservedMemoryQuotaBytes *float64 `json:"observedMemoryQuotaBytes"` +} + +type observedScanWindowCollapse struct { + peak observedScanWindowTracePoint + floor observedScanWindowTracePoint + dropRatio float64 +} + +func TestObservedScanWindowTraceFixtureLoadsExpectedShape(t *testing.T) { + t.Parallel() + + trace := loadObservedScanWindowTrace(t) + require.Len(t, trace, 233) + + require.Equal(t, mustParseObservedScanWindowTraceTime(t, "2026-04-23 18:17:45"), trace[0].ts) + require.Equal(t, 0, trace[0].offsetSec) + require.Nil(t, trace[0].observedSinkRowsPerSec) + require.InDelta(t, 0, trace[0].observedMemoryQuotaBytes, 1e-9) + + last := trace[len(trace)-1] + require.Equal(t, mustParseObservedScanWindowTraceTime(t, "2026-04-23 19:32:30"), last.ts) + require.Equal(t, 4485, last.offsetSec) + + cadenceCounts := observedTraceCadenceCounts(trace) + require.Equal(t, 231, cadenceCounts[15]) + require.Equal(t, 1, cadenceCounts[1020]) + require.Len(t, cadenceCounts, 2) + + dominantQuotaBytes, dominantQuotaCount := dominantObservedTraceQuota(trace) + require.Equal(t, int64(1<<30), dominantQuotaBytes) + require.Equal(t, 232, dominantQuotaCount) + + require.InDelta(t, 811.0, maxObservedTraceScanInterval(trace), 1e-9) +} + +func TestObservedScanWindowTraceCharacterizationCapturesLegacySawtooth(t *testing.T) { + t.Parallel() + + trace := loadObservedScanWindowTrace(t) + + peaks := findObservedTraceSeparatedPeaks(trace, 300, 5*time.Minute) + require.GreaterOrEqual(t, len(peaks), 3) + + collapses := findObservedTraceFloorCollapses(trace, 300, time.Minute, defaultScanInterval.Seconds(), 0.8) + require.GreaterOrEqual(t, len(collapses), 3) + for _, collapse := range collapses { + require.LessOrEqual(t, collapse.floor.observedScanIntervalSec, defaultScanInterval.Seconds()) + } + + intervals := observedTraceScanIntervals(trace) + require.InDelta(t, 25.3, observedTracePercentile(intervals, 0.5), 1e-9) + require.InDelta(t, 438.0, observedTracePercentile(intervals, 0.95), 1e-9) + require.Greater(t, observedTracePercentile(intervals, 0.95), observedTracePercentile(intervals, 0.5)*10) +} + +func TestObservedScanWindowTraceWindowOracles(t *testing.T) { + t.Parallel() + + trace := loadObservedScanWindowTrace(t) + + w1Peak := findObservedTracePointByTime(t, trace, "2026-04-23 18:24:30") + w1Floor := findObservedTracePointByTime(t, trace, "2026-04-23 18:25:15") + require.InDelta(t, 438.0, w1Peak.observedScanIntervalSec, 1e-9) + require.InDelta(t, 1.0, w1Floor.observedScanIntervalSec, 1e-9) + require.Equal(t, 45, w1Floor.offsetSec-w1Peak.offsetSec) + + w3Peak := findObservedTracePointByTime(t, trace, "2026-04-23 18:56:45") + w3Floor := findObservedTracePointByTime(t, trace, "2026-04-23 18:57:45") + require.InDelta(t, 811.0, w3Peak.observedScanIntervalSec, 1e-9) + require.InDelta(t, 1.0, w3Floor.observedScanIntervalSec, 1e-9) + require.Equal(t, 60, w3Floor.offsetSec-w3Peak.offsetSec) + require.GreaterOrEqual(t, w3Peak.observedResetDispatcherPerSec, 0.9) + require.GreaterOrEqual(t, w3Floor.observedResetDispatcherPerSec, 0.8) + + w4Window := selectObservedTraceWindow( + t, + trace, + "2026-04-23 19:08:15", + "2026-04-23 19:10:30", + ) + require.GreaterOrEqual(t, maxObservedTraceScanInterval(w4Window), 592.0) + require.GreaterOrEqual(t, countObservedTraceScanIntervalsAtLeast(w4Window, 300), 8) + require.InDelta( + t, + 1.0, + findObservedTracePointByTime(t, trace, "2026-04-23 19:11:00").observedScanIntervalSec, + 1e-9, + ) + + w5Window := selectObservedTraceWindow( + t, + trace, + "2026-04-23 19:15:45", + "2026-04-23 19:18:30", + ) + require.GreaterOrEqual(t, maxObservedTraceScanInterval(w5Window), 135.0) + require.GreaterOrEqual(t, maxObservedTraceResetRate(w5Window), 1.7) + require.InDelta( + t, + 1.0, + findObservedTracePointByTime(t, trace, "2026-04-23 19:18:45").observedScanIntervalSec, + 1e-9, + ) +} + +func loadObservedScanWindowTrace(t *testing.T) []observedScanWindowTracePoint { + t.Helper() + + file, err := os.Open(observedScanWindowTraceFixturePath(t)) + require.NoError(t, err) + defer file.Close() + + trace := make([]observedScanWindowTracePoint, 0, 256) + scanner := bufio.NewScanner(file) + for lineNo := 1; scanner.Scan(); lineNo++ { + var row observedScanWindowTraceRow + err = json.Unmarshal(scanner.Bytes(), &row) + require.NoError(t, err, "line %d", lineNo) + require.NotNil(t, row.ObservedScanIntervalSec, "line %d", lineNo) + require.NotNil(t, row.ObservedResetDispatcherPerSec, "line %d", lineNo) + require.NotNil(t, row.ObservedMemoryQuotaBytes, "line %d", lineNo) + + trace = append(trace, observedScanWindowTracePoint{ + ts: mustParseObservedScanWindowTraceTime(t, row.Time), + offsetSec: row.OffsetSec, + observedScanIntervalSec: *row.ObservedScanIntervalSec, + observedResetDispatcherPerSec: *row.ObservedResetDispatcherPerSec, + observedSinkRowsPerSec: row.ObservedSinkRowsPerSec, + observedMemoryQuotaBytes: *row.ObservedMemoryQuotaBytes, + }) + } + require.NoError(t, scanner.Err()) + + return trace +} + +func observedScanWindowTraceFixturePath(t *testing.T) string { + t.Helper() + + _, filename, _, ok := runtime.Caller(0) + require.True(t, ok) + + return filepath.Join( + filepath.Dir(filename), + "..", + "..", + ".issue", + "replay-data", + observedScanWindowTraceFixtureFile, + ) +} + +func mustParseObservedScanWindowTraceTime(t *testing.T, value string) time.Time { + t.Helper() + + ts, err := time.ParseInLocation(observedScanWindowTraceTimeLayout, value, time.Local) + require.NoError(t, err) + return ts +} + +func observedTraceCadenceCounts(trace []observedScanWindowTracePoint) map[int]int { + counts := make(map[int]int) + for i := 1; i < len(trace); i++ { + delta := trace[i].offsetSec - trace[i-1].offsetSec + counts[delta]++ + } + return counts +} + +func dominantObservedTraceQuota(trace []observedScanWindowTracePoint) (int64, int) { + counts := make(map[int64]int) + for _, point := range trace { + quotaBytes := int64(math.Round(point.observedMemoryQuotaBytes)) + counts[quotaBytes]++ + } + + var dominantQuotaBytes int64 + dominantQuotaCount := -1 + for quotaBytes, count := range counts { + if count > dominantQuotaCount { + dominantQuotaBytes = quotaBytes + dominantQuotaCount = count + } + } + return dominantQuotaBytes, dominantQuotaCount +} + +func maxObservedTraceScanInterval(trace []observedScanWindowTracePoint) float64 { + maxInterval := 0.0 + for _, point := range trace { + maxInterval = maxFloat64(maxInterval, point.observedScanIntervalSec) + } + return maxInterval +} + +func observedTraceScanIntervals(trace []observedScanWindowTracePoint) []float64 { + intervals := make([]float64, 0, len(trace)) + for _, point := range trace { + intervals = append(intervals, point.observedScanIntervalSec) + } + return intervals +} + +func observedTracePercentile(values []float64, percentile float64) float64 { + if len(values) == 0 { + return 0 + } + if len(values) == 1 { + return values[0] + } + + sorted := append([]float64(nil), values...) + sort.Float64s(sorted) + + position := percentile * float64(len(sorted)-1) + lowerIndex := int(math.Floor(position)) + upperIndex := int(math.Ceil(position)) + if lowerIndex == upperIndex { + return sorted[lowerIndex] + } + + weight := position - float64(lowerIndex) + return sorted[lowerIndex]*(1-weight) + sorted[upperIndex]*weight +} + +func findObservedTraceSeparatedPeaks( + trace []observedScanWindowTracePoint, + minPeakIntervalSec float64, + minSeparation time.Duration, +) []observedScanWindowTracePoint { + peaks := make([]observedScanWindowTracePoint, 0, 8) + for i := 1; i < len(trace)-1; i++ { + current := trace[i] + if current.observedScanIntervalSec < minPeakIntervalSec { + continue + } + if current.observedScanIntervalSec < trace[i-1].observedScanIntervalSec || + current.observedScanIntervalSec < trace[i+1].observedScanIntervalSec { + continue + } + + if len(peaks) == 0 || current.ts.Sub(peaks[len(peaks)-1].ts) >= minSeparation { + peaks = append(peaks, current) + continue + } + + if current.observedScanIntervalSec > peaks[len(peaks)-1].observedScanIntervalSec { + peaks[len(peaks)-1] = current + } + } + return peaks +} + +func findObservedTraceFloorCollapses( + trace []observedScanWindowTracePoint, + minPeakIntervalSec float64, + lookback time.Duration, + floorIntervalSec float64, + minDropRatio float64, +) []observedScanWindowCollapse { + collapses := make([]observedScanWindowCollapse, 0, 8) + lookbackSec := int(lookback / time.Second) + + for i := 1; i < len(trace); i++ { + floor := trace[i] + if floor.observedScanIntervalSec > floorIntervalSec { + continue + } + if trace[i-1].observedScanIntervalSec <= floorIntervalSec { + continue + } + + peakIndex := -1 + peakInterval := 0.0 + for j := i - 1; j >= 0; j-- { + if floor.offsetSec-trace[j].offsetSec > lookbackSec { + break + } + if trace[j].observedScanIntervalSec > peakInterval { + peakInterval = trace[j].observedScanIntervalSec + peakIndex = j + } + } + + if peakIndex == -1 || peakInterval < minPeakIntervalSec { + continue + } + + dropRatio := 1 - floor.observedScanIntervalSec/peakInterval + if dropRatio < minDropRatio { + continue + } + + collapses = append(collapses, observedScanWindowCollapse{ + peak: trace[peakIndex], + floor: floor, + dropRatio: dropRatio, + }) + } + + return collapses +} + +func findObservedTracePointByTime( + t *testing.T, + trace []observedScanWindowTracePoint, + timeText string, +) observedScanWindowTracePoint { + t.Helper() + + target := mustParseObservedScanWindowTraceTime(t, timeText) + for _, point := range trace { + if point.ts.Equal(target) { + return point + } + } + + t.Fatalf("trace point not found: %s", timeText) + return observedScanWindowTracePoint{} +} + +func selectObservedTraceWindow( + t *testing.T, + trace []observedScanWindowTracePoint, + startText string, + endText string, +) []observedScanWindowTracePoint { + t.Helper() + + start := mustParseObservedScanWindowTraceTime(t, startText) + end := mustParseObservedScanWindowTraceTime(t, endText) + require.False(t, end.Before(start)) + + window := make([]observedScanWindowTracePoint, 0, 32) + for _, point := range trace { + if point.ts.Before(start) || point.ts.After(end) { + continue + } + window = append(window, point) + } + require.NotEmpty(t, window) + return window +} + +func countObservedTraceScanIntervalsAtLeast(trace []observedScanWindowTracePoint, thresholdSec float64) int { + count := 0 + for _, point := range trace { + if point.observedScanIntervalSec >= thresholdSec { + count++ + } + } + return count +} + +func maxObservedTraceResetRate(trace []observedScanWindowTracePoint) float64 { + maxRate := 0.0 + for _, point := range trace { + maxRate = maxFloat64(maxRate, point.observedResetDispatcherPerSec) + } + return maxRate +} diff --git a/pkg/metrics/event_service.go b/pkg/metrics/event_service.go index e6ed10cc90..ef5672d2e7 100644 --- a/pkg/metrics/event_service.go +++ b/pkg/metrics/event_service.go @@ -69,6 +69,41 @@ var ( Name: "scan_window_interval", Help: "The scan window interval in seconds for each changefeed", }, []string{"changefeed"}) + EventServiceScanWindowUsageRatioGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_usage_ratio", + Help: "The usage ratio observed by the scan window controller for each changefeed", + }, []string{"changefeed", "type"}) + EventServiceScanWindowUsageEMAGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_usage_ema", + Help: "The usage EMA values used by the scan window controller for each changefeed", + }, []string{"changefeed", "type"}) + EventServiceScanWindowPressureScoreGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_pressure_score", + Help: "The pressure score maintained by the scan window controller for each changefeed", + }, []string{"changefeed"}) + EventServiceScanWindowMemoryReleaseCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_memory_release_count", + Help: "The number of memory release events reported to the scan window controller for each changefeed", + }, []string{"changefeed"}) + EventServiceScanWindowAdjustCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_adjust_count", + Help: "The number of scan window adjustments made by the controller for each changefeed", + }, []string{"changefeed", "reason"}) EventServiceScanDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -201,6 +236,11 @@ func initEventServiceMetrics(registry *prometheus.Registry) { registry.MustRegister(EventServiceResolvedTsLagGauge) registry.MustRegister(EventServiceScanWindowBaseTsGaugeVec) registry.MustRegister(EventServiceScanWindowIntervalGaugeVec) + registry.MustRegister(EventServiceScanWindowUsageRatioGaugeVec) + registry.MustRegister(EventServiceScanWindowUsageEMAGaugeVec) + registry.MustRegister(EventServiceScanWindowPressureScoreGaugeVec) + registry.MustRegister(EventServiceScanWindowMemoryReleaseCount) + registry.MustRegister(EventServiceScanWindowAdjustCount) registry.MustRegister(EventServiceScanDuration) registry.MustRegister(EventServiceScannedCount) registry.MustRegister(EventServiceDispatcherGauge)