32 changes: 32 additions & 0 deletions data-prepper-plugins/otel-apm-service-map-processor/README.md
@@ -75,6 +75,38 @@ The `metric_timestamp_granularity` option controls the truncation granularity fo

In `arrival_time` mode, granularity has minimal impact since all spans in a window share the same `clock.instant()` — each window always produces one data point per label combination regardless of truncation.

## Environment Derivation

For every emitted `NodeOperationDetail` the processor populates `sourceNode.keyAttributes.environment` (and the matching `targetNode.keyAttributes.environment` on edge events) by inspecting OTel resource attributes — and where applicable span attributes — on the underlying spans. The same value is also written to each raw span as `attributes.derived.environment` by the companion `otel_traces` processor.

Lookup precedence (first match wins):

1. AWS platform detection from `cloud.platform` (and a few platform-specific signals listed below).
2. `deployment.environment.name` from `resource.attributes`.
3. `deployment.environment` from `resource.attributes` (legacy OTel key).
4. Default fallback: `generic:default`.
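
The precedence above can be sketched as a plain first-match chain. This is a minimal illustration, not the processor's code: the class `EnvironmentPrecedenceSketch`, the method `derive`, and the flat `Map<String, String>` for resource attributes are all assumptions made for the example, and the AWS result is passed in already computed (or `null` when no AWS signal matched).

```java
import java.util.Map;

// Hypothetical sketch: names and signatures are illustrative, not the plugin's API.
final class EnvironmentPrecedenceSketch {

    // Returns the first non-null candidate, in the documented order.
    static String derive(final String awsPlatformEnvironment,
                         final Map<String, String> resourceAttributes) {
        if (awsPlatformEnvironment != null) {
            return awsPlatformEnvironment;      // 1. AWS platform detection
        }
        final String name = resourceAttributes.get("deployment.environment.name");
        if (name != null) {
            return name;                        // 2. current OTel key
        }
        final String legacy = resourceAttributes.get("deployment.environment");
        if (legacy != null) {
            return legacy;                      // 3. legacy OTel key
        }
        return "generic:default";               // 4. default fallback
    }
}
```

Under these assumptions, a resource carrying both `deployment.environment.name` and `deployment.environment` resolves to the former, and an AWS match overrides both.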

For AWS attributes the processor checks span-level attributes first and then `resource.attributes` (most OTel SDKs emit them on the resource). The exception is the `cloud.provider` + `faas.name` rule, which reads `resource.attributes` only.

| Resource (or span) attributes | `environment` value |
|---|---|
| `cloud.platform=aws_api_gateway` (+ optional `aws.api_gateway.stage=<s>`) | `api-gateway:<s>` |
| `cloud.platform=aws_ec2` | `ec2:default` |
| `cloud.platform=aws_ecs` + `aws.ecs.launchtype=fargate` | `ecs-fargate:default` |
| `cloud.platform=aws_ecs` + `aws.ecs.launchtype=ec2` | `ecs-ec2:default` |
| `cloud.platform=aws_ecs` (launchtype absent or other) | `ecs:default` |
| `cloud.platform=aws_eks` | `eks:default` |
| `cloud.platform=aws_elastic_beanstalk` | `elastic-beanstalk:default` |
| `cloud.platform=aws_lambda` | `lambda:default` |
| `cloud.resource_id` starts with `arn:aws:lambda:` | `lambda:default` |
| `aws.lambda.invoked_arn` is set | `lambda:default` |
| `cloud.provider=aws` + `faas.name=<n>` | `lambda:default` |
| `deployment.environment.name=<env>` | `<env>` |
| `deployment.environment=<env>` | `<env>` (legacy) |
| (none of the above) | `generic:default` |

Attribute names follow the OpenTelemetry semantic conventions for [cloud](https://opentelemetry.io/docs/specs/semconv/registry/attributes/cloud/) and [AWS](https://opentelemetry.io/docs/specs/semconv/registry/attributes/aws/).
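
As a worked illustration of the ECS rows in the table, the sketch below resolves `cloud.platform` and `aws.ecs.launchtype` with a span-first, resource-second lookup. The class `EcsEnvironmentSketch`, its method names, and the flat `Map<String, String>` inputs are assumptions for the example; the processor itself works on nested `Map<String, Object>` structures.

```java
import java.util.Map;

// Hypothetical sketch: names and the flat map shape are illustrative assumptions.
final class EcsEnvironmentSketch {

    // Span-level value wins; otherwise fall back to the resource-level value.
    static String spanOrResource(final Map<String, String> spanAttributes,
                                 final Map<String, String> resourceAttributes,
                                 final String key) {
        final String value = spanAttributes.get(key);
        return value != null ? value : resourceAttributes.get(key);
    }

    // Returns the ECS environment label, or null when the platform is not ECS.
    static String ecsEnvironment(final Map<String, String> spanAttributes,
                                 final Map<String, String> resourceAttributes) {
        final String platform = spanOrResource(spanAttributes, resourceAttributes, "cloud.platform");
        if (!"aws_ecs".equals(platform)) {
            return null;
        }
        final String launchType = spanOrResource(spanAttributes, resourceAttributes, "aws.ecs.launchtype");
        if ("fargate".equals(launchType)) {
            return "ecs-fargate:default";
        }
        if ("ec2".equals(launchType)) {
            return "ecs-ec2:default";
        }
        return "ecs:default"; // launch type absent or unrecognized
    }
}
```

With these assumptions, a resource holding `cloud.platform=aws_ecs` and `aws.ecs.launchtype=fargate` maps to `ecs-fargate:default`, and the same platform without a launch type maps to `ecs:default`.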

## Pipeline Examples

### Basic Pipeline
@@ -356,9 +356,9 @@ private Map<String, Object> extractSpanAttributes(final Span span) {
             combinedAttributes.put("resource", resource);
         }
         final Map<String, Object> scope = span.getScope();
-        if (scope != null ) {
+        if (scope != null) {
             final Map<String, Object> scopeAttributes = (Map<String, Object>)scope.get("attributes");
-            if (attributes != null) {
+            if (scopeAttributes != null) {
                 combinedAttributes.putAll(scopeAttributes);
             }
         }
@@ -1329,6 +1329,73 @@ void testSpanEndTimeMode_usesSpanEndTime() {
         spanEndProcessor.shutdown();
     }

+    @Test
+    void doExecute_emitsServiceMapEventWithEnvironmentFromResource_whenScopeHasNullAttributes() {
+        // Regression for issue #6786 (Bug 1): when the instrumentation scope has no
+        // "attributes" key, OTelApmServiceMapProcessor.extractSpanAttributes used to
+        // throw on putAll(null) and the catch returned an empty map, dropping the
+        // resource and its deployment.environment.name.
+        when(clock.instant())
+                .thenReturn(testTime)
+                .thenReturn(testTime)
+                .thenReturn(testTime.plusSeconds(65))
+                .thenReturn(testTime.plusSeconds(65))
+                .thenReturn(testTime.plusSeconds(65))
+                .thenReturn(testTime.plusSeconds(65))
+                .thenReturn(testTime.plusSeconds(130))
+                .thenReturn(testTime.plusSeconds(130))
+                .thenReturn(testTime.plusSeconds(130))
+                .thenReturn(testTime.plusSeconds(130));
+
+        final BaseEventBuilder<Event> eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS);
+        when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder);
+        doAnswer((a) -> {
+            eventMetadata = a.getArgument(0);
+            return eventBuilder;
+        }).when(eventBuilder).withEventMetadata(any());
+        doAnswer((a) -> {
+            eventData = a.getArgument(0);
+            return eventBuilder;
+        }).when(eventBuilder).withData(any());
+        doAnswer((a) -> JacksonEvent.builder()
+                .withEventMetadata(eventMetadata).withData(eventData).build())
+                .when(eventBuilder).build();
+
+        final File isolatedDir = new File(tempDir, "scope-null-attrs-" + System.nanoTime());
+        isolatedDir.mkdirs();
+        final OTelApmServiceMapProcessor isolatedProcessor = new OTelApmServiceMapProcessor(
+                Duration.ofSeconds(60), isolatedDir, clock, 1, eventFactory, pluginMetrics);
+
+        final Span leafServerSpan = createMockSpanWithIds("leaf-service", "GET /api/test",
+                "SPAN_KIND_SERVER", "1111111111111111", "", "aaaaaaaaaaaaaaaa");
+        final Map<String, Object> resourceAttrs = new HashMap<>();
+        resourceAttrs.put("deployment.environment.name", "production");
+        final Map<String, Object> resource = new HashMap<>();
+        resource.put("attributes", resourceAttrs);
+        when(leafServerSpan.getResource()).thenReturn(resource);
+        // Scope present with NO "attributes" key — the trigger for Bug 1.
+        final Map<String, Object> scope = new HashMap<>();
+        scope.put("name", "my-tracer");
+        when(leafServerSpan.getScope()).thenReturn(scope);
+
+        try {
+            isolatedProcessor.doExecute(Collections.singletonList(new Record<>(leafServerSpan)));
+            isolatedProcessor.doExecute(Collections.emptyList());
+            final Collection<Record<Event>> result = isolatedProcessor.doExecute(Collections.emptyList());
+
+            assertNotNull(result);
+            final Record<Event> serviceMapEvent = result.stream()
+                    .filter(r -> r.getData().getMetadata() != null
+                            && "SERVICE_MAP".equals(r.getData().getMetadata().getEventType()))
+                    .findFirst()
+                    .orElseThrow(() -> new AssertionError("Expected at least one SERVICE_MAP event"));
+            assertThat(serviceMapEvent.getData().get("sourceNode/keyAttributes/environment", String.class),
+                    equalTo("production"));
+        } finally {
+            isolatedProcessor.shutdown();
+        }
+    }
+
     // Helper method to create mock spans
     private Span createMockSpan(String serviceName, String operationName, String spanKind) {
         Span mockSpan = mock(Span.class);
@@ -109,7 +109,13 @@ public static void deriveAttributesForSpan(final Span span) {
             putAttribute(span, DERIVED_REMOTE_SERVICE_ATTRIBUTE, remoteOperationAndService.getService());
         }

-        final String environment = computeEnvironment(spanAttributes);
+        // Build combined attributes including resource for environment derivation
+        final Map<String, Object> combinedAttributes = spanAttributes != null ? new HashMap<>(spanAttributes) : new HashMap<>();
+        final Map<String, Object> resource = span.getResource();
+        if (resource != null) {
+            combinedAttributes.put("resource", resource);
+        }
+        final String environment = computeEnvironment(combinedAttributes);

         // Add derived attributes using our safe attribute setting method
         putAttribute(span, DERIVED_FAULT_ATTRIBUTE, String.valueOf(errorFault.fault));
@@ -39,26 +39,73 @@ public static String getDeploymentEnvironment(final Map<String, Object> spanAttr
         return "generic:default";
     }

+    /**
+     * Get an attribute by checking span-level attributes first, then resource attributes.
+     */
+    private static String getAttributeFromSpanOrResource(final Map<String, Object> spanAttributes,
+                                                         final Map<String, Object> resourceAttributes,
+                                                         final String key) {
+        String value = getStringAttribute(spanAttributes, key);
+        if (value == null && resourceAttributes != null) {
+            value = getStringAttribute(resourceAttributes, key);
+        }
+        return value;
+    }
+
+    /**
+     * Try to extract resource attributes from the nested structure: spanAttributes["resource"]["attributes"]
+     */
+    private static Map<String, Object> extractResourceAttributes(final Map<String, Object> spanAttributes) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> resourceAttrs = (Map<String, Object>)
+                    ((Map<String, Object>) spanAttributes.get("resource")).get("attributes");
+            return resourceAttrs;
+        } catch (Exception ignored) {
+            return null;
+        }
+    }
+
     public static String getAwsServiceEnvironment(final Map<String, Object> spanAttributes) {
         try {
-            String cloudPlatform = getStringAttribute(spanAttributes, "cloud.platform");
-            if (cloudPlatform != null && cloudPlatform.equals("aws_api_gateway")) {
-                return "api-gateway:"+getStringAttribute(spanAttributes, "aws.api_gateway.stage");
+            final Map<String, Object> resourceAttributes = extractResourceAttributes(spanAttributes);
+
+            String cloudPlatform = getAttributeFromSpanOrResource(spanAttributes, resourceAttributes, "cloud.platform");
+            if ("aws_api_gateway".equals(cloudPlatform)) {
+                String stage = getAttributeFromSpanOrResource(spanAttributes, resourceAttributes, "aws.api_gateway.stage");
+                return "api-gateway:" + stage;
             }
-            if (cloudPlatform != null && cloudPlatform.equals("aws_ec2")) {
+            if ("aws_ec2".equals(cloudPlatform)) {
                 return "ec2:default";
             }
-            String cloudResourceId = getStringAttribute(spanAttributes, "cloud.resource_id");
-            String invokedArn = getStringAttribute(spanAttributes, "aws.lambda.invoked_arn");
-            if (cloudResourceId != null && cloudResourceId.startsWith("arn:aws:lambda:") || invokedArn != null) {
+            if ("aws_ecs".equals(cloudPlatform)) {
+                String launchType = getAttributeFromSpanOrResource(spanAttributes, resourceAttributes, "aws.ecs.launchtype");
+                if ("fargate".equals(launchType)) {
+                    return "ecs-fargate:default";
+                }
+                if ("ec2".equals(launchType)) {
+                    return "ecs-ec2:default";
+                }
+                return "ecs:default";
+            }
+            if ("aws_eks".equals(cloudPlatform)) {
+                return "eks:default";
+            }
+            if ("aws_elastic_beanstalk".equals(cloudPlatform)) {
+                return "elastic-beanstalk:default";
+            }
+            if ("aws_lambda".equals(cloudPlatform)) {
                 return "lambda:default";
             }
-
-            @SuppressWarnings("unchecked")
-            Map<String, Object> resourceAttributes = (Map<String, Object>)
-                    ((Map<String, Object>) spanAttributes.get("resource")).get("attributes");
-            String cloudProvider = getStringAttribute(resourceAttributes, "cloud.provider");
-            String faasName = getStringAttribute(resourceAttributes, "faas.name");
+
+            String cloudResourceId = getAttributeFromSpanOrResource(spanAttributes, resourceAttributes, "cloud.resource_id");
+            String invokedArn = getAttributeFromSpanOrResource(spanAttributes, resourceAttributes, "aws.lambda.invoked_arn");
+            if ((cloudResourceId != null && cloudResourceId.startsWith("arn:aws:lambda:")) || invokedArn != null) {
+                return "lambda:default";
+            }
+
+            String cloudProvider = resourceAttributes != null ? getStringAttribute(resourceAttributes, "cloud.provider") : null;
+            String faasName = resourceAttributes != null ? getStringAttribute(resourceAttributes, "faas.name") : null;
             if (cloudProvider != null && cloudProvider.equals("aws") && faasName != null) {
                 return "lambda:default";
             }