Skip to content

Commit 6fbb171

Browse files
committed
Adding support for rate_limiter processor
1 parent 2f21000 commit 6fbb171

7 files changed

Lines changed: 325 additions & 0 deletions

File tree

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
plugins {
7+
id 'java'
8+
}
9+
10+
dependencies {
11+
implementation project(':data-prepper-api')
12+
implementation 'com.fasterxml.jackson.core:jackson-databind'
13+
testImplementation project(':data-prepper-test:plugin-test-framework')
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonValue;
15+
16+
import java.util.Arrays;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
public enum RateLimiterMode {
21+
DROP("drop");
22+
23+
private static final Map<String, RateLimiterMode> MODES_MAP = Arrays.stream(RateLimiterMode.values())
24+
.collect(Collectors.toMap(
25+
value -> value.name,
26+
value -> value
27+
));
28+
29+
private final String name;
30+
31+
RateLimiterMode(String name) {
32+
this.name = name.toLowerCase();
33+
}
34+
35+
@JsonCreator
36+
static RateLimiterMode fromOptionValue(final String option) {
37+
return MODES_MAP.get(option.toLowerCase());
38+
}
39+
40+
@JsonValue
41+
public String getOptionValue() {
42+
return name;
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import org.opensearch.dataprepper.metrics.PluginMetrics;
14+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
15+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
18+
import org.opensearch.dataprepper.model.processor.Processor;
19+
import org.opensearch.dataprepper.model.record.Record;
20+
21+
import java.time.Instant;
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
27+
@DataPrepperPlugin(name = "rate_limiter", pluginType = Processor.class, pluginConfigurationType = RateLimiterProcessorConfig.class)
28+
public class RateLimiterProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
29+
private final int eventsPerSecond;
30+
private final int counterRetentionSeconds;
31+
private final ConcurrentHashMap<Long, AtomicInteger> eventCountPerSecond = new ConcurrentHashMap<>();
32+
33+
@DataPrepperPluginConstructor
34+
public RateLimiterProcessor(final PluginMetrics pluginMetrics,
35+
final RateLimiterProcessorConfig config) {
36+
super(pluginMetrics);
37+
this.eventsPerSecond = config.getEventsPerSecond();
38+
this.counterRetentionSeconds = config.getCounterRetentionSeconds();
39+
}
40+
41+
@Override
42+
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
43+
final Collection<Record<Event>> output = new ArrayList<>();
44+
45+
for (final Record<Event> record : records) {
46+
final Event event = record.getData();
47+
final long arrivalSecond = event.getMetadata().getTimeReceived().getEpochSecond();
48+
49+
final AtomicInteger count = eventCountPerSecond.computeIfAbsent(arrivalSecond, k -> new AtomicInteger(0));
50+
if (count.incrementAndGet() <= eventsPerSecond) {
51+
output.add(record);
52+
}
53+
}
54+
55+
evictExpiredCounters();
56+
57+
return output;
58+
}
59+
60+
private void evictExpiredCounters() {
61+
final long now = Instant.now().getEpochSecond();
62+
eventCountPerSecond.keySet().removeIf(second -> now - second > counterRetentionSeconds);
63+
}
64+
65+
66+
@Override
67+
public void prepareForShutdown() {
68+
}
69+
70+
@Override
71+
public boolean isReadyForShutdown() {
72+
return true;
73+
}
74+
75+
@Override
76+
public void shutdown() {
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import com.fasterxml.jackson.annotation.JsonClassDescription;
14+
import com.fasterxml.jackson.annotation.JsonProperty;
15+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
16+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
17+
import jakarta.validation.constraints.NotNull;
18+
19+
@JsonPropertyOrder
20+
@JsonClassDescription("The <code>rate_limiter</code> processor controls the number of events processed per second. " +
21+
"By default, <code>rate_limiter</code> drops events that exceed the configured number allowed per second.")
22+
public class RateLimiterProcessorConfig {
23+
24+
@JsonPropertyDescription("The number of events allowed per second.")
25+
@JsonProperty("events_per_second")
26+
@NotNull
27+
private int eventsPerSecond;
28+
29+
@JsonPropertyDescription("Indicates what action the <code>rate_limiter</code> takes when the number of events received is greater than the number of events allowed per second. " +
30+
"Default value is drop, which drops the excess events received in that second.")
31+
@JsonProperty(value = "when_exceeds", defaultValue = "drop")
32+
private RateLimiterMode whenExceeds = RateLimiterMode.DROP;
33+
34+
@JsonPropertyDescription("The duration in seconds to track per-second event counts. " +
35+
"Counters older than this are discarded to free resources. Default is 60.")
36+
@JsonProperty(value = "counter_retention_seconds", defaultValue = "60")
37+
private int counterRetentionSeconds = 60;
38+
39+
public int getEventsPerSecond() {
40+
return eventsPerSecond;
41+
}
42+
43+
public RateLimiterMode getWhenExceeds() {
44+
return whenExceeds;
45+
}
46+
47+
public int getCounterRetentionSeconds() {
48+
return counterRetentionSeconds;
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
import java.lang.reflect.Field;
16+
17+
import static org.hamcrest.CoreMatchers.equalTo;
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
20+
class RateLimiterProcessorConfigTest {
21+
22+
@Test
23+
void test_default_when_exceeds_is_drop() {
24+
final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig();
25+
assertThat(config.getWhenExceeds(), equalTo(RateLimiterMode.DROP));
26+
}
27+
28+
@Test
29+
void test_default_counter_retention_seconds_is_60() {
30+
final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig();
31+
assertThat(config.getCounterRetentionSeconds(), equalTo(60));
32+
}
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.ratelimiter;
12+
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.mockito.Mock;
16+
import org.mockito.junit.jupiter.MockitoExtension;
17+
import org.opensearch.dataprepper.metrics.PluginMetrics;
18+
import org.opensearch.dataprepper.model.event.Event;
19+
import org.opensearch.dataprepper.model.event.JacksonEvent;
20+
import org.opensearch.dataprepper.model.record.Record;
21+
22+
import java.time.Instant;
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
import java.util.List;
27+
28+
import static org.hamcrest.CoreMatchers.equalTo;
29+
import static org.hamcrest.MatcherAssert.assertThat;
30+
import static org.mockito.Mockito.when;
31+
32+
@ExtendWith(MockitoExtension.class)
33+
class RateLimiterProcessorTest {
34+
35+
@Mock
36+
private PluginMetrics pluginMetrics;
37+
38+
@Mock
39+
private RateLimiterProcessorConfig config;
40+
41+
private RateLimiterProcessor createObjectUnderTest(final int eventsPerSecond) {
42+
when(config.getEventsPerSecond()).thenReturn(eventsPerSecond);
43+
when(config.getCounterRetentionSeconds()).thenReturn(60);
44+
return new RateLimiterProcessor(pluginMetrics, config);
45+
}
46+
47+
private List<Record<Event>> createEventsInSecond(final Instant second, final int count) {
48+
final List<Record<Event>> records = new ArrayList<>();
49+
for (int i = 0; i < count; i++) {
50+
final Event event = JacksonEvent.builder()
51+
.withEventType("event")
52+
.withData(Collections.singletonMap("message", "test-" + i))
53+
.withTimeReceived(second.plusMillis(i))
54+
.build();
55+
records.add(new Record<>(event));
56+
}
57+
return records;
58+
}
59+
60+
@Test
61+
void test_events_under_limit_all_pass_through() {
62+
final RateLimiterProcessor processor = createObjectUnderTest(400);
63+
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
64+
final Collection<Record<Event>> result = processor.doExecute(createEventsInSecond(now, 200));
65+
66+
assertThat(result.size(), equalTo(200));
67+
}
68+
69+
@Test
70+
void test_events_over_limit_are_dropped() {
71+
final RateLimiterProcessor processor = createObjectUnderTest(400);
72+
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
73+
final Collection<Record<Event>> result = processor.doExecute(createEventsInSecond(now, 500));
74+
75+
assertThat(result.size(), equalTo(400));
76+
}
77+
78+
@Test
79+
void test_events_from_different_seconds_are_counted_independently() {
80+
final RateLimiterProcessor processor = createObjectUnderTest(400);
81+
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
82+
final Instant nextSecond = now.plusSeconds(1);
83+
84+
final List<Record<Event>> records = new ArrayList<>();
85+
records.addAll(createEventsInSecond(now, 500));
86+
records.addAll(createEventsInSecond(nextSecond, 500));
87+
88+
final Collection<Record<Event>> result = processor.doExecute(records);
89+
90+
assertThat(result.size(), equalTo(800));
91+
}
92+
93+
@Test
94+
void test_counts_accumulate_across_multiple_doExecute_calls() {
95+
final RateLimiterProcessor processor = createObjectUnderTest(400);
96+
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
97+
98+
final Collection<Record<Event>> result1 = processor.doExecute(createEventsInSecond(now, 300));
99+
final Collection<Record<Event>> result2 = processor.doExecute(createEventsInSecond(now, 200));
100+
101+
assertThat(result1.size(), equalTo(300));
102+
assertThat(result2.size(), equalTo(100));
103+
}
104+
}

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ include 'data-prepper-plugins:saas-source-plugins:crowdstrike-source'
216216
include 'data-prepper-plugins:saas-source-plugins:microsoft-office365-source'
217217
include 'data-prepper-plugins:otlp-sink'
218218
include 'data-prepper-plugins:otel-apm-service-map-processor'
219+
include 'data-prepper-plugins:rate-limiter-processor'
219220

220221

221222
include 'e2e-test:kafka-buffer-backward-compatibility'

0 commit comments

Comments
 (0)