Skip to content

Commit 4e23ba7

Browse files
committed
add logging to track s3 records processed per object and show which object was completed when messages are deleted
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 6416154 commit 4e23ba7

2 files changed

Lines changed: 14 additions & 6 deletions

File tree

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ private void doProcessObject(final AcknowledgementSet acknowledgementSet,
234234
if (recordsWritten == 0) {
235235
LOG.warn("Failed to find any records in S3 object: s3ObjectReference={}.", s3ObjectReference);
236236
s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment();
237+
} else {
238+
LOG.info("Completed reading S3 object: s3ObjectReference={}, records={}", s3ObjectReference, recordsWritten);
237239
}
238240
s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize);
239241
s3ObjectPluginMetrics.getS3ObjectEventsSummary().record(recordsWritten);

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,12 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
291291
}
292292
if (result == true) {
293293
final boolean successfullyDeletedAllMessages = deleteSqsMessages(waitingForAcknowledgements);
294-
if (successfullyDeletedAllMessages && s3SourceConfig.isDeleteS3ObjectsOnRead()) {
295-
deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments);
294+
if (successfullyDeletedAllMessages) {
295+
LOG.info("Deleted SQS message for S3 object s3://{}/{}, sqsMessageId={}",
296+
parsedMessage.getBucketName(), parsedMessage.getObjectKey(), parsedMessage.getMessage().messageId());
297+
if (s3SourceConfig.isDeleteS3ObjectsOnRead()) {
298+
deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments);
299+
}
296300
}
297301
}
298302
},
@@ -392,10 +396,12 @@ private boolean deleteSqsMessages(final List<DeleteMessageBatchRequestEntry> del
392396
if (deleteMessageBatchResponse.hasSuccessful()) {
393397
final int deletedMessagesCount = deleteMessageBatchResponse.successful().size();
394398
if (deletedMessagesCount > 0) {
395-
final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream()
396-
.map(DeleteMessageBatchResultEntry::id)
397-
.collect(Collectors.joining(", "));
398-
LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages);
399+
if (!endToEndAcknowledgementsEnabled) {
400+
final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream()
401+
.map(DeleteMessageBatchResultEntry::id)
402+
.collect(Collectors.joining(", "));
403+
LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages);
404+
}
399405
sqsMessagesDeletedCounter.increment(deletedMessagesCount);
400406
}
401407
}

0 commit comments

Comments
 (0)