Skip to content

Commit a18b56c

Browse files
authored
Support automatic plugin loading in Data Prepper core. (#6882)
This change allows plugin authors to load other plugins by defining @UsesDataPrepperPlugin on a field with a different type. It will automatically load the plugin using the PluginFactory and give the main plugin the desired plugin instance that they are looking for. This updates the sqs source to use this new capability for its InputCodec defined by the codec property. Resolves #4838 Signed-off-by: David Venable <dlv@amazon.com>
1 parent 1a5e0c9 commit a18b56c

25 files changed

Lines changed: 1389 additions & 80 deletions

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@
2626
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
2727
import org.opensearch.dataprepper.model.source.Source;
2828
import org.opensearch.dataprepper.pipeline.parser.DataPrepperDeserializationProblemHandler;
29+
import org.opensearch.dataprepper.plugins.TestNestedPluginInterface;
2930
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
31+
import org.opensearch.dataprepper.plugins.TestPluginWithNestedPlugin;
32+
import org.opensearch.dataprepper.plugins.TestPluginWithNestedPluginConfig;
33+
import org.opensearch.dataprepper.plugins.TestPluginWithPluginModel;
34+
import org.opensearch.dataprepper.plugins.TestPluginWithPluginModelConfig;
3035
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
3136
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
3237
import org.opensearch.dataprepper.plugins.test.TestComponent;
@@ -250,6 +255,62 @@ void loadPlugin_should_succeed_when_a_non_experimental_plugin_has_an_experimenta
250255
assertThat(plugin, notNullValue());
251256
}
252257

