Skip to content

Commit cb47f5c

Browse files
authored
Adding support for rate_limiter processor (#6872)
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 14cc3e5 commit cb47f5c

10 files changed

Lines changed: 425 additions & 0 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
plugins {
12+
id 'java'
13+
}
14+
15+
dependencies {
16+
implementation project(':data-prepper-api')
17+
implementation 'com.fasterxml.jackson.core:jackson-databind'
18+
testImplementation project(':data-prepper-test:plugin-test-framework')
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonValue;
15+
16+
import java.util.Arrays;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
public enum RateLimiterMode {
21+
DROP("drop"),
22+
HOLD("hold");
23+
24+
private static final Map<String, RateLimiterMode> MODES_MAP = Arrays.stream(RateLimiterMode.values())
25+
.collect(Collectors.toMap(
26+
value -> value.name,
27+
value -> value
28+
));
29+
30+
private final String name;
31+
32+
RateLimiterMode(String name) {
33+
this.name = name.toLowerCase();
34+
}
35+
36+
@JsonCreator
37+
static RateLimiterMode fromOptionValue(final String option) {
38+
return MODES_MAP.get(option.toLowerCase());
39+
}
40+
41+
@JsonValue
42+
public String getOptionValue() {
43+
return name;
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
14+
import org.opensearch.dataprepper.metrics.PluginMetrics;
15+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
16+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
17+
import org.opensearch.dataprepper.model.event.Event;
18+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
19+
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
20+
import org.opensearch.dataprepper.model.processor.Processor;
21+
import org.opensearch.dataprepper.model.record.Record;
22+
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
28+
@DataPrepperPlugin(name = "rate_limiter", pluginType = Processor.class, pluginConfigurationType = RateLimiterProcessorConfig.class)
29+
public class RateLimiterProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
30+
private final int eventsPerSecond;
31+
private final long counterRetentionSeconds;
32+
private final RateLimiterMode whenExceeds;
33+
private final String limitWhen;
34+
private final ExpressionEvaluator expressionEvaluator;
35+
private final ConcurrentHashMap<Long, AtomicInteger> emittedPerSecond = new ConcurrentHashMap<>();
36+
37+
@DataPrepperPluginConstructor
38+
public RateLimiterProcessor(final PluginMetrics pluginMetrics,
39+
final RateLimiterProcessorConfig config,
40+
final ExpressionEvaluator expressionEvaluator) {
41+
super(pluginMetrics);
42+
this.eventsPerSecond = config.getEventsPerSecond();
43+
this.counterRetentionSeconds = config.getCounterRetention().getSeconds();
44+
this.whenExceeds = config.getWhenExceeds();
45+
this.limitWhen = config.getLimitWhen();
46+
this.expressionEvaluator = expressionEvaluator;
47+
48+
if (limitWhen != null && !expressionEvaluator.isValidExpressionStatement(limitWhen)) {
49+
throw new InvalidPluginConfigurationException(
50+
String.format("limit_when \"%s\" is not a valid expression statement.", limitWhen));
51+
}
52+
}
53+
54+
@Override
55+
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
56+
final Collection<Record<Event>> output = new ArrayList<>();
57+
58+
for (final Record<Event> record : records) {
59+
if (limitWhen != null && !expressionEvaluator.evaluateConditional(limitWhen, record.getData())) {
60+
output.add(record);
61+
continue;
62+
}
63+
64+
if (whenExceeds == RateLimiterMode.DROP) {
65+
final long now = System.currentTimeMillis() / 1000;
66+
final AtomicInteger count = emittedPerSecond.computeIfAbsent(now, k -> new AtomicInteger(0));
67+
if (count.incrementAndGet() <= eventsPerSecond) {
68+
output.add(record);
69+
}
70+
} else {
71+
waitForCapacity();
72+
output.add(record);
73+
}
74+
}
75+
76+
evictOldCounters();
77+
return output;
78+
}
79+
80+
private void waitForCapacity() {
81+
while (true) {
82+
final long now = System.currentTimeMillis() / 1000;
83+
final AtomicInteger count = emittedPerSecond.computeIfAbsent(now, k -> new AtomicInteger(0));
84+
if (count.incrementAndGet() <= eventsPerSecond) {
85+
return;
86+
}
87+
count.decrementAndGet();
88+
try {
89+
long sleepMs = 1000 - (System.currentTimeMillis() % 1000);
90+
Thread.sleep(sleepMs);
91+
} catch (InterruptedException e) {
92+
Thread.currentThread().interrupt();
93+
return;
94+
}
95+
}
96+
}
97+
98+
private void evictOldCounters() {
99+
final long now = System.currentTimeMillis() / 1000;
100+
emittedPerSecond.keySet().removeIf(second -> now - second > counterRetentionSeconds);
101+
}
102+
103+
104+
@Override
105+
public void prepareForShutdown() {
106+
}
107+
108+
@Override
109+
public boolean isReadyForShutdown() {
110+
return true;
111+
}
112+
113+
@Override
114+
public void shutdown() {
115+
}
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import com.fasterxml.jackson.annotation.JsonClassDescription;
14+
import com.fasterxml.jackson.annotation.JsonProperty;
15+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
16+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
17+
import jakarta.validation.constraints.NotNull;
18+
19+
import java.time.Duration;
20+
21+
@JsonPropertyOrder
22+
@JsonClassDescription("The <code>rate_limiter</code> processor controls the number of events processed per second. " +
23+
"By default, <code>rate_limiter</code> drops events that exceed the configured number allowed per second.")
24+
public class RateLimiterProcessorConfig {
25+
26+
@JsonPropertyDescription("The number of events allowed per second.")
27+
@JsonProperty("events_per_second")
28+
@NotNull
29+
private int eventsPerSecond;
30+
31+
@JsonPropertyDescription("Indicates what action the <code>rate_limiter</code> takes when the number of events received is greater than the number of events allowed per second. " +
32+
"Default value is drop, which drops the excess events received in that second.")
33+
@JsonProperty(value = "when_exceeds", defaultValue = "drop")
34+
private RateLimiterMode whenExceeds = RateLimiterMode.DROP;
35+
36+
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a> such as <code>/log_type == \"INFO\"</code>. " +
37+
"When specified, only events where the condition evaluates to true are subject to rate limiting. " +
38+
"Events that do not match pass through unaffected.")
39+
@JsonProperty("limit_when")
40+
private String limitWhen;
41+
42+
@JsonPropertyDescription("The duration to track per-second event counts. " +
43+
"Counters older than this are discarded to free resources. Default is 60 seconds.")
44+
@JsonProperty(value = "counter_retention", defaultValue = "PT60S")
45+
private Duration counterRetention = Duration.ofSeconds(60);
46+
47+
public int getEventsPerSecond() {
48+
return eventsPerSecond;
49+
}
50+
51+
public RateLimiterMode getWhenExceeds() {
52+
return whenExceeds;
53+
}
54+
55+
public String getLimitWhen() {
56+
return limitWhen;
57+
}
58+
59+
public Duration getCounterRetention() {
60+
return counterRetention;
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
import java.time.Duration;
16+
17+
import static org.hamcrest.CoreMatchers.equalTo;
18+
import static org.hamcrest.CoreMatchers.nullValue;
19+
import static org.hamcrest.MatcherAssert.assertThat;
20+
21+
class RateLimiterProcessorConfigTest {
22+
23+
@Test
24+
void test_default_config() {
25+
final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig();
26+
assertThat(config.getWhenExceeds(), equalTo(RateLimiterMode.DROP));
27+
assertThat(config.getLimitWhen(), nullValue());
28+
assertThat(config.getCounterRetention(),equalTo(Duration.ofSeconds(60)));
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import org.junit.jupiter.api.Test;
14+
import org.opensearch.dataprepper.model.event.Event;
15+
import org.opensearch.dataprepper.model.event.EventFactory;
16+
import org.opensearch.dataprepper.model.event.LogEventBuilder;
17+
import org.opensearch.dataprepper.model.processor.Processor;
18+
import org.opensearch.dataprepper.model.record.Record;
19+
import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest;
20+
import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile;
21+
import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite;
22+
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
import static org.hamcrest.CoreMatchers.equalTo;
29+
import static org.hamcrest.CoreMatchers.notNullValue;
30+
import static org.hamcrest.MatcherAssert.assertThat;
31+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
32+
33+
@DataPrepperPluginTest(pluginName = "rate_limiter", pluginType = Processor.class)
34+
class RateLimiterProcessorTest extends BaseDataPrepperPluginStandardTestSuite {
35+
36+
@Test
37+
void test_events_under_limit_all_pass_through(
38+
@PluginConfigurationFile("rate_limiter_default.yaml") final Processor<Record<Event>, Record<Event>> processor,
39+
final EventFactory eventFactory) {
40+
final List<Record<Event>> records = createEvents(eventFactory, 200);
41+
42+
final Collection<Record<Event>> result = processor.execute(records);
43+
44+
assertThat(result, notNullValue());
45+
assertThat(result.size(), equalTo(200));
46+
}
47+
48+
@Test
49+
void test_events_over_limit_are_dropped(
50+
@PluginConfigurationFile("rate_limiter_default.yaml") final Processor<Record<Event>, Record<Event>> processor,
51+
final EventFactory eventFactory) {
52+
final List<Record<Event>> records = createEvents(eventFactory, 500);
53+
54+
final Collection<Record<Event>> result = processor.execute(records);
55+
56+
assertThat(result, notNullValue());
57+
assertThat(result.size(), lessThanOrEqualTo(400));
58+
}
59+
60+
@Test
61+
void test_counts_accumulate_across_multiple_execute_calls(
62+
@PluginConfigurationFile("rate_limiter_default.yaml") final Processor<Record<Event>, Record<Event>> processor,
63+
final EventFactory eventFactory) {
64+
final Collection<Record<Event>> result1 = processor.execute(createEvents(eventFactory, 300));
65+
final Collection<Record<Event>> result2 = processor.execute(createEvents(eventFactory, 200));
66+
67+
assertThat(result1.size() + result2.size(), lessThanOrEqualTo(400));
68+
}
69+
70+
@Test
71+
void test_limit_when_only_rate_limits_matching_events(
72+
@PluginConfigurationFile("rate_limiter_with_limit_when.yaml") final Processor<Record<Event>, Record<Event>> processor,
73+
final EventFactory eventFactory) {
74+
final List<Record<Event>> records = new ArrayList<>();
75+
records.addAll(createEvents(eventFactory, 5, Map.of("message", "test", "level", "DEBUG")));
76+
records.addAll(createEvents(eventFactory, 5, Map.of("message", "test", "level", "INFO")));
77+
78+
final Collection<Record<Event>> result = processor.execute(records);
79+
80+
assertThat(result, notNullValue());
81+
assertThat(result.size(), equalTo(7));
82+
}
83+
84+
@Test
85+
void test_hold_mode(
86+
@PluginConfigurationFile("rate_limiter_hold_mode.yaml") final Processor<Record<Event>, Record<Event>> processor,
87+
final EventFactory eventFactory) {
88+
final List<Record<Event>> records = createEvents(eventFactory, 3);
89+
final Collection<Record<Event>> result = processor.execute(records);
90+
assertThat(result, notNullValue());
91+
assertThat(result.size(), equalTo(3));
92+
}
93+
94+
private List<Record<Event>> createEvents(final EventFactory eventFactory, final int count) {
95+
return createEvents(eventFactory, count, Map.of("message", "test"));
96+
}
97+
98+
private List<Record<Event>> createEvents(final EventFactory eventFactory, final int count, final Map<String, Object> data) {
99+
final List<Record<Event>> records = new ArrayList<>();
100+
for (int i = 0; i < count; i++) {
101+
final Event event = eventFactory.eventBuilder(LogEventBuilder.class)
102+
.withData(data)
103+
.build();
104+
records.add(new Record<>(event));
105+
}
106+
return records;
107+
}
108+
}

0 commit comments

Comments
 (0)