Skip to content

Commit e714a37

Browse files
committed
Metric Centralization through Dependency Injection and Fix Unstable Unit Test
Signed-off-by: Alexander Christensen <alchrisk@amazon.com>
1 parent 530cb13 commit e714a37

9 files changed

Lines changed: 1017 additions & 398 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 74 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

12-
import io.micrometer.core.instrument.Counter;
13-
import io.micrometer.core.instrument.Timer;
1412
import lombok.extern.slf4j.Slf4j;
15-
import org.opensearch.dataprepper.metrics.PluginMetrics;
1613
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
1714
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
1815
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
16+
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
1917
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryHandler;
2018
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultRetryStrategy;
2119
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultStatusCodeHandler;
@@ -33,18 +31,9 @@
3331
import java.time.Instant;
3432
import java.util.List;
3533
import java.util.Map;
36-
import java.util.Optional;
3734

3835
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
3936
import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES;
40-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.getErrorTypeMetricCounterMap;
41-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishErrorTypeMetricCounter;
42-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishGetResponseSizeMetricInBytes;
43-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishGetRequestsSuccessMetric;
44-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.provideGetRequestsFailureCounter;
45-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishSearchResponseSizeMetricInBytes;
46-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.publishSearchRequestsSuccessMetric;
47-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.provideSearchRequestFailureCounter;
4837

4938
/**
5039
* REST client for interacting with Office 365 Management API.
@@ -53,34 +42,17 @@
5342
@Slf4j
5443
@Named
5544
public class Office365RestClient {
56-
private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency";
57-
private static final String API_CALLS = "apiCalls";
58-
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
59-
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
60-
6145
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
6246

6347
private final RestTemplate restTemplate = new RestTemplate();
6448
private final RetryHandler retryHandler;
6549
private final Office365AuthenticationInterface authConfig;
66-
private final Timer auditLogFetchLatencyTimer;
67-
private final Timer searchCallLatencyTimer;
68-
private final Counter auditLogsRequestedCounter;
69-
private final Counter apiCallsCounter;
70-
private final PluginMetrics pluginMetrics;
71-
72-
private Map<String, Counter> errorTypeMetricCounterMap;
50+
private final VendorAPIMetricsRecorder metricsRecorder;
7351

7452
public Office365RestClient(final Office365AuthenticationInterface authConfig,
75-
final PluginMetrics pluginMetrics) {
76-
// TODO: Abstract into a Office365PluginMetrics
53+
final VendorAPIMetricsRecorder metricsRecorder) {
7754
this.authConfig = authConfig;
78-
this.pluginMetrics = pluginMetrics;
79-
this.auditLogFetchLatencyTimer = pluginMetrics.timer(AUDIT_LOG_FETCH_LATENCY);
80-
this.searchCallLatencyTimer = pluginMetrics.timer(SEARCH_CALL_LATENCY);
81-
this.auditLogsRequestedCounter = pluginMetrics.counter(AUDIT_LOGS_REQUESTED);
82-
this.apiCallsCounter = pluginMetrics.counter(API_CALLS);
83-
this.errorTypeMetricCounterMap = getErrorTypeMetricCounterMap(pluginMetrics);
55+
this.metricsRecorder = metricsRecorder;
8456
this.retryHandler = new RetryHandler(
8557
new DefaultRetryStrategy(),
8658
new DefaultStatusCodeHandler());
@@ -91,60 +63,64 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
9163
*/
9264
public void startSubscriptions() {
9365
log.info("Starting Office 365 subscriptions for audit logs");
94-
try {
95-
HttpHeaders headers = new HttpHeaders();
96-
97-
headers.setContentType(MediaType.APPLICATION_JSON);
98-
99-
// TODO: Only start the subscriptions only if the call commented
100-
// out below doesn't return all the audit log types
101-
// Check current subscriptions
102-
// final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
103-
// String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
66+
67+
metricsRecorder.recordAuthLatency(() -> {
68+
try {
69+
HttpHeaders headers = new HttpHeaders();
70+
headers.setContentType(MediaType.APPLICATION_JSON);
71+
72+
// TODO: Only start the subscriptions only if the call commented
73+
// out below doesn't return all the audit log types
74+
// Check current subscriptions
75+
// final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
76+
// String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
10477
//
105-
// ResponseEntity<String> listResponse = restTemplate.exchange(
106-
// listUrl,
107-
// HttpMethod.GET,
108-
// new HttpEntity<>(headers),
109-
// String.class
110-
// );
111-
// log.debug("Current subscriptions: {}", listResponse.getBody());
112-
113-
// Start subscriptions for each content type
114-
headers.setContentLength(0);
115-
116-
for (String contentType : CONTENT_TYPES) {
117-
final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s";
118-
String url = String.format(SUBSCRIPTION_START_URL,
119-
authConfig.getTenantId(),
120-
contentType);
121-
122-
retryHandler.executeWithRetry(() -> {
123-
try {
124-
headers.setBearerAuth(authConfig.getAccessToken());
125-
apiCallsCounter.increment();
126-
ResponseEntity<String> response = restTemplate.exchange(
127-
url,
128-
HttpMethod.POST,
129-
new HttpEntity<>(headers),
130-
String.class
131-
);
132-
log.debug("Started subscription for {}: {}", contentType, response.getBody());
133-
return response;
134-
} catch (HttpClientErrorException | HttpServerErrorException e) {
135-
if (e.getResponseBodyAsString().contains("AF20024")) {
136-
log.debug("Subscription for {} is already enabled", contentType);
137-
return null;
78+
// ResponseEntity<String> listResponse = restTemplate.exchange(
79+
// listUrl,
80+
// HttpMethod.GET,
81+
// new HttpEntity<>(headers),
82+
// String.class
83+
// );
84+
// log.debug("Current subscriptions: {}", listResponse.getBody());
85+
86+
// Start subscriptions for each content type
87+
headers.setContentLength(0);
88+
89+
for (String contentType : CONTENT_TYPES) {
90+
final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s";
91+
String url = String.format(SUBSCRIPTION_START_URL,
92+
authConfig.getTenantId(),
93+
contentType);
94+
95+
retryHandler.executeWithRetry(() -> {
96+
try {
97+
headers.setBearerAuth(authConfig.getAccessToken());
98+
ResponseEntity<String> response = restTemplate.exchange(
99+
url,
100+
HttpMethod.POST,
101+
new HttpEntity<>(headers),
102+
String.class
103+
);
104+
log.debug("Started subscription for {}: {}", contentType, response.getBody());
105+
metricsRecorder.recordAuthSuccess();
106+
return response;
107+
} catch (HttpClientErrorException | HttpServerErrorException e) {
108+
if (e.getResponseBodyAsString().contains("AF20024")) {
109+
log.debug("Subscription for {} is already enabled", contentType);
110+
metricsRecorder.recordAuthSuccess(); // AF20024 is a successful case
111+
return null;
112+
}
113+
throw e;
138114
}
139-
throw e;
140-
}
141-
}, authConfig::renewCredentials);
115+
}, authConfig::renewCredentials, metricsRecorder::recordAuthFailure);
116+
}
117+
return null; // Void return for Supplier
118+
} catch (Exception e) {
119+
metricsRecorder.error(e);
120+
log.error(NOISY, "Failed to initialize subscriptions", e);
121+
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
142122
}
143-
} catch (Exception e) {
144-
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
145-
log.error(NOISY, "Failed to initialize subscriptions", e);
146-
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
147-
}
123+
});
148124
}
149125

150126
/**
@@ -174,12 +150,12 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
174150
log.debug("Searching audit logs with URL: {}", url);
175151
final HttpHeaders headers = new HttpHeaders();
176152

177-
return searchCallLatencyTimer.record(() -> {
153+
return metricsRecorder.recordSearchLatency(() -> {
178154
try {
179155
return retryHandler.executeWithRetry(
180156
() -> {
181157
headers.setBearerAuth(authConfig.getAccessToken());
182-
apiCallsCounter.increment();
158+
metricsRecorder.recordDataApiRequest();
183159

184160
ResponseEntity<List<Map<String, Object>>> response = restTemplate.exchange(
185161
url,
@@ -205,9 +181,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
205181
}
206182
}
207183

208-
// Publish centralized search metrics
209-
publishSearchResponseSizeMetricInBytes(pluginMetrics, response);
210-
publishSearchRequestsSuccessMetric(pluginMetrics);
184+
metricsRecorder.recordSearchResponseSize(response);
185+
metricsRecorder.recordSearchSuccess();
211186

212187
// Extract NextPageUri from response headers
213188
List<String> nextPageHeaders = response.getHeaders().get("NextPageUri");
@@ -221,10 +196,11 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
221196
return new AuditLogsResponse(response.getBody(), nextPageUri);
222197
},
223198
authConfig::renewCredentials,
224-
Optional.of(provideSearchRequestFailureCounter(pluginMetrics))
199+
metricsRecorder::recordSearchFailure
225200
);
226201
} catch (Exception e) {
227-
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
202+
metricsRecorder.error(e);
203+
metricsRecorder.recordSearchFailure();
228204
log.error(NOISY, "Error while fetching audit logs for content type {} from URL: {}",
229205
contentType, url, e);
230206
throw new SaaSCrawlerException("Failed to fetch audit logs", e, true);
@@ -245,14 +221,14 @@ public String getAuditLog(String contentUri) {
245221
}
246222

247223
log.debug("Getting audit log from content URI: {}", contentUri);
248-
auditLogsRequestedCounter.increment();
224+
metricsRecorder.recordLogsRequested();
249225
final HttpHeaders headers = new HttpHeaders();
250226

251-
return auditLogFetchLatencyTimer.record(() -> {
227+
return metricsRecorder.recordGetLatency(() -> {
252228
try {
253229
String response = retryHandler.executeWithRetry(() -> {
254230
headers.setBearerAuth(authConfig.getAccessToken());
255-
apiCallsCounter.increment();
231+
metricsRecorder.recordDataApiRequest();
256232
ResponseEntity<String> responseEntity = restTemplate.exchange(
257233
contentUri,
258234
HttpMethod.GET,
@@ -261,7 +237,7 @@ public String getAuditLog(String contentUri) {
261237
);
262238

263239
return responseEntity.getBody();
264-
}, authConfig::renewCredentials, Optional.of(provideGetRequestsFailureCounter(pluginMetrics)));
240+
}, authConfig::renewCredentials, metricsRecorder::recordGetFailure);
265241

266242
// Log response details
267243
if (response == null) {
@@ -278,13 +254,13 @@ public String getAuditLog(String contentUri) {
278254
}
279255
}
280256

281-
// Publish centralized GET request metrics
282-
publishGetResponseSizeMetricInBytes(pluginMetrics, response);
283-
publishGetRequestsSuccessMetric(pluginMetrics);
257+
metricsRecorder.recordGetResponseSize(response);
258+
metricsRecorder.recordGetSuccess();
284259

285260
return response;
286261
} catch (Exception e) {
287-
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
262+
metricsRecorder.error(e);
263+
metricsRecorder.recordGetFailure();
288264
log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e);
289265
throw new SaaSCrawlerException("Failed to fetch audit log", e, true);
290266
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration;
11+
12+
import org.opensearch.dataprepper.metrics.PluginMetrics;
13+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
14+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.context.annotation.Configuration;
18+
19+
/**
20+
* Spring configuration for Microsoft Office 365 RestClient.
21+
*
22+
* This configuration class creates the Office365RestClient with the required dependencies:
23+
* 1. Office365AuthenticationInterface for authentication
24+
* 2. PluginMetrics for unified metrics recording
25+
*
26+
* The Office365RestClient internally creates VendorAPIMetricsRecorder instances
27+
* for different operation types (GET, SEARCH, AUTH) from the provided PluginMetrics.
28+
*/
29+
@Configuration
30+
public class Office365RestClientConfiguration {
31+
32+
/**
33+
* Creates VendorAPIMetricsRecorder with unified metrics for all operations.
34+
*
35+
* @param pluginMetrics The system plugin metrics instance
36+
* @return Configured VendorAPIMetricsRecorder
37+
*/
38+
@Bean
39+
public VendorAPIMetricsRecorder vendorAPIMetricsRecorder(PluginMetrics pluginMetrics) {
40+
return new VendorAPIMetricsRecorder(pluginMetrics);
41+
}
42+
43+
/**
44+
* Creates Office365RestClient with unified metrics recorder.
45+
*
46+
* @param authConfig The Office 365 authentication provider
47+
* @param vendorAPIMetricsRecorder The unified metrics recorder
48+
* @return Configured Office365RestClient
49+
*/
50+
@Bean
51+
public Office365RestClient office365RestClient(
52+
Office365AuthenticationInterface authConfig,
53+
VendorAPIMetricsRecorder vendorAPIMetricsRecorder) {
54+
return new Office365RestClient(authConfig, vendorAPIMetricsRecorder);
55+
}
56+
}

0 commit comments

Comments
 (0)