Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,8 @@ public class EMFLoggingMeterRegistry extends StepMeterRegistry {
private final Environment environment;
private static final WarnThenDebugLogger warnThenDebugLogger = new WarnThenDebugLogger(EMFLoggingMeterRegistry.class);

public EMFLoggingMeterRegistry() {
this(new EnvironmentProvider().resolveEnvironment().join());
}

public EMFLoggingMeterRegistry(final Environment environment) {
this(EMFLoggingRegistryConfig.DEFAULT, environment, Clock.SYSTEM);
public EMFLoggingMeterRegistry(final EMFLoggingRegistryConfig config) {
this(config, new EnvironmentProvider().resolveEnvironment().join(), Clock.SYSTEM);
}

public EMFLoggingMeterRegistry(final EMFLoggingRegistryConfig config, final Environment environment, final Clock clock) {
Expand Down Expand Up @@ -124,13 +120,18 @@ private MetricsLogger prepareMetricsLogger(final List<Tag> tags, final Instant t
.setNamespace(NAMESPACE)
.setTimestamp(timestamp);
addDimensionSet(tags, metricsLogger);
addAdditionalProperties(metricsLogger);
return metricsLogger;
}

private void addDimensionSet(final List<Tag> tags, final MetricsLogger metricsLogger) {
metricsLogger.setDimensions(toDimensionSet(tags));
}

private void addAdditionalProperties(final MetricsLogger metricsLogger) {
config.additionalProperties().forEach(metricsLogger::putProperty);
}

private DimensionSet toDimensionSet(final List<Tag> tags) {
final DimensionSet dimensionSet = new DimensionSet();
tags.stream().filter(this::isAcceptableTag).forEach(tag -> dimensionSet.addDimension(tag.getKey(), tag.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import io.micrometer.core.instrument.step.StepRegistryConfig;

import java.util.Collections;
import java.util.Map;

public interface EMFLoggingRegistryConfig extends StepRegistryConfig {
EMFLoggingRegistryConfig DEFAULT = k -> null;

@Override
default String prefix() {
return "emf";
}

default Map<String, String> additionalProperties() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
import org.opensearch.dataprepper.core.meter.EMFLoggingRegistryConfig;
import org.opensearch.dataprepper.core.meter.JvmMemoryAggregateMetrics;
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.core.parser.model.MetricRegistryType;
Expand All @@ -28,10 +29,13 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

import java.util.Map;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.core.exception.SdkClientException;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -149,7 +153,8 @@ public MeterRegistry cloudWatchMeterRegistry(
@Bean
public EMFLoggingMeterRegistry emfLoggingMeterRegistry(final DataPrepperConfiguration dataPrepperConfiguration) {
if (dataPrepperConfiguration.getMetricRegistryTypes().contains(MetricRegistryType.EmbeddedMetricsFormat)) {
final EMFLoggingMeterRegistry meterRegistry = new EMFLoggingMeterRegistry();
final EMFLoggingRegistryConfig config = createEMFLoggingRegistryConfig(dataPrepperConfiguration);
final EMFLoggingMeterRegistry meterRegistry = new EMFLoggingMeterRegistry(config);
configureMetricRegistry(
dataPrepperConfiguration.getMetricTags(), dataPrepperConfiguration.getMetricTagFilters(),
dataPrepperConfiguration.getDisabledMetrics(), meterRegistry
Expand Down Expand Up @@ -184,4 +189,18 @@ public CompositeMeterRegistry systemMeterRegistry(
return compositeMeterRegistry;
}

private EMFLoggingRegistryConfig createEMFLoggingRegistryConfig(final DataPrepperConfiguration dataPrepperConfiguration) {
return new EMFLoggingRegistryConfig() {
@Override
public String get(String key) {
return null;
}

@Override
public Map<String, String> additionalProperties() {
return Collections.unmodifiableMap(dataPrepperConfiguration.getEmfAdditionalProperties());
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC
private Map<String, String> metricTags = new HashMap<>();
private List<MetricTagFilter> metricTagFilters = new LinkedList<>();
private List<String> disabledMetrics = new LinkedList<>();
private EmfConfig emf = new EmfConfig();
private PeerForwarderConfiguration peerForwarderConfiguration;
private Duration processorShutdownTimeout;
private Duration sinkShutdownTimeout;
Expand Down Expand Up @@ -93,6 +94,8 @@ public DataPrepperConfiguration(
final List<MetricTagFilter> metricTagFilters,
@JsonProperty("disabled_metrics")
final List<String> disabledMetrics,
@JsonProperty("emf_metrics")
final EmfConfig emf,
@JsonProperty("peer_forwarder") final PeerForwarderConfiguration peerForwarderConfiguration,
@JsonProperty("processor_shutdown_timeout")
@JsonAlias("processorShutdownTimeout")
Expand Down Expand Up @@ -126,6 +129,7 @@ public DataPrepperConfiguration(
setServerPort(serverPort);
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.disabledMetrics = disabledMetrics;
this.emf = emf != null ? emf : new EmfConfig();

this.processorShutdownTimeout = processorShutdownTimeout != null ? processorShutdownTimeout : DEFAULT_SHUTDOWN_DURATION;
if (this.processorShutdownTimeout.isNegative()) {
Expand Down Expand Up @@ -181,6 +185,10 @@ public List<String> getDisabledMetrics() {
return disabledMetrics != null ? disabledMetrics : Collections.emptyList();
}

public Map<String, String> getEmfAdditionalProperties() {
return emf.getAdditionalProperties();
}


private void setSsl(final Boolean ssl) {
if (ssl != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Comment thread
dlvenable marked this conversation as resolved.
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.core.parser.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;

public class EmfConfig {
@JsonProperty("additional_properties")
private Map<String, String> additionalProperties = new HashMap<>();

public Map<String, String> getAdditionalProperties() {
return additionalProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.cloudwatchlogs.emf.model.Unit;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -60,7 +61,7 @@ class EMFLoggingMeterRegistryTest {
private final MockClock clock = new MockClock();
private final EMFLoggingMeterRegistry registry = spy(
new EMFLoggingMeterRegistry(
EMFLoggingRegistryConfig.DEFAULT, new EnvironmentProvider().resolveEnvironment().join(), clock)
k -> null, new EnvironmentProvider().resolveEnvironment().join(), clock)
);
private final EMFLoggingMeterRegistry.Snapshot registrySnapshot = registry.new Snapshot();

Expand Down Expand Up @@ -218,7 +219,7 @@ void snapshotFunctionCounterDataValid() {
void snapshotFunctionCounterDataShouldClampInfiniteValues() {
FunctionCounter functionCounter = FunctionCounter
.builder("my.positive.infinity", Double.POSITIVE_INFINITY, Number::doubleValue).register(registry);
clock.add(EMFLoggingRegistryConfig.DEFAULT.step());
clock.add(Duration.ofMinutes(1));
final List<EMFLoggingMeterRegistry.MetricDataPoint> metricDataPoints1 = registrySnapshot
.functionCounterData(functionCounter)
.collect(Collectors.toList());
Expand All @@ -227,7 +228,7 @@ void snapshotFunctionCounterDataShouldClampInfiniteValues() {

functionCounter = FunctionCounter
.builder("my.negative.infinity", Double.NEGATIVE_INFINITY, Number::doubleValue).register(registry);
clock.add(EMFLoggingRegistryConfig.DEFAULT.step());
clock.add(Duration.ofMinutes(1));
final List<EMFLoggingMeterRegistry.MetricDataPoint> metricDataPoints2 = registrySnapshot
.functionCounterData(functionCounter)
.collect(Collectors.toList());
Expand Down Expand Up @@ -270,7 +271,7 @@ void snapshotShouldNotAddFunctionTimerDataWhenSumIsNaN() {
final FunctionTimer functionTimer = FunctionTimer
.builder("my.function.timer", Double.NaN, Number::longValue, Number::doubleValue, TimeUnit.MILLISECONDS)
.register(registry);
clock.add(EMFLoggingRegistryConfig.DEFAULT.step());
clock.add(Duration.ofMinutes(1));
final List<EMFLoggingMeterRegistry.MetricDataPoint> metricDataPoints = registrySnapshot
.functionTimerData(functionTimer)
.collect(Collectors.toList());
Expand Down Expand Up @@ -356,6 +357,46 @@ private MetricsContext reflectivelyGetMetricsContext(final MetricsLogger metrics
}
}

@Test
void testAdditionalPropertiesAreAddedToMetricsLogger() {
final String randomKey1 = "testKey_" + System.currentTimeMillis();
final String randomValue1 = "testValue_" + Math.random();
final String randomKey2 = "anotherKey_" + System.nanoTime();
final String randomValue2 = "anotherValue_" + Math.random();

final Map<String, String> additionalProperties = Map.of(randomKey1, randomValue1, randomKey2, randomValue2);
final EMFLoggingRegistryConfig config = new EMFLoggingRegistryConfig() {
@Override
public String get(String key) {
return null;
}

@Override
public Map<String, String> additionalProperties() {
return additionalProperties;
}
};

final EMFLoggingMeterRegistry registryWithProperties = new EMFLoggingMeterRegistry(
config, new EnvironmentProvider().resolveEnvironment().join(), clock);

registryWithProperties.gauge("test_gauge", 1.0);
final List<MetricsLogger> metricsLoggers = registryWithProperties.metricsLoggers();

assertThat(metricsLoggers.size(), equalTo(1));

final MetricsLogger metricsLogger = metricsLoggers.get(0);
final MetricsContext context = reflectivelyGetMetricsContext(metricsLogger);

// Verify properties exist
assertThat(context.getProperty(randomKey1), equalTo(randomValue1));
assertThat(context.getProperty(randomKey2), equalTo(randomValue2));

// Verify properties are not dimensions
assertThat(hasDimension(context, randomKey1), is(false));
assertThat(hasDimension(context, randomKey2), is(false));
}

private boolean hasDimension(final MetricsContext context, final String key) {
final List<DimensionSet> dimensionSetList = context.getDimensions();
for (final DimensionSet dimensionSet:dimensionSetList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class EMFLoggingRegistryConfigTest {
@Test
void testDefault() {
final EMFLoggingRegistryConfig objectUnderTest = EMFLoggingRegistryConfig.DEFAULT;
final EMFLoggingRegistryConfig objectUnderTest = k -> null;
assertThat(objectUnderTest.prefix(), equalTo("emf"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,11 @@ void testConfigWithTestExtension() throws IOException {
Map.of("test_extension", Map.of("test_attribute", "test_string"))
));
}

@Test
public void testGetEmfAdditionalPropertiesDefault() {
final DataPrepperConfiguration dataPrepperConfiguration = new DataPrepperConfiguration();
assertThat(dataPrepperConfiguration.getEmfAdditionalProperties(), notNullValue());
assertThat(dataPrepperConfiguration.getEmfAdditionalProperties().isEmpty(), equalTo(true));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Comment thread
dlvenable marked this conversation as resolved.
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.core.parser.model;

import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class EmfConfigTest {

@Test
public void testDefaultConstructor() {
final EmfConfig emfConfiguration = new EmfConfig();
assertThat(emfConfiguration.getAdditionalProperties(), equalTo(Collections.emptyMap()));
}
}
Loading