Skip to content

Commit 03edb9b

Browse files
committed
fix: Add asyncio task context propagation to env vars
Signed-off-by: Cagri Yonca <cagri@ibm.com>
1 parent 09906fb commit 03edb9b

2 files changed

Lines changed: 92 additions & 66 deletions

File tree

src/instana/options.py

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
- GCROptions - Options class for Google cloud Run. Holds settings specific to GCR.
1515
"""
1616

17+
from __future__ import annotations
18+
1719
import logging
1820
import os
19-
from typing import Any, Dict, Sequence, Tuple
21+
from collections.abc import Sequence
22+
from typing import Any
2023

2124
from instana.configurator import config
2225
from instana.log import logger
@@ -41,7 +44,7 @@
4144
class BaseOptions(object):
4245
"""Base class for all option classes. Holds items common to all"""
4346

44-
def __init__(self, **kwds: Dict[str, Any]) -> None:
47+
def __init__(self, **kwds: object) -> None:
4548
self.debug = False
4649
self.log_level = logging.WARN
4750
self.service_name = determine_service_name()
@@ -115,6 +118,11 @@ def set_trace_configurations(self) -> None:
115118
"trace_correlation", True
116119
)
117120

121+
if "INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION" in os.environ:
122+
config["asyncio_task_context_propagation"]["enabled"] = is_truthy(
123+
os.environ["INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION"]
124+
)
125+
118126
self.set_disable_trace_configurations()
119127
self.set_stack_trace_configurations()
120128
self.set_span_filter_configurations()
@@ -319,7 +327,7 @@ def is_span_disabled(self, category=None, span_type=None) -> bool:
319327
# Default: not disabled
320328
return False
321329

322-
def get_stack_trace_config(self, span_name: str) -> Tuple[str, int]:
330+
def get_stack_trace_config(self, span_name: str) -> tuple[str, int]:
323331
"""
324332
Get stack trace configuration for a specific span type.
325333
Technology-specific configuration overrides global configuration.
@@ -357,7 +365,7 @@ class StandardOptions(BaseOptions):
357365
DEFAULT_POLL_RATE = 1
358366
MAX_POLL_RATE = 5
359367

360-
def __init__(self, **kwds: Dict[str, Any]) -> None:
368+
def __init__(self, **kwds: object) -> None:
361369
super(StandardOptions, self).__init__()
362370

363371
self.agent_host = os.environ.get("INSTANA_AGENT_HOST", self.AGENT_DEFAULT_HOST)
@@ -367,7 +375,7 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
367375
if not isinstance(self.agent_port, int):
368376
self.agent_port = int(self.agent_port)
369377

370-
def set_secrets(self, secrets: Dict[str, Any]) -> None:
378+
def set_secrets(self, secrets: dict[str, str | list[str]]) -> None:
371379
"""
372380
Set the secret option from the agent config.
373381
@param secrets: dictionary of secrets
@@ -376,7 +384,7 @@ def set_secrets(self, secrets: Dict[str, Any]) -> None:
376384
self.secrets_matcher = secrets["matcher"]
377385
self.secrets_list = secrets["list"]
378386

379-
def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None:
387+
def set_extra_headers(self, extra_headers: list[str]) -> None:
380388
"""
381389
Set the extra headers option from the agent config, which uses the legacy configuration setting.
382390
@param extra_headers: dictionary of headers
@@ -390,41 +398,17 @@ def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None:
390398
f"Will also capture these custom headers: {self.extra_http_headers}"
391399
)
392400

393-
def set_tracing(self, tracing: Dict[str, Any]) -> None:
401+
def set_tracing(self, tracing: dict[str, Any]) -> None:
394402
"""
395403
Set tracing options from the agent config.
396404
@param tracing: tracing configuration dictionary
397405
@return: None
398406
"""
399407
if "filter" in tracing and not self._has_high_priority_span_filter_source():
400-
parsed = parse_filter_rules(tracing["filter"])
401-
for policy in ("exclude", "include"):
402-
rules = parsed.get(policy, [])
403-
if rules:
404-
if policy not in self.span_filters:
405-
self.span_filters[policy] = []
406-
self.span_filters[policy].extend(rules)
408+
self._apply_agent_filter_config(tracing["filter"])
407409

408410
if "kafka" in tracing:
409-
if (
410-
"INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
411-
and not (
412-
isinstance(config.get("tracing"), dict)
413-
and "kafka" in config["tracing"]
414-
)
415-
and "trace-correlation" in tracing["kafka"]
416-
):
417-
self.kafka_trace_correlation = is_truthy(
418-
tracing["kafka"].get("trace-correlation", True)
419-
)
420-
421-
if (
422-
"header-format" in tracing["kafka"]
423-
and tracing["kafka"]["header-format"] == "binary"
424-
):
425-
logger.warning(
426-
"Binary header format for Kafka is deprecated. Please use string header format."
427-
)
411+
self._apply_agent_kafka_config(tracing["kafka"])
428412

429413
if "extra-http-headers" in tracing:
430414
self.extra_http_headers = tracing["extra-http-headers"]
@@ -436,6 +420,32 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None:
436420
# Handle stack trace configuration from agent config
437421
self.set_stack_trace_from_agent(tracing)
438422

423+
def _apply_agent_filter_config(self, filter_config: dict[str, Any]) -> None:
424+
"""Apply span filter rules from agent config."""
425+
parsed = parse_filter_rules(filter_config)
426+
for policy in ("exclude", "include"):
427+
rules = parsed.get(policy, [])
428+
if rules:
429+
if policy not in self.span_filters:
430+
self.span_filters[policy] = []
431+
self.span_filters[policy].extend(rules)
432+
433+
def _apply_agent_kafka_config(self, kafka_config: dict[str, str | bool]) -> None:
434+
"""Apply Kafka tracing configuration from agent config."""
435+
no_env_override = "INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
436+
no_code_override = not (
437+
isinstance(config.get("tracing"), dict) and "kafka" in config["tracing"]
438+
)
439+
if no_env_override and no_code_override and "trace-correlation" in kafka_config:
440+
self.kafka_trace_correlation = is_truthy(
441+
kafka_config.get("trace-correlation", True)
442+
)
443+
444+
if kafka_config.get("header-format") == "binary":
445+
logger.warning(
446+
"Binary header format for Kafka is deprecated. Please use string header format."
447+
)
448+
439449
def _has_high_priority_span_filter_source(self) -> bool:
440450
"""Return True if a higher-priority span filter source (env var, YAML, or in-code config)
441451
has already been configured, in which case the agent-provided filter should be ignored."""
@@ -469,7 +479,7 @@ def _should_apply_agent_global_config(self) -> bool:
469479
return not (has_env_vars or has_yaml_config or has_in_code_config)
470480

471481
def _apply_agent_global_stack_trace_config(
472-
self, global_config: Dict[str, Any]
482+
self, global_config: dict[str, Any]
473483
) -> None:
474484
"""Apply global stack trace configuration from agent config."""
475485
if "stack-trace" in global_config and (
@@ -486,7 +496,7 @@ def _apply_agent_global_stack_trace_config(
486496
):
487497
self.stack_trace_length = validated_length
488498

489-
def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None:
499+
def _apply_agent_tech_stack_trace_config(self, tracing: dict[str, Any]) -> None:
490500
"""Apply technology-specific stack trace configuration from agent config."""
491501
for tech_name, tech_config in tracing.items():
492502
if tech_name == "global" or not isinstance(tech_config, dict):
@@ -502,7 +512,7 @@ def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None:
502512
if tech_stack_config:
503513
self.stack_trace_technology_config[tech_name] = tech_stack_config
504514

505-
def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None:
515+
def set_stack_trace_from_agent(self, tracing: dict[str, Any]) -> None:
506516
"""
507517
Set stack trace configuration from agent config (configuration.yaml).
508518
Only applies if not already set by higher priority sources.
@@ -517,7 +527,7 @@ def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None:
517527
if not self.stack_trace_technology_config:
518528
self._apply_agent_tech_stack_trace_config(tracing)
519529

520-
def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None:
530+
def set_disable_tracing(self, tracing_config: Sequence[dict[str, Any]]) -> None:
521531
# The precedence is as follows:
522532
# environment variables > in-code (local) config > agent config (configuration.yaml)
523533
if (
@@ -533,7 +543,7 @@ def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None:
533543
self.disabled_spans.extend(disabled_spans)
534544
self.enabled_spans.extend(enabled_spans)
535545

536-
def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None:
546+
def set_poll_rate(self, plugin_config: dict[str, Any]) -> None:
537547
"""Set poll rate from agent plugin configuration."""
538548
poll_rate_value = plugin_config.get("poll_rate")
539549
if poll_rate_value is None:
@@ -561,7 +571,7 @@ def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None:
561571
)
562572
self.poll_rate = self.DEFAULT_POLL_RATE
563573

564-
def set_from(self, res_data: Dict[str, Any]) -> None:
574+
def set_from(self, res_data: dict[str, Any]) -> None:
565575
"""
566576
Set the source identifiers given to use by the Instana Host agent.
567577
@param res_data: source identifiers provided as announce response
@@ -591,7 +601,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None:
591601
class ServerlessOptions(BaseOptions):
592602
"""Base class for serverless environments. Holds settings common to all serverless environments."""
593603

594-
def __init__(self, **kwds: Dict[str, Any]) -> None:
604+
def __init__(self, **kwds: object) -> None:
595605
super(ServerlessOptions, self).__init__()
596606

597607
self.agent_key = os.environ.get("INSTANA_AGENT_KEY", None)
@@ -601,16 +611,10 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
601611
if self.endpoint_url is not None and self.endpoint_url[-1] == "/":
602612
self.endpoint_url = self.endpoint_url[:-1]
603613

604-
if "INSTANA_DISABLE_CA_CHECK" in os.environ:
605-
self.ssl_verify = False
606-
else:
607-
self.ssl_verify = True
614+
self.ssl_verify = "INSTANA_DISABLE_CA_CHECK" not in os.environ
608615

609616
proxy = os.environ.get("INSTANA_ENDPOINT_PROXY", None)
610-
if proxy is None:
611-
self.endpoint_proxy = {}
612-
else:
613-
self.endpoint_proxy = {"https": proxy}
617+
self.endpoint_proxy = {"https": proxy} if proxy else {}
614618

615619
timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None)
616620
if timeout_in_ms is None:
@@ -631,33 +635,38 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
631635

