Skip to content

Commit 8cf5caa

Browse files
committed
Review comment fixes
Signed-off-by: enugraju <enugraju@amazon.com>
1 parent 838067c commit 8cf5caa

3 files changed

Lines changed: 59 additions & 61 deletions

File tree

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,11 @@
3737
@Named
3838
public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSliceWorkerProgressState> {
3939
private static final Logger log = LoggerFactory.getLogger(DimensionalTimeSliceCrawler.class);
40-
// delay five minutes for partition creation on latest time duration to ensure the newly generated events are queryable
41-
// In general, newly generated events become queryable after 30 ~ 120 second
42-
protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300;
4340
private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "dimensionalTimeSliceWorkerPartitionsCreated";
4441
private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
4542
private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
4643
private static final Duration HOUR_DURATION = Duration.ofHours(1);
47-
private static final long MINUTES_PER_HOUR = 60;
48-
private static final long WAIT_MINUTES_BEFORE_PARTITION_CREATION = WAIT_SECONDS_BEFORE_PARTITION_CREATION / 60; // 5 minutes
44+
static final Duration WAIT_BEFORE_PARTITION_CREATION = Duration.ofMinutes(5);
4945

5046
private final CrawlerClient client;
5147
private final Counter partitionsCreatedCounter;
@@ -121,16 +117,16 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio
121117
DimensionalTimeSliceLeaderProgressState leaderProgressState =
122118
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
123119
Instant initialTime = leaderProgressState.getLastPollTime();
124-
Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
120+
Instant latestModifiedTime = initialTime.minus(WAIT_BEFORE_PARTITION_CREATION);
125121
Instant remainingDuration = leaderProgressState.getRemainingDuration();
126122
long remainingMinutes = Duration.between(remainingDuration, initialTime).toMinutes();
127123

128124
// For sub-hour time ranges (less than 60 minutes), create a single partition
129-
if (remainingMinutes < MINUTES_PER_HOUR) {
125+
if (remainingMinutes < HOUR_DURATION.toMinutes()) {
130126
log.info("Creating partition for sub-hour historical pull: {} minutes", remainingMinutes);
131127
Instant startTime = initialTime.minus(Duration.ofMinutes(remainingMinutes));
132128
Instant endTime;
133-
if (remainingMinutes <= WAIT_MINUTES_BEFORE_PARTITION_CREATION) {
129+
if (remainingMinutes <= WAIT_BEFORE_PARTITION_CREATION.toMinutes()) {
134130
// For very small ranges, skip the 5-minute delay to create a valid partition
135131
endTime = initialTime;
136132
} else {
@@ -143,8 +139,8 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio
143139
}
144140

145141
// For hour or longer time ranges, use hourly partitioning
146-
long remainingHours = remainingMinutes / MINUTES_PER_HOUR;
147-
long extraMinutes = remainingMinutes % MINUTES_PER_HOUR;
142+
long remainingHours = remainingMinutes / HOUR_DURATION.toMinutes();
143+
long extraMinutes = remainingMinutes % HOUR_DURATION.toMinutes();
148144
Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS);
149145

150146
// Create hourly partitions for complete hours
@@ -180,7 +176,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio
180176
*/
181177
private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartition,
182178
EnhancedSourceCoordinator coordinator) {
183-
Instant latestModifiedTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
179+
Instant latestModifiedTime = Instant.now().minus(WAIT_BEFORE_PARTITION_CREATION);
184180
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
185181
Instant lastPollTime = leaderProgressState.getLastPollTime();
186182

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,7 @@ public DimensionalTimeSliceLeaderProgressState(
6262
*/
6363
public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int remainingHours) {
6464
this.lastPollTime = lastPollTime;
65-
long minutes = remainingHours * MINUTES_PER_HOUR;
66-
if (minutes > 0) {
67-
this.remainingDuration = lastPollTime.minus(Duration.ofMinutes(minutes));
68-
} else {
69-
this.remainingDuration = lastPollTime;
70-
}
65+
this.remainingDuration = remainingDurationFromHours(lastPollTime, remainingHours);
7166
}
7267

7368
/**
@@ -79,14 +74,19 @@ public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int r
7974
@JsonSetter("remaining_hours")
8075
public void setRemainingHours(int remainingHours) {
8176
if (this.remainingDuration == null || this.remainingDuration.equals(this.lastPollTime)) {
82-
// Only set if remainingDuration hasn't been set yet (prefer remainingDuration to hours)
83-
long minutes = remainingHours * MINUTES_PER_HOUR;
84-
if (minutes > 0 && this.lastPollTime != null) {
85-
this.remainingDuration = this.lastPollTime.minus(Duration.ofMinutes(minutes));
86-
} else if (this.lastPollTime != null) {
87-
this.remainingDuration = this.lastPollTime;
88-
}
77+
this.remainingDuration = remainingDurationFromHours(this.lastPollTime, remainingHours);
78+
}
79+
}
80+
81+
private Instant remainingDurationFromHours(Instant lastPollTime, int remainingHours) {
82+
if (lastPollTime == null) {
83+
return null;
84+
}
85+
long minutes = remainingHours * MINUTES_PER_HOUR;
86+
if (minutes > 0) {
87+
return lastPollTime.minus(Duration.ofMinutes(minutes));
8988
}
89+
return lastPollTime;
9090
}
9191

9292
/**

0 commit comments

Comments
 (0)