Add S3 Scan processing condition evaluator to ensure object completeness #6624

Merged: dlvenable merged 2 commits into opensearch-project:main from Zhangxunmt:main on Mar 19, 2026

@Zhangxunmt Zhangxunmt commented Mar 10, 2026

Collaborator

Description

Problem

When using S3 scan to read the output of asynchronous batch inference jobs (e.g. Amazon Bedrock batch jobs), output files appear in S3 before the job has fully completed. There was no way to delay processing of those files until the job was actually done, leading to partial reads.

Solution

Introduces a new processing_conditions list at the S3 scan bucket level. Before processing an S3 object, the worker evaluates each applicable condition by reading a co-located manifest/sentinel file and running a Data Prepper expression against its JSON content. Processing is deferred until all conditions are satisfied.

Configuration example

  scan:
    buckets:
      - bucket:
          name: "my-batch-bucket"
          data_selection: data_only
          processing_conditions:
            - object_name: "manifest.json.out"
              when: "/processedRecordCount == /totalRecordCount"
              applicable_prefix:
                - "bedrock-multisource/output-multisource/"
              retry_delay: PT5M
              max_retry: 10
          filter:
            include_prefix:
              - "bedrock-multisource/output-multisource/"
            exclude_suffix:
              - manifest.json.out

For an object at s3://my-batch-bucket/bedrock-multisource/output-multisource/job-123/result.jsonl.out, the worker will:

  1. Download s3://my-batch-bucket/bedrock-multisource/output-multisource/job-123/manifest.json.out
  2. Parse it as JSON and evaluate /processedRecordCount == /totalRecordCount
  3. If true → proceed with processing
  4. If false or the manifest is not found → release the partition with a priority timestamp of now + retry_delay, retrying up to max_retry times
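
Step 1 above relies on finding the condition object in the same S3 "directory" as the data object. A minimal sketch of that key resolution follows, assuming the directory is everything up to the last `/` in the key. The class and method names are hypothetical and are not taken from the PR's code.

```java
// Hypothetical helper; names are illustrative, not the PR's actual implementation.
final class ConditionKeySketch {
    private ConditionKeySketch() { }

    /** Resolves the condition object key next to objectKey (assumed rule: same prefix up to the last '/'). */
    static String resolveConditionObjectKey(final String objectKey, final String objectName) {
        final int lastSlash = objectKey.lastIndexOf('/');
        // Root-level object: there is no prefix, so the condition object name is the whole key.
        if (lastSlash < 0) {
            return objectName;
        }
        // Keep everything up to and including the last '/', then append the configured name.
        return objectKey.substring(0, lastSlash + 1) + objectName;
    }
}
```

With the example above, `resolveConditionObjectKey("bedrock-multisource/output-multisource/job-123/result.jsonl.out", "manifest.json.out")` yields the manifest key in the `job-123/` prefix.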

processing_conditions fields:

Field │ Type │ Required / Default │ Description
object_name │ string │ required │ Manifest file name, resolved in the same S3 directory as the object
when │ string │ required │ Data Prepper expression evaluated against manifest JSON
applicable_prefix │ list │ all objects │ Prefixes this condition applies to; absent means all objects in the bucket
retry_delay │ Duration │ PT5M │ Wait time before re-queuing when condition is not met
max_retry │ int │ 10 │ Max retry attempts before the partition is abandoned
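
To make the defaults above concrete, here is a small sketch of how applicable_prefix and retry_delay could be interpreted. It assumes prefix matching is a plain `startsWith` check and that retry_delay is an ISO-8601 duration; the class and method names are hypothetical, not the PR's.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical sketch of the field semantics; not the PR's actual classes.
final class ConditionSettingsSketch {
    static final Duration DEFAULT_RETRY_DELAY = Duration.parse("PT5M"); // 5 minutes
    static final int DEFAULT_MAX_RETRY = 10;

    private ConditionSettingsSketch() { }

    /** An absent or empty applicable_prefix means the condition applies to every object. */
    static boolean isApplicable(final String objectKey, final List<String> applicablePrefix) {
        if (applicablePrefix == null || applicablePrefix.isEmpty()) {
            return true;
        }
        // Assumption: a prefix matches when the object key starts with it.
        return applicablePrefix.stream().anyMatch(objectKey::startsWith);
    }

    /** Earliest time the object should be looked at again: now + retry_delay. */
    static Instant nextAttempt(final Instant now, final Duration retryDelay) {
        return now.plus(retryDelay == null ? DEFAULT_RETRY_DELAY : retryDelay);
    }
}
```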

Implementation details:

New classes:

  • S3ScanProcessingCondition — config POJO for a single condition entry
  • S3ScanProcessingConditionEvaluator — encapsulates all condition-checking logic: S3 manifest download, JSON parsing into JacksonEvent, expression evaluation via ExpressionEvaluator

Modified classes:

  • S3ScanBucketOption — adds processing_conditions field
  • ScanObjectWorker — checks conditions before processing each object; on failure calls sourceCoordinator.closePartition(objectToProcess.get().getPartitionKey(), retryDelay, maxRetry, false);
  • S3ScanService — constructs a single shared S3ScanProcessingConditionEvaluator (stateless) and passes it to all workers
  • S3Source — injects ExpressionEvaluator via @DataPrepperPluginConstructor and threads it through to S3ScanService

Testing:

  • 23 unit tests added for S3ScanProcessingConditionEvaluator covering:
    • Null/empty condition guards
    • applicable_prefix matching (no match skips, empty/null applies to all, partial match)
    • Expression evaluation (true/false, JSON fields forwarded correctly to evaluator)
    • Manifest key resolution (nested path uses same directory; root-level object uses filename only)
    • Error handling (NoSuchKeyException → defer; unexpected exception → defer)
    • Multi-condition logic (all pass, first fails short-circuits, non-matching skipped)
    • findFirstMatching for retry setting lookup
  • Existing unit and integration tests updated to compile with the new constructor signatures
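
The multi-condition behavior listed above (all pass, first failure short-circuits, non-matching conditions skipped) can be sketched as a loop that returns the first applicable condition that fails. This is an illustration under assumptions: `Condition` is a hypothetical stand-in, and its `check` predicate replaces the real S3 read and expression evaluation.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch; the types here are stand-ins for the PR's classes.
final class FirstFailingConditionSketch {
    /** Stand-in for one configured condition. */
    static final class Condition {
        final String name;
        final String prefix;            // null means "applies to every object"
        final Predicate<String> check;  // replaces the S3 read + expression evaluation

        Condition(final String name, final String prefix, final Predicate<String> check) {
            this.name = name;
            this.prefix = prefix;
            this.check = check;
        }

        boolean appliesTo(final String objectKey) {
            return prefix == null || objectKey.startsWith(prefix);
        }
    }

    private FirstFailingConditionSketch() { }

    /** Returns the first applicable condition that is not met, or empty if all are satisfied. */
    static Optional<Condition> firstFailing(final String objectKey, final List<Condition> conditions) {
        if (conditions == null || conditions.isEmpty()) {
            return Optional.empty(); // nothing configured: processing is never deferred
        }
        for (final Condition condition : conditions) {
            if (!condition.appliesTo(objectKey)) {
                continue; // non-matching conditions are skipped
            }
            if (!condition.check.test(objectKey)) {
                return Optional.of(condition); // short-circuit on the first failure
            }
        }
        return Optional.empty();
    }
}
```

Returning the failing condition itself, rather than a boolean, lets the caller take retry_delay and max_retry from the condition that actually deferred the object.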

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@graytaylor0 graytaylor0 left a comment

Member

Thanks for making this change. It looks good! A couple of comments on the behavior in some of the edge cases.

}
final String manifestKey = resolveManifestKey(objectKey, condition.getFileName());
try {
final String content = readS3ObjectAsString(bucket, manifestKey);

Member

How do you think we should handle the case where the manifest file is not json?

Collaborator Author

This is a good call out. I think we can add a codec section in the processing condition config. Thoughts?

Member

Yes, I think a codec is ideal here.

Collaborator Author

Added a codec so that condition objects in different formats can be parsed. There is one codec per condition, so I used a Map to load all the codecs.

@graytaylor0

Member

One more thing: as a follow-up, please create a PR against the documentation website (https://github.com/opensearch-project/documentation-website).

This documentation is what users will use (https://docs.opensearch.org/latest/data-prepper/pipelines/configuration/sources/s3/#scan-bucket)

@dlvenable dlvenable left a comment

Member

Thanks @Zhangxunmt !

@NotEmpty
private String when;

@JsonProperty("include_prefix")

Member

I think this name may be a little confusing. It initially appeared to me to relate to whether or not to include the condition file. But instead it means that the condition is applicable for any object in that prefix.

It seems we may need a slightly better name to help clarify this. Perhaps "applicable" would be clearer - applicable_for_prefix.

Collaborator Author

renamed to applicable_prefix


final List<S3ScanProcessingCondition> conditions = bucketProcessingConditionsMap.get(bucket);
if (!processingConditionEvaluator.allConditionsMet(bucket, objectKey, conditions)) {
final S3ScanProcessingCondition failedCondition = processingConditionEvaluator.findFirstMatching(objectKey, conditions);

Member

This may not be the failed condition, right? It seems we are just arbitrarily picking the first one.

Collaborator Author

Yeah, that was probably not accurate enough. Refactored to use the exact condition that failed.

if (!isApplicable(objectKey, condition)) {
continue;
}
final String manifestKey = resolveManifestKey(objectKey, condition.getFileName());

Member

Let's remove reference to "manifest" since that is a Bedrock term.

Collaborator Author

renamed to conditionObjectKey

…teness

Signed-off-by: Xun Zhang <xunzh@amazon.com>
…hen max_retry exhauses etc

Signed-off-by: Xun Zhang <xunzh@amazon.com>
final AtomicReference<Event> firstEvent = new AtomicReference<>();
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
try (final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request)) {
codec.parse(response, record -> firstEvent.compareAndSet(null, record.getData()));

Member

One thing we should make sure of: does this actually work with a single JSON object in a file? Last I checked, the json codec only supported JSON arrays. Did you test with a single static JSON object (not NDJSON)?

Collaborator Author

Yes, I tested and verified that it works with a single JSON object. I used a real manifest file in the format below.

{"totalRecordCount":50000,"processedRecordCount":50000,"successRecordCount":30000,"errorRecordCount":0,"inputTokenCount":5300000,"outputTokenCount":0,"inputAudioSecond":0,"inputVideoSecond":0,"inputStandardImageCount":0,"inputDocumentImageCount":0,"inputTextTokenCount":0,"inputImageTokenCount":0,"inputAudioTokenCount":0,"inputVideoTokenCount":0,"outputTextTokenCount":0,"outputImageTokenCount":0}
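
For illustration, the check `/processedRecordCount == /totalRecordCount` against a manifest like the one above can be approximated without any dependencies. This is only a stand-in: the actual change parses the object with a codec and evaluates the expression through ExpressionEvaluator, whereas this sketch pulls two top-level integer fields with a regex. The class and method names are hypothetical.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, dependency-free stand-in for the codec + expression evaluation.
final class ManifestCheckSketch {
    private ManifestCheckSketch() { }

    /** Extracts an integer field such as "totalRecordCount":50000 from flat JSON text. */
    static OptionalLong readLongField(final String json, final String field) {
        final Pattern pattern = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        final Matcher matcher = pattern.matcher(json);
        return matcher.find() ? OptionalLong.of(Long.parseLong(matcher.group(1))) : OptionalLong.empty();
    }

    /** True only when both counts are present and equal; a missing field counts as not complete. */
    static boolean isComplete(final String json) {
        final OptionalLong processed = readLongField(json, "processedRecordCount");
        final OptionalLong total = readLongField(json, "totalRecordCount");
        return processed.isPresent() && total.isPresent()
                && processed.getAsLong() == total.getAsLong();
    }
}
```

For the manifest shown above, both counts are 50000, so this check would report the job as complete.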

@Zhangxunmt

Collaborator Author

> Also one more thing. As a follow up please create a follow up PR to the documentation website here (https://github.com/opensearch-project/documentation-website).
>
> This documentation is what users will use (https://docs.opensearch.org/latest/data-prepper/pipelines/configuration/sources/s3/#scan-bucket)

The documentation PR for this new config has been created: opensearch-project/documentation-website#12104.

@dlvenable dlvenable left a comment

Member

Thanks @Zhangxunmt !

@dlvenable dlvenable merged commit 363af8f into opensearch-project:main Mar 19, 2026
72 checks passed

4 participants