Skip to content

[FLINK-30571] Estimate scalability coefficient from past scaling history using linear regression#966

Merged
mxm merged 5 commits into
apache:mainfrom
pchoudhury22:observed-scalability-coefficient
May 6, 2025
Merged

[FLINK-30571] Estimate scalability coefficient from past scaling history using linear regression#966
mxm merged 5 commits into
apache:mainfrom
pchoudhury22:observed-scalability-coefficient

Conversation

@pchoudhury22

@pchoudhury22 pchoudhury22 commented Apr 2, 2025

Copy link
Copy Markdown
Contributor

What is the purpose of the change

Currently, target parallelism computation assumes perfect linear scaling. However, real-time workloads often exhibit nonlinear scalability due to factors like network overhead and coordination costs.

This change introduces an observed scalability coefficient, estimated using linear regression on past (parallelism, processing rate) data, to improve the accuracy of scaling decisions.

Brief change log

Implemented a dynamic scaling coefficient to compute target parallelism based on observed scalability. The system estimates the scalability coefficient using a least squares linear regression approach, leveraging historical (parallelism, processing rate) data.
The regression model minimises the sum of squared errors. The baseline processing rate is computed using the smallest observed parallelism in the history. Model details:

The Linear Model

We define a linear relationship between parallelism (P) and processing rate (R):

$$R_i = β * P_i * α$$

where:

  • R_i = actual processing rate for the i-th data point
  • P_i = parallelism for the i-th data point
  • β = base factor (constant scale factor)
  • α = scaling coefficient to optimize

Squared Error

The loss function to minimise is the sum of squared errors (SSE):

$$Loss = Σ (R_i - R̂_i)^2$$

Substituting ( R̂_i = (β α) P_i ):

$$Loss = Σ (R_i - β α P_i)^2$$

Minimising the Error

Expanding ( (R_i - β α P_i)^2 ):

$$(R_i - β α P_i)^2 = R_i^2 - 2β α P_i R_i + (β α P_i)^2$$

Summing over all data points:

$$Loss = Σ (R_i^2 - 2β α P_i R_i + β^2 α^2 P_i^2)$$

Solving for α

To minimize for α, taking the derivative and solving we get:

$$α = (Σ P_i R_i) / (Σ P_i^2 * β)$$

Verifying this change

New unit tests added to cover this

Does this pull request potentially affect one of the following parts:

Dependencies (does it add or upgrade a dependency): no
The public API, i.e., is any changes to the CustomResourceDescriptors: no
Core observer or reconciler logic that is regularly executed: no

@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Hi @gyfora , Please help review the PR! Thanks!

double timeDiff =
Duration.between(timestamp, latestTimestamp).getSeconds()
+ 1; // Avoid division by zero
double weight = parallelism / timeDiff;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide on this particular weighting approach? To be specific, what's the benefit compared to:

  • Not weighting
  • Using weights based on the difference with the current parallelism (locally weighted regression)

I think overall weighting makes sense but maybe weighing based on the parallelism difference ( and time) makes more sense then simply parallelism

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also add an enum configuration with some strategies here if we feel that would be required, but maybe an overkill initially

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering whether applying too much recency bias could hurt the model. Simply weighing by the parallelism should already produce good results. I see how scalability of a pipeline might change over time due to factors like growing state, so maybe using recency bias is smart, as long as the recency influence isn't too strong.

@gyfora gyfora requested a review from mxm April 29, 2025 07:41

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @pchoudhury22, this is a great addition! Code looks very clean.

double timeDiff =
Duration.between(timestamp, latestTimestamp).getSeconds()
+ 1; // Avoid division by zero
double weight = parallelism / timeDiff;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering whether applying too much recency bias could hurt the model. Simply weighing by the parallelism should already produce good results. I see how scalability of a pipeline might change over time due to factors like growing state, so maybe using recency bias is smart, as long as the recency influence isn't too strong.

@gyfora

gyfora commented Apr 29, 2025

Copy link
Copy Markdown
Contributor

Thank you @pchoudhury22, this is a great addition! Code looks very clean.

