Skip to content

Commit ef9e73e

Browse files
committed
fix prometheus
Signed-off-by: kerthcet <kerthcet@gmail.com>
1 parent f15ee70 commit ef9e73e

8 files changed

Lines changed: 1372 additions & 529 deletions

File tree

alphatrion/storage/runtime.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44

55
from opentelemetry import trace
66
from opentelemetry.sdk.trace import TracerProvider
7+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
78
from traceloop.sdk import Traceloop
89

910
from alphatrion import envs
1011
from alphatrion.artifact.artifact import Artifact
1112
from alphatrion.storage.sqlstore import SQLStore
1213
from alphatrion.storage.tracestore import TraceStore
1314
from alphatrion.tracing.clickhouse_exporter import ClickHouseSpanExporter
14-
from alphatrion.tracing.prometheus_span_processor import PrometheusSpanProcessor
15+
from alphatrion.tracing.cost_enrichment_processor import CostEnrichmentProcessor
16+
from alphatrion.tracing.prometheus_exporter import PrometheusExporter
1517
from alphatrion.tracing.span_processor import ContextAttributesSpanProcessor
1618

1719
__STORAGE_RUNTIME__ = None
@@ -62,23 +64,31 @@ def __init__(self):
6264
telemetry_enabled=False,
6365
)
6466

65-
# Add custom span processor to inject context attributes (run_id, etc.)
66-
# into all spans, including child spans created by instrumented libraries
67+
# Add custom span processors
6768
tracer_provider = trace.get_tracer_provider()
69+
70+
# 1. Context attributes processor - injects context (run_id, etc.) into all spans
6871
tracer_provider.add_span_processor(ContextAttributesSpanProcessor())
6972

70-
# Add Prometheus span processor if enabled
73+
# 2. Cost enrichment processor - calculates costs from tokens and adds to span attributes
74+
# This runs early so downstream processors/exporters can access cost data
75+
tracer_provider.add_span_processor(CostEnrichmentProcessor())
76+
77+
# 3. Add Prometheus exporter if enabled
7178
if os.getenv(envs.ENABLE_PROMETHEUS, "false").lower() == "true":
7279
pushgateway_url = os.getenv(
7380
envs.PROMETHEUS_PUSHGATEWAY_URL, "localhost:9091"
7481
)
7582
job_name = os.getenv(envs.PROMETHEUS_JOB_NAME, "alphatrion")
7683

77-
prometheus_processor = PrometheusSpanProcessor(
84+
prometheus_exporter = PrometheusExporter(
7885
pushgateway_url=pushgateway_url,
7986
job_name=job_name,
8087
)
81-
tracer_provider.add_span_processor(prometheus_processor)
88+
# Use BatchSpanProcessor for better performance
89+
tracer_provider.add_span_processor(
90+
BatchSpanProcessor(prometheus_exporter)
91+
)
8292

8393
artifact_insecure = os.getenv(envs.ARTIFACT_INSECURE, "false").lower() == "true"
8494
if artifact_storage_enabled():

alphatrion/tracing/clickhouse_exporter.py

Lines changed: 7 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
SEMANTIC_KIND_REASONING,
1414
SEMANTIC_KIND_UNKNOWN,
1515
)
16-
from alphatrion.utils.pricing import calculate_cost
1716

1817
logger = logging.getLogger(__name__)
1918

@@ -130,58 +129,16 @@ def _convert_span(self, span: ReadableSpan) -> dict[str, Any]:
130129

