Skip to content

Commit 525f096

Browse files
committed
Support automatic plugin loading in Data Prepper core.
This change allows plugin authors to load other plugins by defining @UsesDataPrepperPlugin on a field with a different type. It will automatically load the plugin using the PluginFactory and give the main plugin the desired plugin instance that they are looking for. This updates the sqs source to use this new capability for its InputCodec defined by the codec property. Resolves #4838 Signed-off-by: David Venable <dlv@amazon.com>
1 parent 49f9918 commit 525f096

23 files changed

Lines changed: 1257 additions & 80 deletions

File tree

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
2727
import org.opensearch.dataprepper.model.source.Source;
2828
import org.opensearch.dataprepper.pipeline.parser.DataPrepperDeserializationProblemHandler;
29+
import org.opensearch.dataprepper.plugins.TestNestedPluginInterface;
2930
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
31+
import org.opensearch.dataprepper.plugins.TestPluginWithNestedPlugin;
32+
import org.opensearch.dataprepper.plugins.TestPluginWithNestedPluginConfig;
3033
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
3134
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
3235
import org.opensearch.dataprepper.plugins.test.TestComponent;
@@ -250,6 +253,34 @@ void loadPlugin_should_succeed_when_a_non_experimental_plugin_has_an_experimenta
250253
assertThat(plugin, notNullValue());
251254
}
252255

