Logs and traces are essential pillars of observability, providing insights into application behavior, performance, and issues. This guide covers everything you need to implement effective logging and distributed tracing.

The three pillars of observability:
- Metrics: Numerical measurements (Prometheus)
- Logs: Event records (ELK, Loki)
- Traces: Request flow (Jaeger, Zipkin)
Standard Levels (from least to most severe):
DEBUG → INFO → WARN → ERROR → FATAL
When to Use Each:
- DEBUG: Detailed diagnostic information
  logger.debug(f"Processing user {user_id} with data {data}")
- INFO: General informational messages
  logger.info(f"User {user_id} logged in successfully")
- WARN: Warning messages for potential issues
  logger.warning(f"High memory usage: {memory_percent}%")
- ERROR: Error conditions that don't stop execution
  logger.error(f"Failed to connect to database: {error}")
- FATAL/CRITICAL: Critical failures requiring immediate attention
  logger.critical(f"System failure: {error}")
Why Structured Logging?
- Machine-readable
- Easy to query and filter
- Better correlation
- Consistent format
JSON Format:
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add extra fields passed via the logger's extra= argument
        for field in ("request_id", "user_id", "duration_ms", "status_code"):
            if hasattr(record, field):
                log_entry[field] = getattr(record, field)

        # Add exception info
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        return json.dumps(log_entry)

# Configure logger
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
logger.info("Request processed", extra={
    "request_id": "abc123",
    "user_id": "user456",
    "duration_ms": 45,
    "status_code": 200
})

Output:
{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "logger": "__main__",
  "message": "Request processed",
  "module": "app",
  "function": "handle_request",
  "line": 42,
  "request_id": "abc123",
  "user_id": "user456",
  "duration_ms": 45,
  "status_code": 200
}

# Good - With context
logger.info("User login", extra={
"user_id": user_id,
"ip_address": request.remote_addr,
"user_agent": request.headers.get("User-Agent")
})
# Bad - Without context
logger.info("User login")# Don't use ERROR for warnings
logger.warning("High memory usage") # Not logger.error
# Don't use INFO for debug info
logger.debug("Variable value: %s", value) # Not logger.info# Bad
logger.info(f"Password: {password}")
# Good
logger.info("Password reset requested", extra={"user_id": user_id})import uuid
from contextvars import ContextVar
request_id_var: ContextVar[str] = ContextVar('request_id', default=None)
# Middleware
def add_request_id(request):
request_id = str(uuid.uuid4())
request_id_var.set(request_id)
return request_id
# In handlers
logger.info("Processing request", extra={
"request_id": request_id_var.get()
})Architecture:
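Rather than passing extra= on every call, a logging.Filter can stamp the current request ID onto every record automatically. A minimal sketch (the filter class name is illustrative):

import logging

class RequestIdFilter(logging.Filter):
    def filter(self, record):
        # Attach the current request ID (if any); the JSONFormatter above picks it up
        record.request_id = request_id_var.get()
        return True

logger.addFilter(RequestIdFilter())
logger.info("Processing request")  # request_id is included without extra=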
ELK Stack Architecture:
Applications → Logstash → Elasticsearch → Kibana
Elasticsearch: Storage and search engine
# docker-compose.yml
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
volumes:
  es_data:

Logstash: Log processing pipeline
# logstash.conf
input {
  file {
    path => "/var/log/app.log"
    start_position => "beginning"
    codec => "json"
  }
  beats {
    port => 5044
  }
}

filter {
  if [fields][service] == "api" {
    mutate {
      add_field => { "service_type" => "api" }
    }
  }
  date {
    match => [ "timestamp", "ISO8601" ]
  }
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

Kibana: Visualization and dashboards
# docker-compose.yml
kibana:
  image: docker.elastic.co/kibana/kibana:8.11.0
  ports:
    - "5601:5601"
  environment:
    - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
  depends_on:
    - elasticsearch

Loki: Lightweight log aggregation
# docker-compose.yml
version: '3.8'
services:
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
  promtail:
    image: grafana/promtail:latest
    volumes:
      - /var/log:/var/log:ro
      - ./promtail-config.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml

Promtail Configuration:
# promtail-config.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: varlogs
          __path__: /var/log/*.log
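Applications can also push lines straight to Loki's HTTP API instead of being scraped by Promtail. A rough sketch using the requests library against Loki's documented push endpoint (the labels and message are illustrative):

import json
import time

import requests

payload = {
    "streams": [{
        "stream": {"app": "myapp", "level": "info"},
        # Loki expects [nanosecond-timestamp-string, log-line] pairs
        "values": [[str(time.time_ns()), json.dumps({"message": "User login"})]],
    }]
}
requests.post("http://loki:3100/loki/api/v1/push", json=payload, timeout=5)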
Fluentd: Unified logging layer

# fluentd.conf
<source>
  @type tail
  path /var/log/app.log
  pos_file /var/log/app.log.pos
  tag app.logs
  format json
</source>

<filter app.logs>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT']}"
  </record>
</filter>

<match app.logs>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name app-logs
  type_name _doc
  <buffer>
    @type file
    path /var/log/fluentd-buffers/elasticsearch.buffer
    flush_interval 10s
  </buffer>
</match>
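Applications can also send records to Fluentd directly over its forward protocol rather than writing to a tailed file. A sketch using the fluent-logger package, assuming a <source> block of @type forward listening on the default port 24224 is added to the Fluentd config:

from fluent import sender

# Connect to Fluentd's forward input (assumed to listen on 24224)
fluent_logger = sender.FluentSender("app", host="fluentd", port=24224)

# Emits a record tagged app.logs, matching the <match app.logs> block above
fluent_logger.emit("logs", {"message": "User login", "user_id": "user456"})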
Tracing follows a request as it flows through multiple services, showing:
- Which services handled the request
- How long each service took
- Where errors occurred
- Dependencies between services
OpenTelemetry: Industry standard for observability
from flask import Flask
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor

app = Flask(__name__)

# Setup
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Export to Jaeger (which accepts OTLP on port 4317)
jaeger_exporter = OTLPSpanExporter(
    endpoint="http://jaeger:4317",
    insecure=True
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Instrument Flask
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()

# Manual tracing (db and process_payment are application-specific stand-ins)
def process_order(order_id):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        # Call database
        with tracer.start_as_current_span("db.query") as db_span:
            db_span.set_attribute("db.query", "SELECT * FROM orders")
            result = db.get_order(order_id)

        # Call payment service
        with tracer.start_as_current_span("payment.process") as payment_span:
            payment_span.set_attribute("payment.amount", result.amount)
            process_payment(order_id)

        span.set_attribute("order.status", "completed")

Node.js:

const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { HttpInstrumentation } = require('@opentelemetry/instrumentation-http');

const provider = new NodeTracerProvider();
provider.addSpanProcessor(
  new SimpleSpanProcessor(
    new JaegerExporter({
      endpoint: 'http://jaeger:14268/api/traces',
    })
  )
);

registerInstrumentations({
  instrumentations: [new HttpInstrumentation()],
});

provider.register();

Jaeger: Distributed tracing system
# Docker
docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 14250:14250 \
-p 9411:9411 \
jaegertracing/all-in-one:latest

Kubernetes:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: jaeger
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jaeger
  template:
    metadata:
      labels:
        app: jaeger
    spec:
      containers:
        - name: jaeger
          image: jaegertracing/all-in-one:latest
          ports:
            - containerPort: 16686
              name: ui
            - containerPort: 14268
              name: http
          env:
            - name: COLLECTOR_ZIPKIN_HOST_PORT
              value: ":9411"
---
apiVersion: v1
kind: Service
metadata:
  name: jaeger
spec:
  selector:
    app: jaeger
  ports:
    - port: 16686
      targetPort: 16686
      name: ui
    - port: 14268
      targetPort: 14268
      name: http
  type: LoadBalancer

Zipkin: Alternative tracing system
# docker-compose.yml
version: '3.8'
services:
  zipkin:
    image: openzipkin/zipkin:latest
    ports:
      - "9411:9411"
Elasticsearch queries:

// Find errors in last hour
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "level": "ERROR"
          }
        },
        {
          "range": {
            "timestamp": {
              "gte": "now-1h"
            }
          }
        }
      ]
    }
  }
}
// Find slow requests
{
  "query": {
    "range": {
      "duration_ms": {
        "gte": 1000
      }
    }
  },
  "sort": [
    {
      "duration_ms": {
        "order": "desc"
      }
    }
  ]
}
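These query bodies can be issued from Python with the official elasticsearch client. A sketch assuming the 8.x client, where query is a top-level keyword argument, and the logs-* index pattern from the Logstash output above:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Find ERROR logs from the last hour
response = es.search(
    index="logs-*",
    query={
        "bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}},
            ]
        }
    },
)
for hit in response["hits"]["hits"]:
    print(hit["_source"]["message"])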
LogQL (Loki) queries:

# Count errors
sum(count_over_time({app="myapp"} |= "ERROR" [5m]))
# Top error messages
topk(10, sum by (message) (count_over_time({app="myapp"} |= "ERROR" [1h])))
# Request rate
rate({app="myapp"} | json | __error__="" [5m])
# P95 latency
quantile_over_time(0.95, {app="myapp"} | json | unwrap duration_ms [5m])
Correlating Logs and Traces:

from opentelemetry import trace
from opentelemetry.propagate import inject, extract

# Extract trace context from incoming request
def handle_request(request):
    carrier = dict(request.headers)
    context = extract(carrier)
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("handle_request", context=context) as span:
        span.set_attribute("http.method", request.method)
        span.set_attribute("http.url", request.url)

        # Log with trace ID
        trace_id = format(span.get_span_context().trace_id, '032x')
        logger.info("Processing request", extra={
            "trace_id": trace_id,
            "span_id": format(span.get_span_context().span_id, '016x')
        })

        # Process request
        process_request(request)

# Log with trace context
def log_with_trace(message, level="info"):
    span = trace.get_current_span()
    context = span.get_span_context()
    # get_current_span() returns a no-op span (invalid context) when none is active
    if context.is_valid:
        getattr(logger, level)(message, extra={
            "trace_id": format(context.trace_id, '032x'),
            "span_id": format(context.span_id, '016x')
        })
    else:
        getattr(logger, level)(message)
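Called inside an active span, the helper stamps both IDs onto the log line so a trace in Jaeger can be matched to its logs; for example:

with tracer.start_as_current_span("checkout"):
    log_with_trace("Charging card")   # carries trace_id and span_id
log_with_trace("Outside any span")    # falls back to a plain log line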
logger.info("User action", extra={
"user_id": user_id,
"action": "login",
"ip": request.remote_addr
})# Add request ID to all logs
request_id = str(uuid.uuid4())
logger.info("Request started", extra={"request_id": request_id})# Use correct log levels
logger.debug("Variable value") # Development
logger.info("User action") # Normal operation
logger.warning("High usage") # Potential issue
logger.error("Operation failed") # Error occurred
logger.critical("System down") # Critical failure# Never log passwords, tokens, PII
# Log metadata instead
logger.info("User login", extra={"user_id": user_id}) # Not passwordimport random
if random.random() < 0.1: # Sample 10%
logger.debug("Detailed debug info")from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5
)
logger.addHandler(handler)

Checklist:
- Implement structured logging
- Set up log aggregation
- Configure distributed tracing
- Correlate logs and traces
- Create log dashboards
- Set up log alerts
- Implement log rotation
- Use appropriate log levels
- Include correlation IDs
- Monitor log volume
Next Steps:
- Learn Monitoring & Observability
- Explore On-Call & SRE
- Master Troubleshooting
Remember: Logs and traces are essential for debugging and understanding system behavior. Use structured logging, implement distributed tracing, and always correlate logs with traces for complete observability.