diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/host/HostContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/host/HostContext.java new file mode 100644 index 0000000000..fa2eeb546d --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/host/HostContext.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.host; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; + +/** + * Provides the hostname of the current Data Prepper instance. + * This is intended as a shared utility so that hostname resolution + * is consistent across all components (processors, source coordinators, etc.). + */ +public class HostContext { + + private static final Logger LOG = LoggerFactory.getLogger(HostContext.class); + private static final String UNKNOWN_HOST = "unknown"; + private static final String HOSTNAME = resolveHostname(); + + static String resolveHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (final Exception e) { + LOG.warn("Failed to resolve hostname, using '{}': {}", UNKNOWN_HOST, e.getMessage()); + return UNKNOWN_HOST; + } + } + + /** + * Returns the hostname of the current Data Prepper host. + * + * @return the hostname + */ + public static String getHostname() { + return HOSTNAME; + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/host/HostContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/host/HostContextTest.java new file mode 100644 index 0000000000..7ccee0efc8 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/host/HostContextTest.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.host; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.emptyString; +import static org.mockito.Mockito.mockStatic; + +class HostContextTest { + + @Test + void getHostname_returns_non_null_non_empty_value() { + final String hostname = HostContext.getHostname(); + assertThat(hostname, notNullValue()); + assertThat(hostname, not(emptyString())); + } + + @Test + void getHostname_returns_consistent_value() { + final String first = HostContext.getHostname(); + final String second = HostContext.getHostname(); + assertThat(first, equalTo(second)); + } + + @Test + void getHostname_matches_InetAddress_hostname() throws UnknownHostException { + final String expected = InetAddress.getLocalHost().getHostName(); + assertThat(HostContext.getHostname(), equalTo(expected)); + } + + @Test + void resolveHostname_returns_valid_hostname() throws UnknownHostException { + final String hostname = HostContext.resolveHostname(); + assertThat(hostname, equalTo(InetAddress.getLocalHost().getHostName())); + } + + @Test + void resolveHostname_returns_unknown_when_hostname_cannot_be_resolved() { + try (final MockedStatic inetAddressMock = mockStatic(InetAddress.class)) { + inetAddressMock.when(InetAddress::getLocalHost) + .thenThrow(new UnknownHostException("test exception")); + + assertThat(HostContext.resolveHostname(), equalTo("unknown")); + } + } + + @Test + void constructor_can_be_created() { + final HostContext hostContext = new HostContext(); + assertThat(hostContext, notNullValue()); + } +} diff --git a/data-prepper-plugins/otel-apm-service-map-processor/README.md b/data-prepper-plugins/otel-apm-service-map-processor/README.md index 9ae39c9394..81fb2c682c 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/README.md +++ b/data-prepper-plugins/otel-apm-service-map-processor/README.md @@ -34,6 +34,8 @@ processor: | `window_duration` | Duration | `60s` | Fixed time window in seconds for evaluating APM service map relationships | | `db_path` | String | `"data/otel-apm-service-map/"` | Directory path for database files storing transient processing data | | `group_by_attributes` | List\ | `[]` | OpenTelemetry resource attributes to include in service grouping | +| `metric_timestamp_source` | String | `"arrival_time"` | Timestamp source for emitted metrics. `"arrival_time"` uses processing time at window evaluation (avoids late-span data loss in Prometheus/AMP). `"span_end_time"` uses the span's `endTime` field. | +| `metric_timestamp_granularity` | String | `"seconds"` | Truncation granularity for metric and service map timestamps. `"seconds"` truncates to second boundaries (1s collision window). `"minutes"` truncates to minute boundaries (60s collision window). | ### Advanced Configuration @@ -42,6 +44,8 @@ processor: - otel_apm_service_map: window_duration: 120s # 2-minute windows for high-latency services db_path: "/tmp/apm-service-map/" + metric_timestamp_source: arrival_time + metric_timestamp_granularity: seconds group_by_attributes: - "service.version" - "deployment.environment" @@ -49,6 +53,28 @@ processor: - "k8s.cluster.name" ``` +### Metric Timestamp Source + +The `metric_timestamp_source` option controls what timestamp is used for emitted metrics. + +| Value | Timestamp used | Late-span safe | Description | +|---|---|---|---| +| `arrival_time` (default) | `clock.instant()` at window evaluation | Yes | All spans in a window share the same processing timestamp. Matches the [OTel Collector spanmetrics connector](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/connector/spanmetricsconnector) approach. | +| `span_end_time` | Span's `endTime` field | No | Each span's end time is used. Late-arriving spans may produce metrics with timestamps that collide with previously written data points, causing silent data loss in Prometheus/AMP. | + +**Recommendation:** Use the default `arrival_time` unless you have a specific requirement for span-aligned timestamps and accept the risk of late-span data loss. + +### Metric Timestamp Granularity + +The `metric_timestamp_granularity` option controls the truncation granularity for all emitted timestamps (metrics and service map events). + +| Value | Collision window (`span_end_time` mode) | Data points per window | Description | +|---|---|---|---| +| `seconds` (default) | 1 second | More (one per unique second) | Truncates to second boundaries. Minimizes collision risk in `span_end_time` mode. | +| `minutes` | 60 seconds | Fewer (one per unique minute) | Truncates to minute boundaries. Higher collision risk but fewer data points. | + +In `arrival_time` mode, granularity has minimal impact since all spans in a window share the same `clock.instant()` — each window always produces one data point per label combination regardless of truncation. + ## Pipeline Examples ### Basic Pipeline diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampGranularity.java b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampGranularity.java new file mode 100644 index 0000000000..feddd103ac --- /dev/null +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampGranularity.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum MetricTimestampGranularity { + SECONDS("seconds", ChronoUnit.SECONDS), + MINUTES("minutes", ChronoUnit.MINUTES); + + private static final Map OPTIONS_MAP = Arrays.stream(MetricTimestampGranularity.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + private final ChronoUnit chronoUnit; + + MetricTimestampGranularity(final String option, final ChronoUnit chronoUnit) { + this.option = option; + this.chronoUnit = chronoUnit; + } + + public String getOption() { + return option; + } + + public ChronoUnit getChronoUnit() { + return chronoUnit; + } + + @JsonCreator + public static MetricTimestampGranularity fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampSource.java b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampSource.java new file mode 100644 index 0000000000..5d8bf8493a --- /dev/null +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampSource.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum MetricTimestampSource { + ARRIVAL_TIME("arrival_time"), + SPAN_END_TIME("span_end_time"); + + private static final Map OPTIONS_MAP = Arrays.stream(MetricTimestampSource.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + + MetricTimestampSource(final String option) { + this.option = option; + } + + public String getOption() { + return option; + } + + @JsonCreator + public static MetricTimestampSource fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessor.java b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessor.java index 3186306d01..28a2131de1 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessor.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessor.java @@ -45,9 +45,14 @@ import org.slf4j.LoggerFactory; import java.io.File; +import org.opensearch.dataprepper.model.host.HostContext; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -92,7 +97,10 @@ public class OTelApmServiceMapProcessor extends AbstractProcessor, private static Clock clock; private final int thisProcessorId; + private final String hostId; private final List groupByAttributes; + private final MetricTimestampSource metricTimestampSource; + private final MetricTimestampGranularity metricTimestampGranularity; private final EventFactory eventFactory; @DataPrepperPluginConstructor @@ -107,7 +115,9 @@ public OTelApmServiceMapProcessor( pipelineDescription.getNumberOfProcessWorkers(), eventFactory, pluginMetrics, - config.getGroupByAttributes()); + config.getGroupByAttributes(), + config.getMetricTimestampSource(), + config.getMetricTimestampGranularity()); } OTelApmServiceMapProcessor(final Duration windowDuration, @@ -116,7 +126,8 @@ public OTelApmServiceMapProcessor( final int processWorkers, final EventFactory eventFactory, final PluginMetrics pluginMetrics) { - this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics, Collections.emptyList()); + this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics, + Collections.emptyList(), MetricTimestampSource.SPAN_END_TIME, MetricTimestampGranularity.SECONDS); } OTelApmServiceMapProcessor(final Duration windowDuration, @@ -126,9 +137,37 @@ public OTelApmServiceMapProcessor( final EventFactory eventFactory, final PluginMetrics pluginMetrics, final List groupByAttributes) { + this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics, + groupByAttributes, MetricTimestampSource.SPAN_END_TIME, MetricTimestampGranularity.SECONDS); + } + + OTelApmServiceMapProcessor(final Duration windowDuration, + final File databasePath, + final Clock clock, + final int processWorkers, + final EventFactory eventFactory, + final PluginMetrics pluginMetrics, + final List groupByAttributes, + final MetricTimestampSource metricTimestampSource) { + this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics, + groupByAttributes, metricTimestampSource, MetricTimestampGranularity.SECONDS); + } + + OTelApmServiceMapProcessor(final Duration windowDuration, + final File databasePath, + final Clock clock, + final int processWorkers, + final EventFactory eventFactory, + final PluginMetrics pluginMetrics, + final List groupByAttributes, + final MetricTimestampSource metricTimestampSource, + final MetricTimestampGranularity metricTimestampGranularity) { super(pluginMetrics); + this.hostId = resolveHostId(); this.groupByAttributes = groupByAttributes != null ? Collections.unmodifiableList(groupByAttributes) : Collections.emptyList(); + this.metricTimestampSource = metricTimestampSource != null ? metricTimestampSource : MetricTimestampSource.ARRIVAL_TIME; + this.metricTimestampGranularity = metricTimestampGranularity != null ? metricTimestampGranularity : MetricTimestampGranularity.SECONDS; this.eventFactory = eventFactory; OTelApmServiceMapProcessor.clock = clock; @@ -375,7 +414,9 @@ private Collection> processCurrentWindowSpans() { final EphemeralSpanDecorations ephemeralDecorations = new EphemeralSpanDecorations(); - final Map metricsStateByKey = new HashMap<>(); + final Map sumStateByKey = new HashMap<>(); + final Map histogramStateByKey = new HashMap<>(); + final Set dedupedNodeDetails = new HashSet<>(); final Map> previousSpansByTraceId = buildSpansByTraceIdMap(previousWindow); final Map> currentSpansByTraceId = buildSpansByTraceIdMap(currentWindow); @@ -388,11 +429,24 @@ private Collection> processCurrentWindowSpans() { if (!traceData.getProcessingSpans().isEmpty()) { decorateSpansInTraceWithEphemeralStorage(traceData); - apmEvents.addAll(generateNodeOperationDetailEvents(traceData, currentTime, metricsStateByKey)); + generateNodeOperationDetailEvents(traceData, currentTime, sumStateByKey, histogramStateByKey, dedupedNodeDetails); } } - final List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(metricsStateByKey); + // Convert deduped NodeOperationDetails to events + for (NodeOperationDetail detail : dedupedNodeDetails) { + final EventMetadata eventMetadata = new DefaultEventMetadata.Builder() + .withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build(); + + final Event event = eventFactory.eventBuilder(EventBuilder.class) + .withEventMetadata(eventMetadata) + .withData(detail) + .build(); + + apmEvents.add(new Record<>(event)); + } + + final List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(sumStateByKey, histogramStateByKey); metrics.sort(Comparator.comparing(JacksonMetric::getTime)); final List> apmEventsSorted = new ArrayList<>(); @@ -444,15 +498,23 @@ private Map extractGroupByAttributes(final Span span) { } /** - * Get anchor timestamp from span's endTime, truncated to minute boundary + * Get anchor timestamp for metrics, truncated to the specified unit. + * When metric_timestamp_source is ARRIVAL_TIME, uses fallbackTime (clock.instant()). + * When metric_timestamp_source is SPAN_END_TIME, uses the span's endTime field. * * @param spanStateData The span to extract timestamp from - * @param fallbackTime Current system time to use if span endTime is null - * @return Instant truncated to the lower 1-minute boundary + * @param fallbackTime Current system time to use as arrival time or if span endTime is null + * @param truncationUnit The ChronoUnit to truncate the timestamp to + * @return Instant truncated to the specified boundary */ - private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, final Instant fallbackTime) { - Instant timestamp = fallbackTime; // Default to current system time + private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, final Instant fallbackTime, + final ChronoUnit truncationUnit) { + if (metricTimestampSource == MetricTimestampSource.ARRIVAL_TIME) { + return fallbackTime.truncatedTo(truncationUnit); + } + // SPAN_END_TIME mode: parse span's endTime, fall back to system time + Instant timestamp = fallbackTime; final String endTime = spanStateData.getEndTime(); try { if (endTime != null && !endTime.isEmpty()) { @@ -463,7 +525,23 @@ private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, fi endTime, e.getMessage()); } - return timestamp.truncatedTo(java.time.temporal.ChronoUnit.MINUTES); + return timestamp.truncatedTo(truncationUnit); + } + + /** + * Resolve a stable host identifier for this Data Prepper instance. + * Uses a truncated SHA-256 hash of the hostname (from {@link HostContext}) + * to ensure uniqueness without revealing the actual hostname in emitted metrics. + */ + private String resolveHostId() { + try { + final String hostname = HostContext.getHostname(); + final MessageDigest digest = MessageDigest.getInstance("SHA-256"); + final byte[] hash = digest.digest(hostname.getBytes(StandardCharsets.UTF_8)); + return Hex.encodeHexString(hash).substring(0, 16); + } catch (final java.security.NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 algorithm not available", e); + } } /** @@ -718,11 +796,11 @@ private void decorateServerSpansSecondPassWithEphemeralStorage(final ThreeWindow * @param metricsStateByKey Shared map for metric aggregation across all traces * @return Collection of NodeOperationDetail events */ - private Collection> generateNodeOperationDetailEvents(final ThreeWindowTraceDataWithDecorations traceData, - final Instant currentTime, - final Map metricsStateByKey) { - final Collection> events = new HashSet<>(); - + private void generateNodeOperationDetailEvents(final ThreeWindowTraceDataWithDecorations traceData, + final Instant currentTime, + final Map sumStateByKey, + final Map histogramStateByKey, + final Set dedupedNodeDetails) { // Step 1: CLIENT spans — primary emission path for (SpanStateData clientSpan : traceData.getProcessingSpans()) { if (SPAN_KIND_CLIENT.equals(clientSpan.getSpanKind())) { @@ -746,24 +824,18 @@ private Collection> generateNodeOperationDetailEvents(final ThreeW : null; final Operation targetOp = new Operation(decoration.getRemoteOperation()); - final Instant anchorTimestamp = getAnchorTimestampFromSpan(clientSpan, currentTime); + final Instant anchor = getAnchorTimestampFromSpan(clientSpan, currentTime, + metricTimestampGranularity.getChronoUnit()); final NodeOperationDetail nodeOperationDetail = new NodeOperationDetail( - sourceNode, targetNode, sourceOp, targetOp, anchorTimestamp); - - final EventMetadata eventMetadata = new DefaultEventMetadata.Builder() - .withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build(); + sourceNode, targetNode, sourceOp, targetOp, anchor); - final Event event = eventFactory.eventBuilder(EventBuilder.class) - .withEventMetadata(eventMetadata) - .withData(nodeOperationDetail) - .build(); - - events.add(new Record<>(event)); + dedupedNodeDetails.add(nodeOperationDetail); if (decoration.getParentServerOperationName() != null) { ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - clientSpan, decoration, currentTime, metricsStateByKey, anchorTimestamp); + clientSpan, decoration, currentTime, sumStateByKey, histogramStateByKey, + anchor, hostId); } } } @@ -772,9 +844,11 @@ private Collection> generateNodeOperationDetailEvents(final ThreeW // Step 2: SERVER spans — metrics for all, leaf NodeOperationDetail for those with no CLIENT descendants for (SpanStateData serverSpan : traceData.getProcessingSpans()) { if (SPAN_KIND_SERVER.equals(serverSpan.getSpanKind())) { - final Instant anchorTimestamp = getAnchorTimestampFromSpan(serverSpan, currentTime); + final Instant anchor = getAnchorTimestampFromSpan(serverSpan, currentTime, + metricTimestampGranularity.getChronoUnit()); ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - serverSpan, currentTime, metricsStateByKey, anchorTimestamp); + serverSpan, currentTime, sumStateByKey, histogramStateByKey, + anchor, hostId); final ServerSpanDecoration decoration = traceData.getDecorations().getServerDecoration(serverSpan.getSpanId()); @@ -788,22 +862,12 @@ private Collection> generateNodeOperationDetailEvents(final ThreeW final Operation sourceOp = new Operation(serverSpan.getOperationName()); final NodeOperationDetail nodeOperationDetail = new NodeOperationDetail( - sourceNode, null, sourceOp, null, anchorTimestamp); - - final EventMetadata eventMetadata = new DefaultEventMetadata.Builder() - .withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build(); + sourceNode, null, sourceOp, null, anchor); - final Event event = eventFactory.eventBuilder(EventBuilder.class) - .withEventMetadata(eventMetadata) - .withData(nodeOperationDetail) - .build(); - - events.add(new Record<>(event)); + dedupedNodeDetails.add(nodeOperationDetail); } } } - - return events; } /** diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfig.java b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfig.java index 08f855315d..52c68765c7 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfig.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfig.java @@ -43,6 +43,16 @@ public class OTelApmServiceMapProcessorConfig { "when present on the span's resource attributes. Only applied to primary Service objects, not dependency services.") private List groupByAttributes = Collections.emptyList(); + @JsonProperty("metric_timestamp_source") + @JsonPropertyDescription("The timestamp source for emitted metrics. 'arrival_time' (default) uses processing time " + + "at window evaluation, avoiding late-span data loss in Prometheus/AMP. 'span_end_time' uses the span's endTime field.") + private MetricTimestampSource metricTimestampSource = MetricTimestampSource.ARRIVAL_TIME; + + @JsonProperty("metric_timestamp_granularity") + @JsonPropertyDescription("The truncation granularity for metric and service map timestamps. " + + "'seconds' (default) truncates to second boundaries. 'minutes' truncates to minute boundaries.") + private MetricTimestampGranularity metricTimestampGranularity = MetricTimestampGranularity.SECONDS; + public Duration getWindowDuration() { return windowDuration; } @@ -54,4 +64,12 @@ public String getDbPath() { public List getGroupByAttributes() { return groupByAttributes != null ? Collections.unmodifiableList(groupByAttributes) : Collections.emptyList(); } + + public MetricTimestampSource getMetricTimestampSource() { + return metricTimestampSource; + } + + public MetricTimestampGranularity getMetricTimestampGranularity() { + return metricTimestampGranularity; + } } diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/model/NodeOperationDetail.java b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/model/NodeOperationDetail.java index bc5320cb8b..3e94ceeb01 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/model/NodeOperationDetail.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/model/NodeOperationDetail.java @@ -60,10 +60,12 @@ public NodeOperationDetail(final Node sourceNode, this.timestamp = DateTimeFormatter.ISO_INSTANT.format(timestamp); this.nodeConnectionHash = String.valueOf(Objects.hash(sourceNode, targetNode)); - if (sourceOperation != null && sourceOperation.getName() != null) { - final String targetOpName = targetOperation != null ? targetOperation.getName() : null; + final String sourceOpName = sourceOperation != null ? sourceOperation.getName() : null; + final String targetOpName = targetOperation != null ? targetOperation.getName() : null; + + if (sourceOpName != null || targetOpName != null) { this.operationConnectionHash = String.valueOf( - Objects.hash(sourceNode, targetNode, sourceOperation.getName(), targetOpName)); + Objects.hash(sourceNode, targetNode, sourceOpName, targetOpName)); } else { this.operationConnectionHash = null; } diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtil.java b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtil.java index ac4f4caec6..a03b658af7 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtil.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtil.java @@ -30,149 +30,142 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertUnixNanosToISO8601; /** - * Utility class for handling APM service map metrics generation and processing + * Utility class for handling APM service map metrics generation and processing. + * + * All metrics (sum and histogram) share the same anchor timestamp, truncated to + * the configured granularity (seconds or minutes) via {@code metric_timestamp_granularity}. */ public final class ApmServiceMapMetricsUtil { private static final Logger LOG = LoggerFactory.getLogger(ApmServiceMapMetricsUtil.class); + private static final String HOST_ID_LABEL = "service_map_processor_host_id"; // Standard latency buckets in seconds private static final List EXPLICIT_BOUNDS = List.of(0.0, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0); /** - * Generate metrics for a CLIENT span using decorated relationship data - * Uses CLIENT-specific metric labels with remote service information + * Generate metrics for a CLIENT span using decorated relationship data. * * @param clientSpan The CLIENT span * @param decoration The CLIENT span decoration containing pre-computed relationship data * @param currentTime Current timestamp - * @param metricsStateByKey Shared map for metric aggregation - * @param anchorTimestamp The anchor timestamp for metrics + * @param sumStateByKey Map for sum metric aggregation + * @param histogramStateByKey Map for histogram metric aggregation + * @param anchorTimestamp Timestamp truncated to the configured granularity for all metrics + * @param hostId Stable identifier for this Data Prepper host */ public static void generateMetricsForClientSpan(final SpanStateData clientSpan, final ClientSpanDecoration decoration, final Instant currentTime, - final Map metricsStateByKey, - final Instant anchorTimestamp) { + final Map sumStateByKey, + final Map histogramStateByKey, + final Instant anchorTimestamp, + final String hostId) { // Build CLIENT-side metric labels using decorated relationship data final Map labels = new HashMap<>(); - labels.put("namespace", "span_derived"); - labels.put("environment", clientSpan.getEnvironment()); // Environment = CLIENT span's environment - labels.put("service", clientSpan.getServiceName()); // Service = CLIENT span's own service name - labels.put("operation", decoration.getParentServerOperationName()); // Operation = parentServerOperationName from decoration - labels.put("remoteEnvironment", decoration.getRemoteEnvironment()); // RemoteEnvironment = remote span's environment - labels.put("remoteService", decoration.getRemoteService()); // RemoteService = remoteService from decoration - labels.put("remoteOperation", decoration.getRemoteOperation()); // RemoteOperation = remoteOperation from decoration - labels.putAll(clientSpan.getGroupByAttributes()); // groupByAttributes = read from SpanStateData - - final MetricKey metricKey = new MetricKey(labels, anchorTimestamp); - - // Get or create aggregation state for this metric key - MetricAggregationState state = metricsStateByKey.computeIfAbsent(metricKey, k -> new MetricAggregationState()); - - // Increment request count for every CLIENT span - state.incrementRequestCount(1); - - // Accumulate latency duration in seconds for histogram - Long durationInNanos = clientSpan.getDurationInNanos(); - if (durationInNanos != null && durationInNanos > 0) { - final double durationInSeconds = durationInNanos / 1_000_000_000.0; - state.addLatencyDuration(durationInSeconds); + putCommonLabels(labels, clientSpan.getEnvironment(), clientSpan.getServiceName(), + decoration.getParentServerOperationName(), hostId); + labels.put("remoteEnvironment", decoration.getRemoteEnvironment()); + labels.put("remoteService", decoration.getRemoteService()); + labels.put("remoteOperation", decoration.getRemoteOperation()); + labels.putAll(clientSpan.getGroupByAttributes()); + + // Sum metrics (request, error, fault) + final MetricKey sumKey = new MetricKey(labels, anchorTimestamp); + final MetricAggregationState sumState = sumStateByKey.computeIfAbsent(sumKey, k -> new MetricAggregationState()); + sumState.incrementRequestCount(1); + sumState.incrementErrorCount(clientSpan.getError()); + sumState.incrementFaultCount(clientSpan.getFault()); + + if (clientSpan.getError() == 1 && sumState.getErrorExemplars().size() < 10) { + sumState.addErrorExemplar(createExemplarFromSpan(clientSpan, sumState.getErrorCount())); } - - // Use pre-computed error and fault indicators from SpanStateData - state.incrementErrorCount(clientSpan.getError()); - state.incrementFaultCount(clientSpan.getFault()); - - // Add exemplars for error spans - if (clientSpan.getError() == 1 && state.getErrorExemplars().size() < 10) { - state.addErrorExemplar(createExemplarFromSpan(clientSpan, state.getErrorCount())); + if (clientSpan.getFault() == 1 && sumState.getFaultExemplars().size() < 10) { + sumState.addFaultExemplar(createExemplarFromSpan(clientSpan, sumState.getFaultCount())); } - // Add exemplars for fault spans - if (clientSpan.getFault() == 1 && state.getFaultExemplars().size() < 10) { - state.addFaultExemplar(createExemplarFromSpan(clientSpan, state.getFaultCount())); + // Histogram metrics (latency) + final Long durationInNanos = clientSpan.getDurationInNanos(); + if (durationInNanos != null && durationInNanos > 0) { + final MetricKey histKey = new MetricKey(labels, anchorTimestamp); + final MetricAggregationState histState = histogramStateByKey.computeIfAbsent(histKey, k -> new MetricAggregationState()); + histState.addLatencyDuration(durationInNanos / 1_000_000_000.0); } } /** - * Generate metrics for a SERVER span using span data directly + * Generate metrics for a SERVER span using span data directly. * * @param serverSpan The SERVER span * @param currentTime Current timestamp - * @param metricsStateByKey Shared map for metric aggregation - * @param anchorTimestamp The anchor timestamp for metrics + * @param sumStateByKey Map for sum metric aggregation + * @param histogramStateByKey Map for histogram metric aggregation + * @param anchorTimestamp Timestamp truncated to the configured granularity for all metrics + * @param hostId Stable identifier for this Data Prepper host */ public static void generateMetricsForServerSpan(final SpanStateData serverSpan, final Instant currentTime, - final Map metricsStateByKey, - final Instant anchorTimestamp) { - // Build metric labels using span's groupByAttributes (read directly from SpanStateData) + final Map sumStateByKey, + final Map histogramStateByKey, + final Instant anchorTimestamp, + final String hostId) { + // Build metric labels using span's groupByAttributes final Map labels = new HashMap<>(); - labels.put("namespace", "span_derived"); - labels.put("environment", serverSpan.getEnvironment()); - labels.put("service", serverSpan.getServiceName()); - labels.put("operation", serverSpan.getOperationName()); + putCommonLabels(labels, serverSpan.getEnvironment(), serverSpan.getServiceName(), + serverSpan.getOperationName(), hostId); labels.putAll(serverSpan.getGroupByAttributes()); - final MetricKey metricKey = new MetricKey(labels, anchorTimestamp); - - // Get or create aggregation state for this metric key - MetricAggregationState state = metricsStateByKey.computeIfAbsent(metricKey, k -> new MetricAggregationState()); + // Sum metrics (request, error, fault) + final MetricKey sumKey = new MetricKey(labels, anchorTimestamp); + final MetricAggregationState sumState = sumStateByKey.computeIfAbsent(sumKey, k -> new MetricAggregationState()); + sumState.incrementRequestCount(1); + sumState.incrementErrorCount(serverSpan.getError()); + sumState.incrementFaultCount(serverSpan.getFault()); - // Increment request count for every SERVER span - state.incrementRequestCount(1); - - // Accumulate latency duration in seconds for histogram - Long durationInNanos = serverSpan.getDurationInNanos(); - if (durationInNanos != null && durationInNanos > 0) { - final double durationInSeconds = durationInNanos / 1_000_000_000.0; - state.addLatencyDuration(durationInSeconds); + if (serverSpan.getError() == 1 && sumState.getErrorExemplars().size() < 10) { + sumState.addErrorExemplar(createExemplarFromSpan(serverSpan, sumState.getErrorCount())); } - - // Use pre-computed error and fault indicators from SpanStateData - state.incrementErrorCount(serverSpan.getError()); - state.incrementFaultCount(serverSpan.getFault()); - - // Add exemplars for error spans - if (serverSpan.getError() == 1 && state.getErrorExemplars().size() < 10) { - state.addErrorExemplar(createExemplarFromSpan(serverSpan, state.getErrorCount())); + if (serverSpan.getFault() == 1 && sumState.getFaultExemplars().size() < 10) { + sumState.addFaultExemplar(createExemplarFromSpan(serverSpan, sumState.getFaultCount())); } - // Add exemplars for fault spans - if (serverSpan.getFault() == 1 && state.getFaultExemplars().size() < 10) { - state.addFaultExemplar(createExemplarFromSpan(serverSpan, state.getFaultCount())); + // Histogram metrics (latency) + final Long durationInNanos = serverSpan.getDurationInNanos(); + if (durationInNanos != null && durationInNanos > 0) { + final MetricKey histKey = new MetricKey(labels, anchorTimestamp); + final MetricAggregationState histState = histogramStateByKey.computeIfAbsent(histKey, k -> new MetricAggregationState()); + histState.addLatencyDuration(durationInNanos / 1_000_000_000.0); } } /** - * Create all JacksonSum and JacksonStandardHistogram metrics from aggregated state - * This method is called after ALL traces have been processed + * Create all JacksonSum and JacksonStandardHistogram metrics from aggregated state. * - * @param metricsStateByKey Shared map containing aggregated metric state for all traces + * @param sumStateByKey Map containing aggregated sum metric state + * @param histogramStateByKey Map containing aggregated histogram metric state * @return List of JacksonMetric objects (JacksonSum and JacksonStandardHistogram) */ - public static List createMetricsFromAggregatedState(final Map metricsStateByKey) { + public static List createMetricsFromAggregatedState( + final Map sumStateByKey, + final Map histogramStateByKey) { final List metrics = new ArrayList<>(); - // Generate JacksonSum and JacksonStandardHistogram metrics from aggregated state - for (Map.Entry entry : metricsStateByKey.entrySet()) { + // Generate Sum metrics (request, error, fault) + for (Map.Entry entry : sumStateByKey.entrySet()) { final MetricKey metricKey = entry.getKey(); final MetricAggregationState state = entry.getValue(); - // Create request_count metric (always generated for every SERVER span) metrics.add(createJacksonSumMetric( "request", "Number of requests", state.getRequestCount(), metricKey.getLabels(), metricKey.getTimestamp(), - Collections.emptyList() // No exemplars for request count + Collections.emptyList() )); metrics.add(createJacksonSumMetric( @@ -192,8 +185,13 @@ public static List createMetricsFromAggregatedState(final Map entry : histogramStateByKey.entrySet()) { + final MetricKey metricKey = entry.getKey(); + final MetricAggregationState state = entry.getValue(); - // Create latency_seconds histogram (only if there are duration samples) if (!state.getLatencyDurations().isEmpty()) { metrics.add(createJacksonStandardHistogram( "latency", @@ -205,7 +203,6 @@ public static List createMetricsFromAggregatedState(final Map exemplars) { final long timestampNanos = getTimeNanos(timestamp); - final long startTimeNanos = timestampNanos; // For counter metrics, start time can be same as timestamp - - final Map labelsWithRandomKey = new HashMap<>(); - labelsWithRandomKey.putAll(labels); - labelsWithRandomKey.put("randomKey", UUID.randomUUID().toString()); + final long startTimeNanos = timestampNanos; return JacksonSum.builder() .withName(metricName) .withDescription(description) .withTime(convertUnixNanosToISO8601(timestampNanos)) .withStartTime(convertUnixNanosToISO8601(startTimeNanos)) - .withIsMonotonic(true) // These are counter metrics - .withUnit("1") // Count unit + .withIsMonotonic(true) + .withUnit("1") .withAggregationTemporality("AGGREGATION_TEMPORALITY_DELTA") .withValue(value) .withExemplars(exemplars) - .withAttributes(labelsWithRandomKey) + .withAttributes(labels) .build(false); } /** * Create a JacksonStandardHistogram metric from collected latency durations - * - * @param metricName Name of the metric - * @param description Description of the metric - * @param durations List of duration values in seconds - * @param labels Labels for the metric - * @param timestamp Timestamp for the metric - * @return JacksonStandardHistogram metric event */ static JacksonMetric createJacksonStandardHistogram(final String metricName, final String description, @@ -306,21 +284,17 @@ static JacksonMetric createJacksonStandardHistogram(final String metricName, final Map labels, final Instant timestamp) { final long timestampNanos = getTimeNanos(timestamp); - final long startTimeNanos = timestampNanos; // For histogram metrics, start time can be same as timestamp + final long startTimeNanos = timestampNanos; // Create histogram buckets from raw duration values final HistogramBuckets buckets = createHistogramBucketsFromDurations(durations); - final Map labelsWithRandomKey = new HashMap<>(); - labelsWithRandomKey.putAll(labels); - labelsWithRandomKey.put("randomKey", UUID.randomUUID().toString()); - return JacksonStandardHistogram.builder() .withName(metricName) .withDescription(description) .withTime(convertUnixNanosToISO8601(timestampNanos)) .withStartTime(convertUnixNanosToISO8601(startTimeNanos)) - .withUnit("s") // Seconds unit for latency + .withUnit("s") .withAggregationTemporality("AGGREGATION_TEMPORALITY_DELTA") .withCount((long) durations.size()) .withSum(durations.stream().mapToDouble(Double::doubleValue).sum()) @@ -330,14 +304,14 @@ static JacksonMetric createJacksonStandardHistogram(final String metricName, .withExplicitBoundsList(buckets.getExplicitBounds()) .withBucketCount(buckets.getBucketCounts().size()) .withExplicitBoundsCount(buckets.getExplicitBounds().size()) - .withAttributes(labelsWithRandomKey) + .withAttributes(labels) .build(false); } /** * Create histogram buckets from raw duration values * Uses O-Tel Java SDK bucket: 0.0 ms to 10 sec - * https://opentelemetry.io/docs/specs/otel/metrics/sdk/?utm_source=chatgpt.com#explicit-bucket-histogram-aggregation + * https://opentelemetry.io/docs/specs/otel/metrics/sdk/#explicit-bucket-histogram-aggregation * * @param durations List of duration values in seconds * @return HistogramBuckets with counts and bounds @@ -366,6 +340,18 @@ static HistogramBuckets createHistogramBucketsFromDurations(final List d return new HistogramBuckets(bucketCounts, EXPLICIT_BOUNDS); } + private static void putCommonLabels(final Map labels, + final String environment, + final String serviceName, + final String operationName, + final String hostId) { + labels.put("namespace", "span_derived"); + labels.put("environment", environment); + labels.put("service", serviceName); + labels.put("operation", operationName); + labels.put(HOST_ID_LABEL, hostId); + } + // Private constructor to prevent instantiation private ApmServiceMapMetricsUtil() { throw new UnsupportedOperationException("Utility class should not be instantiated"); diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampGranularityTest.java b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampGranularityTest.java new file mode 100644 index 0000000000..f1eb7b0b51 --- /dev/null +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampGranularityTest.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.temporal.ChronoUnit; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class MetricTimestampGranularityTest { + + @Test + void seconds_has_correct_option_value() { + assertThat(MetricTimestampGranularity.fromOptionValue("seconds"), equalTo(MetricTimestampGranularity.SECONDS)); + } + + @Test + void minutes_has_correct_option_value() { + assertThat(MetricTimestampGranularity.fromOptionValue("minutes"), equalTo(MetricTimestampGranularity.MINUTES)); + } + + @Test + void invalid_option_value_returns_null() { + assertThat(MetricTimestampGranularity.fromOptionValue("invalid"), nullValue()); + } + + @Test + void seconds_has_correct_chrono_unit() { + assertThat(MetricTimestampGranularity.SECONDS.getChronoUnit(), equalTo(ChronoUnit.SECONDS)); + } + + @Test + void minutes_has_correct_chrono_unit() { + assertThat(MetricTimestampGranularity.MINUTES.getChronoUnit(), equalTo(ChronoUnit.MINUTES)); + } + + @ParameterizedTest + @EnumSource(MetricTimestampGranularity.class) + void all_enum_values_can_be_created_from_option(final MetricTimestampGranularity granularity) { + assertThat(MetricTimestampGranularity.fromOptionValue(granularity.getOption()), notNullValue()); + } + + @ParameterizedTest + @EnumSource(MetricTimestampGranularity.class) + void all_enum_values_have_chrono_unit(final MetricTimestampGranularity granularity) { + assertThat(granularity.getChronoUnit(), notNullValue()); + } +} diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampSourceTest.java b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampSourceTest.java new file mode 100644 index 0000000000..1c7097dfbd --- /dev/null +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/MetricTimestampSourceTest.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class MetricTimestampSourceTest { + + @Test + void arrival_time_has_correct_option_value() { + assertThat(MetricTimestampSource.fromOptionValue("arrival_time"), equalTo(MetricTimestampSource.ARRIVAL_TIME)); + } + + @Test + void span_end_time_has_correct_option_value() { + assertThat(MetricTimestampSource.fromOptionValue("span_end_time"), equalTo(MetricTimestampSource.SPAN_END_TIME)); + } + + @Test + void invalid_option_value_returns_null() { + assertThat(MetricTimestampSource.fromOptionValue("invalid"), nullValue()); + } + + @ParameterizedTest + @EnumSource(MetricTimestampSource.class) + void all_enum_values_can_be_created_from_option(final MetricTimestampSource source) { + assertThat(MetricTimestampSource.fromOptionValue(source.getOption()), notNullValue()); + } +} diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfigTest.java b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfigTest.java index 309b626e56..ce03357bd4 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfigTest.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfigTest.java @@ -35,6 +35,8 @@ public void testConfigDefaults() { assertThat(otelApmServiceMapProcessorConfig.getWindowDuration(), equalTo(Duration.ofSeconds(OTelApmServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION_SECONDS))); assertThat(otelApmServiceMapProcessorConfig.getDbPath(), equalTo(OTelApmServiceMapProcessorConfig.DEFAULT_DB_PATH)); assertThat(otelApmServiceMapProcessorConfig.getGroupByAttributes(), equalTo(Collections.emptyList())); + assertThat(otelApmServiceMapProcessorConfig.getMetricTimestampSource(), equalTo(MetricTimestampSource.ARRIVAL_TIME)); + assertThat(otelApmServiceMapProcessorConfig.getMetricTimestampGranularity(), equalTo(MetricTimestampGranularity.SECONDS)); } @Test @@ -51,4 +53,32 @@ public void testCustomConfigValues() throws NoSuchFieldException, IllegalAccessE assertThat(otelApmServiceMapProcessorConfig.getDbPath(), equalTo(TEST_DB_PATH)); assertThat(otelApmServiceMapProcessorConfig.getGroupByAttributes(), equalTo(TEST_ATTRIBUTES)); } + + @Test + public void testDefaultMetricTimestampSource() { + otelApmServiceMapProcessorConfig = createObjectUnderTest(); + assertThat(otelApmServiceMapProcessorConfig.getMetricTimestampSource(), equalTo(MetricTimestampSource.ARRIVAL_TIME)); + } + + @Test + public void testCustomMetricTimestampSource() throws NoSuchFieldException, IllegalAccessException { + otelApmServiceMapProcessorConfig = createObjectUnderTest(); + ReflectivelySetField.setField(OTelApmServiceMapProcessorConfig.class, otelApmServiceMapProcessorConfig, + "metricTimestampSource", MetricTimestampSource.SPAN_END_TIME); + assertThat(otelApmServiceMapProcessorConfig.getMetricTimestampSource(), equalTo(MetricTimestampSource.SPAN_END_TIME)); + } + + @Test + public void testDefaultMetricTimestampGranularity() { + otelApmServiceMapProcessorConfig = createObjectUnderTest(); + assertThat(otelApmServiceMapProcessorConfig.getMetricTimestampGranularity(), equalTo(MetricTimestampGranularity.SECONDS)); + } + + @Test + public void testCustomMetricTimestampGranularity() throws NoSuchFieldException, IllegalAccessException { + otelApmServiceMapProcessorConfig = createObjectUnderTest(); + ReflectivelySetField.setField(OTelApmServiceMapProcessorConfig.class, otelApmServiceMapProcessorConfig, + "metricTimestampGranularity", MetricTimestampGranularity.MINUTES); + assertThat(otelApmServiceMapProcessorConfig.getMetricTimestampGranularity(), equalTo(MetricTimestampGranularity.MINUTES)); + } } diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorTest.java b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorTest.java index 0bbc01d05e..654d6a9fc7 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorTest.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorTest.java @@ -93,6 +93,11 @@ private OTelApmServiceMapProcessor createObjectUnderTest(List groupByAtt private OTelApmServiceMapProcessor createObjectUnderTest(Duration duration, int workers) { return new OTelApmServiceMapProcessor(duration, tempDir, clock, workers, eventFactory, pluginMetrics); } + + private OTelApmServiceMapProcessor createObjectUnderTest(MetricTimestampSource metricTimestampSource) { + return new OTelApmServiceMapProcessor(Duration.ofSeconds(60), tempDir, clock, 1, eventFactory, pluginMetrics, + Collections.emptyList(), metricTimestampSource); + } @BeforeEach void setUp() { @@ -1193,6 +1198,137 @@ void testDecorateSpansInTraceWithEphemeralStorage() { isolatedProcessor.shutdown(); } + @Test + void testArrivalTimeMode_usesClockInstant_notSpanEndTime() { + // Given - arrival_time mode should use clock.instant() regardless of span endTime + final Instant arrivalTime = Instant.parse("2021-01-01T00:05:00Z"); + final Instant arrivalTimePlusWindow = arrivalTime.plusSeconds(65); + + when(clock.instant()) + .thenReturn(arrivalTime) // 1. constructor: previousTimestamp + .thenReturn(arrivalTime) // 2. doExecute call 1: windowDurationHasPassed + .thenReturn(arrivalTimePlusWindow) // 3. doExecute call 2: windowDurationHasPassed + .thenReturn(arrivalTimePlusWindow) // 4. processCurrentWindowSpans: currentTime + .thenReturn(arrivalTimePlusWindow) // 5. rotateWindows: LOG.debug + .thenReturn(arrivalTimePlusWindow) // 6. rotateWindows: previousTimestamp + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)) // 7. doExecute call 3: windowDurationHasPassed + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)) // 8. processCurrentWindowSpans: currentTime + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)) // 9. rotateWindows: LOG.debug + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)); // 10. rotateWindows: previousTimestamp + + final BaseEventBuilder eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS); + when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); + doAnswer((a) -> { + eventMetadata = a.getArgument(0); + return eventBuilder; + }).when(eventBuilder).withEventMetadata(any()); + doAnswer((a) -> { + eventData = a.getArgument(0); + return eventBuilder; + }).when(eventBuilder).withData(any()); + doAnswer((a) -> { + return JacksonEvent.builder() + .withEventMetadata(eventMetadata) + .withData(eventData) + .build(); + }).when(eventBuilder).build(); + + File isolatedDir = new File(tempDir, "arrival-test-" + System.nanoTime()); + isolatedDir.mkdirs(); + OTelApmServiceMapProcessor arrivalProcessor = new OTelApmServiceMapProcessor( + Duration.ofSeconds(60), isolatedDir, clock, 1, eventFactory, pluginMetrics, + Collections.emptyList(), MetricTimestampSource.ARRIVAL_TIME); + + // Span with endTime at 12:30 — should be IGNORED in arrival_time mode + Span span = createMockSpanWithIds("svc", "op", "SPAN_KIND_SERVER", + "1111111111111111", "", "aaaaaaaaaaaaaaaa"); + when(span.getEndTime()).thenReturn("2021-01-01T12:30:45.123Z"); + + arrivalProcessor.doExecute(Collections.singletonList(new Record<>(span))); + arrivalProcessor.doExecute(Collections.emptyList()); + Collection> result = arrivalProcessor.doExecute(Collections.emptyList()); + + // Verify metrics exist and their timestamps use arrival time (NOT span endTime 12:30) + assertThat(result.isEmpty(), equalTo(false)); + for (Record record : result) { + Event event = record.getData(); + String time = event.get("time", String.class); + if (time != null) { + // arrival_time mode: timestamp should be from clock.instant(), NOT span's endTime (12:30) + // Sum metrics truncate to seconds, Histogram to minutes — both should start with 00:07 + assertTrue(time.startsWith("2021-01-01T00:07:"), + "Expected arrival time (00:07:xx) but got: " + time); + } + } + + arrivalProcessor.shutdown(); + } + + @Test + void testSpanEndTimeMode_usesSpanEndTime() { + // Given - span_end_time mode should use span's endTime + final Instant arrivalTime = Instant.parse("2021-01-01T00:05:00Z"); + final Instant arrivalTimePlusWindow = arrivalTime.plusSeconds(65); + + when(clock.instant()) + .thenReturn(arrivalTime) // 1. constructor: previousTimestamp + .thenReturn(arrivalTime) // 2. doExecute call 1: windowDurationHasPassed + .thenReturn(arrivalTimePlusWindow) // 3. doExecute call 2: windowDurationHasPassed + .thenReturn(arrivalTimePlusWindow) // 4. processCurrentWindowSpans: currentTime + .thenReturn(arrivalTimePlusWindow) // 5. rotateWindows: LOG.debug + .thenReturn(arrivalTimePlusWindow) // 6. rotateWindows: previousTimestamp + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)) // 7. doExecute call 3: windowDurationHasPassed + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)) // 8. processCurrentWindowSpans: currentTime + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)) // 9. rotateWindows: LOG.debug + .thenReturn(arrivalTimePlusWindow.plusSeconds(65)); // 10. rotateWindows: previousTimestamp + + final BaseEventBuilder eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS); + when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); + doAnswer((a) -> { + eventMetadata = a.getArgument(0); + return eventBuilder; + }).when(eventBuilder).withEventMetadata(any()); + doAnswer((a) -> { + eventData = a.getArgument(0); + return eventBuilder; + }).when(eventBuilder).withData(any()); + doAnswer((a) -> { + return JacksonEvent.builder() + .withEventMetadata(eventMetadata) + .withData(eventData) + .build(); + }).when(eventBuilder).build(); + + File isolatedDir = new File(tempDir, "spanend-test-" + System.nanoTime()); + isolatedDir.mkdirs(); + OTelApmServiceMapProcessor spanEndProcessor = new OTelApmServiceMapProcessor( + Duration.ofSeconds(60), isolatedDir, clock, 1, eventFactory, pluginMetrics, + Collections.emptyList(), MetricTimestampSource.SPAN_END_TIME); + + // Span with endTime at 12:30:45 — should be used and truncated to 12:30:00 + Span span = createMockSpanWithIds("svc", "op", "SPAN_KIND_SERVER", + "1111111111111111", "", "aaaaaaaaaaaaaaaa"); + when(span.getEndTime()).thenReturn("2021-01-01T12:30:45.123Z"); + + spanEndProcessor.doExecute(Collections.singletonList(new Record<>(span))); + spanEndProcessor.doExecute(Collections.emptyList()); + Collection> result = spanEndProcessor.doExecute(Collections.emptyList()); + + assertThat(result.isEmpty(), equalTo(false)); + for (Record record : result) { + Event event = record.getData(); + String time = event.get("time", String.class); + if (time != null) { + // span_end_time mode: timestamp from span endTime (12:30:45), NOT arrival time (00:07) + // Sum metrics truncate to seconds (12:30:45Z), Histogram to minutes (12:30:00Z) + assertTrue(time.startsWith("2021-01-01T12:30:"), + "Expected span endTime (12:30:xx) but got: " + time); + } + } + + spanEndProcessor.shutdown(); + } + // Helper method to create mock spans private Span createMockSpan(String serviceName, String operationName, String spanKind) { Span mockSpan = mock(Span.class); diff --git a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtilTest.java b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtilTest.java index daa7b9752c..2b1d926d15 100644 --- a/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtilTest.java +++ b/data-prepper-plugins/otel-apm-service-map-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/utils/ApmServiceMapMetricsUtilTest.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -45,25 +47,29 @@ @ExtendWith(MockitoExtension.class) class ApmServiceMapMetricsUtilTest { + private String testHostId; + private SpanStateData testClientSpan; private SpanStateData testServerSpan; private ClientSpanDecoration mockDecoration; - private Map metricsStateByKey; + private Map sumStateByKey; + private Map histogramStateByKey; private Instant currentTime; private Instant anchorTimestamp; @BeforeEach void setUp() { + testHostId = java.util.UUID.randomUUID().toString(); testClientSpan = createMockSpanStateData("client-service", "client-operation", "test-env"); testServerSpan = createMockSpanStateData("server-service", "server-operation", "test-env"); mockDecoration = createMockClientSpanDecoration(); - metricsStateByKey = new HashMap<>(); + sumStateByKey = new HashMap<>(); + histogramStateByKey = new HashMap<>(); currentTime = Instant.now(); - anchorTimestamp = Instant.now().minusSeconds(60); + anchorTimestamp = Instant.now().minusSeconds(60).truncatedTo(java.time.temporal.ChronoUnit.SECONDS); } private SpanStateData createMockSpanStateData(String serviceName, String operationName, String environment) { - // Create a real SpanStateData instance for proper field access Map spanAttributes = new HashMap<>(); spanAttributes.put("resource", Map.of("attributes", Map.of("deployment.environment.name", environment))); @@ -122,16 +128,21 @@ private SpanStateData createSpanWithHttpStatus(int httpStatusCode, String servic void testGenerateMetricsForClientSpan_Success() { // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - testClientSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + testClientSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - assertEquals(1, metricsStateByKey.size()); - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(0, state.getErrorCount()); - assertEquals(0, state.getFaultCount()); - assertEquals(1, state.getLatencyDurations().size()); - assertEquals(1.0, state.getLatencyDurations().get(0), 0.001); + assertEquals(1, sumStateByKey.size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertEquals(0, sumState.getErrorCount()); + assertEquals(0, sumState.getFaultCount()); + + assertEquals(1, histogramStateByKey.size()); + MetricAggregationState histState = histogramStateByKey.values().iterator().next(); + assertEquals(1, histState.getLatencyDurations().size()); + assertEquals(1.0, histState.getLatencyDurations().get(0), 0.001); } @Test @@ -141,15 +152,17 @@ void testGenerateMetricsForClientSpan_WithError() { // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - errorSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + errorSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(1, state.getErrorCount()); - assertEquals(0, state.getFaultCount()); - assertEquals(1, state.getErrorExemplars().size()); - assertEquals(0, state.getFaultExemplars().size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertEquals(1, sumState.getErrorCount()); + assertEquals(0, sumState.getFaultCount()); + assertEquals(1, sumState.getErrorExemplars().size()); + assertEquals(0, sumState.getFaultExemplars().size()); } @Test @@ -159,15 +172,17 @@ void testGenerateMetricsForClientSpan_WithFault() { // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - faultSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + faultSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(0, state.getErrorCount()); - assertEquals(1, state.getFaultCount()); - assertEquals(0, state.getErrorExemplars().size()); - assertEquals(1, state.getFaultExemplars().size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertEquals(0, sumState.getErrorCount()); + assertEquals(1, sumState.getFaultCount()); + assertEquals(0, sumState.getErrorExemplars().size()); + assertEquals(1, sumState.getFaultExemplars().size()); } @Test @@ -177,12 +192,14 @@ void testGenerateMetricsForClientSpan_WithNullDuration() { // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - testClientSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + testClientSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(0, state.getLatencyDurations().size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertTrue(histogramStateByKey.isEmpty()); // No histogram entry for null duration } @Test @@ -192,12 +209,14 @@ void testGenerateMetricsForClientSpan_WithZeroDuration() { // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - testClientSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + testClientSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(0, state.getLatencyDurations().size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertTrue(histogramStateByKey.isEmpty()); // No histogram entry for zero duration } @Test @@ -218,14 +237,17 @@ void testGenerateMetricsForClientSpan_ExemplarLimit() { labels.put("remoteEnvironment", mockDecoration.getRemoteEnvironment()); labels.put("remoteService", mockDecoration.getRemoteService()); labels.put("remoteOperation", mockDecoration.getRemoteOperation()); + labels.put("service_map_processor_host_id", testHostId); labels.putAll(errorSpan.getGroupByAttributes()); MetricKey key = new MetricKey(labels, anchorTimestamp); - metricsStateByKey.put(key, existingState); + sumStateByKey.put(key, existingState); // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - errorSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + errorSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then assertEquals(10, existingState.getErrorExemplars().size()); // Should not exceed limit @@ -235,15 +257,20 @@ void testGenerateMetricsForClientSpan_ExemplarLimit() { void testGenerateMetricsForServerSpan_Success() { // When ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - testServerSpan, currentTime, metricsStateByKey, anchorTimestamp); + testServerSpan, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - assertEquals(1, metricsStateByKey.size()); - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(0, state.getErrorCount()); - assertEquals(0, state.getFaultCount()); - assertEquals(1, state.getLatencyDurations().size()); + assertEquals(1, sumStateByKey.size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertEquals(0, sumState.getErrorCount()); + assertEquals(0, sumState.getFaultCount()); + + assertEquals(1, histogramStateByKey.size()); + MetricAggregationState histState = histogramStateByKey.values().iterator().next(); + assertEquals(1, histState.getLatencyDurations().size()); } @Test @@ -253,54 +280,88 @@ void testGenerateMetricsForServerSpan_WithError() { // When ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - errorSpan, currentTime, metricsStateByKey, anchorTimestamp); + errorSpan, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(1, state.getErrorCount()); - assertEquals(0, state.getFaultCount()); - assertEquals(1, state.getErrorExemplars().size()); - assertEquals(0, state.getFaultExemplars().size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertEquals(1, sumState.getErrorCount()); + assertEquals(0, sumState.getFaultCount()); + assertEquals(1, sumState.getErrorExemplars().size()); + assertEquals(0, sumState.getFaultExemplars().size()); } @Test void testGenerateMetricsForServerSpan_WithFault() { - // Given - Create span with fault status + // Given - Create span with fault status SpanStateData faultSpan = createSpanWithHttpStatus(500); // HTTP 500 = fault // When ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - faultSpan, currentTime, metricsStateByKey, anchorTimestamp); + faultSpan, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(1, state.getRequestCount()); - assertEquals(0, state.getErrorCount()); - assertEquals(1, state.getFaultCount()); - assertEquals(0, state.getErrorExemplars().size()); - assertEquals(1, state.getFaultExemplars().size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(1, sumState.getRequestCount()); + assertEquals(0, sumState.getErrorCount()); + assertEquals(1, sumState.getFaultCount()); + assertEquals(0, sumState.getErrorExemplars().size()); + assertEquals(1, sumState.getFaultExemplars().size()); } @Test void testCreateMetricsFromAggregatedState_EmptyLatencyDurations() { // Given - MetricAggregationState state = new MetricAggregationState(1, 0, 0); - // latencyDurations is empty by default + MetricAggregationState sumState = new MetricAggregationState(1, 0, 0); Map labels = new HashMap<>(); labels.put("service", "test-service"); MetricKey key = new MetricKey(labels, anchorTimestamp); - metricsStateByKey.put(key, state); + sumStateByKey.put(key, sumState); + // histogramStateByKey is empty // When - List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(metricsStateByKey); + List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState( + sumStateByKey, histogramStateByKey); // Then assertEquals(3, metrics.size()); // Only request, error, fault (no latency) } + @Test + void testCreateMetricsFromAggregatedState_WithBothSumAndHistogram() { + // Given + MetricAggregationState sumState = new MetricAggregationState(5, 2, 1); + Map labels = new HashMap<>(); + labels.put("service", "test-service"); + sumStateByKey.put(new MetricKey(labels, anchorTimestamp), sumState); + + MetricAggregationState histState = new MetricAggregationState(); + histState.addLatencyDuration(0.1); + histState.addLatencyDuration(0.5); + histogramStateByKey.put(new MetricKey(labels, anchorTimestamp), histState); + + // When + List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState( + sumStateByKey, histogramStateByKey); + + // Then + assertEquals(4, metrics.size()); // request, error, fault, latency + + List metricNames = metrics.stream() + .map(JacksonMetric::getName) + .collect(Collectors.toList()); + assertTrue(metricNames.contains("request")); + assertTrue(metricNames.contains("error")); + assertTrue(metricNames.contains("fault")); + assertTrue(metricNames.contains("latency")); + } + @Test void testCreateExemplarFromSpan_Success() { // When @@ -318,25 +379,17 @@ void testCreateExemplarFromSpan_Success() { void testCreateExemplarFromSpan_WithException() { // Given - Create a corrupted span that will cause issues SpanStateData corruptedSpan = new SpanStateData( - null, // serviceName is null - null, // spanId is null - null, // parentSpanId is null - null, // traceId is null - "SERVER", - "test-op", - "test-op", - 1000000000L, - "OK", - "2023-01-01T00:00:00.000Z", - Collections.emptyMap(), - Collections.emptyMap() + null, null, null, null, + "SERVER", "test-op", "test-op", + 1000000000L, "OK", "2023-01-01T00:00:00.000Z", + Collections.emptyMap(), Collections.emptyMap() ); // When Exemplar exemplar = ApmServiceMapMetricsUtil.createExemplarFromSpan(corruptedSpan, 1.0); // Then - assertNotNull(exemplar); // Should still return a minimal exemplar + assertNotNull(exemplar); assertEquals(1.0, exemplar.getValue()); } @@ -374,7 +427,7 @@ void testCreateJacksonSumMetric_Success() { assertEquals(metricName, metric.getName()); assertEquals(description, metric.getDescription()); assertNotNull(metric.getAttributes()); - assertTrue(metric.getAttributes().containsKey("randomKey")); // Verify random key is added + assertFalse(metric.getAttributes().containsKey("randomKey")); } @Test @@ -394,10 +447,8 @@ void testCreateJacksonStandardHistogram_Success() { assertNotNull(metric); assertEquals(metricName, metric.getName()); assertEquals(description, metric.getDescription()); - // Verify attributes exist (specific content may vary based on implementation) assertNotNull(metric.getAttributes()); - // Verify it's a histogram by checking the type returned by the method if (metric instanceof JacksonHistogram) { JacksonHistogram histogram = (JacksonHistogram) metric; assertEquals(4L, histogram.getCount()); @@ -423,18 +474,17 @@ void testCreateHistogramBucketsFromDurations_Success() { assertNotNull(buckets); assertNotNull(buckets.getBucketCounts()); assertNotNull(buckets.getExplicitBounds()); - assertEquals(16, buckets.getBucketCounts().size()); // 15 bounds + 1 overflow bucket + assertEquals(16, buckets.getBucketCounts().size()); assertEquals(15, buckets.getExplicitBounds().size()); - // Verify total count equals input size long totalCount = buckets.getBucketCounts().stream().mapToLong(Long::longValue).sum(); assertEquals(durations.size(), totalCount); } @Test void testCreateHistogramBucketsFromDurations_BoundaryValues() { - // Given - test exact boundary values - List durations = Arrays.asList(0.0, 0.005, 0.01, 0.025); // Exact boundary values + // Given + List durations = Arrays.asList(0.0, 0.005, 0.01, 0.025); // When HistogramBuckets buckets = ApmServiceMapMetricsUtil.createHistogramBucketsFromDurations(durations); @@ -444,7 +494,6 @@ void testCreateHistogramBucketsFromDurations_BoundaryValues() { long totalCount = buckets.getBucketCounts().stream().mapToLong(Long::longValue).sum(); assertEquals(4, totalCount); - // Verify at least some buckets have data (bucket distribution may vary based on implementation) boolean hasBucketData = buckets.getBucketCounts().stream().anyMatch(count -> count > 0); assertTrue(hasBucketData, "At least some buckets should contain data"); } @@ -454,7 +503,7 @@ void testCreateHistogramBucketsFromDurations_WithNullValues() { // Given List durations = new ArrayList<>(); durations.add(0.1); - durations.add(null); // Should be ignored + durations.add(null); durations.add(1.0); // When @@ -462,9 +511,8 @@ void testCreateHistogramBucketsFromDurations_WithNullValues() { // Then assertNotNull(buckets); - // Verify only non-null values are counted long totalCount = buckets.getBucketCounts().stream().mapToLong(Long::longValue).sum(); - assertEquals(2, totalCount); // Only 2 non-null values + assertEquals(2, totalCount); } @Test @@ -479,8 +527,6 @@ void testCreateHistogramBucketsFromDurations_EmptyList() { assertNotNull(buckets); assertEquals(16, buckets.getBucketCounts().size()); assertEquals(15, buckets.getExplicitBounds().size()); - - // All bucket counts should be 0 for (Long count : buckets.getBucketCounts()) { assertEquals(0L, count); } @@ -489,50 +535,19 @@ void testCreateHistogramBucketsFromDurations_EmptyList() { @Test void testCreateHistogramBucketsFromDurations_OverflowBucket() { // Given - List durations = Arrays.asList(20.0, 100.0); // Values beyond largest bound (10.0) + List durations = Arrays.asList(20.0, 100.0); // When HistogramBuckets buckets = ApmServiceMapMetricsUtil.createHistogramBucketsFromDurations(durations); // Then assertNotNull(buckets); - // Overflow bucket (last bucket) should have count 2 assertEquals(2L, buckets.getBucketCounts().get(buckets.getBucketCounts().size() - 1)); - - // All other buckets should be 0 for (int i = 0; i < buckets.getBucketCounts().size() - 1; i++) { assertEquals(0L, buckets.getBucketCounts().get(i)); } } - @Test - void testCreateMetricsFromAggregatedState_Success() { - // Given - MetricAggregationState state = new MetricAggregationState(5,2,1); - state.getLatencyDurations().addAll(Arrays.asList(0.1, 0.2, 0.5, 1.0, 2.0)); - - Map labels = new HashMap<>(); - labels.put("service", "test-service"); - - MetricKey key = new MetricKey(labels, anchorTimestamp); - metricsStateByKey.put(key, state); - - // When - List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(metricsStateByKey); - - // Then - assertEquals(4, metrics.size()); // request, error, fault, latency - - // Verify metric names - List metricNames = metrics.stream() - .map(JacksonMetric::getName) - .collect(Collectors.toList()); - assertTrue(metricNames.contains("request")); - assertTrue(metricNames.contains("error")); - assertTrue(metricNames.contains("fault")); - assertTrue(metricNames.contains("latency")); - } - @Test void testMultipleSpansAggregation() { // Given @@ -543,29 +558,37 @@ void testMultipleSpansAggregation() { // When ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - span1, currentTime, metricsStateByKey, anchorTimestamp); + span1, currentTime, sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - span2, currentTime, metricsStateByKey, anchorTimestamp); - - // Then - assertEquals(1, metricsStateByKey.size()); // Same labels, should aggregate - MetricAggregationState state = metricsStateByKey.values().iterator().next(); - assertEquals(2, state.getRequestCount()); - assertEquals(1, state.getErrorCount()); - assertEquals(1, state.getFaultCount()); - assertEquals(2, state.getLatencyDurations().size()); - assertEquals(1.0, state.getLatencyDurations().get(0), 0.001); - assertEquals(2.0, state.getLatencyDurations().get(1), 0.001); + span2, currentTime, sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); + + // Then - sum metrics aggregate by seconds + assertEquals(1, sumStateByKey.size()); + MetricAggregationState sumState = sumStateByKey.values().iterator().next(); + assertEquals(2, sumState.getRequestCount()); + assertEquals(1, sumState.getErrorCount()); + assertEquals(1, sumState.getFaultCount()); + + // Histogram metrics aggregate by minutes + assertEquals(1, histogramStateByKey.size()); + MetricAggregationState histState = histogramStateByKey.values().iterator().next(); + assertEquals(2, histState.getLatencyDurations().size()); + assertEquals(1.0, histState.getLatencyDurations().get(0), 0.001); + assertEquals(2.0, histState.getLatencyDurations().get(1), 0.001); } @Test void testMetricsLabelsCorrectness_ClientSpan() { // When ApmServiceMapMetricsUtil.generateMetricsForClientSpan( - testClientSpan, mockDecoration, currentTime, metricsStateByKey, anchorTimestamp); + testClientSpan, mockDecoration, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricKey key = metricsStateByKey.keySet().iterator().next(); + MetricKey key = sumStateByKey.keySet().iterator().next(); Map labels = key.getLabels(); assertEquals("span_derived", labels.get("namespace")); @@ -575,39 +598,61 @@ void testMetricsLabelsCorrectness_ClientSpan() { assertEquals(mockDecoration.getRemoteEnvironment(), labels.get("remoteEnvironment")); assertEquals(mockDecoration.getRemoteService(), labels.get("remoteService")); assertEquals(mockDecoration.getRemoteOperation(), labels.get("remoteOperation")); - assertEquals("value", labels.get("custom")); // from groupByAttributes + assertEquals(testHostId, labels.get("service_map_processor_host_id")); + assertEquals("value", labels.get("custom")); } @Test void testMetricsLabelsCorrectness_ServerSpan() { // When ApmServiceMapMetricsUtil.generateMetricsForServerSpan( - testServerSpan, currentTime, metricsStateByKey, anchorTimestamp); + testServerSpan, currentTime, + sumStateByKey, histogramStateByKey, + anchorTimestamp, testHostId); // Then - MetricKey key = metricsStateByKey.keySet().iterator().next(); + MetricKey key = sumStateByKey.keySet().iterator().next(); Map labels = key.getLabels(); assertEquals("span_derived", labels.get("namespace")); assertEquals(testServerSpan.getEnvironment(), labels.get("environment")); assertEquals(testServerSpan.getServiceName(), labels.get("service")); assertEquals(testServerSpan.getOperationName(), labels.get("operation")); - assertEquals("value", labels.get("custom")); // from groupByAttributes + assertEquals(testHostId, labels.get("service_map_processor_host_id")); + assertEquals("value", labels.get("custom")); - // Should NOT have remote* labels for server spans assertFalse(labels.containsKey("remoteEnvironment")); assertFalse(labels.containsKey("remoteService")); assertFalse(labels.containsKey("remoteOperation")); } - @Test + @Test + void testSumAndHistogramUseSameTimestamp() { + // Given + Instant anchor = Instant.parse("2023-01-01T00:01:30Z"); + + // When + ApmServiceMapMetricsUtil.generateMetricsForServerSpan( + testServerSpan, currentTime, + sumStateByKey, histogramStateByKey, + anchor, testHostId); + + // Then + MetricKey sumKey = sumStateByKey.keySet().iterator().next(); + MetricKey histKey = histogramStateByKey.keySet().iterator().next(); + + assertEquals(anchor, sumKey.getTimestamp()); + assertEquals(anchor, histKey.getTimestamp()); + } + + @Test void testMetricsSortedByTimestamp() { // Given MetricAggregationState state1 = new MetricAggregationState(1, 0, 0); - state1.getLatencyDurations().add(1.0); - MetricAggregationState state2 = new MetricAggregationState(2, 0, 0); - state2.getLatencyDurations().add(2.0); + + MetricAggregationState histState = new MetricAggregationState(); + histState.addLatencyDuration(1.0); Instant earlierTime = anchorTimestamp.minusSeconds(60); Instant laterTime = anchorTimestamp.plusSeconds(60); @@ -618,20 +663,21 @@ void testMetricsSortedByTimestamp() { Map labels2 = new HashMap<>(); labels2.put("service", "service2"); - metricsStateByKey.put(new MetricKey(labels2, laterTime), state2); // Add later time first - metricsStateByKey.put(new MetricKey(labels1, earlierTime), state1); + sumStateByKey.put(new MetricKey(labels2, laterTime), state2); + sumStateByKey.put(new MetricKey(labels1, earlierTime), state1); + histogramStateByKey.put(new MetricKey(labels1, earlierTime), histState); // When - List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(metricsStateByKey); + List metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState( + sumStateByKey, histogramStateByKey); // Then assertFalse(metrics.isEmpty()); - // Verify metrics are sorted by timestamp - compare the first few metrics if (metrics.size() >= 2) { String firstTimestamp = metrics.get(0).getTime(); String secondTimestamp = metrics.get(1).getTime(); - assertTrue(firstTimestamp.compareTo(secondTimestamp) <= 0, - "Metrics should be sorted by timestamp"); + assertThat("Metrics should be sorted by timestamp", + firstTimestamp, lessThanOrEqualTo(secondTimestamp)); } } }