Skip to content

Commit aabf54e

Browse files
author
Alekhya Parisha
committed
Implement dimensional time slice crawler
Signed-off-by: Alekhya Parisha <aparisha@amazon.com>
1 parent fab4ac1 commit aabf54e

19 files changed

Lines changed: 867 additions & 902 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java

Lines changed: 98 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99

1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

12-
import com.google.common.annotations.VisibleForTesting;
12+
import com.fasterxml.jackson.core.JsonProcessingException;
1313
import com.fasterxml.jackson.core.type.TypeReference;
1414
import com.fasterxml.jackson.databind.JsonNode;
1515
import com.fasterxml.jackson.databind.ObjectMapper;
16-
import com.fasterxml.jackson.core.JsonProcessingException;
16+
import com.google.common.annotations.VisibleForTesting;
1717
import io.micrometer.core.instrument.Counter;
1818
import io.micrometer.core.instrument.Timer;
1919
import lombok.extern.slf4j.Slf4j;
@@ -24,25 +24,22 @@
2424
import org.opensearch.dataprepper.model.event.JacksonEvent;
2525
import org.opensearch.dataprepper.metrics.PluginMetrics;
2626
import org.opensearch.dataprepper.model.record.Record;
27+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
2728
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
28-
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
29-
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
29+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
3030
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
31-
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
32-
import org.springframework.web.client.ResourceAccessException;
33-
import org.springframework.web.client.HttpClientErrorException;
3431
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
32+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
3533

3634
import javax.inject.Named;
3735
import java.time.Duration;
3836
import java.time.Instant;
39-
import java.util.Iterator;
37+
4038
import java.util.List;
39+
import java.util.ArrayList;
4140
import java.util.Map;
42-
import java.util.Objects;
41+
import java.util.Iterator;
4342
import java.util.concurrent.TimeoutException;
44-
import java.util.concurrent.ExecutorService;
45-
import java.util.stream.Collectors;
4643

4744
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
4845

@@ -52,21 +49,18 @@
5249
*/
5350
@Slf4j
5451
@Named
55-
public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWorkerProgressState> {
52+
public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSliceWorkerProgressState> {
5653
private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency";
5754
private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts";
5855
private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess";
5956
private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
6057
private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
6158
private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
62-
private static final String WORKER_STATE_UPDATES = "workerStateUpdates";
63-
64-
private static final String CONTENT_TYPE = "contentType";
6559
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
60+
private static final String CONTENT_ID = "contentId";
61+
private static final String CONTENT_URI = "contentUri";
6662

6763
private final Office365Service service;
68-
private final Office365Iterator office365Iterator;
69-
private final ExecutorService executorService;
7064
private final Office365SourceConfig configuration;
7165
private final Timer bufferWriteLatencyTimer;
7266
private final Counter bufferWriteAttemptsCounter;
@@ -77,13 +71,9 @@ public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWo
7771
private ObjectMapper objectMapper;
7872

7973
public Office365CrawlerClient(final Office365Service service,
80-
final Office365Iterator office365Iterator,
81-
final PluginExecutorServiceProvider executorServiceProvider,
8274
final Office365SourceConfig sourceConfig,
8375
final PluginMetrics pluginMetrics) {
8476
this.service = service;
85-
this.office365Iterator = office365Iterator;
86-
this.executorService = executorServiceProvider.get();
8777
this.configuration = sourceConfig;
8878
this.objectMapper = new ObjectMapper();
8979

@@ -101,125 +91,116 @@ void injectObjectMapper(final ObjectMapper objectMapper) {
10191
this.objectMapper = objectMapper;
10292
}
10393

104-
10594
@Override
10695
public Iterator<ItemInfo> listItems(final Instant lastPollTime) {
107-
log.info("Starting to list Office 365 audit logs from {}", lastPollTime);
108-
109-
// TODO: Subscription management should be moved to a dedicated class in the future
110-
// Currently, we initialize subscriptions in the leader partition to ensure that we're always subscribed
111-
// to the required content type to ensure there hasn't been a subscription change
112-
service.initializeSubscriptions();
113-
office365Iterator.initialize(lastPollTime);
114-
return office365Iterator;
96+
return null;
11597
}
11698

11799
@Override
118-
public void executePartition(final PaginationCrawlerWorkerProgressState state,
100+
public void executePartition(final DimensionalTimeSliceWorkerProgressState state,
119101
final Buffer<Record<Event>> buffer,
120102
final AcknowledgementSet acknowledgementSet) {
121-
// Process a batch of audit log IDs and convert them to records
122-
// If any record fails to process, the entire batch will be retried
123-
log.info("Starting to execute partition with {} log(s)", state.getItemIds().size());
124-
List<String> itemIds = state.getItemIds();
103+
final Instant startTime = state.getStartTime();
104+
final Instant endTime = state.getEndTime();
105+
final String logType = state.getDimensionType();
125106

126-
// Process each audit log ID in the batch
127-
List<Record<Event>> records = itemIds.stream()
128-
.map(id -> {
129-
try {
130-
return processAuditLog(id);
131-
} catch (Office365Exception e) {
132-
log.error(NOISY, "{} error processing audit log: {}",
133-
e.isRetryable() ? "Retryable" : "Non-retryable", id, e);
134-
if (e.isRetryable()) {
135-
throw new RuntimeException("Retryable error processing audit log: " + id, e);
136-
} else {
137-
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
138-
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", id, e);
139-
return null;
107+
try {
108+
String nextPageUri = null;
109+
List<Record<Event>> records = new ArrayList<>();
110+
111+
do {
112+
AuditLogsResponse response =
113+
service.searchAuditLogs(logType, startTime, endTime, nextPageUri);
114+
115+
if (response.getItems() != null && !response.getItems().isEmpty()) {
116+
for (Map<String, Object> metadata : response.getItems()) {
117+
String logId = (String) metadata.get(CONTENT_ID);
118+
try {
119+
Record<Event> record = processAuditLog(metadata);
120+
if (record != null) {
121+
records.add(record);
122+
}
123+
} catch (Office365Exception e) {
124+
125+
log.error(NOISY, "{} error processing audit log: {}",
126+
e.isRetryable() ? "Retryable" : "Non-retryable", logId, e);
127+
if (e.isRetryable()) {
128+
throw new RuntimeException("Retryable error processing audit log: " + logId, e);
129+
} else {
130+
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
131+
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e);
132+
}
133+
} catch (Exception e) {
134+
// Unexpected errors are treated as retryable to be safe
135+
log.error(NOISY, "Unexpected error processing audit log: {}", logId, e);
136+
throw new RuntimeException("Unexpected error processing audit log: " + logId, e);
140137
}
141-
} catch (Exception e) {
142-
// Unexpected errors are treated as retryable to be safe
143-
log.error(NOISY, "Unexpected error processing audit log: {}", id, e);
144-
throw new RuntimeException("Unexpected error processing audit log: " + id, e);
145138
}
146-
})
147-
.filter(Objects::nonNull)
148-
.collect(Collectors.toList());
149-
150-
bufferWriteLatencyTimer.record(() -> {
151-
try {
152-
writeRecordsWithRetry(records, buffer, acknowledgementSet, state);
153-
} catch (Exception e) {
154-
bufferWriteFailuresCounter.increment();
155-
throw e;
156-
}
157-
});
158-
}
139+
}
159140

160-
private Record<Event> processAuditLog(String id) {
161-
try {
162-
String auditLog = service.getAuditLog(id);
141+
nextPageUri = response.getNextPageUri();
142+
} while (nextPageUri != null);
163143

164-
// Handle HTTP errors in service layer
165-
if (auditLog == null) {
166-
throw new Office365Exception("Received null audit log for ID: " + id, false);
167-
}
144+
bufferWriteLatencyTimer.record(() -> {
145+
try {
146+
writeRecordsWithRetry(records, buffer, acknowledgementSet);
147+
} catch (Exception e) {
148+
bufferWriteFailuresCounter.increment();
149+
throw e;
150+
}
151+
});
168152

169-
try {
170-
JsonNode jsonNode = objectMapper.readTree(auditLog);
171-
Map<String, Object> data;
153+
} catch (Exception e) {
154+
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
155+
logType, startTime, endTime, e);
156+
throw e;
157+
}
158+
}
172159

173-
// Office 365 API sometimes returns an array with a single item
174-
// and sometimes returns a single object directly
175-
if (jsonNode.isArray() && !jsonNode.isEmpty()) {
176-
data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {});
177-
} else {
178-
data = objectMapper.readValue(auditLog, new TypeReference<Map<String, Object>>() {});
179-
}
160+
private Record<Event> processAuditLog(Map<String, Object> metadata) throws Office365Exception {
161+
String contentUri = (String) metadata.get(CONTENT_URI);
162+
if (contentUri == null) {
163+
throw new Office365Exception("Missing contentUri in metadata", false);
164+
}
180165

181-
// "Workload" is an Office 365 specific field that indicates the source of the audit log
182-
String contentType = (String) data.get("Workload");
183-
if (contentType == null) {
184-
throw new Office365Exception("Missing Workload field in audit log: " + id, false);
185-
}
166+
String logContent = service.getAuditLog(contentUri);
167+
if (logContent == null) {
168+
throw new Office365Exception("Received null log content for URI: " + contentUri, false);
169+
}
170+
String logId = (String) metadata.get(CONTENT_ID);
186171

187-
Event event = JacksonEvent.builder()
188-
.withEventType(EventType.LOG.toString())
189-
.withData(data)
190-
.build();
191-
event.getMetadata().setAttribute(CONTENT_TYPE, contentType);
192-
return new Record<>(event);
193-
} catch (JsonProcessingException e) {
194-
// JSON parsing errors are non-retryable as they indicate malformed data
195-
throw new Office365Exception("Failed to parse audit log: " + id, e, false);
172+
try {
173+
JsonNode jsonNode = objectMapper.readTree(logContent);
174+
Map<String, Object> data;
175+
176+
// Office 365 API sometimes returns an array with a single item
177+
// and sometimes returns a single object directly
178+
if (jsonNode.isArray() && !jsonNode.isEmpty()) {
179+
data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {});
180+
} else {
181+
data = objectMapper.readValue(logContent, new TypeReference<Map<String, Object>>() {});
196182
}
197-
} catch (HttpClientErrorException e) {
198-
switch (e.getStatusCode()) {
199-
case UNAUTHORIZED:
200-
case FORBIDDEN:
201-
// Auth errors might be temporary due to token expiration
202-
throw new Office365Exception("Authentication failed while fetching audit log: " + id, e, true);
203-
case NOT_FOUND:
204-
// Log doesn't exist - non-retryable
205-
throw new Office365Exception("Audit log not found: " + id, e, false);
206-
case TOO_MANY_REQUESTS:
207-
// Rate limiting - retryable
208-
throw new Office365Exception("Rate limited while fetching audit log: " + id, e, true);
209-
default:
210-
// Other client errors are non-retryable
211-
throw new Office365Exception("Client error while fetching audit log: " + id, e, false);
183+
184+
String contentType = (String) data.get("Workload");
185+
if (contentType == null) {
186+
throw new Office365Exception("Missing Workload field in audit log: " + logId, false);
212187
}
213-
} catch (ResourceAccessException e) {
214-
// Network/connection issues are retryable
215-
throw new Office365Exception("Network error while fetching audit log: " + id, e, true);
188+
189+
Event event = JacksonEvent.builder()
190+
.withEventType(EventType.LOG.toString())
191+
.withData(data)
192+
.build();
193+
event.getMetadata().setAttribute("contentType", contentType);
194+
return new Record<>(event);
195+
} catch (JsonProcessingException e) {
196+
// JSON parsing errors are non-retryable as they indicate malformed data
197+
throw new Office365Exception("Failed to parse audit log: " + logId, e, false);
216198
}
217199
}
218200

219201
private void writeRecordsWithRetry(final List<Record<Event>> records,
220202
final Buffer<Record<Event>> buffer,
221-
final AcknowledgementSet acknowledgementSet,
222-
final PaginationCrawlerWorkerProgressState state) {
203+
final AcknowledgementSet acknowledgementSet) {
223204
bufferWriteAttemptsCounter.increment();
224205
int retryCount = 0;
225206
int currentBackoff = 1000; // Start with 1 second

0 commit comments

Comments
 (0)