Skip to content

add S3 Enrich processor to merge ml batch job output with source inputs#5992

Merged
graytaylor0 merged 5 commits into
opensearch-project:mainfrom
Zhangxunmt:mainline
Mar 24, 2026
Merged

add S3 Enrich processor to merge ml batch job output with source inputs#5992
graytaylor0 merged 5 commits into
opensearch-project:mainfrom
Zhangxunmt:mainline

Conversation

@Zhangxunmt

@Zhangxunmt Zhangxunmt commented Aug 13, 2025

Copy link
Copy Markdown
Collaborator

Description

This PR implements the proposed S3 Enricher processor to merge the data from S3 into the events in the data prepper pipeline.

This processor can be used to merge the ML batch job results in the S3Key "<source_file_basename-xxxxx>.jsonl.out" with certain fields from source data in the S3Key "source_file_basename.jsonl".

This is the config of this processor:

  - s3_enrich:
      bucket:
        name: offlinebatch                              # Name of the S3 bucket containing enrichment source files
        filter:
          include_prefix: bedrockbatch/originsource/    # Only scan objects under this key prefix within the bucket
      codec:
        ndjson:                                         # Codec used to parse enrichment source files (newline-delimited JSON)
      default_bucket_owner: 802041417063                # AWS account ID that owns the bucket; used to validate bucket ownership on access
      aws:
        region: us-east-1
      s3_object_size_limit: 100mb                      # Maximum file size to load into memory; objects exceeding this limit are skipped (min: 100mb, max: 300mb)
      s3_key_path: "s3/key"                             # Event field path whose value contains the S3 output object key (used to derive the enrichment source object)

      # Regex applied to the output S3 key to extract the base name of the enrichment source file.
      # Capture group 1 becomes the source file stem; the codec extension is appended to form the final lookup key.
      # Example: "test_batch_50k-2025-11-06T21-19-15Z-xxx.jsonl.out" → capture group 1 = "test_batch_50k"
      #          → looks up "bedrockbatch/originsource/test_batch_50k.jsonl" in the bucket
      s3_object_name_pattern: ^(.*?)-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}Z-.*\.jsonl\.out$

      correlation_keys: 
         - "recordId"                       # Field present in both the pipeline event and the enrichment source record; used to match and join the two
      keys_to_merge:                                    # Fields to copy from the matched enrichment source record into the pipeline event
        - "field_A"
        - "field_B"
        - "field_C"
      enrich_when: /s3/key != null                      # Data Prepper expression; processor is skipped for events where this evaluates to false``` 

 
### Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
 
### Check List
- [ ] New functionality includes testing.
- [ ] New functionality has a documentation issue. Please link to it in this PR.
  - [ ] New functionality has javadoc added
- [ ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md).

@Zhangxunmt Zhangxunmt changed the title add ml merge processor to merge S3 inputs add S3 Enricher processor to merge ml batch job output with source inputs Dec 16, 2025
@Zhangxunmt Zhangxunmt force-pushed the mainline branch 2 times, most recently from 05e390c to a6cddc2 Compare December 16, 2025 07:52

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Zhangxunmt for this change!

I left some comments. One thing that might help reduce the amount of work in this PR is to create a different PR that moves all the common code from S3 source into s3-common as its own PR.

Comment thread data-prepper-plugins/s3-enricher-processor/build.gradle Outdated
Comment thread data-prepper-plugins/s3-enrich-processor/build.gradle Outdated
Comment thread settings.gradle Outdated
@Zhangxunmt Zhangxunmt mentioned this pull request Jan 13, 2026
4 tasks
@Zhangxunmt Zhangxunmt force-pushed the mainline branch 2 times, most recently from a7e549a to 32673a9 Compare March 10, 2026 04:08
@Zhangxunmt Zhangxunmt changed the title add S3 Enricher processor to merge ml batch job output with source inputs add S3 Enrich processor to merge ml batch job output with source inputs Mar 19, 2026

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Zhangxunmt , I have a few more comments.

