diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java index d973da3213..3a6a8adf13 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java @@ -234,6 +234,8 @@ private void doProcessObject(final AcknowledgementSet acknowledgementSet, if (recordsWritten == 0) { LOG.warn("Failed to find any records in S3 object: s3ObjectReference={}.", s3ObjectReference); s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment(); + } else { + LOG.info("Completed reading S3 object: s3ObjectReference={}, records={}", s3ObjectReference, recordsWritten); } s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize); s3ObjectPluginMetrics.getS3ObjectEventsSummary().record(recordsWritten); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 674fb21e2f..a41591d88b 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -291,8 +291,12 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } if (result == true) { final boolean successfullyDeletedAllMessages = deleteSqsMessages(waitingForAcknowledgements); - if (successfullyDeletedAllMessages && s3SourceConfig.isDeleteS3ObjectsOnRead()) { - deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments); + if (successfullyDeletedAllMessages) { + LOG.info("Deleted SQS message for S3 object s3://{}/{}, sqsMessageId={}", + parsedMessage.getBucketName(), parsedMessage.getObjectKey(), parsedMessage.getMessage().messageId()); + if (s3SourceConfig.isDeleteS3ObjectsOnRead()) { + deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments); + } } } }, @@ -392,10 +396,12 @@ private boolean deleteSqsMessages(final List del if (deleteMessageBatchResponse.hasSuccessful()) { final int deletedMessagesCount = deleteMessageBatchResponse.successful().size(); if (deletedMessagesCount > 0) { - final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream() - .map(DeleteMessageBatchResultEntry::id) - .collect(Collectors.joining(", ")); - LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages); + if (!endToEndAcknowledgementsEnabled) { + final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream() + .map(DeleteMessageBatchResultEntry::id) + .collect(Collectors.joining(", ")); + LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages); + } sqsMessagesDeletedCounter.increment(deletedMessagesCount); } }