Patterns for effective cost attribution with Botanu SDK.
A run should represent a complete business transaction:
# GOOD - One run for one business outcome
@botanu_use_case("Customer Support")
async def resolve_ticket(ticket_id: str):
    context = await fetch_context(ticket_id)
    response = await generate_response(context)
    await send_response(ticket_id, response)
    emit_outcome("success", value_type="tickets_resolved", value_amount=1)

# BAD - Multiple runs for one outcome
@botanu_use_case("Fetch Context")
async def fetch_context(ticket_id: str):
    ...

@botanu_use_case("Generate Response")  # Don't do this
async def generate_response(context):
    ...

Use cases appear in dashboards and queries. Choose names carefully:
# GOOD - Clear, descriptive names
@botanu_use_case("Customer Support")
@botanu_use_case("Document Analysis")
@botanu_use_case("Lead Qualification")
# BAD - Generic or technical names
@botanu_use_case("HandleRequest")
@botanu_use_case("Process")
@botanu_use_case("Main")

Workflow names help distinguish different paths within a use case:
@botanu_use_case("Customer Support", workflow="ticket_resolution")
async def resolve_ticket():
    ...

@botanu_use_case("Customer Support", workflow="escalation")
async def escalate_ticket():
    ...

Every run should have an explicit outcome:
@botanu_use_case("Data Processing")
async def process_data(data_id: str):
    try:
        result = await process(data_id)
        emit_outcome("success", value_type="records_processed", value_amount=result.count)
        return result
    except ValidationError:
        emit_outcome("failed", reason="validation_error")
        raise
    except TimeoutError:
        emit_outcome("failed", reason="timeout")
        raise

Include value amounts for better ROI analysis:
# GOOD - Quantified outcomes
emit_outcome("success", value_type="emails_sent", value_amount=50)
emit_outcome("success", value_type="revenue_generated", value_amount=1299.99)
emit_outcome("success", value_type="documents_processed", value_amount=10)
# LESS USEFUL - No quantity
emit_outcome("success")

Standardize your value types across the organization:
# Define standard value types
class ValueTypes:
    TICKETS_RESOLVED = "tickets_resolved"
    DOCUMENTS_PROCESSED = "documents_processed"
    LEADS_QUALIFIED = "leads_qualified"
    EMAILS_SENT = "emails_sent"
    REVENUE_GENERATED = "revenue_generated"

# Use consistently
emit_outcome("success", value_type=ValueTypes.TICKETS_RESOLVED, value_amount=1)

Always explain why something failed:
emit_outcome("failed", reason="rate_limit_exceeded")
emit_outcome("failed", reason="invalid_input")
emit_outcome("failed", reason="model_unavailable")
emit_outcome("failed", reason="context_too_long")

Tokens are the primary cost driver for LLMs:
with track_llm_call(provider="openai", model="gpt-4") as tracker:
    response = await client.chat.completions.create(...)
    # Always set tokens
    tracker.set_tokens(
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
    )

Request IDs enable reconciliation with provider invoices:
tracker.set_request_id(
    provider_request_id=response.id,  # From provider
    client_request_id=uuid.uuid4().hex,  # Your internal ID
)

Record attempt numbers for accurate cost per success:
for attempt in range(max_retries):
    with track_llm_call(provider="openai", model="gpt-4") as tracker:
        tracker.set_attempt(attempt + 1)
        try:
            response = await client.chat.completions.create(...)
            break
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(backoff)

Specify the operation type for accurate categorization:
from botanu.tracking.llm import track_llm_call, ModelOperation
# Chat completion
with track_llm_call(provider="openai", model="gpt-4", operation=ModelOperation.CHAT):
    ...

# Embeddings
with track_llm_call(provider="openai", model="text-embedding-3-small", operation=ModelOperation.EMBEDDINGS):
    ...

Include databases, storage, and messaging:
@botanu_use_case("ETL Pipeline")
async def run_etl():
    # Track warehouse query (billed by bytes scanned)
    with track_db_operation(system="snowflake", operation="SELECT") as db:
        db.set_bytes_scanned(result.bytes_scanned)
        db.set_query_id(result.query_id)

    # Track storage operations (billed by requests + data)
    with track_storage_operation(system="s3", operation="PUT") as storage:
        storage.set_result(bytes_written=len(data))

    # Track messaging (billed by message count)
    with track_messaging_operation(system="sqs", operation="publish", destination="queue") as msg:
        msg.set_result(message_count=batch_size)

