Skip to content

Commit 3d1ff30

Browse files
author
Kiran Kumar Veeravelly
committed
feat(osgi): add opt-in Apache Felix plugin framework
Add an opt-in plugin-loading mode backed by an embedded Apache Felix OSGi framework, enabled with the system property -Dplugin.framework=osgi. The default classpath loader remains the default when the flag is not set. When the flag is not set, plugin discovery is functionally identical to today: the same ServiceLoader-based providers are returned in the same order, the OSGi framework never starts, and no OSGi code path executes. The only added work in legacy mode is the construction of a couple of inert Spring beans. The new plugin-framework-osgi module embeds the Felix lifecycle (FelixPluginManager), bootstraps it in OSGi mode through a Spring-managed runner (OsgiFrameworkRunner), and exposes a PluginProvider backed by the OSGi service registry (OsgiPluginRegistry). At startup in OSGi mode, StaticBundleLoader installs each legacy plugin JAR through BundleAdapter, which installs already-OSGi JARs directly and wraps legacy (non-OSGi) JARs by generating a versioned OSGi manifest. Wrapped plugins' @DataPrepperPlugin classes are registered as OSGi services (LegacyPluginBundleActivator). Data Prepper API packages are exported to bundles with semver version attributes read from data-prepper-api build metadata (DataPrepperOsgiPackages, DataPrepperApiVersion), consistent with the data-prepper-api backward-compatibility contract from #6607, so a plugin built on 2.15 resolves on a 2.16 host. DataPrepperOsgiPackages also exports the framework package org.opensearch.dataprepper.plugin.osgi so adapted bundles can load the LegacyPluginBundleActivator from the system bundle. StaticBundleLoader installs, resolves, and starts bundles with fail-fast behavior, a startup summary log, and Micrometer metrics. BundleResolutionErrorTranslator turns Felix resolution failures into readable diagnostics, and BundleHealthCheck verifies framework, bundle, and classloader isolation state. BundleClassLoaderScope manages the thread context classloader at the plugin invocation boundary so ServiceLoader and SPI resolve against the bundle classloader. PluginHealthProbe is a seam for future functional health checks. The OsgiFrameworkRunner bean is always instantiated by Spring but does nothing unless -Dplugin.framework=osgi is set, because it returns early in its @PostConstruct method. The Felix framework JAR ships on the runtime classpath as a small (<1MB) library with zero transitive dependencies, but it is not exercised in legacy mode. Scope is limited to classloader isolation and deploy-time validation. Bundles install once at startup and stop once at shutdown, the same lifecycle as classpath plugins today. No production hot-reload is included. The PluginHotLoader helper is test-scoped only and not on the production classpath. Wiring changes: settings.gradle adds the module, a Felix version-catalog entry, and the bnd plugin; build-resources.gradle adds the module to coreProjects; data-prepper-core declares a runtimeOnly dependency on the new module; PluginProviderLoader is made public with a registerProvider() hook and caches the framework-mode flag at construction; a test in data-prepper-plugin-framework's PluginProviderLoaderTest verifies the mode-caching behavior. Legacy behavior is unchanged. A new, permanent, minimal example plugin data-prepper-plugins/echo-processor is added to the build to demonstrate a minimal @DataPrepperPlugin and the SPI path. Resolves #6760 Signed-off-by: Kiran Kumar Veeravelly <veeravkk@amazon.com>
1 parent 4818896 commit 3d1ff30

46 files changed

Lines changed: 5477 additions & 6 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build-resources.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@ ext.coreProjects = [
1919
project(':data-prepper-test'),
2020
project(':data-prepper-plugin-framework'),
2121
project(':data-prepper-plugin-schema'),
22-
project(':data-prepper-plugin-schema-cli')
22+
project(':data-prepper-plugin-schema-cli'),
23+
project(':plugin-framework-osgi')
2324
]

