Enable ZeroBuffer#5661
Conversation
oeyh
left a comment
There was a problem hiding this comment.
Enable Zero Buffer under the following constraints:
- Number of workers is equal to 1
- Only allow pipelines which do not use @SingleThread processors.
Thanks for the PR! For my understanding, does the pipeline need to meet both constraints to use zero buffer? What would be the issue if using a @SingleThread processor while number of workers is 1?
dlvenable
left a comment
There was a problem hiding this comment.
Thanks for these improvements. I have one comment.
| //} | ||
| if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) { | ||
| // Check if there are any processor sets with @SingleThread processors | ||
| boolean hasSingleThreadedProcessors = processorSets.stream().anyMatch(processorSet -> processorSet.size() > 1); |
There was a problem hiding this comment.
I think that this will allow creating a pipeline with @SingleThread processors if the user defines workers to 1. However, this will not yield the correct behavior because the processors are called from multiple threads.
It may be ideal to check each Processor for the annotation directly.
There was a problem hiding this comment.
Thank you, I have modified this to check for the @SingleThread annotation directly
|
|
||
| return bufferMap; | ||
| } | ||
|
|
There was a problem hiding this comment.
Do these tests fail with your previous code? In addition to these tests, we must write an end-to-end test that will fail with your previous changes. And with this code that same test would pass. Without that we should not merge this code.
There was a problem hiding this comment.
Thank you, as per our discussion, the integration test that fails with previous changes has been added in the PR : https://github.com/opensearch-project/data-prepper/pull/5691/files
|
If source worker threads are used to execute processor chain and sink, there is no importance of number of process workers. All we should do is, while source worker is executing the processor chain and sink, they are given only one instance of each processor. I think, the two points mentioned in this PR description are not addressing the root cause of the issue it created the last time. |
kkondaka
left a comment
There was a problem hiding this comment.
Removing Request changes, so that this can be committed after reviews are complete
5b60ac1 to
ef8070d
Compare
| @@ -0,0 +1,10 @@ | |||
| simple-pipeline: | |||
| workers: 2 | |||
There was a problem hiding this comment.
Make workers: 1 so that this is not causing the failure. Otherwise, you are not sure if you are testing the code for @SingleThread.
There was a problem hiding this comment.
Okay sure. Thank you for catching that.
|
Thanks! I see that you rebased to include the new integration tests. I have one more comment. |
Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
ef8070d to
2669b70
Compare
Description
At present zero-buffer is disabled due to the issue #5545.
Enable Zero Buffer under the following constraints:
If a pipeline configuration does not meet the above requirements and still uses zero-buffer, then throw an exception to indicate the constraints.
Related Issues
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.