Skip to content

Commit 7226b33

Browse files
committed
Standardize Exception handling in souce plugins
Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
1 parent 68d99dd commit 7226b33

11 files changed

Lines changed: 378 additions & 96 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.opensearch.dataprepper.model.event.JacksonEvent;
2525
import org.opensearch.dataprepper.metrics.PluginMetrics;
2626
import org.opensearch.dataprepper.model.record.Record;
27-
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
27+
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
2828
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
2929
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
3030
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
@@ -123,20 +123,10 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
123123
if (record != null) {
124124
records.add(record);
125125
}
126-
} catch (Office365Exception e) {
127-
126+
} catch (SaaSCrawlerException e) {
128127
log.error(NOISY, "{} error processing audit log: {}",
129128
e.isRetryable() ? "Retryable" : "Non-retryable", logId, e);
130-
if (e.isRetryable()) {
131-
throw new RuntimeException("Retryable error processing audit log: " + logId, e);
132-
} else {
133-
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
134-
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e);
135-
}
136-
} catch (Exception e) {
137-
// Unexpected errors are treated as retryable to be safe
138-
log.error(NOISY, "Unexpected error processing audit log: {}", logId, e);
139-
throw new RuntimeException("Unexpected error processing audit log: " + logId, e);
129+
throw new SaaSCrawlerException("Error processing audit log: " + logId, e, e.isRetryable());
140130
}
141131
}
142132
}
@@ -161,19 +151,23 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
161151
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
162152
logType, startTime, endTime, e);
163153
requestErrorsCounter.increment();
164-
throw e;
154+
if (e instanceof SaaSCrawlerException) {
155+
throw e;
156+
}
157+
// any other exceptions = non-retryable
158+
throw new SaaSCrawlerException("Failed to process partition", e, false);
165159
}
166160
}
167161

168-
private Record<Event> processAuditLog(Map<String, Object> metadata) throws Office365Exception {
162+
private Record<Event> processAuditLog(Map<String, Object> metadata) throws SaaSCrawlerException {
169163
String contentUri = (String) metadata.get(CONTENT_URI);
170164
if (contentUri == null) {
171-
throw new Office365Exception("Missing contentUri in metadata", false);
165+
throw new SaaSCrawlerException("Missing contentUri in metadata", false);
172166
}
173167

174168
String logContent = service.getAuditLog(contentUri);
175169
if (logContent == null) {
176-
throw new Office365Exception("Received null log content for URI: " + contentUri, false);
170+
throw new SaaSCrawlerException("Received null log content for URI: " + contentUri, false);
177171
}
178172
String logId = (String) metadata.get(CONTENT_ID);
179173

@@ -191,7 +185,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic
191185

192186
String contentType = (String) data.get("Workload");
193187
if (contentType == null) {
194-
throw new Office365Exception("Missing Workload field in audit log: " + logId, false);
188+
throw new SaaSCrawlerException("Missing Workload field in audit log: " + logId, false);
195189
}
196190

197191
Event event = JacksonEvent.builder()
@@ -202,7 +196,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic
202196
return new Record<>(event);
203197
} catch (JsonProcessingException e) {
204198
// JSON parsing errors are non-retryable as they indicate malformed data
205-
throw new Office365Exception("Failed to parse audit log: " + logId, e, false);
199+
throw new SaaSCrawlerException("Failed to parse audit log: " + logId, e, false);
206200
}
207201
}
208202

@@ -235,7 +229,8 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
235229
retryCount++;
236230
if (retryCount >= maxRetries) {
237231
bufferWriteFailuresCounter.increment();
238-
throw new RuntimeException("Failed to write to buffer after " + maxRetries + " attempts", e);
232+
// allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler
233+
throw new SaaSCrawlerException("Failed to write to buffer after " + maxRetries + " attempts", e, true);
239234
}
240235

241236
bufferWriteRetryAttemptsCounter.increment();
@@ -244,16 +239,13 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
244239

245240
try {
246241
Thread.sleep(currentBackoff);
247-
// TODO: Update worker partition state to prevent timeout
248-
// Ideally, we want to call the saveWorkerPartitionState and extend the lease like so
249-
// coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
250242
} catch (InterruptedException ie) {
251243
Thread.currentThread().interrupt();
252-
throw new RuntimeException("Buffer write retry interrupted", ie);
244+
throw new SaaSCrawlerException("Buffer write retry interrupted", ie, true);
253245
}
254246
} catch (Exception e) {
255247
bufferWriteFailuresCounter.increment();
256-
throw new RuntimeException("Error writing to buffer", e);
248+
throw new SaaSCrawlerException("Error writing to buffer", e, true);
257249
}
258250
}
259251
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import lombok.extern.slf4j.Slf4j;
1616
import org.opensearch.dataprepper.metrics.PluginMetrics;
1717
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
18-
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
18+
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
1919
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
2020
import org.springframework.core.ParameterizedTypeReference;
2121
import org.springframework.http.HttpEntity;
@@ -144,7 +144,7 @@ public void startSubscriptions() {
144144
} catch (Exception e) {
145145
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
146146
log.error(NOISY, "Failed to initialize subscriptions", e);
147-
throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e);
147+
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
148148
}
149149
}
150150

@@ -207,7 +207,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
207207
} catch (Exception e) {
208208
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
209209
log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e);
210-
throw new RuntimeException("Failed to fetch audit logs", e);
210+
throw new SaaSCrawlerException("Failed to fetch audit logs", e, true);
211211
}
212212
});
213213
}
@@ -221,7 +221,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
221221
*/
222222
public String getAuditLog(String contentUri) {
223223
if (!contentUri.startsWith(MANAGEMENT_API_BASE_URL)) {
224-
throw new Office365Exception("ContentUri must be from Office365 Management API: " + contentUri, false);
224+
throw new SaaSCrawlerException("ContentUri must be from Office365 Management API: " + contentUri, false);
225225
}
226226
auditLogsRequestedCounter.increment();
227227
final HttpHeaders headers = new HttpHeaders();
@@ -250,7 +250,7 @@ public String getAuditLog(String contentUri) {
250250
} catch (Exception e) {
251251
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
252252
log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e);
253-
throw new RuntimeException("Failed to fetch audit log", e);
253+
throw new SaaSCrawlerException("Failed to fetch audit log", e, true);
254254
}
255255
});
256256
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/exception/Office365Exception.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.opensearch.dataprepper.metrics.PluginMetrics;
1515
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
1616
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
17-
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
17+
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
1818
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
1919

2020
import javax.inject.Named;
@@ -52,10 +52,10 @@ public AuditLogsResponse searchAuditLogs(final String logType,
5252
final Instant endTime,
5353
final String nextPageUri) {
5454
if (startTime == null || endTime == null) {
55-
throw new IllegalArgumentException("startTime and endTime must not be null");
55+
throw new SaaSCrawlerException("startTime and endTime must not be null", false);
5656
}
5757
if (logType == null) {
58-
throw new IllegalArgumentException("logType must not be null");
58+
throw new SaaSCrawlerException("logType must not be null", false);
5959
}
6060
try {
6161
// If pagination URI exists, use it directly
@@ -78,7 +78,7 @@ public AuditLogsResponse searchAuditLogs(final String logType,
7878
return response;
7979
} catch (Exception e) {
8080
windowRetryCounter.increment();
81-
throw new Office365Exception(
81+
throw new SaaSCrawlerException(
8282
String.format("Failed to fetch logs for time window %s to %s for log type %s.",
8383
startTime, endTime, logType), e, true);
8484
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.dataprepper.model.record.Record;
2929
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
3030
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
31+
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
3132
import org.opensearch.dataprepper.metrics.PluginMetrics;
3233
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
3334
import org.slf4j.Logger;
@@ -41,6 +42,7 @@
4142

4243
import static org.junit.jupiter.api.Assertions.assertEquals;
4344
import static org.junit.jupiter.api.Assertions.assertFalse;
45+
import static org.junit.jupiter.api.Assertions.assertTrue;
4446
import static org.junit.jupiter.api.Assertions.assertNotNull;
4547
import static org.junit.jupiter.api.Assertions.assertThrows;
4648

@@ -180,9 +182,14 @@ void testExecutePartitionWithJsonProcessingError() throws Exception {
180182
return null;
181183
}).when(bufferWriteLatencyTimer).record(any(Runnable.class));
182184

183-
client.executePartition(state, buffer, acknowledgementSet);
185+
SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class,
186+
() -> client.executePartition(state, buffer, acknowledgementSet));
187+
188+
assertEquals("Error processing audit log: ID1", exception.getMessage());
189+
assertFalse(exception.isRetryable());
190+
assertTrue(exception.getCause() instanceof SaaSCrawlerException);
191+
assertEquals("Failed to parse audit log: ID1", exception.getCause().getMessage());
184192

185-
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
186193
verify(mockRequestErrorsCounter, never()).increment();
187194
}
188195

@@ -250,10 +257,11 @@ void testBufferWriteTimeout() throws Exception {
250257
.when(buffer)
251258
.writeAll(any(), anyInt());
252259

253-
RuntimeException exception = assertThrows(RuntimeException.class,
260+
SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class,
254261
() -> client.executePartition(state, buffer, acknowledgementSet));
255262

256263
assertEquals("Error writing to buffer", exception.getMessage());
264+
assertTrue(exception.isRetryable());
257265
verify(buffer).writeAll(any(), anyInt());
258266
}
259267

@@ -281,9 +289,15 @@ void testNonRetryableError() throws Exception {
281289
return null;
282290
}).when(bufferWriteLatencyTimer).record(any(Runnable.class));
283291

