diff --git a/data-prepper-plugins/rate-limiter-processor/build.gradle b/data-prepper-plugins/rate-limiter-processor/build.gradle new file mode 100644 index 0000000000..2bde5db48a --- /dev/null +++ b/data-prepper-plugins/rate-limiter-processor/build.gradle @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation project(':data-prepper-test:plugin-test-framework') +} diff --git a/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterMode.java b/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterMode.java new file mode 100644 index 0000000000..3b1bfbcc9f --- /dev/null +++ b/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterMode.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.ratelimiter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum RateLimiterMode { + DROP("drop"); + + private static final Map MODES_MAP = Arrays.stream(RateLimiterMode.values()) + .collect(Collectors.toMap( + value -> value.name, + value -> value + )); + + private final String name; + + RateLimiterMode(String name) { + this.name = name.toLowerCase(); + } + + @JsonCreator + static RateLimiterMode fromOptionValue(final String option) { + return MODES_MAP.get(option.toLowerCase()); + } + + @JsonValue + public String getOptionValue() { + return name; + } +} diff --git a/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessor.java b/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessor.java new file mode 100644 index 0000000000..971d99cc5d --- /dev/null +++ b/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessor.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.ratelimiter; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +@DataPrepperPlugin(name = "rate_limiter", pluginType = Processor.class, pluginConfigurationType = RateLimiterProcessorConfig.class) +public class RateLimiterProcessor extends AbstractProcessor, Record> { + private final int eventsPerSecond; + private final int counterRetentionSeconds; + private final ConcurrentHashMap eventCountPerSecond = new ConcurrentHashMap<>(); + + @DataPrepperPluginConstructor + public RateLimiterProcessor(final PluginMetrics pluginMetrics, + final RateLimiterProcessorConfig config) { + super(pluginMetrics); + this.eventsPerSecond = config.getEventsPerSecond(); + this.counterRetentionSeconds = config.getCounterRetentionSeconds(); + } + + @Override + public Collection> doExecute(final Collection> records) { + final Collection> output = new ArrayList<>(); + + for (final Record record : records) { + final Event event = record.getData(); + final long arrivalSecond = event.getMetadata().getTimeReceived().getEpochSecond(); + + final AtomicInteger count = eventCountPerSecond.computeIfAbsent(arrivalSecond, k -> new AtomicInteger(0)); + if (count.incrementAndGet() <= eventsPerSecond) { + output.add(record); + } + } + + evictExpiredCounters(); + + return output; + } + + private void evictExpiredCounters() { + final long now = Instant.now().getEpochSecond(); + eventCountPerSecond.keySet().removeIf(second -> now - second > counterRetentionSeconds); + } + + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorConfig.java b/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorConfig.java new file mode 100644 index 0000000000..1918ad5072 --- /dev/null +++ b/data-prepper-plugins/rate-limiter-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.ratelimiter; + +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.constraints.NotNull; + +@JsonPropertyOrder +@JsonClassDescription("The rate_limiter processor controls the number of events processed per second. " + + "By default, rate_limiter drops events that exceed the configured number allowed per second.") +public class RateLimiterProcessorConfig { + + @JsonPropertyDescription("The number of events allowed per second.") + @JsonProperty("events_per_second") + @NotNull + private int eventsPerSecond; + + @JsonPropertyDescription("Indicates what action the rate_limiter takes when the number of events received is greater than the number of events allowed per second. " + + "Default value is drop, which drops the excess events received in that second.") + @JsonProperty(value = "when_exceeds", defaultValue = "drop") + private RateLimiterMode whenExceeds = RateLimiterMode.DROP; + + @JsonPropertyDescription("The duration in seconds to track per-second event counts. " + + "Counters older than this are discarded to free resources. Default is 60.") + @JsonProperty(value = "counter_retention_seconds", defaultValue = "60") + private int counterRetentionSeconds = 60; + + public int getEventsPerSecond() { + return eventsPerSecond; + } + + public RateLimiterMode getWhenExceeds() { + return whenExceeds; + } + + public int getCounterRetentionSeconds() { + return counterRetentionSeconds; + } +} diff --git a/data-prepper-plugins/rate-limiter-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorConfigTest.java b/data-prepper-plugins/rate-limiter-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorConfigTest.java new file mode 100644 index 0000000000..ad54f40540 --- /dev/null +++ b/data-prepper-plugins/rate-limiter-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorConfigTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.ratelimiter; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class RateLimiterProcessorConfigTest { + + @Test + void test_default_when_exceeds_is_drop() { + final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig(); + assertThat(config.getWhenExceeds(), equalTo(RateLimiterMode.DROP)); + } + + @Test + void test_default_counter_retention_seconds_is_60() { + final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig(); + assertThat(config.getCounterRetentionSeconds(), equalTo(60)); + } + +} diff --git a/data-prepper-plugins/rate-limiter-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorTest.java b/data-prepper-plugins/rate-limiter-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorTest.java new file mode 100644 index 0000000000..50db1e106a --- /dev/null +++ b/data-prepper-plugins/rate-limiter-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ratelimiter/RateLimiterProcessorTest.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.ratelimiter; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class RateLimiterProcessorTest { + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private RateLimiterProcessorConfig config; + + private RateLimiterProcessor createObjectUnderTest(final int eventsPerSecond) { + when(config.getEventsPerSecond()).thenReturn(eventsPerSecond); + when(config.getCounterRetentionSeconds()).thenReturn(60); + return new RateLimiterProcessor(pluginMetrics, config); + } + + private List> createEventsInSecond(final Instant second, final int count) { + final List> records = new ArrayList<>(); + for (int i = 0; i < count; i++) { + final Event event = JacksonEvent.builder() + .withEventType("event") + .withData(Collections.singletonMap("message", "test-" + i)) + .withTimeReceived(second.plusMillis(i)) + .build(); + records.add(new Record<>(event)); + } + return records; + } + + @Test + void test_events_under_limit_all_pass_through() { + final RateLimiterProcessor processor = createObjectUnderTest(400); + final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); + final Collection> result = processor.doExecute(createEventsInSecond(now, 200)); + + assertThat(result.size(), equalTo(200)); + } + + @Test + void test_events_over_limit_are_dropped() { + final RateLimiterProcessor processor = createObjectUnderTest(400); + final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); + final Collection> result = processor.doExecute(createEventsInSecond(now, 500)); + + assertThat(result.size(), equalTo(400)); + } + + @Test + void test_events_from_different_seconds_are_counted_independently() { + final RateLimiterProcessor processor = createObjectUnderTest(400); + final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); + final Instant nextSecond = now.plusSeconds(1); + + final List> records = new ArrayList<>(); + records.addAll(createEventsInSecond(now, 500)); + records.addAll(createEventsInSecond(nextSecond, 500)); + + final Collection> result = processor.doExecute(records); + + assertThat(result.size(), equalTo(800)); + } + + @Test + void test_counts_accumulate_across_multiple_doExecute_calls() { + final RateLimiterProcessor processor = createObjectUnderTest(400); + final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); + + final Collection> result1 = processor.doExecute(createEventsInSecond(now, 300)); + final Collection> result2 = processor.doExecute(createEventsInSecond(now, 200)); + + assertThat(result1.size(), equalTo(300)); + assertThat(result2.size(), equalTo(100)); + } +} diff --git a/settings.gradle b/settings.gradle index 99988e8249..d61750887f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -216,6 +216,7 @@ include 'data-prepper-plugins:saas-source-plugins:crowdstrike-source' include 'data-prepper-plugins:saas-source-plugins:microsoft-office365-source' include 'data-prepper-plugins:otlp-sink' include 'data-prepper-plugins:otel-apm-service-map-processor' +include 'data-prepper-plugins:rate-limiter-processor' include 'e2e-test:kafka-buffer-backward-compatibility'