Skip to content

Commit cc9c0b8

Browse files
committed
Ensure thread safety + parsing errors don't crash instrumentation
1 parent 930a1ec commit cc9c0b8

3 files changed

Lines changed: 130 additions & 66 deletions

File tree

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
4+
5+
# pylint: disable=too-many-lines
6+
47
import logging
58
import os
69
import re
@@ -413,7 +416,8 @@ def _customize_sampler(sampler: Sampler) -> Sampler:
413416

414417
try:
415418
parsed_config = _parse_config_string(config)
416-
except ValueError as error:
419+
# pylint: disable=broad-exception-caught
420+
except Exception as error:
417421
_logger.warning("Failed to parse adaptive sampling configuration: %s", str(error))
418422

419423
if parsed_config is not None:
@@ -925,6 +929,7 @@ def _create_aws_otlp_exporter(endpoint: str, service: str, region: str):
925929
return None
926930

927931

932+
# pylint: disable=too-many-return-statements,too-many-branches
928933
def _parse_config_string(config: str) -> Optional[_AWSXRayAdaptiveSamplingConfig]:
929934
if config is None:
930935
return None
@@ -935,54 +940,71 @@ def _parse_config_string(config: str) -> Optional[_AWSXRayAdaptiveSamplingConfig
935940
try:
936941
config = path.read_text(encoding="utf-8")
937942
except IOError as err:
938-
raise ValueError(f"Failed to read adaptive sampling configuration file: {err}") from err
943+
_logger.warning("Failed to read adaptive sampling configuration file: %s", err)
944+
return None
939945
elif config.endswith(".yml") or config.endswith(".yaml"):
940-
raise ValueError("Adaptive sampling configuration file must be a YAML file")
946+
_logger.warning("Adaptive sampling configuration file must be a YAML file")
947+
return None
941948
else:
942949
_logger.debug("Adaptive sampling configuration is not a file path, assuming it's a YAML string")
943950

944951
# Parse YAML config
945-
config_map = yaml.safe_load(config)
952+
config_map = None
953+
try:
954+
config_map = yaml.safe_load(config)
955+
# pylint: disable=broad-exception-caught
956+
except Exception as exception:
957+
_logger.warning("Adaptive sampling configuration must be a valid YAML mapping: %s", exception)
946958
if not isinstance(config_map, dict):
947-
raise ValueError("Adaptive sampling configuration must be a valid YAML mapping")
959+
_logger.warning("Adaptive sampling configuration must be a valid YAML mapping")
960+
return None
948961

949962
# Ensure only relevant data is in the YAML configuration
950963
for key in config_map:
951964
if key not in ["version", "anomalyConditions", "anomalyCaptureLimit"]:
952-
raise ValueError(f"Invalid key in adaptive sampling configuration: {key}")
965+
_logger.warning("Invalid key in adaptive sampling configuration: %s", key)
966+
return None
953967

954968
version_obj = config_map.get("version")
955969
if version_obj is None:
956-
raise ValueError("Missing required 'version' field in adaptive sampling configuration")
970+
_logger.warning("Missing required 'version' field in adaptive sampling configuration")
971+
return None
957972

958-
config_version = float(version_obj)
959-
if config_version < 1.0 or config_version >= 2.0:
960-
raise ValueError(
961-
f"Incompatible adaptive sampling config version: {config_version}. "
962-
"This version of the AWS X-Ray remote sampler only supports version 1.X."
963-
)
973+
try:
974+
config_version = float(version_obj)
975+
if config_version < 1.0 or config_version >= 2.0:
976+
_logger.warning(
977+
"Incompatible adaptive sampling config version: %s. "
978+
"This version of the AWS X-Ray remote sampler only supports version 1.X.",
979+
config_version,
980+
)
981+
return None
964982

965-
# Parse anomaly conditions
966-
anomaly_conditions = None
967-
if "anomalyConditions" in config_map:
968-
anomaly_conditions = [
969-
_AnomalyConditions(
970-
error_code_regex=cond.get("errorCodeRegex"),
971-
operations=cond.get("operations"),
972-
high_latency_ms=cond.get("highLatencyMs"),
973-
usage=_UsageType(cond["usage"]) if "usage" in cond else None,
983+
# Parse anomaly conditions
984+
anomaly_conditions = None
985+
if "anomalyConditions" in config_map:
986+
anomaly_conditions = [
987+
_AnomalyConditions(
988+
error_code_regex=cond.get("errorCodeRegex"),
989+
operations=cond.get("operations"),
990+
high_latency_ms=cond.get("highLatencyMs"),
991+
usage=_UsageType(cond["usage"]) if "usage" in cond else None,
992+
)
993+
for cond in config_map["anomalyConditions"]
994+
]
995+
996+
# Parse anomaly capture limit
997+
anomaly_capture_limit = None
998+
if "anomalyCaptureLimit" in config_map:
999+
anomaly_capture_limit_data = config_map["anomalyCaptureLimit"]
1000+
anomaly_capture_limit = _AnomalyCaptureLimit(
1001+
anomaly_traces_per_second=anomaly_capture_limit_data["anomalyTracesPerSecond"]
9741002
)
975-
for cond in config_map["anomalyConditions"]
976-
]
9771003

978-
# Parse anomaly capture limit
979-
anomaly_capture_limit = None
980-
if "anomalyCaptureLimit" in config_map:
981-
anomaly_capture_limit_data = config_map["anomalyCaptureLimit"]
982-
anomaly_capture_limit = _AnomalyCaptureLimit(
983-
anomaly_traces_per_second=anomaly_capture_limit_data["anomalyTracesPerSecond"]
1004+
return _AWSXRayAdaptiveSamplingConfig(
1005+
version=config_version, anomaly_conditions=anomaly_conditions, anomaly_capture_limit=anomaly_capture_limit
9841006
)
1007+
except ValueError as err:
1008+
_logger.warning("Failed to load AWS X-Ray adaptive sampling configuration: %s", err)
9851009

986-
return _AWSXRayAdaptiveSamplingConfig(
987-
version=config_version, anomaly_conditions=anomaly_conditions, anomaly_capture_limit=anomaly_capture_limit
988-
)
1010+
return None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_rule_cache.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def __init__(
5959
self._adaptive_sampling_rule_exists = False
6060
self._adaptive_sampling_config: Optional[_AWSXRayAdaptiveSamplingConfig] = None
6161
self._anomaly_capture_rate_limiter: Optional[_RateLimiter] = None
62+
self._trace_usage_cache_lock = Lock()
6263
self._trace_usage_cache: TTLCache[int, _UsageType] = TTLCache(
6364
maxsize=TRACE_USAGE_CACHE_MAX_SIZE,
6465
ttl=TRACE_USAGE_CACHE_TTL_SECONDS,
@@ -151,8 +152,12 @@ def adapt_sampling(self, span: ReadableSpan, span_batcher: Callable[[ReadableSpa
151152
should_capture_anomaly_span = result.should_capture_anomaly_span
152153

153154
trace_id: int = span.context.trace_id
154-
existing_usage: _UsageType = self._trace_usage_cache.get(trace_id)
155-
is_new_trace: bool = existing_usage is None
155+
is_new_trace: bool = False
156+
with self._trace_usage_cache_lock:
157+
existing_usage: _UsageType = self._trace_usage_cache.get(trace_id)
158+
is_new_trace = existing_usage is None
159+
if existing_usage is None:
160+
self._trace_usage_cache[trace_id] = _UsageType.NEITHER
156161

157162
# Anomaly Capture
158163
is_span_captured = False
@@ -275,25 +280,26 @@ def __is_anomaly(self, span: ReadableSpan) -> "_AnomalyDetectionResult":
275280
def __update_trace_usage_cache(
276281
self, trace_id: int, is_span_captured: bool, is_counted_as_anomaly_for_boost: bool
277282
) -> None:
278-
existing_usage = self._trace_usage_cache.get(trace_id)
283+
with self._trace_usage_cache_lock:
284+
existing_usage = self._trace_usage_cache.get(trace_id)
279285

280-
# Any interaction with a cache entry will reset the expiration timer of that entry
281-
if is_span_captured and is_counted_as_anomaly_for_boost:
282-
self._trace_usage_cache[trace_id] = _UsageType.BOTH
283-
elif is_span_captured:
284-
if _UsageType.is_used_for_boost(existing_usage):
285-
self._trace_usage_cache[trace_id] = _UsageType.BOTH
286-
else:
287-
self._trace_usage_cache[trace_id] = _UsageType.ANOMALY_TRACE_CAPTURE
288-
elif is_counted_as_anomaly_for_boost:
289-
if _UsageType.is_used_for_anomaly_trace_capture(existing_usage):
286+
# Any interaction with a cache entry will reset the expiration timer of that entry
287+
if is_span_captured and is_counted_as_anomaly_for_boost:
290288
self._trace_usage_cache[trace_id] = _UsageType.BOTH
289+
elif is_span_captured:
290+
if _UsageType.is_used_for_boost(existing_usage):
291+
self._trace_usage_cache[trace_id] = _UsageType.BOTH
292+
else:
293+
self._trace_usage_cache[trace_id] = _UsageType.ANOMALY_TRACE_CAPTURE
294+
elif is_counted_as_anomaly_for_boost:
295+
if _UsageType.is_used_for_anomaly_trace_capture(existing_usage):
296+
self._trace_usage_cache[trace_id] = _UsageType.BOTH
297+
else:
298+
self._trace_usage_cache[trace_id] = _UsageType.SAMPLING_BOOST
299+
elif existing_usage is not None:
300+
self._trace_usage_cache[trace_id] = existing_usage
291301
else:
292-
self._trace_usage_cache[trace_id] = _UsageType.SAMPLING_BOOST
293-
elif existing_usage is not None:
294-
self._trace_usage_cache[trace_id] = existing_usage
295-
else:
296-
self._trace_usage_cache[trace_id] = _UsageType.NEITHER
302+
self._trace_usage_cache[trace_id] = _UsageType.NEITHER
297303

298304
def update_sampling_rules(self, new_sampling_rules: List[_SamplingRule]) -> None:
299305
new_sampling_rules.sort()

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -351,19 +351,46 @@ def test_parse_adaptive_sampling_config_null(self):
351351
self.assertIsNone(result)
352352

353353
def test_parse_adaptive_sampling_config_missing_version(self):
354-
"""Tests that _parse_config_string raises exception for missing version"""
355-
with self.assertRaises(Exception):
356-
_parse_config_string("")
354+
"""Tests that _parse_config_string returns None for missing version and logs warning"""
355+
with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger:
356+
result = _parse_config_string("anomalyConditions: []")
357+
self.assertIsNone(result)
358+
mock_logger.warning.assert_called_with(
359+
"Missing required 'version' field in adaptive sampling configuration"
360+
)
357361

358362
def test_parse_adaptive_sampling_config_unsupported_version(self):
359-
"""Tests that _parse_config_string raises exception for unsupported version"""
360-
with self.assertRaises(ValueError):
361-
_parse_config_string("{version: 5000.1}")
363+
"""Tests that _parse_config_string returns None for unsupported version and logs warning"""
364+
with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger:
365+
result = _parse_config_string("{version: 5000.1}")
366+
self.assertIsNone(result)
367+
mock_logger.warning.assert_called_with(
368+
"Incompatible adaptive sampling config version: %s. "
369+
"This version of the AWS X-Ray remote sampler only supports version 1.X.",
370+
5000.1,
371+
)
362372

363373
def test_parse_adaptive_sampling_config_invalid_yaml(self):
364-
"""Tests that _parse_config_string raises exception for invalid YAML"""
365-
with self.assertRaises(Exception):
366-
_parse_config_string("{version: 1, invalid: yaml: structure}")
374+
"""Tests that _parse_config_string returns None for invalid YAML and logs warning"""
375+
with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger_1:
376+
result = _parse_config_string("{version: 1, invalid: yaml: structure}")
377+
self.assertIsNone(result)
378+
mock_logger_1.warning.assert_called()
379+
warning_calls = [str(call) for call in mock_logger_1.warning.call_args_list]
380+
self.assertTrue(
381+
any("must be a valid YAML mapping" in call for call in warning_calls),
382+
f"Expected warning about invalid YAML, got: {warning_calls}",
383+
)
384+
385+
with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger_2:
386+
result = _parse_config_string("{version: 1, {anomalyCaptureLimit: {anomalyTracesPerSecond: 1}}}")
387+
self.assertIsNone(result)
388+
mock_logger_2.warning.assert_called()
389+
warning_calls = [str(call) for call in mock_logger_2.warning.call_args_list]
390+
self.assertTrue(
391+
any("must be a valid YAML mapping" in call for call in warning_calls),
392+
f"Expected warning about invalid YAML, got: {warning_calls}",
393+
)
367394

368395
def test_parse_adaptive_sampling_config_from_file_valid(self):
369396
"""Tests that _parse_config_string correctly parses a valid YAML file"""
@@ -376,20 +403,29 @@ def test_parse_adaptive_sampling_config_from_file_valid(self):
376403
self.assertEqual(config.anomaly_capture_limit.anomaly_traces_per_second, 10)
377404

378405
def test_parse_adaptive_sampling_config_from_file_invalid(self):
379-
"""Tests that _parse_config_string raises exception for invalid YAML file"""
406+
"""Tests that _parse_config_string returns None for invalid YAML file and logs warning"""
380407
import os
381408

382409
config_path = os.path.join(DATA_DIR, "adaptive-sampling-config-invalid.yaml")
383-
with self.assertRaises(Exception):
384-
_parse_config_string(config_path)
410+
with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger:
411+
result = _parse_config_string(config_path)
412+
self.assertIsNone(result)
413+
mock_logger.warning.assert_called()
414+
warning_calls = [str(call) for call in mock_logger.warning.call_args_list]
415+
self.assertTrue(
416+
any("Failed to load AWS X-Ray adaptive sampling configuration" in call for call in warning_calls),
417+
f"Expected warning about failed configuration loading, got: {warning_calls}",
418+
)
385419

386420
def test_parse_adaptive_sampling_config_from_file_non_existant(self):
387-
"""Tests that _parse_config_string raises exception for non-existent YAML file"""
421+
"""Tests that _parse_config_string returns None for non-existent YAML file and logs warning"""
388422
import os
389423

390424
config_path = os.path.join(DATA_DIR, "done.yaml")
391-
with self.assertRaises(ValueError):
392-
_parse_config_string(config_path)
425+
with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger:
426+
result = _parse_config_string(config_path)
427+
self.assertIsNone(result)
428+
mock_logger.warning.assert_called_with("Adaptive sampling configuration file must be a YAML file")
393429

394430
def test_customize_span_exporter(self):
395431
mock_exporter: SpanExporter = MagicMock(spec=OTLPSpanExporter)

0 commit comments

Comments
 (0)