632636
value = os.environ.get("INSTANA_LOG_LEVEL", None)
633637
if value is not None:
634-
try:
635-
value = value.lower()
636-
if value == "debug":
637-
self.log_level = logging.DEBUG
638-
elif value == "info":
639-
self.log_level = logging.INFO
640-
elif value == "warn" or value == "warning":
641-
self.log_level = logging.WARNING
642-
elif value == "error":
643-
self.log_level = logging.ERROR
644-
else:
645-
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
646-
except Exception:
647-
logger.debug("BaseAgent.update_log_level: ", exc_info=True)
638+
self._apply_log_level(value)
639+
640+
def _apply_log_level(self, value: str) -> None:
641+
"""Set log_level from a raw INSTANA_LOG_LEVEL string."""
642+
_LOG_LEVELS = {
643+
"debug": logging.DEBUG,
644+
"info": logging.INFO,
645+
"warn": logging.WARNING,
646+
"warning": logging.WARNING,
647+
"error": logging.ERROR,
648+
}
649+
try:
650+
level = _LOG_LEVELS.get(value.lower())
651+
if level is not None:
652+
self.log_level = level
653+
else:
654+
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
655+
except Exception:
656+
logger.debug("BaseAgent.update_log_level: ", exc_info=True)
648657

649658

650659
class AWSLambdaOptions(ServerlessOptions):
651660
"""Options class for AWS Lambda. Holds settings specific to AWS Lambda."""
652661

653-
def __init__(self, **kwds: Dict[str, Any]) -> None:
662+
def __init__(self, **kwds: object) -> None:
654663
super(AWSLambdaOptions, self).__init__()
655664

656665

657666
class AWSFargateOptions(ServerlessOptions):
658667
"""Options class for AWS Fargate. Holds settings specific to AWS Fargate."""
659668

660-
def __init__(self, **kwds: Dict[str, Any]) -> None:
669+
def __init__(self, **kwds: object) -> None:
661670
super(AWSFargateOptions, self).__init__()
662671

663672
self.tags = None
@@ -682,12 +691,12 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
682691
class EKSFargateOptions(AWSFargateOptions):
683692
"""Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate."""
684693

685-
def __init__(self, **kwds: Dict[str, Any]) -> None:
694+
def __init__(self, **kwds: object) -> None:
686695
super(EKSFargateOptions, self).__init__()
687696

688697

689698
class GCROptions(ServerlessOptions):
690699
"""Options class for Google Cloud Run. Holds settings specific to Google Cloud Run."""
691700

692-
def __init__(self, **kwds: Dict[str, Any]) -> None:
701+
def __init__(self, **kwds: object) -> None:
693702
super(GCROptions, self).__init__()

tests/test_options.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,23 @@ def test_tracing_filter_environment_variables(self) -> None:
864864
],
865865
}
866866

867+
def test_asyncio_task_context_propagation_default(self) -> None:
868+
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION is False by default."""
869+
self.base_options = BaseOptions()
870+
assert config["asyncio_task_context_propagation"]["enabled"] is False
871+
872+
@patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "true"})
873+
def test_asyncio_task_context_propagation_enabled_via_env(self) -> None:
874+
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=true enables the flag."""
875+
self.base_options = BaseOptions()
876+
assert config["asyncio_task_context_propagation"]["enabled"] is True
877+
878+
@patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "false"})
879+
def test_asyncio_task_context_propagation_disabled_via_env(self) -> None:
880+
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=false keeps the flag disabled."""
881+
self.base_options = BaseOptions()
882+
assert config["asyncio_task_context_propagation"]["enabled"] is False
883+
867884

868885
class TestStandardOptions:
869886
@pytest.fixture(autouse=True)

0 commit comments

Comments
 (0)