Skip to content

Commit 5d21edd

Browse files
committed
fix: Add asyncio task context propagation to env vars
Signed-off-by: Cagri Yonca <cagri@ibm.com>
1 parent 09906fb commit 5d21edd

2 files changed

Lines changed: 91 additions & 66 deletions

File tree

src/instana/options.py

Lines changed: 74 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import logging
1818
import os
19-
from typing import Any, Dict, Sequence, Tuple
19+
from typing import Any, Sequence, Union
2020

2121
from instana.configurator import config
2222
from instana.log import logger
@@ -41,7 +41,7 @@
4141
class BaseOptions(object):
4242
"""Base class for all option classes. Holds items common to all"""
4343

44-
def __init__(self, **kwds: Dict[str, Any]) -> None:
44+
def __init__(self, **kwds: dict[str, Any]) -> None:
4545
self.debug = False
4646
self.log_level = logging.WARN
4747
self.service_name = determine_service_name()
@@ -115,6 +115,11 @@ def set_trace_configurations(self) -> None:
115115
"trace_correlation", True
116116
)
117117

118+
if "INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION" in os.environ:
119+
config["asyncio_task_context_propagation"]["enabled"] = is_truthy(
120+
os.environ["INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION"]
121+
)
122+
118123
self.set_disable_trace_configurations()
119124
self.set_stack_trace_configurations()
120125
self.set_span_filter_configurations()
@@ -319,7 +324,7 @@ def is_span_disabled(self, category=None, span_type=None) -> bool:
319324
# Default: not disabled
320325
return False
321326

322-
def get_stack_trace_config(self, span_name: str) -> Tuple[str, int]:
327+
def get_stack_trace_config(self, span_name: str) -> tuple[str, int]:
323328
"""
324329
Get stack trace configuration for a specific span type.
325330
Technology-specific configuration overrides global configuration.
@@ -357,7 +362,7 @@ class StandardOptions(BaseOptions):
357362
DEFAULT_POLL_RATE = 1
358363
MAX_POLL_RATE = 5
359364

360-
def __init__(self, **kwds: Dict[str, Any]) -> None:
365+
def __init__(self, **kwds: object) -> None:
361366
super(StandardOptions, self).__init__()
362367

363368
self.agent_host = os.environ.get("INSTANA_AGENT_HOST", self.AGENT_DEFAULT_HOST)
@@ -367,7 +372,7 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
367372
if not isinstance(self.agent_port, int):
368373
self.agent_port = int(self.agent_port)
369374

370-
def set_secrets(self, secrets: Dict[str, Any]) -> None:
375+
def set_secrets(self, secrets: dict[str, Union[str, list[str]]]) -> None:
371376
"""
372377
Set the secret option from the agent config.
373378
@param secrets: dictionary of secrets
@@ -376,7 +381,7 @@ def set_secrets(self, secrets: Dict[str, Any]) -> None:
376381
self.secrets_matcher = secrets["matcher"]
377382
self.secrets_list = secrets["list"]
378383

379-
def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None:
384+
def set_extra_headers(self, extra_headers: list[str]) -> None:
380385
"""
381386
Set the extra headers option from the agent config, which uses the legacy configuration setting.
382387
@param extra_headers: dictionary of headers
@@ -390,41 +395,17 @@ def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None:
390395
f"Will also capture these custom headers: {self.extra_http_headers}"
391396
)
392397

393-
def set_tracing(self, tracing: Dict[str, Any]) -> None:
398+
def set_tracing(self, tracing: dict[str, Any]) -> None:
394399
"""
395400
Set tracing options from the agent config.
396401
@param tracing: tracing configuration dictionary
397402
@return: None
398403
"""
399404
if "filter" in tracing and not self._has_high_priority_span_filter_source():
400-
parsed = parse_filter_rules(tracing["filter"])
401-
for policy in ("exclude", "include"):
402-
rules = parsed.get(policy, [])
403-
if rules:
404-
if policy not in self.span_filters:
405-
self.span_filters[policy] = []
406-
self.span_filters[policy].extend(rules)
405+
self._apply_agent_filter_config(tracing["filter"])
407406

408407
if "kafka" in tracing:
409-
if (
410-
"INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
411-
and not (
412-
isinstance(config.get("tracing"), dict)
413-
and "kafka" in config["tracing"]
414-
)
415-
and "trace-correlation" in tracing["kafka"]
416-
):
417-
self.kafka_trace_correlation = is_truthy(
418-
tracing["kafka"].get("trace-correlation", True)
419-
)
420-
421-
if (
422-
"header-format" in tracing["kafka"]
423-
and tracing["kafka"]["header-format"] == "binary"
424-
):
425-
logger.warning(
426-
"Binary header format for Kafka is deprecated. Please use string header format."
427-
)
408+
self._apply_agent_kafka_config(tracing["kafka"])
428409

429410
if "extra-http-headers" in tracing:
430411
self.extra_http_headers = tracing["extra-http-headers"]
@@ -436,6 +417,34 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None:
436417
# Handle stack trace configuration from agent config
437418
self.set_stack_trace_from_agent(tracing)
438419

420+
def _apply_agent_filter_config(self, filter_config: dict[str, Any]) -> None:
421+
"""Apply span filter rules from agent config."""
422+
parsed = parse_filter_rules(filter_config)
423+
for policy in ("exclude", "include"):
424+
rules = parsed.get(policy, [])
425+
if rules:
426+
if policy not in self.span_filters:
427+
self.span_filters[policy] = []
428+
self.span_filters[policy].extend(rules)
429+
430+
def _apply_agent_kafka_config(
431+
self, kafka_config: dict[str, Union[str, bool]]
432+
) -> None:
433+
"""Apply Kafka tracing configuration from agent config."""
434+
no_env_override = "INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
435+
no_code_override = not (
436+
isinstance(config.get("tracing"), dict) and "kafka" in config["tracing"]
437+
)
438+
if no_env_override and no_code_override and "trace-correlation" in kafka_config:
439+
self.kafka_trace_correlation = is_truthy(
440+
kafka_config.get("trace-correlation", True)
441+
)
442+
443+
if kafka_config.get("header-format") == "binary":
444+
logger.warning(
445+
"Binary header format for Kafka is deprecated. Please use string header format."
446+
)
447+
439448
def _has_high_priority_span_filter_source(self) -> bool:
440449
"""Return True if a higher-priority span filter source (env var, YAML, or in-code config)
441450
has already been configured, in which case the agent-provided filter should be ignored."""
@@ -469,7 +478,7 @@ def _should_apply_agent_global_config(self) -> bool:
469478
return not (has_env_vars or has_yaml_config or has_in_code_config)
470479

471480
def _apply_agent_global_stack_trace_config(
472-
self, global_config: Dict[str, Any]
481+
self, global_config: dict[str, Any]
473482
) -> None:
474483
"""Apply global stack trace configuration from agent config."""
475484
if "stack-trace" in global_config and (
@@ -486,7 +495,7 @@ def _apply_agent_global_stack_trace_config(
486495
):
487496
self.stack_trace_length = validated_length
488497

489-
def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None:
498+
def _apply_agent_tech_stack_trace_config(self, tracing: dict[str, Any]) -> None:
490499
"""Apply technology-specific stack trace configuration from agent config."""
491500
for tech_name, tech_config in tracing.items():
492501
if tech_name == "global" or not isinstance(tech_config, dict):
@@ -502,7 +511,7 @@ def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None:
502511
if tech_stack_config:
503512
self.stack_trace_technology_config[tech_name] = tech_stack_config
504513

505-
def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None:
514+
def set_stack_trace_from_agent(self, tracing: dict[str, Any]) -> None:
506515
"""
507516
Set stack trace configuration from agent config (configuration.yaml).
508517
Only applies if not already set by higher priority sources.
@@ -517,7 +526,7 @@ def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None:
517526
if not self.stack_trace_technology_config:
518527
self._apply_agent_tech_stack_trace_config(tracing)
519528

520-
def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None:
529+
def set_disable_tracing(self, tracing_config: Sequence[dict[str, Any]]) -> None:
521530
# The precedence is as follows:
522531
# environment variables > in-code (local) config > agent config (configuration.yaml)
523532
if (
@@ -533,7 +542,7 @@ def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None:
533542
self.disabled_spans.extend(disabled_spans)
534543
self.enabled_spans.extend(enabled_spans)
535544

536-
def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None:
545+
def set_poll_rate(self, plugin_config: dict[str, Any]) -> None:
537546
"""Set poll rate from agent plugin configuration."""
538547
poll_rate_value = plugin_config.get("poll_rate")
539548
if poll_rate_value is None:
@@ -561,7 +570,7 @@ def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None:
561570
)
562571
self.poll_rate = self.DEFAULT_POLL_RATE
563572

