Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import com.google.common.annotations.VisibleForTesting;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -24,25 +24,22 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.HttpClientErrorException;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;

import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

Expand All @@ -52,21 +49,18 @@
*/
@Slf4j
@Named
public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWorkerProgressState> {
public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSliceWorkerProgressState> {
private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency";
private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts";
private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess";
private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
private static final String WORKER_STATE_UPDATES = "workerStateUpdates";

private static final String CONTENT_TYPE = "contentType";
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
private static final String CONTENT_ID = "contentId";
private static final String CONTENT_URI = "contentUri";

private final Office365Service service;
private final Office365Iterator office365Iterator;
private final ExecutorService executorService;
private final Office365SourceConfig configuration;
private final Timer bufferWriteLatencyTimer;
private final Counter bufferWriteAttemptsCounter;
Expand All @@ -77,13 +71,9 @@ public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWo
private ObjectMapper objectMapper;

public Office365CrawlerClient(final Office365Service service,
final Office365Iterator office365Iterator,
final PluginExecutorServiceProvider executorServiceProvider,
final Office365SourceConfig sourceConfig,
final PluginMetrics pluginMetrics) {
this.service = service;
this.office365Iterator = office365Iterator;
this.executorService = executorServiceProvider.get();
this.configuration = sourceConfig;
this.objectMapper = new ObjectMapper();

Expand All @@ -101,125 +91,116 @@ void injectObjectMapper(final ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}


@Override
public Iterator<ItemInfo> listItems(final Instant lastPollTime) {
log.info("Starting to list Office 365 audit logs from {}", lastPollTime);

// TODO: Subscription management should be moved to a dedicated class in the future
// Currently, we initialize subscriptions in the leader partition to ensure that we're always subscribed
// to the required content type to ensure there hasn't been a subscription change
service.initializeSubscriptions();
office365Iterator.initialize(lastPollTime);
return office365Iterator;
Comment on lines -112 to -114

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are these steps taking place now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add this initialization before, as subscriptions were likely already set up for this tenant, allowing log fetching without explicit initialization. I'll now add it to Office365Source's start() method, which is a more appropriate place since it's a one-time setup that should happen when the plugin starts.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we no longer need the iterator pattern since we're now processing logs in time-based chunks rather than page by page.

return null;
}

@Override
public void executePartition(final PaginationCrawlerWorkerProgressState state,
public void executePartition(final DimensionalTimeSliceWorkerProgressState state,
final Buffer<Record<Event>> buffer,
final AcknowledgementSet acknowledgementSet) {
// Process a batch of audit log IDs and convert them to records
// If any record fails to process, the entire batch will be retried
log.info("Starting to execute partition with {} log(s)", state.getItemIds().size());
List<String> itemIds = state.getItemIds();
final Instant startTime = state.getStartTime();
final Instant endTime = state.getEndTime();
final String logType = state.getDimensionType();

// Process each audit log ID in the batch
List<Record<Event>> records = itemIds.stream()
.map(id -> {
try {
return processAuditLog(id);
} catch (Office365Exception e) {
log.error(NOISY, "{} error processing audit log: {}",
e.isRetryable() ? "Retryable" : "Non-retryable", id, e);
if (e.isRetryable()) {
throw new RuntimeException("Retryable error processing audit log: " + id, e);
} else {
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", id, e);
return null;
try {
String nextPageUri = null;
List<Record<Event>> records = new ArrayList<>();

do {
AuditLogsResponse response =
service.searchAuditLogs(logType, startTime, endTime, nextPageUri);

if (response.getItems() != null && !response.getItems().isEmpty()) {
for (Map<String, Object> metadata : response.getItems()) {
String logId = (String) metadata.get(CONTENT_ID);
try {
Record<Event> record = processAuditLog(metadata);
if (record != null) {
records.add(record);
}
} catch (Office365Exception e) {

log.error(NOISY, "{} error processing audit log: {}",

@san81 san81 Aug 22, 2025

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding logType to the error log would help, I guess?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logType is already included in the outer catch block's error message, which covers the entire partition, so adding it to individual error logs would be redundant.

e.isRetryable() ? "Retryable" : "Non-retryable", logId, e);
if (e.isRetryable()) {
throw new RuntimeException("Retryable error processing audit log: " + logId, e);
} else {
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e);
}
} catch (Exception e) {
// Unexpected errors are treated as retryable to be safe
log.error(NOISY, "Unexpected error processing audit log: {}", logId, e);
throw new RuntimeException("Unexpected error processing audit log: " + logId, e);
}
} catch (Exception e) {
// Unexpected errors are treated as retryable to be safe
log.error(NOISY, "Unexpected error processing audit log: {}", id, e);
throw new RuntimeException("Unexpected error processing audit log: " + id, e);
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

bufferWriteLatencyTimer.record(() -> {
try {
writeRecordsWithRetry(records, buffer, acknowledgementSet, state);
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
throw e;
}
});
}
}

private Record<Event> processAuditLog(String id) {
try {
String auditLog = service.getAuditLog(id);
nextPageUri = response.getNextPageUri();
} while (nextPageUri != null);

// Handle HTTP errors in service layer
if (auditLog == null) {
throw new Office365Exception("Received null audit log for ID: " + id, false);
}
bufferWriteLatencyTimer.record(() -> {
try {
writeRecordsWithRetry(records, buffer, acknowledgementSet);
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
throw e;
}
});

try {
JsonNode jsonNode = objectMapper.readTree(auditLog);
Map<String, Object> data;
} catch (Exception e) {
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
logType, startTime, endTime, e);
throw e;
}
}

// Office 365 API sometimes returns an array with a single item
// and sometimes returns a single object directly
if (jsonNode.isArray() && !jsonNode.isEmpty()) {
data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {});
} else {
data = objectMapper.readValue(auditLog, new TypeReference<Map<String, Object>>() {});
}
private Record<Event> processAuditLog(Map<String, Object> metadata) throws Office365Exception {
String contentUri = (String) metadata.get(CONTENT_URI);
if (contentUri == null) {
throw new Office365Exception("Missing contentUri in metadata", false);
}

// "Workload" is an Office 365 specific field that indicates the source of the audit log
String contentType = (String) data.get("Workload");
if (contentType == null) {
throw new Office365Exception("Missing Workload field in audit log: " + id, false);
}
String logContent = service.getAuditLog(contentUri);
if (logContent == null) {
throw new Office365Exception("Received null log content for URI: " + contentUri, false);
}
String logId = (String) metadata.get(CONTENT_ID);

Event event = JacksonEvent.builder()
.withEventType(EventType.LOG.toString())
.withData(data)
.build();
event.getMetadata().setAttribute(CONTENT_TYPE, contentType);
return new Record<>(event);
} catch (JsonProcessingException e) {
// JSON parsing errors are non-retryable as they indicate malformed data
throw new Office365Exception("Failed to parse audit log: " + id, e, false);
try {
JsonNode jsonNode = objectMapper.readTree(logContent);
Map<String, Object> data;

// Office 365 API sometimes returns an array with a single item
// and sometimes returns a single object directly
if (jsonNode.isArray() && !jsonNode.isEmpty()) {
data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {});
} else {
data = objectMapper.readValue(logContent, new TypeReference<Map<String, Object>>() {});
}
} catch (HttpClientErrorException e) {
switch (e.getStatusCode()) {
case UNAUTHORIZED:
case FORBIDDEN:
// Auth errors might be temporary due to token expiration
throw new Office365Exception("Authentication failed while fetching audit log: " + id, e, true);
case NOT_FOUND:
// Log doesn't exist - non-retryable
throw new Office365Exception("Audit log not found: " + id, e, false);
case TOO_MANY_REQUESTS:
// Rate limiting - retryable
throw new Office365Exception("Rate limited while fetching audit log: " + id, e, true);
default:
// Other client errors are non-retryable
throw new Office365Exception("Client error while fetching audit log: " + id, e, false);

String contentType = (String) data.get("Workload");
if (contentType == null) {
throw new Office365Exception("Missing Workload field in audit log: " + logId, false);
}
} catch (ResourceAccessException e) {
// Network/connection issues are retryable
throw new Office365Exception("Network error while fetching audit log: " + id, e, true);

Event event = JacksonEvent.builder()
.withEventType(EventType.LOG.toString())
.withData(data)
.build();
event.getMetadata().setAttribute("contentType", contentType);
return new Record<>(event);
} catch (JsonProcessingException e) {
// JSON parsing errors are non-retryable as they indicate malformed data
throw new Office365Exception("Failed to parse audit log: " + logId, e, false);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause infinite loop of processing?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is marked as non-retryable and swallowed / sent to pipeline DLQ (when available).

}
}

private void writeRecordsWithRetry(final List<Record<Event>> records,
final Buffer<Record<Event>> buffer,
final AcknowledgementSet acknowledgementSet,
final PaginationCrawlerWorkerProgressState state) {
final AcknowledgementSet acknowledgementSet) {
bufferWriteAttemptsCounter.increment();
int retryCount = 0;
int currentBackoff = 1000; // Start with 1 second
Expand Down
Loading
Loading