Skip to content

Commit a9cb712

Browse files
Flush remaining data to S3 during shutdown (#6424)
Flush remaining data to S3 during shutdown.

Signed-off-by: Subrahmanyam-Gollapalli <subrahmanyam.gollapalli@freshworks.com>
1 parent fbdc8a0 commit a9cb712

3 files changed

Lines changed: 141 additions & 0 deletions

File tree

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,23 @@ private void doInitializeInternal() {
175175
public void doOutput(final Collection<Record<Event>> records) {
176176
s3SinkService.output(records);
177177
}
178+
179+
/**
180+
* Shutdown the S3 sink by flushing all remaining data to S3 before shutting down.
181+
* This ensures no data is lost during shutdown.
182+
*/
183+
@Override
184+
public void shutdown() {
185+
LOG.info("S3 sink shutdown initiated. Flushing all remaining data to S3.");
186+
try {
187+
// Force flush all remaining groups to S3
188+
s3SinkService.flushAllRemainingGroups();
189+
} catch (Exception e) {
190+
LOG.error("Error occurred while flushing remaining data during shutdown", e);
191+
} finally {
192+
// Call parent shutdown to stop retry thread
193+
super.shutdown();
194+
LOG.info("S3 sink shutdown completed.");
195+
}
196+
}
178197
}

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,4 +248,44 @@ private void checkAggregateThresholdsAndFlushIfNeeded(final List<CompletableFutu
248248
}
249249
}
250250
}
251+
252+
/**
253+
* Force flush all remaining S3 groups to S3. This is called during shutdown
254+
* to ensure all buffered data is uploaded before the sink shuts down.
255+
*/
256+
void flushAllRemainingGroups() {
257+
if (s3GroupManager.hasNoGroups()) {
258+
LOG.debug("No remaining groups to flush during shutdown");
259+
return;
260+
}
261+
262+
LOG.info("Shutdown initiated. Flushing {} remaining S3 groups to S3.", s3GroupManager.getNumberOfGroups());
263+
reentrantLock.lock();
264+
try {
265+
final List<CompletableFuture<?>> completableFutures = new ArrayList<>();
266+
267+
// Force flush all remaining groups
268+
for (final S3Group s3Group : s3GroupManager.getS3GroupEntries()) {
269+
LOG.info("Force flushing group with key {} containing {} events and {} bytes during shutdown",
270+
s3Group.getBuffer().getKey(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize());
271+
flushToS3IfNeeded(completableFutures, s3Group, true);
272+
numberOfObjectsForceFlushed.increment();
273+
}
274+
275+
// Wait for all uploads to complete
276+
if (!completableFutures.isEmpty()) {
277+
try {
278+
LOG.info("Waiting for {} uploads to complete during shutdown", completableFutures.size());
279+
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
280+
.thenRun(() -> LOG.info("All {} uploads completed during shutdown", completableFutures.size()))
281+
.join();
282+
LOG.info("All remaining groups have been flushed to S3 during shutdown");
283+
} catch (final Exception e) {
284+
LOG.error("Exception occurred while waiting for uploads to complete during shutdown", e);
285+
}
286+
}
287+
} finally {
288+
reentrantLock.unlock();
289+
}
290+
}
251291
}

data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,88 @@ void output_will_flush_the_largest_group_until_below_aggregate_threshold_when_ag
665665
verify(s3ObjectsForceFlushedCounter, times(2)).increment();
666666
}
667667