data-prepper-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ dependencies {
3737
implementation project(':data-prepper-logstash-configuration')
3838
implementation project(':data-prepper-pipeline-parser')
3939
implementation project(':data-prepper-plugin-framework')
40+
runtimeOnly project(':plugin-framework-osgi')
4041
testImplementation project(':data-prepper-plugin-framework').sourceSets.test.output
4142
testImplementation project(':data-prepper-plugins:common').sourceSets.test.output
4243
testImplementation project(':data-prepper-plugins:file-source')

data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginProviderLoader.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugin;
@@ -9,24 +14,56 @@
914
import org.slf4j.LoggerFactory;
1015

1116
import javax.inject.Named;
17+
import java.util.ArrayList;
1218
import java.util.Collection;
1319
import java.util.List;
1420
import java.util.ServiceLoader;
21+
import java.util.concurrent.CopyOnWriteArrayList;
1522
import java.util.stream.Collectors;
1623
import java.util.stream.StreamSupport;
1724

25+
/**
26+
* Loads and provides plugin providers via SPI and optional runtime registration.
27+
* Made public solely for the OSGi integration module to register additional providers.
28+
*/
1829
@Named
19-
class PluginProviderLoader {
30+
public class PluginProviderLoader {
2031
private static final Logger LOG = LoggerFactory.getLogger(PluginProviderLoader.class);
21-
private final ServiceLoader<PluginProvider> serviceLoader;
32+
private static final String PLUGIN_FRAMEWORK_PROPERTY = "plugin.framework";
33+
private static final String MODE_OSGI = "osgi";
34+
private final List<PluginProvider> classpathProviders;
35+
private final List<PluginProvider> additionalProviders = new CopyOnWriteArrayList<>();
36+
private final String frameworkMode;
2237

2338
PluginProviderLoader() {
24-
serviceLoader = ServiceLoader.load(PluginProvider.class);
39+
final ServiceLoader<PluginProvider> serviceLoader = ServiceLoader.load(PluginProvider.class);
40+
classpathProviders = StreamSupport.stream(serviceLoader.spliterator(), false)
41+
.collect(Collectors.toList());
42+
frameworkMode = System.getProperty(PLUGIN_FRAMEWORK_PROPERTY, "legacy");
43+
}
44+
45+
/**
46+
* Registers an additional PluginProvider at runtime.
47+
* Used by the OSGi integration to inject the OsgiPluginRegistry.
48+
*
49+
* @param provider the provider to add
50+
*/
51+
public void registerProvider(final PluginProvider provider) {
52+
additionalProviders.add(provider);
53+
LOG.info("Registered additional PluginProvider: {}", provider.getClass().getSimpleName());
2554
}
2655

2756
Collection<PluginProvider> getPluginProviders() {
28-
final List<PluginProvider> pluginProviders = StreamSupport.stream(serviceLoader.spliterator(), false)
29-
.collect(Collectors.toList());
57+
final List<PluginProvider> pluginProviders;
58+
59+
if (MODE_OSGI.equalsIgnoreCase(frameworkMode) && !additionalProviders.isEmpty()) {
60+
pluginProviders = new ArrayList<>(additionalProviders);
61+
pluginProviders.addAll(classpathProviders);
62+
LOG.debug("Plugin framework running in OSGi mode with {} providers ({} OSGi + {} classpath)",
63+
pluginProviders.size(), additionalProviders.size(), classpathProviders.size());
64+
} else {
65+
pluginProviders = new ArrayList<>(classpathProviders);
66+
}
3067

3168
LOG.debug("Data Prepper is configured with {} distinct plugin providers.", pluginProviders.size());
3269

data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginProviderLoaderTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugin;
@@ -66,4 +71,28 @@ void getPluginProviders_returns_a_collection_of_all_PluginProvider_instances() {
6671
assertThat(actualPluginProviders, not(sameInstance(originalPluginProviders)));
6772
assertThat(actualPluginProviders, equalTo(originalPluginProviders));
6873
}
74+
75+
@Test
76+
void getPluginProviders_caches_mode_at_construction_and_ignores_later_property_change() {
77+
// Construct with legacy mode (default)
78+
given(serviceLoader.spliterator())
79+
.willReturn(Collections.<PluginProvider>emptyList().spliterator());
80+
final PluginProviderLoader loader = createObjectUnderTest();
81+
82+
// Register an additional provider
83+
final PluginProvider osgiProvider = mock(PluginProvider.class);
84+
loader.registerProvider(osgiProvider);
85+
86+
// Even if we change the system property now, the mode was cached at construction
87+
System.setProperty("plugin.framework", "osgi");
88+
try {
89+
given(serviceLoader.spliterator())
90+
.willReturn(Collections.<PluginProvider>emptyList().spliterator());
91+
final Collection<PluginProvider> providers = loader.getPluginProviders();
92+
// Should use legacy behavior since the mode was cached as "legacy" at construction
93+
assertThat(providers.size(), equalTo(0));
94+
} finally {
95+
System.clearProperty("plugin.framework");
96+
}
97+
}
6998
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
plugins {
7+
id 'java'
8+
}
9+
10+
dependencies {
11+
implementation project(':data-prepper-api')
12+
}
13+
14+
test {
15+
useJUnitPlatform()
16+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.echo;
12+
13+
import org.opensearch.dataprepper.metrics.PluginMetrics;
14+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
15+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
18+
import org.opensearch.dataprepper.model.processor.Processor;
19+
import org.opensearch.dataprepper.model.record.Record;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.Collection;
24+
25+
/**
26+
* A simple pass-through processor that logs each event and returns them unchanged.
27+
* Used to validate the OSGi plugin loading pipeline end-to-end.
28+
*/
29+
@DataPrepperPlugin(name = "echo", pluginType = Processor.class)
30+
public class EchoProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
31+
private static final Logger LOG = LoggerFactory.getLogger(EchoProcessor.class);
32+
33+
@DataPrepperPluginConstructor
34+
public EchoProcessor(final PluginMetrics pluginMetrics) {
35+
super(pluginMetrics);
36+
}
37+
38+
@Override
39+
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
40+
for (final Record<Event> record : records) {
41+
LOG.debug("echo: {}", record.getData().toJsonString());
42+
}
43+
return records;
44+
}
45+
46+
@Override
47+
public void prepareForShutdown() {
48+
}
49+
50+
@Override
51+
public boolean isReadyForShutdown() {
52+
return true;
53+
}
54+
55+
@Override
56+
public void shutdown() {
57+
}
58+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#
2+
# Copyright OpenSearch Contributors
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
6+
org.opensearch.dataprepper.plugin.packages=org.opensearch.dataprepper.plugins
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.echo;
12+
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.extension.ExtendWith;
16+
import org.mockito.Mock;
17+
import org.mockito.junit.jupiter.MockitoExtension;
18+
import org.opensearch.dataprepper.metrics.PluginMetrics;
19+
import org.opensearch.dataprepper.model.event.Event;
20+
import org.opensearch.dataprepper.model.event.JacksonEvent;
21+
import org.opensearch.dataprepper.model.record.Record;
22+
23+
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
27+
import static org.hamcrest.CoreMatchers.is;
28+
import static org.hamcrest.MatcherAssert.assertThat;
29+
30+
@ExtendWith(MockitoExtension.class)
31+
class EchoProcessorTest {
32+
33+
@Mock
34+
private PluginMetrics pluginMetrics;
35+
36+
private EchoProcessor echoProcessor;
37+
38+
@BeforeEach
39+
void setUp() {
40+
echoProcessor = new EchoProcessor(pluginMetrics);
41+
}
42+
43+
@Test
44+
void doExecute_returns_same_records() {
45+
final Record<Event> record = new Record<>(JacksonEvent.builder()
46+
.withEventType("event")
47+
.withData(Collections.singletonMap("key", "value"))
48+
.build());
49+
50+
final Collection<Record<Event>> result = echoProcessor.doExecute(Arrays.asList(record));
51+
52+
assertThat(result.size(), is(1));
53+
assertThat(result.iterator().next(), is(record));
54+
}
55+
56+
@Test
57+
void doExecute_with_empty_collection_returns_empty() {
58+
final Collection<Record<Event>> result = echoProcessor.doExecute(Collections.emptyList());
59+
assertThat(result.isEmpty(), is(true));
60+
}
61+
62+
@Test
63+
void isReadyForShutdown_returns_true() {
64+
assertThat(echoProcessor.isReadyForShutdown(), is(true));
65+
}
66+
67+
@Test
68+
void shutdown_does_not_throw() {
69+
echoProcessor.shutdown();
70+
}
71+
72+
@Test
73+
void prepareForShutdown_does_not_throw() {
74+
echoProcessor.prepareForShutdown();
75+
}
76+
}

0 commit comments

Comments
 (0)