Skip to content

Commit ad368f2

Browse files
committed
Add minute range support to Dimensional TimeSlice crawler framework
Signed-off-by: enugraju <enugraju@amazon.com>
1 parent 92aa5fd commit ad368f2

9 files changed

Lines changed: 659 additions & 43 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void start(Buffer<Record<Event>> buffer) {
8686

8787
@Override
8888
protected LeaderProgressState createLeaderProgressState() {
89-
return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackHours());
89+
return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackMinutes());
9090
}
9191

9292
@Override

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,26 @@ public class Office365SourceConfig implements CrawlerSourceConfig {
5959
@DurationMax(days = 7, message = "Range cannot exceed 7 days due to Office 365 API limitation")
6060
private Duration range;
6161

62+
/**
63+
* Gets the look back range as minutes for the crawler framework.
64+
* This method supports minute-level granularity for historical pulls.
65+
*
66+
* @return the number of minutes to look back, or 0 if no range is specified
67+
*/
68+
public long getLookBackMinutes() {
69+
if (range == null || range.isZero() || range.isNegative()) {
70+
return 0;
71+
}
72+
return range.toMinutes();
73+
}
74+
6275
/**
6376
* Gets the look back range as hours for compatibility with existing crawler framework.
6477
*
6578
* @return the number of hours to look back, or 0 if no range is specified
79+
* @deprecated Use {@link #getLookBackMinutes()} for minute-level granularity support
6680
*/
81+
@Deprecated
6782
public int getLookBackHours() {
6883
if (range == null || range.toHours() <= 0) {
6984
return 0;

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ public AuditLogsResponse searchAuditLogs(final String logType,
6363
return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri);
6464
}
6565

66-
// Adjust start time based on configured lookback hours
66+
// Adjust start time based on configured lookback period (supports minute-level granularity)
6767
Instant adjustedStartTime = startTime;
68-
Instant lookBackHoursAgo = Instant.now().minus(Duration.ofHours(office365SourceConfig.getLookBackHours()));
69-
if (startTime.isBefore(lookBackHoursAgo) && lookBackHoursAgo.isBefore(endTime)) {
70-
adjustedStartTime = lookBackHoursAgo;
68+
Instant lookBackTimeAgo = Instant.now().minus(Duration.ofMinutes(office365SourceConfig.getLookBackMinutes()));
69+
if (startTime.isBefore(lookBackTimeAgo) && lookBackTimeAgo.isBefore(endTime)) {
70+
adjustedStartTime = lookBackTimeAgo;
7171
}
7272

7373
AuditLogsResponse response =

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void testGetters() {
7979
void testDefaultValues() {
8080
assertFalse(config.isAcknowledgments());
8181
assertEquals(4, config.getNumberOfWorkers());
82-
assertEquals(0, config.getLookBackHours());
82+
assertEquals(0, config.getLookBackMinutes());
8383
}
8484

8585
@Test
@@ -99,6 +99,66 @@ void testNegativeDurationRange() throws Exception {
9999
Duration negativeDuration = Duration.ofDays(-1);
100100
setField(config, "range", negativeDuration);
101101

102+
assertEquals(0, config.getLookBackMinutes());
103+
}
104+
105+
@Test
106+
void testGetLookBackMinutes_withMinuteRange() throws Exception {
107+
Duration fifteenMinutes = Duration.ofMinutes(15);
108+
setField(config, "range", fifteenMinutes);
109+
110+
// getLookBackHours should return 0 for sub-hour range
111+
assertEquals(0, config.getLookBackHours());
112+
// getLookBackMinutes should return 15
113+
assertEquals(15, config.getLookBackMinutes());
114+
}
115+
116+
@Test
117+
void testGetLookBackMinutes_with30MinuteRange() throws Exception {
118+
Duration thirtyMinutes = Duration.ofMinutes(30);
119+
setField(config, "range", thirtyMinutes);
120+
121+
// getLookBackHours should return 0 for sub-hour range
122+
assertEquals(0, config.getLookBackHours());
123+
// getLookBackMinutes should return 30
124+
assertEquals(30, config.getLookBackMinutes());
125+
}
126+
127+
@Test
128+
void testGetLookBackMinutes_with45MinuteRange() throws Exception {
129+
Duration fortyFiveMinutes = Duration.ofMinutes(45);
130+
setField(config, "range", fortyFiveMinutes);
131+
132+
// getLookBackHours should return 0 for sub-hour range
133+
assertEquals(0, config.getLookBackHours());
134+
// getLookBackMinutes should return 45
135+
assertEquals(45, config.getLookBackMinutes());
136+
}
137+
138+
@Test
139+
void testGetLookBackMinutes_withHourRange() throws Exception {
140+
Duration twoHours = Duration.ofHours(2);
141+
setField(config, "range", twoHours);
142+
143+
assertEquals(2, config.getLookBackHours());
144+
assertEquals(120, config.getLookBackMinutes());
145+
}
146+
147+
@Test
148+
void testGetLookBackMinutes_withDayRange() throws Exception {
149+
Duration oneDay = Duration.ofDays(1);
150+
setField(config, "range", oneDay);
151+
152+
assertEquals(24, config.getLookBackHours());
153+
assertEquals(1440, config.getLookBackMinutes());
154+
}
155+
156+
@Test
157+
void testGetLookBackMinutes_withZeroRange() throws Exception {
158+
Duration zeroDuration = Duration.ZERO;
159+
setField(config, "range", zeroDuration);
160+
102161
assertEquals(0, config.getLookBackHours());
162+
assertEquals(0, config.getLookBackMinutes());
103163
}
104164
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,9 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() {
287287
Instant endTime = startTime.plus(Duration.ofHours(1));
288288

289289
String logType = "Exchange";
290-
int lookBackHours = 96; // Configure 4 days range limit
290+
long lookBackMinutes = 96 * 60; // Configure 4 days range limit
291291

292-
when(sourceConfig.getLookBackHours()).thenReturn(lookBackHours);
292+
when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes);
293293
when(office365RestClient.searchAuditLogs(
294294
any(String.class),
295295
any(Instant.class),
@@ -315,6 +315,43 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() {
315315
"Adjusted start time should be exactly 96 hours ago");
316316
}
317317

318+
@Test
319+
void testSearchAuditLogs_WithSubHourRange_AdjustsStartTime() {
320+
Clock fixedClock = Clock.fixed(Instant.parse("2025-11-09T21:30:00.00Z"), ZoneOffset.UTC);
321+
Instant now = Instant.now(fixedClock);
322+
323+
Instant startTime = now.minus(Duration.ofMinutes(45));
324+
Instant endTime = startTime.plus(Duration.ofMinutes(15));
325+
326+
String logType = "Exchange";
327+
long lookBackMinutes = 30L;
328+
329+
when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes);
330+
when(office365RestClient.searchAuditLogs(
331+
any(String.class),
332+
any(Instant.class),
333+
any(Instant.class),
334+
any()
335+
)).thenReturn(new AuditLogsResponse(new ArrayList<>(), null));
336+
337+
office365Service.searchAuditLogs(logType, startTime, endTime, null);
338+
339+
// Capture the actual start time that was passed to the REST client
340+
ArgumentCaptor<Instant> startTimeCaptor = ArgumentCaptor.forClass(Instant.class);
341+
verify(office365RestClient).searchAuditLogs(
342+
eq(logType),
343+
startTimeCaptor.capture(),
344+
eq(endTime),
345+
isNull()
346+
);
347+
348+
// Verify that the service adjusted the start time correctly:
349+
Instant capturedStartTime = startTimeCaptor.getValue();
350+
Duration actualLookback = Duration.between(capturedStartTime, now);
351+
assertEquals(45, actualLookback.toMinutes(),
352+
"Adjusted start time should be exactly 30 minutes ago");
353+
}
354+
318355
private Map<String, Object> createTestItem(String contentId, Instant contentCreated) {
319356
Map<String, Object> item = new HashMap<>();
320357
item.put("contentId", contentId);

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
/**
2929
* A crawler implementation that partitions work along two dimensions:
3030
* 1. Dimension type (e.g., log types)
31-
* 2. Time slices (hourly windows)
32-
*
31+
* 2. Time slices (configurable time windows)
3332
* This crawler supports both historical data ingestion and incremental updates,
3433
* creating separate partitions for each combination of dimension type and time window.
34+
* Supports minute-level granularity for historical pulls (e.g., PT15M, PT30M).
3535
*/
3636
@Named
3737
public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSliceWorkerProgressState> {
@@ -42,7 +42,8 @@ public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSlice
4242
private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "DimensionalTimeSliceWorkerPartitionsCreated";
4343
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
4444
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
45-
private static final Duration HOUR_DURATION = Duration.ofHours(1);
45+
private static final long MINUTES_PER_HOUR = 60;
46+
private static final long WAIT_MINUTES_BEFORE_PARTITION_CREATION = WAIT_SECONDS_BEFORE_PARTITION_CREATION / 60; // 5 minutes
4647

4748
private final CrawlerClient client;
4849
private final Counter partitionsCreatedCounter;
@@ -71,7 +72,7 @@ public void initialize(List<String> dimensionTypes) {
7172
}
7273

7374
/**
74-
* Creates partitions for the current crawl cycle. For historical pulls, creates hourly partitions
75+
* Creates partitions for the current crawl cycle. For historical pulls, creates time-based partitions
7576
* for each dimension type. For incremental sync, creates one partition per dimension type.
7677
*/
7778
@Override
@@ -96,34 +97,62 @@ private Instant createPartitions(LeaderPartition leaderPartition,
9697
DimensionalTimeSliceLeaderProgressState leaderProgressState =
9798
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
9899

99-
if (leaderProgressState.getRemainingHours() == 0) {
100+
if (leaderProgressState.getRemainingMinutes() == 0) {
100101
return createPartitionsForIncrementalSync(leaderPartition, coordinator);
101102
} else {
102103
return createPartitionsForHistoricalPull(leaderPartition, coordinator);
103104
}
104105
}
105106

106107
/**
107-
* Creates partitions for historical data pull. Creates hourly partitions
108+
* Creates partitions for historical data pull. Creates time-based partitions
108109
* for each dimension type, working backwards from the current time.
110+
* Supports both sub-hour (minute-based) and hour-based time ranges.
109111
*/
110112
private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartition,
111113
EnhancedSourceCoordinator coordinator) {
112114
DimensionalTimeSliceLeaderProgressState leaderProgressState =
113115
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
114-
int remainingHours = leaderProgressState.getRemainingHours();
116+
long remainingMinutes = leaderProgressState.getRemainingMinutes();
115117
Instant initialTime = leaderProgressState.getLastPollTime();
118+
Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
119+
120+
// For sub-hour time ranges (less than 60 minutes), create a single partition
121+
if (remainingMinutes < MINUTES_PER_HOUR) {
122+
log.info("Creating partition for sub-hour historical pull: {} minutes", remainingMinutes);
123+
Instant startTime = initialTime.minus(Duration.ofMinutes(remainingMinutes));
124+
Instant endTime;
125+
if (remainingMinutes <= WAIT_MINUTES_BEFORE_PARTITION_CREATION) {
126+
// For very small ranges, skip the 5-minute delay to create a valid partition
127+
endTime = initialTime;
128+
} else {
129+
endTime = latestModifiedTime;
130+
}
131+
132+
createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator);
133+
updateLeaderProgressState(leaderPartition, 0, endTime, coordinator);
134+
return endTime;
135+
}
136+
137+
// For hour or longer time ranges, use hourly partitioning
138+
long remainingHours = remainingMinutes / MINUTES_PER_HOUR;
139+
long extraMinutes = remainingMinutes % MINUTES_PER_HOUR;
116140
Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS);
117-
for (int i = remainingHours; i > 1; i--) {
141+
142+
// Create hourly partitions for complete hours
143+
for (long i = remainingHours; i > 1; i--) {
118144
Instant startTime = latestHour.minus(Duration.ofHours(i));
119-
Instant endTime = startTime.plus(HOUR_DURATION);
145+
// For the first partition, include any extra minutes
146+
if (i == remainingHours && extraMinutes > 0) {
147+
startTime = startTime.minus(Duration.ofMinutes(extraMinutes));
148+
}
149+
Instant endTime = latestHour.minus(Duration.ofHours(i - 1));
120150

121151
createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator);
122152
}
123153

124-
Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
125154
if (latestModifiedTime.isAfter(latestHour)) {
126-
// if checkpointing time is after the latest hour, creat one partition for last hour
155+
// if checkpointing time is after the latest hour, create one partition for last hour
127156
// and one from latest hour to checkpointing time
128157
createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestHour, coordinator);
129158
createWorkerPartitionsForDimensionTypes(latestHour, latestModifiedTime, coordinator);
@@ -173,16 +202,16 @@ void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime,
173202
}
174203

175204
/**
176-
* Updates the leader progress state with the latest poll timestamp and remaining hours.
205+
* Updates the leader progress state with the latest poll timestamp and remaining minutes.
177206
* This method also persists the updated state in the source coordinator.
178207
*/
179208
private void updateLeaderProgressState(LeaderPartition leaderPartition,
180-
int remainingHours,
209+
long remainingMinutes,
181210
Instant updatedPollTime,
182211
EnhancedSourceCoordinator coordinator) {
183212
DimensionalTimeSliceLeaderProgressState state =
184213
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
185-
state.setRemainingHours(remainingHours);
214+
state.setRemainingMinutes(remainingMinutes);
186215
state.setLastPollTime(updatedPollTime);
187216
leaderPartition.setLeaderProgressState(state);
188217
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);

0 commit comments

Comments
 (0)