|
| 1 | +import os |
| 2 | +import time |
| 3 | +import redis |
| 4 | +from redis.exceptions import ConnectionError, TimeoutError |
| 5 | +from opentelemetry import trace, metrics |
| 6 | +from opentelemetry.sdk.trace import TracerProvider |
| 7 | +from opentelemetry.sdk.trace.export import BatchSpanProcessor |
| 8 | +from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter |
| 9 | +from opentelemetry.sdk.metrics import MeterProvider |
| 10 | +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader |
| 11 | +from opentelemetry.exporter.cloud_monitoring import CloudMonitoringMetricsExporter |
| 12 | +from opentelemetry.instrumentation.redis import RedisInstrumentor |
| 13 | + |
| 14 | +# 1. Initialize Tracing |
| 15 | +tracer_provider = TracerProvider() |
| 16 | +tracer_provider.add_span_processor(BatchSpanProcessor(CloudTraceSpanExporter())) |
| 17 | +trace.set_tracer_provider(tracer_provider) |
| 18 | +tracer = trace.get_tracer("redis.client") |
| 19 | + |
| 20 | +# 2. Initialize Metrics |
| 21 | +metrics_exporter = CloudMonitoringMetricsExporter() |
| 22 | +metric_reader = PeriodicExportingMetricReader(metrics_exporter, export_interval_millis=10000) |
| 23 | +meter_provider = MeterProvider(metric_readers=[metric_reader]) |
| 24 | +metrics.set_meter_provider(meter_provider) |
| 25 | +meter = metrics.get_meter("redis.metrics") |
| 26 | + |
| 27 | +rtt_hist = meter.create_histogram("redis_client_rtt", unit="ms") |
| 28 | +client_block_hist = meter.create_histogram("redis_client_blocking_latency", unit="ms") |
| 29 | +app_block_hist = meter.create_histogram("redis_application_blocking_latency", unit="ms") |
| 30 | +retry_counter = meter.create_counter("redis_retry_count") |
| 31 | +conn_error_counter = meter.create_counter("redis_connectivity_error_count") |
| 32 | + |
| 33 | +retry_counter.add(0, {"operation": "startup"}) |
| 34 | +conn_error_counter.add(0, {"operation": "startup"}) |
| 35 | + |
| 36 | +# 3. Setup Redis |
| 37 | +RedisInstrumentor().instrument() |
| 38 | +redis_host = os.environ.get("REDISHOST", "localhost") |
| 39 | +redis_port = int(os.environ.get("REDISPORT", 6379)) |
| 40 | + |
| 41 | +pool = redis.ConnectionPool(host=redis_host, port=redis_port, max_connections=10, decode_responses=True) |
| 42 | +r = redis.Redis(connection_pool=pool) |
| 43 | + |
| 44 | +def smart_redis_call(operation_name, func, *args, **kwargs): |
| 45 | + max_retries = 3 |
| 46 | + attempt = 0 |
| 47 | + |
| 48 | + pool_start = time.time() |
| 49 | + try: |
| 50 | + conn = pool.get_connection('PING') |
| 51 | + pool.release(conn) |
| 52 | + except Exception: |
| 53 | + pass |
| 54 | + client_block_hist.record((time.time() - pool_start) * 1000, {"operation": operation_name}) |
| 55 | + |
| 56 | + while attempt < max_retries: |
| 57 | + try: |
| 58 | + req_start = time.time() |
| 59 | + response = func(*args, **kwargs) |
| 60 | + rtt_hist.record((time.time() - req_start) * 1000, {"operation": operation_name}) |
| 61 | + |
| 62 | + app_start = time.time() |
| 63 | + _ = str(response) |
| 64 | + app_block_hist.record((time.time() - app_start) * 1000, {"operation": operation_name}) |
| 65 | + |
| 66 | + return response |
| 67 | + |
| 68 | + except (ConnectionError, TimeoutError) as e: |
| 69 | + attempt += 1 |
| 70 | + conn_error_counter.add(1, {"operation": operation_name}) |
| 71 | + retry_counter.add(1, {"operation": operation_name}) |
| 72 | + if attempt >= max_retries: |
| 73 | + raise e |
| 74 | + time.sleep((2 ** attempt) * 0.1) |
| 75 | + |
| 76 | +if __name__ == "__main__": |
| 77 | + with tracer.start_as_current_span("process_user_span"): |
| 78 | + try: |
| 79 | + # Simple write and read operations |
| 80 | + smart_redis_call("set_user", r.set, "user:123", "active") |
| 81 | + |
| 82 | + result = smart_redis_call("get_user", r.get, "user:123") |
| 83 | + print(f"Retrieved: {result}") |
| 84 | + except Exception as e: |
| 85 | + print(f"Error: {e}") |
| 86 | + |
| 87 | + tracer_provider.force_flush() |
| 88 | + meter_provider.force_flush() |
0 commit comments