Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ private void searchForNewLogs(final Instant timestamp,
break;
}

addItemsToQueue(response.getItems(), contentType, itemInfoQueue);
nextPageUri = response.getNextPageUri();

addItemsToQueue(response.getItems(), contentType, itemInfoQueue, nextPageUri == null);
} while (nextPageUri != null);
} catch (Exception e) {
log.error(NOISY, "Failed to fetch logs for time window {} to {} for content type {}. Will retry this window.",
Expand Down Expand Up @@ -140,15 +139,22 @@ private void searchForNewLogs(final Instant timestamp,

private void addItemsToQueue(final List<Map<String, Object>> items,
final String contentType,
final Queue<ItemInfo> itemInfoQueue) {
final Queue<ItemInfo> itemInfoQueue,
final boolean lastPage) {
items.forEach(item -> {
Instant contentCreated = Instant.parse((String) item.get(CONTENT_CREATED_KEY));
// On the last page, add a 1 ms offset to the next poll start time to avoid duplicates once all events are processed.
// 1 ms is M365's smallest time unit, so resuming the poll from the next millisecond does not skip any event.
// On any other page, keep nextPollAttemptStartTime equal to the contentCreated time to avoid data loss in the rare
// scenario where multiple events share the same millisecond and are split across pages by nextPageUri.
Instant nextPollAttemptStartTime = lastPage ? contentCreated.plusMillis(1) : contentCreated;
ItemInfo itemInfo = Office365ItemInfo.builder()
.itemId((String) item.get(CONTENT_ID_KEY))
.eventTime(Instant.parse((String) item.get(CONTENT_CREATED_KEY)))
.eventTime(contentCreated)
.partitionKey(contentType + UUID.randomUUID())
.metadata(item)
.keyAttributes(Map.of(TYPE_KEY, contentType, CONTENT_URI_KEY, item.get(CONTENT_URI_KEY)))
.lastModifiedAt(Instant.now()) // Used to track the time that it was imported
.lastModifiedAt(nextPollAttemptStartTime)
.build();
itemInfoQueue.add(itemInfo);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ void testServiceInitialization() {
@Test
void testGetOffice365EntitiesWithMultipleTimeWindows() {
// Create test data
List<Map<String, Object>> items = new ArrayList<>();
Map<String, Object> item = createTestItem();
items.add(item);
Instant contentCreated = Instant.now().minusSeconds(100);
List<Map<String, Object>> items1 = new ArrayList<>();
List<Map<String, Object>> items2 = new ArrayList<>();
items1.add(createTestItem("id1", contentCreated));
items2.add(createTestItem("id2", contentCreated));

AuditLogsResponse response = new AuditLogsResponse(items, null);
AuditLogsResponse response1 = new AuditLogsResponse(items1, "nextPageUri1");
AuditLogsResponse response2 = new AuditLogsResponse(items2, null);

// Fix the time windows - use current time as reference
Instant now = Instant.now();
Expand All @@ -88,7 +91,13 @@ void testGetOffice365EntitiesWithMultipleTimeWindows() {
any(Instant.class),
any(Instant.class),
isNull()))
.thenReturn(response);
.thenReturn(response1);
when(office365RestClient.searchAuditLogs(
anyString(),
any(Instant.class),
any(Instant.class),
eq("nextPageUri1")))
.thenReturn(response2);

Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();

Expand All @@ -112,11 +121,27 @@ void testGetOffice365EntitiesWithMultipleTimeWindows() {
.count();

// Verify the number of items
int expectedItems = Constants.CONTENT_TYPES.length * (int)distinctTimeWindows;
int expectedItems = Constants.CONTENT_TYPES.length * (int)distinctTimeWindows * 2;
assertEquals(expectedItems, itemInfoQueue.size(),
String.format("Expected %d items (%d content types * %d time windows * 1 item), but found %d",
expectedItems, Constants.CONTENT_TYPES.length, distinctTimeWindows, itemInfoQueue.size()));


Instant expectedLastModifiedAtForSecondPage = contentCreated.plusMillis(1);
for (int i = 0; i < Constants.CONTENT_TYPES.length * (int) distinctTimeWindows * 2; i++) {
ItemInfo itemInfo = itemInfoQueue.poll();
assertEquals(contentCreated, itemInfo.getEventTime());
if (i % 2 == 0) {
assertEquals("id1", itemInfo.getItemId());
assertEquals(contentCreated, itemInfo.getLastModifiedAt(),
"Expect first page's lastModifiedAt timestamp to be same as contentCreated timestamp");
} else {
assertEquals("id2", itemInfo.getItemId());
assertEquals(expectedLastModifiedAtForSecondPage, itemInfo.getLastModifiedAt(),
"Expect second page's lastModifiedAt timestamp to be 1ms after contentCreated timestamp");
}
}

// Verify we have at least 3 distinct time windows
assertTrue(distinctTimeWindows >= 3,
"Expected at least 3 distinct time windows, but found " + distinctTimeWindows);
Expand Down Expand Up @@ -153,7 +178,7 @@ void testEmptyResponse() {
@Test
void testGetOffice365EntitiesWithSevenDayLimit() {
List<Map<String, Object>> items = new ArrayList<>();
Map<String, Object> item = createTestItem();
Map<String, Object> item = createTestItem("id", Instant.now());
items.add(item);

AuditLogsResponse mockResponse = new AuditLogsResponse(items, null);
Expand Down Expand Up @@ -192,7 +217,7 @@ void testRetryBehaviorOnFailure() {

// Create successful response
List<Map<String, Object>> items = new ArrayList<>();
items.add(createTestItem());
items.add(createTestItem("id", Instant.now()));
AuditLogsResponse successResponse = new AuditLogsResponse(items, null);

// Set up mock to fail first then succeed for first content type
Expand Down Expand Up @@ -263,10 +288,10 @@ void testGetAuditLog() {
verify(office365RestClient).getAuditLog("test-id");
}

private Map<String, Object> createTestItem() {
private Map<String, Object> createTestItem(String contentId, Instant contentCreated) {
Map<String, Object> item = new HashMap<>();
item.put("contentId", "test-id");
item.put("contentCreated", Instant.now().toString());
item.put("contentId", contentId);
item.put("contentCreated", contentCreated.toString());
item.put("contentUri", "https://test.com");
return item;
}
Expand Down
Loading