@mxm We discussed this with @pchoudhury22 offline a little and came to the conclusion that due to the limited number of datapoints (scaling history is generally very limited epectially with the default kubernetes state store), it is probably best to initially remove the weighing completely.

In the future if we have a better way to accumulate history and training data points we can introduce different weighting strategies. What do you think?

@mxm

mxm commented Apr 29, 2025

Copy link
Copy Markdown
Contributor

+1 Let's remove the weighting completely for now.

… 2. Check scaling coefficient with threshold before returning. 3. Refactored tests for point [1] and [2].
@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Hi @gyfora @mxm Thanks so much for the review and the notes! Have updated the PR with the below changes

  1. removed weighting from the regression logic.
  2. Updated lowerBound for scaling coefficient calculation to 0.5.
  3. Updated description for OBSERVED_SCALABILITY_MIN_OBSERVATIONS key.

Please help review the PR! Thanks!

@mxm

mxm commented Apr 30, 2025

Copy link
Copy Markdown
Contributor

Thanks for the update!

2. Updated lowerBound for scaling coefficient calculation to 0.5.

Why 0.5? Should this be configurable?

@pchoudhury22

pchoudhury22 commented Apr 30, 2025

Copy link
Copy Markdown
Contributor Author

Primarily the thought process was to avoid a very small scaling coefficient. And honestly I guess, I just hesitated a bit to add another config :) . And as @gyfora suggested above on similar lines but in a threshold approach, we could probably use the same lowerBound as from scaling effectiveness threshold?

@gyfora

gyfora commented Apr 30, 2025

Copy link
Copy Markdown
Contributor

Primarily the thought process was to avoid a very small scaling coefficient. And honestly I guess, I just hesitated a bit to add another config :) . And as @gyfora suggested above on similar lines but in a threshold approach, we could probably use the same lowerBound as from scaling effectiveness threshold?

I wouldn't reuse the same config exactly here as it would have a different meaning here. So if you and @mxm feel that this should be adjustable we can add a new config for sure

@mxm

mxm commented Apr 30, 2025

Copy link
Copy Markdown
Contributor

I'm ok with keeping it at 0.5 for now, as this means that we will at most double the scaling factor, but I don't have an idea whether that would suffice or not. Adding a config option would give us the option to experiment with the lower limit.

@pchoudhury22

Copy link
Copy Markdown
Contributor Author

That makes perfect sense! Will add an config for the lower limit 😊.

…idator to validate the min scaling coefficient config.
@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Hello, Good morning!
Have updated the PR with the below changes

  1. config for min value of Scaling coefficient.
  2. Updated validator to validate the min scaling coefficient config.

Please help review! Thanks!

@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Hello @gyfora @mxm ! Hope you are doing great!

Have updated the PR with the below changes

  1. Config for min value of Scaling coefficient.
  2. Updated validator to validate the min scaling coefficient config.

Whenever you get a chance, I’d really appreciate your review and any feedback you might have. Thanks so much again for your time and support!

@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Checking on failed checks.... :)

@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Have updated the PR with the auto scaler configuration doc updates. Hopefully the Checks resolve now! But I don't think I am able to trigger the Checks. @mxm if you can please help with it?! Thanks! Really sorry for the inconvenience 😔

@pchoudhury22

Copy link
Copy Markdown
Contributor Author

The failed check of latest run for e2e_ci (v1_17, native, test_sessionjob_operations.sh) / test_sessionjob_operations.sh seems to be for a different reason. ie toomanyrequests: You have reached your unauthenticated pull rate limit. https://www.docker.com/increase-rate-limit .
The current rate limit per 6hour is 100 per IPv4 address or IPv6 /64 subnet . Since the 6hour bucket, Slightly confused as to how to handle it. But checking further..

@mxm

mxm commented May 5, 2025

Copy link
Copy Markdown
Contributor

All green now.

@mxm

mxm commented May 5, 2025

Copy link
Copy Markdown
Contributor

@gyfora Any further comments?

@mxm mxm merged commit 27daa5a into apache:main May 6, 2025
@pchoudhury22

Copy link
Copy Markdown
Contributor Author

Thanks so much for all the notes and guidance 😊!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants