Skip to content

Commit a0a3cef

Browse files
wandna-amazonsimonelbaz
authored andcommitted
add partition timing metrics to LeaderOnlyTokenCrawler (opensearch-project#6299)
add partition timing metrics to LeaderOnlyTokenCrawler
1 parent 7b8f218 commit a0a3cef

7 files changed

Lines changed: 71 additions & 14 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,9 +473,8 @@ void testStartSubscriptionsRecordsFailureMetrics() {
473473
// Verify that failure metrics were recorded
474474
// Note: We skip verifying recordSubscriptionLatency parameter since lambda expressions
475475
// get compiled to specific classes that are difficult to match in tests
476-
// The retry handler calls recordSubscriptionFailure() for each retry attempt.
477-
// With 6 content types and multiple retries per content type, we expect multiple failure calls
478-
verify(metricsRecorder, times(6)).recordSubscriptionFailure();
476+
// The retry handler calls recordSubscriptionFailure() once after all retries are exhausted
477+
verify(metricsRecorder, times(1)).recordSubscriptionFailure();
479478
}
480479

481480
/**

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSlice
3939
// delay five minutes for partition creation on latest time duration to ensure the newly generated events are queryable
4040
// In general, newly generated events become queryable after 30 ~ 120 second
4141
protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300;
42-
private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "DimensionalTimeSliceWorkerPartitionsCreated";
43-
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
44-
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
42+
private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "dimensionalTimeSliceWorkerPartitionsCreated";
43+
private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
44+
private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
4545
private static final Duration HOUR_DURATION = Duration.ofHours(1);
4646

4747
private final CrawlerClient client;

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerPr
3939

4040
private static final String METRIC_BATCHES_FAILED = "batchesFailed";
4141
private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
42+
private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
43+
private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
4244
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
4345
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
4446

@@ -55,6 +57,8 @@ public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerPr
5557
private final Counter acknowledgementSetSuccesses;
5658
private final Counter acknowledgementSetFailures;
5759
private final Timer bufferWriteTimer;
60+
private final Timer partitionWaitTimeTimer;
61+
private final Timer partitionProcessLatencyTimer;
5862

5963
private String lastToken;
6064
private Duration noAckTimeout;
@@ -67,6 +71,8 @@ public LeaderOnlyTokenCrawler(
6771
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
6872
this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
6973
this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
74+
this.partitionWaitTimeTimer = pluginMetrics.timer(WORKER_PARTITION_WAIT_TIME);
75+
this.partitionProcessLatencyTimer = pluginMetrics.timer(WORKER_PARTITION_PROCESS_LATENCY);
7076
this.acknowledgementSetSuccesses = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME);
7177
this.acknowledgementSetFailures = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME);
7278
this.noAckTimeout = NO_ACK_TIME_OUT_SECONDS;
@@ -123,7 +129,8 @@ public Instant crawl(LeaderPartition leaderPartition,
123129

124130
@Override
125131
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
126-
client.executePartition(state, buffer, acknowledgementSet);
132+
partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now()));
133+
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
127134
}
128135

129136
private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerPr
3333
private static final int batchSize = 50;
3434
private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
3535
private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
36-
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
37-
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
36+
private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
37+
private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
3838
private final Timer crawlingTimer;
3939
private final Timer partitionWaitTimeTimer;
4040
private final Timer partitionProcessLatencyTimer;

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public class DimensionalTimeSliceCrawlerTest {
7575
@BeforeEach
7676
void setUp() {
7777
when(pluginMetrics.counter(anyString())).thenReturn(partitionsCreatedCounter);
78-
when(pluginMetrics.timer("WorkerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
79-
when(pluginMetrics.timer("WorkerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
78+
when(pluginMetrics.timer("workerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
79+
when(pluginMetrics.timer("workerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
8080
crawler = new DimensionalTimeSliceCrawler(client, pluginMetrics);
8181
crawler.initialize(LOG_TYPES);
8282
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
2323
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
2424
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
25+
import io.micrometer.core.instrument.Timer;
26+
import io.micrometer.core.instrument.Counter;
2527

2628
import java.time.Duration;
2729
import java.time.Instant;
@@ -42,6 +44,9 @@
4244
import static org.mockito.Mockito.doThrow;
4345
import static org.mockito.Mockito.never;
4446
import static org.mockito.Mockito.doAnswer;
47+
import static org.mockito.Mockito.mock;
48+
import static org.mockito.Mockito.reset;
49+
import static org.mockito.Mockito.doNothing;
4550
import static org.mockito.internal.verification.VerificationModeFactory.times;
4651

4752
@ExtendWith(MockitoExtension.class)
@@ -62,6 +67,8 @@ class LeaderOnlyTokenCrawlerTest {
6267
private AcknowledgementSet acknowledgementSet;
6368
@Mock
6469
private Buffer<Record<Event>> buffer;
70+
@Mock
71+
private PaginationCrawlerWorkerProgressState workerState;
6572

6673
private LeaderOnlyTokenCrawler crawler;
6774
private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");
@@ -241,6 +248,50 @@ void testAcknowledgmentTimeout() {
241248
verify(coordinator).createPartition(any());
242249
}
243250

251+
@Test
252+
public void testExecutePartitionMetrics() {
253+
reset(leaderPartition);
254+
255+
// mock timers and counters
256+
Timer mockCrawlingTimer = mock(Timer.class);
257+
Timer partitionWaitTimeTimer = mock(Timer.class);
258+
Timer partitionProcessLatencyTimer = mock(Timer.class);
259+
Timer mockBufferWriteTimer = mock(Timer.class);
260+
Counter mockBatchesFailedCounter = mock(Counter.class);
261+
Counter mockAcknowledgementSetSuccesses = mock(Counter.class);
262+
Counter mockAcknowledgementSetFailures = mock(Counter.class);
263+
264+
// setup mock plugin metrics
265+
PluginMetrics mockPluginMetrics = mock(PluginMetrics.class);
266+
when(mockPluginMetrics.timer("crawlingTime")).thenReturn(mockCrawlingTimer);
267+
when(mockPluginMetrics.timer("workerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
268+
when(mockPluginMetrics.timer("workerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
269+
when(mockPluginMetrics.timer("bufferWriteTime")).thenReturn(mockBufferWriteTimer);
270+
when(mockPluginMetrics.counter("batchesFailed")).thenReturn(mockBatchesFailedCounter);
271+
when(mockPluginMetrics.counter("acknowledgementSetSuccesses")).thenReturn(mockAcknowledgementSetSuccesses);
272+
when(mockPluginMetrics.counter("acknowledgementSetFailures")).thenReturn(mockAcknowledgementSetFailures);
273+
274+
LeaderOnlyTokenCrawler testCrawler = new LeaderOnlyTokenCrawler(client, mockPluginMetrics);
275+
276+
// test executePartition with metrics
277+
when(workerState.getExportStartTime()).thenReturn(Instant.now().minusSeconds(1));
278+
279+
// make latency timer execute the runnable so client.executePartition() gets called
280+
doAnswer(invocation -> {
281+
Runnable runnable = invocation.getArgument(0);
282+
runnable.run();
283+
return null;
284+
}).when(partitionProcessLatencyTimer).record(any(Runnable.class));
285+
286+
doNothing().when(partitionWaitTimeTimer).record(any(Duration.class));
287+
288+
testCrawler.executePartition(workerState, buffer, acknowledgementSet);
289+
290+
// verify metrics are recorded
291+
verify(partitionProcessLatencyTimer).record(any(Runnable.class));
292+
verify(partitionWaitTimeTimer).record(any(Duration.class));
293+
verify(client).executePartition(workerState, buffer, acknowledgementSet);
294+
}
244295

245296
private List<ItemInfo> createTestItems(int count) {
246297
List<ItemInfo> items = new ArrayList<>();
@@ -249,4 +300,4 @@ private List<ItemInfo> createTestItems(int count) {
249300
}
250301
return items;
251302
}
252-
}
303+
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ public void test_metrics_in_crawler() {
102102
Counter mockInvalidItemsCounter = mock(Counter.class);
103103

104104
when(mockPluginMetrics.timer("crawlingTime")).thenReturn(mockCrawlingTimer);
105-
when(mockPluginMetrics.timer("WorkerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
106-
when(mockPluginMetrics.timer("WorkerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
105+
when(mockPluginMetrics.timer("workerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
106+
when(mockPluginMetrics.timer("workerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
107107
when(mockPluginMetrics.counter("paginationWorkerPartitionsCreated")).thenReturn(mockPartitionsCreatedCounter);
108108
when(mockPluginMetrics.counter("invalidPaginationItems")).thenReturn(mockInvalidItemsCounter);
109109

0 commit comments

Comments
 (0)