Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 74 additions & 66 deletions src/instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
import os
from typing import Any, Dict, Sequence, Tuple
from typing import Any, Sequence, Union

from instana.configurator import config
from instana.log import logger
Expand All @@ -41,7 +41,7 @@
class BaseOptions(object):
"""Base class for all option classes. Holds items common to all"""

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
self.debug = False
self.log_level = logging.WARN
self.service_name = determine_service_name()
Expand Down Expand Up @@ -115,6 +115,11 @@ def set_trace_configurations(self) -> None:
"trace_correlation", True
)

if "INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION" in os.environ:
config["asyncio_task_context_propagation"]["enabled"] = is_truthy(
os.environ["INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION"]
)

self.set_disable_trace_configurations()
self.set_stack_trace_configurations()
self.set_span_filter_configurations()
Expand Down Expand Up @@ -319,7 +324,7 @@ def is_span_disabled(self, category=None, span_type=None) -> bool:
# Default: not disabled
return False

def get_stack_trace_config(self, span_name: str) -> Tuple[str, int]:
def get_stack_trace_config(self, span_name: str) -> tuple[str, int]:
"""
Get stack trace configuration for a specific span type.
Technology-specific configuration overrides global configuration.
Expand Down Expand Up @@ -357,7 +362,7 @@ class StandardOptions(BaseOptions):
DEFAULT_POLL_RATE = 1
MAX_POLL_RATE = 5

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
super(StandardOptions, self).__init__()

self.agent_host = os.environ.get("INSTANA_AGENT_HOST", self.AGENT_DEFAULT_HOST)
Expand All @@ -367,7 +372,7 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
if not isinstance(self.agent_port, int):
self.agent_port = int(self.agent_port)

def set_secrets(self, secrets: Dict[str, Any]) -> None:
def set_secrets(self, secrets: dict[str, Union[str, list[str]]]) -> None:
"""
Set the secret option from the agent config.
@param secrets: dictionary of secrets
Expand All @@ -376,7 +381,7 @@ def set_secrets(self, secrets: Dict[str, Any]) -> None:
self.secrets_matcher = secrets["matcher"]
self.secrets_list = secrets["list"]

def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None:
def set_extra_headers(self, extra_headers: list[str]) -> None:
"""
Set the extra headers option from the agent config, which uses the legacy configuration setting.
@param extra_headers: dictionary of headers
Expand All @@ -390,41 +395,17 @@ def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None:
f"Will also capture these custom headers: {self.extra_http_headers}"
)

def set_tracing(self, tracing: Dict[str, Any]) -> None:
def set_tracing(self, tracing: dict[str, Any]) -> None:
"""
Set tracing options from the agent config.
@param tracing: tracing configuration dictionary
@return: None
"""
if "filter" in tracing and not self._has_high_priority_span_filter_source():
parsed = parse_filter_rules(tracing["filter"])
for policy in ("exclude", "include"):
rules = parsed.get(policy, [])
if rules:
if policy not in self.span_filters:
self.span_filters[policy] = []
self.span_filters[policy].extend(rules)
self._apply_agent_filter_config(tracing["filter"])

if "kafka" in tracing:
if (
"INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
and not (
isinstance(config.get("tracing"), dict)
and "kafka" in config["tracing"]
)
and "trace-correlation" in tracing["kafka"]
):
self.kafka_trace_correlation = is_truthy(
tracing["kafka"].get("trace-correlation", True)
)

if (
"header-format" in tracing["kafka"]
and tracing["kafka"]["header-format"] == "binary"
):
logger.warning(
"Binary header format for Kafka is deprecated. Please use string header format."
)
self._apply_agent_kafka_config(tracing["kafka"])

if "extra-http-headers" in tracing:
self.extra_http_headers = tracing["extra-http-headers"]
Expand All @@ -436,6 +417,34 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None:
# Handle stack trace configuration from agent config
self.set_stack_trace_from_agent(tracing)

def _apply_agent_filter_config(self, filter_config: dict[str, Any]) -> None:
"""Apply span filter rules from agent config."""
parsed = parse_filter_rules(filter_config)
for policy in ("exclude", "include"):
rules = parsed.get(policy, [])
if rules:
if policy not in self.span_filters:
self.span_filters[policy] = []
self.span_filters[policy].extend(rules)

def _apply_agent_kafka_config(
self, kafka_config: dict[str, Union[str, bool]]
) -> None:
"""Apply Kafka tracing configuration from agent config."""
no_env_override = "INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
no_code_override = not (
isinstance(config.get("tracing"), dict) and "kafka" in config["tracing"]
)
if no_env_override and no_code_override and "trace-correlation" in kafka_config:
self.kafka_trace_correlation = is_truthy(
kafka_config.get("trace-correlation", True)
)

if kafka_config.get("header-format") == "binary":
logger.warning(
"Binary header format for Kafka is deprecated. Please use string header format."
)

def _has_high_priority_span_filter_source(self) -> bool:
"""Return True if a higher-priority span filter source (env var, YAML, or in-code config)
has already been configured, in which case the agent-provided filter should be ignored."""
Expand Down Expand Up @@ -469,7 +478,7 @@ def _should_apply_agent_global_config(self) -> bool:
return not (has_env_vars or has_yaml_config or has_in_code_config)

def _apply_agent_global_stack_trace_config(
self, global_config: Dict[str, Any]
self, global_config: dict[str, Any]
) -> None:
"""Apply global stack trace configuration from agent config."""
if "stack-trace" in global_config and (
Expand All @@ -486,7 +495,7 @@ def _apply_agent_global_stack_trace_config(
):
self.stack_trace_length = validated_length

def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None:
def _apply_agent_tech_stack_trace_config(self, tracing: dict[str, Any]) -> None:
"""Apply technology-specific stack trace configuration from agent config."""
for tech_name, tech_config in tracing.items():
if tech_name == "global" or not isinstance(tech_config, dict):
Expand All @@ -502,7 +511,7 @@ def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None:
if tech_stack_config:
self.stack_trace_technology_config[tech_name] = tech_stack_config

def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None:
def set_stack_trace_from_agent(self, tracing: dict[str, Any]) -> None:
"""
Set stack trace configuration from agent config (configuration.yaml).
Only applies if not already set by higher priority sources.
Expand All @@ -517,7 +526,7 @@ def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None:
if not self.stack_trace_technology_config:
self._apply_agent_tech_stack_trace_config(tracing)

