-
Notifications
You must be signed in to change notification settings - Fork 326
Update Pipeline Config API #5767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c288efb
33b44e2
e83e151
79d3a14
164a40f
ed2ca71
4df28c8
416f5ad
7c565a6
7a62a7b
e91b222
e237335
4b0ff14
1c9f289
8567ebc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,6 @@ | |
| import com.fasterxml.jackson.annotation.JsonAnyGetter; | ||
| import com.fasterxml.jackson.annotation.JsonAnySetter; | ||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.core.JacksonException; | ||
| import com.fasterxml.jackson.core.JsonGenerator; | ||
| import com.fasterxml.jackson.core.JsonParser; | ||
| import com.fasterxml.jackson.databind.DeserializationContext; | ||
|
|
@@ -42,6 +41,27 @@ public class PluginModel { | |
| private final String pluginName; | ||
| private final InternalJsonModel innerModel; | ||
|
|
||
| public PluginModel(final String pluginName, final Map<String, Object> pluginSettings) { | ||
| this(pluginName, new InternalJsonModel(pluginSettings)); | ||
| } | ||
|
|
||
| protected PluginModel(final String pluginName, final InternalJsonModel innerModel) { | ||
| this.pluginName = pluginName; | ||
| this.innerModel = Objects.requireNonNull(innerModel); | ||
| } | ||
|
|
||
| public String getPluginName() { | ||
| return pluginName; | ||
| } | ||
|
|
||
| public Map<String, Object> getPluginSettings() { | ||
| return innerModel.pluginSettings; | ||
| } | ||
|
|
||
| <M extends InternalJsonModel> M getInternalJsonModel() { | ||
| return (M) innerModel; | ||
| } | ||
|
|
||
| /** | ||
| * This class represents the part of the {@link PluginModel} which sits below the name. | ||
| * In the following example, this would be everything below "opensearch": | ||
|
|
@@ -71,27 +91,6 @@ static class InternalJsonModel { | |
| } | ||
| } | ||
|
|
||
| public PluginModel(final String pluginName, final Map<String, Object> pluginSettings) { | ||
| this(pluginName, new InternalJsonModel(pluginSettings)); | ||
| } | ||
|
|
||
| protected PluginModel(final String pluginName, final InternalJsonModel innerModel) { | ||
| this.pluginName = pluginName; | ||
| this.innerModel = Objects.requireNonNull(innerModel); | ||
| } | ||
|
|
||
| public String getPluginName() { | ||
| return pluginName; | ||
| } | ||
|
|
||
| public Map<String, Object> getPluginSettings() { | ||
| return innerModel.pluginSettings; | ||
| } | ||
|
|
||
| <M extends InternalJsonModel> M getInternalJsonModel() { | ||
| return (M) innerModel; | ||
| } | ||
|
|
||
| /** | ||
| * Custom Serializer for Plugin Model | ||
| * <p> | ||
|
|
@@ -114,7 +113,7 @@ public void serialize( | |
| final PluginModel value, final JsonGenerator gen, final SerializerProvider provider) throws IOException { | ||
| gen.writeStartObject(); | ||
| Map<String, Object> serializedInner = SERIALIZER_OBJECT_MAPPER.convertValue(value.innerModel, Map.class); | ||
| if(serializedInner != null && serializedInner.isEmpty()) | ||
| if (serializedInner != null && serializedInner.isEmpty()) | ||
| serializedInner = null; | ||
| gen.writeObjectField(value.getPluginName(), serializedInner); | ||
| gen.writeEndObject(); | ||
|
|
@@ -145,7 +144,6 @@ public PluginModelDeserializer() { | |
| * | ||
| * @param <T> The type inheriting from {@link PluginModel} that you ultimately need deserialized | ||
| * @param <M> The type inheriting from {@link InternalJsonModel} that has custom fields. | ||
| * | ||
| * @see SinkModel.SinkModelDeserializer | ||
| */ | ||
| abstract static class AbstractPluginModelDeserializer<T extends PluginModel, M extends InternalJsonModel> extends StdDeserializer<PluginModel> { | ||
|
|
@@ -166,7 +164,7 @@ protected AbstractPluginModelDeserializer( | |
| } | ||
|
|
||
| @Override | ||
| public PluginModel deserialize(final JsonParser jsonParser, final DeserializationContext context) throws IOException, JacksonException { | ||
| public PluginModel deserialize(final JsonParser jsonParser, final DeserializationContext context) throws IOException { | ||
| final JsonNode node = jsonParser.getCodec().readTree(jsonParser); | ||
|
|
||
| final Iterator<Map.Entry<String, JsonNode>> fields = node.fields(); | ||
|
|
@@ -176,11 +174,24 @@ public PluginModel deserialize(final JsonParser jsonParser, final Deserializatio | |
| final JsonNode value = onlyField.getValue(); | ||
|
|
||
| M innerModel = SERIALIZER_OBJECT_MAPPER.convertValue(value, innerModelClass); | ||
| if(innerModel == null) | ||
| if (innerModel == null) | ||
| innerModel = emptyInnerModelConstructor.get(); | ||
|
|
||
| return constructorFunction.apply(pluginName, innerModel); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add unit tests for these new methods. |
||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| PluginModel that = (PluginModel) o; | ||
| return Objects.equals(pluginName, that.pluginName) && | ||
| Objects.equals(innerModel.pluginSettings, that.innerModel.pluginSettings); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(pluginName, innerModel.pluginSettings); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package org.opensearch.dataprepper.core.exception; | ||
|
|
||
| public class DynamicPipelineConfigUpdateException extends RuntimeException { | ||
| public DynamicPipelineConfigUpdateException(String message) { | ||
| super(message); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.core.pipeline.server; | ||
|
|
||
| import org.opensearch.dataprepper.core.exception.DynamicPipelineConfigUpdateException; | ||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
| import org.opensearch.dataprepper.model.annotations.SingleThread; | ||
| import org.opensearch.dataprepper.model.configuration.PipelineModel; | ||
| import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; | ||
| import org.opensearch.dataprepper.model.configuration.PluginModel; | ||
| import org.opensearch.dataprepper.model.configuration.SinkModel; | ||
| import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding; | ||
| import org.reflections.Reflections; | ||
| import org.reflections.scanners.TypeAnnotationsScanner; | ||
|
|
||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| public class DynamicPipelineUpdateUtil { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not use
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Converted to Spring Service (named bean) |
||
|
|
||
| public static final Set<String> excludedProcessorsFromDynamicUpdate = Set.of("aggregate"); | ||
|
|
||
| public static boolean isDynamicUpdateFeasible(final PipelinesDataFlowModel currentPipelineDataFlowMode, | ||
| final PipelinesDataFlowModel targetDataPipelineDataFlowMode) { | ||
|
|
||
| checkIfSamePipelineExists(currentPipelineDataFlowMode, targetDataPipelineDataFlowMode); | ||
| Set<String> singleThreadedProcessorNames = scanForSingleThreadAnnotatedProcessorPlugins(); | ||
| singleThreadedProcessorNames.addAll(excludedProcessorsFromDynamicUpdate); | ||
|
|
||
| for (String pipelineName : currentPipelineDataFlowMode.getPipelines().keySet()) { | ||
| PipelineModel currentPipelineModel = currentPipelineDataFlowMode.getPipelines().get(pipelineName); | ||
| PipelineModel targetPipelineModel = targetDataPipelineDataFlowMode.getPipelines().get(pipelineName); | ||
|
|
||
| // Check if source configuration remains unchanged | ||
| PluginModel currentSource = currentPipelineModel.getSource(); | ||
| PluginModel targetSource = targetPipelineModel.getSource(); | ||
| if (!currentSource.equals(targetSource)) { | ||
| throw new DynamicPipelineConfigUpdateException( | ||
| "Source configuration cannot be modified in pipeline: " + pipelineName); | ||
| } | ||
|
|
||
| // Check if sinks configuration remains unchanged | ||
| List<SinkModel> currentSinks = currentPipelineModel.getSinks(); | ||
| List<SinkModel> targetSinks = targetPipelineModel.getSinks(); | ||
| if (!currentSinks.equals(targetSinks)) { | ||
| throw new DynamicPipelineConfigUpdateException( | ||
| "Sinks configuration cannot be modified in pipeline: " + pipelineName); | ||
| } | ||
|
|
||
| List<PluginModel> currentProcessors = currentPipelineModel.getProcessors(); | ||
| List<PluginModel> targetProcessors = targetPipelineModel.getProcessors(); | ||
|
|
||
| currentProcessors = currentProcessors == null ? List.of() : currentProcessors; | ||
| targetProcessors = targetProcessors == null ? List.of() : targetProcessors; | ||
|
|
||
| // Collect single-threaded processors in current and target | ||
| Set<String> currentSingleThreaded = currentProcessors.stream() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a bit of split logic here. We have other places to look for this annotation. Can we make use of that existing code? |
||
| .map(PluginModel::getPluginName) | ||
| .filter(singleThreadedProcessorNames::contains) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| Set<String> targetSingleThreaded = targetProcessors.stream() | ||
| .map(PluginModel::getPluginName) | ||
| .filter(singleThreadedProcessorNames::contains) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| // Only throw if target adds new single-threaded processors or modifies existing ones | ||
| for (String targetProcessor : targetSingleThreaded) { | ||
| if (!currentSingleThreaded.contains(targetProcessor)) { | ||
| throw new DynamicPipelineConfigUpdateException( | ||
| "Cannot add new single-threaded processor: " + targetProcessor); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we care about this? Is it because of the state? If so, this is insufficient. The Also, Maybe we should have an |
||
| } | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| public static void executeDynamicUpdateOfPipelineConfig(final PipelinesDataFlowModel currentPipelineDataFlowMode, | ||
| final PipelinesDataFlowModel targetDataPipelineDataFlowMode) { | ||
|
|
||
|
|
||
| } | ||
|
|
||
| public static void checkIfSamePipelineExists(final PipelinesDataFlowModel currentPipelineDataFlowMode, | ||
| final PipelinesDataFlowModel targetDataPipelineDataFlowMode) { | ||
|
|
||
| Set<String> currentPipelineNames = currentPipelineDataFlowMode.getPipelines().keySet(); | ||
| Set<String> targetPipelineNames = targetDataPipelineDataFlowMode.getPipelines().keySet(); | ||
|
|
||
| if (!currentPipelineNames.equals(targetPipelineNames)) { | ||
| String addedPipelines = targetPipelineNames.stream() | ||
| .filter(name -> !currentPipelineNames.contains(name)) | ||
| .collect(Collectors.joining(", ")); | ||
| String removedPipelines = currentPipelineNames.stream() | ||
| .filter(name -> !targetPipelineNames.contains(name)) | ||
| .collect(Collectors.joining(", ")); | ||
|
|
||
| StringBuilder errorMessage = new StringBuilder("Pipeline configuration mismatch found."); | ||
| if (!addedPipelines.isEmpty()) { | ||
| errorMessage.append(" New pipelines found: ").append(addedPipelines).append("."); | ||
| } | ||
| if (!removedPipelines.isEmpty()) { | ||
| errorMessage.append(" Missing pipelines: ").append(removedPipelines).append("."); | ||
| } | ||
|
|
||
| throw new DynamicPipelineConfigUpdateException(errorMessage.toString()); | ||
| } | ||
| } | ||
|
|
||
| public static Set<String> scanForSingleThreadAnnotatedProcessorPlugins() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is duplicating work from the I don't think the |
||
| Reflections reflections = new Reflections("org.opensearch.dataprepper", new TypeAnnotationsScanner()); | ||
| Set<Class<?>> dataPrepperPlugins = reflections.getTypesAnnotatedWith(DataPrepperPlugin.class); | ||
| Set<String> singleThreadedProcessorNames = new HashSet<>(); | ||
|
|
||
| for (Class<?> clazz : dataPrepperPlugins) { | ||
| if (clazz.isAnnotationPresent(SingleThread.class) || RequiresPeerForwarding.class.isAssignableFrom(clazz)) { | ||
| DataPrepperPlugin pluginAnnotation = clazz.getAnnotation(DataPrepperPlugin.class); | ||
| String name = pluginAnnotation.name(); | ||
| singleThreadedProcessorNames.add(name); | ||
| } | ||
| } | ||
| return singleThreadedProcessorNames; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.core.pipeline.server; | ||
|
|
||
| import com.sun.net.httpserver.HttpExchange; | ||
| import com.sun.net.httpserver.HttpHandler; | ||
| import org.opensearch.dataprepper.core.pipeline.PipelinesProvider; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * HttpHandler to handle requests for updating pipeline configurations from S3 | ||
| */ | ||
| public class IsDynamicallyUpdatablePipelineHandler extends UpdatePipelineBaseHandler implements HttpHandler { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(IsDynamicallyUpdatablePipelineHandler.class); | ||
|
|
||
| public IsDynamicallyUpdatablePipelineHandler(final PipelinesProvider pipelinesProvider) { | ||
| super(pipelinesProvider); | ||
| } | ||
|
|
||
| @Override | ||
| public void handle(final HttpExchange exchange) throws IOException { | ||
| baseHandle(exchange, false); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were these moved within nested classes? Or just shifted in the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just shifted in the code. I now adjusted my IDE formatter not to touch unmodified code.