Add S3 Scan processing condition evaluator to ensure object completeness#6624
Conversation
graytaylor0
left a comment
There was a problem hiding this comment.
Thanks for making this change. It looks good! Couple comments on what the behavior is for some of the edge cases.
| } | ||
| final String manifestKey = resolveManifestKey(objectKey, condition.getFileName()); | ||
| try { | ||
| final String content = readS3ObjectAsString(bucket, manifestKey); |
There was a problem hiding this comment.
How do you think we should handle the case where the manifest file is not json?
There was a problem hiding this comment.
This is a good call out. I think we can add a codec section in the processing condition config. Thoughts?
There was a problem hiding this comment.
Yes, I think a codec is ideal here.
There was a problem hiding this comment.
Added codec to process different formats of the object for conditions. It'd be one codec per condition so I used a Map to load all the codec.
|
Also one more thing. As a follow up please create a follow up PR to the documentation website here (https://github.com/opensearch-project/documentation-website). This documentation is what users will use (https://docs.opensearch.org/latest/data-prepper/pipelines/configuration/sources/s3/#scan-bucket) |
| } | ||
| final String manifestKey = resolveManifestKey(objectKey, condition.getFileName()); | ||
| try { | ||
| final String content = readS3ObjectAsString(bucket, manifestKey); |
There was a problem hiding this comment.
Yes, I think a codec is ideal here.
| @NotEmpty | ||
| private String when; | ||
|
|
||
| @JsonProperty("include_prefix") |
There was a problem hiding this comment.
I think this name may be a little confusing. It initially appeared to me to relate to whether or not to include the condition file. But instead it means that the condition is applicable for any object in that prefix.
It seems we may need a slightly better name to help clarify this. Perhaps "applicable" would be clearer - applicable_for_prefix.
There was a problem hiding this comment.
renamed to applicable_prefix
|
|
||
| final List<S3ScanProcessingCondition> conditions = bucketProcessingConditionsMap.get(bucket); | ||
| if (!processingConditionEvaluator.allConditionsMet(bucket, objectKey, conditions)) { | ||
| final S3ScanProcessingCondition failedCondition = processingConditionEvaluator.findFirstMatching(objectKey, conditions); |
There was a problem hiding this comment.
This may not be the failed condition, right? It seems we are just arbitrarily picking the first one.
There was a problem hiding this comment.
yeah probably not accurate enough. refactored to use the exact failed condition to proceed.
| if (!isApplicable(objectKey, condition)) { | ||
| continue; | ||
| } | ||
| final String manifestKey = resolveManifestKey(objectKey, condition.getFileName()); |
There was a problem hiding this comment.
Let's remove reference to "manifest" since that is a Bedrock term.
There was a problem hiding this comment.
renamed to conditionObjectKey
…teness Signed-off-by: Xun Zhang <xunzh@amazon.com>
…hen max_retry exhauses etc Signed-off-by: Xun Zhang <xunzh@amazon.com>
| final AtomicReference<Event> firstEvent = new AtomicReference<>(); | ||
| final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); | ||
| try (final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request)) { | ||
| codec.parse(response, record -> firstEvent.compareAndSet(null, record.getData())); |
There was a problem hiding this comment.
One thing we should make sure of. Does this actually work with a single json object in a file? Last I checked the json codec only supported json arrays. Did you test again with a static json object (not ndjson)
There was a problem hiding this comment.
Yes I tested and verified it works with single json object. I uses the actual one of the real manifest file in the below format.
{"totalRecordCount":50000,"processedRecordCount":50000,"successRecordCount":30000,"errorRecordCount":0,"inputTokenCount":5300000,"outputTokenCount":0,"inputAudioSecond":0,"inputVideoSecond":0,"inputStandardImageCount":0,"inputDocumentImageCount":0,"inputTextTokenCount":0,"inputImageTokenCount":0,"inputAudioTokenCount":0,"inputVideoTokenCount":0,"outputTextTokenCount":0,"outputImageTokenCount":0}
The document PR is created for this new config update: opensearch-project/documentation-website#12104. |
Description
Problem
When using S3 scan to read the output of asynchronous batch inference jobs (e.g. Amazon Bedrock batch jobs), output files appear in S3 before the job has fully completed. There was no way to delay processing of those files until the job was actually done, leading to partial reads.
Solution
Introduces a new processing_conditions list at the S3 scan bucket level. Before processing an S3 object, the worker evaluates each applicable condition by reading a co-located manifest/sentinel file and running a Data Prepper expression against its JSON content. Processing is deferred until all conditions are satisfied.
Configuration example
For an object at s3://my-bucket/bedrock-multisource/output-multisource/job-123/result.jsonl.out, the worker will:
processing_conditions fields:
object_name │ string │ required │ Manifest file name, resolved in the same S3 directory as the object
when │ string │ required │ Data Prepper expression evaluated against manifest JSON
applicable_prefix │ list │ all objects │ Prefixes this condition applies to; absent means all objects in the bucket
retry_delay │ Duration │ PT5M │ Wait time before re-queuing when condition is not met
max_retry │ int │ 10 │ Max retry attempts before the partition is abandoned
Implementation details:
New classes:
Modified classes:
Testing:
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.