Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
@Named
public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSliceWorkerProgressState> {
private static final Logger log = LoggerFactory.getLogger(DimensionalTimeSliceCrawler.class);
// Delay partition creation for the latest time window by five minutes so that newly generated events are queryable.
// In general, newly generated events become queryable after 30 to 120 seconds.
protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300;
private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "DimensionalTimeSliceWorkerPartitionsCreated";
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
Expand Down Expand Up @@ -73,10 +76,9 @@ public void initialize(List<String> dimensionTypes) {
*/
@Override
public Instant crawl(LeaderPartition leaderPartition, EnhancedSourceCoordinator coordinator) {
Instant latestModifiedTime = Instant.now();
double startCount = partitionsCreatedCounter.count();

createPartitionsForDimensionTypes(leaderPartition, coordinator, latestModifiedTime, dimensionTypes);
Instant latestModifiedTime = createPartitions(leaderPartition, coordinator);

double partitionsInThisCrawl = partitionsCreatedCounter.count() - startCount;
log.info("Total partitions created in this crawl: {}", partitionsInThisCrawl);
Expand All @@ -89,82 +91,85 @@ public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buff
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
}

private void createPartitionsForDimensionTypes(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator,
Instant latestModifiedTime,
List<String> dimensionTypes) {
private Instant createPartitions(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
DimensionalTimeSliceLeaderProgressState leaderProgressState =
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();

if (leaderProgressState.getRemainingHours() == 0) {
createPartitionForIncrementalSync(leaderPartition, coordinator,
latestModifiedTime, dimensionTypes);
return createPartitionsForIncrementalSync(leaderPartition, coordinator);
} else {
createPartitionForHistoricalPull(leaderPartition, coordinator,
latestModifiedTime, dimensionTypes);
return createPartitionsForHistoricalPull(leaderPartition, coordinator);
}
}

/**
* Creates partitions for historical data pull. Creates hourly partitions
* for each dimension type, working backwards from the current time.
*/
private void createPartitionForHistoricalPull(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator,
Instant latestModifiedTime,
List<String> dimensionTypes) {
private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
DimensionalTimeSliceLeaderProgressState leaderProgressState =
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
int remainingHours = leaderProgressState.getRemainingHours();
Instant initialTime = leaderProgressState.getLastPollTime();
Instant nowUtc = initialTime.truncatedTo(ChronoUnit.HOURS);
for (int i = remainingHours; i > 0; i-- ) {
Instant startTime = nowUtc.minus(Duration.ofHours(i));;
Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS);
for (int i = remainingHours; i > 1; i--) {
Instant startTime = latestHour.minus(Duration.ofHours(i));
Instant endTime = startTime.plus(HOUR_DURATION);

for (String dimensionType : dimensionTypes) {
createWorkerPartition(startTime, endTime, dimensionType, coordinator);
}
createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator);
}

// Create the final partitions, covering the last hour up to the checkpoint time (initialTime minus the wait period)
for (String dimensionType : dimensionTypes) {
createWorkerPartition(nowUtc, latestModifiedTime, dimensionType, coordinator);
Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
if (latestModifiedTime.isAfter(latestHour)) {
// If the checkpoint time is after the latest hour, create one partition for the last hour
// and one from the latest hour to the checkpoint time
createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestHour, coordinator);
createWorkerPartitionsForDimensionTypes(latestHour, latestModifiedTime, coordinator);
} else {
// if checkpointing time is not later than the latest hour, create one partition from 1 hour ago to checkpointing time
createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestModifiedTime, coordinator);
}

updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);

return latestModifiedTime;
}

/**
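* Creates partitions for incremental sync. Creates one partition per dimension type
* from the last poll time to the current time minus WAIT_SECONDS_BEFORE_PARTITION_CREATION seconds.
* No partitions are created when the last poll time is not before that checkpoint.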
*/
private void createPartitionForIncrementalSync(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator,
Instant latestModifiedTime,
List<String> dimensionTypes) {
private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
Instant latestModifiedTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
Instant lastPollTime = leaderProgressState.getLastPollTime();

// Create one partition from lastPollTime to latestModifiedTime for each type
for (String dimensionType : dimensionTypes) {
createWorkerPartition(lastPollTime, latestModifiedTime, dimensionType, coordinator);
if (lastPollTime.isBefore(latestModifiedTime)) {
// Create one partition from lastPollTime to latestModifiedTime for each type
createWorkerPartitionsForDimensionTypes(lastPollTime, latestModifiedTime, coordinator);

updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
return latestModifiedTime;
}

updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
return lastPollTime;
}

void createWorkerPartition(Instant startTime, Instant endTime,
String dimensionType, EnhancedSourceCoordinator coordinator) {
DimensionalTimeSliceWorkerProgressState workerState = new DimensionalTimeSliceWorkerProgressState();
workerState.setPartitionCreationTime(Instant.now());
workerState.setStartTime(startTime);
workerState.setEndTime(endTime);
workerState.setDimensionType(dimensionType);

SaasSourcePartition partition = new SaasSourcePartition(workerState, LAST_UPDATED_KEY + UUID.randomUUID());
coordinator.createPartition(partition);
partitionsCreatedCounter.increment();
/**
 * Creates one worker partition per configured dimension type for the given time window.
 * Each partition gets its own progress state and a unique key, and every creation
 * increments the partitions-created counter.
 *
 * @param startTime   start of the time window stored in each worker state
 * @param endTime     end of the time window stored in each worker state
 * @param coordinator source coordinator that receives the new partitions
 */
void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime, EnhancedSourceCoordinator coordinator) {
    for (String type : dimensionTypes) {
        // Build the per-dimension progress state for this time window.
        DimensionalTimeSliceWorkerProgressState progress = new DimensionalTimeSliceWorkerProgressState();
        progress.setPartitionCreationTime(Instant.now());
        progress.setDimensionType(type);
        progress.setStartTime(startTime);
        progress.setEndTime(endTime);

        // A random UUID suffix keeps every partition key unique.
        String partitionKey = LAST_UPDATED_KEY + UUID.randomUUID();
        coordinator.createPartition(new SaasSourcePartition(progress, partitionKey));
        partitionsCreatedCounter.increment();
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.List;

import static org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.WAIT_SECONDS_BEFORE_PARTITION_CREATION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -36,6 +37,7 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -80,8 +82,8 @@ void setUp() {
}

@Test
void testCrawl_withIncrementalSync() {
Instant lastPollTime = Instant.now().minus(Duration.ofHours(1));
void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() {
Instant lastPollTime = Instant.now().minusSeconds(400);
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0);
LeaderPartition leaderPartition = new LeaderPartition(state);

Expand All @@ -105,10 +107,64 @@ void testCrawl_withIncrementalSync() {
}

@Test
void testCrawl_withHistoricalSync() {
Instant now = Instant.now().truncatedTo(ChronoUnit.HOURS);
int lookbackHours = 3;
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, lookbackHours);
void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() {
    // A last poll only ten seconds old, with zero remaining hours, selects the incremental-sync path.
    final Instant tenSecondsAgo = Instant.now().minusSeconds(10);
    final LeaderPartition leader =
            new LeaderPartition(new DimensionalTimeSliceLeaderProgressState(tenSecondsAgo, 0));

    final Instant crawlResult = crawler.crawl(leader, coordinator);

    // The crawl must still return a timestamp, but nothing may be created, saved, or counted.
    assertNotNull(crawlResult);
    verify(partitionsCreatedCounter, never()).increment();
    verify(coordinator, never()).saveProgressStateForPartition(eq(leader), any());
    verify(coordinator, never()).createPartition(partitionCaptor.capture());
}

@Test
void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() {
// Historical pull where the initial time is less than WAIT_SECONDS_BEFORE_PARTITION_CREATION
// seconds past the top of the hour, so the checkpoint time (initial time minus the wait)
// lands just before latestHour.
Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS);
Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION -1);
int lookbackHours = 2;
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours);
LeaderPartition leaderPartition = new LeaderPartition(state);

Instant latest = crawler.crawl(leaderPartition, coordinator);

assertNotNull(latest);
// Expecting lookbackHours * LOG_TYPES.size() partitions: one set for the oldest hour
// and one set running from one hour before latestHour to the checkpoint time
int expectedPartitions = (lookbackHours) * LOG_TYPES.size();
verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture());
verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any());
verify(partitionsCreatedCounter, times(expectedPartitions)).increment();

List<SaasSourcePartition> createdPartitions = partitionCaptor.getAllValues();
assertEquals(expectedPartitions, createdPartitions.size());

// Verify the oldest hour's partitions (captured first, one per log type in LOG_TYPES order)
for (int i = 0; i < LOG_TYPES.size(); i++) {
DimensionalTimeSliceWorkerProgressState workerState =
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime());
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime());
assertEquals(LOG_TYPES.get(i), workerState.getDimensionType());
}

// Verify the final partitions, which start one hour before latestHour and end at the
// checkpoint time (initialTime minus the wait period)
for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * 2; i++) {
DimensionalTimeSliceWorkerProgressState workerState =
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime());
assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime());
assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType());
}
}

@Test
void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() {
Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS);
Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1);
int lookbackHours = 2;
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours);
LeaderPartition leaderPartition = new LeaderPartition(state);

Instant latest = crawler.crawl(leaderPartition, coordinator);
Expand All @@ -127,29 +183,53 @@ void testCrawl_withHistoricalSync() {
for (int i = 0; i < LOG_TYPES.size(); i++) {
DimensionalTimeSliceWorkerProgressState workerState =
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
assertEquals(now.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime());
assertEquals(now.minus(Duration.ofHours(lookbackHours-1)), workerState.getEndTime());
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime());
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime());
assertEquals(LOG_TYPES.get(i), workerState.getDimensionType());
}

// Verify previous hour's partitions
for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * 2; i++) {
DimensionalTimeSliceWorkerProgressState workerState =
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime());
assertEquals(latestHour, workerState.getEndTime());
assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType());
}

// Verify latest hour's partitions
for (int i = LOG_TYPES.size() * 2; i < LOG_TYPES.size() * 3; i++) {
DimensionalTimeSliceWorkerProgressState workerState =
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
assertEquals(latestHour, workerState.getStartTime());
assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime());
assertEquals(LOG_TYPES.get(i - LOG_TYPES.size() * 2), workerState.getDimensionType());
}
}

@Test
void testCreateWorkerPartition() {
void createWorkerPartitionsForDimensionTypes() {
Instant start = Instant.parse("2024-10-30T00:00:00Z");
Instant end = start.plus(Duration.ofHours(1));
String logType = "Exchange";

crawler.createWorkerPartition(start, end, logType, coordinator);
crawler.createWorkerPartitionsForDimensionTypes(start, end, coordinator);

verify(coordinator).createPartition(partitionCaptor.capture());
verify(partitionsCreatedCounter).increment();
int expectedPartitions = LOG_TYPES.size();
verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture());
verify(partitionsCreatedCounter, times(expectedPartitions)).increment();

List<SaasSourcePartition> createdPartitions = partitionCaptor.getAllValues();
assertEquals(expectedPartitions, createdPartitions.size());

SaasSourcePartition partition = partitionCaptor.getValue();
DimensionalTimeSliceWorkerProgressState state =
(DimensionalTimeSliceWorkerProgressState) partition.getProgressState().get();
assertEquals(start, state.getStartTime());
assertEquals(end, state.getEndTime());
assertEquals(logType, state.getDimensionType());
// Verify first hour's partitions
for (int i = 0; i < LOG_TYPES.size(); i++) {
DimensionalTimeSliceWorkerProgressState workerState =
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
assertEquals(start, workerState.getStartTime());
assertEquals(end, workerState.getEndTime());
assertEquals(LOG_TYPES.get(i), workerState.getDimensionType());
}
}

@Test
Expand Down
Loading