diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index 7dc3931b18..bc8cc959e4 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -26,7 +26,12 @@ import org.opensearch.dataprepper.model.plugin.NoPluginFoundException; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.pipeline.parser.DataPrepperDeserializationProblemHandler; +import org.opensearch.dataprepper.plugins.TestNestedPluginInterface; import org.opensearch.dataprepper.plugins.TestObjectPlugin; +import org.opensearch.dataprepper.plugins.TestPluginWithNestedPlugin; +import org.opensearch.dataprepper.plugins.TestPluginWithNestedPluginConfig; +import org.opensearch.dataprepper.plugins.TestPluginWithPluginModel; +import org.opensearch.dataprepper.plugins.TestPluginWithPluginModelConfig; import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject; import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig; import org.opensearch.dataprepper.plugins.test.TestComponent; @@ -250,6 +255,62 @@ void loadPlugin_should_succeed_when_a_non_experimental_plugin_has_an_experimenta assertThat(plugin, notNullValue()); } + @Test + void loadPlugin_should_resolve_nested_plugin_annotated_with_UsesDataPrepperPlugin() { + final String nameValue = UUID.randomUUID().toString(); + final String nestedTestValue = UUID.randomUUID().toString(); + + final Map nestedPluginSettings = new HashMap<>(); + nestedPluginSettings.put("test_value", nestedTestValue); + + final Map pluginSettingMap = new HashMap<>(); + pluginSettingMap.put("name", nameValue); + pluginSettingMap.put("nested_plugin", Collections.singletonMap("test_nested_plugin", nestedPluginSettings)); + + final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_nested", pluginSettingMap); + pluginSetting.setPipelineName(pipelineName); + + final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting); + + assertThat(plugin, instanceOf(TestPluginWithNestedPlugin.class)); + + final TestPluginWithNestedPlugin testPlugin = (TestPluginWithNestedPlugin) plugin; + final TestPluginWithNestedPluginConfig configuration = testPlugin.getConfiguration(); + + assertThat(configuration.getName(), equalTo(nameValue)); + assertThat(configuration.getNestedPlugin(), notNullValue()); + assertThat(configuration.getNestedPlugin(), instanceOf(TestNestedPluginInterface.class)); + assertThat(configuration.getNestedPlugin().getValue(), equalTo(nestedTestValue)); + } + + @Test + void loadPlugin_should_deserialize_PluginModel_field_annotated_with_UsesDataPrepperPlugin_without_loading_nested_plugin() { + final String nameValue = UUID.randomUUID().toString(); + final String nestedTestValue = UUID.randomUUID().toString(); + + final Map nestedPluginSettings = new HashMap<>(); + nestedPluginSettings.put("test_value", nestedTestValue); + + final Map pluginSettingMap = new HashMap<>(); + pluginSettingMap.put("name", nameValue); + pluginSettingMap.put("nested_action", Collections.singletonMap("test_nested_plugin", nestedPluginSettings)); + + final PluginSetting pluginSetting = new PluginSetting("test_plugin_with_plugin_model", pluginSettingMap); + pluginSetting.setPipelineName(pipelineName); + + final TestPluggableInterface plugin = createObjectUnderTest().loadPlugin(TestPluggableInterface.class, pluginSetting); + + assertThat(plugin, instanceOf(TestPluginWithPluginModel.class)); + + final TestPluginWithPluginModel testPlugin = (TestPluginWithPluginModel) plugin; + final TestPluginWithPluginModelConfig configuration = testPlugin.getConfiguration(); + + assertThat(configuration.getName(), equalTo(nameValue)); + assertThat(configuration.getNestedAction(), notNullValue()); + assertThat(configuration.getNestedAction().getPluginName(), equalTo("test_nested_plugin")); + assertThat(configuration.getNestedAction().getPluginSettings().get("test_value"), equalTo(nestedTestValue)); + } + private PluginSetting createPluginSettings(final Map pluginSettingMap) { final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettingMap); pluginSetting.setPipelineName(pipelineName); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java new file mode 100644 index 0000000000..1164cfb677 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPlugin.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; + +@DataPrepperPlugin(name = "test_nested_plugin", pluginType = TestNestedPluginInterface.class, pluginConfigurationType = TestNestedPluginConfig.class) +public class TestNestedPlugin implements TestNestedPluginInterface { + private final TestNestedPluginConfig config; + + @DataPrepperPluginConstructor + public TestNestedPlugin(final TestNestedPluginConfig config) { + this.config = config; + } + + @Override + public String getValue() { + return config.getTestValue(); + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java new file mode 100644 index 0000000000..80f08af23d --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginConfig.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestNestedPluginConfig { + @JsonProperty("test_value") + private String testValue; + + public String getTestValue() { + return testValue; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java new file mode 100644 index 0000000000..22ee90ee62 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestNestedPluginInterface.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +public interface TestNestedPluginInterface { + String getValue(); +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java new file mode 100644 index 0000000000..8ffc7ebd72 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPlugin.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.plugin.TestPluggableInterface; + +@DataPrepperPlugin(name = "test_plugin_with_nested", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithNestedPluginConfig.class) +public class TestPluginWithNestedPlugin implements TestPluggableInterface { + private final TestPluginWithNestedPluginConfig configuration; + + @DataPrepperPluginConstructor + public TestPluginWithNestedPlugin(final TestPluginWithNestedPluginConfig configuration) { + this.configuration = configuration; + } + + public TestPluginWithNestedPluginConfig getConfiguration() { + return configuration; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java new file mode 100644 index 0000000000..7bd8841847 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithNestedPluginConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; + +public class TestPluginWithNestedPluginConfig { + @JsonProperty("nested_plugin") + @UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class) + private TestNestedPluginInterface nestedPlugin; + + @JsonProperty("name") + private String name; + + public TestNestedPluginInterface getNestedPlugin() { + return nestedPlugin; + } + + public String getName() { + return name; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java new file mode 100644 index 0000000000..68ba2ccf4d --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModel.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.plugin.TestPluggableInterface; + +@DataPrepperPlugin(name = "test_plugin_with_plugin_model", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginWithPluginModelConfig.class) +public class TestPluginWithPluginModel implements TestPluggableInterface { + private final TestPluginWithPluginModelConfig configuration; + + @DataPrepperPluginConstructor + public TestPluginWithPluginModel(final TestPluginWithPluginModelConfig configuration) { + this.configuration = configuration; + } + + public TestPluginWithPluginModelConfig getConfiguration() { + return configuration; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java new file mode 100644 index 0000000000..cfc04cba45 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginWithPluginModelConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; + +public class TestPluginWithPluginModelConfig { + @JsonProperty("name") + private String name; + + @JsonProperty("nested_action") + @UsesDataPrepperPlugin(pluginType = TestNestedPluginInterface.class) + private PluginModel nestedAction; + + public String getName() { + return name; + } + + public PluginModel getNestedAction() { + return nestedAction; + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java new file mode 100644 index 0000000000..334864e3bf --- /dev/null +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifier.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; +import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder; +import com.fasterxml.jackson.databind.deser.SettableBeanProperty; +import com.fasterxml.jackson.databind.introspect.AnnotatedField; +import com.fasterxml.jackson.databind.introspect.AnnotatedMember; +import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; + +import java.util.Iterator; +import java.util.List; + +class DataPrepperPluginBeanDeserializerModifier extends BeanDeserializerModifier { + + @Override + public BeanDeserializerBuilder updateBuilder( + final DeserializationConfig config, + final BeanDescription beanDesc, + final BeanDeserializerBuilder builder) { + + final List properties = beanDesc.findProperties(); + + for (final BeanPropertyDefinition propertyDef : properties) { + final UsesDataPrepperPlugin annotation = findAnnotation(propertyDef); + if (annotation == null) { + continue; + } + + if (PluginModel.class.isAssignableFrom(propertyDef.getRawPrimaryType())) { + continue; + } + + final Class pluginType = annotation.pluginType(); + final NestedPluginDeserializer deserializer = new NestedPluginDeserializer(pluginType); + + final Iterator propertyIterator = builder.getProperties(); + while (propertyIterator.hasNext()) { + final SettableBeanProperty property = propertyIterator.next(); + if (property.getName().equals(propertyDef.getName())) { + final SettableBeanProperty updatedProperty = property.withValueDeserializer(deserializer); + builder.addOrReplaceProperty(updatedProperty, true); + break; + } + } + } + + return builder; + } + + private UsesDataPrepperPlugin findAnnotation(final BeanPropertyDefinition propertyDef) { + final AnnotatedField field = propertyDef.getField(); + if (field != null) { + final UsesDataPrepperPlugin annotation = field.getAnnotation(UsesDataPrepperPlugin.class); + if (annotation != null) { + return annotation; + } + } + + final AnnotatedMember mutator = propertyDef.getMutator(); + if (mutator != null) { + final UsesDataPrepperPlugin annotation = mutator.getAnnotation(UsesDataPrepperPlugin.class); + if (annotation != null) { + return annotation; + } + } + + return null; + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java index 4caf3ebadf..debcd3d773 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -8,23 +12,27 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginConfigObserver; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class DefaultPluginConfigObservable implements PluginConfigObservable { +class DefaultPluginConfigObservable implements PluginConfigObservable { private final Map pluginConfigObserverBooleanMap = new ConcurrentHashMap<>(); private final PluginConfigurationConverter pluginConfigurationConverter; private final Class pluginConfigClass; private final PluginSetting rawPluginSettings; + private final PluginFactory pluginFactory; - public DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter, - final Class pluginConfigClass, - final PluginSetting rawPluginSettings) { + DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter, + final Class pluginConfigClass, + final PluginSetting rawPluginSettings, + final PluginFactory pluginFactory) { this.pluginConfigurationConverter = pluginConfigurationConverter; this.pluginConfigClass = pluginConfigClass; this.rawPluginSettings = rawPluginSettings; + this.pluginFactory = pluginFactory; } @Override @@ -36,7 +44,7 @@ public boolean addPluginConfigObserver(final PluginConfigObserver pluginConfigOb @Override public void update() { final Object newPluginConfiguration = pluginConfigurationConverter.convert( - pluginConfigClass, rawPluginSettings); + pluginConfigClass, rawPluginSettings, pluginFactory); pluginConfigObserverBooleanMap.keySet().forEach( pluginConfigObserver -> pluginConfigObserver.update(newPluginConfiguration)); } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index 28a08e03b7..656417e524 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -117,9 +121,9 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS final Class pluginConfigurationType = pluginAnnotation.pluginConfigurationType(); - final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting); + final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting, this); final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory - .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting); + .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting, this); Class[] markersToScan = pluginAnnotation.packagesToScan(); BeanFactory beanFactory = pluginBeanFactoryProvider.createPluginSpecificContext(markersToScan, configuration, pluginSetting); diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java new file mode 100644 index 0000000000..901a350e8c --- /dev/null +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializer.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +class NestedPluginDeserializer extends JsonDeserializer { + private static final Logger LOG = LoggerFactory.getLogger(NestedPluginDeserializer.class); + static final String PLUGIN_FACTORY_ATTRIBUTE_KEY = "pluginFactory"; + + private final Class pluginType; + + NestedPluginDeserializer(final Class pluginType) { + this.pluginType = pluginType; + } + + @Override + public Object deserialize(final JsonParser parser, final DeserializationContext ctxt) throws IOException { + final PluginFactory pluginFactory = (PluginFactory) ctxt.getAttribute(PLUGIN_FACTORY_ATTRIBUTE_KEY); + if (pluginFactory == null) { + throw ctxt.instantiationException(pluginType, + "PluginFactory is not available in the deserialization context"); + } + + if (parser.currentToken() != JsonToken.START_OBJECT) { + throw ctxt.wrongTokenException(parser, Map.class, JsonToken.START_OBJECT, + "Nested plugin configuration must be a map with the plugin name as key"); + } + + parser.nextToken(); + final String pluginName = parser.currentName(); + parser.nextToken(); + + final Map pluginSettings; + if (parser.currentToken() == JsonToken.START_OBJECT) { + pluginSettings = parser.readValueAs(Map.class); + } else if (parser.currentToken() == JsonToken.VALUE_NULL) { + pluginSettings = Collections.emptyMap(); + } else { + pluginSettings = Collections.emptyMap(); + } + + parser.nextToken(); + if (parser.currentToken() != JsonToken.END_OBJECT) { + LOG.warn("Plugin configuration for '{}' should have exactly one key (the plugin name), " + + "but additional keys were found. Only the first plugin '{}' will be used.", + pluginType.getSimpleName(), pluginName); + while (parser.currentToken() != JsonToken.END_OBJECT) { + parser.nextToken(); + parser.skipChildren(); + parser.nextToken(); + } + } + + final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettings); + return pluginFactory.loadPlugin(pluginType, pluginSetting); + } + + @Override + public Object getNullValue(final DeserializationContext ctxt) { + return null; + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java index ce3e14d49d..9615caaa70 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java @@ -60,9 +60,13 @@ ObjectMapper pluginConfigObjectMapper( TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer( clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz))); + final SimpleModule nestedPluginModule = new SimpleModule("DataPrepperNestedPluginModule"); + nestedPluginModule.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + return new ObjectMapper() .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) .registerModule(simpleModule) + .registerModule(nestedPluginModule) .addHandler(dataPrepperDeserializationProblemHandler); } } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java index 5af6d3dea9..974dad534f 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java @@ -1,18 +1,26 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; import jakarta.validation.ConstraintViolation; import jakarta.validation.Validator; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.springframework.context.annotation.DependsOn; import javax.inject.Named; @@ -45,17 +53,20 @@ class PluginConfigurationConverter { } /** - * Converts and validates to a plugin model type. The conversion happens via - * Java Bean Validation 2.0. + * Converts and validates to a plugin model type, resolving nested plugin fields + * annotated with {@link org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin}. * * @param pluginConfigurationType the destination type * @param pluginSetting The source {@link PluginSetting} + * @param pluginFactory The {@link PluginFactory} for loading nested plugins * @return The converted object of type pluginConfigurationType * @throws InvalidPluginConfigurationException - If the plugin configuration is invalid */ - public Object convert(final Class pluginConfigurationType, final PluginSetting pluginSetting) { + public Object convert(final Class pluginConfigurationType, final PluginSetting pluginSetting, + final PluginFactory pluginFactory) { Objects.requireNonNull(pluginConfigurationType); Objects.requireNonNull(pluginSetting); + Objects.requireNonNull(pluginFactory); if (pluginConfigurationType.equals(PluginSetting.class)) { final Map settings = pluginSetting.getSettings(); @@ -64,7 +75,7 @@ public Object convert(final Class pluginConfigurationType, final PluginSettin return pluginSetting; } - final Object configuration = convertSettings(pluginConfigurationType, pluginSetting); + final Object configuration = convertSettings(pluginConfigurationType, pluginSetting, pluginFactory); final Set> constraintViolations = validator.validate(configuration); @@ -81,13 +92,18 @@ public Object convert(final Class pluginConfigurationType, final PluginSettin return configuration; } - private Object convertSettings(final Class pluginConfigurationType, final PluginSetting pluginSetting) { + private Object convertSettings(final Class pluginConfigurationType, final PluginSetting pluginSetting, + final PluginFactory pluginFactory) { Map settingsMap = pluginSetting.getSettings(); if (settingsMap == null) settingsMap = Collections.emptyMap(); try { - return objectMapper.convertValue(settingsMap, pluginConfigurationType); + final JsonNode tree = objectMapper.valueToTree(settingsMap); + final ObjectReader reader = objectMapper.readerFor(pluginConfigurationType) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + final JsonParser parser = tree.traverse(objectMapper); + return reader.readValue(parser); } catch (final Exception e) { throw pluginConfigurationErrorHandler.handleException(pluginSetting, e); } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java index e8c882643b..62ee182a2d 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java @@ -1,12 +1,17 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import javax.inject.Named; @@ -14,8 +19,8 @@ public class PluginConfigurationObservableFactory { public final PluginConfigObservable createDefaultPluginConfigObservable( final PluginConfigurationConverter pluginConfigurationConverter, final Class pluginConfigClass, - final PluginSetting rawPluginSettings) { + final PluginSetting rawPluginSettings, final PluginFactory pluginFactory) { return new DefaultPluginConfigObservable( - pluginConfigurationConverter, pluginConfigClass, rawPluginSettings); + pluginConfigurationConverter, pluginConfigClass, rawPluginSettings, pluginFactory); } } diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java new file mode 100644 index 0000000000..f65caa8629 --- /dev/null +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DataPrepperPluginBeanDeserializerModifierTest.java @@ -0,0 +1,410 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DataPrepperPluginBeanDeserializerModifierTest { + + @Mock + private PluginFactory pluginFactory; + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + final SimpleModule module = new SimpleModule("TestNestedPluginModule"); + module.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + objectMapper = new ObjectMapper().registerModule(module); + } + + interface TestPlugin { + String getValue(); + } + + static class ConfigWithNoAnnotatedFields { + @JsonProperty("name") + private String name; + + @JsonProperty("count") + private int count; + + public String getName() { + return name; + } + + public int getCount() { + return count; + } + } + + static class ConfigWithAnnotatedField { + @JsonProperty("name") + private String name; + + @JsonProperty("my_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin myPlugin; + + public String getName() { + return name; + } + + public TestPlugin getMyPlugin() { + return myPlugin; + } + } + + interface InnerPlugin { + String getInnerValue(); + } + + static class ConfigWithNestedPluginThatAlsoLoadsPlugin { + @JsonProperty("name") + private String name; + + @JsonProperty("outer_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin outerPlugin; + + public String getName() { + return name; + } + + public TestPlugin getOuterPlugin() { + return outerPlugin; + } + } + + static class OuterPluginConfig { + @JsonProperty("setting") + private String setting; + + @JsonProperty("inner_plugin") + @UsesDataPrepperPlugin(pluginType = InnerPlugin.class) + private InnerPlugin innerPlugin; + + public String getSetting() { + return setting; + } + + public InnerPlugin getInnerPlugin() { + return innerPlugin; + } + } + + static class ConfigWithPluginModelField { + @JsonProperty("name") + private String name; + + @JsonProperty("action") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private PluginModel action; + + public String getName() { + return name; + } + + public PluginModel getAction() { + return action; + } + } + + static class ConfigWithMultipleFields { + @JsonProperty("label") + private String label; + + @JsonProperty("first_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin firstPlugin; + + @JsonProperty("description") + private String description; + + @JsonProperty("second_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin secondPlugin; + + public String getLabel() { + return label; + } + + public TestPlugin getFirstPlugin() { + return firstPlugin; + } + + public String getDescription() { + return description; + } + + public TestPlugin getSecondPlugin() { + return secondPlugin; + } + } + + @Nested + class WithNoAnnotatedFields { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithNoAnnotatedFields.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void does_not_invoke_plugin_factory() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"count\": 42}"; + + final ConfigWithNoAnnotatedFields result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getCount(), equalTo(42)); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + } + + @Nested + class WithAnnotatedField { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithAnnotatedField.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void resolves_plugin_via_plugin_factory() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String pluginName = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": {\"option\": \"value\"}}}"; + + final ConfigWithAnnotatedField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), any(PluginSetting.class)); + } + + @Test + void leaves_field_null_when_not_present_in_input() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\"}"; + + final ConfigWithAnnotatedField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), nullValue()); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + } + + @Nested + class WithMultipleAnnotatedFields { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithMultipleFields.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void resolves_all_annotated_fields_independently() throws JsonProcessingException { + final String label = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + final String firstValue = UUID.randomUUID().toString(); + final String secondValue = UUID.randomUUID().toString(); + final TestPlugin firstMock = () -> firstValue; + final TestPlugin secondMock = () -> secondValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(firstMock) + .thenReturn(secondMock); + + final String firstPluginName = UUID.randomUUID().toString(); + final String secondPluginName = UUID.randomUUID().toString(); + final String json = "{\"label\": \"" + label + "\", \"first_plugin\": {\"" + firstPluginName + "\": {}}, " + + "\"description\": \"" + description + "\", \"second_plugin\": {\"" + secondPluginName + "\": {\"key\": \"val\"}}}"; + + final ConfigWithMultipleFields result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getLabel(), equalTo(label)); + assertThat(result.getDescription(), equalTo(description)); + assertThat(result.getFirstPlugin(), sameInstance(firstMock)); + assertThat(result.getSecondPlugin(), sameInstance(secondMock)); + } + + @Test + void resolves_only_present_fields_and_leaves_others_null() throws JsonProcessingException { + final String label = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String pluginName = UUID.randomUUID().toString(); + final String json = "{\"label\": \"" + label + "\", \"first_plugin\": {\"" + pluginName + "\": {}}, \"description\": \"" + description + "\"}"; + + final ConfigWithMultipleFields result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getLabel(), equalTo(label)); + assertThat(result.getDescription(), equalTo(description)); + assertThat(result.getFirstPlugin(), sameInstance(mockPlugin)); + assertThat(result.getSecondPlugin(), nullValue()); + } + } + + @Nested + class WithPluginModelField { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithPluginModelField.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void deserializes_as_plugin_model_without_invoking_plugin_factory() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String settingValue = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"action\": {\"" + pluginName + "\": {\"key\": \"" + settingValue + "\"}}}"; + + final ConfigWithPluginModelField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getAction(), notNullValue()); + assertThat(result.getAction().getPluginName(), equalTo(pluginName)); + assertThat(result.getAction().getPluginSettings().get("key"), equalTo(settingValue)); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + + @Test + void handles_null_plugin_model_field() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\"}"; + + final ConfigWithPluginModelField result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getAction(), nullValue()); + verify(pluginFactory, never()).loadPlugin(any(), any(PluginSetting.class)); + } + } + + @Nested + class WithNestedPluginThatAlsoLoadsPlugin { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithNestedPluginThatAlsoLoadsPlugin.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void outer_plugin_loading_triggers_inner_plugin_loading() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String outerPluginName = UUID.randomUUID().toString(); + final String innerPluginName = UUID.randomUUID().toString(); + final String outerSetting = UUID.randomUUID().toString(); + final String innerValue = UUID.randomUUID().toString(); + final String outerValue = UUID.randomUUID().toString(); + + final InnerPlugin innerPlugin = () -> innerValue; + final TestPlugin outerPlugin = () -> outerValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenAnswer(invocation -> { + final PluginSetting outerPluginSetting = invocation.getArgument(1); + assertThat(outerPluginSetting.getName(), equalTo(outerPluginName)); + + final Map outerSettings = outerPluginSetting.getSettings(); + assertThat(outerSettings.get("setting"), equalTo(outerSetting)); + + @SuppressWarnings("unchecked") + final Map innerPluginMap = (Map) outerSettings.get("inner_plugin"); + assertThat(innerPluginMap, notNullValue()); + assertThat(innerPluginMap.containsKey(innerPluginName), equalTo(true)); + + final PluginSetting innerPluginSetting = new PluginSetting(innerPluginName, + innerPluginMap.get(innerPluginName) != null + ? (Map) innerPluginMap.get(innerPluginName) + : Collections.emptyMap()); + + when(pluginFactory.loadPlugin(eq(InnerPlugin.class), any(PluginSetting.class))) + .thenReturn(innerPlugin); + + final InnerPlugin resolvedInner = pluginFactory.loadPlugin(InnerPlugin.class, innerPluginSetting); + assertThat(resolvedInner, sameInstance(innerPlugin)); + + return outerPlugin; + }); + + final String json = "{\"name\": \"" + name + "\", \"outer_plugin\": {\"" + outerPluginName + + "\": {\"setting\": \"" + outerSetting + "\", \"inner_plugin\": {\"" + innerPluginName + "\": {\"key\": \"val\"}}}}}"; + + final ConfigWithNestedPluginThatAlsoLoadsPlugin result = reader.readValue(json); + + assertThat(result, notNullValue()); + assertThat(result.getName(), equalTo(name)); + assertThat(result.getOuterPlugin(), sameInstance(outerPlugin)); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), any(PluginSetting.class)); + verify(pluginFactory).loadPlugin(eq(InnerPlugin.class), any(PluginSetting.class)); + } + } +} diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java index 6a71346e37..209344b568 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -17,6 +21,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.NoPluginFoundException; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.test.TestDISource; @@ -91,7 +96,8 @@ void setUp() { given(pluginConfigurationObservableFactory.createDefaultPluginConfigObservable( eq(pluginConfigurationConverter), any(Class.class), - any(PluginSetting.class) + any(PluginSetting.class), + any(DefaultPluginFactory.class) )).willReturn(pluginConfigObservable); applicationContextToTypedSuppliers = mock(ApplicationContextToTypedSuppliers.class); @@ -205,7 +211,7 @@ void loadPlugin_should_create_a_new_instance_of_the_plugin_with_di_initialized() final TestDISource expectedInstance = mock(TestDISource.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(firstPluginProvider.findPluginClass(Source.class, pluginName)) .willReturn(Optional.of(TestDISource.class)); @@ -215,7 +221,7 @@ void loadPlugin_should_create_a_new_instance_of_the_plugin_with_di_initialized() assertThat(createObjectUnderTest().loadPlugin(Source.class, pluginSetting), equalTo(expectedInstance)); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{TestDISource.class}, convertedConfiguration, pluginSetting); } @@ -224,7 +230,7 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance); @@ -232,10 +238,28 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() { assertThat(createObjectUnderTest().loadPlugin(baseClass, pluginSetting), equalTo(expectedInstance)); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting); } + @Test + void loadPlugin_should_pass_itself_as_the_plugin_factory_to_convert() { + final Object convertedConfiguration = mock(Object.class); + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) + .willReturn(convertedConfiguration); + + final DefaultPluginFactory objectUnderTest = createObjectUnderTest(); + objectUnderTest.loadPlugin(baseClass, pluginSetting); + + final ArgumentCaptor pluginFactoryCaptor = ArgumentCaptor.forClass(PluginFactory.class); + verify(pluginConfigurationConverter).convert(eq(PluginSetting.class), eq(pluginSetting), pluginFactoryCaptor.capture()); + assertThat(pluginFactoryCaptor.getValue(), sameInstance(objectUnderTest)); + + verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable( + eq(pluginConfigurationConverter), eq(PluginSetting.class), eq(pluginSetting), pluginFactoryCaptor.capture()); + assertThat(pluginFactoryCaptor.getValue(), sameInstance(objectUnderTest)); + } + @Test void loadPlugin_should_call_all_definedPluginConsumers() { createObjectUnderTest().loadPlugin(baseClass, pluginSetting); @@ -292,7 +316,7 @@ void loadPlugins_should_return_an_empty_list_when_the_number_of_instances_is_0() void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_is_1() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance); @@ -302,7 +326,7 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_ verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); final ArgumentCaptor pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class); verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName)); final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue(); @@ -322,7 +346,7 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number final Object object = new Object(); final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName), eq(object))) .willReturn(expectedInstance); @@ -331,7 +355,7 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting); verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter), - eq(PluginSetting.class), eq(pluginSetting)); + eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class)); final ArgumentCaptor pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class); verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName), eq(object)); final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue(); @@ -369,7 +393,7 @@ void loadPlugins_should_return_an_instance_for_the_total_count() { final TestSink expectedInstance2 = mock(TestSink.class); final TestSink expectedInstance3 = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance1) @@ -410,7 +434,7 @@ void loadPlugins_should_return_a_single_instance_with_values_from_ApplicationCon final String suppliedAdditionalArgument = UUID.randomUUID().toString(); Map, Supplier> additionalArgumentsSuppliers = Map.of(String.class, () -> suppliedAdditionalArgument); when(applicationContextToTypedSuppliers.getArgumentsSuppliers()).thenReturn(additionalArgumentsSuppliers); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(pluginName))) .willReturn(expectedInstance); @@ -453,7 +477,7 @@ void setUp() { void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found_with_correct_name_and_deprecated_name() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(TEST_SINK_DEPRECATED_NAME))) .willReturn(expectedInstance); @@ -482,7 +506,7 @@ void setUp() { void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found_with_correct_name_and_alternate_name() { final TestSink expectedInstance = mock(TestSink.class); final Object convertedConfiguration = mock(Object.class); - given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting)) + given(pluginConfigurationConverter.convert(eq(PluginSetting.class), eq(pluginSetting), any(DefaultPluginFactory.class))) .willReturn(convertedConfiguration); given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(ComponentPluginArgumentsContext.class), eq(TEST_SINK_ALTERNATE_NAME))) .willReturn(expectedInstance); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java new file mode 100644 index 0000000000..0be7e348de --- /dev/null +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/NestedPluginDeserializerTest.java @@ -0,0 +1,271 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +import java.util.Collections; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class NestedPluginDeserializerTest { + + @Mock + private PluginFactory pluginFactory; + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + final SimpleModule module = new SimpleModule("TestModule"); + module.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + objectMapper = new ObjectMapper().registerModule(module); + } + + interface TestPlugin { + String getValue(); + } + + static class ConfigWithPlugin { + @JsonProperty("name") + private String name; + + @JsonProperty("my_plugin") + @UsesDataPrepperPlugin(pluginType = TestPlugin.class) + private TestPlugin myPlugin; + + public String getName() { + return name; + } + + public TestPlugin getMyPlugin() { + return myPlugin; + } + } + + @Nested + class WithPluginFactoryPresent { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithPlugin.class) + .withAttribute(NestedPluginDeserializer.PLUGIN_FACTORY_ATTRIBUTE_KEY, pluginFactory); + } + + @Test + void deserializes_plugin_with_settings_map() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String settingKey = UUID.randomUUID().toString(); + final String settingValue = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + + "\": {\"" + settingKey + "\": \"" + settingValue + "\"}}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(pluginName)); + assertThat(captor.getValue().getSettings().get(settingKey), equalTo(settingValue)); + } + + @Test + void deserializes_plugin_with_empty_settings() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": {}}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(pluginName)); + assertThat(captor.getValue().getSettings(), notNullValue()); + assertThat(captor.getValue().getSettings().isEmpty(), equalTo(true)); + } + + @Test + void deserializes_plugin_with_null_settings() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": null}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(pluginName)); + assertThat(captor.getValue().getSettings(), equalTo(Collections.emptyMap())); + } + + @Test + void returns_null_when_field_is_null_in_json() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": null}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), nullValue()); + } + + @Test + void returns_null_when_field_is_absent() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\"}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getName(), equalTo(name)); + assertThat(result.getMyPlugin(), nullValue()); + } + + @Test + void throws_when_plugin_value_is_not_an_object() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": \"not_an_object\"}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void throws_when_plugin_value_is_empty_string() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": \"\"}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void throws_when_plugin_value_is_an_array() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": [\"a\", \"b\"]}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void throws_when_plugin_value_is_a_number() { + final String name = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": 123}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("Nested plugin configuration must be a map")); + } + + @Test + void uses_first_plugin_and_ignores_extra_keys_when_multiple_keys_present_for_backward_compatibility() throws JsonProcessingException { + final String name = UUID.randomUUID().toString(); + final String firstPluginName = UUID.randomUUID().toString(); + final String secondPluginName = UUID.randomUUID().toString(); + final String pluginValue = UUID.randomUUID().toString(); + final TestPlugin mockPlugin = () -> pluginValue; + + when(pluginFactory.loadPlugin(eq(TestPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + firstPluginName + + "\": {\"key\": \"val\"}, \"" + secondPluginName + "\": {\"other\": \"data\"}}}"; + + final ConfigWithPlugin result = reader.readValue(json); + + assertThat(result.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestPlugin.class), captor.capture()); + assertThat(captor.getValue().getName(), equalTo(firstPluginName)); + } + } + + @Nested + class WithoutPluginFactory { + private ObjectReader reader; + + @BeforeEach + void setUp() { + reader = objectMapper.readerFor(ConfigWithPlugin.class); + } + + @Test + void throws_when_plugin_factory_is_not_in_context() { + final String name = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String json = "{\"name\": \"" + name + "\", \"my_plugin\": {\"" + pluginName + "\": {}}}"; + + final JsonMappingException exception = assertThrows(JsonMappingException.class, + () -> reader.readValue(json)); + + assertThat(exception.getMessage(), containsString("PluginFactory is not available")); + } + } +} diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java index c93295a37c..43d21b9e57 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; @@ -9,6 +13,7 @@ import org.mockito.Mock; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -20,6 +25,9 @@ class PluginConfigObservableFactoryTest { @Mock private PluginSetting pluginSetting; + @Mock + private PluginFactory pluginFactory; + private final Class baseClass = Object.class; private final PluginConfigurationObservableFactory objectUnderTest = new PluginConfigurationObservableFactory(); @@ -27,7 +35,7 @@ class PluginConfigObservableFactoryTest { @Test void testCreateDefaultPluginConfigurationObservableFactory() { assertThat(objectUnderTest.createDefaultPluginConfigObservable( - pluginConfigurationConverter, baseClass, pluginSetting), + pluginConfigurationConverter, baseClass, pluginSetting, pluginFactory), instanceOf(PluginConfigObservable.class)); } } \ No newline at end of file diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java index bd00826277..3faa427afa 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java @@ -1,12 +1,17 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugin; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import jakarta.validation.ConstraintViolation; import jakarta.validation.Path; import jakarta.validation.Payload; @@ -16,15 +21,21 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import javax.annotation.Nonnull; import javax.annotation.meta.When; import java.lang.annotation.Annotation; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.hamcrest.CoreMatchers.containsString; @@ -37,9 +48,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -49,7 +62,9 @@ class PluginConfigurationConverterTest { @Mock private PluginConfigurationErrorHandler pluginConfigurationErrorHandler; - private ObjectMapper objectMapper = new ObjectMapper(); + @Mock + private PluginFactory pluginFactory; + private ObjectMapper objectMapper = createObjectMapperWithNestedPluginSupport(); static class TestConfiguration { @SuppressWarnings("unused") @@ -77,7 +92,7 @@ void convert_with_null_configurationType_should_throw() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); assertThrows(NullPointerException.class, - () -> objectUnderTest.convert(null, pluginSetting)); + () -> objectUnderTest.convert(null, pluginSetting, pluginFactory)); } @Test @@ -85,12 +100,20 @@ void convert_with_null_pluginSetting_should_throw() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); assertThrows(NullPointerException.class, - () -> objectUnderTest.convert(PluginSetting.class, null)); + () -> objectUnderTest.convert(PluginSetting.class, null, pluginFactory)); + } + + @Test + void convert_with_null_pluginFactory_should_throw() { + final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); + + assertThrows(NullPointerException.class, + () -> objectUnderTest.convert(PluginSetting.class, pluginSetting, null)); } @Test void convert_with_PluginSetting_target_should_return_pluginSetting_object_directly() { - assertThat(createObjectUnderTest().convert(PluginSetting.class, pluginSetting), + assertThat(createObjectUnderTest().convert(PluginSetting.class, pluginSetting, pluginFactory), sameInstance(pluginSetting)); then(pluginSetting).should().setSettings(anyMap()); @@ -104,7 +127,7 @@ void convert_with_other_target_should_return_pluginSetting_object_directly() { given(pluginSetting.getSettings()) .willReturn(Collections.singletonMap("my_value", value)); - final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting); + final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting, pluginFactory); assertThat(convertedConfiguration, notNullValue()); assertThat(convertedConfiguration, instanceOf(TestConfiguration.class)); @@ -119,7 +142,7 @@ void convert_with_other_target_should_return_empty_when_settings_are_null() { given(pluginSetting.getSettings()) .willReturn(null); - final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting); + final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting, pluginFactory); assertThat(convertedConfiguration, notNullValue()); assertThat(convertedConfiguration, instanceOf(TestConfiguration.class)); @@ -136,7 +159,7 @@ void convert_with_other_target_should_validate_configuration() { given(pluginSetting.getSettings()) .willReturn(Collections.singletonMap("my_value", value)); - final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting); + final Object convertedConfiguration = createObjectUnderTest().convert(TestConfiguration.class, pluginSetting, pluginFactory); then(validator) .should() @@ -189,7 +212,7 @@ public Class annotationType() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, - () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); assertThat(actualException.getMessage(), containsString(pluginName)); assertThat(actualException.getMessage(), containsString(pipelineName)); @@ -249,7 +272,7 @@ public Class annotationType() { final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, - () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); assertThat(actualException.getMessage(), containsString(pluginName)); assertThat(actualException.getMessage(), containsString(pipelineName)); @@ -266,14 +289,14 @@ void convert_with_error_when_converting_with_object_mapper_calls_plugin_configur final RuntimeException e = mock(RuntimeException.class); - when(objectMapper.convertValue(pluginSetting.getSettings(), TestConfiguration.class)) + when(objectMapper.valueToTree(any())) .thenThrow(e); when(pluginConfigurationErrorHandler.handleException(pluginSetting, e)).thenReturn(new IllegalArgumentException()); final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); - assertThrows(IllegalArgumentException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); } @Test @@ -286,13 +309,168 @@ void convert_with_error_when_throws_InvalidPluginConfiguration_when_plugin_confi final RuntimeException e = mock(RuntimeException.class); - when(objectMapper.convertValue(pluginSetting.getSettings(), TestConfiguration.class)) + when(objectMapper.valueToTree(any())) .thenThrow(e); when(pluginConfigurationErrorHandler.handleException(pluginSetting, e)).thenReturn(new InvalidPluginConfigurationException(UUID.randomUUID().toString())); final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); - assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting, pluginFactory)); + } + + interface TestNestedPlugin { + String doSomething(); + } + + static class TestConfigurationWithPlugin { + @JsonProperty("my_value") + private String myValue; + + @JsonProperty("my_plugin") + @UsesDataPrepperPlugin(pluginType = TestNestedPlugin.class) + private TestNestedPlugin myPlugin; + + public String getMyValue() { + return myValue; + } + + public TestNestedPlugin getMyPlugin() { + return myPlugin; + } + } + + static class TestConfigurationWithPluginModel { + @JsonProperty("my_value") + private String myValue; + + @JsonProperty("action") + @UsesDataPrepperPlugin(pluginType = TestNestedPlugin.class) + private PluginModel action; + + public String getMyValue() { + return myValue; + } + + public PluginModel getAction() { + return action; + } + } + + @Test + void convert_with_plugin_factory_resolves_nested_plugin_field() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map pluginSettings = Map.of("setting_a", "value_a"); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("my_plugin", Map.of("test_codec", pluginSettings)); + + given(pluginSetting.getSettings()).willReturn(settings); + + final TestNestedPlugin mockPlugin = () -> "result"; + when(pluginFactory.loadPlugin(eq(TestNestedPlugin.class), any(PluginSetting.class))) + .thenReturn(mockPlugin); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPlugin.class)); + final TestConfigurationWithPlugin config = (TestConfigurationWithPlugin) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getMyPlugin(), sameInstance(mockPlugin)); + + final ArgumentCaptor pluginSettingCaptor = ArgumentCaptor.forClass(PluginSetting.class); + verify(pluginFactory).loadPlugin(eq(TestNestedPlugin.class), pluginSettingCaptor.capture()); + assertThat(pluginSettingCaptor.getValue().getName(), equalTo("test_codec")); + assertThat(pluginSettingCaptor.getValue().getSettings(), equalTo(pluginSettings)); + } + + @Test + void convert_with_plugin_factory_handles_null_plugin_value() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + + given(pluginSetting.getSettings()).willReturn(settings); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPlugin.class)); + final TestConfigurationWithPlugin config = (TestConfigurationWithPlugin) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getMyPlugin(), nullValue()); + } + + @Test + void convert_with_plugin_factory_handles_explicit_null_in_settings_map() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("my_plugin", null); + + given(pluginSetting.getSettings()).willReturn(settings); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPlugin.class)); + final TestConfigurationWithPlugin config = (TestConfigurationWithPlugin) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getMyPlugin(), nullValue()); + } + + @Test + void convert_with_plugin_model_field_deserializes_without_invoking_plugin_factory() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final String pluginName = UUID.randomUUID().toString(); + final String settingValue = UUID.randomUUID().toString(); + final Map actionSettings = Map.of("key", settingValue); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("action", Map.of(pluginName, actionSettings)); + + given(pluginSetting.getSettings()).willReturn(settings); + + final Object result = createObjectUnderTest().convert(TestConfigurationWithPluginModel.class, pluginSetting, pluginFactory); + + assertThat(result, instanceOf(TestConfigurationWithPluginModel.class)); + final TestConfigurationWithPluginModel config = (TestConfigurationWithPluginModel) result; + assertThat(config.getMyValue(), equalTo(value)); + assertThat(config.getAction(), notNullValue()); + assertThat(config.getAction().getPluginName(), equalTo(pluginName)); + assertThat(config.getAction().getPluginSettings().get("key"), equalTo(settingValue)); + } + + @Test + void convert_with_plugin_factory_throws_when_plugin_value_is_empty_string() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + + final String value = UUID.randomUUID().toString(); + final Map settings = new HashMap<>(); + settings.put("my_value", value); + settings.put("my_plugin", ""); + + given(pluginSetting.getSettings()).willReturn(settings); + + when(pluginConfigurationErrorHandler.handleException(eq(pluginSetting), any(Exception.class))) + .thenReturn(new InvalidPluginConfigurationException("test")); + + final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); + + assertThrows(InvalidPluginConfigurationException.class, + () -> objectUnderTest.convert(TestConfigurationWithPlugin.class, pluginSetting, pluginFactory)); + } + + private static ObjectMapper createObjectMapperWithNestedPluginSupport() { + final SimpleModule nestedPluginModule = new SimpleModule("DataPrepperNestedPluginModule"); + nestedPluginModule.setDeserializerModifier(new DataPrepperPluginBeanDeserializerModifier()); + return new ObjectMapper().registerModule(nestedPluginModule); } + } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java index d0715ee8c1..6d00c68d06 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -17,10 +16,11 @@ import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import java.time.Duration; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.plugins.source.sqs.common.OnErrorOption; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; -import org.opensearch.dataprepper.model.configuration.PluginModel; public class QueueConfig { @@ -69,7 +69,8 @@ public class QueueConfig { private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS; @JsonProperty("codec") - private PluginModel codec = null; + @UsesDataPrepperPlugin(pluginType = InputCodec.class) + private InputCodec codec; @JsonProperty("on_error") private OnErrorOption onErrorOption = OnErrorOption.RETAIN_MESSAGES; @@ -104,7 +105,7 @@ public Duration getPollDelay() { return pollDelay; } - public PluginModel getCodec() { + public InputCodec getCodec() { return codec; } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index 3c48b76f4e..3cf6b3942b 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -14,10 +13,6 @@ import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; - import org.opensearch.dataprepper.model.codec.InputCodec; - import org.opensearch.dataprepper.model.configuration.PluginModel; - import org.opensearch.dataprepper.model.configuration.PluginSetting; - import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff; import org.opensearch.dataprepper.plugins.source.sqs.common.SqsClientFactory; import org.opensearch.dataprepper.plugins.source.sqs.common.SqsWorkerCommon; @@ -45,7 +40,6 @@ public class SqsService { static final long SHUTDOWN_TIMEOUT = 30L; private final SqsSourceConfig sqsSourceConfig; private final PluginMetrics pluginMetrics; - private final PluginFactory pluginFactory; private final AcknowledgementSetManager acknowledgementSetManager; private final List allSqsUrlExecutorServices; private final List sqsWorkers; @@ -58,12 +52,10 @@ public SqsService(final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, final SqsSourceConfig sqsSourceConfig, final PluginMetrics pluginMetrics, - final PluginFactory pluginFactory, final AwsCredentialsProvider credentialsProvider) { - + this.sqsSourceConfig = sqsSourceConfig; this.pluginMetrics = pluginMetrics; - this.pluginFactory = pluginFactory; this.credentialsProvider = credentialsProvider; this.acknowledgementSetManager = acknowledgementSetManager; this.allSqsUrlExecutorServices = new ArrayList<>(); @@ -85,10 +77,7 @@ public void start() { SqsEventProcessor sqsEventProcessor; MessageFieldStrategy strategy; if (queueConfig.getCodec() != null) { - final PluginModel codecConfiguration = queueConfig.getCodec(); - final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); - strategy = new CodecBulkMessageFieldStrategy(codec); + strategy = new CodecBulkMessageFieldStrategy(queueConfig.getCodec()); } else { strategy = new StandardMessageFieldStrategy(); } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java index e722b76780..0497876559 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -17,7 +16,6 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -27,9 +25,8 @@ public class SqsSource implements Source> { private final PluginMetrics pluginMetrics; - private final PluginFactory pluginFactory; private final SqsSourceConfig sqsSourceConfig; - private SqsService sqsService; + private SqsService sqsService; private final AcknowledgementSetManager acknowledgementSetManager; private final AwsCredentialsSupplier awsCredentialsSupplier; private final boolean acknowledgementsEnabled; @@ -38,12 +35,10 @@ public class SqsSource implements Source> { @DataPrepperPluginConstructor public SqsSource(final PluginMetrics pluginMetrics, final SqsSourceConfig sqsSourceConfig, - final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final AwsCredentialsSupplier awsCredentialsSupplier) { - + this.pluginMetrics = pluginMetrics; - this.pluginFactory = pluginFactory; this.sqsSourceConfig = sqsSourceConfig; this.acknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); this.acknowledgementSetManager = acknowledgementSetManager; @@ -58,7 +53,7 @@ public void start(Buffer> buffer) { } final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, sqsSourceConfig); final AwsCredentialsProvider credentialsProvider = awsAuthenticationAdapter.getCredentialsProvider(); - sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider); + sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, credentialsProvider); sqsService.start(); } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java index 4b4777d434..c317852903 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsServiceTest.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -16,7 +15,6 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -30,7 +28,6 @@ class SqsServiceTest { private SqsSourceConfig sqsSourceConfig; private PluginMetrics pluginMetrics; - private PluginFactory pluginFactory; private AcknowledgementSetManager acknowledgementSetManager; private Buffer> buffer; private AwsCredentialsProvider credentialsProvider; @@ -39,7 +36,6 @@ class SqsServiceTest { void setUp() { sqsSourceConfig = mock(SqsSourceConfig.class); pluginMetrics = mock(PluginMetrics.class); - pluginFactory = mock(PluginFactory.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); buffer = mock(Buffer.class); credentialsProvider = mock(AwsCredentialsProvider.class); @@ -54,8 +50,8 @@ void start_with_single_queue_starts_workers() { when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getNumWorkers()).thenReturn(2); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); - SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider)); - sqsService.start(); // if no exception is thrown here, then workers have been started + SqsService sqsService = spy(new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, credentialsProvider)); + sqsService.start(); } @Test @@ -64,9 +60,9 @@ void stop_should_shutdown_executors_and_workers() throws InterruptedException { when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getNumWorkers()).thenReturn(1); when(sqsSourceConfig.getQueues()).thenReturn(List.of(queueConfig)); - SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, pluginFactory, credentialsProvider) {}; + SqsService sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, pluginMetrics, credentialsProvider) {}; sqsService.start(); - sqsService.stop(); // again assuming that if no exception is thrown here, then workers and client have been stopped + sqsService.stop(); } } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java index b028c67177..767ee56a60 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceTest.java @@ -5,7 +5,6 @@ * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. - * */ package org.opensearch.dataprepper.plugins.source.sqs; @@ -17,7 +16,6 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -33,7 +31,6 @@ class SqsSourceTest { private final String TEST_PIPELINE_NAME = "test_pipeline"; private SqsSource sqsSource; private PluginMetrics pluginMetrics; - private PluginFactory pluginFactory; private SqsSourceConfig sqsSourceConfig; private AcknowledgementSetManager acknowledgementSetManager; private AwsCredentialsSupplier awsCredentialsSupplier; @@ -43,11 +40,10 @@ class SqsSourceTest { @BeforeEach void setUp() { pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); - pluginFactory = mock(PluginFactory.class); sqsSourceConfig = mock(SqsSourceConfig.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); - sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, pluginFactory, acknowledgementSetManager, awsCredentialsSupplier); + sqsSource = new SqsSource(pluginMetrics, sqsSourceConfig, acknowledgementSetManager, awsCredentialsSupplier); buffer = mock(Buffer.class); }