*/
public Cache<String, Event> getOrLoadRecordCache(String s3Url, Supplier<Cache<String, Event>> loader) {
return s3Cache.get(s3Url, key -> {
LOG.info("Loading S3 data for: {}", key);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be a debug log. It will happen too often for normal logging.

if (matcher.matches() && matcher.groupCount() >= 1) {
final String baseName = matcher.group(1);
if (baseName != null && !baseName.isBlank()) {
return baseName + ".jsonl";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this jsonl extension be configurable?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's reuse this config in the extension to make it parametric. This is used in the S3 sink to define the extension of the generated file.

        codec:
          ndjson:
            extension: jsonl

* @param targetEvent the event to enrich (output record)
* @param sourceEvent the event containing source data (from cache)
*/
private void mergeData(Event targetEvent, Event sourceEvent) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Event class has a merge method. Can it work for you? If not, is there a way it could be configured for your needs?

@Override
public void merge(final Event other) {
if(!(other instanceof JacksonEvent))
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent.");
final JacksonEvent otherJacksonEvent = (JacksonEvent) other;
if(!(otherJacksonEvent.jsonNode instanceof ObjectNode)) {
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent with object data.");
}
final ObjectNode otherObjectNode = (ObjectNode) otherJacksonEvent.jsonNode;
if(!(jsonNode instanceof ObjectNode)) {
throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data.");
}
((ObjectNode) jsonNode).setAll(otherObjectNode);
}

@ExtendWith(MockitoExtension.class)
class S3ObjectReferenceResolverTest {

private static final String BUCKET_NAME = "test-bucket";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this and KEY_PATH into a non-static fields. Initialize them as random values in @BeforeEach.


@Test
void resolve_throws_when_s3_key_is_blank() {
when(event.get(KEY_PATH, String.class)).thenReturn(" ");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you also need this:

when(bucketOption.getName()).thenReturn(bucketName);

Otherwise you may not testing what you mean to.


@Test
void resolve_throws_when_s3_key_is_null() {
when(event.get(KEY_PATH, String.class)).thenReturn(null);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you also need this:

when(bucketOption.getName()).thenReturn(bucketName);

Otherwise you may not testing what you mean to.

private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;

@DataPrepperPluginConstructor

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should also make this @Experimental while we work on it in case we want to make some changes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agree. I will add the Experimental annotation to it.

* @throws UnsupportedOperationException if the current Event does not support merging.
* @since 2.11
*/
void merge(Event other, List<String> keys);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this Collection<String>

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final List<ScanObjectWorker> workers;

public S3ScanService(final S3SourceConfig s3SourceConfig,
public S3ScanService(final S3SourceConfig s3SourceConfig,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

* @param keys the list of keys to selectively merge
* @throws IllegalArgumentException if the input event is not compatible to merge.
* @throws UnsupportedOperationException if the current Event does not support merging.
* @since 2.11

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @since 2.11
* @since 2.15

The next version is 2.15.

Signed-off-by: Xun Zhang <xunzh@amazon.com>
Signed-off-by: Xun Zhang <xunzh@amazon.com>
Signed-off-by: Xun Zhang <xunzh@amazon.com>
Signed-off-by: Xun Zhang <xunzh@amazon.com>
Signed-off-by: Xun Zhang <xunzh@amazon.com>
if (matcher.matches() && matcher.groupCount() >= 1) {
final String baseName = matcher.group(1);
if (baseName != null && !baseName.isBlank()) {
return baseName + ".jsonl";

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's reuse this config in the extension to make it parametric. This is used in the S3 sink to define the extension of the generated file.

        codec:
          ndjson:
            extension: jsonl

private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;

@DataPrepperPluginConstructor

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agree. I will add the Experimental annotation to it.


import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

@Experimental

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added @experimental here for the new processor class.

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Zhangxunmt for this great new processor!

@dlvenable dlvenable added this to the v2.15 milestone Mar 24, 2026
for (final String key : keys) {
final Object value = other.get(key, Object.class);
if (value != null) {
put(key, value);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the data has something like this { "key": null }, do you thing users will want it merged?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. For the S3 enrich use case(adding data from a lookup), skipping nulls makes sense — you don't want a missing/null enrichment field to overwrite a valid value in the target event. But in general, a user might explicitly want to merge a null to clear a field.

Given that the only current consumer is S3EnrichProcessor and the semantics are enrichment, I think we can keep skipping nulls. But I'll add a comment in the code in the next PR to make the decision explicit so in the future this can enhanced for other cases.

@graytaylor0 graytaylor0 merged commit 627a6b7 into opensearch-project:main Mar 24, 2026
101 of 108 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants