Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t

## Unreleased

- Adaptive Sampling support
([#576](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/576))
- Fix: Support new fields in X-Ray API responses
([#577](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/577))
- Sign Lambda layer by AWS Signer
Expand Down
2 changes: 2 additions & 0 deletions aws-opentelemetry-distro/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ dependencies = [
"opentelemetry-instrumentation-urllib3 == 0.54b1",
"opentelemetry-instrumentation-wsgi == 0.54b1",
"opentelemetry-instrumentation-cassandra == 0.54b1",
"cachetools == 6.2.4",
"pyyaml == 6.0.3",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
AWS_TRACE_FLAG_SAMPLED: str = "aws.trace.flag.sampled"
AWS_TRACE_LAMBDA_FLAG_MULTIPLE_SERVER: str = "aws.trace.lambda.multiple-server"
AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER: str = "aws.remote.resource.cfn.primary.identifier"
AWS_XRAY_ADAPTIVE_SAMPLING_CONFIGURED_ATTRIBUTE_KEY: str = "aws.xray.adaptive_sampling_configured"
AWS_XRAY_SAMPLING_RULE: str = "aws.xray.sampling_rule"

# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
# TODO:Move to Semantic Conventions when these attributes are added.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

from typing_extensions import override

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_CONSUMER_PARENT_SPAN_KIND, AWS_SDK_DESCENDANT
from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_CONSUMER_PARENT_SPAN_KIND,
AWS_SDK_DESCENDANT,
AWS_TRACE_FLAG_SAMPLED,
)
from amazon.opentelemetry.distro._aws_span_processing_util import is_aws_sdk_span, is_local_root
from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
Expand Down Expand Up @@ -80,6 +84,7 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None

if propagation_data is not None:
span.set_attribute(self._propagation_data_key, propagation_data)
span.set_attribute(AWS_TRACE_FLAG_SAMPLED, span.get_span_context().trace_flags.sampled)

# pylint: disable=no-self-use
@override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.

# pylint: disable=too-many-lines

import logging
import os
import re
from logging import Logger, getLogger
from pathlib import Path
from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union

import yaml
from importlib_metadata import version
from typing_extensions import override

Expand All @@ -25,6 +30,12 @@
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler._aws_xray_adaptive_sampling_config import (
_AnomalyCaptureLimit,
_AnomalyConditions,
_AWSXRayAdaptiveSamplingConfig,
_UsageType,
)
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
Expand Down Expand Up @@ -96,6 +107,7 @@
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"
OTEL_AWS_ENHANCED_CODE_ATTRIBUTES = "OTEL_AWS_EXPERIMENTAL_CODE_ATTRIBUTES"
AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = "AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG"

XRAY_SERVICE = "xray"
LOGS_SERIVCE = "logs"
Expand Down Expand Up @@ -243,7 +255,8 @@ def _init_tracing(
sampler: Sampler = None,
resource: Resource = None,
):
sampler = _customize_sampler(sampler)
original_sampler = sampler
sampler = _customize_sampler(original_sampler)

trace_provider: TracerProvider = TracerProvider(
id_generator=id_generator,
Expand All @@ -254,12 +267,12 @@ def _init_tracing(
for _, exporter_class in exporters.items():
exporter_args: Dict[str, any] = {}
span_exporter: SpanExporter = exporter_class(**exporter_args)
span_exporter = _customize_span_exporter(span_exporter, resource)
span_exporter = _customize_span_exporter(span_exporter, resource, original_sampler)
trace_provider.add_span_processor(
BatchSpanProcessor(span_exporter=span_exporter, max_export_batch_size=_span_export_batch_size())
)

_customize_span_processors(trace_provider, resource)
_customize_span_processors(trace_provider, resource, original_sampler)

set_tracer_provider(trace_provider)

Expand Down Expand Up @@ -397,12 +410,25 @@ def _custom_import_sampler(sampler_name: str, resource: Resource) -> Sampler:


def _customize_sampler(sampler: Sampler) -> Sampler:
if isinstance(sampler, AwsXRayRemoteSampler):
config = os.environ.get(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG)
parsed_config = None

try:
parsed_config = _parse_config_string(config)
# pylint: disable=broad-exception-caught
except Exception as error:
_logger.warning("Failed to parse adaptive sampling configuration: %s", str(error))

if parsed_config is not None:
sampler.set_adaptive_sampling_config(parsed_config)

if not _is_application_signals_enabled():
return sampler
return AlwaysRecordSampler(sampler)


def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource, sampler: Sampler = None) -> SpanExporter:
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
if _is_lambda_environment():
# Override OTLP http default endpoint to UDP
Expand All @@ -425,7 +451,10 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
if not _is_application_signals_enabled():
return span_exporter

return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
span_exporter = AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
if sampler is not None and isinstance(sampler, AwsXRayRemoteSampler):
sampler.set_span_exporter(span_exporter)
return span_exporter


def _customize_log_record_processor(logger_provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None:
Expand Down Expand Up @@ -472,7 +501,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
return log_exporter


def _customize_span_processors(provider: TracerProvider, resource: Resource) -> None:
def _customize_span_processors(provider: TracerProvider, resource: Resource, sampler: Sampler) -> None:

if is_enhanced_code_attributes() is True:
# pylint: disable=import-outside-toplevel
Expand Down Expand Up @@ -518,7 +547,7 @@ def session_id_predicate(baggage_key: str) -> bool:
)
meter_provider: MeterProvider = MeterProvider(resource=resource, metric_readers=[periodic_exporting_metric_reader])
# Construct and set application signals metrics processor
provider.add_span_processor(AwsSpanMetricsProcessorBuilder(meter_provider, resource).build())
provider.add_span_processor(AwsSpanMetricsProcessorBuilder(meter_provider, resource).set_sampler(sampler).build())

return

Expand Down Expand Up @@ -898,3 +927,84 @@ def _create_aws_otlp_exporter(endpoint: str, service: str, region: str):
except Exception as errors:
_logger.error("Failed to create AWS OTLP exporter: %s", errors)
return None


# pylint: disable=too-many-return-statements,too-many-branches
def _parse_config_string(config: str) -> Optional[_AWSXRayAdaptiveSamplingConfig]:
if config is None:
return None

# Check if the config is a file path and the file exists
path = Path(config)
if path.exists():
try:
config = path.read_text(encoding="utf-8")
except IOError as err:
_logger.warning("Failed to read adaptive sampling configuration file: %s", err)
return None
elif config.endswith(".yml") or config.endswith(".yaml"):
_logger.warning("Adaptive sampling configuration file must be a YAML file")
return None
else:
_logger.debug("Adaptive sampling configuration is not a file path, assuming it's a YAML string")

# Parse YAML config
config_map = None
try:
config_map = yaml.safe_load(config)
# pylint: disable=broad-exception-caught
except Exception as exception:
_logger.warning("Adaptive sampling configuration must be a valid YAML mapping: %s", exception)
if not isinstance(config_map, dict):
_logger.warning("Adaptive sampling configuration must be a valid YAML mapping")
return None

# Ensure only relevant data is in the YAML configuration
for key in config_map:
if key not in ["version", "anomalyConditions", "anomalyCaptureLimit"]:
_logger.warning("Invalid key in adaptive sampling configuration: %s", key)
return None

version_obj = config_map.get("version")
if version_obj is None:
_logger.warning("Missing required 'version' field in adaptive sampling configuration")
return None

try:
config_version = float(version_obj)
if config_version < 1.0 or config_version >= 2.0:
_logger.warning(
"Incompatible adaptive sampling config version: %s. "
"This version of the AWS X-Ray remote sampler only supports version 1.X.",
config_version,
)
return None

# Parse anomaly conditions
anomaly_conditions = None
if "anomalyConditions" in config_map:
anomaly_conditions = [
_AnomalyConditions(
error_code_regex=cond.get("errorCodeRegex"),
operations=cond.get("operations"),
high_latency_ms=cond.get("highLatencyMs"),
usage=_UsageType(cond["usage"]) if "usage" in cond else None,
)
for cond in config_map["anomalyConditions"]
]

# Parse anomaly capture limit
anomaly_capture_limit = None
if "anomalyCaptureLimit" in config_map:
anomaly_capture_limit_data = config_map["anomalyCaptureLimit"]
anomaly_capture_limit = _AnomalyCaptureLimit(
anomaly_traces_per_second=anomaly_capture_limit_data["anomalyTracesPerSecond"]
)

return _AWSXRayAdaptiveSamplingConfig(
version=config_version, anomaly_conditions=anomaly_conditions, anomaly_capture_limit=anomaly_capture_limit
)
except ValueError as err:
_logger.warning("Failed to load AWS X-Ray adaptive sampling configuration: %s", err)

return None
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_REMOTE_SERVICE
from amazon.opentelemetry.distro.metric_attribute_generator import MetricAttributeGenerator
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.context import Context
from opentelemetry.metrics import Histogram
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan, Span, SpanProcessor, StatusCode
from opentelemetry.sdk.trace.sampling import Sampler
from opentelemetry.semconv.trace import SpanAttributes

_HTTP_STATUS_CODE = SpanAttributes.HTTP_STATUS_CODE
Expand Down Expand Up @@ -66,13 +68,15 @@ def __init__(
latency_histogram: Histogram,
generator: MetricAttributeGenerator,
resource: Resource,
sampler: Optional[Sampler],
force_flush_function: Callable = _no_op_function,
):
self._error_histogram = error_histogram
self._fault_histogram = fault_histogram
self._latency_histogram = latency_histogram
self._generator = generator
self._resource = resource
self._sampler = sampler
self._force_flush_function = force_flush_function

# pylint: disable=no-self-use
Expand All @@ -89,6 +93,9 @@ def on_end(self, span: ReadableSpan) -> None:
for attributes in attribute_dict.values():
self._record_metrics(span, attributes)

if self._sampler and isinstance(self._sampler, AwsXRayRemoteSampler):
self._sampler.adapt_sampling(span)
Comment thread
majanjua-amzn marked this conversation as resolved.
Comment thread
majanjua-amzn marked this conversation as resolved.

@override
def shutdown(self) -> None:
self.force_flush()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Optional

from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
from amazon.opentelemetry.distro.metric_attribute_generator import MetricAttributeGenerator
from opentelemetry.sdk.metrics import Histogram, Meter, MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.sampling import Sampler

# Metric instrument configuration constants
_ERROR: str = "Error"
Expand All @@ -26,6 +29,7 @@ class AwsSpanMetricsProcessorBuilder:

# Optional builder elements
_generator: MetricAttributeGenerator = _DEFAULT_GENERATOR
_sampler: Optional[Sampler] = None
_scope_name: str = _DEFAULT_SCOPE_NAME

def __init__(self, meter_provider: MeterProvider, resource: Resource):
Expand All @@ -52,6 +56,16 @@ def set_scope_name(self, scope_name: str) -> "AwsSpanMetricsProcessorBuilder":
self._scope_name = scope_name
return self

def set_sampler(self, sampler: Sampler) -> "AwsSpanMetricsProcessorBuilder":
"""
Sets the sampler used to determine if the spans should be sampled This will be used
to increase sampling rate in the case of errors.
"""
if sampler is None:
raise ValueError("sampler must not be None")
self._sampler = sampler
return self

def build(self) -> AwsSpanMetricsProcessor:
meter: Meter = self._meter_provider.get_meter(self._scope_name)
error_histogram: Histogram = meter.create_histogram(_ERROR)
Expand All @@ -68,5 +82,6 @@ def build(self) -> AwsSpanMetricsProcessor:
latency_histogram,
self._generator,
self._resource,
self._sampler,
self._meter_provider.force_flush,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from types import MappingProxyType
from typing import Optional

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_XRAY_ADAPTIVE_SAMPLING_CONFIGURED_ATTRIBUTE_KEY,
AWS_XRAY_SAMPLING_RULE,
)
from opentelemetry.sdk.trace.sampling import Decision, SamplingResult
from opentelemetry.trace import TraceState
from opentelemetry.util.types import Attributes, AttributeValue


class _AwsSamplingResult(SamplingResult):
AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY = "xrsr"

def __init__(
self,
decision: Decision,
attributes: "Attributes" = None,
trace_state: Optional["TraceState"] = None,
sampling_rule_name: Optional[str] = None,
sampling_rule_hash: Optional[str] = None,
has_adaptive_sampling_config: bool = False,
):
# Define attributes that will be set by super()
self.decision = decision
self.trace_state = None
self.attributes = None

super().__init__(decision, attributes, trace_state)

# super will have defined self.attributes by this point
self.__add_attribute(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIGURED_ATTRIBUTE_KEY, has_adaptive_sampling_config)
if sampling_rule_name is not None:
self.__add_attribute(AWS_XRAY_SAMPLING_RULE, sampling_rule_name)

if self.trace_state is None:
self.trace_state = TraceState()
if self.trace_state.get(self.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY) is None:
self.trace_state = self.trace_state.add(self.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, sampling_rule_hash)

self._sampling_rule_name = sampling_rule_name

def __add_attribute(self, key: str, value: AttributeValue):
self.attributes = MappingProxyType(
{
**self.attributes,
key: value,
}
)
Loading