diff --git a/drift/__init__.py b/drift/__init__.py index 5c5f9ee..854fd3b 100644 --- a/drift/__init__.py +++ b/drift/__init__.py @@ -32,6 +32,7 @@ from .instrumentation.fastapi import FastAPIInstrumentation from .instrumentation.flask import FlaskInstrumentation from .instrumentation.requests import RequestsInstrumentation +from .instrumentation.urllib3 import Urllib3Instrumentation __version__ = "0.1.0" @@ -61,6 +62,7 @@ "FlaskInstrumentation", "FastAPIInstrumentation", "RequestsInstrumentation", + "Urllib3Instrumentation", # Adapters "SpanExportAdapter", "ExportResult", diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index 867d4da..f88eea7 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -402,6 +402,16 @@ def _init_auto_instrumentations(self) -> None: except ImportError: pass + try: + import urllib3 # type: ignore[unresolved-import] + + from ..instrumentation.urllib3 import Urllib3Instrumentation + + _ = Urllib3Instrumentation() + logger.debug("urllib3 instrumentation initialized") + except ImportError: + pass + # Initialize PostgreSQL instrumentation before Django # Instrument BOTH psycopg2 and psycopg if available # This allows apps to use either or both diff --git a/drift/instrumentation/urllib3/__init__.py b/drift/instrumentation/urllib3/__init__.py new file mode 100644 index 0000000..d8cc132 --- /dev/null +++ b/drift/instrumentation/urllib3/__init__.py @@ -0,0 +1,5 @@ +"""urllib3 instrumentation module.""" + +from .instrumentation import RequestDroppedByTransform, Urllib3Instrumentation + +__all__ = ["Urllib3Instrumentation", "RequestDroppedByTransform"] diff --git a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml new file mode 100644 index 0000000..9f66127 --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml @@ -0,0 +1,28 @@ +version: 1 + +service: + id: "urllib3-e2e-test-id" + name: "urllib3-e2e-test" + port: 8000 + start: + command: "python src/app.py" + readiness_check: + command: "curl -f http://localhost:8000/health" + timeout: 45s + interval: 5s + +tusk_api: + url: "http://localhost:8000" + +test_execution: + concurrent_limit: 10 + batch_size: 10 + timeout: 30s + +recording: + sampling_rate: 1.0 + export_spans: false + +replay: + enable_telemetry: false + diff --git a/drift/instrumentation/urllib3/e2e-tests/Dockerfile b/drift/instrumentation/urllib3/e2e-tests/Dockerfile new file mode 100644 index 0000000..f19a7c2 --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/Dockerfile @@ -0,0 +1,22 @@ +FROM python-e2e-base:latest + +# Copy SDK source for editable install +COPY . /sdk + +# Copy test files +COPY drift/instrumentation/urllib3/e2e-tests /app + +WORKDIR /app + +# Install dependencies (requirements.txt uses -e /sdk for SDK) +RUN pip install -q -r requirements.txt + +# Make entrypoint executable +RUN chmod +x entrypoint.py + +# Create .tusk directories +RUN mkdir -p /app/.tusk/traces /app/.tusk/logs + +# Run entrypoint +ENTRYPOINT ["python", "entrypoint.py"] + diff --git a/drift/instrumentation/urllib3/e2e-tests/docker-compose.yml b/drift/instrumentation/urllib3/e2e-tests/docker-compose.yml new file mode 100644 index 0000000..6accb34 --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/docker-compose.yml @@ -0,0 +1,20 @@ +services: + app: + build: + context: ../../../.. + dockerfile: drift/instrumentation/urllib3/e2e-tests/Dockerfile + args: + - TUSK_CLI_VERSION=${TUSK_CLI_VERSION:-latest} + environment: + - PORT=8000 + - TUSK_ANALYTICS_DISABLED=1 + - PYTHONUNBUFFERED=1 + working_dir: /app + volumes: + # Mount SDK source for hot reload (no rebuild needed for SDK changes) + - ../../../..:/sdk + # Mount app source for development + - ./src:/app/src + # Mount .tusk folder to persist traces + - ./.tusk:/app/.tusk + diff --git a/drift/instrumentation/urllib3/e2e-tests/entrypoint.py b/drift/instrumentation/urllib3/e2e-tests/entrypoint.py new file mode 100644 index 0000000..6f0af9c --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/entrypoint.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +E2E Test Entrypoint for urllib3 Instrumentation + +This script orchestrates the full e2e test lifecycle: +1. Setup: Install dependencies +2. Record: Start app in RECORD mode, execute requests +3. Test: Run Tusk CLI tests +4. Teardown: Cleanup and return exit code +""" + +import sys +from pathlib import Path + +# Add SDK to path for imports +sys.path.insert(0, "/sdk") + +from drift.instrumentation.e2e_common.base_runner import E2ETestRunnerBase + + +class Urllib3E2ETestRunner(E2ETestRunnerBase): + """E2E test runner for urllib3 instrumentation.""" + + def __init__(self): + import os + + port = int(os.getenv("PORT", "8000")) + super().__init__(app_port=port) + + +if __name__ == "__main__": + runner = Urllib3E2ETestRunner() + exit_code = runner.run() + sys.exit(exit_code) diff --git a/drift/instrumentation/urllib3/e2e-tests/requirements.txt b/drift/instrumentation/urllib3/e2e-tests/requirements.txt new file mode 100644 index 0000000..f2ee0a4 --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/requirements.txt @@ -0,0 +1,5 @@ +-e /sdk +Flask>=3.1.2 +urllib3>=2.0.0 +requests>=2.32.5 + diff --git a/drift/instrumentation/urllib3/e2e-tests/run.sh b/drift/instrumentation/urllib3/e2e-tests/run.sh new file mode 100755 index 0000000..ffd41ce --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/run.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +# Exit on error +set -e + +# Accept optional port parameter (default: 8000) +APP_PORT=${1:-8000} +export APP_PORT + +# Generate unique docker compose project name +# Get the instrumentation name (parent directory of e2e-tests) +TEST_NAME="$(basename "$(dirname "$(pwd)")")" +PROJECT_NAME="python-${TEST_NAME}-${APP_PORT}" + +# Colors for output +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Running Python E2E Test: ${TEST_NAME}${NC}" +echo -e "${BLUE}Port: ${APP_PORT}${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" + +# Cleanup function +cleanup() { + echo "" + echo -e "${YELLOW}Cleaning up containers...${NC}" + docker compose -p "$PROJECT_NAME" down -v 2>/dev/null || true +} + +# Register cleanup on exit +trap cleanup EXIT + +# Build containers +echo -e "${BLUE}Building containers...${NC}" +docker compose -p "$PROJECT_NAME" build --no-cache + +# Run the test container +echo -e "${BLUE}Starting test...${NC}" +echo "" + +# Run container and capture exit code (always use port 8000 inside container) +# Disable set -e temporarily to capture exit code +set +e +docker compose -p "$PROJECT_NAME" run --rm app +EXIT_CODE=$? +set -e + +echo "" +if [ $EXIT_CODE -eq 0 ]; then + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN}Test passed!${NC}" + echo -e "${GREEN}========================================${NC}" +else + echo -e "${RED}========================================${NC}" + echo -e "${RED}Test failed with exit code ${EXIT_CODE}${NC}" + echo -e "${RED}========================================${NC}" +fi + +exit $EXIT_CODE + diff --git a/drift/instrumentation/urllib3/e2e-tests/src/app.py b/drift/instrumentation/urllib3/e2e-tests/src/app.py new file mode 100644 index 0000000..064224e --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/src/app.py @@ -0,0 +1,471 @@ +"""Flask test app for e2e tests - urllib3 instrumentation testing.""" + +import json + +import urllib3 +from flask import Flask, jsonify, request + +from drift import TuskDrift + +# Initialize SDK +sdk = TuskDrift.initialize( + api_key="tusk-test-key", + log_level="debug", +) + +app = Flask(__name__) + +# Create a shared PoolManager for connection reuse +http = urllib3.PoolManager() + + +# ============================================================================= +# Health Check +# ============================================================================= + + +@app.route("/health", methods=["GET"]) +def health(): + return jsonify({"status": "healthy"}) + + +# ============================================================================= +# PoolManager Tests (high-level API) +# ============================================================================= + + +@app.route("/api/poolmanager/get-json", methods=["GET"]) +def poolmanager_get_json(): + """Test GET request returning JSON using PoolManager.""" + try: + response = http.request("GET", "https://jsonplaceholder.typicode.com/posts/1") + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/get-with-params", methods=["GET"]) +def poolmanager_get_with_params(): + """Test GET request with query parameters using PoolManager.""" + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/comments", + fields={"postId": "1"}, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/get-with-headers", methods=["GET"]) +def poolmanager_get_with_headers(): + """Test GET request with custom headers using PoolManager.""" + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts/1", + headers={ + "X-Custom-Header": "test-value", + "Accept": "application/json", + }, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/post-json", methods=["POST"]) +def poolmanager_post_json(): + """Test POST request with JSON body using PoolManager.""" + try: + req_data = request.get_json() or {} + body = json.dumps( + { + "title": req_data.get("title", "Test Title"), + "body": req_data.get("body", "Test Body"), + "userId": req_data.get("userId", 1), + } + ) + response = http.request( + "POST", + "https://jsonplaceholder.typicode.com/posts", + body=body.encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/post-form", methods=["POST"]) +def poolmanager_post_form(): + """Test POST request with form-encoded data using PoolManager.""" + try: + response = http.request( + "POST", + "https://jsonplaceholder.typicode.com/posts", + fields={ + "title": "Form Title", + "body": "Form Body", + "userId": "1", + }, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/put-json", methods=["PUT"]) +def poolmanager_put_json(): + """Test PUT request with JSON body using PoolManager.""" + try: + req_data = request.get_json() or {} + body = json.dumps( + { + "id": 1, + "title": req_data.get("title", "Updated Title"), + "body": req_data.get("body", "Updated Body"), + "userId": req_data.get("userId", 1), + } + ) + response = http.request( + "PUT", + "https://jsonplaceholder.typicode.com/posts/1", + body=body.encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/patch-json", methods=["PATCH"]) +def poolmanager_patch_json(): + """Test PATCH request with partial JSON body using PoolManager.""" + try: + req_data = request.get_json() or {} + body = json.dumps( + { + "title": req_data.get("title", "Patched Title"), + } + ) + response = http.request( + "PATCH", + "https://jsonplaceholder.typicode.com/posts/1", + body=body.encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/delete", methods=["DELETE"]) +def poolmanager_delete(): + """Test DELETE request using PoolManager.""" + try: + response = http.request("DELETE", "https://jsonplaceholder.typicode.com/posts/1") + return jsonify({"status": "deleted", "status_code": response.status}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/poolmanager/chain", methods=["GET"]) +def poolmanager_chain(): + """Test sequential chained requests using PoolManager.""" + try: + # First request: get a user + user_response = http.request("GET", "https://jsonplaceholder.typicode.com/users/1") + user = json.loads(user_response.data.decode("utf-8")) + + # Second request: get posts by that user + posts_response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts", + fields={"userId": str(user["id"])}, + ) + posts = json.loads(posts_response.data.decode("utf-8")) + + # Third request: get comments on the first post + if posts: + comments_response = http.request( + "GET", + f"https://jsonplaceholder.typicode.com/posts/{posts[0]['id']}/comments", + ) + comments = json.loads(comments_response.data.decode("utf-8")) + else: + comments = [] + + return jsonify( + { + "user": user, + "post_count": len(posts), + "first_post_comments": len(comments), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================= +# HTTPConnectionPool Tests (low-level API) +# ============================================================================= + + +@app.route("/api/connectionpool/get-json", methods=["GET"]) +def connectionpool_get_json(): + """Test GET request using HTTPConnectionPool directly.""" + pool = None + try: + pool = urllib3.HTTPSConnectionPool("jsonplaceholder.typicode.com", port=443) + response = pool.request("GET", "/posts/2") + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + if pool is not None: + pool.close() + + +@app.route("/api/connectionpool/post-json", methods=["POST"]) +def connectionpool_post_json(): + """Test POST request using HTTPConnectionPool directly.""" + pool = None + try: + req_data = request.get_json() or {} + body = json.dumps( + { + "title": req_data.get("title", "Pool Test Title"), + "body": req_data.get("body", "Pool Test Body"), + "userId": req_data.get("userId", 2), + } + ) + pool = urllib3.HTTPSConnectionPool("jsonplaceholder.typicode.com", port=443) + response = pool.request( + "POST", + "/posts", + body=body.encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + if pool is not None: + pool.close() + + +# ============================================================================= +# Additional Test Cases +# ============================================================================= + + +@app.route("/test/timeout", methods=["GET"]) +def test_timeout(): + """Test request with explicit timeout.""" + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts/3", + timeout=urllib3.Timeout(connect=5.0, read=10.0), + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/retries", methods=["GET"]) +def test_retries(): + """Test request with retry configuration.""" + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts/4", + retries=urllib3.Retry(total=3, backoff_factor=0.1), + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/binary-response", methods=["GET"]) +def test_binary_response(): + """Test handling of binary response (should be handled gracefully).""" + try: + # Fetch a small image + response = http.request( + "GET", + "https://httpbin.org/image/png", + headers={"Accept": "image/png"}, + ) + return jsonify( + { + "status": response.status, + "content_type": response.headers.get("Content-Type", ""), + "content_length": len(response.data), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/redirect", methods=["GET"]) +def test_redirect(): + """Test following redirects.""" + try: + response = http.request( + "GET", + "https://httpbin.org/redirect/2", + redirect=True, + ) + return jsonify( + { + "status": response.status, + "final_url": response.geturl() if hasattr(response, "geturl") else "unknown", + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/new-poolmanager", methods=["GET"]) +def test_new_poolmanager(): + """Test with a fresh PoolManager instance per request.""" + try: + local_http = urllib3.PoolManager() + response = local_http.request("GET", "https://jsonplaceholder.typicode.com/posts/5") + data = json.loads(response.data.decode("utf-8")) + local_http.clear() + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/basic-auth", methods=["GET"]) +def test_basic_auth(): + """Test request with basic authentication.""" + try: + # Create authorization header for basic auth + import base64 + + credentials = base64.b64encode(b"testuser:testpass").decode("ascii") + headers = urllib3.make_headers(basic_auth="testuser:testpass") + + response = http.request( + "GET", + "https://httpbin.org/basic-auth/testuser/testpass", + headers=headers, + ) + data = json.loads(response.data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/multiple-requests", methods=["GET"]) +def test_multiple_requests(): + """Test multiple requests in a single endpoint.""" + try: + results = [] + + # Make three sequential requests + for i in range(1, 4): + response = http.request("GET", f"https://jsonplaceholder.typicode.com/posts/{i}") + data = json.loads(response.data.decode("utf-8")) + results.append({"id": data["id"], "title": data["title"]}) + + return jsonify({"posts": results}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/requests-lib", methods=["GET"]) +def test_requests_lib(): + """Test using requests library (which uses urllib3 internally). + + This test verifies that we don't create double spans when requests + library is used, since requests uses urllib3 under the hood. + """ + import requests as requests_lib + + try: + response = requests_lib.get("https://jsonplaceholder.typicode.com/posts/10") + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================= +# Bug Detection Tests - Confirmed bugs that expose instrumentation issues +# ============================================================================= + + +@app.route("/test/bug/preload-content-false", methods=["GET"]) +def test_bug_preload_content_false(): + """CONFIRMED BUG: preload_content=False parameter breaks response reading. + + When preload_content=False, the response body is not preloaded into memory. + The instrumentation reads .data in _finalize_span which consumes the body + before the application can read it. + + Root cause: instrumentation.py line 839 accesses response.data unconditionally + """ + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts/21", + preload_content=False, + ) + # Manually read the data after the response + data_bytes = response.read() + response.release_conn() + data = json.loads(data_bytes.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/bug/streaming-response", methods=["GET"]) +def test_bug_streaming_response(): + """CONFIRMED BUG: Streaming response body is consumed before iteration. + + When using response.stream() to iterate over chunks, the instrumentation + has already consumed the body by accessing response.data in _finalize_span. + + Root cause: Same as preload-content-false - instrumentation.py line 839 + """ + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts/27", + preload_content=False, + ) + + # Try to read response in chunks using stream() + chunks = [] + for chunk in response.stream(32): # Read 32 bytes at a time + chunks.append(chunk) + + response.release_conn() + full_data = b"".join(chunks) + data = json.loads(full_data.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +if __name__ == "__main__": + sdk.mark_app_as_ready() + app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py new file mode 100644 index 0000000..4a6771f --- /dev/null +++ b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py @@ -0,0 +1,130 @@ +"""Execute test requests against the Flask app to exercise the urllib3 instrumentation.""" + +import time + +import requests + +BASE_URL = "http://localhost:8000" + + +def make_request(method, endpoint, **kwargs): + """Make HTTP request and log result.""" + url = f"{BASE_URL}{endpoint}" + print(f"-> {method} {endpoint}") + + # Set default timeout if not provided + kwargs.setdefault("timeout", 30) + response = requests.request(method, url, **kwargs) + print(f" Status: {response.status_code}") + time.sleep(0.5) # Small delay between requests + return response + + +if __name__ == "__main__": + print("Starting test request sequence for urllib3 instrumentation...\n") + + # Health check + make_request("GET", "/health") + + # ========================================================================== + # PoolManager Tests (high-level API) + # ========================================================================== + print("\n--- PoolManager Tests ---\n") + + # Basic GET request - JSON response + make_request("GET", "/api/poolmanager/get-json") + + # GET with query parameters + make_request("GET", "/api/poolmanager/get-with-params") + + # GET with custom headers + make_request("GET", "/api/poolmanager/get-with-headers") + + # POST with JSON body + make_request( + "POST", + "/api/poolmanager/post-json", + json={"title": "Test Post", "body": "This is a test post body", "userId": 1}, + ) + + # POST with form data + make_request("POST", "/api/poolmanager/post-form") + + # PUT request + make_request( + "PUT", + "/api/poolmanager/put-json", + json={"title": "Updated Post", "body": "This is an updated post body", "userId": 1}, + ) + + # PATCH request + make_request("PATCH", "/api/poolmanager/patch-json", json={"title": "Patched Title"}) + + # DELETE request + make_request("DELETE", "/api/poolmanager/delete") + + # Sequential chained requests + make_request("GET", "/api/poolmanager/chain") + + # ========================================================================== + # HTTPConnectionPool Tests (low-level API) + # ========================================================================== + print("\n--- HTTPConnectionPool Tests ---\n") + + # GET using connection pool directly + make_request("GET", "/api/connectionpool/get-json") + + # POST using connection pool directly + make_request( + "POST", + "/api/connectionpool/post-json", + json={"title": "Pool Test", "body": "Connection pool test", "userId": 2}, + ) + + # ========================================================================== + # Additional Tests + # ========================================================================== + print("\n--- Additional Tests ---\n") + + # Request with timeout + make_request("GET", "/test/timeout") + + # Request with retries + make_request("GET", "/test/retries") + + # Redirect handling + make_request("GET", "/test/redirect") + + # New PoolManager per request + make_request("GET", "/test/new-poolmanager") + + # Basic authentication + make_request("GET", "/test/basic-auth") + + # Multiple requests in sequence + make_request("GET", "/test/multiple-requests") + + # ========================================================================== + # Avoid Double-span Test (requests library) + # ========================================================================== + print("\n--- Double-span Test (requests library) ---\n") + + # Test that uses requests library (which internally uses urllib3) + # This should NOT create double spans + make_request("GET", "/test/requests-lib") + + # ========================================================================== + # Note: Bug detection tests for preload_content=False and streaming responses + # are NOT included in e2e tests because these patterns are incompatible with + # replay mode - we can't capture the response body without consuming the stream. + # + # The instrumentation now correctly handles these patterns by NOT capturing + # the response body, which allows the application to read/stream normally. + # However, this means there's no body to replay in REPLAY mode. + # + # To verify the fix works, run the manual test script in RECORD mode: + # curl http://localhost:8000/test/bug/preload-content-false + # curl http://localhost:8000/test/bug/streaming-response + # ========================================================================== + + print("\nAll requests completed successfully") diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py new file mode 100644 index 0000000..c03326d --- /dev/null +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -0,0 +1,1006 @@ +from __future__ import annotations + +import base64 +import json +import logging +from typing import Any +from urllib.parse import parse_qs, urlencode, urlparse + +from opentelemetry.trace import Span, Status +from opentelemetry.trace import SpanKind as OTelSpanKind +from opentelemetry.trace import StatusCode as OTelStatusCode + + +class RequestDroppedByTransform(Exception): + """Exception raised when an outbound HTTP request is dropped by a transform rule. + + This matches Node SDK behavior where drop transforms prevent the HTTP call + and raise an error rather than returning a fake response. + + Attributes: + message: Error message explaining the drop + method: HTTP method (GET, POST, etc.) + url: Request URL that was dropped + """ + + def __init__(self, message: str, method: str, url: str): + self.message = message + self.method = method + self.url = url + super().__init__(message) + + +from opentelemetry import trace + +from ...core.data_normalization import create_mock_input_value, remove_none_values +from ...core.drift_sdk import TuskDrift +from ...core.json_schema_helper import DecodedType, EncodingType, SchemaMerge +from ...core.mode_utils import handle_record_mode, handle_replay_mode +from ...core.tracing import TdSpanAttributes +from ...core.tracing.span_utils import CreateSpanOptions, SpanInfo, SpanUtils +from ...core.types import ( + PackageType, + SpanKind, + SpanStatus, + StatusCode, + TuskDriftMode, +) +from ..base import InstrumentationBase +from ..http import HttpSpanData, HttpTransformEngine + +logger = logging.getLogger(__name__) + +# Higher-level instrumentations that use urllib3 under the hood +# When these are active, urllib3 should skip creating duplicate spans +HIGHER_LEVEL_HTTP_INSTRUMENTATIONS = {"RequestsInstrumentation"} + +HEADER_SCHEMA_MERGES = { + "headers": SchemaMerge(match_importance=0.0), +} + + +class Urllib3Instrumentation(InstrumentationBase): + """Instrumentation for the urllib3 HTTP client library. + + Patches urllib3.PoolManager.urlopen() and urllib3.HTTPConnectionPool.urlopen() + to: + - Intercept HTTP requests in REPLAY mode and return mocked responses + - Capture request/response data as CLIENT spans in RECORD mode + + urllib3 is the underlying HTTP library used by the requests library. + It provides connection pooling and thread safety. + """ + + def __init__(self, enabled: bool = True, transforms: dict[str, Any] | None = None) -> None: + self._transform_engine = HttpTransformEngine(self._resolve_http_transforms(transforms)) + super().__init__( + name="Urllib3Instrumentation", + module_name="urllib3", + supported_versions="*", + enabled=enabled, + ) + + def _resolve_http_transforms( + self, provided: dict[str, Any] | list[dict[str, Any]] | None + ) -> list[dict[str, Any]] | None: + """Resolve HTTP transforms from provided config or SDK config.""" + if isinstance(provided, list): + return provided + if isinstance(provided, dict) and isinstance(provided.get("http"), list): + return provided["http"] + + sdk = TuskDrift.get_instance() + transforms = getattr(sdk.config, "transforms", None) + if isinstance(transforms, dict) and isinstance(transforms.get("http"), list): + return transforms["http"] + return None + + def _is_already_instrumented_by_higher_level(self) -> bool: + """Check if there's already an active client span from a higher-level HTTP instrumentation. + + This prevents double spans when urllib3 is used under the hood by libraries + like 'requests' which have their own instrumentation. + + Returns: + True if a higher-level instrumentation is already active, False otherwise. + """ + current_span = trace.get_current_span() + if current_span is None or not current_span.is_recording(): + return False + + span_context = current_span.get_span_context() + if not span_context.is_valid: + return False + + # Access attributes safely - the Span interface doesn't expose .attributes, + # but SDK Span implementations do have it + attributes = getattr(current_span, "attributes", None) + if attributes is not None: + instrumentation_name = attributes.get(TdSpanAttributes.INSTRUMENTATION_NAME) + if instrumentation_name in HIGHER_LEVEL_HTTP_INSTRUMENTATIONS: + logger.debug(f"Skipping urllib3 span creation - already instrumented by {instrumentation_name}") + return True + + return False + + def patch(self, module: Any) -> None: + """Patch the urllib3 module. + + Patches PoolManager.urlopen() and HTTPConnectionPool.urlopen() to intercept + all HTTP requests made through urllib3. + """ + # Patch PoolManager.urlopen + if hasattr(module, "PoolManager"): + self._patch_pool_manager(module) + else: + logger.warning("urllib3.PoolManager not found, skipping PoolManager instrumentation") + + # Patch HTTPConnectionPool.urlopen for direct pool usage + if hasattr(module, "HTTPConnectionPool"): + self._patch_connection_pool(module) + else: + logger.warning("urllib3.HTTPConnectionPool not found, skipping HTTPConnectionPool instrumentation") + + def _patch_pool_manager(self, module: Any) -> None: + """Patch urllib3.PoolManager.urlopen for high-level API.""" + original_urlopen = module.PoolManager.urlopen + instrumentation_self = self + + def patched_urlopen( + pool_self, + method: str, + url: str, + redirect: bool = True, + **kw, + ): + """Patched PoolManager.urlopen method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return original_urlopen(pool_self, method, url, redirect=redirect, **kw) + + # Skip if already instrumented by higher-level library + if instrumentation_self._is_already_instrumented_by_higher_level(): + return original_urlopen(pool_self, method, url, redirect=redirect, **kw) + + def original_call(): + return original_urlopen(pool_self, method, url, redirect=redirect, **kw) + + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay_urlopen( + sdk, module, method, url, **kw + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(module, url), + is_server_request=False, + ) + + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record_urlopen( + pool_self, method, url, is_pre_app_start, original_urlopen, redirect=redirect, **kw + ), + span_kind=OTelSpanKind.CLIENT, + ) + + module.PoolManager.urlopen = patched_urlopen + logger.info("urllib3.PoolManager.urlopen instrumented") + + def _patch_connection_pool(self, module: Any) -> None: + """Patch urllib3.HTTPConnectionPool.urlopen for direct pool usage.""" + original_urlopen = module.HTTPConnectionPool.urlopen + instrumentation_self = self + + def patched_urlopen( + pool_self, + method: str, + url: str, + body=None, + headers=None, + retries=None, + redirect=True, + assert_same_host=True, + timeout=None, + pool_timeout=None, + release_conn=None, + chunked=False, + body_pos=None, + preload_content=True, + decode_content=True, + **response_kw, + ): + """Patched HTTPConnectionPool.urlopen method.""" + sdk = TuskDrift.get_instance() + + # Build full URL from pool's host/port and relative URL + scheme = pool_self.scheme if hasattr(pool_self, "scheme") else "http" + host = pool_self.host if hasattr(pool_self, "host") else "localhost" + port = pool_self.port if hasattr(pool_self, "port") else None + + if url.startswith(("http://", "https://")): + full_url = url + else: + port_str = f":{port}" if port and port not in (80, 443) else "" + full_url = f"{scheme}://{host}{port_str}{url}" + + # Pass through if SDK is disabled + if sdk.mode == TuskDriftMode.DISABLED: + return original_urlopen( + pool_self, + method, + url, + body=body, + headers=headers, + retries=retries, + redirect=redirect, + assert_same_host=assert_same_host, + timeout=timeout, + pool_timeout=pool_timeout, + release_conn=release_conn, + chunked=chunked, + body_pos=body_pos, + preload_content=preload_content, + decode_content=decode_content, + **response_kw, + ) + + if instrumentation_self._is_already_instrumented_by_higher_level(): + return original_urlopen( + pool_self, + method, + url, + body=body, + headers=headers, + retries=retries, + redirect=redirect, + assert_same_host=assert_same_host, + timeout=timeout, + pool_timeout=pool_timeout, + release_conn=release_conn, + chunked=chunked, + body_pos=body_pos, + preload_content=preload_content, + decode_content=decode_content, + **response_kw, + ) + + def original_call(): + return original_urlopen( + pool_self, + method, + url, + body=body, + headers=headers, + retries=retries, + redirect=redirect, + assert_same_host=assert_same_host, + timeout=timeout, + pool_timeout=pool_timeout, + release_conn=release_conn, + chunked=chunked, + body_pos=body_pos, + preload_content=preload_content, + decode_content=decode_content, + **response_kw, + ) + + # Import urllib3 module for mock response creation + import urllib3 as urllib3_module + + # REPLAY mode + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay_urlopen( + sdk, + urllib3_module, + method, + full_url, + body=body, + headers=headers, + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(urllib3_module, full_url), + is_server_request=False, + ) + + # RECORD mode + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record_connection_pool_urlopen( + pool_self, + method, + url, + full_url, + is_pre_app_start, + original_urlopen, + body=body, + headers=headers, + retries=retries, + redirect=redirect, + assert_same_host=assert_same_host, + timeout=timeout, + pool_timeout=pool_timeout, + release_conn=release_conn, + chunked=chunked, + body_pos=body_pos, + preload_content=preload_content, + decode_content=decode_content, + **response_kw, + ), + span_kind=OTelSpanKind.CLIENT, + ) + + module.HTTPConnectionPool.urlopen = patched_urlopen + logger.info("urllib3.HTTPConnectionPool.urlopen instrumented") + + def _get_default_response(self, urllib3_module: Any, url: str) -> Any: + """Return default response for background requests in REPLAY mode. + + Background requests (health checks, metrics, etc.) that happen outside + of any trace context should return a default response instead of failing. + """ + # Create a minimal HTTPResponse-like object + from io import BytesIO + + # urllib3.HTTPResponse expects specific arguments + response = urllib3_module.HTTPResponse( + body=BytesIO(b""), + headers={}, + status=200, + preload_content=True, + ) + logger.debug(f"[Urllib3Instrumentation] Returning default response for background request to {url}") + return response + + def _create_client_span(self, method: str, url: str, is_pre_app_start: bool) -> SpanInfo | None: + """Create a client span for HTTP requests. + + Args: + method: HTTP method + url: Request URL + is_pre_app_start: Whether this is before app start + + Returns: + SpanInfo if successful, None if span creation failed + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + return SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "Urllib3Instrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start, + }, + is_pre_app_start=is_pre_app_start, + ) + ) + + def _handle_replay_urlopen( + self, + sdk: TuskDrift, + urllib3_module: Any, + method: str, + url: str, + body: Any = None, + headers: dict | None = None, + **kwargs, + ) -> Any: + """Handle urlopen in REPLAY mode. + + Creates a span, fetches mock response. + Raises RuntimeError if no mock is found. + """ + span_info = self._create_client_span(method, url, not sdk.app_ready) + if not span_info: + raise RuntimeError(f"Error creating span in replay mode for {method} {url}") + + try: + with SpanUtils.with_span(span_info): + mock_response = self._try_get_mock( + sdk, + urllib3_module, + method, + url, + span_info.trace_id, + span_info.span_id, + body=body, + headers=headers, + ) + + if mock_response is not None: + return mock_response + + # No mock found - raise error in REPLAY mode + raise RuntimeError(f"No mock found for {method} {url} in REPLAY mode") + finally: + span_info.span.end() + + def _handle_record_urlopen( + self, + pool_self: Any, + method: str, + url: str, + is_pre_app_start: bool, + original_urlopen: Any, + redirect: bool = True, + **kw, + ) -> Any: + """Handle PoolManager.urlopen in RECORD mode. + + Creates a span, makes the real request, and records the response. + """ + span_info = self._create_client_span(method, url, is_pre_app_start) + if not span_info: + # Span creation failed (trace blocked, etc.) - just make the request + return original_urlopen(pool_self, method, url, redirect=redirect, **kw) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms before making the request + headers = kw.get("headers") or {} + if isinstance(headers, (list, tuple)): + headers = dict(headers) + if self._transform_engine and self._transform_engine.should_drop_outbound_request( + method.upper(), url, headers + ): + span_info.span.set_attribute( + TdSpanAttributes.OUTPUT_VALUE, + json.dumps({"bodyProcessingError": "dropped"}), + ) + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) + raise RequestDroppedByTransform( + f"Outbound request to {url} was dropped by transform rule", + method.upper(), + url, + ) + + error = None + response = None + + try: + response = original_urlopen(pool_self, method, url, redirect=redirect, **kw) + return response + except Exception as e: + error = e + raise + finally: + self._finalize_span( + span_info.span, + method, + url, + response, + error, + body=kw.get("body"), + headers=kw.get("headers"), + fields=kw.get("fields"), + ) + finally: + span_info.span.end() + + def _handle_record_connection_pool_urlopen( + self, + pool_self: Any, + method: str, + url: str, + full_url: str, + is_pre_app_start: bool, + original_urlopen: Any, + body: Any = None, + headers: dict | None = None, + **kwargs, + ) -> Any: + """Handle HTTPConnectionPool.urlopen in RECORD mode. + + Creates a span, makes the real request, and records the response. + """ + span_info = self._create_client_span(method, full_url, is_pre_app_start) + if not span_info: + # Span creation failed (trace blocked, etc.) - just make the request + return original_urlopen(pool_self, method, url, body=body, headers=headers, **kwargs) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms before making the request + headers_dict = dict(headers) if headers else {} + if self._transform_engine and self._transform_engine.should_drop_outbound_request( + method.upper(), full_url, headers_dict + ): + span_info.span.set_attribute( + TdSpanAttributes.OUTPUT_VALUE, + json.dumps({"bodyProcessingError": "dropped"}), + ) + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) + raise RequestDroppedByTransform( + f"Outbound request to {full_url} was dropped by transform rule", + method.upper(), + full_url, + ) + + error = None + response = None + + try: + response = original_urlopen(pool_self, method, url, body=body, headers=headers, **kwargs) + return response + except Exception as e: + error = e + raise + finally: + self._finalize_span( + span_info.span, + method, + full_url, + response, + error, + body=body, + headers=headers, + ) + finally: + span_info.span.end() + + def _encode_body_to_base64(self, body_data: Any) -> tuple[str | None, int]: + """Encode body data to base64 string. + + Args: + body_data: Body data (str, bytes, dict, or other) + + Returns: + Tuple of (base64_encoded_string, original_byte_size) + """ + if body_data is None: + return None, 0 + + if isinstance(body_data, bytes): + body_bytes = body_data + elif isinstance(body_data, str): + body_bytes = body_data.encode("utf-8") + elif isinstance(body_data, dict): + body_bytes = json.dumps(body_data).encode("utf-8") + else: + # Fallback: convert to string then encode + body_bytes = str(body_data).encode("utf-8") + + base64_body = base64.b64encode(body_bytes).decode("ascii") + + return base64_body, len(body_bytes) + + def _get_response_body_safely(self, response: Any) -> bytes | None: + """Get response body without consuming the stream for non-preloaded responses. + + urllib3's .data property will read from the underlying stream if the response + was created with preload_content=False. This breaks applications that: + - Use preload_content=False and call response.read() manually + - Use response.stream() to iterate over chunks + - Use any streaming pattern to process large responses + + This method checks whether it's safe to access the body without consuming + the stream, and returns None if the body cannot be safely captured. + + IMPORTANT: We must NOT use hasattr(response, "data") because in Python, + hasattr() calls getattr() internally, which would trigger the .data + property getter and consume the stream! + + Args: + response: urllib3 HTTPResponse object + + Returns: + Response body as bytes, or None if body cannot be safely captured + """ + # Check if this is a urllib3 HTTPResponse by checking for _body attribute + # Note: We use getattr with a sentinel to avoid triggering any property getters + _sentinel = object() + body = getattr(response, "_body", _sentinel) + + if body is _sentinel: + # Not a urllib3 HTTPResponse or doesn't have _body + return b"" + + # Check if body was already preloaded/cached + if body is not None: + # Body was already read and cached, safe to return + # Ensure it's bytes (urllib3's _body should always be bytes when set) + return body if isinstance(body, bytes) else b"" + + # Check if the stream was already fully consumed (fp is None or closed) + fp = getattr(response, "_fp", None) + if fp is None: + # Stream was consumed, body should be in _body (but it might be None) + return b"" + + # Check if the file pointer is closed + if hasattr(fp, "closed") and fp.closed: + return b"" + + # At this point, the stream is still open and body hasn't been read + # This means preload_content=False was used (otherwise the body would + # have been read during __init__) + # + # We skip capturing the body to avoid breaking the application + return None + + def _get_decoded_type_from_content_type(self, content_type: str | None) -> DecodedType | None: + """Determine decoded type from Content-Type header. Extracts + the main type (before semicolon) and returns the corresponding + DecodedType enum value. + + Args: + content_type: Content-Type header value + + Returns: + DecodedType enum value or None + """ + if not content_type: + return None + + main_type = content_type.lower().split(";")[0].strip() + + CONTENT_TYPE_MAP = { + "application/json": DecodedType.JSON, + "text/plain": DecodedType.PLAIN_TEXT, + "text/html": DecodedType.HTML, + "application/x-www-form-urlencoded": DecodedType.FORM_DATA, + "multipart/form-data": DecodedType.MULTIPART_FORM, + "application/xml": DecodedType.XML, + "text/xml": DecodedType.XML, + } + + return CONTENT_TYPE_MAP.get(main_type) + + def _get_content_type_header(self, headers: dict | None) -> str | None: + """Get content-type header (case-insensitive lookup).""" + if not headers: + return None + for key, value in headers.items(): + if key.lower() == "content-type": + return value + return None + + def _try_get_mock( + self, + sdk: TuskDrift, + urllib3_module: Any, + method: str, + url: str, + trace_id: str, + span_id: str, + body: Any = None, + headers: dict | None = None, + fields: dict | None = None, + ) -> Any: + """Try to get a mocked response from CLI. + + Returns: + Mocked response object if found, None otherwise + """ + try: + parsed_url = urlparse(url) + + params = {} + if parsed_url.query: + params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} + + body_base64 = None + body_size = 0 + + if body is not None: + body_base64, body_size = self._encode_body_to_base64(body) + elif fields is not None: + encoded_fields = urlencode(fields) + body_base64, body_size = self._encode_body_to_base64(encoded_fields) + + headers_dict = dict(headers) if headers else {} + + raw_input_value = { + "method": method.upper(), + "url": url, + "protocol": parsed_url.scheme, + "hostname": parsed_url.hostname, + "port": parsed_url.port, + "path": parsed_url.path or "/", + "headers": headers_dict, + "query": params, + } + + # Add body fields only if body exists + if body_base64 is not None: + raw_input_value["body"] = body_base64 + raw_input_value["bodySize"] = body_size + + input_value = create_mock_input_value(raw_input_value) + + input_schema_merges = { + "headers": SchemaMerge(match_importance=0.0), + } + if body_base64 is not None: + request_content_type = self._get_content_type_header(headers_dict) + input_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(request_content_type), + ) + + from ...core.mock_utils import find_mock_response_sync + + mock_response_output = find_mock_response_sync( + sdk=sdk, + trace_id=trace_id, + span_id=span_id, + name=f"{method.upper()} {parsed_url.path or '/'}", + package_name=parsed_url.scheme, + package_type=PackageType.HTTP, + instrumentation_name="Urllib3Instrumentation", + submodule_name=method.upper(), + input_value=input_value, + kind=SpanKind.CLIENT, + input_schema_merges=input_schema_merges, + ) + + if not mock_response_output or not mock_response_output.found: + logger.debug(f"No mock found for {method} {url} (trace_id={trace_id})") + return None + + # Create mocked response object + if mock_response_output.response is None: + logger.debug(f"Mock found but response data is None for {method} {url}") + return None + return self._create_mock_response(urllib3_module, mock_response_output.response, url) + + except Exception as e: + logger.error(f"Error getting mock for {method} {url}: {e}") + return None + + def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], url: str) -> Any: + """Create a mocked urllib3.HTTPResponse object. + + Args: + urllib3_module: The urllib3 module + mock_data: Mock response data from CLI + url: Request URL + + Returns: + Mocked HTTPResponse object + """ + from io import BytesIO + + status_code = mock_data.get("statusCode", 200) + headers = dict(mock_data.get("headers", {})) + + # Remove content-encoding and transfer-encoding headers since the body + # was already decompressed when recorded + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] + + # Get body - decode from base64 if needed + body = mock_data.get("body", "") + content = b"" + if isinstance(body, str): + try: + decoded = base64.b64decode(body.encode("ascii"), validate=True) + if base64.b64encode(decoded).decode("ascii") == body: + content = decoded + else: + content = body.encode("utf-8") + except Exception: + content = body.encode("utf-8") + elif isinstance(body, bytes): + content = body + else: + content = json.dumps(body).encode("utf-8") + + response = urllib3_module.HTTPResponse( + body=BytesIO(content), + headers=headers, + status=status_code, + preload_content=True, + ) + + # Read the content to make it available via response.data + response.read() + + logger.debug(f"Created mock urllib3 response: {status_code} for {url}") + return response + + def _finalize_span( + self, + span: Span, + method: str, + url: str, + response: Any, + error: Exception | None, + body: Any = None, + headers: dict | None = None, + fields: dict | None = None, + ) -> None: + """Finalize span with request/response data. + + Args: + span: The OpenTelemetry span to finalize + method: HTTP method + url: Request URL + response: Response object (if successful) + error: Exception (if failed) + body: Request body + headers: Request headers + fields: URL-encoded form fields + """ + try: + parsed_url = urlparse(url) + + # Build input value + params = {} + if parsed_url.query: + params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} + + headers_dict = dict(headers) if headers else {} + + body_base64 = None + body_size = 0 + + if body is not None: + body_base64, body_size = self._encode_body_to_base64(body) + elif fields is not None: + encoded_fields = urlencode(fields) + body_base64, body_size = self._encode_body_to_base64(encoded_fields) + + input_value = { + "method": method.upper(), + "url": url, + "protocol": parsed_url.scheme, + "hostname": parsed_url.hostname, + "port": parsed_url.port, + "path": parsed_url.path or "/", + "headers": headers_dict, + "query": params, + } + + if body_base64 is not None: + input_value["body"] = body_base64 + input_value["bodySize"] = body_size + + # Build output value + output_value = {} + status = SpanStatus(code=StatusCode.OK, message="") + response_body_base64 = None + + if error: + output_value = { + "errorName": type(error).__name__, + "errorMessage": str(error), + } + status = SpanStatus(code=StatusCode.ERROR, message=str(error)) + elif response: + response_headers = dict(response.headers) if hasattr(response, "headers") else {} + response_body_size = 0 + + try: + # Get response content safely without consuming the stream + # urllib3's .data property will read from the stream if not preloaded, + # which would break applications using preload_content=False or response.stream() + response_bytes = self._get_response_body_safely(response) + if response_bytes is not None: + response_body_base64, response_body_size = self._encode_body_to_base64(response_bytes) + else: + # Response body not captured (likely preload_content=False or streaming) + response_body_base64 = None + response_body_size = 0 + logger.warning( + f"Response body not captured for {method} {url} - request used " + f"preload_content=False or streaming. Replay may return an empty body." + ) + except Exception: + response_body_base64 = None + response_body_size = 0 + + status_code = response.status if hasattr(response, "status") else 200 + + output_value = { + "statusCode": status_code, + "statusMessage": response.reason if hasattr(response, "reason") else "", + "headers": response_headers, + } + + # Add body fields only if body exists + if response_body_base64 is not None: + output_value["body"] = response_body_base64 + output_value["bodySize"] = response_body_size + + if status_code >= 400: + status = SpanStatus( + code=StatusCode.ERROR, + message=f"HTTP {status_code}", + ) + + # Check if response content type should block the trace + from ...core.content_type_utils import get_decoded_type, should_block_content_type + from ...core.trace_blocking_manager import TraceBlockingManager + + response_content_type = response_headers.get("content-type") or response_headers.get("Content-Type") + decoded_type = get_decoded_type(response_content_type) + + if should_block_content_type(decoded_type): + span_context = span.get_span_context() + trace_id = format(span_context.trace_id, "032x") + + blocking_mgr = TraceBlockingManager.get_instance() + blocking_mgr.block_trace( + trace_id, reason=f"outbound_binary:{decoded_type.name if decoded_type else 'unknown'}" + ) + logger.warning( + f"Blocking trace {trace_id} - outbound request returned binary response: {response_content_type}" + ) + return + else: + output_value = {} + + # Apply transforms + transform_metadata = None + if self._transform_engine: + span_data = HttpSpanData( + kind=SpanKind.CLIENT, + input_value=input_value, + output_value=output_value, + ) + self._transform_engine.apply_transforms(span_data) + + input_value = span_data.input_value or input_value + output_value = span_data.output_value or output_value + transform_metadata = span_data.transform_metadata + + # Create schema merge hints + request_content_type = self._get_content_type_header(headers_dict) + response_content_type = None + if response and hasattr(response, "headers"): + response_content_type = self._get_content_type_header(dict(response.headers)) + + input_schema_merges = { + "headers": SchemaMerge(match_importance=0.0), + } + if body_base64 is not None: + input_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(request_content_type), + ) + + output_schema_merges = { + "headers": SchemaMerge(match_importance=0.0), + } + if response_body_base64 is not None: + output_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(response_content_type), + ) + + # Set span attributes + normalized_input = remove_none_values(input_value) + normalized_output = remove_none_values(output_value) + span.set_attribute(TdSpanAttributes.INPUT_VALUE, json.dumps(normalized_input)) + span.set_attribute(TdSpanAttributes.OUTPUT_VALUE, json.dumps(normalized_output)) + + from ..wsgi.utilities import _schema_merges_to_dict + + input_schema_merges_dict = _schema_merges_to_dict(input_schema_merges) + output_schema_merges_dict = _schema_merges_to_dict(output_schema_merges) + + span.set_attribute(TdSpanAttributes.INPUT_SCHEMA_MERGES, json.dumps(input_schema_merges_dict)) + span.set_attribute(TdSpanAttributes.OUTPUT_SCHEMA_MERGES, json.dumps(output_schema_merges_dict)) + + if transform_metadata: + span.set_attribute(TdSpanAttributes.TRANSFORM_METADATA, json.dumps(transform_metadata)) + + if status.code == StatusCode.ERROR: + span.set_status(Status(OTelStatusCode.ERROR, status.message)) + else: + span.set_status(Status(OTelStatusCode.OK)) + + except Exception as e: + logger.error(f"Error finalizing span for {method} {url}: {e}") + span.set_status(Status(OTelStatusCode.ERROR, str(e))) diff --git a/drift/instrumentation/urllib3/notes.md b/drift/instrumentation/urllib3/notes.md new file mode 100644 index 0000000..384dede --- /dev/null +++ b/drift/instrumentation/urllib3/notes.md @@ -0,0 +1,95 @@ +# urllib3 Instrumentation Notes + +--- + +## 1. `preload_content=False` and Streaming Responses + +### Background + +In urllib3, when making HTTP requests: + +- **`preload_content=True` (default)**: Response body is immediately read into memory and cached in `response._body` +- **`preload_content=False`**: Response body is NOT automatically read; the underlying stream remains open for the application to read manually via `response.read()` or `response.stream()` + +This is commonly used for: + +- Large file downloads (avoid loading GB into memory) +- Streaming responses (process chunks as they arrive) +- Memory-efficient processing + +### Bug Found & Fixed + +**Problem:** The instrumentation broke applications using `preload_content=False` because accessing `response.data` in `_finalize_span()` consumed the response stream before the application could read it. + +**Root Cause:** The original code used `hasattr(response, "data")` to check for the attribute. However, Python's `hasattr()` internally calls `getattr()`, which triggers property getters. urllib3's `.data` is a property that reads the entire body when accessed, consuming the stream. + +**Fix:** Check for the internal `_body` attribute directly using `getattr(response, "_body", sentinel)` instead of `hasattr(response, "data")`. + +**Logging**: Instead of silently skipping body capture, the SDK will emit a warning in application logs to inform users that replay may not work correctly for this request. Example: + +```text +[TuskDrift] Response body not captured for GET https://example.com/api - request used preload_content=False or streaming. Replay may return an empty body. +``` + +### Trade-off: RECORD vs REPLAY + +For `preload_content=False` responses, we face an inherent trade-off: + +| Mode | Behavior | +|------|----------| +| **RECORD** | ✅ Works correctly - we skip capturing the body, so the application can read/stream normally | +| **REPLAY** | ⚠️ Response body will be empty - we didn't capture it, so there's nothing to return | + +This is intentional. We prioritize not breaking the application during RECORD over capturing data for REPLAY. + +### Mock Matching Impact + +Mock matching is not affected because the CLI's matching algorithm is entirely input-based. It matches on request URL, method, headers, body (InputValue), and uses InputValueHash, InputSchemaHash, and similarity scoring on InputValue. OutputValue (response) is not used for matching. + +Mock **response** is affected: + +- The matched span's `OutputValue.body` will be empty/missing +- Status code and headers are still captured correctly +- Application may fail if it expects actual response data + +### Code Location + +The safe body retrieval logic is in `_get_response_body_safely()` method in [`instrumentation.py`](./instrumentation.py): + +```python +def _get_response_body_safely(self, response: Any) -> bytes | None: + # Use getattr with a sentinel to avoid triggering property getters + _sentinel = object() + body = getattr(response, "_body", _sentinel) + + if body is _sentinel: + return b"" # Not a urllib3 HTTPResponse + + if body is not None: + return body # Body already cached, safe to return + + # Body hasn't been read yet (preload_content=False) + # Skip to avoid consuming the stream + return None +``` + +### Test Coverage + +The e2e tests include endpoints for `preload_content=False` and streaming responses (`/test/bug/preload-content-false`, `/test/bug/streaming-response`), but these are excluded from the automated REPLAY test suite since they're incompatible with replay. They can be tested manually to verify RECORD mode doesn't break the application. + +### Future Considerations + +**Potential improvements to support replay for streaming responses:** + +1. **Response wrapper approach**: Create a wrapper around the urllib3 response that intercepts `read()` and `stream()` calls, capturing bytes as the application reads them. After the application finishes reading, store the accumulated body in the span. This would require: + - Implementing a custom response wrapper class + - Deferring span finalization until the response is fully consumed + - Handling edge cases like partial reads, connection errors mid-stream + +2. **Post-read body capture**: After the application calls `response.read()`, the body becomes cached in `response._body`. We could potentially capture it at that point. However, this requires knowing *when* the application has finished reading, which is non-trivial. + +This may not be worth implementing at the moment because: + +- Applications using `preload_content=False` are typically downloading large files or streaming data - scenarios that aren't good candidates for unit test mocking anyway +- The complexity of implementing response wrappers could introduce subtle bugs +- Most API calls use the default `preload_content=True` and work correctly