Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
Expand Down Expand Up @@ -42,6 +41,27 @@ public class PluginModel {
private final String pluginName;
private final InternalJsonModel innerModel;

public PluginModel(final String pluginName, final Map<String, Object> pluginSettings) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were these moved within nested classes? Or just shifted in the code?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just shifted in the code. I now adjusted my IDE formatter not to touch unmodified code.

this(pluginName, new InternalJsonModel(pluginSettings));
}

protected PluginModel(final String pluginName, final InternalJsonModel innerModel) {
this.pluginName = pluginName;
this.innerModel = Objects.requireNonNull(innerModel);
}

public String getPluginName() {
return pluginName;
}

public Map<String, Object> getPluginSettings() {
return innerModel.pluginSettings;
}

<M extends InternalJsonModel> M getInternalJsonModel() {
return (M) innerModel;
}

/**
* This class represents the part of the {@link PluginModel} which sits below the name.
* In the following example, this would be everything below "opensearch":
Expand Down Expand Up @@ -71,27 +91,6 @@ static class InternalJsonModel {
}
}

public PluginModel(final String pluginName, final Map<String, Object> pluginSettings) {
this(pluginName, new InternalJsonModel(pluginSettings));
}

protected PluginModel(final String pluginName, final InternalJsonModel innerModel) {
this.pluginName = pluginName;
this.innerModel = Objects.requireNonNull(innerModel);
}

public String getPluginName() {
return pluginName;
}

public Map<String, Object> getPluginSettings() {
return innerModel.pluginSettings;
}

<M extends InternalJsonModel> M getInternalJsonModel() {
return (M) innerModel;
}

/**
* Custom Serializer for Plugin Model
* <p>
Expand All @@ -114,7 +113,7 @@ public void serialize(
final PluginModel value, final JsonGenerator gen, final SerializerProvider provider) throws IOException {
gen.writeStartObject();
Map<String, Object> serializedInner = SERIALIZER_OBJECT_MAPPER.convertValue(value.innerModel, Map.class);
if(serializedInner != null && serializedInner.isEmpty())
if (serializedInner != null && serializedInner.isEmpty())
serializedInner = null;
gen.writeObjectField(value.getPluginName(), serializedInner);
gen.writeEndObject();
Expand Down Expand Up @@ -145,7 +144,6 @@ public PluginModelDeserializer() {
*
* @param <T> The type inheriting from {@link PluginModel} that you ultimately need deserialized
* @param <M> The type inheriting from {@link InternalJsonModel} that has custom fields.
*
* @see SinkModel.SinkModelDeserializer
*/
abstract static class AbstractPluginModelDeserializer<T extends PluginModel, M extends InternalJsonModel> extends StdDeserializer<PluginModel> {
Expand All @@ -166,7 +164,7 @@ protected AbstractPluginModelDeserializer(
}

@Override
public PluginModel deserialize(final JsonParser jsonParser, final DeserializationContext context) throws IOException, JacksonException {
public PluginModel deserialize(final JsonParser jsonParser, final DeserializationContext context) throws IOException {
final JsonNode node = jsonParser.getCodec().readTree(jsonParser);

final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
Expand All @@ -176,11 +174,24 @@ public PluginModel deserialize(final JsonParser jsonParser, final Deserializatio
final JsonNode value = onlyField.getValue();

M innerModel = SERIALIZER_OBJECT_MAPPER.convertValue(value, innerModelClass);
if(innerModel == null)
if (innerModel == null)
innerModel = emptyInnerModelConstructor.get();

return constructorFunction.apply(pluginName, innerModel);
}
}

@Override
public boolean equals(Object o) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests for these new methods.

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginModel that = (PluginModel) o;
return Objects.equals(pluginName, that.pluginName) &&
Objects.equals(innerModel.pluginSettings, that.innerModel.pluginSettings);
}

@Override
public int hashCode() {
return Objects.hash(pluginName, innerModel.pluginSettings);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.core.exception;

public class DynamicPipelineConfigUpdateException extends RuntimeException {
public DynamicPipelineConfigUpdateException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,26 @@
@Named
public class DataPrepperServer {
private static final Logger LOG = LoggerFactory.getLogger(DataPrepperServer.class);
private HttpServer server;
private final HttpServerProvider serverProvider;
private final ListPipelinesHandler listPipelinesHandler;
private final GetPipelinesHandler getPipelinesHandler;
private final UpdatePipelineHandler updatePipelineHandler;
private final IsDynamicallyUpdatablePipelineHandler isDynamicallyUpdatablePipelineHandler;
private final ShutdownHandler shutdownHandler;
private final EncryptionHttpHandler encryptionHttpHandler;
private final PrometheusMeterRegistry prometheusMeterRegistry;
private final Authenticator authenticator;
private final ExecutorService executorService;
private HttpServer server;

@Inject
public DataPrepperServer(
final HttpServerProvider serverProvider,
final ListPipelinesHandler listPipelinesHandler,
final ShutdownHandler shutdownHandler,
final GetPipelinesHandler getPipelinesHandler,
final UpdatePipelineHandler updatePipelineHandler,
final IsDynamicallyUpdatablePipelineHandler isDynamicallyUpdatablePipelineHandler,
@Autowired(required = false) @Nullable final EncryptionHttpHandler encryptionHttpHandler,
@Autowired(required = false) @Nullable final PrometheusMeterRegistry prometheusMeterRegistry,
@Autowired(required = false) @Nullable final Authenticator authenticator
Expand All @@ -53,6 +57,8 @@ public DataPrepperServer(
this.listPipelinesHandler = listPipelinesHandler;
this.shutdownHandler = shutdownHandler;
this.getPipelinesHandler = getPipelinesHandler;
this.updatePipelineHandler = updatePipelineHandler;
this.isDynamicallyUpdatablePipelineHandler = isDynamicallyUpdatablePipelineHandler;
this.encryptionHttpHandler = encryptionHttpHandler;
this.prometheusMeterRegistry = prometheusMeterRegistry;
this.authenticator = authenticator;
Expand All @@ -75,6 +81,8 @@ private HttpServer createServer() {
createContext(server, listPipelinesHandler, authenticator, "/list");
createContext(server, shutdownHandler, authenticator, "/shutdown");
createContext(server, getPipelinesHandler, authenticator, "/pipelines");
createContext(server, updatePipelineHandler, authenticator, "/updatePipelineConfig");
createContext(server, isDynamicallyUpdatablePipelineHandler, authenticator, "/isDynamicallyUpdatablePipelineConfig");

if (encryptionHttpHandler != null) {
createContext(server, encryptionHttpHandler, authenticator, "/encryption/rotate");
Expand All @@ -92,7 +100,7 @@ private void createContext(
final HttpServer httpServer,
final HttpHandler httpHandler,
@Nullable final Authenticator authenticator,
final String ... paths
final String... paths
) {
for (final String path : paths) {
final HttpContext context = httpServer.createContext(path, httpHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.pipeline.server;

import org.opensearch.dataprepper.core.exception.DynamicPipelineConfigUpdateException;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import org.reflections.Reflections;
import org.reflections.scanners.TypeAnnotationsScanner;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DynamicPipelineUpdateUtil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not use Util classes. We support Spring dependency injection. So use that instead to avoid the long-term difficulties of relying on utility classes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converted to Spring Service (named bean)


public static final Set<String> excludedProcessorsFromDynamicUpdate = Set.of("aggregate");

public static boolean isDynamicUpdateFeasible(final PipelinesDataFlowModel currentPipelineDataFlowMode,
final PipelinesDataFlowModel targetDataPipelineDataFlowMode) {

checkIfSamePipelineExists(currentPipelineDataFlowMode, targetDataPipelineDataFlowMode);
Set<String> singleThreadedProcessorNames = scanForSingleThreadAnnotatedProcessorPlugins();
singleThreadedProcessorNames.addAll(excludedProcessorsFromDynamicUpdate);

for (String pipelineName : currentPipelineDataFlowMode.getPipelines().keySet()) {
PipelineModel currentPipelineModel = currentPipelineDataFlowMode.getPipelines().get(pipelineName);
PipelineModel targetPipelineModel = targetDataPipelineDataFlowMode.getPipelines().get(pipelineName);

// Check if source configuration remains unchanged
PluginModel currentSource = currentPipelineModel.getSource();
PluginModel targetSource = targetPipelineModel.getSource();
if (!currentSource.equals(targetSource)) {
throw new DynamicPipelineConfigUpdateException(
"Source configuration cannot be modified in pipeline: " + pipelineName);
}

// Check if sinks configuration remains unchanged
List<SinkModel> currentSinks = currentPipelineModel.getSinks();
List<SinkModel> targetSinks = targetPipelineModel.getSinks();
if (!currentSinks.equals(targetSinks)) {
throw new DynamicPipelineConfigUpdateException(
"Sinks configuration cannot be modified in pipeline: " + pipelineName);
}

List<PluginModel> currentProcessors = currentPipelineModel.getProcessors();
List<PluginModel> targetProcessors = targetPipelineModel.getProcessors();

currentProcessors = currentProcessors == null ? List.of() : currentProcessors;
targetProcessors = targetProcessors == null ? List.of() : targetProcessors;

// Collect single-threaded processors in current and target
Set<String> currentSingleThreaded = currentProcessors.stream()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bit of split logic here. We have other places to look for this annotation. Can we make use of that existing code?

.map(PluginModel::getPluginName)
.filter(singleThreadedProcessorNames::contains)
.collect(Collectors.toSet());

Set<String> targetSingleThreaded = targetProcessors.stream()
.map(PluginModel::getPluginName)
.filter(singleThreadedProcessorNames::contains)
.collect(Collectors.toSet());

// Only throw if target adds new single-threaded processors or modifies existing ones
for (String targetProcessor : targetSingleThreaded) {
if (!currentSingleThreaded.contains(targetProcessor)) {
throw new DynamicPipelineConfigUpdateException(
"Cannot add new single-threaded processor: " + targetProcessor);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we care about this? Is it because of the state?

If so, this is insufficient. The aggregate processor retains state, but it is done in a thread-safe way.

Also, grok is currently labeled as @SingleThread because of a small code gap.

Maybe we should have an @Stateful annotation to indicate that state is kept?

}
}
}
return true;
}

public static void executeDynamicUpdateOfPipelineConfig(final PipelinesDataFlowModel currentPipelineDataFlowMode,
final PipelinesDataFlowModel targetDataPipelineDataFlowMode) {


}

public static void checkIfSamePipelineExists(final PipelinesDataFlowModel currentPipelineDataFlowMode,
final PipelinesDataFlowModel targetDataPipelineDataFlowMode) {

Set<String> currentPipelineNames = currentPipelineDataFlowMode.getPipelines().keySet();
Set<String> targetPipelineNames = targetDataPipelineDataFlowMode.getPipelines().keySet();

if (!currentPipelineNames.equals(targetPipelineNames)) {
String addedPipelines = targetPipelineNames.stream()
.filter(name -> !currentPipelineNames.contains(name))
.collect(Collectors.joining(", "));
String removedPipelines = currentPipelineNames.stream()
.filter(name -> !targetPipelineNames.contains(name))
.collect(Collectors.joining(", "));

StringBuilder errorMessage = new StringBuilder("Pipeline configuration mismatch found.");
if (!addedPipelines.isEmpty()) {
errorMessage.append(" New pipelines found: ").append(addedPipelines).append(".");
}
if (!removedPipelines.isEmpty()) {
errorMessage.append(" Missing pipelines: ").append(removedPipelines).append(".");
}

throw new DynamicPipelineConfigUpdateException(errorMessage.toString());
}
}

public static Set<String> scanForSingleThreadAnnotatedProcessorPlugins() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicating work from the PluginFactory. We should avoid this logic split.

I don't think the PluginFactory should give all @SingleThreaded annotations. But, maybe have a loadPlugin method with a Predicate?

Reflections reflections = new Reflections("org.opensearch.dataprepper", new TypeAnnotationsScanner());
Set<Class<?>> dataPrepperPlugins = reflections.getTypesAnnotatedWith(DataPrepperPlugin.class);
Set<String> singleThreadedProcessorNames = new HashSet<>();

for (Class<?> clazz : dataPrepperPlugins) {
if (clazz.isAnnotationPresent(SingleThread.class) || RequiresPeerForwarding.class.isAssignableFrom(clazz)) {
DataPrepperPlugin pluginAnnotation = clazz.getAnnotation(DataPrepperPlugin.class);
String name = pluginAnnotation.name();
singleThreadedProcessorNames.add(name);
}
}
return singleThreadedProcessorNames;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.pipeline.server;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.opensearch.dataprepper.core.pipeline.PipelinesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* HttpHandler to handle requests for updating pipeline configurations from S3
*/
public class IsDynamicallyUpdatablePipelineHandler extends UpdatePipelineBaseHandler implements HttpHandler {

private static final Logger LOG = LoggerFactory.getLogger(IsDynamicallyUpdatablePipelineHandler.class);

public IsDynamicallyUpdatablePipelineHandler(final PipelinesProvider pipelinesProvider) {
super(pipelinesProvider);
}

@Override
public void handle(final HttpExchange exchange) throws IOException {
baseHandle(exchange, false);
}
}
Loading
Loading