258+
@Test
259+
void loadPlugin_should_resolve_nested_plugin_annotated_with_UsesDataPrepperPlugin() {
260+
final String nameValue = UUID.randomUUID().toString();
261+
final String nestedTestValue = UUID.randomUUID().toString();
262+
263+
final Map<String, Object> nestedPluginSettings = new HashMap<>();
264+
nestedPluginSettings.put("test_value", nestedTestValue);
265+
266+
final Map<String, Object> pluginSettingMap = new HashMap<>();
267+
pluginSettingMap.put("name", nameValue);
268+
pluginSettingMap.put("nested_plugin", Collections.singletonMap("test_nested_plugin", nestedPluginSettings));
269+
270+
final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_nested", pluginSettingMap);
271+
pluginSetting.setPipelineName(pipelineName);
272+
273+
final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting);
274+
275+
assertThat(plugin, instanceOf(TestPluginWithNestedPlugin.class));
276+
277+
final TestPluginWithNestedPlugin testPlugin = (TestPluginWithNestedPlugin) plugin;
278+
final TestPluginWithNestedPluginConfig configuration = testPlugin.getConfiguration();
279+
280+
assertThat(configuration.getName(), equalTo(nameValue));
281+
assertThat(configuration.getNestedPlugin(), notNullValue());
282+
assertThat(configuration.getNestedPlugin(), instanceOf(TestNestedPluginInterface.class));
283+
assertThat(configuration.getNestedPlugin().getValue(), equalTo(nestedTestValue));
284+
}
285+
286+
@Test
287+
void loadPlugin_should_deserialize_PluginModel_field_annotated_with_UsesDataPrepperPlugin_without_loading_nested_plugin() {
288+
final String nameValue = UUID.randomUUID().toString();
289+
final String nestedTestValue = UUID.randomUUID().toString();
290+
291+
final Map<String, Object> nestedPluginSettings = new HashMap<>();
292+
nestedPluginSettings.put("test_value", nestedTestValue);
293+
294+
final Map<String, Object> pluginSettingMap = new HashMap<>();
295+
pluginSettingMap.put("name", nameValue);
296+
pluginSettingMap.put("nested_action", Collections.singletonMap("test_nested_plugin", nestedPluginSettings));
297+
298+
final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_plugin_model", pluginSettingMap);
299+
pluginSetting.setPipelineName(pipelineName);
300+
301+
final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting);
302+
303+
assertThat(plugin, instanceOf(TestPluginWithPluginModel.class));
304+
305+
final TestPluginWithPluginModel testPlugin = (TestPluginWithPluginModel) plugin;
306+
final TestPluginWithPluginModelConfig configuration = testPlugin.getConfiguration();
307+
308+
assertThat(configuration.getName(), equalTo(nameValue));
309+
assertThat(configuration.getNestedAction(), notNullValue());
310+
assertThat(configuration.getNestedAction().getPluginName(), equalTo("test_nested_plugin"));
311+
assertThat(configuration.getNestedAction().getPluginSettings().get("test_value"), equalTo(nestedTestValue));
312+
}
313+
253314
private PluginSetting createPluginSettings(final Map<String, Object> pluginSettingMap) {
254315
final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettingMap);
255316
pluginSetting.setPipelineName(pipelineName);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
15+
@DataPrepperPlugin(name = "test_nested_plugin", pluginType = TestNestedPluginInterface.class, pluginConfigurationType = TestNestedPluginConfig.class)
16+
public class TestNestedPlugin implements TestNestedPluginInterface {
17+
private final TestNestedPluginConfig config;
18+
19+
@DataPrepperPluginConstructor
20+
public TestNestedPlugin(final TestNestedPluginConfig config) {
21+
this.config = config;
22+
}
23+
24+
@Override
25+
public String getValue() {
26+
return config.getTestValue();
27+
}
28+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
14+
public class TestNestedPluginConfig {
15+
@JsonProperty("test_value")
16+
private String testValue;
17+
18+
public String getTestValue() {
19+
return testValue;
20+
}
21+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
public interface TestNestedPluginInterface {
13+
String getValue();
14+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
import org.opensearch.dataprepper.plugin.TestPluggableInterface;
15+
16+
@DataPrepperPlugin(name = "test_plugin_with_nested", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithNestedPluginConfig.class)
17+
public class TestPluginWithNestedPlugin implements TestPluggableInterface {
18+
private final TestPluginWithNestedPluginConfig configuration;
19+
20+
@DataPrepperPluginConstructor
21+
public TestPluginWithNestedPlugin(final TestPluginWithNestedPluginConfig configuration) {
22+
this.configuration = configuration;
23+
}
24+
25+
public TestPluginWithNestedPluginConfig getConfiguration() {
26+
return configuration;
27+
}
28+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
14+
15+
public class TestPluginWithNestedPluginConfig {
16+
@JsonProperty("nested_plugin")
17+
@UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class)
18+
private TestNestedPluginInterface nestedPlugin;
19+
20+
@JsonProperty("name")
21+
private String name;
22+
23+
public TestNestedPluginInterface getNestedPlugin() {
24+
return nestedPlugin;
25+
}
26+
27+
public String getName() {
28+
return name;
29+
}
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
import org.opensearch.dataprepper.plugin.TestPluggableInterface;
15+
16+
@DataPrepperPlugin(name = "test_plugin_with_plugin_model", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithPluginModelConfig.class)
17+
public class TestPluginWithPluginModel implements TestPluggableInterface {
18+
private final TestPluginWithPluginModelConfig configuration;
19+
20+
@DataPrepperPluginConstructor
21+
public TestPluginWithPluginModel(final TestPluginWithPluginModelConfig configuration) {
22+
this.configuration = configuration;
23+
}
24+
25+
public TestPluginWithPluginModelConfig getConfiguration() {
26+
return configuration;
27+
}
28+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
14+
import org.opensearch.dataprepper.model.configuration.PluginModel;
15+
16+
public class TestPluginWithPluginModelConfig {
17+
@JsonProperty("name")
18+
private String name;
19+
20+
@JsonProperty("nested_action")
21+
@UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class)
22+
private PluginModel nestedAction;
23+
24+
public String getName() {
25+
return name;
26+
}
27+
28+
public PluginModel getNestedAction() {
29+
return nestedAction;
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugin;
11+
12+
import com.fasterxml.jackson.databind.BeanDescription;
13+
import com.fasterxml.jackson.databind.DeserializationConfig;
14+
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
15+
import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder;
16+
import com.fasterxml.jackson.databind.deser.SettableBeanProperty;
17+
import com.fasterxml.jackson.databind.introspect.AnnotatedField;
18+
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
19+
import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
20+
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
21+
import org.opensearch.dataprepper.model.configuration.PluginModel;
22+
23+
import java.util.Iterator;
24+
import java.util.List;
25+
26+
class DataPrepperPluginBeanDeserializerModifier extends BeanDeserializerModifier {
27+
28+
@Override
29+
public BeanDeserializerBuilder updateBuilder(
30+
final DeserializationConfig config,
31+
final BeanDescription beanDesc,
32+
final BeanDeserializerBuilder builder) {
33+
34+
final List<BeanPropertyDefinition> properties = beanDesc.findProperties();
35+
36+
for (final BeanPropertyDefinition propertyDef : properties) {
37+
final UsesDataPrepperPlugin annotation = findAnnotation(propertyDef);
38+
if (annotation == null) {
39+
continue;
40+
}
41+
42+
if (PluginModel.class.isAssignableFrom(propertyDef.getRawPrimaryType())) {
43+
continue;
44+
}
45+
46+
final Class<?> pluginType = annotation.pluginType();
47+
final NestedPluginDeserializer deserializer = new NestedPluginDeserializer(pluginType);
48+
49+
final Iterator<SettableBeanProperty> propertyIterator = builder.getProperties();
50+
while (propertyIterator.hasNext()) {
51+
final SettableBeanProperty property = propertyIterator.next();
52+
if (property.getName().equals(propertyDef.getName())) {
53+
final SettableBeanProperty updatedProperty = property.withValueDeserializer(deserializer);
54+
builder.addOrReplaceProperty(updatedProperty, true);
55+
break;
56+
}
57+
}
58+
}
59+
60+
return builder;
61+
}
62+
63+
private UsesDataPrepperPlugin findAnnotation(final BeanPropertyDefinition propertyDef) {
64+
final AnnotatedField field = propertyDef.getField();
65+
if (field != null) {
66+
final UsesDataPrepperPlugin annotation = field.getAnnotation(UsesDataPrepperPlugin.class);
67+
if (annotation != null) {
68+
return annotation;
69+
}
70+
}
71+
72+
final AnnotatedMember mutator = propertyDef.getMutator();
73+
if (mutator != null) {
74+
final UsesDataPrepperPlugin annotation = mutator.getAnnotation(UsesDataPrepperPlugin.class);
75+
if (annotation != null) {
76+
return annotation;
77+
}
78+
}
79+
80+
return null;
81+
}
82+
}

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugin;
711

812
import org.opensearch.dataprepper.model.configuration.PluginSetting;
913
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
1014
import org.opensearch.dataprepper.model.plugin.PluginConfigObserver;
15+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
1116

1217
import java.util.Map;
1318
import java.util.concurrent.ConcurrentHashMap;
1419

15-
public class DefaultPluginConfigObservable implements PluginConfigObservable {
20+
class DefaultPluginConfigObservable implements PluginConfigObservable {
1621
private final Map<PluginConfigObserver, Boolean> pluginConfigObserverBooleanMap
1722
= new ConcurrentHashMap<>();
1823
private final PluginConfigurationConverter pluginConfigurationConverter;
1924
private final Class<?> pluginConfigClass;
2025
private final PluginSetting rawPluginSettings;
26+
private final PluginFactory pluginFactory;
2127

22-
public DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter,
23-
final Class<?> pluginConfigClass,
24-
final PluginSetting rawPluginSettings) {
28+
DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter,
29+
final Class<?> pluginConfigClass,
30+
final PluginSetting rawPluginSettings,
31+
final PluginFactory pluginFactory) {
2532
this.pluginConfigurationConverter = pluginConfigurationConverter;
2633
this.pluginConfigClass = pluginConfigClass;
2734
this.rawPluginSettings = rawPluginSettings;
35+
this.pluginFactory = pluginFactory;
2836
}
2937

3038
@Override
@@ -36,7 +44,7 @@ public boolean addPluginConfigObserver(final PluginConfigObserver pluginConfigOb
3644
@Override
3745
public void update() {
3846
final Object newPluginConfiguration = pluginConfigurationConverter.convert(
39-
pluginConfigClass, rawPluginSettings);
47+
pluginConfigClass, rawPluginSettings, pluginFactory);
4048
pluginConfigObserverBooleanMap.keySet().forEach(
4149
pluginConfigObserver -> pluginConfigObserver.update(newPluginConfiguration));
4250
}

0 commit comments

Comments
 (0)