From 46973a64dc570b5cfb4311b5e423b9f1023f18d4 Mon Sep 17 00:00:00 2001 From: Manuel Mangas Zurita Date: Wed, 12 Nov 2025 10:28:04 -0800 Subject: [PATCH] Add EMF config that enables adding extra properties to the EMF record Signed-off-by: Manuel Mangas Zurita --- .../core/meter/EMFLoggingMeterRegistry.java | 13 ++--- .../core/meter/EMFLoggingRegistryConfig.java | 8 ++- .../core/parser/config/MetricsConfig.java | 21 +++++++- .../model/DataPrepperConfiguration.java | 8 +++ .../core/parser/model/EmfConfig.java | 25 ++++++++++ .../meter/EMFLoggingMeterRegistryTest.java | 49 +++++++++++++++++-- .../meter/EMFLoggingRegistryConfigTest.java | 2 +- .../model/DataPrepperConfigurationTests.java | 7 +++ .../core/parser/model/EmfConfigTest.java | 27 ++++++++++ 9 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/EmfConfig.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/EmfConfigTest.java diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistry.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistry.java index ede3e36600..412e4c4d81 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistry.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistry.java @@ -65,12 +65,8 @@ public class EMFLoggingMeterRegistry extends StepMeterRegistry { private final Environment environment; private static final WarnThenDebugLogger warnThenDebugLogger = new WarnThenDebugLogger(EMFLoggingMeterRegistry.class); - public EMFLoggingMeterRegistry() { - this(new EnvironmentProvider().resolveEnvironment().join()); - } - - public EMFLoggingMeterRegistry(final Environment environment) { - this(EMFLoggingRegistryConfig.DEFAULT, environment, Clock.SYSTEM); + public EMFLoggingMeterRegistry(final EMFLoggingRegistryConfig config) { + this(config, new EnvironmentProvider().resolveEnvironment().join(), Clock.SYSTEM); } public EMFLoggingMeterRegistry(final EMFLoggingRegistryConfig config, final Environment environment, final Clock clock) { @@ -124,6 +120,7 @@ private MetricsLogger prepareMetricsLogger(final List tags, final Instant t .setNamespace(NAMESPACE) .setTimestamp(timestamp); addDimensionSet(tags, metricsLogger); + addAdditionalProperties(metricsLogger); return metricsLogger; } @@ -131,6 +128,10 @@ private void addDimensionSet(final List tags, final MetricsLogger metricsLo metricsLogger.setDimensions(toDimensionSet(tags)); } + private void addAdditionalProperties(final MetricsLogger metricsLogger) { + config.additionalProperties().forEach(metricsLogger::putProperty); + } + private DimensionSet toDimensionSet(final List tags) { final DimensionSet dimensionSet = new DimensionSet(); tags.stream().filter(this::isAcceptableTag).forEach(tag -> dimensionSet.addDimension(tag.getKey(), tag.getValue())); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfig.java index 9beb1d6cd9..9da7d8677c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfig.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfig.java @@ -7,11 +7,17 @@ import io.micrometer.core.instrument.step.StepRegistryConfig; +import java.util.Collections; +import java.util.Map; + public interface EMFLoggingRegistryConfig extends StepRegistryConfig { - EMFLoggingRegistryConfig DEFAULT = k -> null; @Override default String prefix() { return "emf"; } + + default Map additionalProperties() { + return Collections.emptyMap(); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/config/MetricsConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/config/MetricsConfig.java index 8bdecc3e36..d3925e2296 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/config/MetricsConfig.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/config/MetricsConfig.java @@ -20,6 +20,7 @@ import io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry; +import org.opensearch.dataprepper.core.meter.EMFLoggingRegistryConfig; import org.opensearch.dataprepper.core.meter.JvmMemoryAggregateMetrics; import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.core.parser.model.MetricRegistryType; @@ -28,10 +29,13 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; + +import java.util.Map; import org.springframework.context.annotation.Configuration; import software.amazon.awssdk.core.exception.SdkClientException; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -149,7 +153,8 @@ public MeterRegistry cloudWatchMeterRegistry( @Bean public EMFLoggingMeterRegistry emfLoggingMeterRegistry(final DataPrepperConfiguration dataPrepperConfiguration) { if (dataPrepperConfiguration.getMetricRegistryTypes().contains(MetricRegistryType.EmbeddedMetricsFormat)) { - final EMFLoggingMeterRegistry meterRegistry = new EMFLoggingMeterRegistry(); + final EMFLoggingRegistryConfig config = createEMFLoggingRegistryConfig(dataPrepperConfiguration); + final EMFLoggingMeterRegistry meterRegistry = new EMFLoggingMeterRegistry(config); configureMetricRegistry( dataPrepperConfiguration.getMetricTags(), dataPrepperConfiguration.getMetricTagFilters(), dataPrepperConfiguration.getDisabledMetrics(), meterRegistry @@ -184,4 +189,18 @@ public CompositeMeterRegistry systemMeterRegistry( return compositeMeterRegistry; } + private EMFLoggingRegistryConfig createEMFLoggingRegistryConfig(final DataPrepperConfiguration dataPrepperConfiguration) { + return new EMFLoggingRegistryConfig() { + @Override + public String get(String key) { + return null; + } + + @Override + public Map additionalProperties() { + return Collections.unmodifiableMap(dataPrepperConfiguration.getEmfAdditionalProperties()); + } + }; + } + } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java index 496d9fb68a..d32489a7ef 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java @@ -56,6 +56,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC private Map metricTags = new HashMap<>(); private List metricTagFilters = new LinkedList<>(); private List disabledMetrics = new LinkedList<>(); + private EmfConfig emf = new EmfConfig(); private PeerForwarderConfiguration peerForwarderConfiguration; private Duration processorShutdownTimeout; private Duration sinkShutdownTimeout; @@ -93,6 +94,8 @@ public DataPrepperConfiguration( final List metricTagFilters, @JsonProperty("disabled_metrics") final List disabledMetrics, + @JsonProperty("emf_metrics") + final EmfConfig emf, @JsonProperty("peer_forwarder") final PeerForwarderConfiguration peerForwarderConfiguration, @JsonProperty("processor_shutdown_timeout") @JsonAlias("processorShutdownTimeout") @@ -126,6 +129,7 @@ public DataPrepperConfiguration( setServerPort(serverPort); this.peerForwarderConfiguration = peerForwarderConfiguration; this.disabledMetrics = disabledMetrics; + this.emf = emf != null ? emf : new EmfConfig(); this.processorShutdownTimeout = processorShutdownTimeout != null ? processorShutdownTimeout : DEFAULT_SHUTDOWN_DURATION; if (this.processorShutdownTimeout.isNegative()) { @@ -181,6 +185,10 @@ public List getDisabledMetrics() { return disabledMetrics != null ? disabledMetrics : Collections.emptyList(); } + public Map getEmfAdditionalProperties() { + return emf.getAdditionalProperties(); + } + private void setSsl(final Boolean ssl) { if (ssl != null) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/EmfConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/EmfConfig.java new file mode 100644 index 0000000000..f065f2c601 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/EmfConfig.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.core.parser.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; + +public class EmfConfig { + @JsonProperty("additional_properties") + private Map additionalProperties = new HashMap<>(); + + public Map getAdditionalProperties() { + return additionalProperties; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistryTest.java index 51e4708dcf..8290a5a0e5 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistryTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistryTest.java @@ -28,6 +28,7 @@ import software.amazon.cloudwatchlogs.emf.model.Unit; import java.lang.reflect.Field; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -60,7 +61,7 @@ class EMFLoggingMeterRegistryTest { private final MockClock clock = new MockClock(); private final EMFLoggingMeterRegistry registry = spy( new EMFLoggingMeterRegistry( - EMFLoggingRegistryConfig.DEFAULT, new EnvironmentProvider().resolveEnvironment().join(), clock) + k -> null, new EnvironmentProvider().resolveEnvironment().join(), clock) ); private final EMFLoggingMeterRegistry.Snapshot registrySnapshot = registry.new Snapshot(); @@ -218,7 +219,7 @@ void snapshotFunctionCounterDataValid() { void snapshotFunctionCounterDataShouldClampInfiniteValues() { FunctionCounter functionCounter = FunctionCounter .builder("my.positive.infinity", Double.POSITIVE_INFINITY, Number::doubleValue).register(registry); - clock.add(EMFLoggingRegistryConfig.DEFAULT.step()); + clock.add(Duration.ofMinutes(1)); final List metricDataPoints1 = registrySnapshot .functionCounterData(functionCounter) .collect(Collectors.toList()); @@ -227,7 +228,7 @@ void snapshotFunctionCounterDataShouldClampInfiniteValues() { functionCounter = FunctionCounter .builder("my.negative.infinity", Double.NEGATIVE_INFINITY, Number::doubleValue).register(registry); - clock.add(EMFLoggingRegistryConfig.DEFAULT.step()); + clock.add(Duration.ofMinutes(1)); final List metricDataPoints2 = registrySnapshot .functionCounterData(functionCounter) .collect(Collectors.toList()); @@ -270,7 +271,7 @@ void snapshotShouldNotAddFunctionTimerDataWhenSumIsNaN() { final FunctionTimer functionTimer = FunctionTimer .builder("my.function.timer", Double.NaN, Number::longValue, Number::doubleValue, TimeUnit.MILLISECONDS) .register(registry); - clock.add(EMFLoggingRegistryConfig.DEFAULT.step()); + clock.add(Duration.ofMinutes(1)); final List metricDataPoints = registrySnapshot .functionTimerData(functionTimer) .collect(Collectors.toList()); @@ -356,6 +357,46 @@ private MetricsContext reflectivelyGetMetricsContext(final MetricsLogger metrics } } + @Test + void testAdditionalPropertiesAreAddedToMetricsLogger() { + final String randomKey1 = "testKey_" + System.currentTimeMillis(); + final String randomValue1 = "testValue_" + Math.random(); + final String randomKey2 = "anotherKey_" + System.nanoTime(); + final String randomValue2 = "anotherValue_" + Math.random(); + + final Map additionalProperties = Map.of(randomKey1, randomValue1, randomKey2, randomValue2); + final EMFLoggingRegistryConfig config = new EMFLoggingRegistryConfig() { + @Override + public String get(String key) { + return null; + } + + @Override + public Map additionalProperties() { + return additionalProperties; + } + }; + + final EMFLoggingMeterRegistry registryWithProperties = new EMFLoggingMeterRegistry( + config, new EnvironmentProvider().resolveEnvironment().join(), clock); + + registryWithProperties.gauge("test_gauge", 1.0); + final List metricsLoggers = registryWithProperties.metricsLoggers(); + + assertThat(metricsLoggers.size(), equalTo(1)); + + final MetricsLogger metricsLogger = metricsLoggers.get(0); + final MetricsContext context = reflectivelyGetMetricsContext(metricsLogger); + + // Verify properties exist + assertThat(context.getProperty(randomKey1), equalTo(randomValue1)); + assertThat(context.getProperty(randomKey2), equalTo(randomValue2)); + + // Verify properties are not dimensions + assertThat(hasDimension(context, randomKey1), is(false)); + assertThat(hasDimension(context, randomKey2), is(false)); + } + private boolean hasDimension(final MetricsContext context, final String key) { final List dimensionSetList = context.getDimensions(); for (final DimensionSet dimensionSet:dimensionSetList) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfigTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfigTest.java index edbe670013..1daf6b84de 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfigTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfigTest.java @@ -13,7 +13,7 @@ class EMFLoggingRegistryConfigTest { @Test void testDefault() { - final EMFLoggingRegistryConfig objectUnderTest = EMFLoggingRegistryConfig.DEFAULT; + final EMFLoggingRegistryConfig objectUnderTest = k -> null; assertThat(objectUnderTest.prefix(), equalTo("emf")); } } \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfigurationTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfigurationTests.java index 156543accf..d43744dd57 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfigurationTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfigurationTests.java @@ -275,4 +275,11 @@ void testConfigWithTestExtension() throws IOException { Map.of("test_extension", Map.of("test_attribute", "test_string")) )); } + + @Test + public void testGetEmfAdditionalPropertiesDefault() { + final DataPrepperConfiguration dataPrepperConfiguration = new DataPrepperConfiguration(); + assertThat(dataPrepperConfiguration.getEmfAdditionalProperties(), notNullValue()); + assertThat(dataPrepperConfiguration.getEmfAdditionalProperties().isEmpty(), equalTo(true)); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/EmfConfigTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/EmfConfigTest.java new file mode 100644 index 0000000000..c892df0cdd --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/EmfConfigTest.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.core.parser.model; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class EmfConfigTest { + + @Test + public void testDefaultConstructor() { + final EmfConfig emfConfiguration = new EmfConfig(); + assertThat(emfConfiguration.getAdditionalProperties(), equalTo(Collections.emptyMap())); + } +}