Merged
Commits
52 commits
439e40e
add partition timing metrics to LeaderOnlyTokenCrawler
wandna-amazon Nov 27, 2025
ba3d374
Change dimensional time slice crawler and worker metrics to camel case
wandna-amazon Dec 22, 2025
ecd630a
enforced camel case naming and fixed naming in tests
wandna-amazon Jan 8, 2026
221c6c9
Filesource compression support (#5255)
joelmarty Dec 3, 2025
088b2b3
Increase acknowledgment set timeout for opensearch source (#6291)
graytaylor0 Dec 3, 2025
f9fea79
PrometheusTimeSeries performance fixes (#6316)
kkondaka Dec 4, 2025
59bbaa9
Bump software.amazon.awssdk:auth in /performance-test (#6315)
dependabot[bot] Dec 4, 2025
905773b
Bump commons-validator:commons-validator in /data-prepper-core (#6310)
dependabot[bot] Dec 4, 2025
6a9be46
Bump org.wiremock:wiremock in /data-prepper-plugins/opensearch (#6308)
dependabot[bot] Dec 4, 2025
1eca5af
set retry time interval configurable, increase the http client read t…
Zhangxunmt Dec 4, 2025
3c980cc
Centralize Metrics, Create MetricHelper Unit Tests, and Add M365 Logg…
chrisale000 Dec 9, 2025
a37b576
Use Eclipse Temurin by default in the tarball smoke test. Updates to …
dlvenable Dec 10, 2025
4631ceb
Enable cross-region writes in the S3 sink. (#6323)
dlvenable Dec 10, 2025
f9db3da
Confluence and CloudWatch and multiple other failing tests fix (#6348)
san81 Dec 10, 2025
9c46ba5
Fixes the trace-analytics-sample-app project and updates it. (#6350)
dlvenable Dec 11, 2025
c1739b0
Update dependency urllib3 to v2.6.0 (#6345)
mend-for-github-com[bot] Dec 12, 2025
2aed31e
Adds Kiro and Visual Studio Code directories to the .gitignore file. …
dlvenable Dec 12, 2025
2d9026f
Do not clear offsets after failure to commit offsets due to rebalance…
graytaylor0 Dec 15, 2025
34ad197
Remove experimental lable for M365 (#6351)
vecheka Dec 15, 2025
9d189ee
Refactor Retry Handler To Move Into Source Crawler Package (#6275)
eatulban Dec 15, 2025
c5983ed
Remove usage of buffer accumulator from Kafka custom consumer (#6357)
graytaylor0 Dec 16, 2025
ff8e765
Add forward_to support to opensearch sink (#6349)
kkondaka Dec 16, 2025
2f45cff
Fixed PrometheusSinkBufferWriter getBuffer() to return non-duplicate …
kkondaka Dec 17, 2025
f408d22
Make CWL retry indefinitely for retryable errors when no DLQ configur…
kkondaka Dec 17, 2025
96f11af
Metric Centralization through Dependency Injection (#6354)
chrisale000 Dec 18, 2025
b094611
Rebased to latest to resolve conflicts (#6365)
kkondaka Dec 23, 2025
4b025b7
Fix Data Prepper router to send records through routing strategy befo…
kkondaka Dec 30, 2025
a74febc
Address comments from PR6370 (#6371)
kkondaka Dec 30, 2025
d2bfdb0
Add metric tracking time between poll calls for kafka consumer (#6372)
graytaylor0 Dec 31, 2025
7338357
Add support for passing sts headers in kafka source (#6375)
sb2k16 Jan 2, 2026
bff7c7d
Support otel metrics source with partition keys when persistent buffe…
kkondaka Jan 3, 2026
051c515
Prometheus Sink: Fix setting DLQ pipeline, add NOISY marker for logs …
kkondaka Jan 5, 2026
ca605b9
Updating the copyright headers on a batch of plugins. (#6390)
dlvenable Jan 6, 2026
1dd5317
GitHub Action to verify that newly added files have the license heade…
dlvenable Jan 6, 2026
af4f230
Fixes the license headers in all files in data-prepper-api. (#6393)
dlvenable Jan 6, 2026
2b0ea1a
Bump org.apache.logging.log4j:log4j-bom in /data-prepper-expression (…
dependabot[bot] Jan 7, 2026
470372c
Bump org.apache.logging.log4j:log4j-bom in /data-prepper-core (#6376)
dependabot[bot] Jan 7, 2026
cfdbaae
Bump org.lz4:lz4-java in /data-prepper-plugins/mapdb-processor-state …
dependabot[bot] Jan 7, 2026
6a4d54c
Prefer org.lgz4 artifact over at.yawk.lz4 (#6395)
san81 Jan 7, 2026
15ebb2f
Adding Partition Execution Logging for DimensionalTimeSliceCrawler (#…
chrisale000 Jan 7, 2026
8b77009
Adding Subscription Metrics to Metric Recorder and Onboarding M365 to…
chrisale000 Jan 7, 2026
e69017b
Fix multiple javadoc warnings to reduce build log clutter (#6364)
chrisale000 Jan 7, 2026
0a93e16
Bump org.apache.commons:commons-text (#6382)
dependabot[bot] Jan 7, 2026
e2a0551
Bump net.bytebuddy:byte-buddy in /data-prepper-plugins/opensearch (#6…
dependabot[bot] Jan 7, 2026
e0564d9
Bump net.bytebuddy:byte-buddy-agent in /data-prepper-plugins/opensear…
dependabot[bot] Jan 7, 2026
5450ff3
Added logging for No indices matched (#6342)
Utkarsh-Aga Jan 7, 2026
b234d49
Update simple_pipelines.md (#6274)
sabarinathan590 Jan 7, 2026
d1fa907
Expand necessary OpenSearch permissions for data prepper (#6024)
stelucz Jan 7, 2026
20ac89c
Onboarding new maven snapshots publishing to s3 (data-prepper) (#6246)
peterzhuamazon Jan 8, 2026
2f01475
Optimized RestClient Tests using customized Retry Handler (#6359)
eatulban Jan 8, 2026
e390d45
rebase with main and fix office 365 rest client test
wandna-amazon Jan 8, 2026
71e28a8
Merge branch 'main' into feature/LeaderOnlyTokenCrawler-metrics
wandna-amazon Jan 8, 2026
@@ -473,9 +473,8 @@ void testStartSubscriptionsRecordsFailureMetrics() {
         // Verify that failure metrics were recorded
         // Note: We skip verifying recordSubscriptionLatency parameter since lambda expressions
         // get compiled to specific classes that are difficult to match in tests
-        // The retry handler calls recordSubscriptionFailure() for each retry attempt.
-        // With 6 content types and multiple retries per content type, we expect multiple failure calls
-        verify(metricsRecorder, times(6)).recordSubscriptionFailure();
+        // The retry handler calls recordSubscriptionFailure() once after all retries are exhausted
+        verify(metricsRecorder, times(1)).recordSubscriptionFailure();
     }

     /**
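The updated assertion expects the failure metric to be recorded once, after the final attempt, rather than once per attempt. A minimal sketch of that behavior follows. It assumes a hypothetical `RetrySketch.runWithRetry` helper and a plain `AtomicInteger` in place of the real `metricsRecorder`; neither is taken from the Data Prepper code base.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: this is not the Data Prepper retry handler.
public class RetrySketch {

    // Runs the task up to maxAttempts times. The failure counter is incremented
    // exactly once, and only if every attempt fails; the last exception is rethrown.
    public static <T> T runWithRetry(Callable<T> task, int maxAttempts, AtomicInteger failureCounter)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure, but do not record a metric yet
            }
        }
        failureCounter.incrementAndGet(); // recorded once, after retries are exhausted
        throw last;
    }
}
```

With this shape, a test that forces every attempt to fail would see a single failure recorded regardless of how many attempts were made.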
@@ -39,9 +39,9 @@ public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSlice
     // delay five minutes for partition creation on latest time duration to ensure the newly generated events are queryable
     // In general, newly generated events become queryable after 30 ~ 120 second
     protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300;
-    private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "DimensionalTimeSliceWorkerPartitionsCreated";
-    private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
-    private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
+    private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "dimensionalTimeSliceWorkerPartitionsCreated";
+    private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
+    private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
     private static final Duration HOUR_DURATION = Duration.ofHours(1);

     private final CrawlerClient client;
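The renames in this hunk move the metric names from upper to lower camel case. One way to guard such a convention in a unit test is a simple pattern check. The sketch below assumes a hypothetical `MetricNames.isLowerCamelCase` helper; it is not something this PR adds.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only.
public class MetricNames {

    // A lowercase first letter followed by letters or digits only.
    private static final Pattern LOWER_CAMEL = Pattern.compile("[a-z][a-zA-Z0-9]*");

    public static boolean isLowerCamelCase(String name) {
        return name != null && LOWER_CAMEL.matcher(name).matches();
    }
}
```

Under this check, `workerPartitionWaitTime` passes while the previous `WorkerPartitionWaitTime` does not.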
@@ -39,6 +39,8 @@ public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerPr

     private static final String METRIC_BATCHES_FAILED = "batchesFailed";
     private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
+    private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
+    private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
     public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
     public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";

@@ -55,6 +57,8 @@ public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerPr
     private final Counter acknowledgementSetSuccesses;
     private final Counter acknowledgementSetFailures;
     private final Timer bufferWriteTimer;
+    private final Timer partitionWaitTimeTimer;
+    private final Timer partitionProcessLatencyTimer;

     private String lastToken;
     private Duration noAckTimeout;
@@ -67,6 +71,8 @@ public LeaderOnlyTokenCrawler(
         this.crawlingTimer = pluginMetrics.timer("crawlingTime");
         this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
         this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
+        this.partitionWaitTimeTimer = pluginMetrics.timer(WORKER_PARTITION_WAIT_TIME);
+        this.partitionProcessLatencyTimer = pluginMetrics.timer(WORKER_PARTITION_PROCESS_LATENCY);
         this.acknowledgementSetSuccesses = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME);
         this.acknowledgementSetFailures = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME);
         this.noAckTimeout = NO_ACK_TIME_OUT_SECONDS;
@@ -123,7 +129,8 @@ public Instant crawl(LeaderPartition leaderPartition,

     @Override
     public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
-        client.executePartition(state, buffer, acknowledgementSet);
+        partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now()));
+        partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
     }

     private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {

Comment thread on the added partitionWaitTimeTimer.record(...) line: wandna-amazon marked this conversation as resolved.
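The `executePartition` change records two timings: how long the partition waited (from `state.getExportStartTime()` until a worker picks it up) and how long the client call itself takes. A minimal sketch of that pattern using only `java.time` follows. It assumes a hypothetical `DurationRecorder` standing in for the Micrometer `Timer`, and an injected `Clock` so the arithmetic is deterministic; these names are illustrative and not part of the PR.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: DurationRecorder stands in for the Micrometer Timer used in the PR.
public class PartitionTimingSketch {

    // Collects recorded durations so they can be inspected afterwards.
    public static class DurationRecorder {
        private final List<Duration> samples = new ArrayList<>();
        private final Clock clock;

        public DurationRecorder(Clock clock) {
            this.clock = clock;
        }

        // Records an already computed duration.
        public void record(Duration duration) {
            samples.add(duration);
        }

        // Runs the work and records the elapsed time, even if the work throws.
        public void record(Runnable work) {
            Instant start = clock.instant();
            try {
                work.run();
            } finally {
                samples.add(Duration.between(start, clock.instant()));
            }
        }

        public List<Duration> samples() {
            return samples;
        }
    }

    // Mirrors the shape of the new executePartition: wait time first, then timed execution.
    public static void executePartition(Instant exportStartTime,
                                        Clock clock,
                                        DurationRecorder waitTime,
                                        DurationRecorder processLatency,
                                        Runnable clientCall) {
        waitTime.record(Duration.between(exportStartTime, clock.instant()));
        processLatency.record(clientCall);
    }
}
```

Wrapping the client call in the recorder, rather than calling it directly, is also why the new unit test has to make its mocked latency timer run the supplied `Runnable`: a bare mock would swallow the call.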
@@ -33,8 +33,8 @@ public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerPr
     private static final int batchSize = 50;
     private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
     private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
-    private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
-    private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
+    private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime";
+    private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency";
     private final Timer crawlingTimer;
     private final Timer partitionWaitTimeTimer;
     private final Timer partitionProcessLatencyTimer;
@@ -75,8 +75,8 @@ public class DimensionalTimeSliceCrawlerTest {
     @BeforeEach
     void setUp() {
         when(pluginMetrics.counter(anyString())).thenReturn(partitionsCreatedCounter);
-        when(pluginMetrics.timer("WorkerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
-        when(pluginMetrics.timer("WorkerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
+        when(pluginMetrics.timer("workerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
+        when(pluginMetrics.timer("workerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
         crawler = new DimensionalTimeSliceCrawler(client, pluginMetrics);
         crawler.initialize(LOG_TYPES);
     }
@@ -22,6 +22,8 @@
 import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.Counter;

 import java.time.Duration;
 import java.time.Instant;
@@ -42,6 +44,9 @@
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.internal.verification.VerificationModeFactory.times;

 @ExtendWith(MockitoExtension.class)
@@ -62,6 +67,8 @@ class LeaderOnlyTokenCrawlerTest {
     private AcknowledgementSet acknowledgementSet;
     @Mock
     private Buffer<Record<Event>> buffer;
+    @Mock
+    private PaginationCrawlerWorkerProgressState workerState;

     private LeaderOnlyTokenCrawler crawler;
     private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");
@@ -241,6 +248,50 @@ void testAcknowledgmentTimeout() {
         verify(coordinator).createPartition(any());
     }

+    @Test
+    public void testExecutePartitionMetrics() {
+        reset(leaderPartition);
+
+        // mock timers and counters
+        Timer mockCrawlingTimer = mock(Timer.class);
+        Timer partitionWaitTimeTimer = mock(Timer.class);
+        Timer partitionProcessLatencyTimer = mock(Timer.class);
+        Timer mockBufferWriteTimer = mock(Timer.class);
+        Counter mockBatchesFailedCounter = mock(Counter.class);
+        Counter mockAcknowledgementSetSuccesses = mock(Counter.class);
+        Counter mockAcknowledgementSetFailures = mock(Counter.class);
+
+        // setup mock plugin metrics
+        PluginMetrics mockPluginMetrics = mock(PluginMetrics.class);
+        when(mockPluginMetrics.timer("crawlingTime")).thenReturn(mockCrawlingTimer);
+        when(mockPluginMetrics.timer("workerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
+        when(mockPluginMetrics.timer("workerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
+        when(mockPluginMetrics.timer("bufferWriteTime")).thenReturn(mockBufferWriteTimer);
+        when(mockPluginMetrics.counter("batchesFailed")).thenReturn(mockBatchesFailedCounter);
+        when(mockPluginMetrics.counter("acknowledgementSetSuccesses")).thenReturn(mockAcknowledgementSetSuccesses);
+        when(mockPluginMetrics.counter("acknowledgementSetFailures")).thenReturn(mockAcknowledgementSetFailures);
+
+        LeaderOnlyTokenCrawler testCrawler = new LeaderOnlyTokenCrawler(client, mockPluginMetrics);
+
+        // test executePartition with metrics
+        when(workerState.getExportStartTime()).thenReturn(Instant.now().minusSeconds(1));
+
+        // make latency timer execute the runnable so client.executePartition() gets called
+        doAnswer(invocation -> {
+            Runnable runnable = invocation.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(partitionProcessLatencyTimer).record(any(Runnable.class));
+
+        doNothing().when(partitionWaitTimeTimer).record(any(Duration.class));
+
+        testCrawler.executePartition(workerState, buffer, acknowledgementSet);
+
+        // verify metrics are recorded
+        verify(partitionProcessLatencyTimer).record(any(Runnable.class));
+        verify(partitionWaitTimeTimer).record(any(Duration.class));
+        verify(client).executePartition(workerState, buffer, acknowledgementSet);
+    }

     private List<ItemInfo> createTestItems(int count) {
         List<ItemInfo> items = new ArrayList<>();
@@ -249,4 +300,4 @@ private List<ItemInfo> createTestItems(int count) {
         }
         return items;
     }
-}
+}
@@ -102,8 +102,8 @@ public void test_metrics_in_crawler() {
         Counter mockInvalidItemsCounter = mock(Counter.class);

         when(mockPluginMetrics.timer("crawlingTime")).thenReturn(mockCrawlingTimer);
-        when(mockPluginMetrics.timer("WorkerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
-        when(mockPluginMetrics.timer("WorkerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
+        when(mockPluginMetrics.timer("workerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
+        when(mockPluginMetrics.timer("workerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
         when(mockPluginMetrics.counter("paginationWorkerPartitionsCreated")).thenReturn(mockPartitionsCreatedCounter);
         when(mockPluginMetrics.counter("invalidPaginationItems")).thenReturn(mockInvalidItemsCounter);