284-
client.executePartition(state, buffer, acknowledgementSet);
292+
SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class,
293+
() -> client.executePartition(state, buffer, acknowledgementSet));
285294

286-
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
295+
assertEquals("Error processing audit log: ID1", exception.getMessage());
296+
assertFalse(exception.isRetryable());
297+
assertTrue(exception.getCause() instanceof SaaSCrawlerException);
298+
assertEquals("Received null log content for URI: uri1", exception.getCause().getMessage());
299+
300+
verify(buffer, never()).writeAll(argThat(list -> list.isEmpty()), anyInt());
287301
}
288302

289303
@Test
@@ -311,9 +325,15 @@ void testMissingWorkloadField() throws Exception {
311325
return null;
312326
}).when(bufferWriteLatencyTimer).record(any(Runnable.class));
313327

314-
client.executePartition(state, buffer, acknowledgementSet);
328+
SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class,
329+
() -> client.executePartition(state, buffer, acknowledgementSet));
330+
331+
assertEquals("Error processing audit log: ID1", exception.getMessage());
332+
assertFalse(exception.isRetryable());
333+
assertTrue(exception.getCause() instanceof SaaSCrawlerException);
334+
assertEquals("Missing Workload field in audit log: ID1", exception.getCause().getMessage());
315335

316-
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
336+
verify(buffer, never()).writeAll(argThat(list -> list.isEmpty()), anyInt());
317337
}
318338

319339
@Test
@@ -330,14 +350,43 @@ void testExecutePartitionWithSearchAuditLogsError() throws Exception {
330350
any(Instant.class),
331351
any(Instant.class),
332352
isNull()
333-
)).thenThrow(new RuntimeException("Search audit logs failed"));
353+
)).thenThrow(new SaaSCrawlerException("Search audit logs failed", true));
334354

335355
// Execute and verify exception
336-
RuntimeException exception = assertThrows(RuntimeException.class,
356+
SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class,
337357
() -> client.executePartition(state, buffer, acknowledgementSet));
338358

339359
// Verify exception message and counter increment
340360
assertEquals("Search audit logs failed", exception.getMessage());
361+
assertTrue(exception.isRetryable());
341362
verify(mockRequestErrorsCounter).increment();
342363
}
364+
365+
@Test
366+
void testExecutePartitionWithNonSaaSCrawlerException() throws Exception {
367+
// Create the counter mock before creating the client
368+
Counter mockRequestErrorsCounter = mock(Counter.class);
369+
when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); // Use the constant
370+
371+
// Create client after counter is mocked
372+
Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics);
373+
374+
// Simulate a non-SaaSCrawlerException (like RuntimeException)
375+
when(service.searchAuditLogs(
376+
anyString(),
377+
any(Instant.class),
378+
any(Instant.class),
379+
any()
380+
)).thenThrow(new RuntimeException("Unexpected error"));
381+
382+
// Execute and verify exception
383+
SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class,
384+
() -> client.executePartition(state, buffer, acknowledgementSet));
385+
386+
// Verify:
387+
assertEquals("Failed to process partition", exception.getMessage());
388+
assertFalse(exception.isRetryable());
389+
assertTrue(exception.getCause() instanceof RuntimeException);
390+
verify(mockRequestErrorsCounter).increment(); // Verify counter increment
391+
}
343392
}

0 commit comments

Comments
 (0)