From 163d99c6b0fb4671765671d516ee580e4cb9ca44 Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 22 May 2026 10:22:29 -0500 Subject: [PATCH] Support automatic plugin loading in Data Prepper core. This change allows plugin authors to load other plugins by defining @UsesDataPrepperPlugin on a field with a different type. It will automatically load the plugin using the PluginFactory and give the main plugin the desired plugin instance that they are looking for. This updates the sqs source to use this new capability for its InputCodec defined by the codec property. Resolves #4838 Signed-off-by: David Venable --- .../plugin/DefaultPluginFactoryIT.java | 61 +++ .../dataprepper/plugins/TestNestedPlugin.java | 28 ++ .../plugins/TestNestedPluginConfig.java | 21 + .../plugins/TestNestedPluginInterface.java | 14 + .../plugins/TestPluginWithNestedPlugin.java | 28 ++ .../TestPluginWithNestedPluginConfig.java | 30 ++ .../plugins/TestPluginWithPluginModel.java | 28 ++ .../TestPluginWithPluginModelConfig.java | 31 ++ ...PrepperPluginBeanDeserializerModifier.java | 82 ++++ .../plugin/DefaultPluginConfigObservable.java | 18 +- .../plugin/DefaultPluginFactory.java | 8 +- .../plugin/NestedPluginDeserializer.java | 81 ++++ .../plugin/ObjectMapperConfiguration.java | 4 + .../plugin/PluginConfigurationConverter.java | 28 +- .../PluginConfigurationObservableFactory.java | 9 +- ...perPluginBeanDeserializerModifierTest.java | 410 ++++++++++++++++++ .../plugin/DefaultPluginFactoryTest.java | 50 ++- .../plugin/NestedPluginDeserializerTest.java | 271 ++++++++++++ .../PluginConfigObservableFactoryTest.java | 10 +- .../PluginConfigurationConverterTest.java | 204 ++++++++- .../plugins/source/sqs/QueueConfig.java | 9 +- .../plugins/source/sqs/SqsService.java | 15 +- .../plugins/source/sqs/SqsSource.java | 11 +- .../plugins/source/sqs/SqsServiceTest.java | 12 +- .../plugins/source/sqs/SqsSourceTest.java | 6 +- 25 files changed, 1389 insertions(+), 80 deletions(-) create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java create mode 100644 data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java create mode 100644 data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java create mode 100644 data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java create mode 100644 data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index 7dc3931b18..bc8cc959e4 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -26,7 +26,12 @@ import org.opensearch.dataprepper.model.plugin.NoPluginFoundException; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.pipeline.parser.DataPrepperDeserializationProblemHandler; +import org.opensearch.dataprepper.plugins.TestNestedPluginInterface; import org.opensearch.dataprepper.plugins.TestObjectPlugin; +import org.opensearch.dataprepper.plugins.TestPluginWithNestedPlugin; +import org.opensearch.dataprepper.plugins.TestPluginWithNestedPluginConfig; +import org.opensearch.dataprepper.plugins.TestPluginWithPluginModel; +import org.opensearch.dataprepper.plugins.TestPluginWithPluginModelConfig; import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject; import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig; import org.opensearch.dataprepper.plugins.test.TestComponent; @@ -250,6 +255,62 @@ void loadPlugin_should_succeed_when_a_non_experimental_plugin_has_an_experimenta assertThat(plugin, notNullValue()); } + @Test + void loadPlugin_should_resolve_nested_plugin_annotated_with_UsesDataPrepperPlugin() { + final String nameValue = UUID.randomUUID().toString(); + final String nestedTestValue = UUID.randomUUID().toString(); + + final Map nestedPluginSettings = new HashMap<>(); + nestedPluginSettings.put("test_value", nestedTestValue); + + final Map pluginSettingMap = new HashMap<>(); + pluginSettingMap.put("name", nameValue); + pluginSettingMap.put("nested_plugin", Collections.singletonMap("test_nested_plugin", nestedPluginSettings)); + + final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_nested", pluginSettingMap); + pluginSetting.setPipelineName(pipelineName); + + final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting); + + assertThat(plugin, instanceOf(TestPluginWithNestedPlugin.class)); + + final TestPluginWithNestedPlugin testPlugin = (TestPluginWithNestedPlugin) plugin; + final TestPluginWithNestedPluginConfig configuration = testPlugin.getConfiguration(); + + assertThat(configuration.getName(), equalTo(nameValue)); + assertThat(configuration.getNestedPlugin(), notNullValue()); + assertThat(configuration.getNestedPlugin(), instanceOf(TestNestedPluginInterface.class)); + assertThat(configuration.getNestedPlugin().getValue(), equalTo(nestedTestValue)); + } + + @Test + void loadPlugin_should_deserialize_PluginModel_field_annotated_with_UsesDataPrepperPlugin_without_loading_nested_plugin() { + final String nameValue = UUID.randomUUID().toString(); + final String nestedTestValue = UUID.randomUUID().toString(); + + final Map nestedPluginSettings = new HashMap<>(); + nestedPluginSettings.put("test_value", nestedTestValue); + + final Map pluginSettingMap = new HashMap<>(); + pluginSettingMap.put("name", nameValue); + pluginSettingMap.put("nested_action", Collections.singletonMap("test_nested_plugin", nestedPluginSettings)); + + final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_plugin_model", pluginSettingMap); + pluginSetting.setPipelineName(pipelineName); + + final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting); + + assertThat(plugin, instanceOf(TestPluginWithPluginModel.class)); + + final TestPluginWithPluginModel testPlugin = (TestPluginWithPluginModel) plugin; + final TestPluginWithPluginModelConfig configuration = testPlugin.getConfiguration(); + + assertThat(configuration.getName(), equalTo(nameValue)); + assertThat(configuration.getNestedAction(), notNullValue()); + assertThat(configuration.getNestedAction().getPluginName(), equalTo("test_nested_plugin")); + assertThat(configuration.getNestedAction().getPluginSettings().get("test_value"), equalTo(nestedTestValue)); + } + private PluginSetting createPluginSettings(final Map pluginSettingMap) { final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettingMap); pluginSetting.setPipelineName(pipelineName); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java new file mode 100644 index 0000000000..1164cfb677 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; + +@DataPrepperPlugin(name = "test_nested_plugin", pluginType = TestNestedPluginInterface.class, pluginConfigurationType = TestNestedPluginConfig.class) +public class TestNestedPlugin implements TestNestedPluginInterface { + private final TestNestedPluginConfig config; + + @DataPrepperPluginConstructor + public TestNestedPlugin(final TestNestedPluginConfig config) { + this.config = config; + } + + @Override + public String getValue() { + return config.getTestValue(); + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java new file mode 100644 index 0000000000..80f08af23d --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestNestedPluginConfig { + @JsonProperty("test_value") + private String testValue; + + public String getTestValue() { + return testValue; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java new file mode 100644 index 0000000000..22ee90ee62 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +public interface TestNestedPluginInterface { + String getValue(); +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java new file mode 100644 index 0000000000..8ffc7ebd72 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.plugin.TestPluggableInterface; + +@DataPrepperPlugin(name = "test_plugin_with_nested", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithNestedPluginConfig.class) +public class TestPluginWithNestedPlugin implements TestPluggableInterface { + private final TestPluginWithNestedPluginConfig configuration; + + @DataPrepperPluginConstructor + public TestPluginWithNestedPlugin(final TestPluginWithNestedPluginConfig configuration) { + this.configuration = configuration; + } + + public TestPluginWithNestedPluginConfig getConfiguration() { + return configuration; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java new file mode 100644 index 0000000000..7bd8841847 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; + +public class TestPluginWithNestedPluginConfig { + @JsonProperty("nested_plugin") + @UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class) + private TestNestedPluginInterface nestedPlugin; + + @JsonProperty("name") + private String name; + + public TestNestedPluginInterface getNestedPlugin() { + return nestedPlugin; + } + + public String getName() { + return name; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java new file mode 100644 index 0000000000..68ba2ccf4d --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.plugin.TestPluggableInterface; + +@DataPrepperPlugin(name = "test_plugin_with_plugin_model", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithPluginModelConfig.class) +public class TestPluginWithPluginModel implements TestPluggableInterface { + private final TestPluginWithPluginModelConfig configuration; + + @DataPrepperPluginConstructor + public TestPluginWithPluginModel(final TestPluginWithPluginModelConfig configuration) { + this.configuration = configuration; + } + + public TestPluginWithPluginModelConfig getConfiguration() { + return configuration; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java new file mode 100644 index 0000000000..cfc04cba45 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; + +public class TestPluginWithPluginModelConfig { + @JsonProperty("name") + private String name; + + @JsonProperty("nested_action") + @UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class) + private PluginModel nestedAction; + + public String getName() { + return name; + } + + public PluginModel getNestedAction() { + return nestedAction; + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java new file mode 100644 index 0000000000..334864e3bf --- /dev/null +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; +import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder; +import com.fasterxml.jackson.databind.deser.SettableBeanProperty; +import com.fasterxml.jackson.databind.introspect.AnnotatedField; +import com.fasterxml.jackson.databind.introspect.AnnotatedMember; +import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; + +import java.util.Iterator; +import java.util.List; + +class DataPrepperPluginBeanDeserializerModifier extends BeanDeserializerModifier { + + @Override + public BeanDeserializerBuilder updateBuilder( + final DeserializationConfig config, + final BeanDescription beanDesc, + final BeanDeserializerBuilder builder) { + + final List properties = beanDesc.findProperties(); + + for (final BeanPropertyDefinition propertyDef : properties) { + final UsesDataPrepperPlugin annotation = findAnnotation(propertyDef); + if (annotation == null) { + continue; + } + + if (PluginModel.class.isAssignableFrom(propertyDef.getRawPrimaryType())) { + continue; + } + + final Class pluginType = annotation.pluginType(); + final NestedPluginDeserializer deserializer = new NestedPluginDeserializer(pluginType); + + final Iterator propertyIterator = builder.getProperties(); + while (propertyIterator.hasNext()) { + final SettableBeanProperty property = propertyIterator.next(); + if (property.getName().equals(propertyDef.getName())) { + final SettableBeanProperty updatedProperty = property.withValueDeserializer(deserializer); + builder.addOrReplaceProperty(updatedProperty, true); + break; + } + } + } + + return builder; + } + + private UsesDataPrepperPlugin findAnnotation(final BeanPropertyDefinition propertyDef) { + final AnnotatedField field = propertyDef.getField(); + if (field != null) { + final UsesDataPrepperPlugin annotation = field.getAnnotation(UsesDataPrepperPlugin.class); + if (annotation != null) { + return annotation; + } + } + + final AnnotatedMember mutator = propertyDef.getMutator(); + if (mutator != null) { + final UsesDataPrepperPlugin annotation = mutator.getAnnotation(UsesDataPrepperPlugin.class); + if (annotation != null) { + return annotation; + } + } + + return null; + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java index 4caf3ebadf..debcd3d773 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -8,23 +12,27 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginConfigObserver; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class DefaultPluginConfigObservable implements PluginConfigObservable { +class DefaultPluginConfigObservable implements PluginConfigObservable { private final Map pluginConfigObserverBooleanMap = new ConcurrentHashMap<>(); private final PluginConfigurationConverter pluginConfigurationConverter; private final Class pluginConfigClass; private final PluginSetting rawPluginSettings; + private final PluginFactory pluginFactory; - public DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter, - final Class pluginConfigClass, - final PluginSetting rawPluginSettings) { + DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter, + final Class pluginConfigClass, + final PluginSetting rawPluginSettings, + final PluginFactory pluginFactory) { this.pluginConfigurationConverter = pluginConfigurationConverter; this.pluginConfigClass = pluginConfigClass; this.rawPluginSettings = rawPluginSettings; + this.pluginFactory = pluginFactory; } @Override @@ -36,7 +44,7 @@ public boolean addPluginConfigObserver(final PluginConfigObserver pluginConfigOb @Override public void update() { final Object newPluginConfiguration = pluginConfigurationConverter.convert( - pluginConfigClass, rawPluginSettings); + pluginConfigClass, rawPluginSettings, pluginFactory); pluginConfigObserverBooleanMap.keySet().forEach( pluginConfigObserver -> pluginConfigObserver.update(newPluginConfiguration)); } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index 28a08e03b7..656417e524 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -117,9 +121,9 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS final Class pluginConfigurationType = pluginAnnotation.pluginConfigurationType(); - final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting); + final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting, this); final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory - .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting); + .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting, this); Class[] markersToScan = pluginAnnotation.packagesToScan(); BeanFactory beanFactory = pluginBeanFactoryProvider.createPluginSpecificContext(markersToScan, configuration, pluginSetting); diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java new file mode 100644 index 0000000000..901a350e8c --- /dev/null +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +class NestedPluginDeserializer extends JsonDeserializer { + private static final Logger LOG = LoggerFactory.getLogger(NestedPluginDeserializer.class); + static final String PLUGIN_FACTORY_ATTRIBUTE_KEY = "pluginFactory"; + + private final Class pluginType; + + NestedPluginDeserializer(final Class pluginType) { + this.pluginType = pluginType; + } + + @Override + public Object deserialize(final JsonParser parser, final DeserializationContext ctxt) throws IOException { + final PluginFactory pluginFactory = (PluginFactory) ctxt.getAttribute(PLUGIN_FACTORY_ATTRIBUTE_KEY); + if (pluginFactory == null) { + throw ctxt.instantiationException(pluginType, + "PluginFactory is not available in the deserialization context"); + } + + if (parser.currentToken() != JsonToken.START_OBJECT) { + throw ctxt.wrongTokenException(parser, Map.class, JsonToken.START_OBJECT, + "Nested plugin configuration must be a map with the plugin name as key"); + } + + parser.nextToken(); + final String pluginName = parser.currentName(); + parser.nextToken(); + + final Map pluginSettings; + if (parser.currentToken() == JsonToken.START_OBJECT) { + pluginSettings = parser.readValueAs(Map.class); + } else if (parser.currentToken() == JsonToken.VALUE_NULL) { + pluginSettings = Collections.emptyMap(); + } else { + pluginSettings = Collections.emptyMap(); + } + + parser.nextToken(); + if (parser.currentToken() != JsonToken.END_OBJECT) { + LOG.warn("Plugin configuration for '{}' should have exactly one key (the plugin name), " + + "but additional keys were found. Only the first plugin '{}' will be used.", + pluginType.getSimpleName(), pluginName); + while (parser.currentToken() != JsonToken.END_OBJECT) { + parser.nextToken(); + parser.skipChildren(); + parser.nextToken(); + } + } + + final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettings); + return pluginFactory.loadPlugin(pluginType, pluginSetting); + } + + @Override + public Object getNullValue(final DeserializationContext ctxt) { + return null; + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java index ce3e14d49d..9615caaa70 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java @@ -60,9 +60,13 @@ ObjectMapper pluginConfigObjectMapper( TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer( clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz))); + final SimpleModule nestedPluginModule = new SimpleModule("DataPrepperNestedPluginModule"); + nestedPluginModule.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + return new ObjectMapper() .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) .registerModule(simpleModule) + .registerModule(nestedPluginModule) .addHandler(dataPrepperDeserializationProblemHandler); } } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java index 5af6d3dea9..974dad534f 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java @@ -1,18 +1,26 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; import jakarta.validation.ConstraintViolation; import jakarta.validation.Validator; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.springframework.context.annotation.DependsOn; import javax.inject.Named; @@ -45,17 +53,20 @@ class PluginConfigurationConverter { } /** - * Converts and validates to a plugin model type. The conversion happens via - * Java Bean Validation 2.0. + * Converts and validates to a plugin model type, resolving nested plugin fields + * annotated with {@link org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin}. * * @param pluginConfigurationType the destination type * @param pluginSetting The source {@link PluginSetting} + * @param pluginFactory The {@link PluginFactory} for loading nested plugins * @return The converted object of type pluginConfigurationType * @throws InvalidPluginConfigurationException - If the plugin configuration is invalid */ - public Object convert(final Class pluginConfigurationType, final PluginSetting pluginSetting) { + public Object convert(final Class pluginConfigurationType, final PluginSetting pluginSetting, + final PluginFactory pluginFactory) { Objects.requireNonNull(pluginConfigurationType); Objects.requireNonNull(pluginSetting); + Objects.requireNonNull(pluginFactory); if (pluginConfigurationType.equals(PluginSetting.class)) { final Map settings = pluginSetting.getSettings(); @@ -64,7 +75,7 @@ public Object convert(final Class pluginConfigurationType, final PluginSettin return pluginSetting; } - final Object configuration = convertSettings(pluginConfigurationType, pluginSetting); + final Object configuration = convertSettings(pluginConfigurationType, pluginSetting, pluginFactory); final Set> constraintViolations = validator.validate(configuration); @@ -81,13 +92,18 @@ public Object convert(final Class pluginConfigurationType, final PluginSettin return configuration; } - private Object convertSettings(final Class pluginConfigurationType, final PluginSetting pluginSetting) { + private Object convertSettings(final Class pluginConfigurationType, final PluginSetting pluginSetting, + final PluginFactory pluginFactory) { Map settingsMap = pluginSetting.getSettings(); if (settingsMap == null) settingsMap = Collections.emptyMap(); try { - return objectMapper.convertValue(settingsMap, pluginConfigurationType); + final JsonNode tree = objectMapper.valueToTree(settingsMap); + final ObjectReader reader = objectMapper.readerFor(pluginConfigurationType) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + final JsonParser parser = tree.traverse(objectMapper); + return reader.readValue(parser); } catch (final Exception e) { throw pluginConfigurationErrorHandler.handleException(pluginSetting, e); } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java index e8c882643b..62ee182a2d 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java @@ -1,12 +1,17 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import javax.inject.Named; @@ -14,8 +19,8 @@ public class PluginConfigurationObservableFactory { public final PluginConfigObservable createDefaultPluginConfigObservable( final PluginConfigurationConverter pluginConfigurationConverter, final Class pluginConfigClass, - final PluginSetting rawPluginSettings) { + final PluginSetting rawPluginSettings, final PluginFactory pluginFactory) { return new DefaultPluginConfigObservable( - pluginConfigurationConverter, pluginConfigClass, rawPluginSettings); + pluginConfigurationConverter, pluginConfigClass, rawPluginSettings, pluginFactory); } } diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java new file mode 100644 index 0000000000..f65caa8629 --- /dev/null +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java @@ -0,0 +1,410 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DataPrepperPluginBeanDeserializerModifierTest { + + @Mock + private PluginFactory pluginFactory; + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + final SimpleModule module = new SimpleModule("TestNestedPluginModule"); + module.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + objectMapper = new ObjectMapper().registerModule(module); + } + + interface TestPlugin { + String getValue(); + } + + static class ConfigWithNoAnnotatedFields { + @JsonProperty("name") + private String name; + + @JsonProperty("count") + private int count; + + public String getName() { + return name; + } + + public int getCount() { + return count; + } + } + + static class ConfigWithAnnotatedField { + @JsonProperty("name") + private String name; + + @JsonProperty("my_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin myPlugin; + + public String getName() { + return name; + } + + public TestPlugin getMyPlugin() { + return myPlugin; + } + } + + interface InnerPlugin { + String getInnerValue(); + } + + static class ConfigWithNestedPluginThatAlsoLoadsPlugin { + @JsonProperty("name") + private String name; + + @JsonProperty("outer_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin outerPlugin; + + public String getName() { + return name; + } + + public TestPlugin getOuterPlugin() { + return outerPlugin; + } + } + + static class OuterPluginConfig { + @JsonProperty("setting") + private String setting; + + @JsonProperty("inner_plugin") + @UsesDataPrepperPlugin(pluginType = InnerPlugin.class) + private InnerPlugin innerPlugin; + + public String getSetting() { + return setting; + } + + public InnerPlugin getInnerPlugin() { + return innerPlugin; + } + } + + static class ConfigWithPluginModelField { + @JsonProperty("name") + private String name; + + @JsonProperty("action") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private PluginModel action; + + public String getName() { + return name; + } + + public PluginModel getAction() { + return action; + } + } + + static class ConfigWithMultipleFields { + @JsonProperty("label") + private String label; + + @JsonProperty("first_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin firstPlugin; + + @JsonProperty("description") + private String description; + + @JsonProperty("second_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin secondPlugin; + + public String getLabel() { + return label; + } + + public TestPlugin getFirstPlugin() { + return firstPlugin; + } + + public String getDescription() { + return description; + } + + public TestPlugin getSecondPlugin() { + return secondPlugin; + } + } + + @Nested + class WithNoAnnotatedFields { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithNoAnnotatedFields.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void does_not_invoke_plugin_factory() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"count\": 42}"; + + final ConfigWithNoAnnotatedFields result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getCount(), equalTo(42)); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + } + + @Nested + class WithAnnotatedField { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithAnnotatedField.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void resolves_plugin_via_plugin_factory() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String pluginName = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": {\"option\": \"value\"}}}"; + + final ConfigWithAnnotatedField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), any(PluginSetting.class)); + } + + @Test + void leaves_field_null_when_not_present_in_input() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\"}"; + + final ConfigWithAnnotatedField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), nullValue()); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + } + + @Nested + class WithMultipleAnnotatedFields { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithMultipleFields.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void resolves_all_annotated_fields_independently() throws JsonProcessingException { + final String label = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + final String firstValue = UUID.randomUUID().toString(); + final String secondValue = UUID.randomUUID().toString(); + final TestPlugin firstMock = () -> firstValue; + final TestPlugin secondMock = () -> secondValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(firstMock) + .thenReturn(secondMock); + + final String firstPluginName = UUID.randomUUID().toString(); + final String secondPluginName = UUID.randomUUID().toString(); + final String json = "{\"label\": \"" + label + "\", \"first_plugin\": {\"" + firstPluginName + "\": {}}, " + + "\"description\": \"" + description + "\", \"second_plugin\": {\"" + secondPluginName + "\": {\"key\": \"val\"}}}"; + + final ConfigWithMultipleFields result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getLabel(), equalTo(label)); + assertThat(result.getDescription(), equalTo(description)); + assertThat(result.getFirstPlugin(), sameInstance(firstMock)); + assertThat(result.getSecondPlugin(), sameInstance(secondMock)); + } + + @Test + void resolves_only_present_fields_and_leaves_others_null() throws JsonProcessingException { + final String label = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String pluginName = UUID.randomUUID().toString(); + final String json = "{\"label\": \"" + label + "\", \"first_plugin\": {\"" + pluginName + "\": {}}, \"description\": \"" + description + "\"}"; + + final ConfigWithMultipleFields result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getLabel(), equalTo(label)); + assertThat(result.getDescription(), equalTo(description)); + assertThat(result.getFirstPlugin(), sameInstance(mockPlugin)); + assertThat(result.getSecondPlugin(), nullValue()); + } + } + + @Nested + class WithPluginModelField { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithPluginModelField.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void deserializes_as_plugin_model_without_invoking_plugin_factory() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String settingValue = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"action\": {\"" + pluginName + "\": {\"key\": \"" + settingValue + "\"}}}"; + + final ConfigWithPluginModelField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getAction(), notNullValue()); + assertThat(result.getAction().getPluginName(), equalTo(pluginName)); + assertThat(result.getAction().getPluginSettings().get("key"), equalTo(settingValue)); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + + @Test + void handles_null_plugin_model_field() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\"}"; + + final ConfigWithPluginModelField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getAction(), nullValue()); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + } + + @Nested + class WithNestedPluginThatAlsoLoadsPlugin { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithNestedPluginThatAlsoLoadsPlugin.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void outer_plugin_loading_triggers_inner_plugin_loading() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String outerPluginName = UUID.randomUUID().toString(); + final String innerPluginName = UUID.randomUUID().toString(); + final String outerSetting = UUID.randomUUID().toString(); + final String innerValue = UUID.randomUUID().toString(); + final String outerValue = UUID.randomUUID().toString(); + + final InnerPlugin innerPlugin = () -> innerValue; + final TestPlugin outerPlugin = () -> outerValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenAnswer(invocation -> { + final PluginSetting outerPluginSetting = invocation.getArgument(1); + assertThat(outerPluginSetting.getName(), equalTo(outerPluginName)); + + final Map outerSettings = outerPluginSetting.getSettings(); + assertThat(outerSettings.get("setting"), equalTo(outerSetting)); + + @SuppressWarnings("unchecked") + final Map innerPluginMap = (Map) outerSettings.get("inner_plugin"); + assertThat(innerPluginMap, notNullValue()); + assertThat(innerPluginMap.containsKey(innerPluginName), equalTo(true)); + + final PluginSetting innerPluginSetting = new PluginSetting(innerPluginName, + innerPluginMap.get(innerPluginName) != null + ? (Map) innerPluginMap.get(innerPluginName) + : Collections.emptyMap()); + + when(pluginFactory.loadPlugin(eq(InnerPlugin.class), any(PluginSetting.class))) + .thenReturn(innerPlugin); + + final InnerPlugin resolvedInner = pluginFactory.loadPlugin(InnerPlugin.class, innerPluginSetting); + assertThat(resolvedInner, sameInstance(innerPlugin)); + + return outerPlugin; + }); + + final String json = "{\"name\": \"" + name + "\", \"outer_plugin\": {\"" + outerPluginName + + "\": {\"setting\": \"" + outerSetting + "\", \"inner_plugin\": {\"" + innerPluginName + "\": {\"key\": \"val\"}}}}}"; + + final ConfigWithNestedPluginThatAlsoLoadsPlugin result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getOuterPlugin(), sameInstance(outerPlugin)); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), any(PluginSetting.class)); + verify(pluginFactory).loadPlugin(eq(InnerPlugin.class), any(PluginSetting.class)); + } + } +} diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java index 6a71346e37..209344b568 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -17,6 +21,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.NoPluginFoundException; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.test.TestDISource; @@ -91,7 +96,8 @@ void setUp() { given(pluginConfigurationObservableFactory.createDefaultPluginConfigObservable( eq(pluginConfigurationConverter), any(Class.class), - any(PluginSetting.class) + any(PluginSetting.class), + any(DefaultPluginFactory.class) )).willReturn(pluginConfigObservable); applicationContextToTypedSuppliers = mock(ApplicationContextToTypedSuppliers.class); @@ -205,7 +211,7 @@ void loadPlugin_should_create_a_new_instance_of_the_plugin_with_di_initialized() final TestDISource expectedInstance = mock(TestDISource.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(firstPluginProvider.findPluginClass(Source.class, pluginName)) .willReturn(Optional.of(TestDISource.class)); @@ -215,7 +221,7 @@ void loadPlugin_should_create_a_new_instance_of_the_plugin_with_di_initialized() assertThat(createObjectUnderTest().loadPlugin(Source.class, pluginSetting), equalTo(expectedInstance)); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{TestDISource.class}, convertedConfiguration, pluginSetting); } @@ -224,7 +230,7 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance); @@ -232,10 +238,28 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() { assertThat(createObjectUnderTest().loadPlugin(baseClass, pluginSetting), equalTo(expectedInstance)); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting); } + @Test + void loadPlugin_should_pass_itself_as_the_plugin_factory_to_convert() { + final Object convertedConfiguration = mock(Object.class); + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) + .willReturn(convertedConfiguration); + + final DefaultPluginFactory objectUnderTest = createObjectUnderTest(); + objectUnderTest.loadPlugin(baseClass, pluginSetting); + + final ArgumentCaptor pluginFactoryCaptor = ArgumentCaptor.forClass(PluginFactory.class); + verify(pluginConfigurationConverter).convert(eq(PluginSetting.class), eq(pluginSetting), pluginFactoryCaptor.capture()); + assertThat(pluginFactoryCaptor.getValue(), sameInstance(objectUnderTest)); + + verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable( + eq(pluginConfigurationConverter), eq(PluginSetting.class), eq(pluginSetting), pluginFactoryCaptor.capture()); + assertThat(pluginFactoryCaptor.getValue(), sameInstance(objectUnderTest)); + } + @Test void loadPlugin_should_call_all_definedPluginConsumers() { createObjectUnderTest().loadPlugin(baseClass, pluginSetting); @@ -292,7 +316,7 @@ void loadPlugins_should_return_an_empty_list_when_the_number_of_instances_is_0() void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_is_1() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance); @@ -302,7 +326,7 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_ verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); final ArgumentCaptor pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class); verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName)); final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue(); @@ -322,7 +346,7 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number final Object object = new Object(); final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName), eq(object))) .willReturn(expectedInstance); @@ -331,7 +355,7 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); final ArgumentCaptor pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class); verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName), eq(object)); final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue(); @@ -369,7 +393,7 @@ void loadPlugins_should_return_an_instance_for_the_total_count() { final TestSink expectedInstance2 = mock(TestSink.class); final TestSink expectedInstance3 = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance1) @@ -410,7 +434,7 @@ void loadPlugins_should_return_a_single_instance_with_values_from_ApplicationCon final String suppliedAdditionalArgument = UUID.randomUUID().toString(); Map, Supplier> additionalArgumentsSuppliers = Map.of(String.class, () -> suppliedAdditionalArgument); when(applicationContextToTypedSuppliers.getArgumentsSuppliers()).thenReturn(additionalArgumentsSuppliers); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance); @@ -453,7 +477,7 @@ void setUp() { void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found_with_correct_name_and_deprecated_name() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(TEST_SINK_DEPRECATED_NAME))) .willReturn(expectedInstance); @@ -482,7 +506,7 @@ void setUp() { void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found_with_correct_name_and_alternate_name() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(TEST_SINK_ALTERNATE_NAME))) .willReturn(expectedInstance); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java new file mode 100644 index 0000000000..0be7e348de --- /dev/null +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java @@ -0,0 +1,271 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +import java.util.Collections; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class NestedPluginDeserializerTest { + + @Mock + private PluginFactory pluginFactory; + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + final SimpleModule module = new SimpleModule("TestModule"); + module.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + objectMapper = new ObjectMapper().registerModule(module); + } + + interface TestPlugin { + String getValue(); + } + + static class ConfigWithPlugin { + @JsonProperty("name") + private String name; + + @JsonProperty("my_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin myPlugin; + + public String getName() { + return name; + } + + public TestPlugin getMyPlugin() { + return myPlugin; + } + } + + @Nested + class WithPluginFactoryPresent { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithPlugin.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void deserializes_plugin_with_settings_map() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String settingKey = UUID.randomUUID().toString(); + final String settingValue = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + + "\": {\"" + settingKey + "\": \"" + settingValue + "\"}}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(pluginName)); + assertThat(captor.getValue().getSettings().get(settingKey), equalTo(settingValue)); + } + + @Test + void deserializes_plugin_with_empty_settings() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": {}}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(pluginName)); + assertThat(captor.getValue().getSettings(), notNullValue()); + assertThat(captor.getValue().getSettings().isEmpty(), equalTo(true)); + } + + @Test + void deserializes_plugin_with_null_settings() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": null}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(pluginName)); + assertThat(captor.getValue().getSettings(), equalTo(Collections.emptyMap())); + } + + @Test + void returns_null_when_field_is_null_in_json() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": null}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), nullValue()); + } + + @Test + void returns_null_when_field_is_absent() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\"}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), nullValue()); + } + + @Test + void throws_when_plugin_value_is_not_an_object() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": \"not_an_object\"}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void throws_when_plugin_value_is_empty_string() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": \"\"}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void throws_when_plugin_value_is_an_array() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": [\"a\", \"b\"]}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void throws_when_plugin_value_is_a_number() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": 123}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void uses_first_plugin_and_ignores_extra_keys_when_multiple_keys_present_for_backward_compatibility() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String firstPluginName = UUID.randomUUID().toString(); + final String secondPluginName = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + firstPluginName + + "\": {\"key\": \"val\"}, \"" + secondPluginName + "\": {\"other\": \"data\"}}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(firstPluginName)); + } + } + + @Nested + class WithoutPluginFactory { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithPlugin.class); + } + + @Test + void throws_when_plugin_factory_is_not_in_context() { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": {}}}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("PluginFactory is not available")); + } + } +} diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java index c93295a37c..43d21b9e57 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -9,6 +13,7 @@ import org.mockito.Mock; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -20,6 +25,9 @@ class PluginConfigObservableFactoryTest { @Mock private PluginSetting pluginSetting; + @Mock + private PluginFactory pluginFactory; + private final Class baseClass = Object.class; private final PluginConfigurationObservableFactory objectUnderTest = new PluginConfigurationObservableFactory(); @@ -27,7 +35,7 @@ class PluginConfigObservableFactoryTest { @Test void testCreateDefaultPluginConfigurationObservableFactory() { assertThat(objectUnderTest.createDefaultPluginConfigObservable( - pluginConfigurationConverter, baseClass, pluginSetting), + pluginConfigurationConverter, baseClass, pluginSetting, pluginFactory), instanceOf(PluginConfigObservable.class)); } } \ No newline at end of file diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java index bd00826277..3faa427afa 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java @@ -1,12 +1,17 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import jakarta.validation.ConstraintViolation; import jakarta.validation.Path; import jakarta.validation.Payload; @@ -16,15 +21,21 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import javax.annotation.Nonnull; import javax.annotation.meta.When; import java.lang.annotation.Annotation; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.hamcrest.CoreMatchers.containsString; @@ -37,9 +48,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -49,7 +62,9 @@ class PluginConfigurationConverterTest { @Mock private PluginConfigurationErrorHandler pluginConfigurationErrorHandler; - private ObjectMapper objectMapper = new ObjectMapper(); + @Mock + private PluginFactory pluginFactory; + private ObjectMapper objectMapper = createObjectMapperWithNestedPluginSupport(); static class TestConfiguration { @SuppressWarnings("unused") @@ -77,7 +92,7 @@ void convert_with_null_configurationType_should_throw() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); assertThrows(NullPointerException.class, - () -> objectUnderTest.convert(null, pluginSetting)); + () -> objectUnderTest.convert(null, pluginSetting, pluginFactory)); } @Test @@ -85,12 +100,20 @@ void convert_with_null_pluginSetting_should_throw() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); assertThrows(NullPointerException.class, - () -> objectUnderTest.convert(PluginSetting.class, null)); + () -> objectUnderTest.convert(PluginSetting.class, null, pluginFactory)); + } + + @Test + void convert_with_null_pluginFactory_should_throw() { + final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); + + assertThrows(NullPointerException.class, + () -> objectUnderTest.convert(PluginSetting.class, pluginSetting, null)); } @Test void convert_with_PluginSetting_target_should_return_pluginSetting_object_directly() { - assertThat(createObjectUnderTest().convert(PluginSetting.class, pluginSetting), + assertThat(createObjectUnderTest().convert(PluginSetting.class, pluginSetting, pluginFactory), sameInstance(pluginSetting)); then(pluginSetting).should().setSettings(anyMap()); @@ -104,7 +127,7 @@ void convert_with_other_target_should_return_pluginSetting_object_directly() { given(pluginSetting.getSettings()) .willReturn(Collections.singletonMap("my_value", value)); - final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting); + final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting, pluginFactory); assertThat(convertedConfiguration, notNullValue()); assertThat(convertedConfiguration, instanceOf(TestConfiguration.class)); @@ -119,7 +142,7 @@ void convert_with_other_target_should_return_empty_when_settings_are_null() { given(pluginSetting.getSettings()) .willReturn(null); - final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting); + final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting, pluginFactory); assertThat(convertedConfiguration, notNullValue()); assertThat(convertedConfiguration, instanceOf(TestConfiguration.class)); @@ -136,7 +159,7 @@ void convert_with_other_target_should_validate_configuration() { given(pluginSetting.getSettings()) .willReturn(Collections.singletonMap("my_value", value)); - final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting); + final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting, pluginFactory); then(validator) .should() @@ -189,7 +212,7 @@ public Class annotationType() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, - () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); assertThat(actualException.getMessage(), containsString(pluginName)); assertThat(actualException.getMessage(), containsString(pipelineName)); @@ -249,7 +272,7 @@ public Class annotationType() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, - () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); assertThat(actualException.getMessage(), containsString(pluginName)); assertThat(actualException.getMessage(), containsString(pipelineName)); @@ -266,14 +289,14 @@ void convert_with_error_when_converting_with_object_mapper_calls_plugin_configur final RuntimeException e = mock(RuntimeException.class); - when(objectMapper.convertValue(pluginSetting.getSettings(), TestConfiguration.class)) + when(objectMapper.valueToTree(any())) .thenThrow(e); when(pluginConfigurationErrorHandler.handleException(pluginSetting, e)).thenReturn(new IllegalArgumentException()); final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); - assertThrows(IllegalArgumentException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); } @Test @@ -286,13 +309,168 @@ void convert_with_error_when_throws_InvalidPluginConfiguration_when_plugin_confi final RuntimeException e = mock(RuntimeException.class); - when(objectMapper.convertValue(pluginSetting.getSettings(), TestConfiguration.class)) + when(objectMapper.valueToTree(any())) .thenThrow(e); when(pluginConfigurationErrorHandler.handleException(pluginSetting, e)).thenReturn(new InvalidPluginConfigurationException(UUID.randomUUID().toString())); final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); - assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); + } + + interface TestNestedPlugin { + String doSomething(); + } + + static class TestConfigurationWithPlugin { + @JsonProperty("my_value") + private String myValue; + + @JsonProperty("my_plugin") + @UsesDataPrepperPlugin(pluginType = TestNestedPlugin.class) + private TestNestedPlugin myPlugin; + + public String getMyValue() { + return myValue; + } + + public TestNestedPlugin getMyPlugin() { + return myPlugin; + } + } + + static class TestConfigurationWithPluginModel { + @JsonProperty("my_value") + private String myValue; + + @JsonProperty("action") + @UsesDataPrepperPlugin(pluginType = TestNestedPlugin.class) + private PluginModel action; + + public String getMyValue() { + return myValue; + } + + public PluginModel getAction() { + return action; + } + } + + @Test + void convert_with_plugin_factory_resolves_nested_plugin_field() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map pluginSettings = Map.of("setting_a", "value_a"); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("my_plugin", Map.of("test_codec", pluginSettings)); + + given(pluginSetting.getSettings()).willReturn(settings); + + final TestNestedPlugin mockPlugin = () -> "result"; + when(pluginFactory.loadPlugin(eq(TestNestedPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPlugin.class)); + final TestConfigurationWithPlugin config = (TestConfigurationWithPlugin) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor pluginSettingCaptor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestNestedPlugin.class), pluginSettingCaptor.capture()); + assertThat(pluginSettingCaptor.getValue().getName(), equalTo("test_codec")); + assertThat(pluginSettingCaptor.getValue().getSettings(), equalTo(pluginSettings)); + } + + @Test + void convert_with_plugin_factory_handles_null_plugin_value() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + + given(pluginSetting.getSettings()).willReturn(settings); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPlugin.class)); + final TestConfigurationWithPlugin config = (TestConfigurationWithPlugin) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getMyPlugin(), nullValue()); + } + + @Test + void convert_with_plugin_factory_handles_explicit_null_in_settings_map() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("my_plugin", null); + + given(pluginSetting.getSettings()).willReturn(settings); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPlugin.class)); + final TestConfigurationWithPlugin config = (TestConfigurationWithPlugin) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getMyPlugin(), nullValue()); + } + + @Test + void convert_with_plugin_model_field_deserializes_without_invoking_plugin_factory() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String settingValue = UUID.randomUUID().toString(); + final Map actionSettings = Map.of("key", settingValue); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("action", Map.of(pluginName, actionSettings)); + + given(pluginSetting.getSettings()).willReturn(settings); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPluginModel.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPluginModel.class)); + final TestConfigurationWithPluginModel config = (TestConfigurationWithPluginModel) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getAction(), notNullValue()); + assertThat(config.getAction().getPluginName(), equalTo(pluginName)); + assertThat(config.getAction().getPluginSettings().get("key"), equalTo(settingValue)); + } + + @Test + void convert_with_plugin_factory_throws_when_plugin_value_is_empty_string() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("my_plugin", ""); + + given(pluginSetting.getSettings()).willReturn(settings); + + when(pluginConfigurationErrorHandler.handleException(eq(pluginSetting), any(Exception.class))) + .thenReturn(new InvalidPluginConfigurationException("test")); + + final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); + + assertThrows(InvalidPluginConfigurationException.class, + () -> objectUnderTest.convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory)); + } + + private static ObjectMapper createObjectMapperWithNestedPluginSupport() { + final SimpleModule nestedPluginModule = new SimpleModule("DataPrepperNestedPluginModule"); + nestedPluginModule.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + return new ObjectMapper().registerModule(nestedPluginModule); } + } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java index d0715ee8c1..6d00c68d06 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -17,10 +16,11 @@ import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import java.time.Duration; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.plugins.source.sqs.common.OnErrorOption; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; -import org.opensearch.dataprepper.model.configuration.PluginModel; public class QueueConfig { @@ -69,7 +69,8 @@ public class QueueConfig { private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS; @JsonProperty("codec") - private PluginModel codec = null; + @UsesDataPrepperPlugin(pluginType = InputCodec.class) + private InputCodec codec; @JsonProperty("on_error") private OnErrorOption onErrorOption = OnErrorOption.RETAIN_MESSAGES; @@ -104,7 +105,7 @@ public Duration getPollDelay() { return pollDelay; } - public PluginModel getCodec() { + public InputCodec getCodec() { return codec; } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index 3c48b76f4e..3cf6b3942b 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -14,10 +13,6 @@ import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; - import org.opensearch.dataprepper.model.codec.InputCodec; - import org.opensearch.dataprepper.model.configuration.PluginModel; - import org.opensearch.dataprepper.model.configuration.PluginSetting; - import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff; import org.opensearch.dataprepper.plugins.source.sqs.common.SqsClientFactory; import org.opensearch.dataprepper.plugins.source.sqs.common.SqsWorkerCommon; @@ -45,7 +40,6 @@ public class SqsService { static final long SHUTDOWN_TIMEOUT = 30L; private final SqsSourceConfig sqsSourceConfig; private final PluginMetrics pluginMetrics; - private final PluginFactory pluginFactory; private final AcknowledgementSetManager acknowledgementSetManager; private final List allSqsUrlExecutorServices; private final List sqsWorkers; @@ -58,12 +52,10 @@ public SqsService(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsSourceConfig sqsSourceConfig, final PluginMetrics pluginMetrics, - final PluginFactory pluginFactory, final AwsCredentialsProvider credentialsProvider) { - + this.sqsSourceConfig = sqsSourceConfig; this.pluginMetrics = pluginMetrics; - this.pluginFactory = pluginFactory; this.credentialsProvider = credentialsProvider; this.acknowledgementSetManager = acknowledgementSetManager; this.allSqsUrlExecutorServices = new ArrayList<>(); @@ -85,10 +77,7 @@ public void start() { SqsEventProcessor sqsEventProcessor; MessageFieldStrategy strategy; if (queueConfig.getCodec() != null) { - final PluginModel codecConfiguration = queueConfig.getCodec(); - final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); - strategy = new CodecBulkMessageFieldStrategy(codec); + strategy = new CodecBulkMessageFieldStrategy(queueConfig.getCodec()); } else { strategy = new StandardMessageFieldStrategy(); } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java index e722b76780..0497876559 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -17,7 +16,6 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -27,9 +25,8 @@ public class SqsSource implements Source> { private final PluginMetrics pluginMetrics; - private final PluginFactory pluginFactory; private final SqsSourceConfig sqsSourceConfig; - private SqsService sqsService; + private SqsService sqsService; private final AcknowledgementSetManager acknowledgementSetManager; private final AwsCredentialsSupplier awsCredentialsSupplier; private final boolean acknowledgementsEnabled; @@ -38,12 +35,10 @@ public class SqsSource implements Source> { @DataPrepperPluginConstructor public SqsSource(final PluginMetrics pluginMetrics, final SqsSourceConfig sqsSourceConfig, - final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final AwsCredentialsSupplier awsCredentialsSupplier) { - + this.pluginMetrics = pluginMetrics; - this.pluginFactory = pluginFactory; this.sqsSourceConfig = sqsSourceConfig; this.acknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); this.acknowledgementSetManager = acknowledgementSetManager; @@ -58,7 +53,7 @@ public void start(Buffer> buffer) { } final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, sqsSourceConfig); final AwsCredentialsProvider credentialsProvider = awsAuthenticationAdapter.getCredentialsProvider(); - sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider); + sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, credentialsProvider); sqsService.start(); } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java index 4b4777d434..c317852903 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -16,7 +15,6 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -30,7 +28,6 @@ class SqsServiceTest { private SqsSourceConfig sqsSourceConfig; private PluginMetrics pluginMetrics; - private PluginFactory pluginFactory; private AcknowledgementSetManager acknowledgementSetManager; private Buffer> buffer; private AwsCredentialsProvider credentialsProvider; @@ -39,7 +36,6 @@ class SqsServiceTest { void setUp() { sqsSourceConfig = mock(SqsSourceConfig.class); pluginMetrics = mock(PluginMetrics.class); - pluginFactory = mock(PluginFactory.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); buffer = mock(Buffer.class); credentialsProvider = mock(AwsCredentialsProvider.class); @@ -54,8 +50,8 @@ void start_with_single_queue_starts_workers() { when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getNumWorkers()).thenReturn(2); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); - SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider)); - sqsService.start(); // if no exception is thrown here, then workers have been started + SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, credentialsProvider)); + sqsService.start(); } @Test @@ -64,9 +60,9 @@ void stop_should_shutdown_executors_and_workers() throws InterruptedException { when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getNumWorkers()).thenReturn(1); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); - SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider) {}; + SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, credentialsProvider) {}; sqsService.start(); - sqsService.stop(); // again assuming that if no exception is thrown here, then workers and client have been stopped + sqsService.stop(); } } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java index b028c67177..767ee56a60 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -17,7 +16,6 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -33,7 +31,6 @@ class SqsSourceTest { private final String TEST_PIPELINE_NAME = "test_pipeline"; private SqsSource sqsSource; private PluginMetrics pluginMetrics; - private PluginFactory pluginFactory; private SqsSourceConfig sqsSourceConfig; private AcknowledgementSetManager acknowledgementSetManager; private AwsCredentialsSupplier awsCredentialsSupplier; @@ -43,11 +40,10 @@ class SqsSourceTest { @BeforeEach void setUp() { pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); - pluginFactory = mock(PluginFactory.class); sqsSourceConfig = mock(SqsSourceConfig.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); - sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, pluginFactory, acknowledgementSetManager, awsCredentialsSupplier); + sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, acknowledgementSetManager, awsCredentialsSupplier); buffer = mock(Buffer.class); }