eventservice: improve scan window #4901
Signed-off-by: dongmen <414110582@qq.com>
Code Review
This pull request replaces the legacy scan interval adjustment logic with a new adaptiveScanWindowController that utilizes dual Exponential Moving Averages (EMA) and a pressure score mechanism for more robust congestion control. The update includes a new simulation test suite and trace-based validation. Feedback highlights a potential race condition during interval updates and suggests replacing hardcoded scaling factors with named constants. Furthermore, it is recommended to transition the EMA and pressure score decay to time-weighted calculations to maintain consistency across varying reporting frequencies.
current := time.Duration(c.scanInterval.Load())
if current != defaultScanInterval {
	c.scanInterval.Store(int64(defaultScanInterval))
	metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(defaultScanInterval.Seconds())

	log.Info("scan interval reset to default",
		zap.Stringer("changefeedID", c.changefeedID),
		zap.Duration("oldInterval", current),
		zap.Duration("newInterval", defaultScanInterval))
decision := c.scanWindowController.OnCongestionReport(now, current, c.maxScanInterval(), scanWindowReport{
	usageRatio:         normalizeUsageRatio(usageRatio),
	memoryReleaseCount: memoryReleaseCount,
})
if decision.newInterval == current {
	return
}

c.lastAdjustTime.Store(now)
c.lastTrendAdjustTime.Store(now)
c.scanInterval.Store(int64(decision.newInterval))
There is a potential race condition when updating scanInterval. The current interval is loaded at line 257 and the new decision is stored at line 266. Since these operations are not atomic with respect to each other, concurrent calls to updateMemoryUsage (which can happen if multiple collectors report congestion for the same changefeed) might result in a decision based on a stale current value, leading to inconsistent interval adjustments. Consider synchronizing the load, calculation, and store operations, for example by passing the atomic variable into the controller or wrapping the call in a mutex.
	newInterval := max(scaleDuration(current, 3, 4), defaultScanInterval)
	c.noteAdjustmentLocked(now, true)
	return scanWindowDecision{
		newInterval:   newInterval,
		maxInterval:   maxInterval,
		reason:        scanWindowDecisionHighPressure,
		usage:         usage,
		fastUsageEMA:  c.fastUsageEMA,
		slowUsageEMA:  c.slowUsageEMA,
		pressureScore: c.pressureScore,
	}
}

if c.shouldReduceForSustainedPressureLocked(now, usage) {
	newInterval := max(scaleDuration(current, 9, 10), defaultScanInterval)
	c.noteAdjustmentLocked(now, true)
	return scanWindowDecision{
		newInterval:   newInterval,
		maxInterval:   maxInterval,
		reason:        scanWindowDecisionSustainedPressure,
		usage:         usage,
		fastUsageEMA:  c.fastUsageEMA,
		slowUsageEMA:  c.slowUsageEMA,
		pressureScore: c.pressureScore,
	}
}

if !c.allowedToIncreaseLocked(now, usage) {
	return scanWindowDecision{
		newInterval:   current,
		maxInterval:   maxInterval,
		reason:        scanWindowDecisionNone,
		usage:         usage,
		fastUsageEMA:  c.fastUsageEMA,
		slowUsageEMA:  c.slowUsageEMA,
		pressureScore: c.pressureScore,
	}
}

if c.isVeryLowPressureLocked(usage) {
	effectiveMaxInterval := maxScanInterval
	newInterval := min(scaleDuration(current, 3, 2), effectiveMaxInterval)
	if newInterval > current {
		c.noteAdjustmentLocked(now, false)
		return scanWindowDecision{
			newInterval:   newInterval,
			maxInterval:   effectiveMaxInterval,
			reason:        scanWindowDecisionVeryLowRecovery,
			usage:         usage,
			fastUsageEMA:  c.fastUsageEMA,
			slowUsageEMA:  c.slowUsageEMA,
			pressureScore: c.pressureScore,
		}
	}
}

if current < maxInterval && c.isLowPressureLocked(usage) {
	newInterval := min(scaleDuration(current, 5, 4), maxInterval)
	if newInterval > current {
		c.noteAdjustmentLocked(now, false)
		return scanWindowDecision{
			newInterval:   newInterval,
			maxInterval:   maxInterval,
			reason:        scanWindowDecisionLowRecovery,
			usage:         usage,
			fastUsageEMA:  c.fastUsageEMA,
			slowUsageEMA:  c.slowUsageEMA,
			pressureScore: c.pressureScore,
		}
	}
}
func (c *adaptiveScanWindowController) updateUsageEMALocked(value float64) {
	if !c.emaInitialized {
		c.fastUsageEMA = value
		c.slowUsageEMA = value
		c.emaInitialized = true
		return
	}
	c.fastUsageEMA = ema(c.fastUsageEMA, value, scanWindowFastUsageAlpha)
	c.slowUsageEMA = ema(c.slowUsageEMA, value, scanWindowSlowUsageAlpha)
}
The EMA calculation previous + alpha*(value-previous) assumes a constant sampling interval. If congestion reports arrive at irregular frequencies, the EMA will be biased. For more robust behavior in a distributed environment, consider a time-weighted EMA where the smoothing factor accounts for the time elapsed since the last sample.
func (c *adaptiveScanWindowController) updatePressureScoreLocked(usage memoryUsageStats) {
	switch {
	case c.fastUsageEMA >= scanWindowHighPressureThreshold ||
		c.slowUsageEMA >= scanWindowHighPressureThreshold ||
		usage.max >= memoryUsageHighThreshold:
		c.pressureScore = min(c.pressureScore+2, scanWindowPressureScoreCeiling)
	case c.fastUsageEMA >= scanWindowModeratePressureThreshold ||
		c.slowUsageEMA >= scanWindowModeratePressureThreshold ||
		usage.avg >= scanWindowModeratePressureThreshold:
		c.pressureScore = min(c.pressureScore+1, scanWindowPressureScoreCeiling)
	case c.fastUsageEMA < 0.30 && c.slowUsageEMA < 0.25 && usage.last < 0.30:
		c.pressureScore = maxFloat64(0, c.pressureScore-1.5)
	default:
		c.pressureScore = maxFloat64(0, c.pressureScore-0.5)
	}
}
The pressure score logic uses several magic numbers for increments and decrements. Furthermore, the constant decay in the default case (-0.5) makes the pressure score's recovery rate dependent on the reporting frequency rather than actual time. This could lead to premature recovery if reports are frequent, or sluggish recovery if they are sparse. Consider making the decay time-proportional.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Before

After

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note