131130
# Calculate cost for LLM spans with token usage
132131
# Store cost per span to enable model-level cost analytics
133-
if "llm.usage.total_tokens" in span_attributes:
132+
if "alphatrion.cost.total_tokens" in span_attributes:
134133
try:
135-
base_url = span_attributes.get("gen_ai.openai.api_base", "")
136-
provider = determine_provider(base_url)
137-
138-
# Get model and tokens
139-
model = span_attributes.get(
140-
"gen_ai.request.model"
141-
) or span_attributes.get("gen_ai.response.model", "")
142-
143-
input_tokens = int(span_attributes.get("gen_ai.usage.input_tokens", 0))
144-
output_tokens = int(
145-
span_attributes.get("gen_ai.usage.output_tokens", 0)
146-
)
147-
cache_creation_input_tokens = int(
148-
span_attributes.get("gen_ai.usage.cache_creation_input_tokens", 0)
149-
)
150-
cache_read_input_tokens = int(
151-
span_attributes.get("gen_ai.usage.cache_read_input_tokens", 0)
152-
)
153-
154-
# Calculate cost for this span
155-
cost_result = calculate_cost(
156-
provider=provider,
157-
model=model,
158-
input_tokens=input_tokens,
159-
output_tokens=output_tokens,
160-
cache_creation_input_tokens=cache_creation_input_tokens,
161-
cache_read_input_tokens=cache_read_input_tokens,
162-
)
163-
164-
# Add cost to span attributes (in USD)
165-
# This enables model-level cost analytics across all spans
166-
span_attributes["alphatrion.cost.total_tokens"] = str(
167-
cost_result["total_cost"]
168-
)
169-
span_attributes["alphatrion.cost.input_tokens"] = str(
170-
cost_result["input_cost"]
171-
)
172-
span_attributes["alphatrion.cost.output_tokens"] = str(
173-
cost_result["output_cost"]
174-
)
175-
span_attributes["alphatrion.cost.cache_creation_input_tokens"] = str(
176-
cost_result["cache_creation_input_cost"]
177-
)
178-
span_attributes["alphatrion.cost.cache_read_input_tokens"] = str(
179-
cost_result["cache_read_input_cost"]
180-
)
134+
# Cost attributes are already enriched by CostEnrichmentProcessor
135+
# Just ensure they exist in span_attributes for ClickHouse storage
136+
# (they should already be present from the span)
137+
pass
181138

182139
except Exception as e:
183-
logger.warning(f"Failed to calculate cost for span {span.name}: {e}")
184-
# Don't fail span export if cost calculation fails
140+
logger.warning(f"Failed to process LLM span {span.name}: {e}")
141+
# Don't fail span export if processing fails
185142

186143
# Extract core identifiers from span attributes
187144
org_id = span_attributes.get("org_id", "")
@@ -318,23 +275,3 @@ def determine_semantic_kind(attributes: dict[str, str]) -> str:
318275

319276
# Default to unknown
320277
return SEMANTIC_KIND_UNKNOWN
321-
322-
323-
def determine_provider(api_base: str) -> str:
324-
"""Determine provider from API base URL.
325-
326-
Args:
327-
api_base: API base URL (e.g., "https://api.anthropic.com")
328-
329-
Returns:
330-
Provider name (e.g., "anthropic", "openai", "deepinfra", or "unknown")
331-
"""
332-
api_base = api_base.lower()
333-
if "anthropic" in api_base:
334-
return "anthropic"
335-
elif "deepinfra" in api_base:
336-
return "deepinfra"
337-
elif "openai" in api_base:
338-
return "openai"
339-
else:
340-
return "unknown"
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
"""
2+
Cost Enrichment Span Processor.
3+
4+
This processor enriches spans with cost information by calculating costs from token usage.
5+
It runs early in the processing chain so that downstream processors and exporters can
6+
access pre-calculated costs from span attributes.
7+
"""
8+
9+
import logging
10+
11+
from opentelemetry.context import Context
12+
from opentelemetry.sdk.trace import ReadableSpan
13+
from opentelemetry.sdk.trace.export import SpanProcessor
14+
15+
from alphatrion.utils.pricing import calculate_cost
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class CostEnrichmentProcessor(SpanProcessor):
21+
"""
22+
Span processor that enriches spans with cost information.
23+
24+
This processor checks if cost attributes are already present in a span.
25+
If not, it calculates costs from token usage and adds them to the span's
26+
attributes dictionary. This ensures all downstream processors and exporters
27+
have access to consistent cost data.
28+
"""
29+
30+
def on_start(self, span: ReadableSpan, parent_context: Context | None = None):
31+
"""Called when a span is started. No-op for this processor."""
32+
pass
33+
34+
def on_end(self, span: ReadableSpan):
35+
"""
36+
Called when a span ends. Calculate and add cost attributes if missing.
37+
38+
Args:
39+
span: The completed span
40+
"""
41+
try:
42+
# Only process spans with attributes
43+
if not span.attributes:
44+
return
45+
46+
# Check if costs are already present
47+
if "alphatrion.cost.total_tokens" in span.attributes:
48+
# Costs already calculated (e.g., in claude.py)
49+
return
50+
51+
# Check if this is an LLM span with token usage
52+
if "gen_ai.usage.input_tokens" not in span.attributes:
53+
# Not an LLM span, skip
54+
return
55+
56+
# Extract token usage
57+
attributes = span.attributes
58+
provider = determine_provider(str(attributes.get("gen_ai.openai.api_base")))
59+
model = str(
60+
attributes.get("gen_ai.request.model")
61+
or attributes.get("gen_ai.response.model", "")
62+
)
63+
input_tokens = int(attributes.get("gen_ai.usage.input_tokens", 0))
64+
output_tokens = int(attributes.get("gen_ai.usage.output_tokens", 0))
65+
cache_creation_input_tokens = int(
66+
attributes.get("gen_ai.usage.cache_creation_input_tokens", 0)
67+
)
68+
cache_read_input_tokens = int(
69+
attributes.get("gen_ai.usage.cache_read_input_tokens", 0)
70+
)
71+
72+
# Calculate costs
73+
cost_result = calculate_cost(
74+
provider=provider,
75+
model=model,
76+
input_tokens=input_tokens,
77+
output_tokens=output_tokens,
78+
cache_creation_input_tokens=cache_creation_input_tokens,
79+
cache_read_input_tokens=cache_read_input_tokens,
80+
)
81+
82+
# Add cost attributes to span
83+
# Note: We can't modify ReadableSpan.attributes directly after span ends,
84+
# but we can modify the underlying _attributes dict that will be read
85+
# by exporters. This is a bit of a hack but it's the only way to enrich
86+
# spans post-creation without modifying OpenTelemetry internals.
87+
if hasattr(span, "_attributes"):
88+
span._attributes["alphatrion.cost.total_tokens"] = str(
89+
cost_result["total_cost"]
90+
)
91+
span._attributes["alphatrion.cost.input_tokens"] = str(
92+
cost_result["input_cost"]
93+
)
94+
span._attributes["alphatrion.cost.output_tokens"] = str(
95+
cost_result["output_cost"]
96+
)
97+
span._attributes["alphatrion.cost.cache_creation_input_tokens"] = str(
98+
cost_result["cache_creation_input_cost"]
99+
)
100+
span._attributes["alphatrion.cost.cache_read_input_tokens"] = str(
101+
cost_result["cache_read_input_cost"]
102+
)
103+
logger.debug(
104+
f"Enriched span {span.name} with cost: ${cost_result['total_cost']:.6f}"
105+
)
106+
107+
except Exception as e:
108+
logger.warning(f"Failed to enrich span with cost: {e}", exc_info=True)
109+
110+
def shutdown(self):
111+
"""Shutdown the processor."""
112+
logger.info("CostEnrichmentProcessor shut down successfully")
113+
114+
def force_flush(self, timeout_millis: int = 30000) -> bool:
115+
"""
116+
Force flush (no-op for this processor).
117+
118+
Args:
119+
timeout_millis: Timeout in milliseconds
120+
121+
Returns:
122+
True always
123+
"""
124+
return True
125+
126+
127+
def determine_provider(api_base: str) -> str:
128+
"""Determine provider from API base URL.
129+
130+
Args:
131+
api_base: API base URL (e.g., "https://api.anthropic.com")
132+
133+
Returns:
134+
Provider name (e.g., "anthropic", "openai", "deepinfra", or "unknown")
135+
"""
136+
api_base = api_base.lower()
137+
if "anthropic" in api_base:
138+
return "anthropic"
139+
elif "deepinfra" in api_base:
140+
return "deepinfra"
141+
elif "openai" in api_base:
142+
return "openai"
143+
else:
144+
return "unknown"

0 commit comments

Comments
 (0)