Skip to content

Enable ZeroBuffer#5661

Merged
oeyh merged 1 commit into
opensearch-project:mainfrom
MohammedAghil:fix-and-re-enable-zero-buffer
May 22, 2025
Merged

Enable ZeroBuffer#5661
oeyh merged 1 commit into
opensearch-project:mainfrom
MohammedAghil:fix-and-re-enable-zero-buffer

Conversation

@MohammedAghil
Copy link
Copy Markdown
Contributor

@MohammedAghil MohammedAghil commented Apr 30, 2025

Description

At present zero-buffer is disabled due to the issue #5545.

Enable Zero Buffer under the following constraints:

  1. Number of workers is equal to 1
  2. Only allow pipelines which do not use @SingleThread processors.

If a pipeline configuration does not meet the above requirements and still uses zero-buffer, then throw an exception to indicate the constraints.

Related Issues

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Copy Markdown
Collaborator

@oeyh oeyh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enable Zero Buffer under the following constraints:

  1. Number of workers is equal to 1
  2. Only allow pipelines which do not use @SingleThread processors.

Thanks for the PR! For my understanding, does the pipeline need to meet both constraints to use zero buffer? What would be the issue if using a @SingleThread processor while number of workers is 1?

Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for these improvements. I have one comment.

//}
if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) {
// Check if there are any processor sets with @SingleThread processors
boolean hasSingleThreadedProcessors = processorSets.stream().anyMatch(processorSet -> processorSet.size() > 1);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this will allow creating a pipeline with @SingleThread processors if the user defines workers to 1. However, this will not yield the correct behavior because the processors are called from multiple threads.

It may be ideal to check each Processor for the annotation directly.

Copy link
Copy Markdown
Contributor Author

@MohammedAghil MohammedAghil Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks @dlvenable .

Copy link
Copy Markdown
Contributor Author

@MohammedAghil MohammedAghil May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I have modified this to check for the @SingleThread annotation directly


return bufferMap;
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these tests fail with your previous code? In addition to these tests, we must write an end-to-end test that will fail with your previous changes. And with this code that same test would pass. Without that we should not merge this code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, as per our discussion, the integration test that fails with previous changes has been added in the PR : https://github.com/opensearch-project/data-prepper/pull/5691/files

@san81
Copy link
Copy Markdown
Collaborator

san81 commented May 1, 2025

If source worker threads are used to execute processor chain and sink, there is no importance of number of process workers. All we should do is, while source worker is executing the processor chain and sink, they are given only one instance of each processor. I think, the two points mentioned in this PR description are not addressing the root cause of the issue it created the last time.

Copy link
Copy Markdown
Collaborator

@kkondaka kkondaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing Request changes, so that this can be committed after reviews are complete

@kkondaka kkondaka dismissed their stale review May 6, 2025 17:49

Integration test will be added separately.

@MohammedAghil MohammedAghil force-pushed the fix-and-re-enable-zero-buffer branch from 5b60ac1 to ef8070d Compare May 20, 2025 21:48
@@ -0,0 +1,10 @@
simple-pipeline:
workers: 2
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make workers: 1 so that this is not causing the failure. Otherwise, you are not sure if you are testing the code for @SingleThread.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay sure. Thank you for catching that.

@dlvenable
Copy link
Copy Markdown
Member

Thanks! I see that you rebased to include the new integration tests. I have one more comment.

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
@MohammedAghil MohammedAghil force-pushed the fix-and-re-enable-zero-buffer branch from ef8070d to 2669b70 Compare May 21, 2025 20:16
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @MohammedAghil !

@oeyh oeyh merged commit 1031c23 into opensearch-project:main May 22, 2025
43 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants