Skip to content

Commit a7b3b68

Browse files
authored
Renew partition lease in DataFileLoader to prevent reprocessing (opensearch-project#6821)
* Renew partition lease in DataFileLoader to prevent reprocessing
  Signed-off-by: Hai Yan <oeyh@amazon.com>
* Throw exception on flush failure so scheduler can give up partition
  Signed-off-by: Hai Yan <oeyh@amazon.com>
---------
Signed-off-by: Hai Yan <oeyh@amazon.com>
1 parent 2b1acd7 commit a7b3b68

2 files changed

Lines changed: 74 additions & 1 deletion

File tree

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import java.time.Duration;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.ScheduledExecutorService;
37+
import java.util.concurrent.TimeUnit;
3538
import java.util.concurrent.atomic.AtomicLong;
3639

3740
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
@@ -44,6 +47,8 @@ public class DataFileLoader implements Runnable {
4447
static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);
4548
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
4649
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
50+
static final Duration LEASE_RENEWAL_INTERVAL = Duration.ofMinutes(5);
51+
static final Duration LEASE_RENEWAL_DURATION = Duration.ofMinutes(15);
4752
static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
4853
static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
4954
static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors";
@@ -114,6 +119,16 @@ public static DataFileLoader create(final DataFilePartition dataFilePartition,
114119
public void run() {
115120
LOG.info(SENSITIVE, "Start loading s3://{}/{}", bucket, objectKey);
116121

122+
final ScheduledExecutorService leaseRenewalScheduler = Executors.newSingleThreadScheduledExecutor();
123+
leaseRenewalScheduler.scheduleAtFixedRate(() -> {
124+
try {
125+
sourceCoordinator.saveProgressStateForPartition(dataFilePartition, LEASE_RENEWAL_DURATION);
126+
LOG.debug(SENSITIVE, "Successfully renewed lease for partition {}", objectKey);
127+
} catch (Exception e) {
128+
LOG.warn(SENSITIVE, "Failed to renew lease for partition {}", objectKey, e);
129+
}
130+
}, LEASE_RENEWAL_INTERVAL.toMillis(), LEASE_RENEWAL_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
131+
117132
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
118133

119134
AtomicLong eventCount = new AtomicLong();
@@ -189,6 +204,9 @@ public void run() {
189204
} catch (Exception e) {
190205
LOG.error(NOISY, "Failed to write events to buffer", e);
191206
exportRecordErrorCounter.increment(eventCount.get());
207+
throw new RuntimeException(e);
208+
} finally {
209+
leaseRenewalScheduler.shutdownNow();
192210
}
193211
}
194212

data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.parquet.avro.AvroParquetReader;
1212
import org.apache.parquet.hadoop.ParquetReader;
1313
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
1415
import org.junit.jupiter.api.extension.ExtendWith;
1516
import org.junit.jupiter.params.ParameterizedTest;
1617
import org.junit.jupiter.params.provider.EnumSource;
@@ -41,10 +42,15 @@
4142
import java.time.Duration;
4243
import java.util.Optional;
4344
import java.util.UUID;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.ScheduledExecutorService;
47+
import java.util.concurrent.TimeUnit;
4448

49+
import static org.junit.jupiter.api.Assertions.assertThrows;
4550
import static org.mockito.ArgumentMatchers.any;
4651
import static org.mockito.ArgumentMatchers.anyInt;
4752
import static org.mockito.ArgumentMatchers.anyLong;
53+
import static org.mockito.ArgumentMatchers.eq;
4854
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
4955
import static org.mockito.Mockito.doThrow;
5056
import static org.mockito.Mockito.mock;
@@ -203,7 +209,7 @@ void test_flush_failure_then_error_metric_updated(EngineType engineType) throws
203209
readerMockedStatic.when(() -> AvroParquetReader.<GenericRecord>builder(any(InputFile.class), any())).thenReturn(builder);
204210
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(any(Buffer.class), anyInt(), any(Duration.class))).thenReturn(bufferAccumulator);
205211

206-
dataFileLoader.run();
212+
assertThrows(RuntimeException.class, () -> dataFileLoader.run());
207213
}
208214

209215
verify(bufferAccumulator).add(any(Record.class));
@@ -218,6 +224,55 @@ void test_flush_failure_then_error_metric_updated(EngineType engineType) throws
218224
verify(exportRecordErrorCounter).increment(1);
219225
}
220226

227+
@Test
228+
void test_run_schedules_lease_renewal() throws Exception {
229+
final String bucket = UUID.randomUUID().toString();
230+
final String key = UUID.randomUUID().toString();
231+
when(dataFilePartition.getBucket()).thenReturn(bucket);
232+
when(dataFilePartition.getKey()).thenReturn(key);
233+
final DataFileProgressState progressState = mock(DataFileProgressState.class, RETURNS_DEEP_STUBS);
234+
when(dataFilePartition.getProgressState()).thenReturn(Optional.of(progressState));
235+
when(progressState.getEngineType()).thenReturn(EngineType.MYSQL.toString());
236+
237+
InputStream inputStream = mock(InputStream.class);
238+
when(s3ObjectReader.readFile(bucket, key)).thenReturn(inputStream);
239+
240+
final String randomString = UUID.randomUUID().toString();
241+
final BaseEventBuilder<Event> eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS);
242+
final Event event = mock(Event.class);
243+
when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder);
244+
when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event);
245+
when(event.toJsonString()).thenReturn(randomString);
246+
when(recordConverter.convert(any(), any(), any(), any(), any(), any(), anyLong(), anyLong(), any(), any())).thenReturn(event);
247+
248+
AvroParquetReader.Builder<GenericRecord> builder = mock(AvroParquetReader.Builder.class);
249+
ParquetReader<GenericRecord> parquetReader = mock(ParquetReader.class);
250+
BufferAccumulator<Record<Event>> bufferAccumulator = mock(BufferAccumulator.class);
251+
when(builder.build()).thenReturn(parquetReader);
252+
when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), (GenericRecord) null);
253+
254+
try (MockedStatic<AvroParquetReader> readerMockedStatic = mockStatic(AvroParquetReader.class);
255+
MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
256+
MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
257+
258+
ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
259+
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(mockScheduler);
260+
261+
readerMockedStatic.when(() -> AvroParquetReader.<GenericRecord>builder(any(InputFile.class), any())).thenReturn(builder);
262+
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(any(Buffer.class), anyInt(), any(Duration.class))).thenReturn(bufferAccumulator);
263+
264+
DataFileLoader dataFileLoader = createObjectUnderTest();
265+
dataFileLoader.run();
266+
267+
verify(mockScheduler).scheduleAtFixedRate(
268+
any(Runnable.class),
269+
eq(DataFileLoader.LEASE_RENEWAL_INTERVAL.toMillis()),
270+
eq(DataFileLoader.LEASE_RENEWAL_INTERVAL.toMillis()),
271+
eq(TimeUnit.MILLISECONDS));
272+
verify(mockScheduler).shutdownNow();
273+
}
274+
}
275+
221276
private DataFileLoader createObjectUnderTest() {
222277
final InputCodec codec = new ParquetInputCodec(eventFactory);
223278
return DataFileLoader.create(dataFilePartition, codec, buffer, s3ObjectReader, recordConverter,

0 commit comments

Comments
 (0)