From 4e23ba7a7286fd2a5b219f3b8bb64215e8f6134f Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 5 Jun 2026 13:59:11 -0500 Subject: [PATCH] add logging to track s3 records processed per object and show which object was completed when messages are deleted Signed-off-by: Taylor Gray --- .../plugins/source/s3/S3ObjectWorker.java | 2 ++ .../plugins/source/s3/SqsWorker.java | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java index d973da3213..3a6a8adf13 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java @@ -234,6 +234,8 @@ private void doProcessObject(final AcknowledgementSet acknowledgementSet, if (recordsWritten == 0) { LOG.warn("Failed to find any records in S3 object: s3ObjectReference={}.", s3ObjectReference); s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment(); + } else { + LOG.info("Completed reading S3 object: s3ObjectReference={}, records={}", s3ObjectReference, recordsWritten); } s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize); s3ObjectPluginMetrics.getS3ObjectEventsSummary().record(recordsWritten); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 674fb21e2f..a41591d88b 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -291,8 +291,12 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } if (result == true) { final boolean successfullyDeletedAllMessages = deleteSqsMessages(waitingForAcknowledgements); - if (successfullyDeletedAllMessages && s3SourceConfig.isDeleteS3ObjectsOnRead()) { - deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments); + if (successfullyDeletedAllMessages) { + LOG.info("Deleted SQS message for S3 object s3://{}/{}, sqsMessageId={}", + parsedMessage.getBucketName(), parsedMessage.getObjectKey(), parsedMessage.getMessage().messageId()); + if (s3SourceConfig.isDeleteS3ObjectsOnRead()) { + deleteS3Objects(s3ObjectDeletionWaitingForAcknowledgments); + } } } }, @@ -392,10 +396,12 @@ private boolean deleteSqsMessages(final List del if (deleteMessageBatchResponse.hasSuccessful()) { final int deletedMessagesCount = deleteMessageBatchResponse.successful().size(); if (deletedMessagesCount > 0) { - final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream() - .map(DeleteMessageBatchResultEntry::id) - .collect(Collectors.joining(", ")); - LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages); + if (!endToEndAcknowledgementsEnabled) { + final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream() + .map(DeleteMessageBatchResultEntry::id) + .collect(Collectors.joining(", ")); + LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages); + } sqsMessagesDeletedCounter.increment(deletedMessagesCount); } }