Skip to content

Commit 4107135

Browse files
committed
add logging to track s3 records processed per object and show which object was completed when messages are deleted
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 6416154 commit 4107135

2 files changed

Lines changed: 4 additions & 0 deletions

File tree

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ private void doProcessObject(final AcknowledgementSet acknowledgementSet,
234234
if (recordsWritten == 0) {
235235
LOG.warn("Failed to find any records in S3 object: s3ObjectReference={}.", s3ObjectReference);
236236
s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment();
237+
} else {
238+
LOG.info("Completed reading S3 object: s3ObjectReference={}, records={}", s3ObjectReference, recordsWritten);
237239
}
238240
s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize);
239241
s3ObjectPluginMetrics.getS3ObjectEventsSummary().record(recordsWritten);

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
290290
parsedMessageVisibilityTimesMap.remove(parsedMessage);
291291
}
292292
if (result == true) {
293+
LOG.info("Acknowledgement received, deleting SQS message for S3 object s3://{}/{}, sqsMessageId={}",
294+
parsedMessage.getBucketName(), parsedMessage.getObjectKey(), parsedMessage.getMessage().messageId());
293295
final boolean successfullyDeletedAllMessages = deleteSqsMessages(waitingForAcknowledgements);
294296
if (successfullyDeletedAllMessages && s3SourceConfig.isDeleteS3ObjectsOnRead()) {
295297
deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments);

0 commit comments

Comments
 (0)