2424import org .opensearch .dataprepper .model .event .JacksonEvent ;
2525import org .opensearch .dataprepper .metrics .PluginMetrics ;
2626import org .opensearch .dataprepper .model .record .Record ;
27- import org .opensearch .dataprepper .plugins .source .microsoft_office365 .exception .Office365Exception ;
27+ import org .opensearch .dataprepper .plugins .source .source_crawler .exception .SaaSCrawlerException ;
2828import org .opensearch .dataprepper .plugins .source .source_crawler .base .CrawlerClient ;
2929import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .DimensionalTimeSliceWorkerProgressState ;
3030import org .opensearch .dataprepper .plugins .source .source_crawler .model .ItemInfo ;
5151@ Slf4j
5252@ Named
5353public class Office365CrawlerClient implements CrawlerClient <DimensionalTimeSliceWorkerProgressState > {
54+
55+ public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors" ;
56+ public static final String RETRYABLE_ERRORS = "retryableErrors" ;
57+
5458 private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency" ;
5559 private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts" ;
5660 private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess" ;
5761 private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess" ;
5862 private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts" ;
5963 private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures" ;
60- private static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors" ;
61- private static final String RETRYABLE_ERRORS = "retryableErrors" ;
6264 private static final int BUFFER_TIMEOUT_IN_SECONDS = 10 ;
6365 private static final String CONTENT_ID = "contentId" ;
6466 private static final String CONTENT_URI = "contentUri" ;
@@ -129,23 +131,11 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
129131 if (record != null ) {
130132 records .add (record );
131133 }
132- } catch (Office365Exception e ) {
133-
134+ } catch (SaaSCrawlerException e ) {
135+ boolean isRetryable = e . isRetryable ();
134136 log .error (NOISY , "{} error processing audit log: {}" ,
135- e .isRetryable () ? "Retryable" : "Non-retryable" , logId , e );
136- if (e .isRetryable ()) {
137- retryableErrorsCounter .increment ();
138- throw new RuntimeException ("Retryable error processing audit log: " + logId , e );
139- } else {
140- nonRetryableErrorsCounter .increment ();
141- // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
142- log .error (NOISY , "Non-retryable error - record will be dropped. Error processing audit log: {}" , logId , e );
143- }
144- } catch (Exception e ) {
145- // Unexpected errors are treated as retryable to be safe
146- retryableErrorsCounter .increment ();
147- log .error (NOISY , "Unexpected error processing audit log: {}" , logId , e );
148- throw new RuntimeException ("Unexpected error processing audit log: " + logId , e );
137+ isRetryable ? "Retryable" : "Non-retryable" , logId , e );
138+ throw new SaaSCrawlerException ("Error processing audit log: " + logId , e , isRetryable );
149139 }
150140 }
151141 }
@@ -170,19 +160,30 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
170160 log .error (NOISY , "Failed to process partition for log type {} from {} to {}" ,
171161 logType , startTime , endTime , e );
172162 requestErrorsCounter .increment ();
173- throw e ;
163+ if (e instanceof SaaSCrawlerException ) {
164+ SaaSCrawlerException saasException = (SaaSCrawlerException ) e ;
165+ if (saasException .isRetryable ()) {
166+ retryableErrorsCounter .increment ();
167+ } else {
168+ nonRetryableErrorsCounter .increment ();
169+ }
170+ throw e ;
171+ }
172+ // any other exceptions = non-retryable
173+ nonRetryableErrorsCounter .increment ();
174+ throw new SaaSCrawlerException ("Failed to process partition" , e , false );
174175 }
175176 }
176177
177- private Record <Event > processAuditLog (Map <String , Object > metadata ) throws Office365Exception {
178+ private Record <Event > processAuditLog (Map <String , Object > metadata ) throws SaaSCrawlerException {
178179 String contentUri = (String ) metadata .get (CONTENT_URI );
179180 if (contentUri == null ) {
180- throw new Office365Exception ("Missing contentUri in metadata" , false );
181+ throw new SaaSCrawlerException ("Missing contentUri in metadata" , false );
181182 }
182183
183184 String logContent = service .getAuditLog (contentUri );
184185 if (logContent == null ) {
185- throw new Office365Exception ("Received null log content for URI: " + contentUri , false );
186+ throw new SaaSCrawlerException ("Received null log content for URI: " + contentUri , false );
186187 }
187188 String logId = (String ) metadata .get (CONTENT_ID );
188189
@@ -200,7 +201,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic
200201
201202 String contentType = (String ) data .get ("Workload" );
202203 if (contentType == null ) {
203- throw new Office365Exception ("Missing Workload field in audit log: " + logId , false );
204+ throw new SaaSCrawlerException ("Missing Workload field in audit log: " + logId , false );
204205 }
205206
206207 Event event = JacksonEvent .builder ()
@@ -211,7 +212,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic
211212 return new Record <>(event );
212213 } catch (JsonProcessingException e ) {
213214 // JSON parsing errors are non-retryable as they indicate malformed data
214- throw new Office365Exception ("Failed to parse audit log: " + logId , e , false );
215+ throw new SaaSCrawlerException ("Failed to parse audit log: " + logId , e , false );
215216 }
216217 }
217218
@@ -244,7 +245,8 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
244245 retryCount ++;
245246 if (retryCount >= maxRetries ) {
246247 bufferWriteFailuresCounter .increment ();
247- throw new RuntimeException ("Failed to write to buffer after " + maxRetries + " attempts" , e );
248+ // allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler
249+ throw new SaaSCrawlerException ("Failed to write to buffer after " + maxRetries + " attempts" , e , true );
248250 }
249251
250252 bufferWriteRetryAttemptsCounter .increment ();
@@ -253,16 +255,13 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
253255
254256 try {
255257 Thread .sleep (currentBackoff );
256- // TODO: Update worker partition state to prevent timeout
257- // Ideally, we want to call the saveWorkerPartitionState and extend the lease like so
258- // coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
259258 } catch (InterruptedException ie ) {
260259 Thread .currentThread ().interrupt ();
261- throw new RuntimeException ("Buffer write retry interrupted" , ie );
260+ throw new SaaSCrawlerException ("Buffer write retry interrupted" , ie , true );
262261 }
263262 } catch (Exception e ) {
264263 bufferWriteFailuresCounter .increment ();
265- throw new RuntimeException ("Error writing to buffer" , e );
264+ throw new SaaSCrawlerException ("Error writing to buffer" , e , true );
266265 }
267266 }
268267 }
0 commit comments