1010package org .opensearch .dataprepper .plugins .source .microsoft_office365 ;
1111
1212import io .micrometer .core .instrument .Counter ;
13- import io .micrometer .core .instrument .Timer ;
1413import lombok .extern .slf4j .Slf4j ;
1514import org .opensearch .dataprepper .metrics .PluginMetrics ;
1615import org .opensearch .dataprepper .plugins .source .microsoft_office365 .auth .Office365AuthenticationInterface ;
1716import org .opensearch .dataprepper .plugins .source .source_crawler .exception .SaaSCrawlerException ;
1817import org .opensearch .dataprepper .plugins .source .microsoft_office365 .models .AuditLogsResponse ;
18+ import org .opensearch .dataprepper .plugins .source .source_crawler .metrics .VendorAPIMetricsRecorder ;
1919import org .opensearch .dataprepper .plugins .source .source_crawler .utils .retry .RetryHandler ;
2020import org .opensearch .dataprepper .plugins .source .source_crawler .utils .retry .DefaultRetryStrategy ;
2121import org .opensearch .dataprepper .plugins .source .source_crawler .utils .retry .DefaultStatusCodeHandler ;
3333import java .time .Instant ;
3434import java .util .List ;
3535import java .util .Map ;
36- import java .util .Optional ;
3736
3837import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
3938import static org .opensearch .dataprepper .plugins .source .microsoft_office365 .utils .Constants .CONTENT_TYPES ;
40- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .getErrorTypeMetricCounterMap ;
41- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .publishErrorTypeMetricCounter ;
42- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .publishGetResponseSizeMetricInBytes ;
43- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .publishGetRequestsSuccessMetric ;
44- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .provideGetRequestsFailureCounter ;
45- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .publishSearchResponseSizeMetricInBytes ;
46- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .publishSearchRequestsSuccessMetric ;
47- import static org .opensearch .dataprepper .plugins .source .source_crawler .utils .MetricsHelper .provideSearchRequestFailureCounter ;
4839
4940/**
5041 * REST client for interacting with Office 365 Management API.
5344@ Slf4j
5445@ Named
5546public class Office365RestClient {
56- private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency" ;
57- private static final String API_CALLS = "apiCalls" ;
58- private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested" ;
59- private static final String SEARCH_CALL_LATENCY = "searchCallLatency" ;
60-
6147 private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/" ;
48+ private static final String API_CALLS = "apiCalls" ;
6249
6350 private final RestTemplate restTemplate = new RestTemplate ();
6451 private final RetryHandler retryHandler ;
6552 private final Office365AuthenticationInterface authConfig ;
66- private final Timer auditLogFetchLatencyTimer ;
67- private final Timer searchCallLatencyTimer ;
68- private final Counter auditLogsRequestedCounter ;
53+ private final VendorAPIMetricsRecorder metricsRecorder ;
6954 private final Counter apiCallsCounter ;
70- private final PluginMetrics pluginMetrics ;
71-
72- private Map <String , Counter > errorTypeMetricCounterMap ;
7355
7456 public Office365RestClient (final Office365AuthenticationInterface authConfig ,
75- final PluginMetrics pluginMetrics ) {
76- // TODO: Abstract into a Office365PluginMetrics
57+ final PluginMetrics pluginMetrics ,
58+ final VendorAPIMetricsRecorder metricsRecorder ) {
7759 this .authConfig = authConfig ;
78- this .pluginMetrics = pluginMetrics ;
79- this .auditLogFetchLatencyTimer = pluginMetrics .timer (AUDIT_LOG_FETCH_LATENCY );
80- this .searchCallLatencyTimer = pluginMetrics .timer (SEARCH_CALL_LATENCY );
81- this .auditLogsRequestedCounter = pluginMetrics .counter (AUDIT_LOGS_REQUESTED );
60+ this .metricsRecorder = metricsRecorder ;
8261 this .apiCallsCounter = pluginMetrics .counter (API_CALLS );
83- this .errorTypeMetricCounterMap = getErrorTypeMetricCounterMap (pluginMetrics );
8462 this .retryHandler = new RetryHandler (
8563 new DefaultRetryStrategy (),
8664 new DefaultStatusCodeHandler ());
@@ -93,7 +71,6 @@ public void startSubscriptions() {
9371 log .info ("Starting Office 365 subscriptions for audit logs" );
9472 try {
9573 HttpHeaders headers = new HttpHeaders ();
96-
9774 headers .setContentType (MediaType .APPLICATION_JSON );
9875
9976 // TODO: Only start the subscriptions only if the call commented
@@ -141,7 +118,7 @@ public void startSubscriptions() {
141118 }, authConfig ::renewCredentials );
142119 }
143120 } catch (Exception e ) {
144- publishErrorTypeMetricCounter ( e , this . errorTypeMetricCounterMap );
121+ metricsRecorder . recordError ( e );
145122 log .error (NOISY , "Failed to initialize subscriptions" , e );
146123 throw new SaaSCrawlerException ("Failed to initialize subscriptions: " + e .getMessage (), e , true );
147124 }
@@ -174,12 +151,12 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
174151 log .debug ("Searching audit logs with URL: {}" , url );
175152 final HttpHeaders headers = new HttpHeaders ();
176153
177- return searchCallLatencyTimer . record (() -> {
154+ return metricsRecorder . recordSearchLatency (() -> {
178155 try {
179156 return retryHandler .executeWithRetry (
180157 () -> {
181158 headers .setBearerAuth (authConfig .getAccessToken ());
182- apiCallsCounter . increment ();
159+ metricsRecorder . recordDataApiRequest ();
183160
184161 ResponseEntity <List <Map <String , Object >>> response = restTemplate .exchange (
185162 url ,
@@ -205,9 +182,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
205182 }
206183 }
207184
208- // Publish centralized search metrics
209- publishSearchResponseSizeMetricInBytes (pluginMetrics , response );
210- publishSearchRequestsSuccessMetric (pluginMetrics );
185+ metricsRecorder .recordSearchResponseSize (response );
186+ metricsRecorder .recordSearchSuccess ();
211187
212188 // Extract NextPageUri from response headers
213189 List <String > nextPageHeaders = response .getHeaders ().get ("NextPageUri" );
@@ -221,10 +197,11 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
221197 return new AuditLogsResponse (response .getBody (), nextPageUri );
222198 },
223199 authConfig ::renewCredentials ,
224- Optional . of ( provideSearchRequestFailureCounter ( pluginMetrics ))
200+ metricsRecorder :: recordSearchFailure
225201 );
226202 } catch (Exception e ) {
227- publishErrorTypeMetricCounter (e , this .errorTypeMetricCounterMap );
203+ metricsRecorder .recordError (e );
204+ metricsRecorder .recordSearchFailure ();
228205 log .error (NOISY , "Error while fetching audit logs for content type {} from URL: {}" ,
229206 contentType , url , e );
230207 throw new SaaSCrawlerException ("Failed to fetch audit logs" , e , true );
@@ -245,14 +222,14 @@ public String getAuditLog(String contentUri) {
245222 }
246223
247224 log .debug ("Getting audit log from content URI: {}" , contentUri );
248- auditLogsRequestedCounter . increment ();
225+ metricsRecorder . recordLogsRequested ();
249226 final HttpHeaders headers = new HttpHeaders ();
250227
251- return auditLogFetchLatencyTimer . record (() -> {
228+ return metricsRecorder . recordGetLatency (() -> {
252229 try {
253230 String response = retryHandler .executeWithRetry (() -> {
254231 headers .setBearerAuth (authConfig .getAccessToken ());
255- apiCallsCounter . increment ();
232+ metricsRecorder . recordDataApiRequest ();
256233 ResponseEntity <String > responseEntity = restTemplate .exchange (
257234 contentUri ,
258235 HttpMethod .GET ,
@@ -261,7 +238,7 @@ public String getAuditLog(String contentUri) {
261238 );
262239
263240 return responseEntity .getBody ();
264- }, authConfig ::renewCredentials , Optional . of ( provideGetRequestsFailureCounter ( pluginMetrics )) );
241+ }, authConfig ::renewCredentials , metricsRecorder :: recordGetFailure );
265242
266243 // Log response details
267244 if (response == null ) {
@@ -278,13 +255,13 @@ public String getAuditLog(String contentUri) {
278255 }
279256 }
280257
281- // Publish centralized GET request metrics
282- publishGetResponseSizeMetricInBytes (pluginMetrics , response );
283- publishGetRequestsSuccessMetric (pluginMetrics );
258+ metricsRecorder .recordGetResponseSize (response );
259+ metricsRecorder .recordGetSuccess ();
284260
285261 return response ;
286262 } catch (Exception e ) {
287- publishErrorTypeMetricCounter (e , this .errorTypeMetricCounterMap );
263+ metricsRecorder .recordError (e );
264+ metricsRecorder .recordGetFailure ();
288265 log .error (NOISY , "Error while fetching audit log content from URI: {}" , contentUri , e );
289266 throw new SaaSCrawlerException ("Failed to fetch audit log" , e , true );
290267 }
0 commit comments