Skip to content

Base Livecapture Config and Manager Implementations#5971

Open
K4ran30 wants to merge 10 commits into
opensearch-project:mainfrom
K4ran30:live-capture
Open

Base Livecapture Config and Manager Implementations#5971
K4ran30 wants to merge 10 commits into
opensearch-project:mainfrom
K4ran30:live-capture

Conversation

@K4ran30

@K4ran30 K4ran30 commented Aug 6, 2025

Copy link
Copy Markdown

Description

This PR introduces the core live capture functionality to Data Prepper, enabling real-time event tracing capabilities. This first commit includes the foundational classes including configuration, event capture coordination, and rate limiting.

  • LiveCaptureAppConfig.java - Spring configuration for live capture initialization
  • LiveCaptureConfiguration.java - Configuration class mapping to YAML settings
  • LiveCaptureManager.java - Central singleton managing live capture state

Check List

Notes

  • This is the first commit in a multi-part implementation of live capture
  • Documentation will be added in a separate issue/PR once the complete feature is implemented
  • Additional commits will follow with interceptors and pipeline integration

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Karan Bhakta <kabhakta@amazon.com>
@kkondaka

kkondaka commented Aug 8, 2025

Copy link
Copy Markdown
Collaborator

@K4ran30 every java file in src/main should have a test file in src/test. Please write tests for all the code.

Signed-off-by: Karan Bhakta <kabhakta@amazon.com>

try {
Map<String, Object> eventData = event.toMap();
return activeFilters.entrySet().stream().allMatch(filter -> {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use DataPrepper expressions instead.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would I do this by using the DataPrepper's ExpressionEvaluator?

Karan Bhakta added 2 commits August 9, 2025 20:17
Signed-off-by: Karan Bhakta <kabhakta@amazon.com>
Signed-off-by: Karan Bhakta <kabhakta@amazon.com>
public static final String PROCESSOR = "Processor";
public static final String ROUTE = "Route";
public static final String SINK = "Sink";
private static volatile LiveCaptureManager INSTANCE;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we need this. For now, we can keep it but we should not need this.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this. This LiveCaptureManager can be created as a Spring Bean so that we don't need this at all.

Karan Bhakta added 2 commits August 11, 2025 10:40
Signed-off-by: Karan Bhakta <kabhakta@amazon.com>
Signed-off-by: Karan Bhakta <kabhakta@amazon.com>

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this initial PR!

public static final String PROCESSOR = "Processor";
public static final String ROUTE = "Route";
public static final String SINK = "Sink";
private static volatile LiveCaptureManager INSTANCE;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this. This LiveCaptureManager can be created as a Spring Bean so that we don't need this at all.

return INSTANCE;
}

public static boolean shouldLiveCapture(final Event event) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid all of these static methods. Use dependency injection where need and have a single instance provided by Spring.

}

// turn every entry into a dataprepper event so we can output to sink
public void processEventLiveCapture(final List<Map<String, Object>> liveCaptureEntries) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a more concrete model than Map<String, Object>.

@K4ran30 K4ran30 Aug 12, 2025

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I introduce model classes for the entries?

}
break;
case PROCESSOR:
if (name != null) captureMetaData.put("processorName", name);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that using Name in the suffix here is quite correct.

For one, we may add an Id concept in the future. For another, these are processor/sink types.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed from name to type

Map<String, Object> configMap = (Map<String, Object>) sinkConfig;
// Find the first non-metadata entry to create the sink
for (Map.Entry<String, Object> entry : configMap.entrySet()) {
if (!"entry_threshold".equals(entry.getKey()) && !"batch_size".equals(entry.getKey())) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these things? entry_threshold and batch_size?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entry_threshold and batch_size are live capture configuration metadata that users can control through the config file.

Karan Bhakta and others added 4 commits August 13, 2025 14:44
Signed-off-by: Karan Bhakta <kabhakta@amazon.com>
…into live-capture

# Conflicts:
#	data-prepper-core/src/main/java/org/opensearch/dataprepper/core/livecapture/LiveCaptureAppConfig.java
#	data-prepper-core/src/main/java/org/opensearch/dataprepper/core/livecapture/LiveCaptureManager.java
#	data-prepper-core/src/test/java/org/opensearch/dataprepper/core/livecapture/LiveCaptureAppConfigTest.java
Signed-off-by: Karan Bhakta <kabhakta@amazon.com>
Signed-off-by: Karan Bhakta <123035205+K4ran30@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants