Skip to content

Commit 08bd403

Browse files
authored
refactor(metrics): migrate buffer/retry metrics to unified VendorAPIMetricsRecorder (#6428)
Signed-off-by: Alexander Christensen <alchrisk@amazon.com>
1 parent c02bf25 commit 08bd403

5 files changed

Lines changed: 526 additions & 120 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java

Lines changed: 16 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,18 @@
1414
import com.fasterxml.jackson.databind.JsonNode;
1515
import com.fasterxml.jackson.databind.ObjectMapper;
1616
import com.google.common.annotations.VisibleForTesting;
17-
import io.micrometer.core.instrument.Counter;
18-
import io.micrometer.core.instrument.Timer;
1917
import lombok.extern.slf4j.Slf4j;
2018
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
2119
import org.opensearch.dataprepper.model.buffer.Buffer;
2220
import org.opensearch.dataprepper.model.event.Event;
2321
import org.opensearch.dataprepper.model.event.EventType;
2422
import org.opensearch.dataprepper.model.event.JacksonEvent;
25-
import org.opensearch.dataprepper.metrics.PluginMetrics;
2623
import org.opensearch.dataprepper.model.record.Record;
2724
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
2825
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
2926
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
3027
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
28+
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
3129
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
3230
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
3331

@@ -42,7 +40,6 @@
4240
import java.util.concurrent.TimeoutException;
4341

4442
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
45-
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS;
4643

4744
/**
4845
* Implementation of CrawlerClient for Office 365 audit logs.
@@ -52,49 +49,22 @@
5249
@Named
5350
public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSliceWorkerProgressState> {
5451

55-
public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors";
56-
public static final String RETRYABLE_ERRORS = "retryableErrors";
57-
58-
private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency";
59-
private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts";
60-
private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess";
61-
private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
62-
private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
63-
private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
6452
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
6553
private static final String CONTENT_ID = "contentId";
6654
private static final String CONTENT_URI = "contentUri";
6755

6856
private final Office365Service service;
6957
private final Office365SourceConfig configuration;
70-
private final Timer bufferWriteLatencyTimer;
71-
private final Counter bufferWriteAttemptsCounter;
72-
private final Counter bufferWriteSuccessCounter;
73-
private final Counter bufferWriteRetrySuccessCounter;
74-
private final Counter bufferWriteRetryAttemptsCounter;
75-
private final Counter bufferWriteFailuresCounter;
76-
private final Counter requestErrorsCounter;
77-
private final Counter nonRetryableErrorsCounter;
78-
private final Counter retryableErrorsCounter;
58+
private final VendorAPIMetricsRecorder metricsRecorder;
7959
private ObjectMapper objectMapper;
8060

8161
public Office365CrawlerClient(final Office365Service service,
8262
final Office365SourceConfig sourceConfig,
83-
final PluginMetrics pluginMetrics) {
63+
final VendorAPIMetricsRecorder metricsRecorder) {
8464
this.service = service;
8565
this.configuration = sourceConfig;
66+
this.metricsRecorder = metricsRecorder;
8667
this.objectMapper = new ObjectMapper();
87-
88-
// Initialize metrics
89-
this.bufferWriteLatencyTimer = pluginMetrics.timer(BUFFER_WRITE_LATENCY);
90-
this.bufferWriteAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_ATTEMPTS);
91-
this.bufferWriteSuccessCounter = pluginMetrics.counter(BUFFER_WRITE_SUCCESS);
92-
this.bufferWriteRetrySuccessCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_SUCCESS);
93-
this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS);
94-
this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES);
95-
this.requestErrorsCounter = pluginMetrics.counter(REQUEST_ERRORS);
96-
this.nonRetryableErrorsCounter = pluginMetrics.counter(NON_RETRYABLE_ERRORS);
97-
this.retryableErrorsCounter = pluginMetrics.counter(RETRYABLE_ERRORS);
9868
}
9969

10070
@VisibleForTesting
@@ -141,13 +111,8 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
141111
}
142112

143113
// Write Records to the buffer after processing a page of data
144-
bufferWriteLatencyTimer.record(() -> {
145-
try {
146-
writeRecordsWithRetry(records, buffer, acknowledgementSet);
147-
} catch (Exception e) {
148-
bufferWriteFailuresCounter.increment();
149-
throw e;
150-
}
114+
metricsRecorder.recordBufferWriteLatency(() -> {
115+
writeRecordsWithRetry(records, buffer, acknowledgementSet);
151116
});
152117

153118
nextPageUri = response.getNextPageUri();
@@ -159,18 +124,18 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
159124
} catch (Exception e) {
160125
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
161126
logType, startTime, endTime, e);
162-
requestErrorsCounter.increment();
127+
metricsRecorder.recordError(e);
163128
if (e instanceof SaaSCrawlerException) {
164129
SaaSCrawlerException saasException = (SaaSCrawlerException) e;
165130
if (saasException.isRetryable()) {
166-
retryableErrorsCounter.increment();
131+
metricsRecorder.recordRetryableError();
167132
} else {
168-
nonRetryableErrorsCounter.increment();
133+
metricsRecorder.recordNonRetryableError();
169134
}
170135
throw e;
171136
}
172137
// Any exception that is not a SaaSCrawlerException is treated as non-retryable
173-
nonRetryableErrorsCounter.increment();
138+
metricsRecorder.recordNonRetryableError();
174139
throw new SaaSCrawlerException("Failed to process partition", e, false);
175140
}
176141
}
@@ -219,7 +184,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws SaaSC
219184
private void writeRecordsWithRetry(final List<Record<Event>> records,
220185
final Buffer<Record<Event>> buffer,
221186
final AcknowledgementSet acknowledgementSet) {
222-
bufferWriteAttemptsCounter.increment();
187+
metricsRecorder.recordBufferWriteAttempt();
223188
int retryCount = 0;
224189
int currentBackoff = 1000; // Start with 1 second
225190
final int maxBackoff = 30000; // Max 30 seconds
@@ -235,21 +200,21 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
235200
}
236201

237202
if (retryCount > 0) {
238-
bufferWriteRetrySuccessCounter.increment();
203+
metricsRecorder.recordBufferWriteRetrySuccess();
239204
} else {
240-
bufferWriteSuccessCounter.increment();
205+
metricsRecorder.recordBufferWriteSuccess();
241206
}
242207
return;
243208

244209
} catch (TimeoutException e) {
245210
retryCount++;
246211
if (retryCount >= maxRetries) {
247-
bufferWriteFailuresCounter.increment();
212+
metricsRecorder.recordBufferWriteFailure();
248213
// Mark every writeToBuffer failure as retryable to preserve the current behaviour of immediate retry by WorkerScheduler
249214
throw new SaaSCrawlerException("Failed to write to buffer after " + maxRetries + " attempts", e, true);
250215
}
251216

252-
bufferWriteRetryAttemptsCounter.increment();
217+
metricsRecorder.recordBufferWriteRetryAttempt();
253218
currentBackoff = Math.min((int)(currentBackoff * 2.0), maxBackoff);
254219
log.info("Buffer full, backing off for {} ms before retry", currentBackoff);
255220

@@ -260,7 +225,7 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
260225
throw new SaaSCrawlerException("Buffer write retry interrupted", ie, true);
261226
}
262227
} catch (Exception e) {
263-
bufferWriteFailuresCounter.increment();
228+
metricsRecorder.recordBufferWriteFailure();
264229
throw new SaaSCrawlerException("Error writing to buffer", e, true);
265230
}
266231
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365Configuration.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration;
1111

1212
import org.opensearch.dataprepper.metrics.PluginMetrics;
13+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient;
1314
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
1415
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
1516
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
1617
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider;
18+
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
1719
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
1820
import org.springframework.context.annotation.Bean;
1921
import org.springframework.context.annotation.Configuration;
@@ -72,4 +74,20 @@ public Office365RestClient office365RestClient(
7274
VendorAPIMetricsRecorder vendorAPIMetricsRecorder) {
7375
return new Office365RestClient(authConfig, vendorAPIMetricsRecorder);
7476
}
77+
78+
/**
79+
* Creates Office365CrawlerClient with unified metrics recorder.
80+
*
81+
* @param service The Office 365 service
82+
* @param sourceConfig The Office 365 source configuration
83+
* @param metricsRecorder The unified metrics recorder
84+
* @return Configured Office365CrawlerClient
85+
*/
86+
@Bean
87+
public Office365CrawlerClient office365CrawlerClient(
88+
Office365Service service,
89+
Office365SourceConfig sourceConfig,
90+
VendorAPIMetricsRecorder metricsRecorder) {
91+
return new Office365CrawlerClient(service, sourceConfig, metricsRecorder);
92+
}
7593
}

0 commit comments

Comments
 (0)