Skip to content

Commit 72fc35f

Browse files
dlvenableAlekhya Parisha
authored andcommitted
Enabling experimental plugins specifically by plugin type and name (opensearch-project#5676)
Support enabling experimental plugins specifically by plugin type and name. This also includes a change to the core plugin classes to allow them to define themselves as a plugin component type along with a name that is used to create the mapping in the YAML file. Resolves opensearch-project#5675 Signed-off-by: David Venable <dlv@amazon.com>
1 parent a3006a4 commit 72fc35f

12 files changed

Lines changed: 151 additions & 7 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.model.buffer;
77

88
import org.opensearch.dataprepper.model.CheckpointState;
9+
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
910
import org.opensearch.dataprepper.model.record.Record;
1011

1112
import java.time.Duration;
@@ -18,6 +19,7 @@
1819
* Buffer queues the records between TI components and acts as a layer between source and processor/sink. Buffer can
1920
* be in-memory, disk based or other a standalone implementation.
2021
*/
22+
@PluginComponentType("buffer")
2123
public interface Buffer<T extends Record<?>> {
2224
/**
2325
* writes the record to the buffer
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.model.plugin;
11+
12+
import java.lang.annotation.Documented;
13+
import java.lang.annotation.ElementType;
14+
import java.lang.annotation.Retention;
15+
import java.lang.annotation.RetentionPolicy;
16+
import java.lang.annotation.Target;
17+
18+
/**
19+
* Annotation for Data Prepper plugin type components.
20+
* Intended for processor, sink, source, buffer.
21+
*
22+
* @since 2.12
23+
*/
24+
@Documented
25+
@Retention(RetentionPolicy.RUNTIME)
26+
@Target({ElementType.TYPE})
27+
public @interface PluginComponentType {
28+
/**
29+
* Gets the name of the plugin component type.
30+
*
31+
* @return The name of the plugin component type.
32+
*/
33+
String value();
34+
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.model.processor;
77

8+
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
89
import org.opensearch.dataprepper.model.record.Record;
910

1011
import java.util.Collection;
@@ -14,6 +15,7 @@
1415
* Processor interface. These are intermediary processing units using which users can filter,
1516
* transform and enrich the records into desired format before publishing to the sink.
1617
*/
18+
@PluginComponentType("processor")
1719
public interface Processor<InputRecord extends Record<?>, OutputRecord extends Record<?>> {
1820

1921
/**

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.model.sink;
77

8+
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
89
import org.opensearch.dataprepper.model.record.Record;
910

1011
import java.util.Collection;
@@ -13,6 +14,7 @@
1314
* Data Prepper sink interface. Sink may publish records to a disk, a file,
1415
* to OpenSearch, other pipelines, or other external systems.
1516
*/
17+
@PluginComponentType("sink")
1618
public interface Sink<T extends Record<?>> {
1719

1820
/**

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
package org.opensearch.dataprepper.model.source;
77

88
import org.opensearch.dataprepper.model.buffer.Buffer;
9+
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
910
import org.opensearch.dataprepper.model.record.Record;
1011
import org.opensearch.dataprepper.model.codec.HasByteDecoder;
1112

1213
/**
1314
* Data Prepper source interface. Source acts as receiver of the events that flow
1415
* through the transformation pipeline.
1516
*/
17+
@PluginComponentType("source")
1618
public interface Source<T extends Record<?>> extends HasByteDecoder {
1719

1820
/**

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,14 @@ private <T> Class<? extends T> getPluginClass(final Class<T> baseClass, final St
144144
.orElseThrow(() -> new NoPluginFoundException(
145145
"Unable to find a plugin named '" + pluginName + "'. Please ensure that plugin is annotated with appropriate values."));
146146

147-
handleDefinedPlugins(pluginClass, pluginName);
147+
handleDefinedPlugins(pluginClass, baseClass, pluginName);
148148
return pluginClass;
149149
}
150150

151-
private <T> void handleDefinedPlugins(final Class<? extends T> pluginClass, final String pluginName) {
152-
final DefinedPlugin<? extends T> definedPlugin = new DefinedPlugin<>(pluginClass, pluginName);
151+
private <T> void handleDefinedPlugins(final Class<? extends T> pluginClass,
152+
final Class<? extends T> pluginTypeClass,
153+
final String pluginName) {
154+
final DefinedPlugin<? extends T> definedPlugin = new DefinedPlugin<>(pluginClass, pluginTypeClass, pluginName);
153155

154156
definedPluginConsumers.forEach(definedPluginConsumer -> definedPluginConsumer.accept(definedPlugin));
155157
}

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefinedPlugin.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,20 @@
99

1010
package org.opensearch.dataprepper.plugin;
1111

12+
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
13+
1214
import java.util.Objects;
1315

1416
class DefinedPlugin<T> {
1517
private final Class<? extends T> pluginClass;
18+
private final Class<? extends T> pluginTypeClass;
1619
private final String pluginName;
1720

18-
public DefinedPlugin(final Class<? extends T> pluginClass, final String pluginName) {
21+
public DefinedPlugin(final Class<? extends T> pluginClass,
22+
final Class<? extends T> pluginTypeClass,
23+
final String pluginName) {
1924
this.pluginClass = Objects.requireNonNull(pluginClass);
25+
this.pluginTypeClass = Objects.requireNonNull(pluginTypeClass);
2026
this.pluginName = Objects.requireNonNull(pluginName);
2127
}
2228

@@ -27,4 +33,12 @@ public Class<? extends T> getPluginClass() {
2733
public String getPluginName() {
2834
return pluginName;
2935
}
36+
37+
public String getPluginTypeName() {
38+
if(pluginTypeClass.isAnnotationPresent(PluginComponentType.class)) {
39+
return pluginTypeClass.getAnnotation(PluginComponentType.class).value();
40+
}
41+
42+
return null;
43+
}
3044
}

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExperimentalConfiguration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
import com.fasterxml.jackson.annotation.JsonProperty;
1313

14+
import java.util.Collections;
15+
import java.util.Map;
16+
import java.util.Set;
17+
1418
/**
1519
* Data Prepper configurations for experimental features.
1620
*
@@ -20,6 +24,9 @@ public class ExperimentalConfiguration {
2024
@JsonProperty("enable_all")
2125
private boolean enableAll = false;
2226

27+
@JsonProperty("enabled_plugins")
28+
private Map<String, Set<String>> enabledPlugins;
29+
2330
public static ExperimentalConfiguration defaultConfiguration() {
2431
return new ExperimentalConfiguration();
2532
}
@@ -32,4 +39,14 @@ public static ExperimentalConfiguration defaultConfiguration() {
3239
public boolean isEnableAll() {
3340
return enableAll;
3441
}
42+
43+
/**
44+
* Gets enabled plugins by plugin type.
45+
*
46+
* @return A map of plugin types to list of allowed plugins by name.
47+
* @since 2.12
48+
*/
49+
public Map<String, Set<String>> getEnabledPlugins() {
50+
return enabledPlugins != null ? enabledPlugins : Collections.emptyMap();
51+
}
3552
}

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExperimentalPluginValidator.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
1414

1515
import javax.inject.Named;
16+
import java.util.Collections;
17+
import java.util.Set;
1618
import java.util.function.Consumer;
1719

1820
@Named
@@ -25,13 +27,22 @@ class ExperimentalPluginValidator implements Consumer<DefinedPlugin<?>> {
2527

2628
@Override
2729
public void accept(final DefinedPlugin<?> definedPlugin) {
28-
if(isPluginDisallowedAsExperimental(definedPlugin.getPluginClass())) {
30+
if(isPluginDisallowedAsExperimental(definedPlugin)) {
2931
throw new NoPluginFoundException("Unable to create experimental plugin " + definedPlugin.getPluginName() +
3032
". You must enable experimental plugins in data-prepper-config.yaml in order to use them.");
3133
}
3234
}
3335

34-
private boolean isPluginDisallowedAsExperimental(final Class<?> pluginClass) {
35-
return pluginClass.isAnnotationPresent(Experimental.class) && !experimentalConfiguration.isEnableAll();
36+
private boolean isPluginDisallowedAsExperimental(final DefinedPlugin<?> definedPlugin) {
37+
final Class<?> pluginClass = definedPlugin.getPluginClass();
38+
if(!pluginClass.isAnnotationPresent(Experimental.class))
39+
return false;
40+
if(experimentalConfiguration.isEnableAll())
41+
return false;
42+
43+
final Set<String> enabledPluginsForType =
44+
experimentalConfiguration.getEnabledPlugins()
45+
.getOrDefault(definedPlugin.getPluginTypeName(), Collections.emptySet());
46+
return !enabledPluginsForType.contains(definedPlugin.getPluginName());
3647
}
3748
}

data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ void loadPlugin_should_call_all_definedPluginConsumers() {
248248

249249
final DefinedPlugin<?> actualDefinedPlugin = definedPluginArgumentCaptor.getValue();
250250
assertThat(actualDefinedPlugin.getPluginClass(), equalTo(expectedPluginClass));
251+
assertThat(actualDefinedPlugin.getPluginTypeName(), equalTo("sink"));
251252
assertThat(actualDefinedPlugin.getPluginName(), equalTo(pluginName));
252253
}
253254
}
@@ -357,6 +358,7 @@ void loadPlugin_with_varargs_should_call_all_definedPluginConsumers() {
357358

358359
final DefinedPlugin<?> actualDefinedPlugin = definedPluginArgumentCaptor.getValue();
359360
assertThat(actualDefinedPlugin.getPluginClass(), equalTo(expectedPluginClass));
361+
assertThat(actualDefinedPlugin.getPluginTypeName(), equalTo("sink"));
360362
assertThat(actualDefinedPlugin.getPluginName(), equalTo(pluginName));
361363
}
362364
}

0 commit comments

Comments
 (0)