Skip to content

Commit 6f71e89

Browse files
committed
Add EMF config that enables adding extra properties to the EMF record
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent 1b4979c commit 6f71e89

9 files changed

Lines changed: 142 additions & 13 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistry.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,8 @@ public class EMFLoggingMeterRegistry extends StepMeterRegistry {
6565
private final Environment environment;
6666
private static final WarnThenDebugLogger warnThenDebugLogger = new WarnThenDebugLogger(EMFLoggingMeterRegistry.class);
6767

68-
public EMFLoggingMeterRegistry() {
69-
this(new EnvironmentProvider().resolveEnvironment().join());
70-
}
71-
72-
public EMFLoggingMeterRegistry(final Environment environment) {
73-
this(EMFLoggingRegistryConfig.DEFAULT, environment, Clock.SYSTEM);
68+
public EMFLoggingMeterRegistry(final EMFLoggingRegistryConfig config) {
69+
this(config, new EnvironmentProvider().resolveEnvironment().join(), Clock.SYSTEM);
7470
}
7571

7672
public EMFLoggingMeterRegistry(final EMFLoggingRegistryConfig config, final Environment environment, final Clock clock) {
@@ -124,13 +120,18 @@ private MetricsLogger prepareMetricsLogger(final List<Tag> tags, final Instant t
124120
.setNamespace(NAMESPACE)
125121
.setTimestamp(timestamp);
126122
addDimensionSet(tags, metricsLogger);
123+
addAdditionalProperties(metricsLogger);
127124
return metricsLogger;
128125
}
129126

130127
private void addDimensionSet(final List<Tag> tags, final MetricsLogger metricsLogger) {
131128
metricsLogger.setDimensions(toDimensionSet(tags));
132129
}
133130

131+
private void addAdditionalProperties(final MetricsLogger metricsLogger) {
132+
config.additionalProperties().forEach(metricsLogger::putProperty);
133+
}
134+
134135
private DimensionSet toDimensionSet(final List<Tag> tags) {
135136
final DimensionSet dimensionSet = new DimensionSet();
136137
tags.stream().filter(this::isAcceptableTag).forEach(tag -> dimensionSet.addDimension(tag.getKey(), tag.getValue()));

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfig.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,17 @@
77

88
import io.micrometer.core.instrument.step.StepRegistryConfig;
99

10+
import java.util.Collections;
11+
import java.util.Map;
12+
1013
public interface EMFLoggingRegistryConfig extends StepRegistryConfig {
11-
EMFLoggingRegistryConfig DEFAULT = k -> null;
1214

1315
@Override
1416
default String prefix() {
1517
return "emf";
1618
}
19+
20+
default Map<String, String> additionalProperties() {
21+
return Collections.emptyMap();
22+
}
1723
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/config/MetricsConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.micrometer.prometheus.PrometheusConfig;
2121
import io.micrometer.prometheus.PrometheusMeterRegistry;
2222
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
23+
import org.opensearch.dataprepper.core.meter.EMFLoggingRegistryConfig;
2324
import org.opensearch.dataprepper.core.meter.JvmMemoryAggregateMetrics;
2425
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
2526
import org.opensearch.dataprepper.core.parser.model.MetricRegistryType;
@@ -28,10 +29,13 @@
2829
import org.slf4j.LoggerFactory;
2930
import org.springframework.beans.factory.annotation.Autowired;
3031
import org.springframework.context.annotation.Bean;
32+
33+
import java.util.Map;
3134
import org.springframework.context.annotation.Configuration;
3235
import software.amazon.awssdk.core.exception.SdkClientException;
3336

3437
import javax.annotation.Nullable;
38+
import java.util.Collections;
3539
import java.util.List;
3640
import java.util.Map;
3741

@@ -149,7 +153,8 @@ public MeterRegistry cloudWatchMeterRegistry(
149153
@Bean
150154
public EMFLoggingMeterRegistry emfLoggingMeterRegistry(final DataPrepperConfiguration dataPrepperConfiguration) {
151155
if (dataPrepperConfiguration.getMetricRegistryTypes().contains(MetricRegistryType.EmbeddedMetricsFormat)) {
152-
final EMFLoggingMeterRegistry meterRegistry = new EMFLoggingMeterRegistry();
156+
final EMFLoggingRegistryConfig config = createEMFLoggingRegistryConfig(dataPrepperConfiguration);
157+
final EMFLoggingMeterRegistry meterRegistry = new EMFLoggingMeterRegistry(config);
153158
configureMetricRegistry(
154159
dataPrepperConfiguration.getMetricTags(), dataPrepperConfiguration.getMetricTagFilters(),
155160
dataPrepperConfiguration.getDisabledMetrics(), meterRegistry
@@ -184,4 +189,13 @@ public CompositeMeterRegistry systemMeterRegistry(
184189
return compositeMeterRegistry;
185190
}
186191

192+
private EMFLoggingRegistryConfig createEMFLoggingRegistryConfig(final DataPrepperConfiguration dataPrepperConfiguration) {
193+
return new EMFLoggingRegistryConfig() {
194+
@Override
195+
public Map<String, String> additionalProperties() {
196+
return Collections.unmodifiableMap(dataPrepperConfiguration.getEmfAdditionalProperties());
197+
}
198+
};
199+
}
200+
187201
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC
5656
private Map<String, String> metricTags = new HashMap<>();
5757
private List<MetricTagFilter> metricTagFilters = new LinkedList<>();
5858
private List<String> disabledMetrics = new LinkedList<>();
59+
private EmfConfig emf = new EmfConfig();
5960
private PeerForwarderConfiguration peerForwarderConfiguration;
6061
private Duration processorShutdownTimeout;
6162
private Duration sinkShutdownTimeout;
@@ -93,6 +94,8 @@ public DataPrepperConfiguration(
9394
final List<MetricTagFilter> metricTagFilters,
9495
@JsonProperty("disabled_metrics")
9596
final List<String> disabledMetrics,
97+
@JsonProperty("emf_metrics")
98+
final EmfConfig emf,
9699
@JsonProperty("peer_forwarder") final PeerForwarderConfiguration peerForwarderConfiguration,
97100
@JsonProperty("processor_shutdown_timeout")
98101
@JsonAlias("processorShutdownTimeout")
@@ -126,6 +129,7 @@ public DataPrepperConfiguration(
126129
setServerPort(serverPort);
127130
this.peerForwarderConfiguration = peerForwarderConfiguration;
128131
this.disabledMetrics = disabledMetrics;
132+
this.emf = emf != null ? emf : new EmfConfig();
129133

130134
this.processorShutdownTimeout = processorShutdownTimeout != null ? processorShutdownTimeout : DEFAULT_SHUTDOWN_DURATION;
131135
if (this.processorShutdownTimeout.isNegative()) {
@@ -181,6 +185,10 @@ public List<String> getDisabledMetrics() {
181185
return disabledMetrics != null ? disabledMetrics : Collections.emptyList();
182186
}
183187

188+
public Map<String, String> getEmfAdditionalProperties() {
189+
return emf.getAdditionalProperties();
190+
}
191+
184192

185193
private void setSsl(final Boolean ssl) {
186194
if (ssl != null) {
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.core.parser.model;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
public class EmfConfig {
19+
@JsonProperty("additional_properties")
20+
private Map<String, String> additionalProperties = new HashMap<>();
21+
22+
public Map<String, String> getAdditionalProperties() {
23+
return additionalProperties;
24+
}
25+
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistryTest.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import software.amazon.cloudwatchlogs.emf.model.Unit;
2929

3030
import java.lang.reflect.Field;
31+
import java.time.Duration;
3132
import java.util.Arrays;
3233
import java.util.Collections;
3334
import java.util.List;
@@ -60,7 +61,7 @@ class EMFLoggingMeterRegistryTest {
6061
private final MockClock clock = new MockClock();
6162
private final EMFLoggingMeterRegistry registry = spy(
6263
new EMFLoggingMeterRegistry(
63-
EMFLoggingRegistryConfig.DEFAULT, new EnvironmentProvider().resolveEnvironment().join(), clock)
64+
k -> null, new EnvironmentProvider().resolveEnvironment().join(), clock)
6465
);
6566
private final EMFLoggingMeterRegistry.Snapshot registrySnapshot = registry.new Snapshot();
6667

@@ -218,7 +219,7 @@ void snapshotFunctionCounterDataValid() {
218219
void snapshotFunctionCounterDataShouldClampInfiniteValues() {
219220
FunctionCounter functionCounter = FunctionCounter
220221
.builder("my.positive.infinity", Double.POSITIVE_INFINITY, Number::doubleValue).register(registry);
221-
clock.add(EMFLoggingRegistryConfig.DEFAULT.step());
222+
clock.add(Duration.ofMinutes(1));
222223
final List<EMFLoggingMeterRegistry.MetricDataPoint> metricDataPoints1 = registrySnapshot
223224
.functionCounterData(functionCounter)
224225
.collect(Collectors.toList());
@@ -227,7 +228,7 @@ void snapshotFunctionCounterDataShouldClampInfiniteValues() {
227228

228229
functionCounter = FunctionCounter
229230
.builder("my.negative.infinity", Double.NEGATIVE_INFINITY, Number::doubleValue).register(registry);
230-
clock.add(EMFLoggingRegistryConfig.DEFAULT.step());
231+
clock.add(Duration.ofMinutes(1));
231232
final List<EMFLoggingMeterRegistry.MetricDataPoint> metricDataPoints2 = registrySnapshot
232233
.functionCounterData(functionCounter)
233234
.collect(Collectors.toList());
@@ -270,7 +271,7 @@ void snapshotShouldNotAddFunctionTimerDataWhenSumIsNaN() {
270271
final FunctionTimer functionTimer = FunctionTimer
271272
.builder("my.function.timer", Double.NaN, Number::longValue, Number::doubleValue, TimeUnit.MILLISECONDS)
272273
.register(registry);
273-
clock.add(EMFLoggingRegistryConfig.DEFAULT.step());
274+
clock.add(Duration.ofMinutes(1));
274275
final List<EMFLoggingMeterRegistry.MetricDataPoint> metricDataPoints = registrySnapshot
275276
.functionTimerData(functionTimer)
276277
.collect(Collectors.toList());
@@ -356,6 +357,46 @@ private MetricsContext reflectivelyGetMetricsContext(final MetricsLogger metrics
356357
}
357358
}
358359

360+
@Test
361+
void testAdditionalPropertiesAreAddedToMetricsLogger() {
362+
final String randomKey1 = "testKey_" + System.currentTimeMillis();
363+
final String randomValue1 = "testValue_" + Math.random();
364+
final String randomKey2 = "anotherKey_" + System.nanoTime();
365+
final String randomValue2 = "anotherValue_" + Math.random();
366+
367+
final Map<String, String> additionalProperties = Map.of(randomKey1, randomValue1, randomKey2, randomValue2);
368+
final EMFLoggingRegistryConfig config = new EMFLoggingRegistryConfig() {
369+
@Override
370+
public String get(String key) {
371+
return null;
372+
}
373+
374+
@Override
375+
public Map<String, String> additionalProperties() {
376+
return additionalProperties;
377+
}
378+
};
379+
380+
final EMFLoggingMeterRegistry registryWithProperties = new EMFLoggingMeterRegistry(
381+
config, new EnvironmentProvider().resolveEnvironment().join(), clock);
382+
383+
registryWithProperties.gauge("test_gauge", 1.0);
384+
final List<MetricsLogger> metricsLoggers = registryWithProperties.metricsLoggers();
385+
386+
assertThat(metricsLoggers.size(), equalTo(1));
387+
388+
final MetricsLogger metricsLogger = metricsLoggers.get(0);
389+
final MetricsContext context = reflectivelyGetMetricsContext(metricsLogger);
390+
391+
// Verify properties exist
392+
assertThat(context.getProperty(randomKey1), equalTo(randomValue1));
393+
assertThat(context.getProperty(randomKey2), equalTo(randomValue2));
394+
395+
// Verify properties are not dimensions
396+
assertThat(hasDimension(context, randomKey1), is(false));
397+
assertThat(hasDimension(context, randomKey2), is(false));
398+
}
399+
359400
private boolean hasDimension(final MetricsContext context, final String key) {
360401
final List<DimensionSet> dimensionSetList = context.getDimensions();
361402
for (final DimensionSet dimensionSet:dimensionSetList) {

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/meter/EMFLoggingRegistryConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
class EMFLoggingRegistryConfigTest {
1414
@Test
1515
void testDefault() {
16-
final EMFLoggingRegistryConfig objectUnderTest = EMFLoggingRegistryConfig.DEFAULT;
16+
final EMFLoggingRegistryConfig objectUnderTest = k -> null;
1717
assertThat(objectUnderTest.prefix(), equalTo("emf"));
1818
}
1919
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfigurationTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,11 @@ void testConfigWithTestExtension() throws IOException {
275275
Map.of("test_extension", Map.of("test_attribute", "test_string"))
276276
));
277277
}
278+
279+
@Test
280+
public void testGetEmfAdditionalPropertiesDefault() {
281+
final DataPrepperConfiguration dataPrepperConfiguration = new DataPrepperConfiguration();
282+
assertThat(dataPrepperConfiguration.getEmfAdditionalProperties(), notNullValue());
283+
assertThat(dataPrepperConfiguration.getEmfAdditionalProperties().isEmpty(), equalTo(true));
284+
}
278285
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.core.parser.model;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
import java.util.Collections;
16+
17+
import static org.hamcrest.CoreMatchers.equalTo;
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
20+
public class EmfConfigTest {
21+
22+
@Test
23+
public void testDefaultConstructor() {
24+
final EmfConfig emfConfiguration = new EmfConfig();
25+
assertThat(emfConfiguration.getAdditionalProperties(), equalTo(Collections.emptyMap()));
26+
}
27+
}

0 commit comments

Comments
 (0)