For data warehouses billed by data scanned:
with track_db_operation(system="bigquery", operation="SELECT") as db:
    result = await bq_client.query(sql)
    db.set_bytes_scanned(result.total_bytes_processed)
    db.set_result(rows_returned=result.num_rows)

Extract context from incoming requests:
from fastapi import FastAPI
from botanu.sdk.middleware import BotanuMiddleware
app = FastAPI()
app.add_middleware(BotanuMiddleware)

Inject and extract context manually for async messaging:
# Producer
def publish_message(payload):
    ctx = get_current_run_context()
    message = {
        "payload": payload,
        "baggage": ctx.to_baggage_dict() if ctx else {},
    }
    queue.publish(message)

# Consumer
def process_message(message):
    baggage = message.get("baggage", {})
    ctx = RunContext.from_baggage(baggage)
    with ctx.as_current():
        handle_payload(message["payload"])

Default lean mode minimizes header overhead:
# Lean mode: ~100 bytes of baggage
# Propagates: run_id, use_case
# Full mode: ~300 bytes of baggage
# Propagates: run_id, use_case, workflow, environment, tenant_id, parent_run_id

Keep configuration out of code:
export OTEL_SERVICE_NAME=my-service
export OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4318
export BOTANU_ENVIRONMENT=production

Always use 100% sampling for accurate cost data:
# GOOD
trace_sample_rate: float = 1.0
# BAD - Missing cost data
trace_sample_rate: float = 0.1  # Only 10% of costs captured

For multi-environment setups:
# config/production.yaml
service:
  name: ${OTEL_SERVICE_NAME}
  environment: production
otlp:
  endpoint: ${COLLECTOR_ENDPOINT}
propagation:
  mode: lean

For accurate per-tenant cost attribution:
@botanu_use_case("Customer Support", tenant_id=request.tenant_id)
async def handle_ticket(request):
    ...

Add additional attribution dimensions:
set_business_context(
    customer_id=request.customer_id,
    team="engineering",
    cost_center="R&D",
    region="us-west-2",
)

Don't lose error context:
with track_llm_call(provider="openai", model="gpt-4") as tracker:
    try:
        response = await client.chat.completions.create(...)
    except openai.APIError as e:
        tracker.set_error(e)  # Records error type and message
        raise

Even failed runs should have outcomes:
@botanu_use_case("Data Processing")
async def process(data_id):
    try:
        await process_data(data_id)
        emit_outcome("success", value_type="items_processed", value_amount=1)
    except ValidationError:
        emit_outcome("failed", reason="validation_error")
        raise
    except Exception as e:
        emit_outcome("failed", reason=type(e).__name__)
        raise

For async applications, ensure tracking is non-blocking:
# The SDK uses span events, not separate API calls
# This is already non-blocking
with track_llm_call(provider="openai", model="gpt-4") as tracker:
    response = await async_llm_call()
    tracker.set_tokens(...)  # Immediate, non-blocking

For batch operations, track at batch level:
# GOOD - Batch tracking
with track_db_operation(system="postgresql", operation="INSERT") as db:
    await cursor.executemany(insert_sql, batch_of_1000_rows)
    db.set_result(rows_affected=1000)

# LESS EFFICIENT - Per-row tracking
for row in batch_of_1000_rows:
    with track_db_operation(system="postgresql", operation="INSERT") as db:
        await cursor.execute(insert_sql, row)
        db.set_result(rows_affected=1)

Use the NoOp tracer for unit tests:
from opentelemetry import trace
from opentelemetry.trace import NoOpTracerProvider

def setup_test_tracing():
    trace.set_tracer_provider(NoOpTracerProvider())

Verify outcomes are emitted correctly:
from unittest.mock import patch

# Note: the test must be a coroutine (async def) because it awaits the
# handler; run it with an async test runner such as pytest-asyncio.
async def test_successful_outcome():
    with patch("botanu.sdk.span_helpers.emit_outcome") as mock_emit:
        result = await handle_ticket("123")
        mock_emit.assert_called_with("success", value_type="tickets_resolved", value_amount=1)

- Anti-Patterns - What to avoid
- Architecture - SDK design principles
- Configuration - Configuration options