Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.model.host;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;

/**
* Provides the hostname of the current Data Prepper instance.
* This is intended as a shared utility so that hostname resolution
* is consistent across all components (processors, source coordinators, etc.).
*/
public class HostContext {

private static final Logger LOG = LoggerFactory.getLogger(HostContext.class);
private static final String UNKNOWN_HOST = "unknown";
private static final String HOSTNAME = resolveHostname();

static String resolveHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (final Exception e) {
LOG.warn("Failed to resolve hostname, using '{}': {}", UNKNOWN_HOST, e.getMessage());
return UNKNOWN_HOST;
}
}

/**
* Returns the hostname of the current Data Prepper host.
*
* @return the hostname
*/
public static String getHostname() {
return HOSTNAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.model.host;

import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

import java.net.InetAddress;
import java.net.UnknownHostException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.emptyString;
import static org.mockito.Mockito.mockStatic;

class HostContextTest {

@Test
void getHostname_returns_non_null_non_empty_value() {
final String hostname = HostContext.getHostname();
assertThat(hostname, notNullValue());
assertThat(hostname, not(emptyString()));
}

@Test
void getHostname_returns_consistent_value() {
final String first = HostContext.getHostname();
final String second = HostContext.getHostname();
assertThat(first, equalTo(second));
}

@Test
void getHostname_matches_InetAddress_hostname() throws UnknownHostException {
final String expected = InetAddress.getLocalHost().getHostName();
assertThat(HostContext.getHostname(), equalTo(expected));
}

@Test
void resolveHostname_returns_valid_hostname() throws UnknownHostException {
final String hostname = HostContext.resolveHostname();
assertThat(hostname, equalTo(InetAddress.getLocalHost().getHostName()));
}

@Test
void resolveHostname_returns_unknown_when_hostname_cannot_be_resolved() {
try (final MockedStatic<InetAddress> inetAddressMock = mockStatic(InetAddress.class)) {
inetAddressMock.when(InetAddress::getLocalHost)
.thenThrow(new UnknownHostException("test exception"));

assertThat(HostContext.resolveHostname(), equalTo("unknown"));
}
}

@Test
void constructor_can_be_created() {
final HostContext hostContext = new HostContext();
assertThat(hostContext, notNullValue());
}
}
26 changes: 26 additions & 0 deletions data-prepper-plugins/otel-apm-service-map-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ processor:
| `window_duration` | Duration | `60s` | Fixed time window in seconds for evaluating APM service map relationships |
| `db_path` | String | `"data/otel-apm-service-map/"` | Directory path for database files storing transient processing data |
| `group_by_attributes` | List\<String\> | `[]` | OpenTelemetry resource attributes to include in service grouping |
| `metric_timestamp_source` | String | `"arrival_time"` | Timestamp source for emitted metrics. `"arrival_time"` uses processing time at window evaluation (avoids late-span data loss in Prometheus/AMP). `"span_end_time"` uses the span's `endTime` field. |
| `metric_timestamp_granularity` | String | `"seconds"` | Truncation granularity for metric and service map timestamps. `"seconds"` truncates to second boundaries (1s collision window). `"minutes"` truncates to minute boundaries (60s collision window). |

### Advanced Configuration

Expand All @@ -42,13 +44,37 @@ processor:
- otel_apm_service_map:
window_duration: 120s # 2-minute windows for high-latency services
db_path: "/tmp/apm-service-map/"
metric_timestamp_source: arrival_time
metric_timestamp_granularity: seconds
group_by_attributes:
- "service.version"
- "deployment.environment"
- "service.namespace"
- "k8s.cluster.name"
```

### Metric Timestamp Source

The `metric_timestamp_source` option controls what timestamp is used for emitted metrics.

| Value | Timestamp used | Late-span safe | Description |
|---|---|---|---|
| `arrival_time` (default) | `clock.instant()` at window evaluation | Yes | All spans in a window share the same processing timestamp. Matches the [OTel Collector spanmetrics connector](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/connector/spanmetricsconnector) approach. |
| `span_end_time` | Span's `endTime` field | No | Each span's end time is used. Late-arriving spans may produce metrics with timestamps that collide with previously written data points, causing silent data loss in Prometheus/AMP. |

**Recommendation:** Use the default `arrival_time` unless you have a specific requirement for span-aligned timestamps and accept the risk of late-span data loss.

### Metric Timestamp Granularity

The `metric_timestamp_granularity` option controls the truncation granularity for all emitted timestamps (metrics and service map events).

| Value | Collision window (`span_end_time` mode) | Data points per window | Description |
|---|---|---|---|
| `seconds` (default) | 1 second | More (one per unique second) | Truncates to second boundaries. Minimizes collision risk in `span_end_time` mode. |
| `minutes` | 60 seconds | Fewer (one per unique minute) | Truncates to minute boundaries. Higher collision risk but fewer data points. |

In `arrival_time` mode, granularity has minimal impact since all spans in a window share the same `clock.instant()` — each window always produces one data point per label combination regardless of truncation.

## Pipeline Examples

### Basic Pipeline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum MetricTimestampGranularity {
SECONDS("seconds", ChronoUnit.SECONDS),
MINUTES("minutes", ChronoUnit.MINUTES);

private static final Map<String, MetricTimestampGranularity> OPTIONS_MAP = Arrays.stream(MetricTimestampGranularity.values())
.collect(Collectors.toMap(
value -> value.option,
value -> value
));

private final String option;
private final ChronoUnit chronoUnit;

MetricTimestampGranularity(final String option, final ChronoUnit chronoUnit) {
this.option = option;
this.chronoUnit = chronoUnit;
}

public String getOption() {
return option;
}

public ChronoUnit getChronoUnit() {
return chronoUnit;
}

@JsonCreator
public static MetricTimestampGranularity fromOptionValue(final String option) {
return OPTIONS_MAP.get(option);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum MetricTimestampSource {
ARRIVAL_TIME("arrival_time"),
SPAN_END_TIME("span_end_time");

private static final Map<String, MetricTimestampSource> OPTIONS_MAP = Arrays.stream(MetricTimestampSource.values())
.collect(Collectors.toMap(
value -> value.option,
value -> value
));

private final String option;

MetricTimestampSource(final String option) {
this.option = option;
}

public String getOption() {
return option;
}

@JsonCreator
public static MetricTimestampSource fromOptionValue(final String option) {
return OPTIONS_MAP.get(option);
}
}
Loading
Loading