Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions data-prepper-plugins/rate-limiter-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test:plugin-test-framework')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.ratelimiter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum RateLimiterMode {
DROP("drop");

private static final Map<String, RateLimiterMode> MODES_MAP = Arrays.stream(RateLimiterMode.values())
.collect(Collectors.toMap(
value -> value.name,
value -> value
));

private final String name;

RateLimiterMode(String name) {
this.name = name.toLowerCase();
}

@JsonCreator
static RateLimiterMode fromOptionValue(final String option) {
return MODES_MAP.get(option.toLowerCase());
}

@JsonValue
public String getOptionValue() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.ratelimiter;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@DataPrepperPlugin(name = "rate_limiter", pluginType = Processor.class, pluginConfigurationType = RateLimiterProcessorConfig.class)
public class RateLimiterProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private final int eventsPerSecond;
private final int counterRetentionSeconds;
private final ConcurrentHashMap<Long, AtomicInteger> eventCountPerSecond = new ConcurrentHashMap<>();

@DataPrepperPluginConstructor
public RateLimiterProcessor(final PluginMetrics pluginMetrics,
final RateLimiterProcessorConfig config) {
super(pluginMetrics);
this.eventsPerSecond = config.getEventsPerSecond();
this.counterRetentionSeconds = config.getCounterRetentionSeconds();
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
final Collection<Record<Event>> output = new ArrayList<>();

for (final Record<Event> record : records) {
final Event event = record.getData();
final long arrivalSecond = event.getMetadata().getTimeReceived().getEpochSecond();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering about this. We are trading off the possibility of dropping some data due to running processors on batches to another problem which is that we may still greatly exceed the current rate. You could have data from over a wide swath of time coming through and that will result in a large batch of data beyond the requested size going to OpenSearch. It seems the goal of a rate limiter is to prevent overwhelming the destination.


final AtomicInteger count = eventCountPerSecond.computeIfAbsent(arrivalSecond, k -> new AtomicInteger(0));
if (count.incrementAndGet() <= eventsPerSecond) {
output.add(record);
}
}

evictExpiredCounters();

return output;
}

private void evictExpiredCounters() {
final long now = Instant.now().getEpochSecond();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use System.currentTimeMillis() / 1000 which is faster than now().

eventCountPerSecond.keySet().removeIf(second -> now - second > counterRetentionSeconds);
}


@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.ratelimiter;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotNull;

@JsonPropertyOrder
@JsonClassDescription("The <code>rate_limiter</code> processor controls the number of events processed per second. " +
"By default, <code>rate_limiter</code> drops events that exceed the configured number allowed per second.")
public class RateLimiterProcessorConfig {

@JsonPropertyDescription("The number of events allowed per second.")
@JsonProperty("events_per_second")
@NotNull
private int eventsPerSecond;

@JsonPropertyDescription("Indicates what action the <code>rate_limiter</code> takes when the number of events received is greater than the number of events allowed per second. " +
"Default value is drop, which drops the excess events received in that second.")
@JsonProperty(value = "when_exceeds", defaultValue = "drop")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should probably have a generic when condition here like other processors. Maybe the configuration is named "limit_when" or "rate_limit_when" as just a generic expression to including in the rate limiting or not. That at least allows users to specify what is ok to rate limit and what is not.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with limit_when, but this is saying what do do when it exceeds - drop or hold.

private RateLimiterMode whenExceeds = RateLimiterMode.DROP;

@JsonPropertyDescription("The duration in seconds to track per-second event counts. " +
"Counters older than this are discarded to free resources. Default is 60.")
@JsonProperty(value = "counter_retention_seconds", defaultValue = "60")
private int counterRetentionSeconds = 60;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a Duration and named counter_retention.


public int getEventsPerSecond() {
return eventsPerSecond;
}

public RateLimiterMode getWhenExceeds() {
return whenExceeds;
}

public int getCounterRetentionSeconds() {
return counterRetentionSeconds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.ratelimiter;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

class RateLimiterProcessorConfigTest {

@Test
void test_default_when_exceeds_is_drop() {
final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig();
assertThat(config.getWhenExceeds(), equalTo(RateLimiterMode.DROP));
}

@Test
void test_default_counter_retention_seconds_is_60() {
final RateLimiterProcessorConfig config = new RateLimiterProcessorConfig();
assertThat(config.getCounterRetentionSeconds(), equalTo(60));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.ratelimiter;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class RateLimiterProcessorTest {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use @DataPrepperPluginTest. You can search usages in the project for other projects that have started using this and how to do it.


@Mock
private PluginMetrics pluginMetrics;

@Mock
private RateLimiterProcessorConfig config;

private RateLimiterProcessor createObjectUnderTest(final int eventsPerSecond) {
when(config.getEventsPerSecond()).thenReturn(eventsPerSecond);
when(config.getCounterRetentionSeconds()).thenReturn(60);
return new RateLimiterProcessor(pluginMetrics, config);
}

private List<Record<Event>> createEventsInSecond(final Instant second, final int count) {
final List<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Event event = JacksonEvent.builder()
.withEventType("event")
.withData(Collections.singletonMap("message", "test-" + i))
.withTimeReceived(second.plusMillis(i))
.build();
records.add(new Record<>(event));
}
return records;
}

@Test
void test_events_under_limit_all_pass_through() {
final RateLimiterProcessor processor = createObjectUnderTest(400);
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
final Collection<Record<Event>> result = processor.doExecute(createEventsInSecond(now, 200));

assertThat(result.size(), equalTo(200));
}

@Test
void test_events_over_limit_are_dropped() {
final RateLimiterProcessor processor = createObjectUnderTest(400);
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
final Collection<Record<Event>> result = processor.doExecute(createEventsInSecond(now, 500));

assertThat(result.size(), equalTo(400));
}

@Test
void test_events_from_different_seconds_are_counted_independently() {
final RateLimiterProcessor processor = createObjectUnderTest(400);
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);
final Instant nextSecond = now.plusSeconds(1);

final List<Record<Event>> records = new ArrayList<>();
records.addAll(createEventsInSecond(now, 500));
records.addAll(createEventsInSecond(nextSecond, 500));

final Collection<Record<Event>> result = processor.doExecute(records);

assertThat(result.size(), equalTo(800));
}

@Test
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a test when we inteweave the seconds like this

records.addAll(createEventsInSecond(now, 500));
records.addAll(createEventsInSecond(nextSecond, 500));
records.addAll(createEventsInSecond(now, 500));
records.addAll(createEventsInSecond(nextSecond + 1, 500));
records.addAll(createEventsInSecond(nextSecond, 500));

void test_counts_accumulate_across_multiple_doExecute_calls() {
final RateLimiterProcessor processor = createObjectUnderTest(400);
final Instant now = Instant.now().truncatedTo(java.time.temporal.ChronoUnit.SECONDS);

final Collection<Record<Event>> result1 = processor.doExecute(createEventsInSecond(now, 300));
final Collection<Record<Event>> result2 = processor.doExecute(createEventsInSecond(now, 200));

assertThat(result1.size(), equalTo(300));
assertThat(result2.size(), equalTo(100));
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ include 'data-prepper-plugins:saas-source-plugins:crowdstrike-source'
include 'data-prepper-plugins:saas-source-plugins:microsoft-office365-source'
include 'data-prepper-plugins:otlp-sink'
include 'data-prepper-plugins:otel-apm-service-map-processor'
include 'data-prepper-plugins:rate-limiter-processor'


include 'e2e-test:kafka-buffer-backward-compatibility'
Loading