Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

// Build-time and runtime dependencies of the Microsoft Office 365 source plugin.
dependencies {
// Data Prepper dependencies: sibling projects of this build that the plugin compiles against.
implementation project(path: ':data-prepper-plugins:saas-source-plugins:source-crawler')
implementation project(path: ':data-prepper-api')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:common')

// Microsoft Graph API dependencies (versions are pinned directly on each coordinate).
implementation 'com.microsoft.graph:microsoft-graph:5.65.0'
implementation 'com.microsoft.azure:msal4j:1.13.9'
implementation 'com.azure:azure-identity:1.11.1'

// General-purpose libraries.
// `libs.commons.io` is resolved through the `libs` accessor rather than a literal coordinate.
implementation libs.commons.io
// No version is declared on this line — presumably supplied by a platform/BOM or
// constraint elsewhere in the build; TODO confirm.
implementation 'io.micrometer:micrometer-core'
implementation 'javax.inject:javax.inject:1'
implementation 'org.jsoup:jsoup:1.18.3'

// Lombok: the annotationProcessor entry runs the processor during compilation.
// NOTE(review): Lombok is also declared as `implementation`, which places it on the
// runtime classpath; `compileOnly` is the more common scope — confirm before changing.
implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

// Test-only dependencies.
testImplementation project(path: ':data-prepper-test-common')
// NOTE(review): spring-test is pinned to 5.3.27 while spring-context/spring-web come from
// `libs` — verify that the versions line up.
testImplementation 'org.springframework:spring-test:5.3.27'

// Spring, with the transitive commons-logging artifact excluded from spring-context
// (presumably to avoid pulling in a second logging API — TODO confirm).
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation(libs.spring.web)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

/**
* Implementation of CrawlerClient for Office 365 audit logs.
* This class manages the crawling process for Office 365 audit logs.
*/
@Slf4j
@Named
public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWorkerProgressState> {
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
private static final String CONTENT_TYPE = "contentType";

private final Office365Service service;
private final Office365Iterator office365Iterator;
private final ExecutorService executorService;
private final Office365SourceConfig configuration;
private final ObjectMapper objectMapper;

public Office365CrawlerClient(final Office365Service service,
final Office365Iterator office365Iterator,
final PluginExecutorServiceProvider executorServiceProvider,
final Office365SourceConfig sourceConfig) {
this.service = service;
this.office365Iterator = office365Iterator;
this.executorService = executorServiceProvider.get();
this.configuration = sourceConfig;
this.objectMapper = new ObjectMapper();
}

@Override
public Iterator<ItemInfo> listItems(final Instant lastPollTime) {
log.info("Starting to list Office 365 audit logs from {}", lastPollTime);
// TODO: Consider moving to a SubscriptionService later
// Initialize subscription every time in the LP to ensure there hasn't been a subscription change
service.initializeSubscriptions();
office365Iterator.initialize(lastPollTime);
return office365Iterator;
}

@Override
public void executePartition(final PaginationCrawlerWorkerProgressState state,
final Buffer<Record<Event>> buffer,
final AcknowledgementSet acknowledgementSet) {
// TODO: Investigate JIRA's approach of using state.getExportStartTime() as eventTime
log.info("Starting to execute partition with {} log(s)", state.getItemIds().size());
List<String> itemIds = state.getItemIds();

List<Record<Event>> records = itemIds.stream()
.map(id -> {
try {
String auditLog = service.getAuditLog(id); // fetch each individual log
JsonNode jsonNode = objectMapper.readTree(auditLog);
Map<String, Object> data;

if (jsonNode.isArray() && !jsonNode.isEmpty()) {
data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {});
} else {
data = objectMapper.readValue(auditLog, new TypeReference<Map<String, Object>>() {});
}

// "Workload" is an Office365-specific field that indicates the source of the audit log
String contentType = (String) data.get("Workload");
log.debug("Processing log with content type: {}", contentType);

Event event = JacksonEvent.builder()
.withEventType(EventType.LOG.toString())
.withData(data)
.build();
event.getMetadata().setAttribute(CONTENT_TYPE, contentType);
return new Record<>(event);
} catch (Exception e) {
// TODO: Handle failed retrievals here so we don't drop record.
// Ideally at this point, we want to fail the entire batch so it'll be retried.
log.error("Error processing audit log entry for ID: {}", id, e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
try {
log.info("Writing {} records to buffer", records.size());
int timeoutMillis = (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis();

if (configuration.isAcknowledgments()) {
// TODO: Change this logic in another PR to manager buffer better.
records.forEach(record -> acknowledgementSet.add(record.getData()));
buffer.writeAll(records, timeoutMillis);
acknowledgementSet.complete();
} else {
buffer.writeAll(records, timeoutMillis);
}
} catch (Exception e) {
log.error("Failed to write records to buffer", e);
throw new RuntimeException("Failed to write records to buffer", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.springframework.util.CollectionUtils;

import javax.inject.Named;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * Iterator over Office 365 audit log items.
 *
 * <p>The first call to {@link #hasNext()} after {@link #initialize(Instant)} submits a task to
 * the crawler executor that asks {@link Office365Service} to fill an internal queue.
 * {@link #hasNext()} then polls that queue, sleeping between polls, until an item is available,
 * the task has finished, or the polling budget is used up.
 *
 * <p>{@link #initialize(Instant)} must be called before iterating; it resets the queue and the
 * crawl start time so the same instance can be reused for successive crawls.
 */
@Slf4j
@Named
public class Office365Iterator implements Iterator<ItemInfo> {
    /**
     * Maximum number of polling rounds a single {@link #hasNext()} call waits for. Each round
     * sleeps {@code crawlerQWaitTimeMillis}, so the worst-case wait is
     * {@code HAS_NEXT_TIMEOUT * crawlerQWaitTimeMillis} milliseconds.
     */
    private static final int HAS_NEXT_TIMEOUT = 60;

    private final Office365Service service;
    private final ExecutorService crawlerTaskExecutor;

    /** Sleep between two polls of the queue, in milliseconds. */
    @Setter
    private long crawlerQWaitTimeMillis = 2000;
    private Queue<ItemInfo> itemInfoQueue;
    private Instant lastPollTime;
    private boolean firstTime = true;
    private final List<Future<Boolean>> futureList;

    /**
     * Creates the iterator.
     *
     * @param service                 service that produces the audit log items
     * @param executorServiceProvider provider of the executor the crawl task is submitted to
     */
    public Office365Iterator(final Office365Service service,
                             final PluginExecutorServiceProvider executorServiceProvider) {
        this.service = service;
        this.crawlerTaskExecutor = executorServiceProvider.get();
        this.futureList = new ArrayList<>();
    }

    /**
     * Reports whether an item is available, starting the crawl on the first call after
     * {@link #initialize(Instant)} and waiting for the queue to be filled if necessary.
     *
     * @return {@code true} if the queue holds at least one item; {@code false} if the crawl
     *         finished with an empty queue, the polling budget ran out, or the waiting thread
     *         was interrupted (the interrupt flag is restored in that case)
     */
    @Override
    public boolean hasNext() {
        if (firstTime) {
            log.debug("Starting initial crawl for Office 365 audit logs");
            startCrawlerThreads();
            firstTime = false;
        }

        int remainingPolls = HAS_NEXT_TIMEOUT;
        while (isCrawlerRunning() && isQueueEmpty() && remainingPolls > 0) {
            try {
                // The budget is counted in polling rounds, not seconds, so report the real
                // remaining wait instead of the raw counter.
                log.trace("Waiting for crawler queue to be filled, up to {} ms remaining",
                        remainingPolls * crawlerQWaitTimeMillis);
                Thread.sleep(crawlerQWaitTimeMillis);
                remainingPolls--;
            } catch (InterruptedException e) {
                log.error("Thread interrupted while waiting for crawler queue", e);
                Thread.currentThread().interrupt();
                return false;
            }
        }

        return !isQueueEmpty();
    }

    /**
     * Removes and returns the next queued item.
     *
     * @return the next item
     * @throws NoSuchElementException if {@link #hasNext()} reports that no item is available
     */
    @Override
    public ItemInfo next() {
        if (hasNext()) {
            return itemInfoQueue.remove();
        }
        throw new NoSuchElementException("No more items available in the Office 365 audit log queue");
    }

    /**
     * Resets the iterator for a new crawl: installs a fresh queue, records the start time,
     * forgets previously submitted crawl tasks and arms the next {@link #hasNext()} call to
     * submit a new one.
     *
     * @param startTime timestamp passed to the service as the start of the crawl
     */
    public void initialize(final Instant startTime) {
        log.info("Initializing Office 365 iterator from timestamp: {}", startTime);
        this.itemInfoQueue = new ConcurrentLinkedQueue<>();
        this.lastPollTime = startTime;
        this.firstTime = true;
        this.futureList.clear();
    }

    /** Returns {@code true} while at least one submitted crawl task has not completed. */
    private boolean isCrawlerRunning() {
        if (CollectionUtils.isEmpty(futureList)) {
            return false;
        }
        return futureList.stream().anyMatch(future -> !future.isDone());
    }

    /** Returns {@code true} when no queue has been installed yet or the queue has no items. */
    private boolean isQueueEmpty() {
        return itemInfoQueue == null || itemInfoQueue.isEmpty();
    }

    /**
     * Submits the crawl task and remembers its future. The task's result is {@code true} when
     * the service call returned normally and {@code false} when it threw.
     */
    private void startCrawlerThreads() {
        log.debug("Starting crawler thread for Office 365 audit logs");
        // Snapshot the crawl parameters at submission time. The task may start running after
        // initialize() has been called again; reading the fields lazily would then make a stale
        // task write into the queue of the newer crawl.
        final Instant crawlStartTime = lastPollTime;
        final Queue<ItemInfo> targetQueue = itemInfoQueue;
        Future<Boolean> future = crawlerTaskExecutor.submit(() -> {
            try {
                service.getOffice365Entities(crawlStartTime, targetQueue);
                return true;
            } catch (Exception e) {
                log.error("Error in crawler thread while fetching Office 365 audit logs", e);
                return false;
            }
        });
        futureList.add(future);
    }
}
Loading
Loading