-
Notifications
You must be signed in to change notification settings - Fork 326
Adding support for rate_limiter processor #6872
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| plugins { | ||
| id 'java' | ||
| } | ||
|
|
||
| dependencies { | ||
| implementation project(':data-prepper-api') | ||
| implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
| testImplementation project(':data-prepper-test:plugin-test-framework') | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.processor.ratelimiter; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonValue; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| public enum RateLimiterMode { | ||
| DROP("drop"); | ||
|
|
||
| private static final Map<String, RateLimiterMode> MODES_MAP = Arrays.stream(RateLimiterMode.values()) | ||
| .collect(Collectors.toMap( | ||
| value -> value.name, | ||
| value -> value | ||
| )); | ||
|
|
||
| private final String name; | ||
|
|
||
| RateLimiterMode(String name) { | ||
| this.name = name.toLowerCase(); | ||
| } | ||
|
|
||
| @JsonCreator | ||
| static RateLimiterMode fromOptionValue(final String option) { | ||
| return MODES_MAP.get(option.toLowerCase()); | ||
| } | ||
|
|
||
| @JsonValue | ||
| public String getOptionValue() { | ||
| return name; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.processor.ratelimiter; | ||
|
|
||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.processor.AbstractProcessor; | ||
| import org.opensearch.dataprepper.model.processor.Processor; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| @DataPrepperPlugin(name = "rate_limiter", pluginType = Processor.class, pluginConfigurationType = RateLimiterProcessorConfig.class) | ||
| public class RateLimiterProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||
| private final int eventsPerSecond; | ||
| private final int counterRetentionSeconds; | ||
| private final ConcurrentHashMap<Long, AtomicInteger> eventCountPerSecond = new ConcurrentHashMap<>(); | ||
|
|
||
| @DataPrepperPluginConstructor | ||
| public RateLimiterProcessor(final PluginMetrics pluginMetrics, | ||
| final RateLimiterProcessorConfig config) { | ||
| super(pluginMetrics); | ||
| this.eventsPerSecond = config.getEventsPerSecond(); | ||
| this.counterRetentionSeconds = config.getCounterRetentionSeconds(); | ||
| } | ||
|
|
||
| @Override | ||
| public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) { | ||
| final Collection<Record<Event>> output = new ArrayList<>(); | ||
|
|
||
| for (final Record<Event> record : records) { | ||
| final Event event = record.getData(); | ||
| final long arrivalSecond = event.getMetadata().getTimeReceived().getEpochSecond(); | ||
|
|
||
| final AtomicInteger count = eventCountPerSecond.computeIfAbsent(arrivalSecond, k -> new AtomicInteger(0)); | ||
| if (count.incrementAndGet() <= eventsPerSecond) { | ||
| output.add(record); | ||
| } | ||
| } | ||
|
|
||
| evictExpiredCounters(); | ||
|
|
||
| return output; | ||
| } | ||
|
|
||
| private void evictExpiredCounters() { | ||
| final long now = Instant.now().getEpochSecond(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
| eventCountPerSecond.keySet().removeIf(second -> now - second > counterRetentionSeconds); | ||
| } | ||
|
|
||
|
|
||
| @Override | ||
| public void prepareForShutdown() { | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isReadyForShutdown() { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void shutdown() { | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.processor.ratelimiter; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonClassDescription; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.fasterxml.jackson.annotation.JsonPropertyDescription; | ||
| import com.fasterxml.jackson.annotation.JsonPropertyOrder; | ||
| import jakarta.validation.constraints.NotNull; | ||
|
|
||
| @JsonPropertyOrder | ||
| @JsonClassDescription("The <code>rate_limiter</code> processor controls the number of events processed per second. " + | ||
| "By default, <code>rate_limiter</code> drops events that exceed the configured number allowed per second.") | ||
| public class RateLimiterProcessorConfig { | ||
|
|
||
| @JsonPropertyDescription("The number of events allowed per second.") | ||
| @JsonProperty("events_per_second") | ||
| @NotNull | ||
| private int eventsPerSecond; | ||
|
|
||
| @JsonPropertyDescription("Indicates what action the <code>rate_limiter</code> takes when the number of events received is greater than the number of events allowed per second. " + | ||
| "Default value is drop, which drops the excess events received in that second.") | ||
| @JsonProperty(value = "when_exceeds", defaultValue = "drop") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should probably have a generic when condition here like other processors. Maybe the configuration is named "limit_when" or "rate_limit_when" as just a generic expression to including in the rate limiting or not. That at least allows users to specify what is ok to rate limit and what is not.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with |
||
| private RateLimiterMode whenExceeds = RateLimiterMode.DROP; | ||
|
|
||
| @JsonPropertyDescription("The duration in seconds to track per-second event counts. " + | ||
| "Counters older than this are discarded to free resources. Default is 60.") | ||
| @JsonProperty(value = "counter_retention_seconds", defaultValue = "60") | ||
| private int counterRetentionSeconds = 60; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be a |
||
|
|
||
| public int getEventsPerSecond() { | ||
| return eventsPerSecond; | ||
| } | ||
|
|
||
| public RateLimiterMode getWhenExceeds() { | ||
| return whenExceeds; | ||
| } | ||
|
|
||
| public int getCounterRetentionSeconds() { | ||
| return counterRetentionSeconds; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.processor.ratelimiter; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import static org.hamcrest.CoreMatchers.equalTo; | ||
| import static org.hamcrest.MatcherAssert.assertThat; | ||
|
|
||
| class RateLimiterProcessorConfigTest { | ||
|
|
||
| @Test | ||
| void test_default_when_exceeds_is_drop() { | ||
| final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig(); | ||
| assertThat(config.getWhenExceeds(), equalTo(RateLimiterMode.DROP)); | ||
| } | ||
|
|
||
| @Test | ||
| void test_default_counter_retention_seconds_is_60() { | ||
| final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig(); | ||
| assertThat(config.getCounterRetentionSeconds(), equalTo(60)); | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.processor.ratelimiter; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.extension.ExtendWith; | ||
| import org.mockito.Mock; | ||
| import org.mockito.junit.jupiter.MockitoExtension; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| import static org.hamcrest.CoreMatchers.equalTo; | ||
| import static org.hamcrest.MatcherAssert.assertThat; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| @ExtendWith(MockitoExtension.class) | ||
| class RateLimiterProcessorTest { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
|
|
||
| @Mock | ||
| private PluginMetrics pluginMetrics; | ||
|
|
||
| @Mock | ||
| private RateLimiterProcessorConfig config; | ||
|
|
||
| private RateLimiterProcessor createObjectUnderTest(final int eventsPerSecond) { | ||
| when(config.getEventsPerSecond()).thenReturn(eventsPerSecond); | ||
| when(config.getCounterRetentionSeconds()).thenReturn(60); | ||
| return new RateLimiterProcessor(pluginMetrics, config); | ||
| } | ||
|
|
||
| private List<Record<Event>> createEventsInSecond(final Instant second, final int count) { | ||
| final List<Record<Event>> records = new ArrayList<>(); | ||
| for (int i = 0; i < count; i++) { | ||
| final Event event = JacksonEvent.builder() | ||
| .withEventType("event") | ||
| .withData(Collections.singletonMap("message", "test-" + i)) | ||
| .withTimeReceived(second.plusMillis(i)) | ||
| .build(); | ||
| records.add(new Record<>(event)); | ||
| } | ||
| return records; | ||
| } | ||
|
|
||
| @Test | ||
| void test_events_under_limit_all_pass_through() { | ||
| final RateLimiterProcessor processor = createObjectUnderTest(400); | ||
| final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); | ||
| final Collection<Record<Event>> result = processor.doExecute(createEventsInSecond(now, 200)); | ||
|
|
||
| assertThat(result.size(), equalTo(200)); | ||
| } | ||
|
|
||
| @Test | ||
| void test_events_over_limit_are_dropped() { | ||
| final RateLimiterProcessor processor = createObjectUnderTest(400); | ||
| final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); | ||
| final Collection<Record<Event>> result = processor.doExecute(createEventsInSecond(now, 500)); | ||
|
|
||
| assertThat(result.size(), equalTo(400)); | ||
| } | ||
|
|
||
| @Test | ||
| void test_events_from_different_seconds_are_counted_independently() { | ||
| final RateLimiterProcessor processor = createObjectUnderTest(400); | ||
| final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); | ||
| final Instant nextSecond = now.plusSeconds(1); | ||
|
|
||
| final List<Record<Event>> records = new ArrayList<>(); | ||
| records.addAll(createEventsInSecond(now, 500)); | ||
| records.addAll(createEventsInSecond(nextSecond, 500)); | ||
|
|
||
| final Collection<Record<Event>> result = processor.doExecute(records); | ||
|
|
||
| assertThat(result.size(), equalTo(800)); | ||
| } | ||
|
|
||
| @Test | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a test when we inteweave the seconds like this |
||
| void test_counts_accumulate_across_multiple_doExecute_calls() { | ||
| final RateLimiterProcessor processor = createObjectUnderTest(400); | ||
| final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS); | ||
|
|
||
| final Collection<Record<Event>> result1 = processor.doExecute(createEventsInSecond(now, 300)); | ||
| final Collection<Record<Event>> result2 = processor.doExecute(createEventsInSecond(now, 200)); | ||
|
|
||
| assertThat(result1.size(), equalTo(300)); | ||
| assertThat(result2.size(), equalTo(100)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering about this. We are trading off the possibility of dropping some data due to running processors on batches to another problem which is that we may still greatly exceed the current rate. You could have data from over a wide swath of time coming through and that will result in a large batch of data beyond the requested size going to OpenSearch. It seems the goal of a rate limiter is to prevent overwhelming the destination.