Skip to content

eventseivice: improve scan window#4901

Draft
asddongmen wants to merge 4 commits intopingcap:masterfrom
asddongmen:0424-scanwindow
Draft

eventseivice: improve scan window#4901
asddongmen wants to merge 4 commits intopingcap:masterfrom
asddongmen:0424-scanwindow

Conversation

@asddongmen
Copy link
Copy Markdown
Collaborator

@asddongmen asddongmen commented Apr 24, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Before
img_v3_02112_4a87f20c-d9c1-4352-a66b-b28638aa1b3g

After
img_v3_02112_f1c1550d-d773-4a02-8319-73d59985e79g

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Signed-off-by: dongmen <414110582@qq.com>
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented Apr 24, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. labels Apr 24, 2026
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented Apr 24, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign sdojjy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 24, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ddf074be-721a-4a64-b57d-796c97ecb2a5

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 24, 2026
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request replaces the legacy scan interval adjustment logic with a new adaptiveScanWindowController that utilizes dual Exponential Moving Averages (EMA) and a pressure score mechanism for more robust congestion control. The update includes a new simulation test suite and trace-based validation. Feedback highlights a potential race condition during interval updates and suggests replacing hardcoded scaling factors with named constants. Furthermore, it is recommended to transition the EMA and pressure score decay to time-weighted calculations to maintain consistency across varying reporting frequencies.

Comment on lines 257 to +266
current := time.Duration(c.scanInterval.Load())
if current != defaultScanInterval {
c.scanInterval.Store(int64(defaultScanInterval))
metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(defaultScanInterval.Seconds())

log.Info("scan interval reset to default",
zap.Stringer("changefeedID", c.changefeedID),
zap.Duration("oldInterval", current),
zap.Duration("newInterval", defaultScanInterval))
decision := c.scanWindowController.OnCongestionReport(now, current, c.maxScanInterval(), scanWindowReport{
usageRatio: normalizeUsageRatio(usageRatio),
memoryReleaseCount: memoryReleaseCount,
})
if decision.newInterval == current {
return
}

c.lastAdjustTime.Store(now)
c.lastTrendAdjustTime.Store(now)
c.scanInterval.Store(int64(decision.newInterval))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a potential race condition when updating scanInterval. The current interval is loaded at line 257 and the new decision is stored at line 266. Since these operations are not atomic with respect to each other, concurrent calls to updateMemoryUsage (which can happen if multiple collectors report congestion for the same changefeed) might result in a decision based on a stale current value, leading to inconsistent interval adjustments. Consider synchronizing the load, calculation, and store operations, for example by passing the atomic variable into the controller or wrapping the call in a mutex.

Comment on lines +342 to +412
newInterval := max(scaleDuration(current, 3, 4), defaultScanInterval)
c.noteAdjustmentLocked(now, true)
return scanWindowDecision{
newInterval: newInterval,
maxInterval: maxInterval,
reason: scanWindowDecisionHighPressure,
usage: usage,
fastUsageEMA: c.fastUsageEMA,
slowUsageEMA: c.slowUsageEMA,
pressureScore: c.pressureScore,
}
}

if c.shouldReduceForSustainedPressureLocked(now, usage) {
newInterval := max(scaleDuration(current, 9, 10), defaultScanInterval)
c.noteAdjustmentLocked(now, true)
return scanWindowDecision{
newInterval: newInterval,
maxInterval: maxInterval,
reason: scanWindowDecisionSustainedPressure,
usage: usage,
fastUsageEMA: c.fastUsageEMA,
slowUsageEMA: c.slowUsageEMA,
pressureScore: c.pressureScore,
}
}

if !c.allowedToIncreaseLocked(now, usage) {
return scanWindowDecision{
newInterval: current,
maxInterval: maxInterval,
reason: scanWindowDecisionNone,
usage: usage,
fastUsageEMA: c.fastUsageEMA,
slowUsageEMA: c.slowUsageEMA,
pressureScore: c.pressureScore,
}
}

if c.isVeryLowPressureLocked(usage) {
effectiveMaxInterval := maxScanInterval
newInterval := min(scaleDuration(current, 3, 2), effectiveMaxInterval)
if newInterval > current {
c.noteAdjustmentLocked(now, false)
return scanWindowDecision{
newInterval: newInterval,
maxInterval: effectiveMaxInterval,
reason: scanWindowDecisionVeryLowRecovery,
usage: usage,
fastUsageEMA: c.fastUsageEMA,
slowUsageEMA: c.slowUsageEMA,
pressureScore: c.pressureScore,
}
}
}

if current < maxInterval && c.isLowPressureLocked(usage) {
newInterval := min(scaleDuration(current, 5, 4), maxInterval)
if newInterval > current {
c.noteAdjustmentLocked(now, false)
return scanWindowDecision{
newInterval: newInterval,
maxInterval: maxInterval,
reason: scanWindowDecisionLowRecovery,
usage: usage,
fastUsageEMA: c.fastUsageEMA,
slowUsageEMA: c.slowUsageEMA,
pressureScore: c.pressureScore,
}
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The scaling factors used for interval adjustments (e.g., 3/4, 9/10, 3/2, 5/4) are hardcoded magic numbers. These should be defined as named constants (e.g., scanWindowHighPressureScaleNumerator, etc.) to improve readability and facilitate future tuning of the adaptive algorithm.

Comment on lines +425 to +434
func (c *adaptiveScanWindowController) updateUsageEMALocked(value float64) {
if !c.emaInitialized {
c.fastUsageEMA = value
c.slowUsageEMA = value
c.emaInitialized = true
return
}
c.fastUsageEMA = ema(c.fastUsageEMA, value, scanWindowFastUsageAlpha)
c.slowUsageEMA = ema(c.slowUsageEMA, value, scanWindowSlowUsageAlpha)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The EMA calculation previous + alpha*(value-previous) assumes a constant sampling interval. If congestion reports arrive at irregular frequencies, the EMA will be biased. For more robust behavior in a distributed environment, consider a time-weighted EMA where the smoothing factor accounts for the time elapsed since the last sample.

Comment on lines +436 to +451
func (c *adaptiveScanWindowController) updatePressureScoreLocked(usage memoryUsageStats) {
switch {
case c.fastUsageEMA >= scanWindowHighPressureThreshold ||
c.slowUsageEMA >= scanWindowHighPressureThreshold ||
usage.max >= memoryUsageHighThreshold:
c.pressureScore = min(c.pressureScore+2, scanWindowPressureScoreCeiling)
case c.fastUsageEMA >= scanWindowModeratePressureThreshold ||
c.slowUsageEMA >= scanWindowModeratePressureThreshold ||
usage.avg >= scanWindowModeratePressureThreshold:
c.pressureScore = min(c.pressureScore+1, scanWindowPressureScoreCeiling)
case c.fastUsageEMA < 0.30 && c.slowUsageEMA < 0.25 && usage.last < 0.30:
c.pressureScore = maxFloat64(0, c.pressureScore-1.5)
default:
c.pressureScore = maxFloat64(0, c.pressureScore-0.5)
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The pressure score logic uses several magic numbers for increments and decrements. Furthermore, the constant decay in the default case (-0.5) makes the pressure score's recovery rate dependent on the reporting frequency rather than actual time. This could lead to premature recovery if reports are frequent, or sluggish recovery if they are sparse. Consider making the decay time-proportional.

Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented Apr 24, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/needs-linked-issue do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant