99
1010package org .opensearch .dataprepper .plugins .source .microsoft_office365 ;
1111
12- import com .google . common . annotations . VisibleForTesting ;
12+ import com .fasterxml . jackson . core . JsonProcessingException ;
1313import com .fasterxml .jackson .core .type .TypeReference ;
1414import com .fasterxml .jackson .databind .JsonNode ;
1515import com .fasterxml .jackson .databind .ObjectMapper ;
16- import com .fasterxml . jackson . core . JsonProcessingException ;
16+ import com .google . common . annotations . VisibleForTesting ;
1717import io .micrometer .core .instrument .Counter ;
1818import io .micrometer .core .instrument .Timer ;
1919import lombok .extern .slf4j .Slf4j ;
2424import org .opensearch .dataprepper .model .event .JacksonEvent ;
2525import org .opensearch .dataprepper .metrics .PluginMetrics ;
2626import org .opensearch .dataprepper .model .record .Record ;
27+ import org .opensearch .dataprepper .plugins .source .microsoft_office365 .exception .Office365Exception ;
2728import org .opensearch .dataprepper .plugins .source .source_crawler .base .CrawlerClient ;
28- import org .opensearch .dataprepper .plugins .source .source_crawler .base .PluginExecutorServiceProvider ;
29- import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .PaginationCrawlerWorkerProgressState ;
29+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .DimensionalTimeSliceWorkerProgressState ;
3030import org .opensearch .dataprepper .plugins .source .source_crawler .model .ItemInfo ;
31- import org .opensearch .dataprepper .plugins .source .microsoft_office365 .exception .Office365Exception ;
32- import org .springframework .web .client .ResourceAccessException ;
33- import org .springframework .web .client .HttpClientErrorException ;
3431import org .opensearch .dataprepper .plugins .source .microsoft_office365 .service .Office365Service ;
32+ import org .opensearch .dataprepper .plugins .source .microsoft_office365 .models .AuditLogsResponse ;
3533
3634import javax .inject .Named ;
3735import java .time .Duration ;
3836import java .time .Instant ;
39- import java . util . Iterator ;
37+
4038import java .util .List ;
39+ import java .util .ArrayList ;
4140import java .util .Map ;
42- import java .util .Objects ;
41+ import java .util .Iterator ;
4342import java .util .concurrent .TimeoutException ;
44- import java .util .concurrent .ExecutorService ;
45- import java .util .stream .Collectors ;
4643
4744import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
4845
5249 */
5350@ Slf4j
5451@ Named
55- public class Office365CrawlerClient implements CrawlerClient <PaginationCrawlerWorkerProgressState > {
52+ public class Office365CrawlerClient implements CrawlerClient <DimensionalTimeSliceWorkerProgressState > {
5653 private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency" ;
5754 private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts" ;
5855 private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess" ;
5956 private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess" ;
6057 private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts" ;
6158 private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures" ;
62- private static final String WORKER_STATE_UPDATES = "workerStateUpdates" ;
63-
64- private static final String CONTENT_TYPE = "contentType" ;
6559 private static final int BUFFER_TIMEOUT_IN_SECONDS = 10 ;
60+ private static final String CONTENT_ID = "contentId" ;
61+ private static final String CONTENT_URI = "contentUri" ;
6662
6763 private final Office365Service service ;
68- private final Office365Iterator office365Iterator ;
69- private final ExecutorService executorService ;
7064 private final Office365SourceConfig configuration ;
7165 private final Timer bufferWriteLatencyTimer ;
7266 private final Counter bufferWriteAttemptsCounter ;
@@ -77,13 +71,9 @@ public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWo
7771 private ObjectMapper objectMapper ;
7872
7973 public Office365CrawlerClient (final Office365Service service ,
80- final Office365Iterator office365Iterator ,
81- final PluginExecutorServiceProvider executorServiceProvider ,
8274 final Office365SourceConfig sourceConfig ,
8375 final PluginMetrics pluginMetrics ) {
8476 this .service = service ;
85- this .office365Iterator = office365Iterator ;
86- this .executorService = executorServiceProvider .get ();
8777 this .configuration = sourceConfig ;
8878 this .objectMapper = new ObjectMapper ();
8979
@@ -101,125 +91,116 @@ void injectObjectMapper(final ObjectMapper objectMapper) {
10191 this .objectMapper = objectMapper ;
10292 }
10393
104-
10594 @ Override
10695 public Iterator <ItemInfo > listItems (final Instant lastPollTime ) {
107- log .info ("Starting to list Office 365 audit logs from {}" , lastPollTime );
108-
109- // TODO: Subscription management should be moved to a dedicated class in the future
110- // Currently, we initialize subscriptions in the leader partition to ensure that we're always subscribed
111- // to the required content type to ensure there hasn't been a subscription change
112- service .initializeSubscriptions ();
113- office365Iterator .initialize (lastPollTime );
114- return office365Iterator ;
96+ return null ;
11597 }
11698
11799 @ Override
118- public void executePartition (final PaginationCrawlerWorkerProgressState state ,
100+ public void executePartition (final DimensionalTimeSliceWorkerProgressState state ,
119101 final Buffer <Record <Event >> buffer ,
120102 final AcknowledgementSet acknowledgementSet ) {
121- // Process a batch of audit log IDs and convert them to records
122- // If any record fails to process, the entire batch will be retried
123- log .info ("Starting to execute partition with {} log(s)" , state .getItemIds ().size ());
124- List <String > itemIds = state .getItemIds ();
103+ final Instant startTime = state .getStartTime ();
104+ final Instant endTime = state .getEndTime ();
105+ final String logType = state .getDimensionType ();
125106
126- // Process each audit log ID in the batch
127- List <Record <Event >> records = itemIds .stream ()
128- .map (id -> {
129- try {
130- return processAuditLog (id );
131- } catch (Office365Exception e ) {
132- log .error (NOISY , "{} error processing audit log: {}" ,
133- e .isRetryable () ? "Retryable" : "Non-retryable" , id , e );
134- if (e .isRetryable ()) {
135- throw new RuntimeException ("Retryable error processing audit log: " + id , e );
136- } else {
137- // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
138- log .error (NOISY , "Non-retryable error - record will be dropped. Error processing audit log: {}" , id , e );
139- return null ;
107+ try {
108+ String nextPageUri = null ;
109+ List <Record <Event >> records = new ArrayList <>();
110+
111+ do {
112+ AuditLogsResponse response =
113+ service .searchAuditLogs (logType , startTime , endTime , nextPageUri );
114+
115+ if (response .getItems () != null && !response .getItems ().isEmpty ()) {
116+ for (Map <String , Object > metadata : response .getItems ()) {
117+ String logId = (String ) metadata .get (CONTENT_ID );
118+ try {
119+ Record <Event > record = processAuditLog (metadata );
120+ if (record != null ) {
121+ records .add (record );
122+ }
123+ } catch (Office365Exception e ) {
124+
125+ log .error (NOISY , "{} error processing audit log: {}" ,
126+ e .isRetryable () ? "Retryable" : "Non-retryable" , logId , e );
127+ if (e .isRetryable ()) {
128+ throw new RuntimeException ("Retryable error processing audit log: " + logId , e );
129+ } else {
130+ // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
131+ log .error (NOISY , "Non-retryable error - record will be dropped. Error processing audit log: {}" , logId , e );
132+ }
133+ } catch (Exception e ) {
134+ // Unexpected errors are treated as retryable to be safe
135+ log .error (NOISY , "Unexpected error processing audit log: {}" , logId , e );
136+ throw new RuntimeException ("Unexpected error processing audit log: " + logId , e );
140137 }
141- } catch (Exception e ) {
142- // Unexpected errors are treated as retryable to be safe
143- log .error (NOISY , "Unexpected error processing audit log: {}" , id , e );
144- throw new RuntimeException ("Unexpected error processing audit log: " + id , e );
145138 }
146- })
147- .filter (Objects ::nonNull )
148- .collect (Collectors .toList ());
149-
150- bufferWriteLatencyTimer .record (() -> {
151- try {
152- writeRecordsWithRetry (records , buffer , acknowledgementSet , state );
153- } catch (Exception e ) {
154- bufferWriteFailuresCounter .increment ();
155- throw e ;
156- }
157- });
158- }
139+ }
159140
160- private Record <Event > processAuditLog (String id ) {
161- try {
162- String auditLog = service .getAuditLog (id );
141+ nextPageUri = response .getNextPageUri ();
142+ } while (nextPageUri != null );
163143
164- // Handle HTTP errors in service layer
165- if (auditLog == null ) {
166- throw new Office365Exception ("Received null audit log for ID: " + id , false );
167- }
144+ bufferWriteLatencyTimer .record (() -> {
145+ try {
146+ writeRecordsWithRetry (records , buffer , acknowledgementSet );
147+ } catch (Exception e ) {
148+ bufferWriteFailuresCounter .increment ();
149+ throw e ;
150+ }
151+ });
168152
169- try {
170- JsonNode jsonNode = objectMapper .readTree (auditLog );
171- Map <String , Object > data ;
153+ } catch (Exception e ) {
154+ log .error (NOISY , "Failed to process partition for log type {} from {} to {}" ,
155+ logType , startTime , endTime , e );
156+ throw e ;
157+ }
158+ }
172159
173- // Office 365 API sometimes returns an array with a single item
174- // and sometimes returns a single object directly
175- if (jsonNode .isArray () && !jsonNode .isEmpty ()) {
176- data = objectMapper .convertValue (jsonNode .get (0 ), new TypeReference <Map <String , Object >>() {});
177- } else {
178- data = objectMapper .readValue (auditLog , new TypeReference <Map <String , Object >>() {});
179- }
160+ private Record <Event > processAuditLog (Map <String , Object > metadata ) throws Office365Exception {
161+ String contentUri = (String ) metadata .get (CONTENT_URI );
162+ if (contentUri == null ) {
163+ throw new Office365Exception ("Missing contentUri in metadata" , false );
164+ }
180165
181- // "Workload" is an Office 365 specific field that indicates the source of the audit log
182- String contentType = ( String ) data . get ( "Workload" );
183- if ( contentType == null ) {
184- throw new Office365Exception ( "Missing Workload field in audit log: " + id , false );
185- }
166+ String logContent = service . getAuditLog ( contentUri );
167+ if ( logContent == null ) {
168+ throw new Office365Exception ( "Received null log content for URI: " + contentUri , false );
169+ }
170+ String logId = ( String ) metadata . get ( CONTENT_ID );
186171
187- Event event = JacksonEvent .builder ()
188- .withEventType (EventType .LOG .toString ())
189- .withData (data )
190- .build ();
191- event .getMetadata ().setAttribute (CONTENT_TYPE , contentType );
192- return new Record <>(event );
193- } catch (JsonProcessingException e ) {
194- // JSON parsing errors are non-retryable as they indicate malformed data
195- throw new Office365Exception ("Failed to parse audit log: " + id , e , false );
172+ try {
173+ JsonNode jsonNode = objectMapper .readTree (logContent );
174+ Map <String , Object > data ;
175+
176+ // Office 365 API sometimes returns an array with a single item
177+ // and sometimes returns a single object directly
178+ if (jsonNode .isArray () && !jsonNode .isEmpty ()) {
179+ data = objectMapper .convertValue (jsonNode .get (0 ), new TypeReference <Map <String , Object >>() {});
180+ } else {
181+ data = objectMapper .readValue (logContent , new TypeReference <Map <String , Object >>() {});
196182 }
197- } catch (HttpClientErrorException e ) {
198- switch (e .getStatusCode ()) {
199- case UNAUTHORIZED :
200- case FORBIDDEN :
201- // Auth errors might be temporary due to token expiration
202- throw new Office365Exception ("Authentication failed while fetching audit log: " + id , e , true );
203- case NOT_FOUND :
204- // Log doesn't exist - non-retryable
205- throw new Office365Exception ("Audit log not found: " + id , e , false );
206- case TOO_MANY_REQUESTS :
207- // Rate limiting - retryable
208- throw new Office365Exception ("Rate limited while fetching audit log: " + id , e , true );
209- default :
210- // Other client errors are non-retryable
211- throw new Office365Exception ("Client error while fetching audit log: " + id , e , false );
183+
184+ String contentType = (String ) data .get ("Workload" );
185+ if (contentType == null ) {
186+ throw new Office365Exception ("Missing Workload field in audit log: " + logId , false );
212187 }
213- } catch (ResourceAccessException e ) {
214- // Network/connection issues are retryable
215- throw new Office365Exception ("Network error while fetching audit log: " + id , e , true );
188+
189+ Event event = JacksonEvent .builder ()
190+ .withEventType (EventType .LOG .toString ())
191+ .withData (data )
192+ .build ();
193+ event .getMetadata ().setAttribute ("contentType" , contentType );
194+ return new Record <>(event );
195+ } catch (JsonProcessingException e ) {
196+ // JSON parsing errors are non-retryable as they indicate malformed data
197+ throw new Office365Exception ("Failed to parse audit log: " + logId , e , false );
216198 }
217199 }
218200
219201 private void writeRecordsWithRetry (final List <Record <Event >> records ,
220202 final Buffer <Record <Event >> buffer ,
221- final AcknowledgementSet acknowledgementSet ,
222- final PaginationCrawlerWorkerProgressState state ) {
203+ final AcknowledgementSet acknowledgementSet ) {
223204 bufferWriteAttemptsCounter .increment ();
224205 int retryCount = 0 ;
225206 int currentBackoff = 1000 ; // Start with 1 second
0 commit comments