Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions data-prepper-plugins/ocsf-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-test-common')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
implementation 'io.micrometer:micrometer-core'
implementation files('OCSFTransformers-1.0.jar')
testImplementation libs.commons.lang3
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.ocsf;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.plugin.PluginFactory;

import java.util.Collection;


@DataPrepperPlugin(name = "ocsf", pluginType = Processor.class, pluginConfigurationType = OcsfProcessorConfig.class)
public class OcsfProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
OcsfTransformer ocsfTransformer;
private static final String TRANSFORMATION_ERRORS = "transformationErrors";
private final Counter transformationErrorsCounter;
private final String version;

@DataPrepperPluginConstructor
public OcsfProcessor(final OcsfProcessorConfig ocsfProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
super(pluginMetrics);
ocsfTransformer = loadOcsfTransformer(pluginFactory, ocsfProcessorConfig.getSchemaType());
version = ocsfProcessorConfig.getVersion();
transformationErrorsCounter = pluginMetrics.counter(TRANSFORMATION_ERRORS);
}

private OcsfTransformer loadOcsfTransformer(final PluginFactory pluginFactory, final PluginModel modeConfiguration) {
final PluginSetting modePluginSetting = new PluginSetting(modeConfiguration.getPluginName(), modeConfiguration.getPluginSettings());
return pluginFactory.loadPlugin(OcsfTransformer.class, modePluginSetting);
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for (final Record<Event> record: records) {
try {
ocsfTransformer.transform(record.getData(), version);
} catch (Exception e) {
transformationErrorsCounter.increment();
}
}

return records;
}

@Override
public void prepareForShutdown() {

}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.ocsf;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