256+
@Test
257+
void loadPlugin_should_resolve_nested_plugin_annotated_with_UsesDataPrepperPlugin() {
258+
final String nameValue = UUID.randomUUID().toString();
259+
final String nestedTestValue = UUID.randomUUID().toString();
260+
261+
final Map<String, Object> nestedPluginSettings = new HashMap<>();
262+
nestedPluginSettings.put("test_value", nestedTestValue);
263+
264+
final Map<String, Object> pluginSettingMap = new HashMap<>();
265+
pluginSettingMap.put("name", nameValue);
266+
pluginSettingMap.put("nested_plugin", Collections.singletonMap("test_nested_plugin", nestedPluginSettings));
267+
268+
final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_nested", pluginSettingMap);
269+
pluginSetting.setPipelineName(pipelineName);
270+
271+
final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting);
272+
273+
assertThat(plugin, instanceOf(TestPluginWithNestedPlugin.class));
274+
275+
final TestPluginWithNestedPlugin testPlugin = (TestPluginWithNestedPlugin) plugin;
276+
final TestPluginWithNestedPluginConfig configuration = testPlugin.getConfiguration();
277+
278+
assertThat(configuration.getName(), equalTo(nameValue));
279+
assertThat(configuration.getNestedPlugin(), notNullValue());
280+
assertThat(configuration.getNestedPlugin(), instanceOf(TestNestedPluginInterface.class));
281+
assertThat(configuration.getNestedPlugin().getValue(), equalTo(nestedTestValue));
282+
}
283+
253284
private PluginSetting createPluginSettings(final Map<String, Object> pluginSettingMap) {
254285
final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettingMap);
255286
pluginSetting.setPipelineName(pipelineName);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
15+
@DataPrepperPlugin(name = "test_nested_plugin", pluginType = TestNestedPluginInterface.class, pluginConfigurationType = TestNestedPluginConfig.class)
16+
public class TestNestedPlugin implements TestNestedPluginInterface {
17+
private final TestNestedPluginConfig config;
18+
19+
@DataPrepperPluginConstructor
20+
public TestNestedPlugin(final TestNestedPluginConfig config) {
21+
this.config = config;
22+
}
23+
24+
@Override
25+
public String getValue() {
26+
return config.getTestValue();
27+
}
28+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
14+
public class TestNestedPluginConfig {
15+
@JsonProperty("test_value")
16+
private String testValue;
17+
18+
public String getTestValue() {
19+
return testValue;
20+
}
21+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
public interface TestNestedPluginInterface {
13+
String getValue();
14+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
import org.opensearch.dataprepper.plugin.TestPluggableInterface;
15+
16+
@DataPrepperPlugin(name = "test_plugin_with_nested", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithNestedPluginConfig.class)
17+
public class TestPluginWithNestedPlugin implements TestPluggableInterface {
18+
private final TestPluginWithNestedPluginConfig configuration;
19+
20+
@DataPrepperPluginConstructor
21+
public TestPluginWithNestedPlugin(final TestPluginWithNestedPluginConfig configuration) {
22+
this.configuration = configuration;
23+
}
24+
25+
public TestPluginWithNestedPluginConfig getConfiguration() {
26+
return configuration;
27+
}
28+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
14+
15+
public class TestPluginWithNestedPluginConfig {
16+
@JsonProperty("nested_plugin")
17+
@UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class)
18+
private TestNestedPluginInterface nestedPlugin;
19+
20+
@JsonProperty("name")
21+
private String name;
22+
23+
public TestNestedPluginInterface getNestedPlugin() {
24+
return nestedPlugin;
25+
}
26+
27+
public String getName() {
28+
return name;
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugin;
11+
12+
import com.fasterxml.jackson.databind.BeanDescription;
13+
import com.fasterxml.jackson.databind.DeserializationConfig;
14+
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
15+
import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder;
16+
import com.fasterxml.jackson.databind.deser.SettableBeanProperty;
17+
import com.fasterxml.jackson.databind.introspect.AnnotatedField;
18+
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
19+
import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
20+
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
21+
import org.opensearch.dataprepper.model.configuration.PluginModel;
22+
23+
import java.util.Iterator;
24+
import java.util.List;
25+
26+
class DataPrepperPluginBeanDeserializerModifier extends BeanDeserializerModifier {
27+
28+
@Override
29+
public BeanDeserializerBuilder updateBuilder(
30+
final DeserializationConfig config,
31+
final BeanDescription beanDesc,
32+
final BeanDeserializerBuilder builder) {
33+
34+
final List<BeanPropertyDefinition> properties = beanDesc.findProperties();
35+
36+
for (final BeanPropertyDefinition propertyDef : properties) {
37+
final UsesDataPrepperPlugin annotation = findAnnotation(propertyDef);
38+
if (annotation == null) {
39+
continue;
40+
}
41+
42+
if (PluginModel.class.isAssignableFrom(propertyDef.getRawPrimaryType())) {
43+
continue;
44+
}
45+
46+
final Class<?> pluginType = annotation.pluginType();
47+
final NestedPluginDeserializer deserializer = new NestedPluginDeserializer(pluginType);
48+
49+
final Iterator<SettableBeanProperty> propertyIterator = builder.getProperties();
50+
while (propertyIterator.hasNext()) {
51+
final SettableBeanProperty property = propertyIterator.next();
52+
if (property.getName().equals(propertyDef.getName())) {
53+
final SettableBeanProperty updatedProperty = property.withValueDeserializer(deserializer);
54+
builder.addOrReplaceProperty(updatedProperty, true);
55+
break;
56+
}
57+
}
58+
}
59+
60+
return builder;
61+
}
62+
63+
private UsesDataPrepperPlugin findAnnotation(final BeanPropertyDefinition propertyDef) {
64+
final AnnotatedField field = propertyDef.getField();
65+
if (field != null) {
66+
final UsesDataPrepperPlugin annotation = field.getAnnotation(UsesDataPrepperPlugin.class);
67+
if (annotation != null) {
68+
return annotation;
69+
}
70+
}
71+
72+
final AnnotatedMember mutator = propertyDef.getMutator();
73+
if (mutator != null) {
74+
final UsesDataPrepperPlugin annotation = mutator.getAnnotation(UsesDataPrepperPlugin.class);
75+
if (annotation != null) {
76+
return annotation;
77+
}
78+
}
79+
80+
return null;
81+
}
82+
}

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugin;
711

812
import org.opensearch.dataprepper.model.configuration.PluginSetting;
913
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
1014
import org.opensearch.dataprepper.model.plugin.PluginConfigObserver;
15+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
1116

1217
import java.util.Map;
1318
import java.util.concurrent.ConcurrentHashMap;
1419

15-
public class DefaultPluginConfigObservable implements PluginConfigObservable {
20+
class DefaultPluginConfigObservable implements PluginConfigObservable {
1621
private final Map<PluginConfigObserver, Boolean> pluginConfigObserverBooleanMap
1722
= new ConcurrentHashMap<>();
1823
private final PluginConfigurationConverter pluginConfigurationConverter;
1924
private final Class<?> pluginConfigClass;
2025
private final PluginSetting rawPluginSettings;
26+
private final PluginFactory pluginFactory;
2127

22-
public DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter,
23-
final Class<?> pluginConfigClass,
24-
final PluginSetting rawPluginSettings) {
28+
DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter,
29+
final Class<?> pluginConfigClass,
30+
final PluginSetting rawPluginSettings,
31+
final PluginFactory pluginFactory) {
2532
this.pluginConfigurationConverter = pluginConfigurationConverter;
2633
this.pluginConfigClass = pluginConfigClass;
2734
this.rawPluginSettings = rawPluginSettings;
35+
this.pluginFactory = pluginFactory;
2836
}
2937

3038
@Override
@@ -36,7 +44,7 @@ public boolean addPluginConfigObserver(final PluginConfigObserver pluginConfigOb
3644
@Override
3745
public void update() {
3846
final Object newPluginConfiguration = pluginConfigurationConverter.convert(
39-
pluginConfigClass, rawPluginSettings);
47+
pluginConfigClass, rawPluginSettings, pluginFactory);
4048
pluginConfigObserverBooleanMap.keySet().forEach(
4149
pluginConfigObserver -> pluginConfigObserver.update(newPluginConfiguration));
4250
}

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugin;
@@ -117,9 +121,9 @@ private <T> ComponentPluginArgumentsContext getConstructionContext(final PluginS
117121

118122
final Class<?> pluginConfigurationType = pluginAnnotation.pluginConfigurationType();
119123

120-
final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting);
124+
final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting, this);
121125
final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory
122-
.createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting);
126+
.createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting, this);
123127

124128
Class[] markersToScan = pluginAnnotation.packagesToScan();
125129
BeanFactory beanFactory = pluginBeanFactoryProvider.createPluginSpecificContext(markersToScan, configuration, pluginSetting);

0 commit comments

Comments
 (0)