668+
@Test
669+
void flushAllRemainingGroups_when_no_groups_does_not_flush() {
670+
when(s3GroupManager.hasNoGroups()).thenReturn(true);
671+
672+
final S3SinkService s3SinkService = createObjectUnderTest();
673+
s3SinkService.flushAllRemainingGroups();
674+
675+
verify(s3GroupManager).hasNoGroups();
676+
verify(s3GroupManager, never()).getS3GroupEntries();
677+
verify(s3ObjectsForceFlushedCounter, never()).increment();
678+
}
679+
680+
@Test
681+
void flushAllRemainingGroups_when_groups_exist_flushes_all() throws IOException {
682+
when(s3GroupManager.hasNoGroups()).thenReturn(false);
683+
when(s3GroupManager.getNumberOfGroups()).thenReturn(2);
684+
685+
final S3Group s3Group1 = mock(S3Group.class);
686+
final S3Group s3Group2 = mock(S3Group.class);
687+
final Buffer buffer1 = mock(Buffer.class);
688+
final Buffer buffer2 = mock(Buffer.class);
689+
final OutputStream outputStream1 = mock(OutputStream.class);
690+
final OutputStream outputStream2 = mock(OutputStream.class);
691+
692+
when(s3Group1.getBuffer()).thenReturn(buffer1);
693+
when(s3Group1.getOutputCodec()).thenReturn(codec);
694+
when(buffer1.getOutputStream()).thenReturn(outputStream1);
695+
when(buffer1.getKey()).thenReturn(UUID.randomUUID().toString());
696+
when(buffer1.getEventCount()).thenReturn(3);
697+
when(buffer1.getSize()).thenReturn(100L);
698+
when(buffer1.getDuration()).thenReturn(Duration.ZERO);
699+
700+
when(s3Group2.getBuffer()).thenReturn(buffer2);
701+
when(s3Group2.getOutputCodec()).thenReturn(codec);
702+
when(buffer2.getOutputStream()).thenReturn(outputStream2);
703+
when(buffer2.getKey()).thenReturn(UUID.randomUUID().toString());
704+
when(buffer2.getEventCount()).thenReturn(5);
705+
when(buffer2.getSize()).thenReturn(200L);
706+
when(buffer2.getDuration()).thenReturn(Duration.ZERO);
707+
708+
final CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
709+
when(buffer1.flushToS3(any(Consumer.class), any(Consumer.class))).thenReturn(Optional.of(completedFuture));
710+
when(buffer2.flushToS3(any(Consumer.class), any(Consumer.class))).thenReturn(Optional.of(completedFuture));
711+
712+
when(s3GroupManager.getS3GroupEntries()).thenReturn(List.of(s3Group1, s3Group2));
713+
doNothing().when(codec).complete(any(OutputStream.class));
714+
715+
final S3SinkService s3SinkService = createObjectUnderTest();
716+
717+
try (final MockedStatic<CompletableFuture> completableFutureMockedStatic = mockStatic(CompletableFuture.class)) {
718+
final CompletableFuture<Void> mockCompletableFuture = mock(CompletableFuture.class);
719+
when(mockCompletableFuture.thenRun(any(Runnable.class))).thenReturn(mockCompletableFuture);
720+
when(mockCompletableFuture.join()).thenReturn(null);
721+
completableFutureMockedStatic.when(() -> CompletableFuture.allOf(any())).thenReturn(mockCompletableFuture);
722+
s3SinkService.flushAllRemainingGroups();
723+
}
724+
725+
verify(s3GroupManager).hasNoGroups();
726+
verify(s3GroupManager).getNumberOfGroups();
727+
verify(s3GroupManager).getS3GroupEntries();
728+
verify(s3GroupManager).removeGroup(s3Group1);
729+
verify(s3GroupManager).removeGroup(s3Group2);
730+
verify(codec, times(2)).complete(any(OutputStream.class));
731+
verify(buffer1).flushToS3(any(Consumer.class), any(Consumer.class));
732+
verify(buffer2).flushToS3(any(Consumer.class), any(Consumer.class));
733+
verify(s3ObjectsForceFlushedCounter, times(2)).increment();
734+
}
735+
736+
@Test
737+
void flushAllRemainingGroups_when_empty_groups_list_does_not_increment_force_flush_counter() {
738+
when(s3GroupManager.hasNoGroups()).thenReturn(false);
739+
when(s3GroupManager.getNumberOfGroups()).thenReturn(0);
740+
when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.emptyList());
741+
742+
final S3SinkService s3SinkService = createObjectUnderTest();
743+
s3SinkService.flushAllRemainingGroups();
744+
745+
verify(s3GroupManager).hasNoGroups();
746+
verify(s3GroupManager).getS3GroupEntries();
747+
verify(s3ObjectsForceFlushedCounter, never()).increment();
748+
}
749+
668750
// Test helper: returns the event records used by these tests by delegating to
// generateEventRecords with a fixed argument of 50 (presumably the number of
// records to generate - confirm against generateEventRecords).
private Collection<Record<Event>> generateRandomStringEventRecord() {
669751
return generateEventRecords(50);
670752
}

0 commit comments

Comments
 (0)