def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None:
def set_disable_tracing(self, tracing_config: Sequence[dict[str, Any]]) -> None:
# The precedence is as follows:
# environment variables > in-code (local) config > agent config (configuration.yaml)
if (
Expand All @@ -533,7 +542,7 @@ def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None:
self.disabled_spans.extend(disabled_spans)
self.enabled_spans.extend(enabled_spans)

def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None:
def set_poll_rate(self, plugin_config: dict[str, Any]) -> None:
"""Set poll rate from agent plugin configuration."""
poll_rate_value = plugin_config.get("poll_rate")
if poll_rate_value is None:
Expand Down Expand Up @@ -561,7 +570,7 @@ def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None:
)
self.poll_rate = self.DEFAULT_POLL_RATE

def set_from(self, res_data: Dict[str, Any]) -> None:
def set_from(self, res_data: dict[str, Any]) -> None:
"""
Set the source identifiers given to use by the Instana Host agent.
@param res_data: source identifiers provided as announce response
Expand Down Expand Up @@ -591,7 +600,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None:
class ServerlessOptions(BaseOptions):
"""Base class for serverless environments. Holds settings common to all serverless environments."""

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
super(ServerlessOptions, self).__init__()

self.agent_key = os.environ.get("INSTANA_AGENT_KEY", None)
Expand All @@ -601,16 +610,10 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
if self.endpoint_url is not None and self.endpoint_url[-1] == "/":
self.endpoint_url = self.endpoint_url[:-1]

if "INSTANA_DISABLE_CA_CHECK" in os.environ:
self.ssl_verify = False
else:
self.ssl_verify = True
self.ssl_verify = "INSTANA_DISABLE_CA_CHECK" not in os.environ

proxy = os.environ.get("INSTANA_ENDPOINT_PROXY", None)
if proxy is None:
self.endpoint_proxy = {}
else:
self.endpoint_proxy = {"https": proxy}
self.endpoint_proxy = {"https": proxy} if proxy else {}

timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None)
if timeout_in_ms is None:
Expand All @@ -631,33 +634,38 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:

value = os.environ.get("INSTANA_LOG_LEVEL", None)
if value is not None:
try:
value = value.lower()
if value == "debug":
self.log_level = logging.DEBUG
elif value == "info":
self.log_level = logging.INFO
elif value == "warn" or value == "warning":
self.log_level = logging.WARNING
elif value == "error":
self.log_level = logging.ERROR
else:
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
except Exception:
logger.debug("BaseAgent.update_log_level: ", exc_info=True)
self._apply_log_level(value)

def _apply_log_level(self, value: str) -> None:
"""Set log_level from a raw INSTANA_LOG_LEVEL string."""
_LOG_LEVELS = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warn": logging.WARNING,
"warning": logging.WARNING,
"error": logging.ERROR,
}
try:
level = _LOG_LEVELS.get(value.lower())
if level is not None:
self.log_level = level
else:
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
except Exception:
logger.debug("BaseAgent.update_log_level: ", exc_info=True)


class AWSLambdaOptions(ServerlessOptions):
"""Options class for AWS Lambda. Holds settings specific to AWS Lambda."""

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
super(AWSLambdaOptions, self).__init__()


class AWSFargateOptions(ServerlessOptions):
"""Options class for AWS Fargate. Holds settings specific to AWS Fargate."""

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
super(AWSFargateOptions, self).__init__()

self.tags = None
Expand All @@ -682,12 +690,12 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
class EKSFargateOptions(AWSFargateOptions):
"""Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate."""

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
super(EKSFargateOptions, self).__init__()


class GCROptions(ServerlessOptions):
"""Options class for Google Cloud Run. Holds settings specific to Google Cloud Run."""

def __init__(self, **kwds: Dict[str, Any]) -> None:
def __init__(self, **kwds: dict[str, Any]) -> None:
super(GCROptions, self).__init__()
17 changes: 17 additions & 0 deletions tests/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,23 @@ def test_tracing_filter_environment_variables(self) -> None:
],
}

def test_asyncio_task_context_propagation_default(self) -> None:
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION is False by default."""
self.base_options = BaseOptions()
assert config["asyncio_task_context_propagation"]["enabled"] is False

@patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "true"})
def test_asyncio_task_context_propagation_enabled_via_env(self) -> None:
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=true enables the flag."""
self.base_options = BaseOptions()
assert config["asyncio_task_context_propagation"]["enabled"] is True

@patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "false"})
def test_asyncio_task_context_propagation_disabled_via_env(self) -> None:
"""INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=false keeps the flag disabled."""
self.base_options = BaseOptions()
assert config["asyncio_task_context_propagation"]["enabled"] is False


class TestStandardOptions:
@pytest.fixture(autouse=True)
Expand Down
Loading