Commit a40f66a

Update microsoft-365 source pagination logic to not skip time range
**Description** This commit updates the microsoft-365 source pagination so that events are not skipped in certain cases. PaginationCrawler saves each Item's lastModifiedAt in the coordinator table as the startTime of the next poll attempt. The m365 source currently sets lastModifiedAt (nextPollAttemptTime) to Instant.now(), which can skip newly arrived events whose time falls between the last fetched event's contentCreated time and the current timestamp. With this change, lastModifiedAt is set to the contentCreated time (the original implementation) when there is a next page, and to eventTime + 1 ms when there is no next page. This ensures that no event is missed in any scenario and that no event is duplicated in the common scenario.

Signed-off-by: Wenjie Yao <wjyao@amazon.com>
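
The checkpoint rule described above can be sketched in isolation as follows. This is a minimal illustration, not the plugin's code: the class name `NextPollStartTime` and the method `compute` are hypothetical, and only `java.time.Instant` from the JDK is used.

```java
import java.time.Instant;

// Hypothetical helper that isolates the rule from the description.
final class NextPollStartTime {

    // On the last page, start the next poll 1 ms after the event so the
    // already-processed event is not fetched again. On any earlier page,
    // keep the event's own time so that events sharing the same millisecond
    // on a later page are not skipped if the crawl restarts from here.
    static Instant compute(final Instant contentCreated, final boolean lastPage) {
        return lastPage ? contentCreated.plusMillis(1) : contentCreated;
    }

    public static void main(final String[] args) {
        final Instant created = Instant.parse("2024-01-01T00:00:00.123Z");
        System.out.println(compute(created, false)); // 2024-01-01T00:00:00.123Z
        System.out.println(compute(created, true));  // 2024-01-01T00:00:00.124Z
    }
}
```

The 1 ms offset relies on the statement in the commit that 1 ms is the smallest time unit m365 reports. With a finer-grained source the offset would have to shrink accordingly.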
1 parent 39ef0c1 commit a40f66a

1 file changed

Lines changed: 10 additions & 4 deletions

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java

@@ -108,9 +108,8 @@ private void searchForNewLogs(final Instant timestamp,
                     break;
                 }
 
-                addItemsToQueue(response.getItems(), contentType, itemInfoQueue);
                 nextPageUri = response.getNextPageUri();
-
+                addItemsToQueue(response.getItems(), contentType, itemInfoQueue, nextPageUri == null);
             } while (nextPageUri != null);
         } catch (Exception e) {
             log.error(NOISY, "Failed to fetch logs for time window {} to {} for content type {}. Will retry this window.",
@@ -140,15 +139,22 @@ private void searchForNewLogs(final Instant timestamp,
 
     private void addItemsToQueue(final List<Map<String, Object>> items,
                                  final String contentType,
-                                 final Queue<ItemInfo> itemInfoQueue) {
+                                 final Queue<ItemInfo> itemInfoQueue,
+                                 final boolean lastPage) {
         items.forEach(item -> {
+            Instant eventTime = Instant.parse((String) item.get(CONTENT_CREATED_KEY));
+            // If last page, add 1ms offset for next poll start time to avoid duplication when all events are processed.
+            // 1ms is m365's smallest time unit so polling data starting from next 1ms would not skip any event.
+            // If not last page, keep nextPollAttemptStartTime to be contentCreated time so to avoid data loss in a rare scenario
+            // where 1ms have multiple events and split by nextPageUri
+            Instant nextPollAttemptStartTime = lastPage ? eventTime.plusMillis(1) : eventTime;
             ItemInfo itemInfo = Office365ItemInfo.builder()
                     .itemId((String) item.get(CONTENT_ID_KEY))
                     .eventTime(Instant.parse((String) item.get(CONTENT_CREATED_KEY)))
                     .partitionKey(contentType + UUID.randomUUID())
                     .metadata(item)
                     .keyAttributes(Map.of(TYPE_KEY, contentType, CONTENT_URI_KEY, item.get(CONTENT_URI_KEY)))
-                    .lastModifiedAt(Instant.now()) // Used to track the time that it was imported
+                    .lastModifiedAt(nextPollAttemptStartTime)
                     .build();
             itemInfoQueue.add(itemInfo);
         });
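
To make the rare scenario from the code comments concrete, the sketch below replays the paging loop against in-memory pages. It is an assumption-laden illustration rather than the plugin's code: `PagedCheckpointSketch`, `Page`, and `checkpoints` are hypothetical names, and the page lookup by URI stands in for the real HTTP call. As in the diff, the next-page URI is read before the items are processed so that the last-page flag is known.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained replay of the paging loop.
final class PagedCheckpointSketch {

    // Stand-in for one response page: event times plus the next page URI
    // (null when this is the last page).
    static final class Page {
        final List<Instant> contentCreated;
        final String nextPageUri;

        Page(final List<Instant> contentCreated, final String nextPageUri) {
            this.contentCreated = contentCreated;
            this.nextPageUri = nextPageUri;
        }
    }

    // Returns the next-poll start time that would be recorded for each event.
    // Assumes every URI reachable from firstUri is present in pagesByUri.
    static List<Instant> checkpoints(final Map<String, Page> pagesByUri, final String firstUri) {
        final List<Instant> result = new ArrayList<>();
        String uri = firstUri;
        do {
            final Page page = pagesByUri.get(uri);
            // Read the next page URI first so we know whether this is the last page.
            final String nextPageUri = page.nextPageUri;
            final boolean lastPage = nextPageUri == null;
            for (final Instant created : page.contentCreated) {
                result.add(lastPage ? created.plusMillis(1) : created);
            }
            uri = nextPageUri;
        } while (uri != null);
        return result;
    }
}
```

If two events share the same millisecond but land on different pages, the event on the first page records its own time, so a restart from that checkpoint still covers the second event. Only the event on the last page advances the checkpoint by 1 ms.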
