Implement dimensional time slice crawler #6011

Merged
san81 merged 1 commit into opensearch-project:main from alparish:feature/dimensional-tscrawler
Aug 27, 2025

Conversation

@alparish

Contributor

Signed-off-by: Alekhya Parisha <aparisha@amazon.com>

Description

  • Added DimensionalTimeSliceCrawler to partition data by dimensionType and time
  • Replaced PaginationCrawler with DimensionalTimeSliceCrawler in Office365 source
  • Updated tests to support dimensional partitioning

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

client.executePartition(state, buffer, acknowledgementSet);
}

private void createPartitionsFordimensionTypes(LeaderPartition leaderPartition,

Contributor

nit: capitalize Dimension

Contributor Author

Updated

DimensionalTimeSliceLeaderProgressState leaderProgressState =
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
int remainingHours = leaderProgressState.getRemainingHours();
Instant nowUtc = latestModifiedTime.truncatedTo(ChronoUnit.HOURS);

Contributor

Why do we use latestModifiedTime (Instant.now()) instead of leaderProgressState.getLastPollTime() (ref)?

Contributor

I am actually confused by the original code. Why is todayUtc equal to getLastPollTime()? Can we dive into it a bit?

I agree we should be consistent with TimeSliceCrawler if possible.

Instant initialDate = leaderProgressState.getLastPollTime();
Instant todayUtc = initialDate.truncatedTo(ChronoUnit.DAYS);

Contributor Author

Historical pull occurs only during the first run. Initially, lastPollTime is set to Instant.now() when the plugin starts, while latestModifiedTime is set to Instant.now() when the crawl begins. I previously used latestModifiedTime to accommodate Office 365's 7-day limit, but this approach isn't generic enough for all connectors. I'll refactor the crawler to consistently use lastPollTime and handle the 7-day limit specifically in the O365 connector.

Contributor Author

We don't need to truncate to hour boundaries (nowUtc), as it causes unnecessary data pulls. For example, if lastPollTime is 14:37:22, truncating to 14:00:00 means we'd pull roughly an extra 37 minutes of data. I'll use lastPollTime directly to ensure we only retrieve data from the exact time the plugin started.

Contributor

Prefer to truncate for now to stay consistent with TimeSliceCrawler; it is better to truncate and pull in extra logs than to risk data loss.

Contributor Author

Discussed offline, will truncate to keep it consistent with TimeSliceCrawler
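
The outcome of this thread (truncate to the hour, then slice by dimension type and time) can be sketched roughly as below. This is a minimal illustration, not the merged DimensionalTimeSliceCrawler code: the class HourlySliceSketch, the Slice type, the historicalSlices method, and the lookBackHours parameter are all hypothetical names, and the sketch assumes the historical windows end at lastPollTime truncated down to the hour.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names and structure are assumptions, not the PR's code.
final class HourlySliceSketch {

    /** Hypothetical value type: one partition per (dimensionType, one-hour window). */
    static final class Slice {
        final String dimensionType;
        final Instant start;
        final Instant end;

        Slice(String dimensionType, Instant start, Instant end) {
            this.dimensionType = dimensionType;
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Builds one slice per dimension type for each of the lookBackHours hours
     * that end at lastPollTime truncated down to the hour boundary.
     */
    static List<Slice> historicalSlices(Instant lastPollTime, int lookBackHours, List<String> dimensionTypes) {
        // 14:37:22 becomes 14:00:00, so every window starts and ends on an hour boundary.
        Instant anchor = lastPollTime.truncatedTo(ChronoUnit.HOURS);
        List<Slice> slices = new ArrayList<>();
        for (int h = lookBackHours; h >= 1; h--) {
            Instant start = anchor.minus(Duration.ofHours(h));
            Instant end = start.plus(Duration.ofHours(1));
            for (String type : dimensionTypes) {
                slices.add(new Slice(type, start, end));
            }
        }
        return slices;
    }
}
```

Because adjacent windows share their boundaries, truncation trades some overlap with later polling for the guarantee that no interval between windows is skipped, which matches the reviewer's preference for extra logs over data loss.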

}
} catch (Office365Exception e) {

log.error(NOISY, "{} error processing audit log: {}",

@san81 Aug 22, 2025

Collaborator

Adding logType to the error log would help, I guess?

Contributor Author

logType is already included in the outer catch block's error message, which covers the entire partition, so adding it to individual error logs would be redundant.

Instant latestModifiedTime = Instant.now();
double startCount = partitionsCreatedCounter.count();

createPartitionsFordimensionTypes(leaderPartition, coordinator, latestModifiedTime, dimensionTypes);

Contributor

nit: the name should be createPartitionsForDimensionTypes, with a capital D.

Contributor Author

Updated

return restTemplate.exchange(
url,
contentUri,

Collaborator

We are calling the contentUri received from an external source. For security, we should validate this URL before requesting it.

Contributor Author

Updated
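
For illustration, one way to validate an externally supplied content URI before requesting it is to parse it and check the scheme and host against an allowlist. The sketch below is not the validation that was added in this PR: the class ContentUriValidatorSketch, the validate method, and the ALLOWED_HOSTS set are hypothetical, and treating manage.office.com as the only permitted host is an assumption that may not hold for every tenant or cloud.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch; not the validation logic merged in this PR.
final class ContentUriValidatorSketch {

    // Assumption: content URIs are only expected from this host.
    private static final Set<String> ALLOWED_HOSTS = Set.of("manage.office.com");

    /** Returns the parsed URI, or throws IllegalArgumentException if it looks unsafe. */
    static URI validate(String contentUri) {
        if (contentUri == null || contentUri.isBlank()) {
            throw new IllegalArgumentException("contentUri must not be empty");
        }
        final URI uri;
        try {
            uri = new URI(contentUri);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("contentUri is not a valid URI", e);
        }
        // Require TLS so the request cannot be downgraded to plain HTTP.
        if (!"https".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("contentUri must use https");
        }
        // Compare the parsed host exactly; a prefix check on the raw string could be bypassed.
        String host = uri.getHost();
        if (host == null || !ALLOWED_HOSTS.contains(host.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException("contentUri host is not allowed: " + host);
        }
        // Reject embedded credentials such as https://user@host/.
        if (uri.getUserInfo() != null) {
            throw new IllegalArgumentException("contentUri must not contain user info");
        }
        return uri;
    }
}
```

Checking the parsed host rather than the raw string means inputs such as https://manage.office.com.evil.example/ or https://manage.office.com@evil.example/ are rejected.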

throw new RuntimeException("Interrupted while waiting to retry time window", ie);
}
public AuditLogsResponse searchAuditLogs(final String logType, final Instant startTime, final Instant endTime, final String nextPageUri) {
try {

Contributor

Can we do a null check for startTime & endTime? Example

Contributor Author

Added null checks
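
A null check of the kind requested here might look like the following sketch. The class TimeRangeChecksSketch and the method requireValidRange are hypothetical names, and the additional ordering check (startTime before endTime) is an assumption that goes beyond what the reviewer asked for.

```java
import java.time.Instant;
import java.util.Objects;

// Hypothetical sketch; the actual checks added to searchAuditLogs may differ.
final class TimeRangeChecksSketch {

    /** Fails fast if either bound is missing or the range is empty or inverted. */
    static void requireValidRange(Instant startTime, Instant endTime) {
        Objects.requireNonNull(startTime, "startTime must not be null");
        Objects.requireNonNull(endTime, "endTime must not be null");
        // Assumption: an empty or inverted window is a caller error, not a no-op.
        if (!startTime.isBefore(endTime)) {
            throw new IllegalArgumentException("startTime must be before endTime");
        }
    }
}
```

Calling such a helper at the top of the method surfaces a bad argument immediately instead of as a NullPointerException deeper in the request-building code.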

@alparish force-pushed the feature/dimensional-tscrawler branch from aabf54e to 243fa1a on August 25, 2025 20:25
Comment on lines -112 to -114
service.initializeSubscriptions();
office365Iterator.initialize(lastPollTime);
return office365Iterator;

Collaborator

Where are these steps taking place now?

Contributor Author

I didn't add this initialization before, as subscriptions were likely already set up for this tenant, allowing log fetching without explicit initialization. I'll now add it to Office365Source's start() method, which is a more appropriate place since it's a one-time setup that should happen when the plugin starts.

Contributor Author

Also, we no longer need the iterator pattern since we're now processing logs in time-based chunks rather than page by page.

Signed-off-by: Alekhya Parisha <aparisha@amazon.com>
@alparish force-pushed the feature/dimensional-tscrawler branch from 243fa1a to 2f95fba on August 26, 2025 17:55
return new Record<>(event);
} catch (JsonProcessingException e) {
// JSON parsing errors are non-retryable as they indicate malformed data
throw new Office365Exception("Failed to parse audit log: " + logId, e, false);

Member

Will this cause an infinite processing loop?

Contributor

This is marked as non-retryable and swallowed / sent to pipeline DLQ (when available).
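
To show why a non-retryable flag avoids a reprocessing loop, here is a rough sketch of the general pattern. It is not the Data Prepper implementation: SketchException stands in for an exception that carries a retryable flag, and processAll and the skipped list (a stand-in for a DLQ) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the pattern; not the connector's actual error handling.
final class NonRetryableSketch {

    /** Hypothetical stand-in for an exception that carries a retryable flag. */
    static final class SketchException extends RuntimeException {
        private final boolean retryable;

        SketchException(String message, boolean retryable) {
            super(message);
            this.retryable = retryable;
        }

        boolean isRetryable() {
            return retryable;
        }
    }

    /**
     * Converts each id. Items failing with a non-retryable error are recorded in
     * skipped and processing continues; retryable errors propagate to the caller.
     */
    static List<String> processAll(List<String> ids, Function<String, String> convert, List<String> skipped) {
        List<String> records = new ArrayList<>();
        for (String id : ids) {
            try {
                records.add(convert.apply(id));
            } catch (SketchException e) {
                if (e.isRetryable()) {
                    throw e; // let the caller retry the whole partition
                }
                skipped.add(id); // malformed input will never succeed, so do not retry it
            }
        }
        return records;
    }
}
```

Since a malformed record is set aside rather than rethrown, the partition can still complete, and the same bad record is not fetched and parsed again on the next attempt.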

@san81 merged commit a6d2a53 into opensearch-project:main on Aug 27, 2025
40 of 47 checks passed
5 participants