564-
def set_from(self, res_data: Dict[str, Any]) -> None:
573+
def set_from(self, res_data: dict[str, Any]) -> None:
565574
"""
566575
Set the source identifiers given to use by the Instana Host agent.
567576
@param res_data: source identifiers provided as announce response
@@ -591,7 +600,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None:
591600
class ServerlessOptions(BaseOptions):
592601
"""Base class for serverless environments. Holds settings common to all serverless environments."""
593602

594-
def __init__(self, **kwds: Dict[str, Any]) -> None:
603+
def __init__(self, **kwds: dict[str, Any]) -> None:
595604
super(ServerlessOptions, self).__init__()
596605

597606
self.agent_key = os.environ.get("INSTANA_AGENT_KEY", None)
@@ -601,16 +610,10 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
601610
if self.endpoint_url is not None and self.endpoint_url[-1] == "/":
602611
self.endpoint_url = self.endpoint_url[:-1]
603612

604-
if "INSTANA_DISABLE_CA_CHECK" in os.environ:
605-
self.ssl_verify = False
606-
else:
607-
self.ssl_verify = True
613+
self.ssl_verify = "INSTANA_DISABLE_CA_CHECK" not in os.environ
608614

609615
proxy = os.environ.get("INSTANA_ENDPOINT_PROXY", None)
610-
if proxy is None:
611-
self.endpoint_proxy = {}
612-
else:
613-
self.endpoint_proxy = {"https": proxy}
616+
self.endpoint_proxy = {"https": proxy} if proxy else {}
614617

615618
timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None)
616619
if timeout_in_ms is None:
@@ -631,33 +634,38 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
631634

632635
value = os.environ.get("INSTANA_LOG_LEVEL", None)
633636
if value is not None:
634-
try:
635-
value = value.lower()
636-
if value == "debug":
637-
self.log_level = logging.DEBUG
638-
elif value == "info":
639-
self.log_level = logging.INFO
640-
elif value == "warn" or value == "warning":
641-
self.log_level = logging.WARNING
642-
elif value == "error":
643-
self.log_level = logging.ERROR
644-
else:
645-
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
646-
except Exception:
647-
logger.debug("BaseAgent.update_log_level: ", exc_info=True)
637+
self._apply_log_level(value)
638+
639+
def _apply_log_level(self, value: str) -> None:
640+
"""Set log_level from a raw INSTANA_LOG_LEVEL string."""
641+
_LOG_LEVELS = {
642+
"debug": logging.DEBUG,
643+
"info": logging.INFO,
644+
"warn": logging.WARNING,
645+
"warning": logging.WARNING,
646+
"error": logging.ERROR,
647+
}
648+
try:
649+
level = _LOG_LEVELS.get(value.lower())
650+
if level is not None:
651+
self.log_level = level
652+
else:
653+
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
654+
except Exception:
655+
logger.debug("BaseAgent.update_log_level: ", exc_info=True)
648656

649657

650658
class AWSLambdaOptions(ServerlessOptions):
651659
"""Options class for AWS Lambda. Holds settings specific to AWS Lambda."""
652660

653-
def __init__(self, **kwds: Dict[str, Any]) -> None:
661+
def __init__(self, **kwds: dict[str, Any]) -> None:
654662
super(AWSLambdaOptions, self).__init__()
655663

656664

657665
class AWSFargateOptions(ServerlessOptions):
658666
"""Options class for AWS Fargate. Holds settings specific to AWS Fargate."""
659667

660-
def __init__(self, **kwds: Dict[str, Any]) -> None:
668+
def __init__(self, **kwds: dict[str, Any]) -> None:
661669
super(AWSFargateOptions, self).__init__()
662670

663671
self.tags = None
@@ -682,12 +690,12 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
682690
class EKSFargateOptions(AWSFargateOptions):
683691
"""Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate."""
684692

685-
def __init__(self, **kwds: Dict[str, Any]) -> None:
693+
def __init__(self, **kwds: dict[str, Any]) -> None:
686694
super(EKSFargateOptions, self).__init__()
687695

688696

689697
class GCROptions(ServerlessOptions):
690698
"""Options class for Google Cloud Run. Holds settings specific to Google Cloud Run."""
691699

692-
def __init__(self, **kwds: Dict[str, Any]) -> None:
700+
def __init__(self, **kwds: dict[str, Any]) -> None:
693701
super(GCROptions, self).__init__()

tests/test_options.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,23 @@ def test_tracing_filter_environment_variables(self) -> None:
864864
],
865865
}
866866

867+
def test_asyncio_task_context_propagation_default(self) -> None:
868+
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION is False by default."""
869+
self.base_options = BaseOptions()
870+
assert config["asyncio_task_context_propagation"]["enabled"] is False
871+
872+
@patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "true"})
873+
def test_asyncio_task_context_propagation_enabled_via_env(self) -> None:
874+
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=true enables the flag."""
875+
self.base_options = BaseOptions()
876+
assert config["asyncio_task_context_propagation"]["enabled"] is True
877+
878+
@patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "false"})
879+
def test_asyncio_task_context_propagation_disabled_via_env(self) -> None:
880+
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=false keeps the flag disabled."""
881+
self.base_options = BaseOptions()
882+
assert config["asyncio_task_context_propagation"]["enabled"] is False
883+
867884

868885
class TestStandardOptions:
869886
@pytest.fixture(autouse=True)

0 commit comments

Comments
 (0)