Skip to content

[Fix] Add sleep in zero buffer read when no data is flowing#5846

Merged
oeyh merged 4 commits into
opensearch-project:mainfrom
oeyh:fix/zero-buffer-high-cpu
Jun 30, 2025
Merged

[Fix] Add sleep in zero buffer read when no data is flowing#5846
oeyh merged 4 commits into
opensearch-project:mainfrom
oeyh:fix/zero-buffer-high-cpu

Conversation

@oeyh
Copy link
Copy Markdown
Collaborator

@oeyh oeyh commented Jun 29, 2025

Description

The ZeroBuffer was causing high CPU usage when no data was flowing through the pipeline. The read() method (called by ProcessWorker#run method) would continuously return immediately when no data is flowing, creating a tight loop that consumed excessive CPU resources.

This PR:

  • Added a 1-second sleep in the read() method when no data is available. This prevents the tight loop and significantly reduces CPU usage during idle periods.
  • Added unit test to verify sleep behavior

Testing:

  • Profiled zero buffer pipeline run before and after the changes and confirmed that ProcessWorker#run no longer takes most of the cpu time.

Issues Resolved

N/A

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Hai Yan <oeyh@amazon.com>
sb2k16
sb2k16 previously approved these changes Jun 30, 2025
public void testReadFromEmptyBufferCallsThreadSleep() {
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();

long startTime = System.currentTimeMillis();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our build are taking a long time and tests with sleeps contribute to this. I think using Mockito's mockStatic on Thread.sleep() is better here. I think some other tests do this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got this error when trying to use mockStatic on Thread.class. Looks like it's not possible.

It is not possible to mock static methods of java.lang.Thread to avoid interfering with class loading what leads to infinite loops

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test to test Thread.sleep() call indirectly through Thread.currentThread().interrupt() instead.

Signed-off-by: Hai Yan <oeyh@amazon.com>
sb2k16
sb2k16 previously approved these changes Jun 30, 2025
try {
Thread.sleep(DEFAULT_READ_SLEEP_MILLIS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to throw if sleep is interrupted. It should be silently ignored, right?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Otherwise it may break the process loop and shutdown.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Signed-off-by: Hai Yan <oeyh@amazon.com>
Thread.sleep(DEFAULT_READ_SLEEP_MILLIS);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think Thread.currentThread().interrupt(); is needed. We do not need to restore the "interrupted" status.

Signed-off-by: Hai Yan <oeyh@amazon.com>
@oeyh oeyh merged commit f27b493 into opensearch-project:main Jun 30, 2025
45 of 47 checks passed
JonahCalvo pushed a commit to JonahCalvo/os-data-prepper that referenced this pull request Jul 17, 2025
…ch-project#5846)

* Add sleep in zero buffer read when no data is flowing

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Update unit test

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Catch InterruptedException and not throw

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Address one more comment

Signed-off-by: Hai Yan <oeyh@amazon.com>

---------

Signed-off-by: Hai Yan <oeyh@amazon.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants