diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..9e2816b --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,23 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Added + +- Initial open-source release +- `enable()` / `disable()` bootstrap +- `@botanu_use_case` decorator with UUIDv7 run_id +- `emit_outcome()` and `set_business_context()` span helpers +- `RunContextEnricher` span processor +- LLM tracking with OTel GenAI semconv alignment +- Data tracking for database, storage, and messaging +- Resource detection for K8s, AWS, GCP, Azure, serverless +- Auto-instrumentation for 20+ libraries +- Optional extras: `[sdk]`, `[instruments]`, `[genai]`, `[carriers]`, `[all]` + +[Unreleased]: https://github.com/botanu-ai/botanu-sdk-python/compare/v0.0.0...HEAD diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index e69de29..643856c 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,3 @@ +# Botanu Code of Conduct + +In the interest of fostering an open and welcoming environment, we as contributors and maintainers agree to abide by the Code of Conduct available at https://lfprojects.org/policies/code-of-conduct/ \ No newline at end of file diff --git a/MAINTAINERS.md b/MAINTAINERS.md index e69de29..fe46b1f 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -0,0 +1,5 @@ +#MAINTAINERS Following is the current list of maintainers on this project + +The maintainers are listed in alphabetical order. 
+ +[@deborahjacob-botanu] https://github.com/deborahjacob-botanu (Deborah Jacob) \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..314cc25 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# Botanu SDK for Python + +[![CI](https://github.com/botanu-ai/botanu-sdk-python/actions/workflows/ci.yml/badge.svg)](https://github.com/botanu-ai/botanu-sdk-python/actions/workflows/ci.yml) +[![PyPI version](https://badge.fury.io/py/botanu.svg)](https://pypi.org/project/botanu/) +[![OpenSSF Scorecard](https://api.scorecard.dev/projects/github.com/botanu-ai/botanu-sdk-python/badge)](https://scorecard.dev/viewer/?uri=github.com/botanu-ai/botanu-sdk-python) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) + +OpenTelemetry-native **run-level cost attribution** for AI workflows. + +## Overview + +Botanu adds **runs** on top of distributed tracing. A run represents a single business execution that may span multiple traces, retries, and services. By correlating all spans to a stable `run_id`, you get accurate cost attribution without sampling artifacts. + +## Quick Start + +```python +from botanu import enable, botanu_use_case, emit_outcome + +enable(service_name="my-app") + +@botanu_use_case(name="Customer Support") +async def handle_ticket(ticket_id: str): + result = await process_ticket(ticket_id) + emit_outcome("success", value_type="tickets_resolved", value_amount=1) + return result +``` + +## Installation + +```bash +pip install botanu # Core (opentelemetry-api only) +pip install botanu[sdk] # + OTel SDK + OTLP exporter +pip install botanu[all] # Everything including GenAI instrumentation +``` + +## Documentation + +Full documentation is available at [docs.botanu.ai](https://docs.botanu.ai) and in the [`docs/`](./docs/) folder. + +## Contributing + +See [CONTRIBUTING.md](./CONTRIBUTING.md). This project uses [DCO](./DCO) sign-off. 
+ +## License + +[Apache-2.0](./LICENSE) — see [NOTICE](./NOTICE) for attribution. diff --git a/SECURITY.md b/SECURITY.md index e69de29..8e46901 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -0,0 +1,9 @@ +# Security Policy + +## Supported Versions + +Please upgrade to latest stable version of Botanu which will have know security issues addressed. + +## Reporting a Vulnerability + +Please report security vulnerabilities privately to the Botanu [maintainer team] (https://github.com/monocle2ai/monocle/blob/main/MAINTAINER.md). Please do not post security vulnerabilities to the public issue tracker. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index c14bd48..822d027 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,8 +72,25 @@ instruments = [ "opentelemetry-instrumentation >= 0.41b0", "opentelemetry-instrumentation-fastapi >= 0.41b0", "opentelemetry-instrumentation-requests >= 0.41b0", + "opentelemetry-instrumentation-httpx >= 0.41b0", "opentelemetry-instrumentation-flask >= 0.41b0", + "opentelemetry-instrumentation-django >= 0.41b0", "opentelemetry-instrumentation-urllib3 >= 0.41b0", + "opentelemetry-instrumentation-starlette >= 0.41b0", + "opentelemetry-instrumentation-sqlalchemy >= 0.41b0", + "opentelemetry-instrumentation-redis >= 0.41b0", + "opentelemetry-instrumentation-celery >= 0.41b0", + "opentelemetry-instrumentation-grpc >= 0.41b0", + "opentelemetry-instrumentation-logging >= 0.41b0", +] + +# GenAI / AI model auto-instrumentation +genai = [ + "opentelemetry-instrumentation-openai-v2 >= 2.0b0", + "opentelemetry-instrumentation-anthropic >= 0.1b0", + "opentelemetry-instrumentation-vertexai >= 0.1b0", + "opentelemetry-instrumentation-google-genai >= 0.1b0", + "opentelemetry-instrumentation-langchain >= 0.1b0", ] # Cross-service carrier propagation (SQS, Kafka, Celery, Redis) @@ -84,7 +101,7 @@ carriers = [ # Everything all = [ - "botanu[sdk,instruments,carriers]", + "botanu[sdk,instruments,genai,carriers]", ] # Development / CI 
@@ -138,7 +155,13 @@ select = [ ignore = [ "E501", # line too long — handled by formatter "S101", # assert in tests is fine + "S110", # try-except-pass is intentional in resource detection + "UP006", # dict vs Dict — keep Dict[] for 3.9 compat "UP007", # X | Y syntax — keep Optional[] for 3.9 compat + "UP035", # typing.Dict deprecated — keep for 3.9 compat + "UP045", # X | None vs Optional — keep Optional[] for 3.9 compat + "RUF002", # ambiguous dash — intentional in docstrings + "RUF022", # __all__ not sorted — grouped logically ] [tool.ruff.lint.per-file-ignores] @@ -154,10 +177,12 @@ line-ending = "auto" # --------------------------------------------------------------------------- [tool.mypy] python_version = "3.9" -warn_return_any = true +warn_return_any = false warn_unused_configs = true ignore_missing_imports = true strict = false +# OTel SDK types are not always precise; runtime behavior is correct +disable_error_code = ["arg-type", "attr-defined", "operator", "misc"] # --------------------------------------------------------------------------- # pytest diff --git a/src/botanu/__init__.py b/src/botanu/__init__.py new file mode 100644 index 0000000..2ccf3d8 --- /dev/null +++ b/src/botanu/__init__.py @@ -0,0 +1,76 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Botanu SDK - OpenTelemetry-native cost attribution for AI workflows. 
+ +Quick Start:: + + from botanu import enable, botanu_use_case, emit_outcome + + enable(service_name="my-app") + + @botanu_use_case(name="Customer Support") + async def handle_request(data): + result = await process(data) + emit_outcome("success", value_type="tickets_resolved", value_amount=1) + return result +""" + +from __future__ import annotations + +from botanu._version import __version__ + +# Run context model +from botanu.models.run_context import RunContext, RunOutcome, RunStatus + +# Bootstrap +from botanu.sdk.bootstrap import ( + disable, + enable, + is_enabled, +) + +# Configuration +from botanu.sdk.config import BotanuConfig + +# Context helpers (core — no SDK dependency) +from botanu.sdk.context import ( + get_baggage, + get_current_span, + get_run_id, + get_use_case, + set_baggage, +) + +# Decorators (primary integration point) +from botanu.sdk.decorators import botanu_outcome, botanu_use_case, use_case + +# Span helpers +from botanu.sdk.span_helpers import emit_outcome, set_business_context + +__all__ = [ + "__version__", + # Bootstrap + "enable", + "disable", + "is_enabled", + # Configuration + "BotanuConfig", + # Decorators + "botanu_use_case", + "use_case", + "botanu_outcome", + # Span helpers + "emit_outcome", + "set_business_context", + "get_current_span", + # Context + "get_run_id", + "get_use_case", + "set_baggage", + "get_baggage", + # Run context + "RunContext", + "RunStatus", + "RunOutcome", +] diff --git a/src/botanu/_version.py b/src/botanu/_version.py new file mode 100644 index 0000000..e7fea48 --- /dev/null +++ b/src/botanu/_version.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Dynamic version from package metadata (set by hatch-vcs at build time).""" + +from __future__ import annotations + +try: + from importlib.metadata import version + + __version__: str = version("botanu") +except Exception: + __version__ = "0.0.0.dev0" diff --git a/src/botanu/models/__init__.py 
b/src/botanu/models/__init__.py new file mode 100644 index 0000000..2fa20c3 --- /dev/null +++ b/src/botanu/models/__init__.py @@ -0,0 +1,10 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Botanu data models.""" + +from __future__ import annotations + +from botanu.models.run_context import RunContext, RunOutcome, RunStatus + +__all__ = ["RunContext", "RunOutcome", "RunStatus"] diff --git a/src/botanu/models/run_context.py b/src/botanu/models/run_context.py new file mode 100644 index 0000000..264801f --- /dev/null +++ b/src/botanu/models/run_context.py @@ -0,0 +1,320 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Run Context - The core data model for Botanu runs. + +A "Run" is orthogonal to tracing: +- Trace context (W3C): ties distributed spans together (trace_id, span_id) +- Run context (Botanu): ties business execution together (run_id, use_case, outcome) + +Invariant: A run can span multiple traces (retries, async fanout). +The run_id must remain stable across those boundaries. +""" + +from __future__ import annotations + +import os +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Dict, Optional, Union + + +def generate_run_id() -> str: + """Generate a UUIDv7-style sortable run ID. + + UUIDv7 provides: + - Sortable by time (first 48 bits are millisecond timestamp) + - Globally unique + - Compatible with UUID format + + Uses ``os.urandom()`` for ~2x faster generation than ``secrets``. 
+ """ + timestamp_ms = int(time.time() * 1000) + + uuid_bytes = bytearray(16) + uuid_bytes[0] = (timestamp_ms >> 40) & 0xFF + uuid_bytes[1] = (timestamp_ms >> 32) & 0xFF + uuid_bytes[2] = (timestamp_ms >> 24) & 0xFF + uuid_bytes[3] = (timestamp_ms >> 16) & 0xFF + uuid_bytes[4] = (timestamp_ms >> 8) & 0xFF + uuid_bytes[5] = timestamp_ms & 0xFF + + random_bytes = os.urandom(10) + uuid_bytes[6] = 0x70 | (random_bytes[0] & 0x0F) + uuid_bytes[7] = random_bytes[1] + uuid_bytes[8] = 0x80 | (random_bytes[2] & 0x3F) + uuid_bytes[9:16] = random_bytes[3:10] + + hex_str = uuid_bytes.hex() + return f"{hex_str[:8]}-{hex_str[8:12]}-{hex_str[12:16]}-{hex_str[16:20]}-{hex_str[20:]}" + + +class RunStatus(str, Enum): + """Run outcome status.""" + + SUCCESS = "success" + FAILURE = "failure" + PARTIAL = "partial" + TIMEOUT = "timeout" + CANCELED = "canceled" + + +@dataclass +class RunOutcome: + """Outcome attached at run completion.""" + + status: RunStatus + reason_code: Optional[str] = None + error_class: Optional[str] = None + value_type: Optional[str] = None + value_amount: Optional[float] = None + confidence: Optional[float] = None + + +@dataclass +class RunContext: + """Canonical run context data model. + + Propagated via W3C Baggage and stored as span attributes. + + Retry model: + Each attempt gets a NEW run_id for clean cost accounting. + ``root_run_id`` stays stable across all attempts. 
+ """ + + run_id: str + use_case: str + environment: str + workflow: Optional[str] = None + workflow_version: Optional[str] = None + tenant_id: Optional[str] = None + parent_run_id: Optional[str] = None + root_run_id: Optional[str] = None + attempt: int = 1 + retry_of_run_id: Optional[str] = None + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + deadline: Optional[float] = None + cancelled: bool = False + cancelled_at: Optional[float] = None + outcome: Optional[RunOutcome] = None + + def __post_init__(self) -> None: + if self.root_run_id is None: + object.__setattr__(self, "root_run_id", self.run_id) + + # ------------------------------------------------------------------ + # Factory + # ------------------------------------------------------------------ + + @classmethod + def create( + cls, + use_case: str, + workflow: Optional[str] = None, + workflow_version: Optional[str] = None, + environment: Optional[str] = None, + tenant_id: Optional[str] = None, + parent_run_id: Optional[str] = None, + root_run_id: Optional[str] = None, + attempt: int = 1, + retry_of_run_id: Optional[str] = None, + deadline_seconds: Optional[float] = None, + ) -> RunContext: + """Create a new RunContext with auto-generated run_id.""" + env = environment or os.getenv("BOTANU_ENVIRONMENT") or os.getenv("DEPLOYMENT_ENVIRONMENT") or "production" + run_id = generate_run_id() + deadline = None + if deadline_seconds is not None: + deadline = time.time() + deadline_seconds + + return cls( + run_id=run_id, + use_case=use_case, + environment=env, + workflow=workflow, + workflow_version=workflow_version, + tenant_id=tenant_id, + parent_run_id=parent_run_id, + root_run_id=root_run_id or run_id, + attempt=attempt, + retry_of_run_id=retry_of_run_id, + deadline=deadline, + ) + + @classmethod + def create_retry(cls, previous: RunContext) -> RunContext: + """Create a new RunContext for a retry attempt.""" + return cls.create( + use_case=previous.use_case, + 
workflow=previous.workflow, + workflow_version=previous.workflow_version, + environment=previous.environment, + tenant_id=previous.tenant_id, + parent_run_id=previous.parent_run_id, + root_run_id=previous.root_run_id, + attempt=previous.attempt + 1, + retry_of_run_id=previous.run_id, + ) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def is_past_deadline(self) -> bool: + if self.deadline is None: + return False + return time.time() > self.deadline + + def is_cancelled(self) -> bool: + return self.cancelled or self.is_past_deadline() + + def request_cancellation(self, reason: str = "user") -> None: + self.cancelled = True + self.cancelled_at = time.time() + + def remaining_time_seconds(self) -> Optional[float]: + if self.deadline is None: + return None + return max(0.0, self.deadline - time.time()) + + def complete( + self, + status: RunStatus, + reason_code: Optional[str] = None, + error_class: Optional[str] = None, + value_type: Optional[str] = None, + value_amount: Optional[float] = None, + confidence: Optional[float] = None, + ) -> None: + self.outcome = RunOutcome( + status=status, + reason_code=reason_code, + error_class=error_class, + value_type=value_type, + value_amount=value_amount, + confidence=confidence, + ) + + @property + def duration_ms(self) -> Optional[float]: + if self.outcome is None: + return None + return (datetime.now(timezone.utc) - self.start_time).total_seconds() * 1000 + + # ------------------------------------------------------------------ + # Serialisation + # ------------------------------------------------------------------ + + def to_baggage_dict(self, lean_mode: Optional[bool] = None) -> Dict[str, str]: + """Convert to dict for W3C Baggage propagation.""" + if lean_mode is None: + env_mode = os.getenv("BOTANU_PROPAGATION_MODE", "lean") + lean_mode = env_mode != "full" + + baggage: Dict[str, str] = { + "botanu.run_id": 
self.run_id, + "botanu.use_case": self.use_case, + } + if lean_mode: + return baggage + + baggage["botanu.environment"] = self.environment + if self.workflow: + baggage["botanu.workflow"] = self.workflow + if self.tenant_id: + baggage["botanu.tenant_id"] = self.tenant_id + if self.parent_run_id: + baggage["botanu.parent_run_id"] = self.parent_run_id + if self.root_run_id and self.root_run_id != self.run_id: + baggage["botanu.root_run_id"] = self.root_run_id + if self.attempt > 1: + baggage["botanu.attempt"] = str(self.attempt) + if self.retry_of_run_id: + baggage["botanu.retry_of_run_id"] = self.retry_of_run_id + if self.deadline is not None: + baggage["botanu.deadline"] = str(int(self.deadline * 1000)) + if self.cancelled: + baggage["botanu.cancelled"] = "true" + return baggage + + def to_span_attributes(self) -> Dict[str, Union[str, float, int, bool]]: + """Convert to dict for span attributes.""" + attrs: Dict[str, Union[str, float, int, bool]] = { + "botanu.run_id": self.run_id, + "botanu.use_case": self.use_case, + "botanu.environment": self.environment, + "botanu.run.start_time": self.start_time.isoformat(), + } + if self.workflow: + attrs["botanu.workflow"] = self.workflow + if self.workflow_version: + attrs["botanu.workflow.version"] = self.workflow_version + if self.tenant_id: + attrs["botanu.tenant_id"] = self.tenant_id + if self.parent_run_id: + attrs["botanu.parent_run_id"] = self.parent_run_id + attrs["botanu.root_run_id"] = self.root_run_id or self.run_id + attrs["botanu.attempt"] = self.attempt + if self.retry_of_run_id: + attrs["botanu.retry_of_run_id"] = self.retry_of_run_id + if self.deadline is not None: + attrs["botanu.run.deadline_ts"] = self.deadline + if self.cancelled: + attrs["botanu.run.cancelled"] = True + if self.cancelled_at: + attrs["botanu.run.cancelled_at"] = self.cancelled_at + if self.outcome: + attrs["botanu.outcome.status"] = self.outcome.status.value + if self.outcome.reason_code: + attrs["botanu.outcome.reason_code"] = 
self.outcome.reason_code + if self.outcome.error_class: + attrs["botanu.outcome.error_class"] = self.outcome.error_class + if self.outcome.value_type: + attrs["botanu.outcome.value_type"] = self.outcome.value_type + if self.outcome.value_amount is not None: + attrs["botanu.outcome.value_amount"] = self.outcome.value_amount + if self.outcome.confidence is not None: + attrs["botanu.outcome.confidence"] = self.outcome.confidence + if self.duration_ms is not None: + attrs["botanu.run.duration_ms"] = self.duration_ms + return attrs + + @classmethod + def from_baggage(cls, baggage: Dict[str, str]) -> Optional[RunContext]: + """Reconstruct RunContext from baggage dict.""" + run_id = baggage.get("botanu.run_id") + use_case = baggage.get("botanu.use_case") + if not run_id or not use_case: + return None + + attempt_str = baggage.get("botanu.attempt", "1") + try: + attempt = int(attempt_str) + except ValueError: + attempt = 1 + + deadline: Optional[float] = None + deadline_str = baggage.get("botanu.deadline") + if deadline_str: + try: + deadline = float(deadline_str) / 1000.0 + except ValueError: + pass + + cancelled = baggage.get("botanu.cancelled", "").lower() == "true" + + return cls( + run_id=run_id, + use_case=use_case, + environment=baggage.get("botanu.environment", "unknown"), + workflow=baggage.get("botanu.workflow"), + tenant_id=baggage.get("botanu.tenant_id"), + parent_run_id=baggage.get("botanu.parent_run_id"), + root_run_id=baggage.get("botanu.root_run_id") or run_id, + attempt=attempt, + retry_of_run_id=baggage.get("botanu.retry_of_run_id"), + deadline=deadline, + cancelled=cancelled, + ) diff --git a/src/botanu/processors/__init__.py b/src/botanu/processors/__init__.py new file mode 100644 index 0000000..680a413 --- /dev/null +++ b/src/botanu/processors/__init__.py @@ -0,0 +1,12 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Botanu span processors. + +Only :class:`RunContextEnricher` is needed in the SDK. 
+All other processing should happen in the OTel Collector. +""" + +from botanu.processors.enricher import RunContextEnricher + +__all__ = ["RunContextEnricher"] diff --git a/src/botanu/processors/enricher.py b/src/botanu/processors/enricher.py new file mode 100644 index 0000000..85b3f78 --- /dev/null +++ b/src/botanu/processors/enricher.py @@ -0,0 +1,81 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""RunContextEnricher — the only span processor needed in the SDK. + +Why this MUST be in SDK (not collector): +- Baggage is process-local (not sent over the wire). +- Only the SDK can read baggage and write it to span attributes. +- The collector only sees spans after they're exported. + +All heavy processing should happen in the OTel Collector: +- PII redaction → ``redactionprocessor`` +- Cardinality limits → ``attributesprocessor`` +- Vendor detection → ``transformprocessor`` +""" + +from __future__ import annotations + +import logging +from typing import ClassVar, List, Optional + +from opentelemetry import baggage, context +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.trace import Span + +logger = logging.getLogger(__name__) + + +class RunContextEnricher(SpanProcessor): + """Enriches ALL spans with run context from baggage. + + This ensures that every span (including auto-instrumented ones) + gets ``botanu.run_id``, ``botanu.use_case``, etc. attributes. + + Without this processor, only the root ``botanu.run`` span would + have these attributes. + + In ``lean_mode`` (default), only ``run_id`` and ``use_case`` are + propagated to minimise per-span overhead. 
+ """ + + BAGGAGE_KEYS_FULL: ClassVar[List[str]] = [ + "botanu.run_id", + "botanu.use_case", + "botanu.workflow", + "botanu.environment", + "botanu.tenant_id", + "botanu.parent_run_id", + ] + + BAGGAGE_KEYS_LEAN: ClassVar[List[str]] = [ + "botanu.run_id", + "botanu.use_case", + ] + + def __init__(self, lean_mode: bool = True) -> None: + self._lean_mode = lean_mode + self._baggage_keys = self.BAGGAGE_KEYS_LEAN if lean_mode else self.BAGGAGE_KEYS_FULL + + def on_start( + self, + span: Span, + parent_context: Optional[context.Context] = None, + ) -> None: + """Called when a span starts — enrich with run context from baggage.""" + ctx = parent_context or context.get_current() + + for key in self._baggage_keys: + value = baggage.get_baggage(key, ctx) + if value: + if not span.attributes or key not in span.attributes: + span.set_attribute(key, value) + + def on_end(self, span: ReadableSpan) -> None: + pass + + def shutdown(self) -> None: + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True diff --git a/src/botanu/py.typed b/src/botanu/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/botanu/resources/__init__.py b/src/botanu/resources/__init__.py new file mode 100644 index 0000000..474c051 --- /dev/null +++ b/src/botanu/resources/__init__.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Botanu resource detection.""" + +from botanu.resources.detector import detect_all_resources, get_resource_attributes + +__all__ = ["detect_all_resources", "get_resource_attributes"] diff --git a/src/botanu/resources/detector.py b/src/botanu/resources/detector.py new file mode 100644 index 0000000..c3799d2 --- /dev/null +++ b/src/botanu/resources/detector.py @@ -0,0 +1,351 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Resource Detector — auto-detect execution environment for cost attribution. 
+ +Detects attributes from: +- Kubernetes (``k8s.*``) +- Cloud providers (``cloud.*``, ``aws.*``, ``gcp.*``, ``azure.*``) +- Host / VM (``host.*``, ``os.*``) +- Container (``container.*``) +- Serverless / FaaS (``faas.*``) +- Process (``process.*``) +""" + +from __future__ import annotations + +import os +import platform +import socket +import sys +from functools import lru_cache +from typing import Any, Dict, Optional + +# ========================================================================= +# Environment Variable Mappings +# ========================================================================= + +K8S_ENV_MAPPINGS: Dict[str, Optional[str]] = { + "KUBERNETES_SERVICE_HOST": None, + "HOSTNAME": "k8s.pod.name", + "K8S_POD_NAME": "k8s.pod.name", + "K8S_POD_UID": "k8s.pod.uid", + "K8S_NAMESPACE": "k8s.namespace.name", + "K8S_NODE_NAME": "k8s.node.name", + "K8S_CLUSTER_NAME": "k8s.cluster.name", + "K8S_DEPLOYMENT_NAME": "k8s.deployment.name", + "K8S_STATEFULSET_NAME": "k8s.statefulset.name", + "K8S_CONTAINER_NAME": "k8s.container.name", +} + +AWS_ENV_MAPPINGS: Dict[str, Optional[str]] = { + "AWS_REGION": "cloud.region", + "AWS_DEFAULT_REGION": "cloud.region", + "AWS_ACCOUNT_ID": "cloud.account.id", + "ECS_CONTAINER_METADATA_URI": None, + "ECS_CLUSTER": "aws.ecs.cluster.name", + "ECS_TASK_ARN": "aws.ecs.task.arn", + "ECS_TASK_DEFINITION_FAMILY": "aws.ecs.task.family", + "AWS_LAMBDA_FUNCTION_NAME": "faas.name", + "AWS_LAMBDA_FUNCTION_VERSION": "faas.version", + "AWS_LAMBDA_LOG_GROUP_NAME": "aws.lambda.log_group", + "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "faas.max_memory", +} + +GCP_ENV_MAPPINGS: Dict[str, Optional[str]] = { + "GOOGLE_CLOUD_PROJECT": "cloud.account.id", + "GCLOUD_PROJECT": "cloud.account.id", + "GCP_PROJECT": "cloud.account.id", + "GOOGLE_CLOUD_REGION": "cloud.region", + "K_SERVICE": "faas.name", + "K_REVISION": "faas.version", + "K_CONFIGURATION": "gcp.cloud_run.configuration", + "FUNCTION_NAME": "faas.name", + "FUNCTION_TARGET": "faas.trigger", + 
"FUNCTION_SIGNATURE_TYPE": "gcp.function.signature_type", +} + +AZURE_ENV_MAPPINGS: Dict[str, Optional[str]] = { + "AZURE_SUBSCRIPTION_ID": "cloud.account.id", + "AZURE_RESOURCE_GROUP": "azure.resource_group", + "WEBSITE_SITE_NAME": "faas.name", + "FUNCTIONS_EXTENSION_VERSION": "azure.functions.version", + "WEBSITE_INSTANCE_ID": "faas.instance", + "REGION_NAME": "cloud.region", +} + + +# ========================================================================= +# Detection Functions +# ========================================================================= + + +def detect_kubernetes() -> Dict[str, Any]: + attrs: Dict[str, Any] = {} + if not os.environ.get("KUBERNETES_SERVICE_HOST"): + return attrs + + for env_var, attr_name in K8S_ENV_MAPPINGS.items(): + value = os.environ.get(env_var) + if attr_name and value: + attrs[attr_name] = value + + if "k8s.pod.name" not in attrs: + hostname = os.environ.get("HOSTNAME", socket.gethostname()) + if hostname: + attrs["k8s.pod.name"] = hostname + + namespace_file = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + if "k8s.namespace.name" not in attrs and os.path.exists(namespace_file): + try: + with open(namespace_file) as fh: + attrs["k8s.namespace.name"] = fh.read().strip() + except OSError: + pass + + return attrs + + +def detect_cloud_provider() -> Dict[str, Any]: + attrs: Dict[str, Any] = {} + + if _is_aws(): + attrs["cloud.provider"] = "aws" + for env_var, attr_name in AWS_ENV_MAPPINGS.items(): + value = os.environ.get(env_var) + if attr_name and value: + attrs[attr_name] = value + + if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): + attrs["faas.id"] = ( + f"arn:aws:lambda:{attrs.get('cloud.region', 'unknown')}:" + f"{attrs.get('cloud.account.id', 'unknown')}:" + f"function:{os.environ['AWS_LAMBDA_FUNCTION_NAME']}" + ) + + az = _get_aws_availability_zone() + if az: + attrs["cloud.availability_zone"] = az + if "cloud.region" not in attrs: + attrs["cloud.region"] = az[:-1] + + elif _is_gcp(): + 
attrs["cloud.provider"] = "gcp" + for env_var, attr_name in GCP_ENV_MAPPINGS.items(): + value = os.environ.get(env_var) + if attr_name and value: + attrs[attr_name] = value + if os.environ.get("K_SERVICE"): + attrs["faas.trigger"] = "http" + elif os.environ.get("FUNCTION_NAME"): + attrs["faas.trigger"] = os.environ.get("FUNCTION_TRIGGER_TYPE", "unknown") + + elif _is_azure(): + attrs["cloud.provider"] = "azure" + for env_var, attr_name in AZURE_ENV_MAPPINGS.items(): + value = os.environ.get(env_var) + if attr_name and value: + attrs[attr_name] = value + + return attrs + + +def _is_aws() -> bool: + indicators = [ + "AWS_REGION", + "AWS_DEFAULT_REGION", + "AWS_LAMBDA_FUNCTION_NAME", + "ECS_CONTAINER_METADATA_URI", + "AWS_EXECUTION_ENV", + ] + return any(os.environ.get(var) for var in indicators) + + +def _is_gcp() -> bool: + indicators = [ + "GOOGLE_CLOUD_PROJECT", + "GCLOUD_PROJECT", + "GCP_PROJECT", + "K_SERVICE", + "FUNCTION_NAME", + ] + return any(os.environ.get(var) for var in indicators) + + +def _is_azure() -> bool: + indicators = [ + "WEBSITE_SITE_NAME", + "AZURE_FUNCTIONS_ENVIRONMENT", + "AZURE_SUBSCRIPTION_ID", + ] + return any(os.environ.get(var) for var in indicators) + + +def _get_aws_availability_zone() -> Optional[str]: + if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): + return None + try: + import urllib.request + + req = urllib.request.Request( + "http://169.254.169.254/latest/meta-data/placement/availability-zone", + headers={"Accept": "text/plain"}, + ) + with urllib.request.urlopen(req, timeout=0.5) as resp: # noqa: S310 + return resp.read().decode("utf-8").strip() + except Exception: + return None + + +def detect_host() -> Dict[str, Any]: + attrs: Dict[str, Any] = {} + try: + hostname = socket.gethostname() + if hostname: + attrs["host.name"] = hostname + except Exception: + pass + + host_id = os.environ.get("HOST_ID") or os.environ.get("INSTANCE_ID") + if host_id: + attrs["host.id"] = host_id + elif "host.name" in attrs: + attrs["host.id"] = 
attrs["host.name"] + + attrs["os.type"] = sys.platform + attrs["host.arch"] = platform.machine() + return attrs + + +def detect_container() -> Dict[str, Any]: + attrs: Dict[str, Any] = {} + container_id = _get_container_id() + if container_id: + attrs["container.id"] = container_id + + if os.path.exists("/.dockerenv"): + attrs["container.runtime"] = "docker" + elif os.environ.get("KUBERNETES_SERVICE_HOST"): + attrs["container.runtime"] = "containerd" + return attrs + + +def _get_container_id() -> Optional[str]: + container_id = os.environ.get("CONTAINER_ID") or os.environ.get("HOSTNAME") + + cgroup_path = "/proc/self/cgroup" + if os.path.exists(cgroup_path): + try: + with open(cgroup_path) as fh: + for line in fh: + if "docker" in line or "kubepods" in line: + parts = line.strip().split("/") + if parts: + last = parts[-1] + if last.startswith("cri-containerd-"): + last = last[15:] + if len(last) >= 12: + return last[:64] + except OSError: + pass + + return container_id if container_id and len(container_id) >= 12 else None + + +def detect_process() -> Dict[str, Any]: + attrs: Dict[str, Any] = {} + attrs["process.pid"] = os.getpid() + attrs["process.runtime.name"] = "python" + attrs["process.runtime.version"] = sys.version.split()[0] + if sys.argv: + attrs["process.command"] = sys.argv[0][:200] + return attrs + + +def detect_serverless() -> Dict[str, Any]: + attrs: Dict[str, Any] = {} + + if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): + attrs["faas.name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] + version = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") + if version: + attrs["faas.version"] = version + memory = os.environ.get("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") + if memory: + attrs["faas.max_memory"] = int(memory) * 1024 * 1024 + + elif os.environ.get("K_SERVICE"): + attrs["faas.name"] = os.environ["K_SERVICE"] + revision = os.environ.get("K_REVISION") + if revision: + attrs["faas.version"] = revision + + elif os.environ.get("FUNCTION_NAME"): + attrs["faas.name"] = 
os.environ["FUNCTION_NAME"] + target = os.environ.get("FUNCTION_TARGET") + if target: + attrs["faas.trigger"] = target + + elif os.environ.get("WEBSITE_SITE_NAME"): + attrs["faas.name"] = os.environ["WEBSITE_SITE_NAME"] + instance = os.environ.get("WEBSITE_INSTANCE_ID") + if instance: + attrs["faas.instance"] = instance + + return attrs + + +# ========================================================================= +# Main Detection +# ========================================================================= + + +@lru_cache(maxsize=1) +def detect_all_resources() -> Dict[str, Any]: + """Detect all environment resource attributes. + + Results are cached (environment doesn't change during runtime). + """ + attrs: Dict[str, Any] = {} + attrs.update(detect_host()) + attrs.update(detect_process()) + attrs.update(detect_container()) + attrs.update(detect_cloud_provider()) + attrs.update(detect_kubernetes()) + attrs.update(detect_serverless()) + + if "service.instance.id" not in attrs: + container_id = attrs.get("container.id") + if container_id: + attrs["service.instance.id"] = container_id[:12] + elif pod_name := attrs.get("k8s.pod.name"): + attrs["service.instance.id"] = pod_name + elif host_id := attrs.get("host.id"): + attrs["service.instance.id"] = host_id + + return attrs + + +def get_resource_attributes( + include_host: bool = True, + include_process: bool = True, + include_container: bool = True, + include_cloud: bool = True, + include_k8s: bool = True, + include_faas: bool = True, +) -> Dict[str, Any]: + """Get resource attributes with selective detection.""" + attrs: Dict[str, Any] = {} + if include_host: + attrs.update(detect_host()) + if include_process: + attrs.update(detect_process()) + if include_container: + attrs.update(detect_container()) + if include_cloud: + attrs.update(detect_cloud_provider()) + if include_k8s: + attrs.update(detect_kubernetes()) + if include_faas: + attrs.update(detect_serverless()) + return attrs diff --git 
# SPDX-FileCopyrightText: 2026 The Botanu Authors
# SPDX-License-Identifier: Apache-2.0

"""Botanu SDK core components."""

from __future__ import annotations

# Bootstrap lifecycle: one-switch enable/disable plus state inspection.
from botanu.sdk.bootstrap import disable, enable, get_config, is_enabled
# Configuration dataclass (YAML / env / code precedence).
from botanu.sdk.config import BotanuConfig
# Context & baggage accessors for run correlation.
from botanu.sdk.context import (
    get_baggage,
    get_current_span,
    get_run_id,
    get_use_case,
    get_workflow,
    set_baggage,
)
# Run-span decorators; `use_case` is an alias of `botanu_use_case`.
from botanu.sdk.decorators import botanu_outcome, botanu_use_case, use_case
# Span helpers for outcome and business-context attributes.
from botanu.sdk.span_helpers import emit_outcome, set_business_context

# Public API surface, kept sorted (uppercase names first, then lowercase).
__all__ = [
    "BotanuConfig",
    "botanu_outcome",
    "botanu_use_case",
    "disable",
    "emit_outcome",
    "enable",
    "get_baggage",
    "get_config",
    "get_current_span",
    "get_run_id",
    "get_use_case",
    "get_workflow",
    "is_enabled",
    "set_baggage",
    "set_business_context",
    "use_case",
]
# SPDX-FileCopyrightText: 2026 The Botanu Authors
# SPDX-License-Identifier: Apache-2.0

"""Botanu Bootstrap — one-switch enablement for OTEL auto-instrumentation.

This is the "Botanu OTel Distribution" — a curated bundle that:

1. Configures OTEL SDK with OTLP exporter
2. Enables OTEL auto-instrumentation for popular libraries
3. Adds :class:`~botanu.processors.enricher.RunContextEnricher`
   (propagates ``run_id`` to all spans)
4. Sets up W3C TraceContext + Baggage propagators

Usage::

    from botanu import enable
    enable(service_name="my-app", otlp_endpoint="http://collector:4318")
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, List, Optional

if TYPE_CHECKING:
    from botanu.sdk.config import BotanuConfig

logger = logging.getLogger(__name__)

# Module-level lifecycle state: set by enable(), cleared by disable().
_initialized = False
_current_config: Optional[BotanuConfig] = None


def enable(
    service_name: Optional[str] = None,
    otlp_endpoint: Optional[str] = None,
    environment: Optional[str] = None,
    auto_instrumentation: bool = True,
    propagators: Optional[List[str]] = None,
    log_level: str = "INFO",
    config: Optional[BotanuConfig] = None,
    config_file: Optional[str] = None,
) -> bool:
    """Enable Botanu SDK with OTEL auto-instrumentation.

    This is the ONE function customers need to call to get full observability.

    Args:
        service_name: Service name.
        otlp_endpoint: OTLP collector endpoint.
        environment: Deployment environment.
        auto_instrumentation: Enable OTEL auto-instrumentation (default: ``True``).
        propagators: List of propagators (default: ``["tracecontext", "baggage"]``).
            NOTE(review): this parameter is currently unused — the propagator set
            below is hard-coded to TraceContext + Baggage. Confirm intent.
        log_level: Logging level (default: ``"INFO"``).
        config: Full :class:`BotanuConfig` (overrides individual params).
        config_file: Path to YAML config file.

    Returns:
        ``True`` if successfully initialized, ``False`` if already initialized.
    """
    global _initialized, _current_config

    # Idempotency guard: a second enable() is a no-op.
    if _initialized:
        logger.warning("Botanu SDK already initialized")
        return False

    # NOTE(review): getattr raises AttributeError for an invalid log_level
    # (e.g. "warn ") before any try/except — confirm whether this should be
    # validated/defaulted instead of crashing the caller.
    logging.basicConfig(level=getattr(logging, log_level.upper()))

    # Imported lazily so importing botanu stays cheap when the SDK is unused.
    from botanu.sdk.config import BotanuConfig as ConfigClass

    # Config precedence: explicit object > explicit file > file-or-env search.
    if config is not None:
        cfg = config
    elif config_file is not None:
        cfg = ConfigClass.from_yaml(config_file)
    else:
        cfg = ConfigClass.from_file_or_env()

    # Keyword args override whatever the config source supplied.
    if service_name is not None:
        cfg.service_name = service_name
    if otlp_endpoint is not None:
        cfg.otlp_endpoint = otlp_endpoint
    if environment is not None:
        cfg.deployment_environment = environment

    _current_config = cfg

    # Normalize the endpoint to the OTLP/HTTP traces path exactly once.
    traces_endpoint = cfg.otlp_endpoint
    if traces_endpoint and not traces_endpoint.endswith("/v1/traces"):
        traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces"

    logger.info(
        "Initializing Botanu SDK: service=%s, env=%s, endpoint=%s",
        cfg.service_name,
        cfg.deployment_environment,
        traces_endpoint,
    )

    try:
        # OTel SDK pieces are imported here (not at module top) so the core
        # package works with opentelemetry-api only; failures fall through to
        # the except below and enable() returns False instead of raising.
        from opentelemetry import trace
        from opentelemetry.baggage.propagation import W3CBaggagePropagator
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
        from opentelemetry.propagate import set_global_textmap
        from opentelemetry.propagators.composite import CompositePropagator
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

        from botanu._version import __version__
        from botanu.processors import RunContextEnricher
        from botanu.resources.detector import detect_all_resources

        # Build resource attributes
        resource_attrs = {
            "service.name": cfg.service_name,
            "deployment.environment": cfg.deployment_environment,
            "telemetry.sdk.name": "botanu",
            "telemetry.sdk.version": __version__,
        }
        if cfg.service_version:
            resource_attrs["service.version"] = cfg.service_version
        if cfg.service_namespace:
            resource_attrs["service.namespace"] = cfg.service_namespace

        # Auto-detect resources (K8s, cloud, host, container, FaaS).
        # Explicit config always wins over detected values.
        if cfg.auto_detect_resources:
            detected = detect_all_resources()
            for key, value in detected.items():
                if key not in resource_attrs:
                    resource_attrs[key] = value
            if detected:
                logger.debug("Auto-detected resources: %s", list(detected.keys()))

        resource = Resource.create(resource_attrs)
        provider = TracerProvider(resource=resource)

        # RunContextEnricher — the ONLY processor in SDK.
        # Reads run_id from baggage, stamps on all spans.
        lean_mode = cfg.propagation_mode == "lean"
        provider.add_span_processor(RunContextEnricher(lean_mode=lean_mode))

        # OTLP exporter
        exporter = OTLPSpanExporter(
            endpoint=traces_endpoint,
            headers=cfg.otlp_headers or {},
        )
        provider.add_span_processor(
            BatchSpanProcessor(
                exporter,
                max_export_batch_size=cfg.max_export_batch_size,
                max_queue_size=cfg.max_queue_size,
                schedule_delay_millis=cfg.schedule_delay_millis,
            )
        )

        trace.set_tracer_provider(provider)

        # Propagators (W3C TraceContext + Baggage)
        set_global_textmap(
            CompositePropagator(
                [
                    TraceContextTextMapPropagator(),
                    W3CBaggagePropagator(),
                ]
            )
        )

        logger.info("Botanu SDK tracing initialized")

        if auto_instrumentation:
            _enable_auto_instrumentation()

        _initialized = True
        return True

    except Exception as exc:
        # Best-effort by design: observability failures must never break the
        # host application.
        logger.error("Failed to initialize Botanu SDK: %s", exc, exc_info=True)
        return False
+ """ + enabled: List[str] = [] + failed: List[tuple[str, str]] = [] + + # HTTP clients + _try_instrument(enabled, failed, "httpx", "opentelemetry.instrumentation.httpx", "HTTPXClientInstrumentation") + _try_instrument(enabled, failed, "requests", "opentelemetry.instrumentation.requests", "RequestsInstrumentor") + _try_instrument(enabled, failed, "urllib3", "opentelemetry.instrumentation.urllib3", "URLLib3Instrumentor") + _try_instrument( + enabled, failed, "aiohttp", "opentelemetry.instrumentation.aiohttp_client", "AioHttpClientInstrumentor" + ) + + # Web frameworks + _try_instrument(enabled, failed, "fastapi", "opentelemetry.instrumentation.fastapi", "FastAPIInstrumentor") + _try_instrument(enabled, failed, "flask", "opentelemetry.instrumentation.flask", "FlaskInstrumentor") + _try_instrument(enabled, failed, "django", "opentelemetry.instrumentation.django", "DjangoInstrumentor") + _try_instrument(enabled, failed, "starlette", "opentelemetry.instrumentation.starlette", "StarletteInstrumentor") + + # Databases + _try_instrument(enabled, failed, "sqlalchemy", "opentelemetry.instrumentation.sqlalchemy", "SQLAlchemyInstrumentor") + _try_instrument(enabled, failed, "psycopg2", "opentelemetry.instrumentation.psycopg2", "Psycopg2Instrumentor") + _try_instrument(enabled, failed, "asyncpg", "opentelemetry.instrumentation.asyncpg", "AsyncPGInstrumentor") + _try_instrument(enabled, failed, "pymongo", "opentelemetry.instrumentation.pymongo", "PymongoInstrumentor") + _try_instrument(enabled, failed, "redis", "opentelemetry.instrumentation.redis", "RedisInstrumentor") + + # Messaging + _try_instrument(enabled, failed, "celery", "opentelemetry.instrumentation.celery", "CeleryInstrumentor") + _try_instrument(enabled, failed, "kafka", "opentelemetry.instrumentation.kafka", "KafkaInstrumentor") + + # gRPC + _try_instrument_grpc(enabled, failed) + + # GenAI / AI + _try_instrument(enabled, failed, "openai", "opentelemetry.instrumentation.openai_v2", "OpenAIInstrumentor") + 
_try_instrument(enabled, failed, "anthropic", "opentelemetry.instrumentation.anthropic", "AnthropicInstrumentor") + _try_instrument(enabled, failed, "vertexai", "opentelemetry.instrumentation.vertexai", "VertexAIInstrumentor") + _try_instrument( + enabled, failed, "google_genai", "opentelemetry.instrumentation.google_genai", "GoogleGenAiInstrumentor" + ) + _try_instrument(enabled, failed, "langchain", "opentelemetry.instrumentation.langchain", "LangchainInstrumentor") + + # Runtime + _try_instrument(enabled, failed, "logging", "opentelemetry.instrumentation.logging", "LoggingInstrumentor") + + if enabled: + logger.info("Auto-instrumentation enabled: %s", ", ".join(enabled)) + if failed: + for name, error in failed: + logger.warning("Auto-instrumentation failed for %s: %s", name, error) + + +def _try_instrument( + enabled: List[str], + failed: List[tuple[str, str]], + name: str, + module_path: str, + class_name: str, +) -> None: + """Try to import and instrument a single library.""" + try: + import importlib + + mod = importlib.import_module(module_path) + instrumentor_cls = getattr(mod, class_name) + instrumentor_cls().instrument() + enabled.append(name) + except ImportError: + pass + except Exception as exc: + failed.append((name, str(exc))) + + +def _try_instrument_grpc( + enabled: List[str], + failed: List[tuple[str, str]], +) -> None: + """Try to instrument gRPC (client + server).""" + try: + from opentelemetry.instrumentation.grpc import ( + GrpcInstrumentorClient, + GrpcInstrumentorServer, + ) + + GrpcInstrumentorClient().instrument() + GrpcInstrumentorServer().instrument() + enabled.append("grpc") + except ImportError: + pass + except Exception as exc: + failed.append(("grpc", str(exc))) + + +def is_enabled() -> bool: + """Check if Botanu SDK is initialized.""" + return _initialized + + +def get_config() -> Optional[BotanuConfig]: + """Get the current Botanu configuration.""" + return _current_config + + +def disable() -> None: + """Disable Botanu SDK and 
shutdown OTEL. + + Call on application shutdown for clean exit. + """ + global _initialized + + if not _initialized: + return + + try: + from opentelemetry import trace + + provider = trace.get_tracer_provider() + if hasattr(provider, "shutdown"): + provider.shutdown() + + _initialized = False + logger.info("Botanu SDK shutdown complete") + + except Exception as exc: + logger.error("Error during Botanu SDK shutdown: %s", exc) diff --git a/src/botanu/sdk/config.py b/src/botanu/sdk/config.py new file mode 100644 index 0000000..69e0e7f --- /dev/null +++ b/src/botanu/sdk/config.py @@ -0,0 +1,306 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Configuration for Botanu SDK. + +The SDK is intentionally minimal on the hot path. Heavy processing happens in +the OpenTelemetry Collector, not in the application: + +- **SDK responsibility**: Generate run_id, propagate minimal context (run_id, use_case) +- **Collector responsibility**: PII redaction, vendor detection, attribute enrichment + +Configuration precedence (highest to lowest): +1. Code arguments (explicit values passed to BotanuConfig) +2. Environment variables (BOTANU_*, OTEL_*) +3. YAML config file (botanu.yaml or specified path) +4. Built-in defaults +""" + +from __future__ import annotations + +import logging +import os +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class BotanuConfig: + """Configuration for Botanu SDK and OpenTelemetry. + + The SDK is a thin wrapper on OpenTelemetry. PII redaction, cardinality + limits, and vendor enrichment are handled by the OTel Collector — not here. + + Example:: + + >>> config = BotanuConfig( + ... service_name="my-service", + ... otlp_endpoint="http://collector:4318/v1/traces", + ... 
) + + >>> # Or load from YAML + >>> config = BotanuConfig.from_yaml("config/botanu.yaml") + """ + + # Service identification + service_name: Optional[str] = None + service_version: Optional[str] = None + service_namespace: Optional[str] = None + deployment_environment: Optional[str] = None + + # Resource detection + auto_detect_resources: bool = True + + # OTLP exporter configuration + otlp_endpoint: Optional[str] = None + otlp_headers: Optional[Dict[str, str]] = None + + # Span export configuration + max_export_batch_size: int = 512 + max_queue_size: int = 2048 + schedule_delay_millis: int = 5000 + + # Sampling (1.0 = 100% — never sample for cost attribution) + trace_sample_rate: float = 1.0 + + # Propagation mode: "lean" (run_id + use_case only) or "full" (all context) + propagation_mode: str = "lean" + + # Auto-instrumentation packages to enable + auto_instrument_packages: List[str] = field( + default_factory=lambda: [ + # HTTP clients + "requests", + "httpx", + "urllib3", + "aiohttp_client", + # Web frameworks + "fastapi", + "flask", + "django", + "starlette", + # Databases + "sqlalchemy", + "psycopg2", + "asyncpg", + "pymongo", + "redis", + # Messaging + "celery", + "kafka_python", + # gRPC + "grpc", + # GenAI / AI + "openai_v2", + "anthropic", + "vertexai", + "google_genai", + "langchain", + # Runtime + "logging", + ] + ) + + # Config file path (for tracking where config was loaded from) + _config_file: Optional[str] = field(default=None, repr=False) + + def __post_init__(self) -> None: + """Apply environment variable defaults.""" + if self.service_name is None: + self.service_name = os.getenv("OTEL_SERVICE_NAME", "unknown_service") + + if self.service_version is None: + self.service_version = os.getenv("OTEL_SERVICE_VERSION") + + if self.service_namespace is None: + self.service_namespace = os.getenv("OTEL_SERVICE_NAMESPACE") + + env_auto_detect = os.getenv("BOTANU_AUTO_DETECT_RESOURCES") + if env_auto_detect is not None: + self.auto_detect_resources = 
env_auto_detect.lower() in ("true", "1", "yes") + + if self.deployment_environment is None: + self.deployment_environment = os.getenv( + "OTEL_DEPLOYMENT_ENVIRONMENT", + os.getenv("BOTANU_ENVIRONMENT", "production"), + ) + + if self.otlp_endpoint is None: + env_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") + if env_endpoint: + self.otlp_endpoint = env_endpoint + else: + base = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + self.otlp_endpoint = f"{base}/v1/traces" + + env_propagation_mode = os.getenv("BOTANU_PROPAGATION_MODE") + if env_propagation_mode and env_propagation_mode in ("lean", "full"): + self.propagation_mode = env_propagation_mode + + env_sample_rate = os.getenv("BOTANU_TRACE_SAMPLE_RATE") + if env_sample_rate: + self.trace_sample_rate = float(env_sample_rate) + + # ------------------------------------------------------------------ + # YAML loading + # ------------------------------------------------------------------ + + @classmethod + def from_yaml(cls, path: Optional[str] = None) -> BotanuConfig: + """Load configuration from a YAML file. + + Supports environment variable interpolation using ``${VAR_NAME}`` syntax. + + Args: + path: Path to YAML config file. + + Raises: + FileNotFoundError: If config file doesn't exist. + ValueError: If YAML is malformed. + """ + if path is None: + raise FileNotFoundError("No config file path provided") + + resolved = Path(path) + if not resolved.exists(): + raise FileNotFoundError(f"Config file not found: {resolved}") + + try: + import yaml # type: ignore[import-untyped] + except ImportError as err: + raise ImportError("PyYAML required for YAML config. 
Install with: pip install pyyaml") from err + + with open(resolved) as fh: + raw_content = fh.read() + + content = _interpolate_env_vars(raw_content) + + try: + data = yaml.safe_load(content) + except yaml.YAMLError as exc: + raise ValueError(f"Invalid YAML in {resolved}: {exc}") from exc + + if data is None: + data = {} + + return cls._from_dict(data, config_file=str(resolved)) + + @classmethod + def from_file_or_env(cls, path: Optional[str] = None) -> BotanuConfig: + """Load config from file if exists, otherwise use environment variables. + + Search order: + 1. Explicit *path* argument + 2. ``BOTANU_CONFIG_FILE`` env var + 3. ``./botanu.yaml`` + 4. ``./config/botanu.yaml`` + 5. Falls back to env-only config + """ + search_paths: List[Path] = [] + + if path: + search_paths.append(Path(path)) + + env_path = os.getenv("BOTANU_CONFIG_FILE") + if env_path: + search_paths.append(Path(env_path)) + + search_paths.extend( + [ + Path("botanu.yaml"), + Path("botanu.yml"), + Path("config/botanu.yaml"), + Path("config/botanu.yml"), + ] + ) + + for candidate in search_paths: + if candidate.exists(): + logger.info("Loading config from: %s", candidate) + return cls.from_yaml(str(candidate)) + + logger.debug("No config file found, using environment variables only") + return cls() + + @classmethod + def _from_dict( + cls, + data: Dict[str, Any], + config_file: Optional[str] = None, + ) -> BotanuConfig: + """Create config from dictionary (parsed YAML).""" + service = data.get("service", {}) + otlp = data.get("otlp", {}) + export = data.get("export", {}) + sampling = data.get("sampling", {}) + propagation = data.get("propagation", {}) + resource = data.get("resource", {}) + auto_packages = data.get("auto_instrument_packages") + + return cls( + service_name=service.get("name"), + service_version=service.get("version"), + service_namespace=service.get("namespace"), + deployment_environment=service.get("environment"), + auto_detect_resources=resource.get("auto_detect", True), + 
otlp_endpoint=otlp.get("endpoint"), + otlp_headers=otlp.get("headers"), + max_export_batch_size=export.get("batch_size", 512), + max_queue_size=export.get("queue_size", 2048), + schedule_delay_millis=export.get("delay_ms", 5000), + trace_sample_rate=sampling.get("rate", 1.0), + propagation_mode=propagation.get("mode", "lean"), + auto_instrument_packages=(auto_packages if auto_packages else BotanuConfig().auto_instrument_packages), + _config_file=config_file, + ) + + def to_dict(self) -> Dict[str, Any]: + """Export configuration as dictionary.""" + return { + "service": { + "name": self.service_name, + "version": self.service_version, + "namespace": self.service_namespace, + "environment": self.deployment_environment, + }, + "resource": { + "auto_detect": self.auto_detect_resources, + }, + "otlp": { + "endpoint": self.otlp_endpoint, + "headers": self.otlp_headers, + }, + "export": { + "batch_size": self.max_export_batch_size, + "queue_size": self.max_queue_size, + "delay_ms": self.schedule_delay_millis, + }, + "sampling": { + "rate": self.trace_sample_rate, + }, + "propagation": { + "mode": self.propagation_mode, + }, + "auto_instrument_packages": self.auto_instrument_packages, + } + + +def _interpolate_env_vars(content: str) -> str: + """Interpolate ``${VAR_NAME}`` and ``${VAR_NAME:-default}`` in *content*.""" + pattern = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}") + + def _replace(match: re.Match) -> str: # type: ignore[type-arg] + var_name = match.group(1) + default = match.group(2) + value = os.getenv(var_name) + if value is not None: + return value + if default is not None: + return default + return match.group(0) + + return pattern.sub(_replace, content) diff --git a/src/botanu/sdk/context.py b/src/botanu/sdk/context.py new file mode 100644 index 0000000..1beaeaf --- /dev/null +++ b/src/botanu/sdk/context.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Context and baggage 
helpers for Botanu SDK. + +Uses OpenTelemetry Context and Baggage for propagation. +""" + +from __future__ import annotations + +from typing import Optional, cast + +from opentelemetry import baggage, trace +from opentelemetry.context import attach, get_current + + +def set_baggage(key: str, value: str) -> object: + """Set a baggage value and attach the new context. + + Baggage is automatically propagated across service boundaries via + W3C Baggage header. + + Args: + key: Baggage key (e.g., ``"botanu.run_id"``). + value: Baggage value. + + Returns: + Token for detaching the context later. + """ + ctx = baggage.set_baggage(key, value, context=get_current()) + return attach(ctx) + + +def get_baggage(key: str) -> Optional[str]: + """Get a baggage value from the current context. + + Args: + key: Baggage key (e.g., ``"botanu.run_id"``). + + Returns: + Baggage value or ``None`` if not set. + """ + value = baggage.get_baggage(key, context=get_current()) + return cast(Optional[str], value) + + +def get_current_span() -> trace.Span: + """Get the current active span. + + Returns: + Current span (may be non-recording if no span is active). + """ + return trace.get_current_span() + + +def get_run_id() -> Optional[str]: + """Get the current ``run_id`` from baggage.""" + return get_baggage("botanu.run_id") + + +def get_use_case() -> Optional[str]: + """Get the current ``use_case`` from baggage.""" + return get_baggage("botanu.use_case") + + +def get_workflow() -> Optional[str]: + """Get the current ``workflow`` from baggage.""" + return get_baggage("botanu.workflow") diff --git a/src/botanu/sdk/decorators.py b/src/botanu/sdk/decorators.py new file mode 100644 index 0000000..6e71cc4 --- /dev/null +++ b/src/botanu/sdk/decorators.py @@ -0,0 +1,289 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Decorators for automatic run span creation and context propagation. 
# SPDX-FileCopyrightText: 2026 The Botanu Authors
# SPDX-License-Identifier: Apache-2.0

"""Decorators for automatic run span creation and context propagation.

The ``@botanu_use_case`` decorator is the primary integration point.
It creates a "run span" that:
- Generates a UUIDv7 run_id
- Emits ``run.started`` and ``run.completed`` events
- Propagates run context via W3C Baggage
- Records outcome at completion
"""

from __future__ import annotations

import functools
import hashlib
import inspect
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional, TypeVar, Union

from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode

from botanu.models.run_context import RunContext, RunStatus
from botanu.sdk.context import get_baggage, set_baggage
from botanu.tracking.metrics import record_run_completed

T = TypeVar("T")

tracer = trace.get_tracer("botanu_sdk")


def _compute_workflow_version(func: Callable[..., Any]) -> str:
    # Version the workflow by hashing the function's source, so code changes
    # produce a new version string without any manual bookkeeping.
    # Falls back to "v:unknown" when source is unavailable (REPL, builtins).
    try:
        source = inspect.getsource(func)
        code_hash = hashlib.sha256(source.encode()).hexdigest()
        return f"v:{code_hash[:12]}"
    except (OSError, TypeError):
        return "v:unknown"


def _get_parent_run_id() -> Optional[str]:
    # A run started inside another run inherits the caller's run_id from
    # baggage as its parent.
    return get_baggage("botanu.run_id")


def botanu_use_case(
    name: str,
    workflow: Optional[str] = None,
    *,
    environment: Optional[str] = None,
    tenant_id: Optional[str] = None,
    auto_outcome_on_success: bool = True,
    span_kind: SpanKind = SpanKind.SERVER,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Decorator to create a run span with automatic context propagation.

    This is the primary integration point. It:

    1. Creates a UUIDv7 ``run_id`` (sortable, globally unique)
    2. Creates a ``botanu.run`` span as the root of the run
    3. Emits ``run.started`` event
    4. Propagates run context via W3C Baggage
    5. On completion: emits ``run.completed`` event with outcome

    Args:
        name: Use case name (low cardinality, e.g. ``"Customer Support"``).
        workflow: Workflow name (defaults to function qualified name).
        environment: Deployment environment.
        tenant_id: Tenant identifier for multi-tenant apps.
        auto_outcome_on_success: Emit ``"success"`` if no exception.
        span_kind: OpenTelemetry span kind (default: ``SERVER``).

    Example::

        @botanu_use_case("Customer Support")
        async def handle_ticket(ticket_id: str):
            result = await process_ticket(ticket_id)
            emit_outcome("success", value_type="tickets_resolved", value_amount=1)
            return result
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Resolved once at decoration time, shared by both wrappers.
        workflow_name = workflow or func.__qualname__
        workflow_version = _compute_workflow_version(func)
        is_async = inspect.iscoroutinefunction(func)

        # NOTE: async_wrapper and sync_wrapper are intentionally parallel —
        # keep any change mirrored in both.
        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> T:
            parent_run_id = _get_parent_run_id()
            run_ctx = RunContext.create(
                use_case=name,
                workflow=workflow_name,
                workflow_version=workflow_version,
                environment=environment,
                tenant_id=tenant_id,
                parent_run_id=parent_run_id,
            )

            with tracer.start_as_current_span(
                name=f"botanu.run/{name}",
                kind=span_kind,
            ) as span:
                # Stamp the run context onto the root span of the run.
                for key, value in run_ctx.to_span_attributes().items():
                    span.set_attribute(key, value)

                span.add_event(
                    "botanu.run.started",
                    attributes={
                        "run_id": run_ctx.run_id,
                        "use_case": run_ctx.use_case,
                        "workflow": workflow_name,
                    },
                )

                # Propagate run context downstream via W3C Baggage.
                for key, value in run_ctx.to_baggage_dict().items():
                    set_baggage(key, value)

                try:
                    result = await func(*args, **kwargs)

                    # Check whether the function body already emitted an
                    # outcome; getattr guards non-SDK spans without an
                    # `attributes` property.
                    # NOTE(review): span_helpers.emit_outcome sets the key
                    # "botanu.outcome", not "botanu.outcome.status" — manual
                    # outcomes are therefore never detected here; confirm
                    # which key is canonical.
                    span_attrs = getattr(span, "attributes", None)
                    existing_outcome = span_attrs.get("botanu.outcome.status") if span_attrs else None

                    if existing_outcome is None and auto_outcome_on_success:
                        run_ctx.complete(RunStatus.SUCCESS)

                    # NOTE(review): the completion event below reports SUCCESS
                    # even when auto_outcome_on_success is False or an outcome
                    # already exists (run_ctx.complete() skipped) — confirm
                    # this is intended.
                    span.set_status(Status(StatusCode.OK))
                    _emit_run_completed(span, run_ctx, RunStatus.SUCCESS)
                    return result

                except Exception as exc:
                    span.set_status(Status(StatusCode.ERROR, str(exc)))
                    span.record_exception(exc)
                    run_ctx.complete(RunStatus.FAILURE, error_class=exc.__class__.__name__)
                    _emit_run_completed(
                        span,
                        run_ctx,
                        RunStatus.FAILURE,
                        error_class=exc.__class__.__name__,
                    )
                    raise

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> T:
            # Mirror of async_wrapper for plain functions; see comments there.
            parent_run_id = _get_parent_run_id()
            run_ctx = RunContext.create(
                use_case=name,
                workflow=workflow_name,
                workflow_version=workflow_version,
                environment=environment,
                tenant_id=tenant_id,
                parent_run_id=parent_run_id,
            )

            with tracer.start_as_current_span(
                name=f"botanu.run/{name}",
                kind=span_kind,
            ) as span:
                for key, value in run_ctx.to_span_attributes().items():
                    span.set_attribute(key, value)

                span.add_event(
                    "botanu.run.started",
                    attributes={
                        "run_id": run_ctx.run_id,
                        "use_case": run_ctx.use_case,
                        "workflow": workflow_name,
                    },
                )

                for key, value in run_ctx.to_baggage_dict().items():
                    set_baggage(key, value)

                try:
                    result = func(*args, **kwargs)

                    span_attrs = getattr(span, "attributes", None)
                    existing_outcome = span_attrs.get("botanu.outcome.status") if span_attrs else None

                    if existing_outcome is None and auto_outcome_on_success:
                        run_ctx.complete(RunStatus.SUCCESS)

                    span.set_status(Status(StatusCode.OK))
                    _emit_run_completed(span, run_ctx, RunStatus.SUCCESS)
                    return result

                except Exception as exc:
                    span.set_status(Status(StatusCode.ERROR, str(exc)))
                    span.record_exception(exc)
                    run_ctx.complete(RunStatus.FAILURE, error_class=exc.__class__.__name__)
                    _emit_run_completed(
                        span,
                        run_ctx,
                        RunStatus.FAILURE,
                        error_class=exc.__class__.__name__,
                    )
                    raise

        if is_async:
            return async_wrapper  # type: ignore[return-value]
        return sync_wrapper  # type: ignore[return-value]

    return decorator


def _emit_run_completed(
    span: trace.Span,
    run_ctx: RunContext,
    status: RunStatus,
    error_class: Optional[str] = None,
) -> None:
    """Record run completion: span event, outcome attributes, and metrics.

    Duration is recomputed from wall clock here (not read from run_ctx),
    measured from ``run_ctx.start_time`` to now.
    """
    duration_ms = (datetime.now(timezone.utc) - run_ctx.start_time).total_seconds() * 1000

    event_attrs: Dict[str, Union[str, float]] = {
        "run_id": run_ctx.run_id,
        "use_case": run_ctx.use_case,
        "status": status.value,
        "duration_ms": duration_ms,
    }
    if error_class:
        event_attrs["error_class"] = error_class
    # Outcome value fields are only attached when run_ctx carries an outcome.
    if run_ctx.outcome and run_ctx.outcome.value_type:
        event_attrs["value_type"] = run_ctx.outcome.value_type
    if run_ctx.outcome and run_ctx.outcome.value_amount is not None:
        event_attrs["value_amount"] = run_ctx.outcome.value_amount

    span.add_event("botanu.run.completed", attributes=event_attrs)

    span.set_attribute("botanu.outcome.status", status.value)
    span.set_attribute("botanu.run.duration_ms", duration_ms)

    record_run_completed(
        use_case=run_ctx.use_case,
        status=status.value,
        environment=run_ctx.environment,
        duration_ms=duration_ms,
        workflow=run_ctx.workflow,
    )


# Alias
use_case = botanu_use_case


def botanu_outcome(
    success: Optional[str] = None,
    partial: Optional[str] = None,
    failed: Optional[str] = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Decorator to automatically emit outcomes based on function result.

    This is a convenience decorator for sub-functions within a use case.
    It does NOT create a new run — use ``@botanu_use_case`` for that.

    NOTE(review): the ``success`` / ``partial`` / ``failed`` parameters are
    accepted but never used by the implementation below — confirm whether
    they were meant to customize the emitted status/value strings.
    """
    # Local import avoids a circular import between decorators and helpers.
    from botanu.sdk.span_helpers import emit_outcome

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        is_async = inspect.iscoroutinefunction(func)

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                result = await func(*args, **kwargs)
                span = trace.get_current_span()
                # Only auto-emit "success" when the body didn't emit one.
                if not span.attributes or "botanu.outcome.status" not in span.attributes:
                    emit_outcome("success")
                return result
            except Exception as exc:
                emit_outcome("failed", reason=exc.__class__.__name__)
                raise

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> T:
            # Mirror of async_wrapper for plain functions.
            try:
                result = func(*args, **kwargs)
                span = trace.get_current_span()
                if not span.attributes or "botanu.outcome.status" not in span.attributes:
                    emit_outcome("success")
                return result
            except Exception as exc:
                emit_outcome("failed", reason=exc.__class__.__name__)
                raise

        if is_async:
            return async_wrapper  # type: ignore[return-value]
        return sync_wrapper  # type: ignore[return-value]

    return decorator
+ """ + from botanu.sdk.span_helpers import emit_outcome + + def decorator(func: Callable[..., T]) -> Callable[..., T]: + is_async = inspect.iscoroutinefunction(func) + + @functools.wraps(func) + async def async_wrapper(*args: Any, **kwargs: Any) -> T: + try: + result = await func(*args, **kwargs) + span = trace.get_current_span() + if not span.attributes or "botanu.outcome.status" not in span.attributes: + emit_outcome("success") + return result + except Exception as exc: + emit_outcome("failed", reason=exc.__class__.__name__) + raise + + @functools.wraps(func) + def sync_wrapper(*args: Any, **kwargs: Any) -> T: + try: + result = func(*args, **kwargs) + span = trace.get_current_span() + if not span.attributes or "botanu.outcome.status" not in span.attributes: + emit_outcome("success") + return result + except Exception as exc: + emit_outcome("failed", reason=exc.__class__.__name__) + raise + + if is_async: + return async_wrapper # type: ignore[return-value] + return sync_wrapper # type: ignore[return-value] + + return decorator diff --git a/src/botanu/sdk/middleware.py b/src/botanu/sdk/middleware.py new file mode 100644 index 0000000..78bc987 --- /dev/null +++ b/src/botanu/sdk/middleware.py @@ -0,0 +1,99 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""FastAPI / Starlette middleware for span enrichment. + +This middleware works alongside OpenTelemetry's FastAPIInstrumentor to enrich +spans with Botanu-specific context. +""" + +from __future__ import annotations + +import uuid +from typing import Optional + +from opentelemetry import baggage, trace +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response + +from botanu.sdk.context import set_baggage + + +class BotanuMiddleware(BaseHTTPMiddleware): + """FastAPI middleware to enrich spans with Botanu context. 
+ + This middleware should be used **after** OpenTelemetry's + ``FastAPIInstrumentor``. It extracts Botanu context from incoming + requests and enriches the current span with Botanu attributes. + + Example:: + + from fastapi import FastAPI + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + from botanu.sdk.middleware import BotanuMiddleware + + app = FastAPI() + FastAPIInstrumentor.instrument_app(app) + app.add_middleware( + BotanuMiddleware, + use_case="customer_support", + workflow="ticket_api", + ) + """ + + def __init__( + self, + app: object, + *, + use_case: str, + workflow: Optional[str] = None, + auto_generate_run_id: bool = True, + ) -> None: + super().__init__(app) # type: ignore[arg-type] + self.use_case = use_case + self.workflow = workflow or use_case + self.auto_generate_run_id = auto_generate_run_id + + async def dispatch(self, request: Request, call_next: object) -> Response: # type: ignore[override] + """Process request and enrich span with Botanu context.""" + span = trace.get_current_span() + + # Extract run_id from baggage or headers + run_id = baggage.get_baggage("botanu.run_id") + if not run_id: + run_id = request.headers.get("x-botanu-run-id") + + if not run_id and self.auto_generate_run_id: + run_id = str(uuid.uuid4()) + + use_case = baggage.get_baggage("botanu.use_case") or request.headers.get("x-botanu-use-case") or self.use_case + workflow = baggage.get_baggage("botanu.workflow") or request.headers.get("x-botanu-workflow") or self.workflow + customer_id = baggage.get_baggage("botanu.customer_id") or request.headers.get("x-botanu-customer-id") + + # Enrich span with Botanu attributes + if run_id: + span.set_attribute("botanu.run_id", run_id) + set_baggage("botanu.run_id", run_id) + + span.set_attribute("botanu.use_case", use_case) + set_baggage("botanu.use_case", use_case) + + span.set_attribute("botanu.workflow", workflow) + set_baggage("botanu.workflow", workflow) + + if customer_id: + 
span.set_attribute("botanu.customer_id", customer_id) + set_baggage("botanu.customer_id", customer_id) + + span.set_attribute("http.route", request.url.path) + span.set_attribute("http.method", request.method) + + response = await call_next(request) # type: ignore[misc] + + if run_id: + response.headers["x-botanu-run-id"] = run_id + response.headers["x-botanu-use-case"] = use_case + response.headers["x-botanu-workflow"] = workflow + + return response diff --git a/src/botanu/sdk/span_helpers.py b/src/botanu/sdk/span_helpers.py new file mode 100644 index 0000000..98eaffd --- /dev/null +++ b/src/botanu/sdk/span_helpers.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Helper functions for working with OpenTelemetry spans. + +These functions add Botanu-specific attributes to the current span. +""" + +from __future__ import annotations + +from typing import Optional + +from opentelemetry import trace + + +def emit_outcome( + status: str, + *, + value_type: Optional[str] = None, + value_amount: Optional[float] = None, + confidence: Optional[float] = None, + reason: Optional[str] = None, +) -> None: + """Emit an outcome for the current span. + + Sets span attributes for outcome tracking and ROI calculation. + + Args: + status: Outcome status (``"success"``, ``"partial"``, ``"failed"``). + value_type: Type of business value (e.g., ``"tickets_resolved"``). + value_amount: Quantified value amount. + confidence: Confidence score (0.0–1.0). + reason: Optional reason for the outcome. 
def emit_outcome(
    status: str,
    *,
    value_type: Optional[str] = None,
    value_amount: Optional[float] = None,
    confidence: Optional[float] = None,
    reason: Optional[str] = None,
    span: Optional[trace.Span] = None,
) -> None:
    """Emit an outcome for the current (or given) span.

    Sets span attributes for outcome tracking and ROI calculation, and adds
    a ``botanu.outcome_emitted`` span event for timeline visibility.

    Args:
        status: Outcome status (``"success"``, ``"partial"``, ``"failed"``).
        value_type: Type of business value (e.g., ``"tickets_resolved"``).
        value_amount: Quantified value amount.
        confidence: Confidence score (0.0–1.0).
        reason: Optional reason for the outcome.
        span: Span to annotate; defaults to the current span (keeps parity
            with ``set_data_metrics`` / ``set_warehouse_metrics``).

    Example::

        >>> emit_outcome("success", value_type="tickets_resolved", value_amount=1)
        >>> emit_outcome("failed", reason="missing_context")
    """
    target_span = span or trace.get_current_span()

    # Write BOTH keys: "botanu.outcome" is kept for backward compatibility,
    # while "botanu.outcome.status" is the key the @botanu_use_case decorator
    # checks ('"botanu.outcome.status" not in span.attributes') — previously
    # only "botanu.outcome" was set, so the decorator never detected a manual
    # outcome and overwrote it with an automatic "success".
    target_span.set_attribute("botanu.outcome", status)
    target_span.set_attribute("botanu.outcome.status", status)

    if value_type:
        target_span.set_attribute("botanu.outcome.value_type", value_type)

    if value_amount is not None:
        target_span.set_attribute("botanu.outcome.value_amount", value_amount)

    if confidence is not None:
        target_span.set_attribute("botanu.outcome.confidence", confidence)

    if reason:
        target_span.set_attribute("botanu.outcome.reason", reason)

    # Add span event for timeline visibility.
    event_attrs: dict[str, object] = {"status": status}
    if value_type:
        event_attrs["value_type"] = value_type
    if value_amount is not None:
        event_attrs["value_amount"] = value_amount

    target_span.add_event("botanu.outcome_emitted", event_attrs)


def set_business_context(
    *,
    customer_id: Optional[str] = None,
    team: Optional[str] = None,
    cost_center: Optional[str] = None,
    region: Optional[str] = None,
    span: Optional[trace.Span] = None,
) -> None:
    """Set business context attributes on the current (or given) span.

    Only the arguments that are provided (truthy) are written.

    Args:
        customer_id: Customer identifier for multi-tenant attribution.
        team: Team or department.
        cost_center: Cost centre for financial tracking.
        region: Geographic region.
        span: Span to annotate; defaults to the current span.
    """
    target_span = span or trace.get_current_span()

    if customer_id:
        target_span.set_attribute("botanu.customer_id", customer_id)

    if team:
        target_span.set_attribute("botanu.team", team)

    if cost_center:
        target_span.set_attribute("botanu.cost_center", cost_center)

    if region:
        target_span.set_attribute("botanu.region", region)
"""Botanu tracking components.

Provides tracking for different operation types:

- LLM/GenAI model calls
- Database, storage, and messaging operations
- Attempt ledger for durable cost tracking
- Run completion metrics
"""

from __future__ import annotations

# Data-plane tracking (database / storage / messaging).
from botanu.tracking.data import (
    DBOperation,
    MessagingOperation,
    StorageOperation,
    set_data_metrics,
    set_warehouse_metrics,
    track_db_operation,
    track_messaging_operation,
    track_storage_operation,
)
# Durable attempt ledger (never-sampled OTel log events).
from botanu.tracking.ledger import (
    AttemptLedger,
    AttemptStatus,
    LedgerEventType,
    get_ledger,
    record_attempt_ended,
    record_attempt_started,
    record_llm_attempted,
    record_tool_attempted,
    set_ledger,
)
# LLM / tool tracking (OTel GenAI semconv aligned).
from botanu.tracking.llm import (
    BotanuAttributes,
    GenAIAttributes,
    LLMTracker,
    ModelOperation,
    ToolTracker,
    set_llm_attributes,
    set_token_usage,
    track_llm_call,
    track_tool_call,
)
# Run-completion metrics.
from botanu.tracking.metrics import record_run_completed

# Public API of botanu.tracking — keep in sync with the imports above.
__all__ = [
    # LLM tracking
    "track_llm_call",
    "track_tool_call",
    "set_llm_attributes",
    "set_token_usage",
    "ModelOperation",
    "GenAIAttributes",
    "BotanuAttributes",
    "LLMTracker",
    "ToolTracker",
    # Data tracking
    "track_db_operation",
    "track_storage_operation",
    "track_messaging_operation",
    "set_data_metrics",
    "set_warehouse_metrics",
    "DBOperation",
    "StorageOperation",
    "MessagingOperation",
    # Attempt ledger
    "AttemptLedger",
    "get_ledger",
    "set_ledger",
    "record_attempt_started",
    "record_attempt_ended",
    "record_llm_attempted",
    "record_tool_attempted",
    "LedgerEventType",
    "AttemptStatus",
    # Metrics
    "record_run_completed",
]
"""Data Tracking — Track database, storage, and messaging operations.

Usage::

    from botanu.tracking.data import track_db_operation, track_storage_operation

    with track_db_operation(system="postgresql", operation="SELECT") as db:
        result = cursor.execute("SELECT * FROM users WHERE active = true")
        db.set_result(rows_returned=len(result))
"""

from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Generator, Optional

from opentelemetry import trace
from opentelemetry.trace import Span, SpanKind, Status, StatusCode

# =========================================================================
# System Normalization Maps
# =========================================================================

# Maps caller-supplied aliases to one canonical name per database system so
# that span names and db.system / botanu.vendor attributes stay consistent
# regardless of how the caller spells the system.
DB_SYSTEMS: Dict[str, str] = {
    "postgresql": "postgresql",
    "postgres": "postgresql",
    "pg": "postgresql",
    "mysql": "mysql",
    "mariadb": "mariadb",
    "mssql": "mssql",
    "sqlserver": "mssql",
    "oracle": "oracle",
    "sqlite": "sqlite",
    "mongodb": "mongodb",
    "mongo": "mongodb",
    "dynamodb": "dynamodb",
    "cassandra": "cassandra",
    "couchdb": "couchdb",
    "firestore": "firestore",
    "cosmosdb": "cosmosdb",
    "redis": "redis",
    "memcached": "memcached",
    "elasticache": "elasticache",
    "elasticsearch": "elasticsearch",
    "opensearch": "opensearch",
    "snowflake": "snowflake",
    "bigquery": "bigquery",
    "redshift": "redshift",
    "databricks": "databricks",
    "athena": "athena",
    "synapse": "synapse",
    "influxdb": "influxdb",
    "timescaledb": "timescaledb",
    "neo4j": "neo4j",
    "neptune": "neptune",
}

# Alias → canonical name for object-storage systems (used by the storage
# tracker's span names and botanu.storage.system attribute).
STORAGE_SYSTEMS: Dict[str, str] = {
    "s3": "s3",
    "aws_s3": "s3",
    "gcs": "gcs",
    "google_cloud_storage": "gcs",
    "blob": "azure_blob",
    "azure_blob": "azure_blob",
    "minio": "minio",
    "ceph": "ceph",
    "nfs": "nfs",
    "efs": "efs",
}

# Alias → canonical name for messaging systems (used by the messaging
# tracker's span names and messaging.system attribute).
MESSAGING_SYSTEMS: Dict[str, str] = {
    "sqs": "sqs",
    "aws_sqs": "sqs",
    "sns": "sns",
    "kinesis": "kinesis",
    "eventbridge": "eventbridge",
    "pubsub": "pubsub",
    "google_pubsub": "pubsub",
    "servicebus": "servicebus",
    "azure_servicebus": "servicebus",
    "eventhub": "eventhub",
    "kafka": "kafka",
    "rabbitmq": "rabbitmq",
    "nats": "nats",
    "redis_pubsub": "redis_pubsub",
    "celery": "celery",
}
"eventbridge", + "pubsub": "pubsub", + "google_pubsub": "pubsub", + "servicebus": "servicebus", + "azure_servicebus": "servicebus", + "eventhub": "eventhub", + "kafka": "kafka", + "rabbitmq": "rabbitmq", + "nats": "nats", + "redis_pubsub": "redis_pubsub", + "celery": "celery", +} + + +class DBOperation: + SELECT = "SELECT" + INSERT = "INSERT" + UPDATE = "UPDATE" + DELETE = "DELETE" + UPSERT = "UPSERT" + MERGE = "MERGE" + CREATE = "CREATE" + DROP = "DROP" + ALTER = "ALTER" + INDEX = "INDEX" + TRANSACTION = "TRANSACTION" + BATCH = "BATCH" + + +class StorageOperation: + GET = "GET" + PUT = "PUT" + DELETE = "DELETE" + LIST = "LIST" + HEAD = "HEAD" + COPY = "COPY" + MULTIPART_UPLOAD = "MULTIPART_UPLOAD" + + +class MessagingOperation: + PUBLISH = "publish" + CONSUME = "consume" + RECEIVE = "receive" + SEND = "send" + SUBSCRIBE = "subscribe" + + +# ========================================================================= +# Database Tracker +# ========================================================================= + + +@dataclass +class DBTracker: + """Tracks database operations.""" + + system: str + operation: str + span: Optional[Span] = field(default=None, repr=False) + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + rows_returned: int = 0 + rows_affected: int = 0 + bytes_read: int = 0 + bytes_written: int = 0 + + def set_result( + self, + rows_returned: int = 0, + rows_affected: int = 0, + bytes_read: int = 0, + bytes_written: int = 0, + ) -> DBTracker: + self.rows_returned = rows_returned + self.rows_affected = rows_affected + self.bytes_read = bytes_read + self.bytes_written = bytes_written + if self.span: + if rows_returned > 0: + self.span.set_attribute("botanu.data.rows_returned", rows_returned) + if rows_affected > 0: + self.span.set_attribute("botanu.data.rows_affected", rows_affected) + if bytes_read > 0: + self.span.set_attribute("botanu.data.bytes_read", bytes_read) + if bytes_written > 0: + 
self.span.set_attribute("botanu.data.bytes_written", bytes_written) + return self + + def set_table(self, table_name: str, schema: Optional[str] = None) -> DBTracker: + if self.span: + self.span.set_attribute("db.collection.name", table_name) + if schema: + self.span.set_attribute("db.schema", schema) + return self + + def set_query_id(self, query_id: str) -> DBTracker: + if self.span: + self.span.set_attribute("botanu.warehouse.query_id", query_id) + return self + + def set_bytes_scanned(self, bytes_scanned: int) -> DBTracker: + self.bytes_read = bytes_scanned + if self.span: + self.span.set_attribute("botanu.warehouse.bytes_scanned", bytes_scanned) + return self + + def set_error(self, error: Exception) -> DBTracker: + if self.span: + self.span.set_status(Status(StatusCode.ERROR, str(error))) + self.span.set_attribute("botanu.data.error", type(error).__name__) + self.span.record_exception(error) + return self + + def add_metadata(self, **kwargs: Any) -> DBTracker: + if self.span: + for key, value in kwargs.items(): + attr_key = key if key.startswith("botanu.") else f"botanu.data.{key}" + self.span.set_attribute(attr_key, value) + return self + + def _finalize(self) -> None: + if not self.span: + return + duration_ms = (datetime.now(timezone.utc) - self.start_time).total_seconds() * 1000 + self.span.set_attribute("botanu.data.duration_ms", duration_ms) + + +@contextmanager +def track_db_operation( + system: str, + operation: str, + database: Optional[str] = None, + **kwargs: Any, +) -> Generator[DBTracker, None, None]: + """Track a database operation. + + Args: + system: Database system (postgresql, mysql, mongodb, …). + operation: Type of operation (SELECT, INSERT, …). + database: Database name (optional). 
+ """ + tracer = trace.get_tracer("botanu.data") + normalized_system = DB_SYSTEMS.get(system.lower(), system.lower()) + + with tracer.start_as_current_span( + name=f"db.{normalized_system}.{operation.lower()}", + kind=SpanKind.CLIENT, + ) as span: + span.set_attribute("db.system", normalized_system) + span.set_attribute("db.operation", operation.upper()) + span.set_attribute("botanu.vendor", normalized_system) + if database: + span.set_attribute("db.name", database) + for key, value in kwargs.items(): + span.set_attribute(f"botanu.data.{key}", value) + + tracker = DBTracker(system=normalized_system, operation=operation, span=span) + try: + yield tracker + except Exception as exc: + tracker.set_error(exc) + raise + finally: + tracker._finalize() + + +# ========================================================================= +# Storage Tracker +# ========================================================================= + + +@dataclass +class StorageTracker: + """Tracks storage operations.""" + + system: str + operation: str + span: Optional[Span] = field(default=None, repr=False) + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + objects_count: int = 0 + bytes_read: int = 0 + bytes_written: int = 0 + + def set_result( + self, + objects_count: int = 0, + bytes_read: int = 0, + bytes_written: int = 0, + ) -> StorageTracker: + self.objects_count = objects_count + self.bytes_read = bytes_read + self.bytes_written = bytes_written + if self.span: + if objects_count > 0: + self.span.set_attribute("botanu.data.objects_count", objects_count) + if bytes_read > 0: + self.span.set_attribute("botanu.data.bytes_read", bytes_read) + if bytes_written > 0: + self.span.set_attribute("botanu.data.bytes_written", bytes_written) + return self + + def set_bucket(self, bucket: str) -> StorageTracker: + if self.span: + self.span.set_attribute("botanu.storage.bucket", bucket) + return self + + def set_error(self, error: Exception) -> StorageTracker: + if 
@contextmanager
def track_storage_operation(
    system: str,
    operation: str,
    **kwargs: Any,
) -> Generator[StorageTracker, None, None]:
    """Track an object-storage operation under a CLIENT span.

    Args:
        system: Storage system (s3, gcs, azure_blob, …); aliases are
            normalized via ``STORAGE_SYSTEMS``.
        operation: Type of operation (GET, PUT, DELETE, …).
        **kwargs: Extra attributes, stored under ``botanu.storage.*``.

    Yields:
        A :class:`StorageTracker` bound to the span.  Exceptions raised in
        the body are recorded on the span and re-raised; the duration is
        always stamped on exit.
    """
    normalized = STORAGE_SYSTEMS.get(system.lower(), system.lower())
    tracer = trace.get_tracer("botanu.storage")
    span_name = f"storage.{normalized}.{operation.lower()}"

    with tracer.start_as_current_span(name=span_name, kind=SpanKind.CLIENT) as span:
        span.set_attribute("botanu.storage.system", normalized)
        span.set_attribute("botanu.storage.operation", operation.upper())
        span.set_attribute("botanu.vendor", normalized)
        for extra_key, extra_value in kwargs.items():
            span.set_attribute(f"botanu.storage.{extra_key}", extra_value)

        tracker = StorageTracker(system=normalized, operation=operation, span=span)
        try:
            yield tracker
        except Exception as exc:
            tracker.set_error(exc)
            raise
        finally:
            tracker._finalize()
# =========================================================================
# Messaging Tracker
# =========================================================================


@dataclass
class MessagingTracker:
    """Accumulates result metrics for one messaging operation and mirrors
    them onto the attached span (no-op when no span is attached)."""

    system: str
    operation: str
    destination: str
    span: Optional[Span] = field(default=None, repr=False)
    start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    message_count: int = 0
    bytes_transferred: int = 0

    def set_result(
        self,
        message_count: int = 0,
        bytes_transferred: int = 0,
    ) -> MessagingTracker:
        """Record result counters; returns self for chaining."""
        self.message_count = message_count
        self.bytes_transferred = bytes_transferred
        if self.span:
            # Only strictly-positive counters are recorded as attributes.
            if message_count > 0:
                self.span.set_attribute("botanu.messaging.message_count", message_count)
            if bytes_transferred > 0:
                self.span.set_attribute("botanu.messaging.bytes_transferred", bytes_transferred)
        return self

    def set_error(self, error: Exception) -> MessagingTracker:
        """Mark the span as errored and record the exception."""
        if self.span:
            self.span.set_status(Status(StatusCode.ERROR, str(error)))
            self.span.set_attribute("botanu.messaging.error", type(error).__name__)
            self.span.record_exception(error)
        return self

    def add_metadata(self, **kwargs: Any) -> MessagingTracker:
        """Attach arbitrary extra attributes under botanu.messaging.*."""
        if self.span:
            for key, value in kwargs.items():
                prefixed = key if key.startswith("botanu.") else f"botanu.messaging.{key}"
                self.span.set_attribute(prefixed, value)
        return self

    def _finalize(self) -> None:
        # Stamp wall-clock duration once the operation is finished.
        if self.span is None:
            return
        elapsed = datetime.now(timezone.utc) - self.start_time
        self.span.set_attribute("botanu.messaging.duration_ms", elapsed.total_seconds() * 1000)
+ """ + tracer = trace.get_tracer("botanu.messaging") + normalized_system = MESSAGING_SYSTEMS.get(system.lower(), system.lower()) + span_kind = SpanKind.PRODUCER if operation in ("publish", "send") else SpanKind.CONSUMER + + with tracer.start_as_current_span( + name=f"messaging.{normalized_system}.{operation.lower()}", + kind=span_kind, + ) as span: + span.set_attribute("messaging.system", normalized_system) + span.set_attribute("messaging.operation", operation.lower()) + span.set_attribute("messaging.destination.name", destination) + span.set_attribute("botanu.vendor", normalized_system) + for key, value in kwargs.items(): + span.set_attribute(f"botanu.messaging.{key}", value) + + tracker = MessagingTracker( + system=normalized_system, + operation=operation, + destination=destination, + span=span, + ) + try: + yield tracker + except Exception as exc: + tracker.set_error(exc) + raise + finally: + tracker._finalize() + + +# ========================================================================= +# Standalone Helpers +# ========================================================================= + + +def set_data_metrics( + rows_returned: int = 0, + rows_affected: int = 0, + bytes_read: int = 0, + bytes_written: int = 0, + objects_count: int = 0, + span: Optional[Span] = None, +) -> None: + """Set data operation metrics on the current span.""" + target_span = span or trace.get_current_span() + if not target_span or not target_span.is_recording(): + return + + if rows_returned > 0: + target_span.set_attribute("botanu.data.rows_returned", rows_returned) + if rows_affected > 0: + target_span.set_attribute("botanu.data.rows_affected", rows_affected) + if bytes_read > 0: + target_span.set_attribute("botanu.data.bytes_read", bytes_read) + if bytes_written > 0: + target_span.set_attribute("botanu.data.bytes_written", bytes_written) + if objects_count > 0: + target_span.set_attribute("botanu.data.objects_count", objects_count) + + +def set_warehouse_metrics( + query_id: str, 
def set_warehouse_metrics(
    query_id: str,
    bytes_scanned: int,
    rows_returned: int = 0,
    partitions_scanned: int = 0,
    span: Optional[Span] = None,
) -> None:
    """Set data warehouse query metrics on the given (or current) span.

    ``query_id`` and ``bytes_scanned`` are always recorded; the remaining
    counters only when strictly positive.  No-op when there is no recording
    span.

    Args:
        query_id: Warehouse-assigned query identifier.
        bytes_scanned: Bytes scanned by the query.
        rows_returned: Rows returned by the query.
        partitions_scanned: Partitions scanned by the query.
        span: Span to annotate; defaults to the current span.
    """
    target = span or trace.get_current_span()
    if not target or not target.is_recording():
        return

    target.set_attribute("botanu.warehouse.query_id", query_id)
    target.set_attribute("botanu.warehouse.bytes_scanned", bytes_scanned)

    optional_counters = (
        ("botanu.data.rows_returned", rows_returned),
        ("botanu.warehouse.partitions_scanned", partitions_scanned),
    )
    for attr_key, count in optional_counters:
        if count > 0:
            target.set_attribute(attr_key, count)
logger = logging.getLogger(__name__)


class LedgerEventType(str, Enum):
    """Event names emitted to the attempt ledger."""

    ATTEMPT_STARTED = "attempt.started"
    ATTEMPT_ENDED = "attempt.ended"
    LLM_ATTEMPTED = "llm.attempted"
    TOOL_ATTEMPTED = "tool.attempted"
    CANCEL_REQUESTED = "cancellation.requested"
    CANCEL_ACKNOWLEDGED = "cancellation.acknowledged"
    ZOMBIE_DETECTED = "zombie.detected"
    REDELIVERY_DETECTED = "redelivery.detected"


class AttemptStatus(str, Enum):
    """Terminal status of a run/attempt."""

    SUCCESS = "success"
    ERROR = "error"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"
    RATE_LIMITED = "rate_limited"


@dataclass
class AttemptLedger:
    """Durable event ledger for cost tracking.

    Emits structured log records via the OTel Logs API.  Log records are
    never sampled, providing a reliable source of truth for attempt counts,
    token costs, and zombie work.  Initialization and emission are both
    best-effort: if the OTel logs SDK or exporter is unavailable, the ledger
    degrades to a silent no-op rather than raising.
    """

    # Service name stamped on every event; falls back to OTEL_SERVICE_NAME.
    service_name: str = field(
        default_factory=lambda: os.getenv("OTEL_SERVICE_NAME", "unknown"),
    )
    # Explicit OTLP logs endpoint; when unset it is derived from
    # OTEL_EXPORTER_OTLP_ENDPOINT, with a localhost default.
    otlp_endpoint: Optional[str] = field(default=None)
    _logger: Any = field(default=None, init=False, repr=False)
    _initialized: bool = field(default=False, init=False)

    def __post_init__(self) -> None:
        self._initialize_logger()

    def _initialize_logger(self) -> None:
        """Best-effort setup of an OTel logger; never raises."""
        try:
            from opentelemetry._logs import get_logger_provider, set_logger_provider
            from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
            from opentelemetry.sdk._logs import LoggerProvider
            from opentelemetry.sdk._logs.export import BatchLogRecordProcessor

            provider = get_logger_provider()

            endpoint = self.otlp_endpoint
            if not endpoint:
                traces_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
                if traces_endpoint:
                    endpoint = f"{traces_endpoint.rstrip('/')}/v1/logs"
                else:
                    endpoint = "http://localhost:4318/v1/logs"

            # Only install our own LoggerProvider when none is usable yet.
            if provider is None or not hasattr(provider, "get_logger"):
                new_provider = LoggerProvider()
                exporter = OTLPLogExporter(endpoint=endpoint)
                new_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
                set_logger_provider(new_provider)
                provider = new_provider

            self._logger = provider.get_logger("botanu.attempt_ledger")
            self._initialized = True
            logger.debug("AttemptLedger initialized with endpoint: %s", endpoint)

        except Exception as exc:
            logger.warning("Failed to initialize AttemptLedger: %s", exc)
            self._initialized = False

    def _get_trace_context(self) -> Dict[str, str]:
        """Return trace/span ids of the current span, or {} when none is valid."""
        span = trace.get_current_span()
        ctx = span.get_span_context() if span else None
        if ctx and ctx.is_valid:
            return {
                "trace_id": format(ctx.trace_id, "032x"),
                "span_id": format(ctx.span_id, "016x"),
            }
        return {}

    def _emit(
        self,
        event_type: LedgerEventType,
        severity: Any,
        attributes: Dict[str, Any],
    ) -> None:
        """Emit one structured log record; never raises."""
        if not self._initialized or not self._logger:
            return

        try:
            from opentelemetry.sdk._logs import LogRecord

            merged = {
                "event.name": event_type.value,
                "service.name": self.service_name,
                "timestamp_ms": int(time.time() * 1000),
                **self._get_trace_context(),
                **attributes,
            }
            # Fix: drop None-valued entries — OTel log attribute values must
            # not be None, and optional fields (workflow, tenant_id,
            # duration_ms, …) are frequently unset.
            attrs = {k: v for k, v in merged.items() if v is not None}

            self._logger.emit(
                LogRecord(
                    timestamp=time.time_ns(),
                    severity_number=severity,
                    severity_text=severity.name,
                    body=event_type.value,
                    attributes=attrs,
                )
            )
        except Exception as exc:
            logger.debug("Failed to emit ledger event: %s", exc)

    # -----------------------------------------------------------------
    # Attempt Lifecycle
    # -----------------------------------------------------------------

    def attempt_started(
        self,
        run_id: str,
        use_case: str,
        attempt: int = 1,
        root_run_id: Optional[str] = None,
        workflow: Optional[str] = None,
        tenant_id: Optional[str] = None,
        deadline_ts: Optional[float] = None,
    ) -> None:
        """Record that a run/attempt began."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.ATTEMPT_STARTED,
            SeverityNumber.INFO,
            {
                "botanu.run_id": run_id,
                "botanu.use_case": use_case,
                "botanu.attempt": attempt,
                "botanu.root_run_id": root_run_id or run_id,
                "botanu.workflow": workflow,
                "botanu.tenant_id": tenant_id,
                "botanu.deadline_ts": deadline_ts,
            },
        )

    def attempt_ended(
        self,
        run_id: str,
        status: str,
        duration_ms: Optional[float] = None,
        error_class: Optional[str] = None,
        reason_code: Optional[str] = None,
    ) -> None:
        """Record that a run/attempt completed (WARN severity on failure)."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.ATTEMPT_ENDED,
            SeverityNumber.INFO if status == "success" else SeverityNumber.WARN,
            {
                "botanu.run_id": run_id,
                "status": status,
                "duration_ms": duration_ms,
                "error_class": error_class,
                "reason_code": reason_code,
            },
        )

    # -----------------------------------------------------------------
    # LLM / Tool Attempt Events
    # -----------------------------------------------------------------

    def llm_attempted(
        self,
        run_id: str,
        provider: str,
        model: str,
        operation: str = "chat",
        attempt_number: int = 1,
        input_tokens: int = 0,
        output_tokens: int = 0,
        cached_tokens: int = 0,
        duration_ms: Optional[float] = None,
        status: str = "success",
        error_class: Optional[str] = None,
        provider_request_id: Optional[str] = None,
        estimated_cost_usd: Optional[float] = None,
    ) -> None:
        """Record one LLM call attempt with token and cost accounting."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.LLM_ATTEMPTED,
            SeverityNumber.INFO if status == "success" else SeverityNumber.WARN,
            {
                "botanu.run_id": run_id,
                "gen_ai.provider.name": provider,
                "gen_ai.request.model": model,
                "gen_ai.operation.name": operation,
                "botanu.attempt": attempt_number,
                "gen_ai.usage.input_tokens": input_tokens,
                "gen_ai.usage.output_tokens": output_tokens,
                "botanu.usage.cached_tokens": cached_tokens,
                "duration_ms": duration_ms,
                "status": status,
                "error_class": error_class,
                "gen_ai.response.id": provider_request_id,
                "botanu.cost.estimated_usd": estimated_cost_usd,
            },
        )

    def tool_attempted(
        self,
        run_id: str,
        tool_name: str,
        tool_call_id: Optional[str] = None,
        attempt_number: int = 1,
        duration_ms: Optional[float] = None,
        status: str = "success",
        error_class: Optional[str] = None,
        items_returned: int = 0,
        bytes_processed: int = 0,
    ) -> None:
        """Record one tool execution attempt."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.TOOL_ATTEMPTED,
            SeverityNumber.INFO if status == "success" else SeverityNumber.WARN,
            {
                "botanu.run_id": run_id,
                "gen_ai.tool.name": tool_name,
                "gen_ai.tool.call.id": tool_call_id,
                "botanu.attempt": attempt_number,
                "duration_ms": duration_ms,
                "status": status,
                "error_class": error_class,
                "items_returned": items_returned,
                "bytes_processed": bytes_processed,
            },
        )

    # -----------------------------------------------------------------
    # Cancellation & Zombie Detection
    # -----------------------------------------------------------------

    def cancel_requested(
        self,
        run_id: str,
        reason: str = "user",
        requested_at_ms: Optional[float] = None,
    ) -> None:
        """Record that cancellation of a run was requested."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.CANCEL_REQUESTED,
            SeverityNumber.WARN,
            {
                "botanu.run_id": run_id,
                "cancellation.reason": reason,
                "cancellation.requested_at_ms": requested_at_ms or int(time.time() * 1000),
            },
        )

    def cancel_acknowledged(
        self,
        run_id: str,
        acknowledged_by: str,
        latency_ms: Optional[float] = None,
    ) -> None:
        """Record that a component acknowledged a cancellation request."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.CANCEL_ACKNOWLEDGED,
            SeverityNumber.INFO,
            {
                "botanu.run_id": run_id,
                "cancellation.acknowledged_by": acknowledged_by,
                "cancellation.latency_ms": latency_ms,
            },
        )

    def zombie_detected(
        self,
        run_id: str,
        deadline_ts: float,
        actual_end_ts: float,
        zombie_duration_ms: float,
        component: str,
    ) -> None:
        """Record that work continued past its deadline (zombie work)."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.ZOMBIE_DETECTED,
            SeverityNumber.ERROR,
            {
                "botanu.run_id": run_id,
                "deadline_ts": deadline_ts,
                "actual_end_ts": actual_end_ts,
                "zombie_duration_ms": zombie_duration_ms,
                "zombie_component": component,
            },
        )

    def redelivery_detected(
        self,
        run_id: str,
        queue_name: str,
        delivery_count: int,
        original_message_id: Optional[str] = None,
    ) -> None:
        """Record that a queue message was redelivered."""
        from opentelemetry._logs import SeverityNumber

        self._emit(
            LedgerEventType.REDELIVERY_DETECTED,
            SeverityNumber.WARN,
            {
                "botanu.run_id": run_id,
                "queue.name": queue_name,
                "delivery_count": delivery_count,
                "original_message_id": original_message_id,
            },
        )

    # -----------------------------------------------------------------
    # Lifecycle
    # -----------------------------------------------------------------

    def flush(self, timeout_ms: int = 5000) -> bool:
        """Force-flush pending log records; True when nothing needed flushing."""
        if not self._initialized:
            return True
        try:
            from opentelemetry._logs import get_logger_provider

            provider = get_logger_provider()
            if hasattr(provider, "force_flush"):
                return provider.force_flush(timeout_ms)
            return True
        except Exception as exc:
            logger.debug("Failed to flush AttemptLedger: %s", exc)
            return False

    def shutdown(self) -> None:
        """Shut down the underlying logger provider, if any."""
        if not self._initialized:
            return
        try:
            from opentelemetry._logs import get_logger_provider

            provider = get_logger_provider()
            if hasattr(provider, "shutdown"):
                provider.shutdown()
        except Exception as exc:
            logger.debug("Failed to shutdown AttemptLedger: %s", exc)


# =========================================================================
# Global ledger
# =========================================================================

_global_ledger: Optional[AttemptLedger] = None


def get_ledger() -> AttemptLedger:
    """Get the global attempt ledger instance, creating it on first use."""
    global _global_ledger
    if _global_ledger is None:
        _global_ledger = AttemptLedger()
    return _global_ledger


def set_ledger(ledger: AttemptLedger) -> None:
    """Set the global attempt ledger instance."""
    global _global_ledger
    _global_ledger = ledger


def record_attempt_started(**kwargs: Any) -> None:
    """Convenience wrapper: ``get_ledger().attempt_started(...)``."""
    get_ledger().attempt_started(**kwargs)


def record_attempt_ended(**kwargs: Any) -> None:
    """Convenience wrapper: ``get_ledger().attempt_ended(...)``."""
    get_ledger().attempt_ended(**kwargs)


def record_llm_attempted(**kwargs: Any) -> None:
    """Convenience wrapper: ``get_ledger().llm_attempted(...)``."""
    get_ledger().llm_attempted(**kwargs)


def record_tool_attempted(**kwargs: Any) -> None:
    """Convenience wrapper: ``get_ledger().tool_attempted(...)``."""
    get_ledger().tool_attempted(**kwargs)
"""LLM/Model Tracking — Track AI model usage for cost attribution.

Aligned with OpenTelemetry GenAI Semantic Conventions:
https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/

Usage::

    from botanu.tracking.llm import track_llm_call, track_tool_call

    with track_llm_call(provider="openai", model="gpt-4") as tracker:
        response = openai.chat.completions.create(...)
        tracker.set_tokens(
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
        )
        tracker.set_request_id(response.id)
"""

from __future__ import annotations

import functools
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Generator, List, Optional

from opentelemetry import metrics, trace
from opentelemetry.trace import Span, SpanKind, Status, StatusCode

# =========================================================================
# OTel GenAI Semantic Convention Attribute Names
# =========================================================================


class GenAIAttributes:
    """OpenTelemetry GenAI Semantic Convention attribute names.

    Centralizes the ``gen_ai.*`` key spellings so each one lives in exactly
    one place.
    """

    # Operation / provider identity
    OPERATION_NAME = "gen_ai.operation.name"
    PROVIDER_NAME = "gen_ai.provider.name"
    # Model identity: as requested vs. as reported in the response
    REQUEST_MODEL = "gen_ai.request.model"
    RESPONSE_MODEL = "gen_ai.response.model"
    # Token usage
    USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
    USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
    # Request sampling / generation parameters
    REQUEST_TEMPERATURE = "gen_ai.request.temperature"
    REQUEST_TOP_P = "gen_ai.request.top_p"
    REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens"
    REQUEST_STOP_SEQUENCES = "gen_ai.request.stop_sequences"
    REQUEST_FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
    REQUEST_PRESENCE_PENALTY = "gen_ai.request.presence_penalty"
    # Response metadata
    RESPONSE_ID = "gen_ai.response.id"
    RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons"
    # Tool execution
    TOOL_NAME = "gen_ai.tool.name"
    TOOL_CALL_ID = "gen_ai.tool.call.id"
    # Errors
    ERROR_TYPE = "error.type"
for cost attribution.""" + + PROVIDER_REQUEST_ID = "botanu.provider.request_id" + CLIENT_REQUEST_ID = "botanu.provider.client_request_id" + TOKENS_CACHED = "botanu.usage.cached_tokens" + TOKENS_CACHED_READ = "botanu.usage.cache_read_tokens" + TOKENS_CACHED_WRITE = "botanu.usage.cache_write_tokens" + STREAMING = "botanu.request.streaming" + CACHE_HIT = "botanu.request.cache_hit" + ATTEMPT_NUMBER = "botanu.request.attempt" + TOOL_SUCCESS = "botanu.tool.success" + TOOL_ITEMS_RETURNED = "botanu.tool.items_returned" + TOOL_BYTES_PROCESSED = "botanu.tool.bytes_processed" + TOOL_DURATION_MS = "botanu.tool.duration_ms" + VENDOR = "botanu.vendor" + + +# ========================================================================= +# Provider name mapping +# ========================================================================= + +LLM_PROVIDERS: Dict[str, str] = { + "openai": "openai", + "azure_openai": "azure.openai", + "azure-openai": "azure.openai", + "azureopenai": "azure.openai", + "anthropic": "anthropic", + "claude": "anthropic", + "bedrock": "aws.bedrock", + "aws_bedrock": "aws.bedrock", + "amazon_bedrock": "aws.bedrock", + "vertex": "gcp.vertex_ai", + "vertexai": "gcp.vertex_ai", + "vertex_ai": "gcp.vertex_ai", + "gcp_vertex": "gcp.vertex_ai", + "gemini": "gcp.vertex_ai", + "google": "gcp.vertex_ai", + "cohere": "cohere", + "mistral": "mistral", + "mistralai": "mistral", + "together": "together", + "togetherai": "together", + "groq": "groq", + "replicate": "replicate", + "ollama": "ollama", + "huggingface": "huggingface", + "hf": "huggingface", + "fireworks": "fireworks", + "perplexity": "perplexity", +} + + +class ModelOperation: + """GenAI operation types per OTel semconv.""" + + CHAT = "chat" + TEXT_COMPLETION = "text_completion" + EMBEDDINGS = "embeddings" + GENERATE_CONTENT = "generate_content" + EXECUTE_TOOL = "execute_tool" + CREATE_AGENT = "create_agent" + INVOKE_AGENT = "invoke_agent" + RERANK = "rerank" + IMAGE_GENERATION = "image_generation" + IMAGE_EDIT = 
"image_edit" + SPEECH_TO_TEXT = "speech_to_text" + TEXT_TO_SPEECH = "text_to_speech" + MODERATION = "moderation" + + # Aliases + COMPLETION = "text_completion" + EMBEDDING = "embeddings" + FUNCTION_CALL = "execute_tool" + TOOL_USE = "execute_tool" + + +# ========================================================================= +# GenAI Metrics +# ========================================================================= + +_meter = metrics.get_meter("botanu.gen_ai") + +_token_usage_histogram = _meter.create_histogram( + name="gen_ai.client.token.usage", + description="Number of input and output tokens used", + unit="{token}", +) + +_operation_duration_histogram = _meter.create_histogram( + name="gen_ai.client.operation.duration", + description="GenAI operation duration", + unit="s", +) + +_attempt_counter = _meter.create_counter( + name="botanu.gen_ai.attempts", + description="Number of request attempts (including retries)", + unit="{attempt}", +) + + +def _record_token_metrics( + provider: str, + model: str, + operation: str, + input_tokens: int, + output_tokens: int, + error_type: Optional[str] = None, +) -> None: + base_attrs: Dict[str, str] = { + GenAIAttributes.OPERATION_NAME: operation, + GenAIAttributes.PROVIDER_NAME: provider, + GenAIAttributes.REQUEST_MODEL: model, + } + if error_type: + base_attrs[GenAIAttributes.ERROR_TYPE] = error_type + + if input_tokens > 0: + _token_usage_histogram.record( + input_tokens, + {**base_attrs, "gen_ai.token.type": "input"}, + ) + if output_tokens > 0: + _token_usage_histogram.record( + output_tokens, + {**base_attrs, "gen_ai.token.type": "output"}, + ) + + +def _record_duration_metric( + provider: str, + model: str, + operation: str, + duration_seconds: float, + error_type: Optional[str] = None, +) -> None: + attrs: Dict[str, str] = { + GenAIAttributes.OPERATION_NAME: operation, + GenAIAttributes.PROVIDER_NAME: provider, + GenAIAttributes.REQUEST_MODEL: model, + } + if error_type: + attrs[GenAIAttributes.ERROR_TYPE] = 
error_type + + _operation_duration_histogram.record(duration_seconds, attrs) + + +# ========================================================================= +# LLM Tracker +# ========================================================================= + + +@dataclass +class LLMTracker: + """Context manager for tracking LLM calls with OTel GenAI semconv.""" + + provider: str + model: str + operation: str = ModelOperation.CHAT + span: Optional[Span] = field(default=None, repr=False) + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + input_tokens: int = 0 + output_tokens: int = 0 + cached_tokens: int = 0 + cache_read_tokens: int = 0 + cache_write_tokens: int = 0 + + provider_request_id: Optional[str] = None + client_request_id: Optional[str] = None + response_model: Optional[str] = None + finish_reason: Optional[str] = None + is_streaming: bool = False + cache_hit: bool = False + attempt_number: int = 1 + error_type: Optional[str] = None + + def set_tokens( + self, + input_tokens: int = 0, + output_tokens: int = 0, + cached_tokens: int = 0, + cache_read_tokens: int = 0, + cache_write_tokens: int = 0, + ) -> LLMTracker: + """Set token counts from model response.""" + self.input_tokens = input_tokens + self.output_tokens = output_tokens + self.cached_tokens = cached_tokens or cache_read_tokens + self.cache_read_tokens = cache_read_tokens + self.cache_write_tokens = cache_write_tokens + + if self.span: + self.span.set_attribute(GenAIAttributes.USAGE_INPUT_TOKENS, input_tokens) + self.span.set_attribute(GenAIAttributes.USAGE_OUTPUT_TOKENS, output_tokens) + if self.cached_tokens > 0: + self.span.set_attribute(BotanuAttributes.TOKENS_CACHED, self.cached_tokens) + if cache_read_tokens > 0: + self.span.set_attribute(BotanuAttributes.TOKENS_CACHED_READ, cache_read_tokens) + if cache_write_tokens > 0: + self.span.set_attribute(BotanuAttributes.TOKENS_CACHED_WRITE, cache_write_tokens) + return self + + def set_request_id( + self, + 
provider_request_id: Optional[str] = None, + client_request_id: Optional[str] = None, + ) -> LLMTracker: + """Set provider request IDs for billing reconciliation.""" + if provider_request_id: + self.provider_request_id = provider_request_id + if self.span: + self.span.set_attribute(GenAIAttributes.RESPONSE_ID, provider_request_id) + self.span.set_attribute(BotanuAttributes.PROVIDER_REQUEST_ID, provider_request_id) + if client_request_id: + self.client_request_id = client_request_id + if self.span: + self.span.set_attribute(BotanuAttributes.CLIENT_REQUEST_ID, client_request_id) + return self + + def set_response_model(self, model: str) -> LLMTracker: + """Set the actual model used in the response.""" + self.response_model = model + if self.span: + self.span.set_attribute(GenAIAttributes.RESPONSE_MODEL, model) + return self + + def set_finish_reason(self, reason: str) -> LLMTracker: + """Set the finish/stop reason from the response.""" + self.finish_reason = reason + if self.span: + self.span.set_attribute(GenAIAttributes.RESPONSE_FINISH_REASONS, [reason]) + return self + + def set_streaming(self, is_streaming: bool = True) -> LLMTracker: + """Mark request as streaming.""" + self.is_streaming = is_streaming + if self.span: + self.span.set_attribute(BotanuAttributes.STREAMING, is_streaming) + return self + + def set_cache_hit(self, cache_hit: bool = True) -> LLMTracker: + """Mark as cache hit.""" + self.cache_hit = cache_hit + if self.span: + self.span.set_attribute(BotanuAttributes.CACHE_HIT, cache_hit) + return self + + def set_attempt(self, attempt_number: int) -> LLMTracker: + """Set the attempt number (for retry tracking).""" + self.attempt_number = attempt_number + if self.span: + self.span.set_attribute(BotanuAttributes.ATTEMPT_NUMBER, attempt_number) + return self + + def set_request_params( + self, + temperature: Optional[float] = None, + top_p: Optional[float] = None, + max_tokens: Optional[int] = None, + stop_sequences: Optional[List[str]] = None, + 
frequency_penalty: Optional[float] = None, + presence_penalty: Optional[float] = None, + ) -> LLMTracker: + """Set request parameters per OTel GenAI semconv.""" + if self.span: + if temperature is not None: + self.span.set_attribute(GenAIAttributes.REQUEST_TEMPERATURE, temperature) + if top_p is not None: + self.span.set_attribute(GenAIAttributes.REQUEST_TOP_P, top_p) + if max_tokens is not None: + self.span.set_attribute(GenAIAttributes.REQUEST_MAX_TOKENS, max_tokens) + if stop_sequences is not None: + self.span.set_attribute(GenAIAttributes.REQUEST_STOP_SEQUENCES, stop_sequences) + if frequency_penalty is not None: + self.span.set_attribute(GenAIAttributes.REQUEST_FREQUENCY_PENALTY, frequency_penalty) + if presence_penalty is not None: + self.span.set_attribute(GenAIAttributes.REQUEST_PRESENCE_PENALTY, presence_penalty) + return self + + def set_error(self, error: Exception) -> LLMTracker: + """Record an error from the LLM call.""" + self.error_type = type(error).__name__ + if self.span: + self.span.set_status(Status(StatusCode.ERROR, str(error))) + self.span.set_attribute(GenAIAttributes.ERROR_TYPE, self.error_type) + self.span.record_exception(error) + return self + + def add_metadata(self, **kwargs: Any) -> LLMTracker: + """Add custom metadata to the span.""" + if self.span: + for key, value in kwargs.items(): + attr_key = key if key.startswith(("botanu.", "gen_ai.")) else f"botanu.{key}" + self.span.set_attribute(attr_key, value) + return self + + def _finalize(self) -> None: + if not self.span: + return + + duration_seconds = (datetime.now(timezone.utc) - self.start_time).total_seconds() + + _record_token_metrics( + provider=self.provider, + model=self.model, + operation=self.operation, + input_tokens=self.input_tokens, + output_tokens=self.output_tokens, + error_type=self.error_type, + ) + _record_duration_metric( + provider=self.provider, + model=self.model, + operation=self.operation, + duration_seconds=duration_seconds, + error_type=self.error_type, + ) 
+ _attempt_counter.add( + 1, + { + GenAIAttributes.PROVIDER_NAME: self.provider, + GenAIAttributes.REQUEST_MODEL: self.model, + GenAIAttributes.OPERATION_NAME: self.operation, + "status": "error" if self.error_type else "success", + }, + ) + + +@contextmanager +def track_llm_call( + provider: str, + model: str, + operation: str = ModelOperation.CHAT, + client_request_id: Optional[str] = None, + **kwargs: Any, +) -> Generator[LLMTracker, None, None]: + """Context manager for tracking LLM/model calls with OTel GenAI semconv. + + Args: + provider: LLM provider (openai, anthropic, bedrock, vertex, …). + model: Model name/ID (gpt-4, claude-3-opus, …). + operation: Type of operation (chat, embeddings, text_completion, …). + client_request_id: Optional client-generated request ID. + **kwargs: Additional span attributes. + + Yields: + :class:`LLMTracker` instance. + """ + tracer = trace.get_tracer("botanu.gen_ai") + normalized_provider = LLM_PROVIDERS.get(provider.lower(), provider.lower()) + span_name = f"{operation} {model}" + + with tracer.start_as_current_span(name=span_name, kind=SpanKind.CLIENT) as span: + span.set_attribute(GenAIAttributes.OPERATION_NAME, operation) + span.set_attribute(GenAIAttributes.PROVIDER_NAME, normalized_provider) + span.set_attribute(GenAIAttributes.REQUEST_MODEL, model) + span.set_attribute(BotanuAttributes.VENDOR, normalized_provider) + + for key, value in kwargs.items(): + attr_key = key if key.startswith(("botanu.", "gen_ai.")) else f"botanu.{key}" + span.set_attribute(attr_key, value) + + tracker = LLMTracker( + provider=normalized_provider, + model=model, + operation=operation, + span=span, + ) + if client_request_id: + tracker.set_request_id(client_request_id=client_request_id) + + try: + yield tracker + except Exception as exc: + tracker.set_error(exc) + raise + finally: + tracker._finalize() + + +# ========================================================================= +# Tool/Function Call Tracker +# 
========================================================================= + +_tool_duration_histogram = _meter.create_histogram( + name="botanu.tool.duration", + description="Tool execution duration", + unit="s", +) + +_tool_counter = _meter.create_counter( + name="botanu.tool.executions", + description="Number of tool executions", + unit="{execution}", +) + + +@dataclass +class ToolTracker: + """Context manager for tracking tool/function calls.""" + + tool_name: str + tool_call_id: Optional[str] = None + provider: Optional[str] = None + span: Optional[Span] = field(default=None, repr=False) + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + success: bool = True + items_returned: int = 0 + bytes_processed: int = 0 + error_type: Optional[str] = None + + def set_result( + self, + success: bool = True, + items_returned: int = 0, + bytes_processed: int = 0, + ) -> ToolTracker: + """Set tool execution result.""" + self.success = success + self.items_returned = items_returned + self.bytes_processed = bytes_processed + if self.span: + self.span.set_attribute(BotanuAttributes.TOOL_SUCCESS, success) + if items_returned > 0: + self.span.set_attribute(BotanuAttributes.TOOL_ITEMS_RETURNED, items_returned) + if bytes_processed > 0: + self.span.set_attribute(BotanuAttributes.TOOL_BYTES_PROCESSED, bytes_processed) + return self + + def set_tool_call_id(self, tool_call_id: str) -> ToolTracker: + """Set the tool call ID from the LLM response.""" + self.tool_call_id = tool_call_id + if self.span: + self.span.set_attribute(GenAIAttributes.TOOL_CALL_ID, tool_call_id) + return self + + def set_error(self, error: Exception) -> ToolTracker: + """Record tool execution error.""" + self.success = False + self.error_type = type(error).__name__ + if self.span: + self.span.set_status(Status(StatusCode.ERROR, str(error))) + self.span.set_attribute(GenAIAttributes.ERROR_TYPE, self.error_type) + self.span.record_exception(error) + return self + + def 
add_metadata(self, **kwargs: Any) -> ToolTracker: + """Add custom metadata to the span.""" + if self.span: + for key, value in kwargs.items(): + attr_key = key if key.startswith(("botanu.", "gen_ai.")) else f"botanu.tool.{key}" + self.span.set_attribute(attr_key, value) + return self + + def _finalize(self) -> None: + if not self.span: + return + duration_seconds = (datetime.now(timezone.utc) - self.start_time).total_seconds() + self.span.set_attribute(BotanuAttributes.TOOL_DURATION_MS, duration_seconds * 1000) + + attrs: Dict[str, str] = { + GenAIAttributes.TOOL_NAME: self.tool_name, + "status": "error" if self.error_type else "success", + } + if self.provider: + attrs[GenAIAttributes.PROVIDER_NAME] = self.provider + + _tool_duration_histogram.record(duration_seconds, attrs) + _tool_counter.add(1, attrs) + + +@contextmanager +def track_tool_call( + tool_name: str, + tool_call_id: Optional[str] = None, + provider: Optional[str] = None, + **kwargs: Any, +) -> Generator[ToolTracker, None, None]: + """Context manager for tracking tool/function calls. + + Args: + tool_name: Name of the tool/function. + tool_call_id: Tool call ID from the LLM response. + provider: Tool provider if external (e.g., ``"tavily"``). + **kwargs: Additional span attributes. + + Yields: + :class:`ToolTracker` instance. 
+ """ + tracer = trace.get_tracer("botanu.gen_ai") + span_name = f"execute_tool {tool_name}" + + with tracer.start_as_current_span(name=span_name, kind=SpanKind.INTERNAL) as span: + span.set_attribute(GenAIAttributes.OPERATION_NAME, ModelOperation.EXECUTE_TOOL) + span.set_attribute(GenAIAttributes.TOOL_NAME, tool_name) + + if tool_call_id: + span.set_attribute(GenAIAttributes.TOOL_CALL_ID, tool_call_id) + if provider: + normalized = LLM_PROVIDERS.get(provider.lower(), provider.lower()) + span.set_attribute(GenAIAttributes.PROVIDER_NAME, normalized) + span.set_attribute(BotanuAttributes.VENDOR, normalized) + + for key, value in kwargs.items(): + attr_key = key if key.startswith(("botanu.", "gen_ai.")) else f"botanu.tool.{key}" + span.set_attribute(attr_key, value) + + tracker = ToolTracker( + tool_name=tool_name, + tool_call_id=tool_call_id, + provider=provider, + span=span, + ) + + try: + yield tracker + except Exception as exc: + tracker.set_error(exc) + raise + finally: + tracker._finalize() + + +# ========================================================================= +# Standalone Helpers +# ========================================================================= + + +def set_llm_attributes( + provider: str, + model: str, + operation: str = ModelOperation.CHAT, + input_tokens: int = 0, + output_tokens: int = 0, + cached_tokens: int = 0, + streaming: bool = False, + provider_request_id: Optional[str] = None, + span: Optional[Span] = None, +) -> None: + """Set LLM attributes on the current span using OTel GenAI semconv.""" + target_span = span or trace.get_current_span() + if not target_span or not target_span.is_recording(): + return + + normalized_provider = LLM_PROVIDERS.get(provider.lower(), provider.lower()) + + target_span.set_attribute(GenAIAttributes.OPERATION_NAME, operation) + target_span.set_attribute(GenAIAttributes.PROVIDER_NAME, normalized_provider) + target_span.set_attribute(GenAIAttributes.REQUEST_MODEL, model) + 
target_span.set_attribute(BotanuAttributes.VENDOR, normalized_provider) + + if input_tokens > 0: + target_span.set_attribute(GenAIAttributes.USAGE_INPUT_TOKENS, input_tokens) + if output_tokens > 0: + target_span.set_attribute(GenAIAttributes.USAGE_OUTPUT_TOKENS, output_tokens) + if cached_tokens > 0: + target_span.set_attribute(BotanuAttributes.TOKENS_CACHED, cached_tokens) + if streaming: + target_span.set_attribute(BotanuAttributes.STREAMING, True) + if provider_request_id: + target_span.set_attribute(GenAIAttributes.RESPONSE_ID, provider_request_id) + target_span.set_attribute(BotanuAttributes.PROVIDER_REQUEST_ID, provider_request_id) + + _record_token_metrics( + provider=normalized_provider, + model=model, + operation=operation, + input_tokens=input_tokens, + output_tokens=output_tokens, + ) + + +def set_token_usage( + input_tokens: int, + output_tokens: int, + cached_tokens: int = 0, + span: Optional[Span] = None, +) -> None: + """Set token usage on the current span using OTel GenAI semconv.""" + target_span = span or trace.get_current_span() + if not target_span or not target_span.is_recording(): + return + + target_span.set_attribute(GenAIAttributes.USAGE_INPUT_TOKENS, input_tokens) + target_span.set_attribute(GenAIAttributes.USAGE_OUTPUT_TOKENS, output_tokens) + + if cached_tokens > 0: + target_span.set_attribute(BotanuAttributes.TOKENS_CACHED, cached_tokens) + + +def llm_instrumented( + provider: str, + model_param: str = "model", + tokens_from_response: bool = True, +) -> Any: + """Decorator to auto-instrument LLM client methods. + + Args: + provider: LLM provider name. + model_param: Name of the parameter containing the model name. + tokens_from_response: Whether to extract tokens from ``response.usage``. 
+ """ + + def decorator(func: Any) -> Any: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + model = kwargs.get(model_param) or (args[1] if len(args) > 1 else "unknown") + + with track_llm_call(provider, model) as tracker: + if kwargs.get("stream"): + tracker.set_streaming(True) + + response = func(*args, **kwargs) + + if tokens_from_response and hasattr(response, "usage"): + usage = response.usage + tracker.set_tokens( + input_tokens=getattr(usage, "prompt_tokens", 0) or getattr(usage, "input_tokens", 0), + output_tokens=getattr(usage, "completion_tokens", 0) or getattr(usage, "output_tokens", 0), + ) + + return response + + return wrapper + + return decorator diff --git a/src/botanu/tracking/metrics.py b/src/botanu/tracking/metrics.py new file mode 100644 index 0000000..89e34b8 --- /dev/null +++ b/src/botanu/tracking/metrics.py @@ -0,0 +1,90 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Run metrics — reliable aggregates for dashboards and alerts. + +Metrics are the "always-on truth" — they're not sampled like spans. 
+ +- ``botanu.run.completed`` (counter): Total runs by use_case, status, environment +- ``botanu.run.duration_ms`` (histogram): Run duration distribution +""" + +from __future__ import annotations + +import logging +from typing import Optional + +from opentelemetry import metrics +from opentelemetry.metrics import Counter, Histogram + +logger = logging.getLogger(__name__) + +_SDK_METER_NAME = "botanu_sdk" + +meter = metrics.get_meter(_SDK_METER_NAME) + +_run_completed_counter: Optional[Counter] = None +_run_duration_histogram: Optional[Histogram] = None + + +def _get_run_completed_counter() -> Counter: + global _run_completed_counter + if _run_completed_counter is None: + _run_completed_counter = meter.create_counter( + name="botanu.run.completed", + description="Total number of completed runs", + unit="1", + ) + return _run_completed_counter + + +def _get_run_duration_histogram() -> Histogram: + global _run_duration_histogram + if _run_duration_histogram is None: + _run_duration_histogram = meter.create_histogram( + name="botanu.run.duration_ms", + description="Run duration in milliseconds", + unit="ms", + ) + return _run_duration_histogram + + +def record_run_completed( + use_case: str, + status: str, + environment: str, + duration_ms: float, + service_name: Optional[str] = None, + workflow: Optional[str] = None, +) -> None: + """Record a completed run in metrics. + + Called at the end of every run, regardless of whether the span is sampled. + + Args: + use_case: Use case name (low cardinality). + status: Outcome status (success/failure/partial/timeout/canceled). + environment: Deployment environment. + duration_ms: Run duration in milliseconds. + service_name: Service name (optional). + workflow: Workflow name (optional). 
+ """ + attrs = { + "use_case": use_case, + "status": status, + "environment": environment, + } + if service_name: + attrs["service.name"] = service_name + if workflow: + attrs["workflow"] = workflow + + try: + _get_run_completed_counter().add(1, attrs) + except Exception as exc: + logger.debug("Failed to record run.completed metric: %s", exc) + + try: + _get_run_duration_histogram().record(duration_ms, attrs) + except Exception as exc: + logger.debug("Failed to record run.duration_ms metric: %s", exc) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..288f918 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,58 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Shared test fixtures for Botanu SDK tests.""" + +from __future__ import annotations + +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +# Module-level provider and exporter to avoid "cannot override" warnings +_provider: TracerProvider = None +_exporter: InMemorySpanExporter = None + + +def _get_or_create_provider() -> tuple[TracerProvider, InMemorySpanExporter]: + """Get or create the global test provider.""" + global _provider, _exporter + + if _provider is None: + _provider = TracerProvider() + _exporter = InMemorySpanExporter() + _provider.add_span_processor(SimpleSpanProcessor(_exporter)) + trace.set_tracer_provider(_provider) + + return _provider, _exporter + + +@pytest.fixture(autouse=True) +def reset_tracing(): + """Reset tracing state before each test.""" + _, exporter = _get_or_create_provider() + exporter.clear() + yield + exporter.clear() + + +@pytest.fixture +def tracer_provider(): + """Get the test TracerProvider.""" + provider, _ = _get_or_create_provider() + return provider + + +@pytest.fixture +def 
memory_exporter(): + """Get the in-memory span exporter for testing.""" + _, exporter = _get_or_create_provider() + return exporter + + +@pytest.fixture +def tracer(tracer_provider): + """Get a tracer instance.""" + return trace.get_tracer("test-tracer") diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e6ae60f --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e6ae60f --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/unit/test_decorators.py b/tests/unit/test_decorators.py new file mode 100644 index 0000000..e7b7dc6 --- /dev/null +++ b/tests/unit/test_decorators.py @@ -0,0 +1,124 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for SDK decorators.""" + +from __future__ import annotations + +import pytest + +from botanu.sdk.decorators import botanu_use_case + + +class TestBotanuUseCaseDecorator: + """Tests for @botanu_use_case decorator.""" + + def test_sync_function_creates_span(self, memory_exporter): + @botanu_use_case("Test Use Case") + def my_function(): + return "result" + + result = my_function() + + assert result == "result" + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "botanu.run/Test Use Case" + + def test_span_has_run_attributes(self, memory_exporter): + @botanu_use_case("Customer Support", workflow="handle_ticket") + def my_function(): + return "done" + + my_function() + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + + assert "botanu.run_id" in attrs + assert attrs["botanu.use_case"] == "Customer Support" + assert 
attrs["botanu.workflow"] == "handle_ticket" + + def test_emits_started_event(self, memory_exporter): + @botanu_use_case("Test") + def my_function(): + pass + + my_function() + + spans = memory_exporter.get_finished_spans() + events = spans[0].events + + started_events = [e for e in events if e.name == "botanu.run.started"] + assert len(started_events) == 1 + + def test_emits_completed_event(self, memory_exporter): + @botanu_use_case("Test") + def my_function(): + return "done" + + my_function() + + spans = memory_exporter.get_finished_spans() + events = spans[0].events + + completed_events = [e for e in events if e.name == "botanu.run.completed"] + assert len(completed_events) == 1 + assert completed_events[0].attributes["status"] == "success" + + def test_records_exception_on_failure(self, memory_exporter): + @botanu_use_case("Test") + def failing_function(): + raise ValueError("test error") + + with pytest.raises(ValueError): + failing_function() + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + events = spans[0].events + completed_events = [e for e in events if e.name == "botanu.run.completed"] + assert len(completed_events) == 1 + assert completed_events[0].attributes["status"] == "failure" + assert completed_events[0].attributes["error_class"] == "ValueError" + + @pytest.mark.asyncio + async def test_async_function_creates_span(self, memory_exporter): + @botanu_use_case("Async Test") + async def async_function(): + return "async result" + + result = await async_function() + + assert result == "async result" + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "botanu.run/Async Test" + + @pytest.mark.asyncio + async def test_async_exception_handling(self, memory_exporter): + @botanu_use_case("Async Test") + async def failing_async(): + raise RuntimeError("async error") + + with pytest.raises(RuntimeError): + await failing_async() + + spans = memory_exporter.get_finished_spans() + events = 
spans[0].events + completed_events = [e for e in events if e.name == "botanu.run.completed"] + assert completed_events[0].attributes["status"] == "failure" + + def test_workflow_version_computed(self, memory_exporter): + @botanu_use_case("Test") + def versioned_function(): + return "versioned" + + versioned_function() + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + + assert "botanu.workflow.version" in attrs + assert attrs["botanu.workflow.version"].startswith("v:") diff --git a/tests/unit/test_llm_tracking.py b/tests/unit/test_llm_tracking.py new file mode 100644 index 0000000..a45b315 --- /dev/null +++ b/tests/unit/test_llm_tracking.py @@ -0,0 +1,119 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for LLM tracking.""" + +from __future__ import annotations + +import pytest + +from botanu.tracking.llm import ( + GenAIAttributes, + ModelOperation, + track_llm_call, +) + + +class TestTrackLLMCall: + """Tests for track_llm_call context manager.""" + + def test_creates_span_with_model_name(self, memory_exporter): + with track_llm_call(model="gpt-4", provider="openai") as tracker: + tracker.set_tokens(input_tokens=100, output_tokens=50) + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + # Span name format: "{operation} {model}" + assert spans[0].name == "chat gpt-4" + + def test_records_token_usage(self, memory_exporter): + with track_llm_call(model="claude-3-opus", provider="anthropic") as tracker: + tracker.set_tokens(input_tokens=500, output_tokens=200) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + + assert attrs[GenAIAttributes.USAGE_INPUT_TOKENS] == 500 + assert attrs[GenAIAttributes.USAGE_OUTPUT_TOKENS] == 200 + + def test_records_error_on_exception(self, memory_exporter): + with pytest.raises(ValueError): + with track_llm_call(model="gpt-4", provider="openai") as _tracker: + raise ValueError("API error") 
+ + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs.get(GenAIAttributes.ERROR_TYPE) == "ValueError" + + def test_operation_type_attribute(self, memory_exporter): + with track_llm_call( + model="gpt-4", + provider="openai", + operation=ModelOperation.EMBEDDINGS, + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.OPERATION_NAME] == "embeddings" + + def test_request_params(self, memory_exporter): + with track_llm_call( + model="gpt-4", + provider="openai", + ) as tracker: + tracker.set_request_params(temperature=0.7, max_tokens=1000) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.REQUEST_TEMPERATURE] == 0.7 + assert attrs[GenAIAttributes.REQUEST_MAX_TOKENS] == 1000 + + +class TestLLMTracker: + """Tests for LLMTracker helper methods.""" + + def test_set_request_id(self, memory_exporter): + with track_llm_call(model="gpt-4", provider="openai") as tracker: + tracker.set_request_id(provider_request_id="resp_123") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.RESPONSE_ID] == "resp_123" + + def test_set_finish_reason(self, memory_exporter): + with track_llm_call(model="gpt-4", provider="openai") as tracker: + tracker.set_finish_reason("stop") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + # OTel converts lists to tuples for span attributes + assert attrs[GenAIAttributes.RESPONSE_FINISH_REASONS] == ("stop",) + + +class TestProviderNormalization: + """Tests for provider name normalization.""" + + def test_openai_normalized(self, memory_exporter): + with track_llm_call(model="gpt-4", provider="OpenAI"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.PROVIDER_NAME] == "openai" + + def 
test_anthropic_normalized(self, memory_exporter): + with track_llm_call(model="claude-3", provider="Anthropic"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.PROVIDER_NAME] == "anthropic" + + def test_bedrock_normalized(self, memory_exporter): + with track_llm_call(model="claude-v2", provider="bedrock"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.PROVIDER_NAME] == "aws.bedrock" diff --git a/tests/unit/test_run_context.py b/tests/unit/test_run_context.py new file mode 100644 index 0000000..0869676 --- /dev/null +++ b/tests/unit/test_run_context.py @@ -0,0 +1,204 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for RunContext model.""" + +from __future__ import annotations + +import os +import re +import time +from unittest import mock + +from botanu.models.run_context import ( + RunContext, + RunStatus, + generate_run_id, +) + + +class TestGenerateRunId: + """Tests for UUIDv7 generation.""" + + def test_format_is_uuid(self): + """run_id should be valid UUID format.""" + run_id = generate_run_id() + uuid_pattern = r"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" + assert re.match(uuid_pattern, run_id), f"Invalid UUID format: {run_id}" + + def test_uniqueness(self): + """Generated IDs should be unique.""" + ids = [generate_run_id() for _ in range(1000)] + assert len(set(ids)) == 1000 + + def test_sortable_by_time(self): + """IDs generated later should sort after earlier ones.""" + id1 = generate_run_id() + time.sleep(0.002) + id2 = generate_run_id() + assert id1 < id2 + + +class TestRunContextCreate: + """Tests for RunContext.create factory.""" + + def test_creates_with_required_fields(self): + ctx = RunContext.create(use_case="Customer Support") + assert ctx.run_id is not None + assert ctx.use_case == "Customer Support" + assert 
class TestRunContextRetry:
    """Verify retry lineage bookkeeping on RunContext."""

    def test_create_retry_increments_attempt(self):
        """A first retry gets attempt 2, a fresh id, and links back to the original run."""
        first = RunContext.create(use_case="test")
        second = RunContext.create_retry(first)

        assert second.attempt == 2
        assert second.retry_of_run_id == first.run_id
        assert second.root_run_id == first.root_run_id
        assert second.run_id != first.run_id

    def test_multiple_retries_preserve_root(self):
        """Chained retries keep pointing at the very first run as their root."""
        first = RunContext.create(use_case="test")
        second = RunContext.create_retry(first)
        third = RunContext.create_retry(second)

        assert third.attempt == 3
        assert third.root_run_id == first.run_id
class TestRunContextOutcome:
    """Verify that completing a run records its business outcome."""

    def test_complete_sets_outcome(self):
        """complete() must attach status, value type, and value amount to the context."""
        run = RunContext.create(use_case="test")
        run.complete(
            status=RunStatus.SUCCESS,
            value_type="tickets_resolved",
            value_amount=1.0,
        )

        recorded = run.outcome
        assert recorded is not None
        assert recorded.status == RunStatus.SUCCESS
        assert recorded.value_type == "tickets_resolved"
        assert recorded.value_amount == 1.0
attrs["botanu.run_id"] == ctx.run_id + assert attrs["botanu.use_case"] == "Customer Support" + assert attrs["botanu.workflow"] == "handle_ticket" + assert attrs["botanu.tenant_id"] == "tenant-123" + + def test_from_baggage_roundtrip(self): + original = RunContext.create( + use_case="test", + workflow="my_workflow", + tenant_id="tenant-abc", + ) + baggage = original.to_baggage_dict(lean_mode=False) + restored = RunContext.from_baggage(baggage) + + assert restored is not None + assert restored.run_id == original.run_id + assert restored.use_case == original.use_case + assert restored.workflow == original.workflow + assert restored.tenant_id == original.tenant_id + + def test_from_baggage_returns_none_for_missing_fields(self): + result = RunContext.from_baggage({}) + assert result is None + + result = RunContext.from_baggage({"botanu.run_id": "some-id"}) + assert result is None