Skip to content

Commit 2b4d0ac

Browse files
Adaptive Sampling Support (#576)
1 parent 1d390bd commit 2b4d0ac

32 files changed

Lines changed: 2112 additions & 311 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
1212

1313
## Unreleased
1414

15+
- Adaptive Sampling support
16+
([#576](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/576))
1517
- Fix: Support new fields in X-Ray API responses
1618
([#577](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/577))
1719
- Sign Lambda layer by AWS Signer

aws-opentelemetry-distro/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ dependencies = [
8383
"opentelemetry-instrumentation-urllib3 == 0.54b1",
8484
"opentelemetry-instrumentation-wsgi == 0.54b1",
8585
"opentelemetry-instrumentation-cassandra == 0.54b1",
86+
"cachetools == 6.2.4",
87+
"pyyaml == 6.0.3",
8688
]
8789

8890
[project.optional-dependencies]

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
AWS_TRACE_FLAG_SAMPLED: str = "aws.trace.flag.sampled"
1616
AWS_TRACE_LAMBDA_FLAG_MULTIPLE_SERVER: str = "aws.trace.lambda.multiple-server"
1717
AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER: str = "aws.remote.resource.cfn.primary.identifier"
18+
AWS_XRAY_ADAPTIVE_SAMPLING_CONFIGURED_ATTRIBUTE_KEY: str = "aws.xray.adaptive_sampling_configured"
19+
AWS_XRAY_SAMPLING_RULE: str = "aws.xray.sampling_rule"
1820

1921
# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
2022
# TODO:Move to Semantic Conventions when these attributes are added.

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
from typing_extensions import override
66

7-
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_CONSUMER_PARENT_SPAN_KIND, AWS_SDK_DESCENDANT
7+
from amazon.opentelemetry.distro._aws_attribute_keys import (
8+
AWS_CONSUMER_PARENT_SPAN_KIND,
9+
AWS_SDK_DESCENDANT,
10+
AWS_TRACE_FLAG_SAMPLED,
11+
)
812
from amazon.opentelemetry.distro._aws_span_processing_util import is_aws_sdk_span, is_local_root
913
from opentelemetry.context import Context
1014
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
@@ -80,6 +84,7 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None
8084

8185
if propagation_data is not None:
8286
span.set_attribute(self._propagation_data_key, propagation_data)
87+
span.set_attribute(AWS_TRACE_FLAG_SAMPLED, span.get_span_context().trace_flags.sampled)
8388

8489
# pylint: disable=no-self-use
8590
@override

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 117 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
4+
5+
# pylint: disable=too-many-lines
6+
47
import logging
58
import os
69
import re
710
from logging import Logger, getLogger
11+
from pathlib import Path
812
from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union
913

14+
import yaml
1015
from importlib_metadata import version
1116
from typing_extensions import override
1217

@@ -25,6 +30,12 @@
2530
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
2631
from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter
2732
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
33+
from amazon.opentelemetry.distro.sampler._aws_xray_adaptive_sampling_config import (
34+
_AnomalyCaptureLimit,
35+
_AnomalyConditions,
36+
_AWSXRayAdaptiveSamplingConfig,
37+
_UsageType,
38+
)
2839
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
2940
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
3041
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
@@ -96,6 +107,7 @@
96107
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
97108
OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"
98109
OTEL_AWS_ENHANCED_CODE_ATTRIBUTES = "OTEL_AWS_EXPERIMENTAL_CODE_ATTRIBUTES"
110+
AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = "AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG"
99111

100112
XRAY_SERVICE = "xray"
101113
LOGS_SERIVCE = "logs"
@@ -243,7 +255,8 @@ def _init_tracing(
243255
sampler: Sampler = None,
244256
resource: Resource = None,
245257
):
246-
sampler = _customize_sampler(sampler)
258+
original_sampler = sampler
259+
sampler = _customize_sampler(original_sampler)
247260

248261
trace_provider: TracerProvider = TracerProvider(
249262
id_generator=id_generator,
@@ -254,12 +267,12 @@ def _init_tracing(
254267
for _, exporter_class in exporters.items():
255268
exporter_args: Dict[str, any] = {}
256269
span_exporter: SpanExporter = exporter_class(**exporter_args)
257-
span_exporter = _customize_span_exporter(span_exporter, resource)
270+
span_exporter = _customize_span_exporter(span_exporter, resource, original_sampler)
258271
trace_provider.add_span_processor(
259272
BatchSpanProcessor(span_exporter=span_exporter, max_export_batch_size=_span_export_batch_size())
260273
)
261274

262-
_customize_span_processors(trace_provider, resource)
275+
_customize_span_processors(trace_provider, resource, original_sampler)
263276

264277
set_tracer_provider(trace_provider)
265278

@@ -397,12 +410,25 @@ def _custom_import_sampler(sampler_name: str, resource: Resource) -> Sampler:
397410

398411

399412
def _customize_sampler(sampler: Sampler) -> Sampler:
    """Apply AWS-specific customizations to the configured sampler.

    For an ``AwsXRayRemoteSampler`` the adaptive sampling configuration is read
    from the ``AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG`` environment variable and, when
    it parses to a non-None value, handed to the sampler. A parsing error is
    logged as a warning and otherwise ignored.

    The sampler is returned wrapped in ``AlwaysRecordSampler`` when Application
    Signals is enabled, and returned as-is otherwise.
    """
    if isinstance(sampler, AwsXRayRemoteSampler):
        raw_config = os.environ.get(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG)

        try:
            adaptive_config = _parse_config_string(raw_config)
        # pylint: disable=broad-exception-caught
        except Exception as error:
            _logger.warning("Failed to parse adaptive sampling configuration: %s", str(error))
            adaptive_config = None

        # Deliberately outside the try block: only parsing failures are swallowed.
        if adaptive_config is not None:
            sampler.set_adaptive_sampling_config(adaptive_config)

    return AlwaysRecordSampler(sampler) if _is_application_signals_enabled() else sampler
403429

404430

405-
def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
431+
def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource, sampler: Sampler = None) -> SpanExporter:
406432
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
407433
if _is_lambda_environment():
408434
# Override OTLP http default endpoint to UDP
@@ -425,7 +451,10 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
425451
if not _is_application_signals_enabled():
426452
return span_exporter
427453

428-
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
454+
span_exporter = AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
455+
if sampler is not None and isinstance(sampler, AwsXRayRemoteSampler):
456+
sampler.set_span_exporter(span_exporter)
457+
return span_exporter
429458

430459

431460
def _customize_log_record_processor(logger_provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None:
@@ -472,7 +501,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
472501
return log_exporter
473502

474503

475-
def _customize_span_processors(provider: TracerProvider, resource: Resource) -> None:
504+
def _customize_span_processors(provider: TracerProvider, resource: Resource, sampler: Sampler) -> None:
476505

477506
if is_enhanced_code_attributes() is True:
478507
# pylint: disable=import-outside-toplevel
@@ -518,7 +547,7 @@ def session_id_predicate(baggage_key: str) -> bool:
518547
)
519548
meter_provider: MeterProvider = MeterProvider(resource=resource, metric_readers=[periodic_exporting_metric_reader])
520549
# Construct and set application signals metrics processor
521-
provider.add_span_processor(AwsSpanMetricsProcessorBuilder(meter_provider, resource).build())
550+
provider.add_span_processor(AwsSpanMetricsProcessorBuilder(meter_provider, resource).set_sampler(sampler).build())
522551

523552
return
524553

@@ -898,3 +927,84 @@ def _create_aws_otlp_exporter(endpoint: str, service: str, region: str):
898927
except Exception as errors:
899928
_logger.error("Failed to create AWS OTLP exporter: %s", errors)
900929
return None
930+
931+
932+
# pylint: disable=too-many-return-statements,too-many-branches
933+
def _parse_config_string(config: str) -> Optional[_AWSXRayAdaptiveSamplingConfig]:
    """Build the adaptive sampling configuration from a file path or inline YAML.

    ``config`` is treated as a file path when a file system entry with that name
    exists; its content is then read as UTF-8 text. Otherwise ``config`` itself is
    parsed as a YAML document, unless it ends with ``.yml``/``.yaml``, in which
    case it is taken to be a path to a missing file.

    Args:
        config: Path to a YAML file, an inline YAML document, or None.

    Returns:
        The parsed configuration, or None (after logging a warning) when
        ``config`` is None, the file cannot be read, the YAML is not a mapping,
        an unknown top-level key is present, the ``version`` field is missing or
        outside the supported 1.X range, or a ``ValueError`` is raised while
        building the configuration objects.
    """
    if config is None:
        return None

    # Check if the config is a file path and the file exists
    path = Path(config)
    try:
        path_exists = path.exists()
    except OSError:
        # An inline YAML document is generally not a usable path (it can, for
        # example, exceed the file-name length limit), and the existence check
        # itself may then fail instead of returning False.
        path_exists = False

    if path_exists:
        try:
            config = path.read_text(encoding="utf-8")
        except IOError as err:
            _logger.warning("Failed to read adaptive sampling configuration file: %s", err)
            return None
    elif config.endswith((".yml", ".yaml")):
        # Looks like a YAML file path, but nothing exists at that location.
        _logger.warning("Adaptive sampling configuration file does not exist: %s", config)
        return None
    else:
        _logger.debug("Adaptive sampling configuration is not a file path, assuming it's a YAML string")

    # Parse YAML config
    try:
        config_map = yaml.safe_load(config)
    # pylint: disable=broad-exception-caught
    except Exception as exception:
        _logger.warning("Adaptive sampling configuration must be a valid YAML mapping: %s", exception)
        # Return right away so a single parse failure is reported only once.
        return None
    if not isinstance(config_map, dict):
        _logger.warning("Adaptive sampling configuration must be a valid YAML mapping")
        return None

    # Ensure only relevant data is in the YAML configuration
    for key in config_map:
        if key not in ["version", "anomalyConditions", "anomalyCaptureLimit"]:
            _logger.warning("Invalid key in adaptive sampling configuration: %s", key)
            return None

    version_obj = config_map.get("version")
    if version_obj is None:
        _logger.warning("Missing required 'version' field in adaptive sampling configuration")
        return None

    try:
        config_version = float(version_obj)
        if config_version < 1.0 or config_version >= 2.0:
            _logger.warning(
                "Incompatible adaptive sampling config version: %s. "
                "This version of the AWS X-Ray remote sampler only supports version 1.X.",
                config_version,
            )
            return None

        # Parse anomaly conditions
        anomaly_conditions = None
        if "anomalyConditions" in config_map:
            anomaly_conditions = [
                _AnomalyConditions(
                    error_code_regex=cond.get("errorCodeRegex"),
                    operations=cond.get("operations"),
                    high_latency_ms=cond.get("highLatencyMs"),
                    usage=_UsageType(cond["usage"]) if "usage" in cond else None,
                )
                for cond in config_map["anomalyConditions"]
            ]

        # Parse anomaly capture limit
        anomaly_capture_limit = None
        if "anomalyCaptureLimit" in config_map:
            anomaly_capture_limit_data = config_map["anomalyCaptureLimit"]
            anomaly_capture_limit = _AnomalyCaptureLimit(
                anomaly_traces_per_second=anomaly_capture_limit_data["anomalyTracesPerSecond"]
            )

        return _AWSXRayAdaptiveSamplingConfig(
            version=config_version, anomaly_conditions=anomaly_conditions, anomaly_capture_limit=anomaly_capture_limit
        )
    except ValueError as err:
        _logger.warning("Failed to load AWS X-Ray adaptive sampling configuration: %s", err)

    return None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_span_metrics_processor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_REMOTE_SERVICE
88
from amazon.opentelemetry.distro.metric_attribute_generator import MetricAttributeGenerator
9+
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
910
from opentelemetry.context import Context
1011
from opentelemetry.metrics import Histogram
1112
from opentelemetry.sdk.resources import Resource
1213
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan, Span, SpanProcessor, StatusCode
14+
from opentelemetry.sdk.trace.sampling import Sampler
1315
from opentelemetry.semconv.trace import SpanAttributes
1416

1517
_HTTP_STATUS_CODE = SpanAttributes.HTTP_STATUS_CODE
@@ -66,13 +68,15 @@ def __init__(
6668
latency_histogram: Histogram,
6769
generator: MetricAttributeGenerator,
6870
resource: Resource,
71+
sampler: Optional[Sampler],
6972
force_flush_function: Callable = _no_op_function,
7073
):
7174
self._error_histogram = error_histogram
7275
self._fault_histogram = fault_histogram
7376
self._latency_histogram = latency_histogram
7477
self._generator = generator
7578
self._resource = resource
79+
self._sampler = sampler
7680
self._force_flush_function = force_flush_function
7781

7882
# pylint: disable=no-self-use
@@ -89,6 +93,9 @@ def on_end(self, span: ReadableSpan) -> None:
8993
for attributes in attribute_dict.values():
9094
self._record_metrics(span, attributes)
9195

96+
if self._sampler and isinstance(self._sampler, AwsXRayRemoteSampler):
97+
self._sampler.adapt_sampling(span)
98+
9299
@override
93100
def shutdown(self) -> None:
94101
self.force_flush()

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_span_metrics_processor_builder.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Optional
4+
35
from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
46
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
57
from amazon.opentelemetry.distro.metric_attribute_generator import MetricAttributeGenerator
68
from opentelemetry.sdk.metrics import Histogram, Meter, MeterProvider
79
from opentelemetry.sdk.resources import Resource
10+
from opentelemetry.sdk.trace.sampling import Sampler
811

912
# Metric instrument configuration constants
1013
_ERROR: str = "Error"
@@ -26,6 +29,7 @@ class AwsSpanMetricsProcessorBuilder:
2629

2730
# Optional builder elements
2831
_generator: MetricAttributeGenerator = _DEFAULT_GENERATOR
32+
_sampler: Optional[Sampler] = None
2933
_scope_name: str = _DEFAULT_SCOPE_NAME
3034

3135
def __init__(self, meter_provider: MeterProvider, resource: Resource):
@@ -52,6 +56,16 @@ def set_scope_name(self, scope_name: str) -> "AwsSpanMetricsProcessorBuilder":
5256
self._scope_name = scope_name
5357
return self
5458

59+
def set_sampler(self, sampler: Sampler) -> "AwsSpanMetricsProcessorBuilder":
    """
    Sets the sampler that is handed to the span metrics processor built by this builder.
    Passing None raises a ValueError. Returns this builder so calls can be chained.
    """
    if sampler is not None:
        self._sampler = sampler
        return self
    raise ValueError("sampler must not be None")
68+
5569
def build(self) -> AwsSpanMetricsProcessor:
5670
meter: Meter = self._meter_provider.get_meter(self._scope_name)
5771
error_histogram: Histogram = meter.create_histogram(_ERROR)
@@ -68,5 +82,6 @@ def build(self) -> AwsSpanMetricsProcessor:
6882
latency_histogram,
6983
self._generator,
7084
self._resource,
85+
self._sampler,
7186
self._meter_provider.force_flush,
7287
)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from types import MappingProxyType
4+
from typing import Optional
5+
6+
from amazon.opentelemetry.distro._aws_attribute_keys import (
7+
AWS_XRAY_ADAPTIVE_SAMPLING_CONFIGURED_ATTRIBUTE_KEY,
8+
AWS_XRAY_SAMPLING_RULE,
9+
)
10+
from opentelemetry.sdk.trace.sampling import Decision, SamplingResult
11+
from opentelemetry.trace import TraceState
12+
from opentelemetry.util.types import Attributes, AttributeValue
13+
14+
15+
class _AwsSamplingResult(SamplingResult):
    """Sampling result that also carries X-Ray sampling rule information.

    In addition to what ``SamplingResult`` stores, the constructor:

    * always adds the adaptive-sampling-configured attribute,
    * adds the sampling rule name attribute when a rule name is given,
    * guarantees ``trace_state`` is a ``TraceState`` and, when a rule hash is
      given and the trace state has no ``xrsr`` entry yet, stores the hash there.
    """

    AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY = "xrsr"

    def __init__(
        self,
        decision: Decision,
        attributes: "Attributes" = None,
        trace_state: Optional["TraceState"] = None,
        sampling_rule_name: Optional[str] = None,
        sampling_rule_hash: Optional[str] = None,
        has_adaptive_sampling_config: bool = False,
    ):
        # Define attributes that will be set by super()
        self.decision = decision
        self.trace_state = None
        self.attributes = None

        super().__init__(decision, attributes, trace_state)

        # super will have defined self.attributes by this point
        self.__add_attribute(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIGURED_ATTRIBUTE_KEY, has_adaptive_sampling_config)
        if sampling_rule_name is not None:
            self.__add_attribute(AWS_XRAY_SAMPLING_RULE, sampling_rule_name)

        if self.trace_state is None:
            self.trace_state = TraceState()
        # Only record a rule hash that was actually supplied; None is not a valid
        # trace-state value, so there is nothing to add in that case. An entry that
        # is already present is kept untouched.
        if (
            sampling_rule_hash is not None
            and self.trace_state.get(self.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY) is None
        ):
            self.trace_state = self.trace_state.add(self.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, sampling_rule_hash)

        self._sampling_rule_name = sampling_rule_name

    def __add_attribute(self, key: str, value: AttributeValue):
        # The attributes mapping is read-only, so build a new one that contains
        # the existing entries plus the new key.
        self.attributes = MappingProxyType(
            {
                **self.attributes,
                key: value,
            }
        )

0 commit comments

Comments
 (0)