Skip to content

Commit d45c911

Browse files
committed
Changed return type of getLookBackMinutes to Instant
Signed-off-by: enugraju <enugraju@amazon.com>
1 parent b7aca29 commit d45c911

9 files changed

Lines changed: 169 additions & 154 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void start(Buffer<Record<Event>> buffer) {
8686

8787
@Override
8888
protected LeaderProgressState createLeaderProgressState() {
89-
return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackMinutes());
89+
Instant lastPollTime = Instant.now();
90+
return new DimensionalTimeSliceLeaderProgressState(lastPollTime, office365SourceConfig.getLookBackDuration(lastPollTime));
9091
}
9192

9293
@Override

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
1919

2020
import java.time.Duration;
21+
import java.time.Instant;
2122

2223
/**
2324
* Configuration class for Office 365 source plugin.
@@ -60,30 +61,16 @@ public class Office365SourceConfig implements CrawlerSourceConfig {
6061
private Duration range;
6162

6263
/**
63-
* Gets the look back range as minutes for the crawler framework.
64+
* Gets the look back duration as an Instant representing the start time for historical data collection.
6465
* This method supports minute-level granularity for historical pulls.
6566
*
66-
* @return the number of minutes to look back, or 0 if no range is specified
67+
* @return the Instant representing how far back to look, or current time if no range is specified
6768
*/
68-
public long getLookBackMinutes() {
69+
public Instant getLookBackDuration(Instant lastPollTime) {
6970
if (range == null || range.isZero() || range.isNegative()) {
70-
return 0;
71+
return lastPollTime;
7172
}
72-
return range.toMinutes();
73-
}
74-
75-
/**
76-
* Gets the look back range as hours for compatibility with existing crawler framework.
77-
*
78-
* @return the number of hours to look back, or 0 if no range is specified
79-
* @deprecated Use {@link #getLookBackMinutes()} for minute-level granularity support
80-
*/
81-
@Deprecated
82-
public int getLookBackHours() {
83-
if (range == null || range.toHours() <= 0) {
84-
return 0;
85-
}
86-
return (int) range.toHours();
73+
return lastPollTime.minus(range);
8774
}
8875

8976
@Override

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ public AuditLogsResponse searchAuditLogs(final String logType,
6363
return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri);
6464
}
6565

