1414import com .fasterxml .jackson .databind .JsonNode ;
1515import com .fasterxml .jackson .databind .ObjectMapper ;
1616import com .google .common .annotations .VisibleForTesting ;
17- import io .micrometer .core .instrument .Counter ;
18- import io .micrometer .core .instrument .Timer ;
1917import lombok .extern .slf4j .Slf4j ;
2018import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSet ;
2119import org .opensearch .dataprepper .model .buffer .Buffer ;
2220import org .opensearch .dataprepper .model .event .Event ;
2321import org .opensearch .dataprepper .model .event .EventType ;
2422import org .opensearch .dataprepper .model .event .JacksonEvent ;
25- import org .opensearch .dataprepper .metrics .PluginMetrics ;
2623import org .opensearch .dataprepper .model .record .Record ;
2724import org .opensearch .dataprepper .plugins .source .source_crawler .exception .SaaSCrawlerException ;
2825import org .opensearch .dataprepper .plugins .source .source_crawler .base .CrawlerClient ;
2926import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .DimensionalTimeSliceWorkerProgressState ;
3027import org .opensearch .dataprepper .plugins .source .source_crawler .model .ItemInfo ;
28+ import org .opensearch .dataprepper .plugins .source .source_crawler .metrics .VendorAPIMetricsRecorder ;
3129import org .opensearch .dataprepper .plugins .source .microsoft_office365 .service .Office365Service ;
3230import org .opensearch .dataprepper .plugins .source .microsoft_office365 .models .AuditLogsResponse ;
3331
4240import java .util .concurrent .TimeoutException ;
4341
4442import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
45- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .REQUEST_ERRORS ;
4643
4744/**
4845 * Implementation of CrawlerClient for Office 365 audit logs.
5249@ Named
5350public class Office365CrawlerClient implements CrawlerClient <DimensionalTimeSliceWorkerProgressState > {
5451
55- public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors" ;
56- public static final String RETRYABLE_ERRORS = "retryableErrors" ;
57-
58- private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency" ;
59- private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts" ;
60- private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess" ;
61- private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess" ;
62- private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts" ;
63- private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures" ;
6452 private static final int BUFFER_TIMEOUT_IN_SECONDS = 10 ;
6553 private static final String CONTENT_ID = "contentId" ;
6654 private static final String CONTENT_URI = "contentUri" ;
6755
6856 private final Office365Service service ;
6957 private final Office365SourceConfig configuration ;
70- private final Timer bufferWriteLatencyTimer ;
71- private final Counter bufferWriteAttemptsCounter ;
72- private final Counter bufferWriteSuccessCounter ;
73- private final Counter bufferWriteRetrySuccessCounter ;
74- private final Counter bufferWriteRetryAttemptsCounter ;
75- private final Counter bufferWriteFailuresCounter ;
76- private final Counter requestErrorsCounter ;
77- private final Counter nonRetryableErrorsCounter ;
78- private final Counter retryableErrorsCounter ;
58+ private final VendorAPIMetricsRecorder metricsRecorder ;
7959 private ObjectMapper objectMapper ;
8060
8161 public Office365CrawlerClient (final Office365Service service ,
8262 final Office365SourceConfig sourceConfig ,
83- final PluginMetrics pluginMetrics ) {
63+ final VendorAPIMetricsRecorder metricsRecorder ) {
8464 this .service = service ;
8565 this .configuration = sourceConfig ;
66+ this .metricsRecorder = metricsRecorder ;
8667 this .objectMapper = new ObjectMapper ();
87-
88- // Initialize metrics
89- this .bufferWriteLatencyTimer = pluginMetrics .timer (BUFFER_WRITE_LATENCY );
90- this .bufferWriteAttemptsCounter = pluginMetrics .counter (BUFFER_WRITE_ATTEMPTS );
91- this .bufferWriteSuccessCounter = pluginMetrics .counter (BUFFER_WRITE_SUCCESS );
92- this .bufferWriteRetrySuccessCounter = pluginMetrics .counter (BUFFER_WRITE_RETRY_SUCCESS );
93- this .bufferWriteRetryAttemptsCounter = pluginMetrics .counter (BUFFER_WRITE_RETRY_ATTEMPTS );
94- this .bufferWriteFailuresCounter = pluginMetrics .counter (BUFFER_WRITE_FAILURES );
95- this .requestErrorsCounter = pluginMetrics .counter (REQUEST_ERRORS );
96- this .nonRetryableErrorsCounter = pluginMetrics .counter (NON_RETRYABLE_ERRORS );
97- this .retryableErrorsCounter = pluginMetrics .counter (RETRYABLE_ERRORS );
9868 }
9969
10070 @ VisibleForTesting
@@ -141,13 +111,8 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
141111 }
142112
143113 // Write Records to the buffer after processing a page of data
144- bufferWriteLatencyTimer .record (() -> {
145- try {
146- writeRecordsWithRetry (records , buffer , acknowledgementSet );
147- } catch (Exception e ) {
148- bufferWriteFailuresCounter .increment ();
149- throw e ;
150- }
114+ metricsRecorder .recordBufferWriteLatency (() -> {
115+ writeRecordsWithRetry (records , buffer , acknowledgementSet );
151116 });
152117
153118 nextPageUri = response .getNextPageUri ();
@@ -159,18 +124,18 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
159124 } catch (Exception e ) {
160125 log .error (NOISY , "Failed to process partition for log type {} from {} to {}" ,
161126 logType , startTime , endTime , e );
162- requestErrorsCounter . increment ( );
127+ metricsRecorder . recordError ( e );
163128 if (e instanceof SaaSCrawlerException ) {
164129 SaaSCrawlerException saasException = (SaaSCrawlerException ) e ;
165130 if (saasException .isRetryable ()) {
166- retryableErrorsCounter . increment ();
131+ metricsRecorder . recordRetryableError ();
167132 } else {
168- nonRetryableErrorsCounter . increment ();
133+ metricsRecorder . recordNonRetryableError ();
169134 }
170135 throw e ;
171136 }
172137 // any other exceptions = non-retryable
173- nonRetryableErrorsCounter . increment ();
138+ metricsRecorder . recordNonRetryableError ();
174139 throw new SaaSCrawlerException ("Failed to process partition" , e , false );
175140 }
176141 }
@@ -219,7 +184,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws SaaSC
219184 private void writeRecordsWithRetry (final List <Record <Event >> records ,
220185 final Buffer <Record <Event >> buffer ,
221186 final AcknowledgementSet acknowledgementSet ) {
222- bufferWriteAttemptsCounter . increment ();
187+ metricsRecorder . recordBufferWriteAttempt ();
223188 int retryCount = 0 ;
224189 int currentBackoff = 1000 ; // Start with 1 second
225190 final int maxBackoff = 30000 ; // Max 30 seconds
@@ -235,21 +200,21 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
235200 }
236201
237202 if (retryCount > 0 ) {
238- bufferWriteRetrySuccessCounter . increment ();
203+ metricsRecorder . recordBufferWriteRetrySuccess ();
239204 } else {
240- bufferWriteSuccessCounter . increment ();
205+ metricsRecorder . recordBufferWriteSuccess ();
241206 }
242207 return ;
243208
244209 } catch (TimeoutException e ) {
245210 retryCount ++;
246211 if (retryCount >= maxRetries ) {
247- bufferWriteFailuresCounter . increment ();
212+ metricsRecorder . recordBufferWriteFailure ();
248213 // allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler
249214 throw new SaaSCrawlerException ("Failed to write to buffer after " + maxRetries + " attempts" , e , true );
250215 }
251216
252- bufferWriteRetryAttemptsCounter . increment ();
217+ metricsRecorder . recordBufferWriteRetryAttempt ();
253218 currentBackoff = Math .min ((int )(currentBackoff * 2.0 ), maxBackoff );
254219 log .info ("Buffer full, backing off for {} ms before retry" , currentBackoff );
255220
@@ -260,7 +225,7 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
260225 throw new SaaSCrawlerException ("Buffer write retry interrupted" , ie , true );
261226 }
262227 } catch (Exception e ) {
263- bufferWriteFailuresCounter . increment ();
228+ metricsRecorder . recordBufferWriteFailure ();
264229 throw new SaaSCrawlerException ("Error writing to buffer" , e , true );
265230 }
266231 }
0 commit comments