Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void start(Buffer<Record<Event>> buffer) {

@Override
protected LeaderProgressState createLeaderProgressState() {
return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackHours());
Instant lastPollTime = Instant.now();
return new DimensionalTimeSliceLeaderProgressState(lastPollTime, office365SourceConfig.getLookBackDuration(lastPollTime));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;

import java.time.Duration;
import java.time.Instant;

/**
* Configuration class for Office 365 source plugin.
Expand Down Expand Up @@ -60,15 +61,16 @@ public class Office365SourceConfig implements CrawlerSourceConfig {
private Duration range;

/**
* Gets the look back range as hours for compatibility with existing crawler framework.
* Gets the look back duration as an Instant representing the start time for historical data collection.
* This method supports minute-level granularity for historical pulls.
*
* @return the number of hours to look back, or 0 if no range is specified
* @return the Instant representing how far back to look, or current time if no range is specified
*/
public int getLookBackHours() {
if (range == null || range.toHours() <= 0) {
return 0;
public Instant getLookBackDuration(Instant lastPollTime) {
if (range == null || range.isZero() || range.isNegative()) {
return lastPollTime;
}
return (int) range.toHours();
return lastPollTime.minus(range);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,9 @@ public AuditLogsResponse searchAuditLogs(final String logType,
return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri);
}

// Adjust start time based on configured lookback hours
Instant adjustedStartTime = startTime;
Instant lookBackHoursAgo = Instant.now().minus(Duration.ofHours(office365SourceConfig.getLookBackHours()));
if (startTime.isBefore(lookBackHoursAgo) && lookBackHoursAgo.isBefore(endTime)) {
adjustedStartTime = lookBackHoursAgo;
}
// Adjust start time based on configured lookback period (supports Instant-based granularity)
Instant lookBackStartTime = office365SourceConfig.getLookBackDuration(Instant.now());
Instant adjustedStartTime = office365SourceConfig.getAdjustedStartTime(startTime, endTime, lookBackStartTime);

AuditLogsResponse response =
office365RestClient.searchAuditLogs(logType, adjustedStartTime, endTime, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -79,7 +80,8 @@ void testGetters() {
void testDefaultValues() {
assertFalse(config.isAcknowledgments());
assertEquals(4, config.getNumberOfWorkers());
assertEquals(0, config.getLookBackHours());
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
assertNotNull(lookBackDuration);
}

@Test
Expand All @@ -99,7 +101,77 @@ void testNegativeDurationRange() throws Exception {
Duration negativeDuration = Duration.ofDays(-1);
setField(config, "range", negativeDuration);

assertEquals(0, config.getLookBackHours());
Instant lookBackDuration = config.getLookBackDuration(Instant.now());
assertNotNull(lookBackDuration);
}

@Test
void testGetLookBackDuration_withMinuteRange() throws Exception {
Duration fifteenMinutes = Duration.ofMinutes(15);
setField(config, "range", fifteenMinutes);

Instant now = Instant.now();
Instant lookBackDuration = config.getLookBackDuration(now);
// Verify the duration is approximately 15 minutes before now (within 1 second tolerance)
Duration actualDuration = Duration.between(lookBackDuration, now);
assertEquals(15, actualDuration.toMinutes());
}

@Test
void testGetLookBackDuration_with30MinuteRange() throws Exception {
Duration thirtyMinutes = Duration.ofMinutes(30);
setField(config, "range", thirtyMinutes);

Instant now = Instant.now();
Instant lookBackDuration = config.getLookBackDuration(now);
// Verify the duration is approximately 30 minutes before now
Duration actualDuration = Duration.between(lookBackDuration, now);
assertEquals(30, actualDuration.toMinutes());
}

@Test
void testGetLookBackDuration_with45MinuteRange() throws Exception {
Duration fortyFiveMinutes = Duration.ofMinutes(45);
setField(config, "range", fortyFiveMinutes);

Instant now = Instant.now();
Instant lookBackDuration = config.getLookBackDuration(now);
// Verify the duration is approximately 45 minutes before now
Duration actualDuration = Duration.between(lookBackDuration, now);
assertEquals(45, actualDuration.toMinutes());
}

@Test
void testGetLookBackDuration_withHourRange() throws Exception {
Duration twoHours = Duration.ofHours(2);
setField(config, "range", twoHours);

Instant now = Instant.now();
Instant lookBackDuration = config.getLookBackDuration(now);
Duration actualDuration = Duration.between(lookBackDuration, now);
assertEquals(2, actualDuration.toHours());
assertEquals(120, actualDuration.toMinutes());
}

@Test
void testGetLookBackDuration_withDayRange() throws Exception {
Duration oneDay = Duration.ofDays(1);
setField(config, "range", oneDay);

Instant now = Instant.now();
Instant lookBackDuration = config.getLookBackDuration(now);
Duration actualDuration = Duration.between(lookBackDuration, now);
assertEquals(24, actualDuration.toHours());
assertEquals(1440, actualDuration.toMinutes());
}

@Test
void testGetLookBackDuration_withZeroRange() throws Exception {
Duration zeroDuration = Duration.ZERO;
setField(config, "range", zeroDuration);

Instant lookBackDuration = config.getLookBackDuration(Instant.now());
assertNotNull(lookBackDuration);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -58,6 +59,8 @@ class Office365ServiceTest {
@BeforeEach
void setUp() {
office365Service = new Office365Service(sourceConfig, office365RestClient, pluginMetrics);
lenient().when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(Instant.now().minus(Duration.ofDays(365)));
lenient().doCallRealMethod().when(sourceConfig).getAdjustedStartTime(any(Instant.class), any(Instant.class), any(Instant.class));
}

@Test
Expand Down Expand Up @@ -184,7 +187,7 @@ void testSearchAuditLogsError() {
Instant endTime = Instant.now();
String logType = "Exchange";

when(office365RestClient.searchAuditLogs(
lenient().when(office365RestClient.searchAuditLogs(
any(), any(), any(), any()
)).thenThrow(new RuntimeException("API Error"));

Expand Down Expand Up @@ -287,9 +290,9 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() {
Instant endTime = startTime.plus(Duration.ofHours(1));

String logType = "Exchange";
int lookBackHours = 96; // Configure 4 days range limit

when(sourceConfig.getLookBackHours()).thenReturn(lookBackHours);
long lookBackMinutes = 96 * 60; // Configure 4 days range limit
Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes));
when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration);
when(office365RestClient.searchAuditLogs(
any(String.class),
any(Instant.class),
Expand All @@ -315,6 +318,44 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() {
"Adjusted start time should be exactly 96 hours ago");
}

@Test
void testSearchAuditLogs_WithSubHourRange_AdjustsStartTime() {
Clock fixedClock = Clock.fixed(Instant.parse("2025-11-09T21:30:00.00Z"), ZoneOffset.UTC);
Instant now = Instant.now(fixedClock);

Instant startTime = now.minus(Duration.ofMinutes(45));
Instant endTime = startTime.plus(Duration.ofMinutes(15));

String logType = "Exchange";
long lookBackMinutes = 30L;
Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes));

when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration);
when(office365RestClient.searchAuditLogs(
any(String.class),
any(Instant.class),
any(Instant.class),
any()
)).thenReturn(new AuditLogsResponse(new ArrayList<>(), null));

office365Service.searchAuditLogs(logType, startTime, endTime, null);

// Capture the actual start time that was passed to the REST client
ArgumentCaptor<Instant> startTimeCaptor = ArgumentCaptor.forClass(Instant.class);
verify(office365RestClient).searchAuditLogs(
eq(logType),
startTimeCaptor.capture(),
eq(endTime),
isNull()
);

// Verify that the service adjusted the start time correctly:
Instant capturedStartTime = startTimeCaptor.getValue();
Duration actualLookback = Duration.between(capturedStartTime, now);
assertEquals(45, actualLookback.toMinutes(),
"Adjusted start time should be exactly 30 minutes ago");
Comment on lines +355 to +356
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion checks for 45 minutes but the message says 30 minutes. Which one is correct?

}

private Map<String, Object> createTestItem(String contentId, Instant contentCreated) {
Map<String, Object> item = new HashMap<>();
item.put("contentId", contentId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import java.time.Duration;
import java.time.Instant;

/**
* Marker interface to all the SAAS connectors configuration
Expand Down Expand Up @@ -55,4 +56,24 @@ default Duration getDurationToDelayRetry() {
default Duration getLeaseInterval() {
return Duration.ofMinutes(1);
}

/**
* Adjusts the start time based on the configured lookback boundary.
* When the requested start time is before the lookback boundary and the lookback boundary
* falls within the time window, the start time is adjusted to the lookback boundary to
* respect API limitations (e.g., Office 365 API's maximum lookback period).
*
* @param startTime the requested start time for the query
* @param endTime the end time for the query
* @param lookBackStartTime the earliest time to query from (e.g., from getLookBackDuration(referenceTime))
* @return the adjusted start time, or the original start time if no adjustment is needed
*/
default Instant getAdjustedStartTime(final Instant startTime,
final Instant endTime,
final Instant lookBackStartTime) {
if (startTime.isBefore(lookBackStartTime) && lookBackStartTime.isBefore(endTime)) {
return lookBackStartTime;
}
return startTime;
}
}
Loading
Loading