Skip to content

Commit 93df770

Browse files
committed
Metric Centralization through Dependency Injection
Signed-off-by: Alexander Christensen <alchrisk@amazon.com>
1 parent 530cb13 commit 93df770

9 files changed

Lines changed: 1133 additions & 398 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 73 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

12-
import io.micrometer.core.instrument.Counter;
13-
import io.micrometer.core.instrument.Timer;
1412
import lombok.extern.slf4j.Slf4j;
15-
import org.opensearch.dataprepper.metrics.PluginMetrics;
1613
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
1714
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
1815
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
16+
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
1917
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryHandler;
2018
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultRetryStrategy;
2119
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultStatusCodeHandler;
@@ -33,18 +31,9 @@
3331
import java.time.Instant;
3432
import java.util.List;
3533
import java.util.Map;
36-
import java.util.Optional;
3734

3835
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
3936
import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES;
40-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.getErrorTypeMetricCounterMap;
41-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishErrorTypeMetricCounter;
42-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishGetResponseSizeMetricInBytes;
43-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishGetRequestsSuccessMetric;
44-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.provideGetRequestsFailureCounter;
45-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishSearchResponseSizeMetricInBytes;
46-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishSearchRequestsSuccessMetric;
47-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.provideSearchRequestFailureCounter;
4837

