55import os
66import re
77from logging import Logger , getLogger
8+ from pathlib import Path
89from typing import ClassVar , Dict , List , NamedTuple , Optional , Type , Union
910
11+ import yaml
1012from importlib_metadata import version
1113from typing_extensions import override
1214
2527from amazon .opentelemetry .distro .aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
2628from amazon .opentelemetry .distro .exporter .console .logs .compact_console_log_exporter import CompactConsoleLogExporter
2729from amazon .opentelemetry .distro .otlp_udp_exporter import OTLPUdpSpanExporter
30+ from amazon .opentelemetry .distro .sampler ._aws_xray_adaptive_sampling_config import (
31+ _AnomalyCaptureLimit ,
32+ _AnomalyConditions ,
33+ _AWSXRayAdaptiveSamplingConfig ,
34+ _UsageType ,
35+ )
2836from amazon .opentelemetry .distro .sampler .aws_xray_remote_sampler import AwsXRayRemoteSampler
2937from amazon .opentelemetry .distro .scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
3038from amazon .opentelemetry .distro .scope_based_filtering_view import ScopeBasedRetainingView
96104OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
97105OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"
98106OTEL_AWS_ENHANCED_CODE_ATTRIBUTES = "OTEL_AWS_EXPERIMENTAL_CODE_ATTRIBUTES"
107+ AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = "AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG"
99108
100109XRAY_SERVICE = "xray"
101110LOGS_SERIVCE = "logs"
@@ -243,7 +252,8 @@ def _init_tracing(
243252 sampler : Sampler = None ,
244253 resource : Resource = None ,
245254):
246- sampler = _customize_sampler (sampler )
255+ original_sampler = sampler
256+ sampler = _customize_sampler (original_sampler )
247257
248258 trace_provider : TracerProvider = TracerProvider (
249259 id_generator = id_generator ,
@@ -254,12 +264,12 @@ def _init_tracing(
254264 for _ , exporter_class in exporters .items ():
255265 exporter_args : Dict [str , any ] = {}
256266 span_exporter : SpanExporter = exporter_class (** exporter_args )
257- span_exporter = _customize_span_exporter (span_exporter , resource )
267+ span_exporter = _customize_span_exporter (span_exporter , resource , original_sampler )
258268 trace_provider .add_span_processor (
259269 BatchSpanProcessor (span_exporter = span_exporter , max_export_batch_size = _span_export_batch_size ())
260270 )
261271
262- _customize_span_processors (trace_provider , resource )
272+ _customize_span_processors (trace_provider , resource , original_sampler )
263273
264274 set_tracer_provider (trace_provider )
265275
@@ -397,12 +407,24 @@ def _custom_import_sampler(sampler_name: str, resource: Resource) -> Sampler:
397407
398408
399409def _customize_sampler (sampler : Sampler ) -> Sampler :
410+ if isinstance (sampler , AwsXRayRemoteSampler ):
411+ config = os .environ .get (AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG )
412+ parsed_config = None
413+
414+ try :
415+ parsed_config = _parse_config_string (config )
416+ except ValueError as error :
417+ _logger .warning ("Failed to parse adaptive sampling configuration: %s" , str (error ))
418+
419+ if parsed_config is not None :
420+ sampler .set_adaptive_sampling_config (parsed_config )
421+
400422 if not _is_application_signals_enabled ():
401423 return sampler
402424 return AlwaysRecordSampler (sampler )
403425
404426
405- def _customize_span_exporter (span_exporter : SpanExporter , resource : Resource ) -> SpanExporter :
427+ def _customize_span_exporter (span_exporter : SpanExporter , resource : Resource , sampler : Sampler = None ) -> SpanExporter :
406428 traces_endpoint = os .environ .get (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT )
407429 if _is_lambda_environment ():
408430 # Override OTLP http default endpoint to UDP
@@ -425,7 +447,10 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
425447 if not _is_application_signals_enabled ():
426448 return span_exporter
427449
428- return AwsMetricAttributesSpanExporterBuilder (span_exporter , resource ).build ()
450+ span_exporter = AwsMetricAttributesSpanExporterBuilder (span_exporter , resource ).build ()
451+ if sampler is not None and isinstance (sampler , AwsXRayRemoteSampler ):
452+ sampler .set_span_exporter (span_exporter )
453+ return span_exporter
429454
430455
431456def _customize_log_record_processor (logger_provider : LoggerProvider , log_exporter : Optional [LogExporter ]) -> None :
@@ -472,7 +497,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
472497 return log_exporter
473498
474499
475- def _customize_span_processors (provider : TracerProvider , resource : Resource ) -> None :
500+ def _customize_span_processors (provider : TracerProvider , resource : Resource , sampler : Sampler ) -> None :
476501
477502 if is_enhanced_code_attributes () is True :
478503 # pylint: disable=import-outside-toplevel
@@ -518,7 +543,7 @@ def session_id_predicate(baggage_key: str) -> bool:
518543 )
519544 meter_provider : MeterProvider = MeterProvider (resource = resource , metric_readers = [periodic_exporting_metric_reader ])
520545 # Construct and set application signals metrics processor
521- provider .add_span_processor (AwsSpanMetricsProcessorBuilder (meter_provider , resource ).build ())
546+ provider .add_span_processor (AwsSpanMetricsProcessorBuilder (meter_provider , resource ).set_sampler ( sampler ). build ())
522547
523548 return
524549
@@ -898,3 +923,66 @@ def _create_aws_otlp_exporter(endpoint: str, service: str, region: str):
898923 except Exception as errors :
899924 _logger .error ("Failed to create AWS OTLP exporter: %s" , errors )
900925 return None
926+
927+
928+ def _parse_config_string (config : str ) -> Optional [_AWSXRayAdaptiveSamplingConfig ]:
929+ if config is None :
930+ return None
931+
932+ # Check if the config is a file path and the file exists
933+ path = Path (config )
934+ if path .exists ():
935+ try :
936+ config = path .read_text (encoding = "utf-8" )
937+ except IOError as err :
938+ raise ValueError (f"Failed to read adaptive sampling configuration file: { err } " ) from err
939+ elif config .endswith (".yml" ) or config .endswith (".yaml" ):
940+ raise ValueError ("Adaptive sampling configuration file must be a YAML file" )
941+ else :
942+ _logger .debug ("Adaptive sampling configuration is not a file path, assuming it's a YAML string" )
943+
944+ # Parse YAML config
945+ config_map = yaml .safe_load (config )
946+ if not isinstance (config_map , dict ):
947+ raise ValueError ("Adaptive sampling configuration must be a valid YAML mapping" )
948+
949+ # Ensure only relevant data is in the YAML configuration
950+ for key in config_map :
951+ if key not in ["version" , "anomalyConditions" , "anomalyCaptureLimit" ]:
952+ raise ValueError (f"Invalid key in adaptive sampling configuration: { key } " )
953+
954+ version_obj = config_map .get ("version" )
955+ if version_obj is None :
956+ raise ValueError ("Missing required 'version' field in adaptive sampling configuration" )
957+
958+ config_version = float (version_obj )
959+ if config_version < 1.0 or config_version >= 2.0 :
960+ raise ValueError (
961+ f"Incompatible adaptive sampling config version: { config_version } . "
962+ "This version of the AWS X-Ray remote sampler only supports version 1.X."
963+ )
964+
965+ # Parse anomaly conditions
966+ anomaly_conditions = None
967+ if "anomalyConditions" in config_map :
968+ anomaly_conditions = [
969+ _AnomalyConditions (
970+ error_code_regex = cond .get ("errorCodeRegex" ),
971+ operations = cond .get ("operations" ),
972+ high_latency_ms = cond .get ("highLatencyMs" ),
973+ usage = _UsageType (cond ["usage" ]) if "usage" in cond else None ,
974+ )
975+ for cond in config_map ["anomalyConditions" ]
976+ ]
977+
978+ # Parse anomaly capture limit
979+ anomaly_capture_limit = None
980+ if "anomalyCaptureLimit" in config_map :
981+ anomaly_capture_limit_data = config_map ["anomalyCaptureLimit" ]
982+ anomaly_capture_limit = _AnomalyCaptureLimit (
983+ anomaly_traces_per_second = anomaly_capture_limit_data ["anomalyTracesPerSecond" ]
984+ )
985+
986+ return _AWSXRayAdaptiveSamplingConfig (
987+ version = config_version , anomaly_conditions = anomaly_conditions , anomaly_capture_limit = anomaly_capture_limit
988+ )
0 commit comments