66-
// Adjust start time based on configured lookback period (supports minute-level granularity)
66+
// Adjust start time based on configured lookback period (supports Instant-based granularity)
6767
Instant adjustedStartTime = startTime;
68-
Instant lookBackTimeAgo = Instant.now().minus(Duration.ofMinutes(office365SourceConfig.getLookBackMinutes()));
69-
if (startTime.isBefore(lookBackTimeAgo) && lookBackTimeAgo.isBefore(endTime)) {
70-
adjustedStartTime = lookBackTimeAgo;
68+
Instant lookBackDuration = office365SourceConfig.getLookBackDuration(Instant.now());
69+
if (startTime.isBefore(lookBackDuration) && lookBackDuration.isBefore(endTime)) {
70+
adjustedStartTime = lookBackDuration;
7171
}
7272

7373
AuditLogsResponse response =

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.lang.reflect.Field;
2222
import java.time.Duration;
23+
import java.time.Instant;
2324

2425
import static org.junit.jupiter.api.Assertions.assertEquals;
2526
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,7 +80,8 @@ void testGetters() {
7980
void testDefaultValues() {
8081
assertFalse(config.isAcknowledgments());
8182
assertEquals(4, config.getNumberOfWorkers());
82-
assertEquals(0, config.getLookBackMinutes());
83+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
84+
assertNotNull(lookBackDuration);
8385
}
8486

8587
@Test
@@ -99,67 +101,77 @@ void testNegativeDurationRange() throws Exception {
99101
Duration negativeDuration = Duration.ofDays(-1);
100102
setField(config, "range", negativeDuration);
101103

102-
assertEquals(0, config.getLookBackMinutes());
104+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
105+
assertNotNull(lookBackDuration);
103106
}
104107

105108
@Test
106-
void testGetLookBackMinutes_withMinuteRange() throws Exception {
109+
void testGetLookBackDuration_withMinuteRange() throws Exception {
107110
Duration fifteenMinutes = Duration.ofMinutes(15);
108111
setField(config, "range", fifteenMinutes);
109112

110-
// getLookBackHours should return 0 for sub-hour range
111-
assertEquals(0, config.getLookBackHours());
112-
// getLookBackMinutes should return 15
113-
assertEquals(15, config.getLookBackMinutes());
113+
Instant now = Instant.now();
114+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
115+
// Verify the duration is approximately 15 minutes before now (within 1 second tolerance)
116+
Duration actualDuration = Duration.between(lookBackDuration, now);
117+
assertEquals(15, actualDuration.toMinutes());
114118
}
115119

116120
@Test
117-
void testGetLookBackMinutes_with30MinuteRange() throws Exception {
121+
void testGetLookBackDuration_with30MinuteRange() throws Exception {
118122
Duration thirtyMinutes = Duration.ofMinutes(30);
119123
setField(config, "range", thirtyMinutes);
120124

121-
// getLookBackHours should return 0 for sub-hour range
122-
assertEquals(0, config.getLookBackHours());
123-
// getLookBackMinutes should return 30
124-
assertEquals(30, config.getLookBackMinutes());
125+
Instant now = Instant.now();
126+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
127+
// Verify the duration is approximately 30 minutes before now
128+
Duration actualDuration = Duration.between(lookBackDuration, now);
129+
assertEquals(30, actualDuration.toMinutes());
125130
}
126131

127132
@Test
128-
void testGetLookBackMinutes_with45MinuteRange() throws Exception {
133+
void testGetLookBackDuration_with45MinuteRange() throws Exception {
129134
Duration fortyFiveMinutes = Duration.ofMinutes(45);
130135
setField(config, "range", fortyFiveMinutes);
131136

132-
// getLookBackHours should return 0 for sub-hour range
133-
assertEquals(0, config.getLookBackHours());
134-
// getLookBackMinutes should return 45
135-
assertEquals(45, config.getLookBackMinutes());
137+
Instant now = Instant.now();
138+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
139+
// Verify the duration is approximately 45 minutes before now
140+
Duration actualDuration = Duration.between(lookBackDuration, now);
141+
assertEquals(45, actualDuration.toMinutes());
136142
}
137143

138144
@Test
139-
void testGetLookBackMinutes_withHourRange() throws Exception {
145+
void testGetLookBackDuration_withHourRange() throws Exception {
140146
Duration twoHours = Duration.ofHours(2);
141147
setField(config, "range", twoHours);
142148

143-
assertEquals(2, config.getLookBackHours());
144-
assertEquals(120, config.getLookBackMinutes());
149+
Instant now = Instant.now();
150+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
151+
Duration actualDuration = Duration.between(lookBackDuration, now);
152+
assertEquals(2, actualDuration.toHours());
153+
assertEquals(120, actualDuration.toMinutes());
145154
}
146155

147156
@Test
148-
void testGetLookBackMinutes_withDayRange() throws Exception {
157+
void testGetLookBackDuration_withDayRange() throws Exception {
149158
Duration oneDay = Duration.ofDays(1);
150159
setField(config, "range", oneDay);
151160

152-
assertEquals(24, config.getLookBackHours());
153-
assertEquals(1440, config.getLookBackMinutes());
161+
Instant now = Instant.now();
162+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
163+
Duration actualDuration = Duration.between(lookBackDuration, now);
164+
assertEquals(24, actualDuration.toHours());
165+
assertEquals(1440, actualDuration.toMinutes());
154166
}
155167

156168
@Test
157-
void testGetLookBackMinutes_withZeroRange() throws Exception {
169+
void testGetLookBackDuration_withZeroRange() throws Exception {
158170
Duration zeroDuration = Duration.ZERO;
159171
setField(config, "range", zeroDuration);
160172

161-
assertEquals(0, config.getLookBackHours());
162-
assertEquals(0, config.getLookBackMinutes());
173+
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
174+
assertNotNull(lookBackDuration);
163175
}
164176

165177
@Test

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.mockito.ArgumentMatchers.anyString;
4141
import static org.mockito.ArgumentMatchers.eq;
4242
import static org.mockito.ArgumentMatchers.isNull;
43+
import static org.mockito.Mockito.lenient;
4344
import static org.mockito.Mockito.verify;
4445
import static org.mockito.Mockito.when;
4546

@@ -58,6 +59,7 @@ class Office365ServiceTest {
5859
@BeforeEach
5960
void setUp() {
6061
office365Service = new Office365Service(sourceConfig, office365RestClient, pluginMetrics);
62+
lenient().when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(Instant.now().minus(Duration.ofDays(365)));
6163
}
6264

6365
@Test
@@ -184,7 +186,7 @@ void testSearchAuditLogsError() {
184186
Instant endTime = Instant.now();
185187
String logType = "Exchange";
186188

187-
when(office365RestClient.searchAuditLogs(
189+
lenient().when(office365RestClient.searchAuditLogs(
188190
any(), any(), any(), any()
189191
)).thenThrow(new RuntimeException("API Error"));
190192

@@ -288,8 +290,8 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() {
288290

289291
String logType = "Exchange";
290292
long lookBackMinutes = 96 * 60; // Configure 4 days range limit
291-
292-
when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes);
293+
Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes));
294+
when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration);
293295
when(office365RestClient.searchAuditLogs(
294296
any(String.class),
295297
any(Instant.class),
@@ -325,8 +327,9 @@ void testSearchAuditLogs_WithSubHourRange_AdjustsStartTime() {
325327

326328
String logType = "Exchange";
327329
long lookBackMinutes = 30L;
330+
Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes));
328331

329-
when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes);
332+
when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration);
330333
when(office365RestClient.searchAuditLogs(
331334
any(String.class),
332335
any(Instant.class),

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ private Instant createPartitions(LeaderPartition leaderPartition,
100100
DimensionalTimeSliceLeaderProgressState leaderProgressState =
101101
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
102102

103-
if (leaderProgressState.getRemainingMinutes() == 0) {
103+
Instant remainingDuration = leaderProgressState.getRemainingDuration();
104+
Instant lastPollTime = leaderProgressState.getLastPollTime();
105+
if (remainingDuration.equals(lastPollTime)) {
104106
return createPartitionsForIncrementalSync(leaderPartition, coordinator);
105107
} else {
106108
return createPartitionsForHistoricalPull(leaderPartition, coordinator);
@@ -116,9 +118,10 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio
116118
EnhancedSourceCoordinator coordinator) {
117119
DimensionalTimeSliceLeaderProgressState leaderProgressState =
118120
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
119-
long remainingMinutes = leaderProgressState.getRemainingMinutes();
120121
Instant initialTime = leaderProgressState.getLastPollTime();
121122
Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
123+
Instant remainingDuration = leaderProgressState.getRemainingDuration();
124+
long remainingMinutes = Duration.between(remainingDuration, initialTime).toMinutes();
122125

123126
// For sub-hour time ranges (less than 60 minutes), create a single partition
124127
if (remainingMinutes < MINUTES_PER_HOUR) {
@@ -133,7 +136,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio
133136
}
134137

135138
createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator);
136-
updateLeaderProgressState(leaderPartition, 0, endTime, coordinator);
139+
updateLeaderProgressState(leaderPartition, endTime, coordinator);
137140
return endTime;
138141
}
139142

@@ -164,7 +167,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio
164167
createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestModifiedTime, coordinator);
165168
}
166169

167-
updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
170+
updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator);
168171

169172
return latestModifiedTime;
170173
}
@@ -183,7 +186,7 @@ private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartiti
183186
// Create one partition from lastPollTime to latestModifiedTime for each type
184187
createWorkerPartitionsForDimensionTypes(lastPollTime, latestModifiedTime, coordinator);
185188

186-
updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
189+
updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator);
187190
return latestModifiedTime;
188191
}
189192

@@ -205,16 +208,15 @@ void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime,
205208
}
206209

207210
/**
208-
* Updates the leader progress state with the latest poll timestamp and remaining minutes.
211+
* Updates the leader progress state with the latest poll timestamp and remaining duration.
209212
* This method also persists the updated state in the source coordinator.
210213
*/
211214
private void updateLeaderProgressState(LeaderPartition leaderPartition,
212-
long remainingMinutes,
213215
Instant updatedPollTime,
214216
EnhancedSourceCoordinator coordinator) {
215217
DimensionalTimeSliceLeaderProgressState state =
216218
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
217-
state.setRemainingMinutes(remainingMinutes);
219+
state.setRemainingDuration(updatedPollTime);
218220
state.setLastPollTime(updatedPollTime);
219221
leaderPartition.setLeaderProgressState(state);
220222
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);

0 commit comments

Comments
 (0)