Skip to content

Commit 1b2b295

Browse files
Update microsoft-365 source pagination logic to not skip time range (#5979)
**Description** This commit updates the microsoft-365 source pagination so that events are not skipped in certain cases. PaginationCrawler saves each Item's lastModifiedAt in the coordinator table as the startTime of the next poll attempt. Currently the m365 source sets Instant.now() as lastModifiedAt (i.e. the nextPollAttemptTime), which could skip newly arrived events whose contentCreated time falls between the last event's contentCreated time and the current timestamp. With this change, lastModifiedAt is set to the contentCreated time (the original implementation) when there is a next page, and to eventTime + 1 ms when there is no next page. This ensures that no event is missed in any scenario and that no event is duplicated in the common scenario. Signed-off-by: Wenjie Yao <wjyao@amazon.com> Co-authored-by: Wenjie Yao <wjyao@amazon.com>
1 parent 0a06b2a commit 1b2b295

2 files changed

Lines changed: 47 additions & 16 deletions

File tree

  • data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,8 @@ private void searchForNewLogs(final Instant timestamp,
108108
break;
109109
}
110110

111-
addItemsToQueue(response.getItems(), contentType, itemInfoQueue);
112111
nextPageUri = response.getNextPageUri();
113-
112+
addItemsToQueue(response.getItems(), contentType, itemInfoQueue, nextPageUri == null);
114113
} while (nextPageUri != null);
115114
} catch (Exception e) {
116115
log.error(NOISY, "Failed to fetch logs for time window {} to {} for content type {}. Will retry this window.",
@@ -140,15 +139,22 @@ private void searchForNewLogs(final Instant timestamp,
140139

141140
private void addItemsToQueue(final List<Map<String, Object>> items,
142141
final String contentType,
143-
final Queue<ItemInfo> itemInfoQueue) {
142+
final Queue<ItemInfo> itemInfoQueue,
143+
final boolean lastPage) {
144144
items.forEach(item -> {
145+
Instant contentCreated = Instant.parse((String) item.get(CONTENT_CREATED_KEY));
146+
// If last page, add 1ms offset for next poll start time to avoid duplication when all events are processed.
147+
// 1ms is m365's smallest time unit so polling data starting from next 1ms would not skip any event.
148+
// If not the last page, keep nextPollAttemptStartTime equal to the contentCreated time to avoid data loss in the rare scenario
149+
// where multiple events share the same millisecond and are split across pages by nextPageUri.
150+
Instant nextPollAttemptStartTime = lastPage ? contentCreated.plusMillis(1) : contentCreated;
145151
ItemInfo itemInfo = Office365ItemInfo.builder()
146152
.itemId((String) item.get(CONTENT_ID_KEY))
147-
.eventTime(Instant.parse((String) item.get(CONTENT_CREATED_KEY)))
153+
.eventTime(contentCreated)
148154
.partitionKey(contentType + UUID.randomUUID())
149155
.metadata(item)
150156
.keyAttributes(Map.of(TYPE_KEY, contentType, CONTENT_URI_KEY, item.get(CONTENT_URI_KEY)))
151-
.lastModifiedAt(Instant.now()) // Used to track the time that it was imported
157+
.lastModifiedAt(nextPollAttemptStartTime)
152158
.build();
153159
itemInfoQueue.add(itemInfo);
154160
});

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,14 @@ void testServiceInitialization() {
7272
@Test
7373
void testGetOffice365EntitiesWithMultipleTimeWindows() {
7474
// Create test data
75-
List<Map<String, Object>> items = new ArrayList<>();
76-
Map<String, Object> item = createTestItem();
77-
items.add(item);
75+
Instant contentCreated = Instant.now().minusSeconds(100);
76+
List<Map<String, Object>> items1 = new ArrayList<>();
77+
List<Map<String, Object>> items2 = new ArrayList<>();
78+
items1.add(createTestItem("id1", contentCreated));
79+
items2.add(createTestItem("id2", contentCreated));
7880

79-
AuditLogsResponse response = new AuditLogsResponse(items, null);
81+
AuditLogsResponse response1 = new AuditLogsResponse(items1, "nextPageUri1");
82+
AuditLogsResponse response2 = new AuditLogsResponse(items2, null);
8083

8184
// Fix the time windows - use current time as reference
8285
Instant now = Instant.now();
@@ -88,7 +91,13 @@ void testGetOffice365EntitiesWithMultipleTimeWindows() {
8891
any(Instant.class),
8992
any(Instant.class),
9093
isNull()))
91-
.thenReturn(response);
94+
.thenReturn(response1);
95+
when(office365RestClient.searchAuditLogs(
96+
anyString(),
97+
any(Instant.class),
98+
any(Instant.class),
99+
eq("nextPageUri1")))
100+
.thenReturn(response2);
92101

93102
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
94103

@@ -112,11 +121,27 @@ void testGetOffice365EntitiesWithMultipleTimeWindows() {
112121
.count();
113122

114123
// Verify the number of items
115-
int expectedItems = Constants.CONTENT_TYPES.length * (int)distinctTimeWindows;
124+
int expectedItems = Constants.CONTENT_TYPES.length * (int)distinctTimeWindows * 2;
116125
assertEquals(expectedItems, itemInfoQueue.size(),
117126
String.format("Expected %d items (%d content types * %d time windows * 1 item), but found %d",
118127
expectedItems, Constants.CONTENT_TYPES.length, distinctTimeWindows, itemInfoQueue.size()));
119128

129+
130+
Instant expectedLastModifiedAtForSecondPage = contentCreated.plusMillis(1);
131+
for (int i = 0; i < Constants.CONTENT_TYPES.length * (int) distinctTimeWindows * 2; i++) {
132+
ItemInfo itemInfo = itemInfoQueue.poll();
133+
assertEquals(contentCreated, itemInfo.getEventTime());
134+
if (i % 2 == 0) {
135+
assertEquals("id1", itemInfo.getItemId());
136+
assertEquals(contentCreated, itemInfo.getLastModifiedAt(),
137+
"Expect first page's lastModifiedAt timestamp to be same as contentCreated timestamp");
138+
} else {
139+
assertEquals("id2", itemInfo.getItemId());
140+
assertEquals(expectedLastModifiedAtForSecondPage, itemInfo.getLastModifiedAt(),
141+
"Expect second page's lastModifiedAt timestamp to be 1ms after contentCreated timestamp");
142+
}
143+
}
144+
120145
// Verify we have at least 3 distinct time windows
121146
assertTrue(distinctTimeWindows >= 3,
122147
"Expected at least 3 distinct time windows, but found " + distinctTimeWindows);
@@ -153,7 +178,7 @@ void testEmptyResponse() {
153178
@Test
154179
void testGetOffice365EntitiesWithSevenDayLimit() {
155180
List<Map<String, Object>> items = new ArrayList<>();
156-
Map<String, Object> item = createTestItem();
181+
Map<String, Object> item = createTestItem("id", Instant.now());
157182
items.add(item);
158183

159184
AuditLogsResponse mockResponse = new AuditLogsResponse(items, null);
@@ -192,7 +217,7 @@ void testRetryBehaviorOnFailure() {
192217

193218
// Create successful response
194219
List<Map<String, Object>> items = new ArrayList<>();
195-
items.add(createTestItem());
220+
items.add(createTestItem("id", Instant.now()));
196221
AuditLogsResponse successResponse = new AuditLogsResponse(items, null);
197222

198223
// Set up mock to fail first then succeed for first content type
@@ -263,10 +288,10 @@ void testGetAuditLog() {
263288
verify(office365RestClient).getAuditLog("test-id");
264289
}
265290

266-
private Map<String, Object> createTestItem() {
291+
private Map<String, Object> createTestItem(String contentId, Instant contentCreated) {
267292
Map<String, Object> item = new HashMap<>();
268-
item.put("contentId", "test-id");
269-
item.put("contentCreated", Instant.now().toString());
293+
item.put("contentId", contentId);
294+
item.put("contentCreated", contentCreated.toString());
270295
item.put("contentUri", "https://test.com");
271296
return item;
272297
}

0 commit comments

Comments
 (0)