4938
/**
5039
* REST client for interacting with Office 365 Management API.
@@ -53,34 +42,17 @@
5342
@Slf4j
5443
@Named
5544
public class Office365RestClient {
56-
private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency";
57-
private static final String API_CALLS = "apiCalls";
58-
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
59-
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
60-
6145
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
6246

6347
private final RestTemplate restTemplate = new RestTemplate();
6448
private final RetryHandler retryHandler;
6549
private final Office365AuthenticationInterface authConfig;
66-
private final Timer auditLogFetchLatencyTimer;
67-
private final Timer searchCallLatencyTimer;
68-
private final Counter auditLogsRequestedCounter;
69-
private final Counter apiCallsCounter;
70-
private final PluginMetrics pluginMetrics;
71-
72-
private Map<String, Counter> errorTypeMetricCounterMap;
50+
private final VendorAPIMetricsRecorder metricsRecorder;
7351

7452
public Office365RestClient(final Office365AuthenticationInterface authConfig,
75-
final PluginMetrics pluginMetrics) {
76-
// TODO: Abstract into a Office365PluginMetrics
53+
final VendorAPIMetricsRecorder metricsRecorder) {
7754
this.authConfig = authConfig;
78-
this.pluginMetrics = pluginMetrics;
79-
this.auditLogFetchLatencyTimer = pluginMetrics.timer(AUDIT_LOG_FETCH_LATENCY);
80-
this.searchCallLatencyTimer = pluginMetrics.timer(SEARCH_CALL_LATENCY);
81-
this.auditLogsRequestedCounter = pluginMetrics.counter(AUDIT_LOGS_REQUESTED);
82-
this.apiCallsCounter = pluginMetrics.counter(API_CALLS);
83-
this.errorTypeMetricCounterMap = getErrorTypeMetricCounterMap(pluginMetrics);
55+
this.metricsRecorder = metricsRecorder;
8456
this.retryHandler = new RetryHandler(
8557
new DefaultRetryStrategy(),
8658
new DefaultStatusCodeHandler());
@@ -91,60 +63,63 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
9163
*/
9264
public void startSubscriptions() {
9365
log.info("Starting Office 365 subscriptions for audit logs");
94-
try {
95-
HttpHeaders headers = new HttpHeaders();
96-
97-
headers.setContentType(MediaType.APPLICATION_JSON);
98-
99-
// TODO: Only start the subscriptions only if the call commented
100-
// out below doesn't return all the audit log types
101-
// Check current subscriptions
102-
// final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
103-
// String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
66+
67+
metricsRecorder.recordAuthLatency(() -> {
68+
try {
69+
HttpHeaders headers = new HttpHeaders();
70+
headers.setContentType(MediaType.APPLICATION_JSON);
71+
72+
// TODO: Only start the subscriptions only if the call commented
73+
// out below doesn't return all the audit log types
74+
// Check current subscriptions
75+
// final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
76+
// String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
10477
//
105-
// ResponseEntity<String> listResponse = restTemplate.exchange(
106-
// listUrl,
107-
// HttpMethod.GET,
108-
// new HttpEntity<>(headers),
109-
// String.class
110-
// );
111-
// log.debug("Current subscriptions: {}", listResponse.getBody());
112-
113-
// Start subscriptions for each content type
114-
headers.setContentLength(0);
115-
116-
for (String contentType : CONTENT_TYPES) {
117-
final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s";
118-
String url = String.format(SUBSCRIPTION_START_URL,
119-
authConfig.getTenantId(),
120-
contentType);
121-
122-
retryHandler.executeWithRetry(() -> {
123-
try {
124-
headers.setBearerAuth(authConfig.getAccessToken());
125-
apiCallsCounter.increment();
126-
ResponseEntity<String> response = restTemplate.exchange(
127-
url,
128-
HttpMethod.POST,
129-
new HttpEntity<>(headers),
130-
String.class
131-
);
132-
log.debug("Started subscription for {}: {}", contentType, response.getBody());
133-
return response;
134-
} catch (HttpClientErrorException | HttpServerErrorException e) {
135-
if (e.getResponseBodyAsString().contains("AF20024")) {
136-
log.debug("Subscription for {} is already enabled", contentType);
137-
return null;
78+
// ResponseEntity<String> listResponse = restTemplate.exchange(
79+
// listUrl,
80+
// HttpMethod.GET,
81+
// new HttpEntity<>(headers),
82+
// String.class
83+
// );
84+
// log.debug("Current subscriptions: {}", listResponse.getBody());
85+
86+
// Start subscriptions for each content type
87+
headers.setContentLength(0);
88+
89+
for (String contentType : CONTENT_TYPES) {
90+
final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s";
91+
String url = String.format(SUBSCRIPTION_START_URL,
92+
authConfig.getTenantId(),
93+
contentType);
94+
95+
retryHandler.executeWithRetry(() -> {
96+
try {
97+
headers.setBearerAuth(authConfig.getAccessToken());
98+
ResponseEntity<String> response = restTemplate.exchange(
99+
url,
100+
HttpMethod.POST,
101+
new HttpEntity<>(headers),
102+
String.class
103+
);
104+
log.debug("Started subscription for {}: {}", contentType, response.getBody());
105+
metricsRecorder.recordAuthSuccess();
106+
return response;
107+
} catch (HttpClientErrorException | HttpServerErrorException e) {
108+
if (e.getResponseBodyAsString().contains("AF20024")) {
109+
log.debug("Subscription for {} is already enabled", contentType);
110+
metricsRecorder.recordAuthSuccess(); // AF20024 is a successful case
111+
return null;
112+
}
113+
throw e;
138114
}
139-
throw e;
140-
}
141-
}, authConfig::renewCredentials);
115+
}, authConfig::renewCredentials, metricsRecorder::recordAuthFailure);
116+
}
117+
} catch (Exception e) {
118+
metricsRecorder.recordError(e);
119+
log.error(NOISY, "Failed to initialize subscriptions", e);
120+
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
142121
}
143-
} catch (Exception e) {
144-
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
145-
log.error(NOISY, "Failed to initialize subscriptions", e);
146-
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
147-
}
122+
});
148123
}
149124

150125
/**
@@ -174,12 +149,12 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
174149
log.debug("Searching audit logs with URL: {}", url);
175150
final HttpHeaders headers = new HttpHeaders();
176151

177-
return searchCallLatencyTimer.record(() -> {
152+
return metricsRecorder.recordSearchLatency(() -> {
178153
try {
179154
return retryHandler.executeWithRetry(
180155
() -> {
181156
headers.setBearerAuth(authConfig.getAccessToken());
182-
apiCallsCounter.increment();
157+
metricsRecorder.recordDataApiRequest();
183158

184159
ResponseEntity<List<Map<String, Object>>> response = restTemplate.exchange(
185160
url,
@@ -205,9 +180,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
205180
}
206181
}
207182

208-
// Publish centralized search metrics
209-
publishSearchResponseSizeMetricInBytes(pluginMetrics, response);
210-
publishSearchRequestsSuccessMetric(pluginMetrics);
183+
metricsRecorder.recordSearchResponseSize(response);
184+
metricsRecorder.recordSearchSuccess();
211185

212186
// Extract NextPageUri from response headers
213187
List<String> nextPageHeaders = response.getHeaders().get("NextPageUri");
@@ -221,10 +195,11 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
221195
return new AuditLogsResponse(response.getBody(), nextPageUri);
222196
},
223197
authConfig::renewCredentials,
224-
Optional.of(provideSearchRequestFailureCounter(pluginMetrics))
198+
metricsRecorder::recordSearchFailure
225199
);
226200
} catch (Exception e) {
227-
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
201+
metricsRecorder.recordError(e);
202+
metricsRecorder.recordSearchFailure();
228203
log.error(NOISY, "Error while fetching audit logs for content type {} from URL: {}",
229204
contentType, url, e);
230205
throw new SaaSCrawlerException("Failed to fetch audit logs", e, true);
@@ -245,14 +220,14 @@ public String getAuditLog(String contentUri) {
245220
}
246221

247222
log.debug("Getting audit log from content URI: {}", contentUri);
248-
auditLogsRequestedCounter.increment();
223+
metricsRecorder.recordLogsRequested();
249224
final HttpHeaders headers = new HttpHeaders();
250225

251-
return auditLogFetchLatencyTimer.record(() -> {
226+
return metricsRecorder.recordGetLatency(() -> {
252227
try {
253228
String response = retryHandler.executeWithRetry(() -> {
254229
headers.setBearerAuth(authConfig.getAccessToken());
255-
apiCallsCounter.increment();
230+
metricsRecorder.recordDataApiRequest();
256231
ResponseEntity<String> responseEntity = restTemplate.exchange(
257232
contentUri,
258233
HttpMethod.GET,
@@ -261,7 +236,7 @@ public String getAuditLog(String contentUri) {
261236
);
262237

263238
return responseEntity.getBody();
264-
}, authConfig::renewCredentials, Optional.of(provideGetRequestsFailureCounter(pluginMetrics)));
239+
}, authConfig::renewCredentials, metricsRecorder::recordGetFailure);
265240

266241
// Log response details
267242
if (response == null) {
@@ -278,13 +253,13 @@ public String getAuditLog(String contentUri) {
278253
}
279254
}
280255

281-
// Publish centralized GET request metrics
282-
publishGetResponseSizeMetricInBytes(pluginMetrics, response);
283-
publishGetRequestsSuccessMetric(pluginMetrics);
256+
metricsRecorder.recordGetResponseSize(response);
257+
metricsRecorder.recordGetSuccess();
284258

285259
return response;
286260
} catch (Exception e) {
287-
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
261+
metricsRecorder.recordError(e);
262+
metricsRecorder.recordGetFailure();
288263
log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e);
289264
throw new SaaSCrawlerException("Failed to fetch audit log", e, true);
290265
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration;
11+
12+
import org.opensearch.dataprepper.metrics.PluginMetrics;
13+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
14+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.context.annotation.Configuration;
18+
19+
/**
20+
* Spring configuration for Microsoft Office 365 RestClient.
21+
*
22+
* This configuration class creates the Office365RestClient with the required dependencies:
23+
* 1. Office365AuthenticationInterface for authentication
24+
* 2. PluginMetrics for unified metrics recording
25+
*
26+
* The Office365RestClient internally creates VendorAPIMetricsRecorder instances
27+
* for different operation types (GET, SEARCH, AUTH) from the provided PluginMetrics.
28+
*/
29+
@Configuration
30+
public class Office365RestClientConfiguration {
31+
32+
/**
33+
* Creates VendorAPIMetricsRecorder with unified metrics for all operations.
34+
*
35+
* @param pluginMetrics The system plugin metrics instance
36+
* @return Configured VendorAPIMetricsRecorder
37+
*/
38+
@Bean
39+
public VendorAPIMetricsRecorder vendorAPIMetricsRecorder(PluginMetrics pluginMetrics) {
40+
return new VendorAPIMetricsRecorder(pluginMetrics);
41+
}
42+
43+
/**
44+
* Creates Office365RestClient with unified metrics recorder.
45+
*
46+
* @param authConfig The Office 365 authentication provider
47+
* @param vendorAPIMetricsRecorder The unified metrics recorder
48+
* @return Configured Office365RestClient
49+
*/
50+
@Bean
51+
public Office365RestClient office365RestClient(
52+
Office365AuthenticationInterface authConfig,
53+
VendorAPIMetricsRecorder vendorAPIMetricsRecorder) {
54+
return new Office365RestClient(authConfig, vendorAPIMetricsRecorder);
55+
}
56+
}

0 commit comments

Comments
 (0)