diff --git a/data-prepper-plugins/ocsf-processor/build.gradle b/data-prepper-plugins/ocsf-processor/build.gradle new file mode 100644 index 0000000000..ec15c203ff --- /dev/null +++ b/data-prepper-plugins/ocsf-processor/build.gradle @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-test-common') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'javax.inject:javax.inject:1' + implementation 'io.micrometer:micrometer-core' + implementation files('OCSFTransformers-1.0.jar') + testImplementation libs.commons.lang3 +} + diff --git a/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessor.java b/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessor.java new file mode 100644 index 0000000000..fc777c08fd --- /dev/null +++ b/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessor.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.ocsf; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +import java.util.Collection; + + +@DataPrepperPlugin(name = "ocsf", pluginType = Processor.class, pluginConfigurationType = OcsfProcessorConfig.class) +public class OcsfProcessor extends AbstractProcessor, Record> { + OcsfTransformer ocsfTransformer; + private static final String TRANSFORMATION_ERRORS = "transformationErrors"; + private final Counter transformationErrorsCounter; + private final String version; + + @DataPrepperPluginConstructor + public OcsfProcessor(final OcsfProcessorConfig ocsfProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { + super(pluginMetrics); + ocsfTransformer = loadOcsfTransformer(pluginFactory, ocsfProcessorConfig.getSchemaType()); + version = ocsfProcessorConfig.getVersion(); + transformationErrorsCounter = pluginMetrics.counter(TRANSFORMATION_ERRORS); + } + + private OcsfTransformer loadOcsfTransformer(final PluginFactory pluginFactory, final PluginModel modeConfiguration) { + final PluginSetting modePluginSetting = new PluginSetting(modeConfiguration.getPluginName(), modeConfiguration.getPluginSettings()); + return pluginFactory.loadPlugin(OcsfTransformer.class, modePluginSetting); + } + + @Override + public Collection> doExecute(Collection> records) { + for (final Record record: records) { + try { + ocsfTransformer.transform(record.getData(), version); + } catch (Exception e) { + transformationErrorsCounter.increment(); + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + + } +} diff --git a/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorConfig.java b/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorConfig.java new file mode 100644 index 0000000000..0154cd9763 --- /dev/null +++ b/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorConfig.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.ocsf; + +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +public class OcsfProcessorConfig { + public static final String DEFAULT_VERSION = "1.1"; + + @JsonProperty("schema_type") + @JsonPropertyDescription("schema type of the ocsf processor") + @UsesDataPrepperPlugin(pluginType = OcsfTransformer.class) + @NotNull + PluginModel schemaType = null; + + public PluginModel getSchemaType() { + return schemaType; + } + + @JsonProperty("version") + @JsonPropertyDescription("Target OCSF version") + String version = DEFAULT_VERSION; + + public String getVersion() { + return version; + } +} diff --git a/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfTransformer.java b/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfTransformer.java new file mode 100644 index 0000000000..ebc72a984d --- /dev/null +++ b/data-prepper-plugins/ocsf-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfTransformer.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.ocsf; + +import org.opensearch.dataprepper.model.event.Event; + +public interface OcsfTransformer { + void transform(Event event, final String version) throws Exception; +} diff --git a/data-prepper-plugins/ocsf-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorConfigTest.java b/data-prepper-plugins/ocsf-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorConfigTest.java new file mode 100644 index 0000000000..faea0eb0e6 --- /dev/null +++ b/data-prepper-plugins/ocsf-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorConfigTest.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.ocsf; + +import org.opensearch.dataprepper.model.configuration.PluginModel; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.commons.lang3.RandomStringUtils; +import static org.mockito.Mockito.mock; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import java.lang.reflect.Field; + +public class OcsfProcessorConfigTest { + + private OcsfProcessorConfig ocsfProcessorConfig; + + @BeforeEach + void setUp() { + ocsfProcessorConfig = new OcsfProcessorConfig(); + } + + @Test + void TestDefaultConfig() { + assertThat(ocsfProcessorConfig.getVersion(), equalTo(OcsfProcessorConfig.DEFAULT_VERSION)); + assertThat(ocsfProcessorConfig.getSchemaType(), equalTo(null)); + } + + @Test + void TestCustomConfig() throws Exception { + final String testVersion = RandomStringUtils.randomAlphabetic(10); + reflectivelySetField(ocsfProcessorConfig, "version", testVersion); + assertThat(ocsfProcessorConfig.getVersion(), equalTo(testVersion)); + PluginModel schemaType = mock(PluginModel.class); + reflectivelySetField(ocsfProcessorConfig, "schemaType", schemaType); + assertThat(ocsfProcessorConfig.getSchemaType(), equalTo(schemaType)); + } + + private void reflectivelySetField(final OcsfProcessorConfig ocsfProcessorConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { + final Field field = OcsfProcessorConfig.class.getDeclaredField(fieldName); + try { + field.setAccessible(true); + field.set(ocsfProcessorConfig, value); + } finally { + field.setAccessible(false); + } + } + +} diff --git a/data-prepper-plugins/ocsf-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorTest.java b/data-prepper-plugins/ocsf-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorTest.java new file mode 100644 index 0000000000..ad912f76b3 --- /dev/null +++ b/data-prepper-plugins/ocsf-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/ocsf/OcsfProcessorTest.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.ocsf; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.lenient; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class OcsfProcessorTest { + + public class TestOcsfTransformer implements OcsfTransformer { + public void transform(Event event, final String version) { + if (event.get("shouldThrow", Boolean.class) != null) { + throw new RuntimeException("Exception"); + } + event.put("transformed", true); + } + } + + @Mock + PluginMetrics pluginMetrics; + @Mock + PluginFactory pluginFactory; + @Mock + PluginModel testPlugin; + @Mock + OcsfProcessorConfig ocsfProcessorConfig; + @Mock + Counter errorCounter; + + AtomicInteger errorCount; + + OcsfProcessor ocsfProcessor; + TestOcsfTransformer testOcsfTransformer; + + @BeforeEach + void setup() { + errorCount = new AtomicInteger(0); + testOcsfTransformer = new TestOcsfTransformer(); + ocsfProcessorConfig = mock(OcsfProcessorConfig.class); + testPlugin = mock(PluginModel.class); + when(testPlugin.getPluginName()).thenReturn("test-plugin"); + when(testPlugin.getPluginSettings()).thenReturn(Map.of()); + when(ocsfProcessorConfig.getSchemaType()).thenReturn(testPlugin); + pluginMetrics = mock(PluginMetrics.class); + pluginFactory = mock(PluginFactory.class); + when(pluginFactory.loadPlugin(eq(OcsfTransformer.class), any())).thenReturn(testOcsfTransformer); + errorCounter = mock(Counter.class); + lenient().doAnswer(a -> { + errorCount.getAndAdd(1); + return null; + }).when(errorCounter).increment(); + lenient().doAnswer(a -> { + return errorCounter; + }).when(pluginMetrics).counter(anyString()); + } + + OcsfProcessor createObjectUnderTest() { + return new OcsfProcessor(ocsfProcessorConfig, pluginMetrics, pluginFactory); + } + + @Test + public void testSuccessfulTransformations() { + ocsfProcessor = createObjectUnderTest(); + int numRecords = 10; + Collection> records = getRecordList(numRecords); + List> recordsOut = (List>)ocsfProcessor.doExecute(records); + assertThat(recordsOut.size(), equalTo(numRecords)); + for (final Record record: recordsOut) { + assertThat(record.getData().get("transformed", Boolean.class), equalTo(true)); + } + + } + + @Test + public void testFailedTransformations() { + ocsfProcessor = createObjectUnderTest(); + int numRecords = 10; + Collection> records = getRecordList(numRecords); + for (Record record: records) { + record.getData().put("shouldThrow", true); + } + List> recordsOut = (List>)ocsfProcessor.doExecute(records); + assertThat(recordsOut.size(), equalTo(numRecords)); + for (final Record record: recordsOut) { + assertThat(record.getData().get("transformed", Boolean.class), equalTo(null)); + } + assertThat(errorCount.get(), equalTo(numRecords)); + + } + + private Collection> getRecordList(int numberOfRecords) { + final Collection> recordList = new ArrayList<>(); + List records = generateRecords(numberOfRecords); + for (int i = 0; i < numberOfRecords; i++) { + final Event event = JacksonLog.builder() + .withData(records.get(i)) + .build(); + recordList.add(new Record<>(event)); + } + return recordList; + } + + private List generateRecords(int numberOfRecords) { + List recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + HashMap eventData = new HashMap<>(); + eventData.put("name", "Person" + rows); + eventData.put("age", Integer.toString(rows)); + recordList.add(eventData); + + } + return recordList; + } +} + diff --git a/settings.gradle b/settings.gradle index bef43b3d7a..e58eb7de93 100644 --- a/settings.gradle +++ b/settings.gradle @@ -109,6 +109,7 @@ include 'data-prepper-plugin-schema-cli' include 'data-prepper-plugins:common' include 'data-prepper-plugins:armeria-common' include 'data-prepper-plugins:anomaly-detector-processor' +include 'data-prepper-plugins:ocsf-processor' include 'data-prepper-plugins:opensearch' include 'data-prepper-plugins:ocsf' include 'data-prepper-plugins:service-map-stateful'