public class OcsfProcessorConfig {
public static final String DEFAULT_VERSION = "1.1";

@JsonProperty("schema_type")
@JsonPropertyDescription("schema type of the ocsf processor")
@UsesDataPrepperPlugin(pluginType = OcsfTransformer.class)
@NotNull
PluginModel schemaType = null;

public PluginModel getSchemaType() {
return schemaType;
}

@JsonProperty("version")
@JsonPropertyDescription("Target OCSF version")
String version = DEFAULT_VERSION;

public String getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.ocsf;

import org.opensearch.dataprepper.model.event.Event;

public interface OcsfTransformer {
void transform(Event event, final String version) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.ocsf;

import org.opensearch.dataprepper.model.configuration.PluginModel;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.commons.lang3.RandomStringUtils;
import static org.mockito.Mockito.mock;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import java.lang.reflect.Field;

public class OcsfProcessorConfigTest {

private OcsfProcessorConfig ocsfProcessorConfig;

@BeforeEach
void setUp() {
ocsfProcessorConfig = new OcsfProcessorConfig();
}

@Test
void TestDefaultConfig() {
assertThat(ocsfProcessorConfig.getVersion(), equalTo(OcsfProcessorConfig.DEFAULT_VERSION));
assertThat(ocsfProcessorConfig.getSchemaType(), equalTo(null));
}

@Test
void TestCustomConfig() throws Exception {
final String testVersion = RandomStringUtils.randomAlphabetic(10);
reflectivelySetField(ocsfProcessorConfig, "version", testVersion);
assertThat(ocsfProcessorConfig.getVersion(), equalTo(testVersion));
PluginModel schemaType = mock(PluginModel.class);
reflectivelySetField(ocsfProcessorConfig, "schemaType", schemaType);
assertThat(ocsfProcessorConfig.getSchemaType(), equalTo(schemaType));
}

private void reflectivelySetField(final OcsfProcessorConfig ocsfProcessorConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
final Field field = OcsfProcessorConfig.class.getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(ocsfProcessorConfig, value);
} finally {
field.setAccessible(false);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.ocsf;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.plugin.PluginFactory;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.mockito.Mock;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.lenient;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class OcsfProcessorTest {

public class TestOcsfTransformer implements OcsfTransformer {
public void transform(Event event, final String version) {
if (event.get("shouldThrow", Boolean.class) != null) {
throw new RuntimeException("Exception");
}
event.put("transformed", true);
}
}

@Mock
PluginMetrics pluginMetrics;
@Mock
PluginFactory pluginFactory;
@Mock
PluginModel testPlugin;
@Mock
OcsfProcessorConfig ocsfProcessorConfig;
@Mock
Counter errorCounter;

AtomicInteger errorCount;

OcsfProcessor ocsfProcessor;
TestOcsfTransformer testOcsfTransformer;

@BeforeEach
void setup() {
errorCount = new AtomicInteger(0);
testOcsfTransformer = new TestOcsfTransformer();
ocsfProcessorConfig = mock(OcsfProcessorConfig.class);
testPlugin = mock(PluginModel.class);
when(testPlugin.getPluginName()).thenReturn("test-plugin");
when(testPlugin.getPluginSettings()).thenReturn(Map.of());
when(ocsfProcessorConfig.getSchemaType()).thenReturn(testPlugin);
pluginMetrics = mock(PluginMetrics.class);
pluginFactory = mock(PluginFactory.class);
when(pluginFactory.loadPlugin(eq(OcsfTransformer.class), any())).thenReturn(testOcsfTransformer);
errorCounter = mock(Counter.class);
lenient().doAnswer(a -> {
errorCount.getAndAdd(1);
return null;
}).when(errorCounter).increment();
lenient().doAnswer(a -> {
return errorCounter;
}).when(pluginMetrics).counter(anyString());
}

OcsfProcessor createObjectUnderTest() {
return new OcsfProcessor(ocsfProcessorConfig, pluginMetrics, pluginFactory);
}

@Test
public void testSuccessfulTransformations() {
ocsfProcessor = createObjectUnderTest();
int numRecords = 10;
Collection<Record<Event>> records = getRecordList(numRecords);
List<Record<Event>> recordsOut = (List<Record<Event>>)ocsfProcessor.doExecute(records);
assertThat(recordsOut.size(), equalTo(numRecords));
for (final Record<Event> record: recordsOut) {
assertThat(record.getData().get("transformed", Boolean.class), equalTo(true));
}

}

@Test
public void testFailedTransformations() {
ocsfProcessor = createObjectUnderTest();
int numRecords = 10;
Collection<Record<Event>> records = getRecordList(numRecords);
for (Record<Event> record: records) {
record.getData().put("shouldThrow", true);
}
List<Record<Event>> recordsOut = (List<Record<Event>>)ocsfProcessor.doExecute(records);
assertThat(recordsOut.size(), equalTo(numRecords));
for (final Record<Event> record: recordsOut) {
assertThat(record.getData().get("transformed", Boolean.class), equalTo(null));
}
assertThat(errorCount.get(), equalTo(numRecords));

}

private Collection<Record<Event>> getRecordList(int numberOfRecords) {
final Collection<Record<Event>> recordList = new ArrayList<>();
List<HashMap> records = generateRecords(numberOfRecords);
for (int i = 0; i < numberOfRecords; i++) {
final Event event = JacksonLog.builder()
.withData(records.get(i))
.build();
recordList.add(new Record<>(event));
}
return recordList;
}

private List<HashMap> generateRecords(int numberOfRecords) {
List<HashMap> recordList = new ArrayList<>();

for (int rows = 0; rows < numberOfRecords; rows++) {
HashMap<String, String> eventData = new HashMap<>();
eventData.put("name", "Person" + rows);
eventData.put("age", Integer.toString(rows));
recordList.add(eventData);

}
return recordList;
}
}

1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ include 'data-prepper-plugin-schema-cli'
include 'data-prepper-plugins:common'
include 'data-prepper-plugins:armeria-common'
include 'data-prepper-plugins:anomaly-detector-processor'
include 'data-prepper-plugins:ocsf-processor'
include 'data-prepper-plugins:opensearch'
include 'data-prepper-plugins:ocsf'
include 'data-prepper-plugins:service-map-stateful'
Expand Down