[FEATURE] : Add synchronized buffer plugin #6642

Open
MohammedAghil wants to merge 1 commit into opensearch-project:main from MohammedAghil:feature/synchronised-buffer

@MohammedAghil

Contributor

Add a synchronized buffer implementation that provides synchronous data flow from source to sink while leveraging parallel processing via Data Prepper's process worker threads. Writers block until their records are fully processed and checkpointed by worker threads.

Description

In the existing zero buffer, the caller thread handles everything (writing, processing, and publishing to the sink), so the process worker threads go unused. The synchronized buffer addresses this: the caller thread writes a batch of records into a shared, thread-safe queue and then blocks until every record in that batch has been fully processed and checkpointed by the worker threads. This enables use cases such as the OpenSearch API source, where the HTTP response must be returned only after the data has been delivered to the sink.

Key design:

  • The caller wraps records in a SignaledBatch and enqueues it, then blocks waiting for completion
  • Multiple process worker threads read slices from the batch concurrently, respecting a configurable batch_size
  • On checkpoint, each worker marks its slice as processed; once all records in the batch are accounted for, the waiting caller is released
  • Metrics are emitted for records written and records read
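
For readers who want a concrete picture of the design above, here is a minimal, hypothetical sketch of the signaling idea. It is not the `SignaledBatch` class from this PR: the class name `SignaledBatchSketch`, its methods (`readSlice`, `checkpoint`, `awaitCompletion`), and the use of a `CountDownLatch` are all assumptions chosen for illustration. It only shows how a caller might block until workers have claimed slices of at most `batchSize` records and checkpointed all of them.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; names and structure are not taken from the PR. */
class SignaledBatchSketch<T> {
    private final List<T> records;
    private final AtomicInteger nextIndex = new AtomicInteger(0); // first unread record
    private final AtomicInteger remaining;                        // records not yet checkpointed
    private final CountDownLatch done = new CountDownLatch(1);    // released when remaining hits 0

    SignaledBatchSketch(final List<T> records) {
        this.records = List.copyOf(records);
        this.remaining = new AtomicInteger(this.records.size());
        if (this.records.isEmpty()) {
            done.countDown(); // nothing to wait for
        }
    }

    /** Worker side: atomically claim up to batchSize unread records (empty when exhausted). */
    List<T> readSlice(final int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        final int size = records.size();
        final int start = nextIndex.getAndUpdate(i -> Math.min(i + batchSize, size));
        final int end = Math.min(start + batchSize, size);
        return records.subList(start, end);
    }

    /** Worker side: mark a claimed slice as fully processed. */
    void checkpoint(final int sliceSize) {
        if (remaining.addAndGet(-sliceSize) <= 0) {
            done.countDown();
        }
    }

    /** Caller side: block until every record is checkpointed or the timeout elapses. */
    boolean awaitCompletion(final long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Demo: three workers drain one batch of seven records in slices of two. */
    static boolean runDemo() {
        final SignaledBatchSketch<Integer> batch =
                new SignaledBatchSketch<>(List.of(1, 2, 3, 4, 5, 6, 7));
        final ExecutorService workers = Executors.newFixedThreadPool(3);
        for (int w = 0; w < 3; w++) {
            workers.execute(() -> {
                List<Integer> slice = batch.readSlice(2);
                while (!slice.isEmpty()) {
                    // Processing and publishing to the sink would happen here.
                    batch.checkpoint(slice.size());
                    slice = batch.readSlice(2);
                }
            });
        }
        final boolean completed = batch.awaitCompletion(5_000); // the "writer" blocks here
        workers.shutdown();
        return completed;
    }
}
```

In this sketch the caller thread plays the role of the blocked writer in `runDemo()`, while the pool threads stand in for process workers. A real buffer would also need the shared queue of batches, timeout and failure handling, and the metrics mentioned above, none of which are modeled here.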

Issues Resolved

Resolves #5712

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Add a synchronized buffer implementation that provides synchronous
data flow from source to sink while leveraging parallel processing
via Data Prepper's process worker threads. Writers block until their
records are fully processed and checkpointed by worker threads.

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
@github-actions


⚠️ License Header Violations Found

The following newly added files are missing required license headers:

  • data-prepper-plugins/synchronized-buffer/build.gradle
  • data-prepper-plugins/synchronized-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/SynchronizedBuffer.java
  • data-prepper-plugins/synchronized-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/SynchronizedBufferConfig.java
  • data-prepper-plugins/synchronized-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/model/ReadBatch.java
  • data-prepper-plugins/synchronized-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/model/SignaledBatch.java
  • data-prepper-plugins/synchronized-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/SynchronizedBufferTests.java

Please add the appropriate license header to each file and push your changes.

See the license header requirements: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

Development

Successfully merging this pull request may close these issues.

[Feature] Synchronous Buffer