Skip to content

Commit e449adb

Browse files
authored
feat: add mock upstream support across HTTP e2e suites (#73)
1 parent c65612c commit e449adb

File tree

18 files changed

+799
-60
lines changed

18 files changed

+799
-60
lines changed

drift/instrumentation/aiohttp/e2e-tests/docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
services:
2+
mock-upstream:
3+
image: python:3.12-slim
4+
command: ["python", "/mock/mock_server.py"]
5+
working_dir: /mock
6+
volumes:
7+
- ../../e2e_common/mock_upstream:/mock:ro
8+
environment:
9+
- PYTHONUNBUFFERED=1
10+
211
app:
312
build:
413
context: ../../../..
@@ -14,7 +23,11 @@ services:
1423
- BENCHMARK_DURATION=${BENCHMARK_DURATION:-10}
1524
- BENCHMARK_WARMUP=${BENCHMARK_WARMUP:-3}
1625
- TUSK_SAMPLING_RATE=${TUSK_SAMPLING_RATE:-}
26+
- USE_MOCK_EXTERNALS=${USE_MOCK_EXTERNALS:-1}
27+
- MOCK_SERVER_BASE_URL=${MOCK_SERVER_BASE_URL:-http://mock-upstream:8081}
1728
working_dir: /app
29+
depends_on:
30+
- mock-upstream
1831
volumes:
1932
# Mount SDK source for hot reload (no rebuild needed for SDK changes)
2033
- ../../../..:/sdk

drift/instrumentation/aiohttp/e2e-tests/src/app.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
from flask import Flask, jsonify, request
77

88
from drift import TuskDrift
9+
from drift.instrumentation.e2e_common.external_http import (
10+
external_http_timeout_seconds,
11+
upstream_url,
12+
)
913

1014
# Initialize SDK
1115
sdk = TuskDrift.initialize(
@@ -14,6 +18,25 @@
1418
)
1519

1620
app = Flask(__name__)
21+
EXTERNAL_HTTP_TIMEOUT_SECONDS = external_http_timeout_seconds()
22+
23+
24+
def _configure_aiohttp_for_mock_and_timeouts():
25+
original_request = aiohttp.ClientSession._request
26+
27+
async def patched_request(self, method, str_or_url, *args, **kwargs):
28+
session_timeout = getattr(self, "_timeout", None)
29+
default_timeout = getattr(aiohttp.client, "DEFAULT_TIMEOUT", None)
30+
using_default_session_timeout = session_timeout is default_timeout or session_timeout == default_timeout
31+
if "timeout" not in kwargs and using_default_session_timeout:
32+
kwargs["timeout"] = aiohttp.ClientTimeout(total=EXTERNAL_HTTP_TIMEOUT_SECONDS)
33+
rewritten = upstream_url(str(str_or_url))
34+
return await original_request(self, method, rewritten, *args, **kwargs)
35+
36+
aiohttp.ClientSession._request = patched_request
37+
38+
39+
_configure_aiohttp_for_mock_and_timeouts()
1740

1841

1942
# =============================================================================

drift/instrumentation/django/e2e-tests/docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
services:
2+
mock-upstream:
3+
image: python:3.12-slim
4+
command: ["python", "/mock/mock_server.py"]
5+
working_dir: /mock
6+
volumes:
7+
- ../../e2e_common/mock_upstream:/mock:ro
8+
environment:
9+
- PYTHONUNBUFFERED=1
10+
211
app:
312
build:
413
context: ../../../..
@@ -15,7 +24,11 @@ services:
1524
- BENCHMARK_DURATION=${BENCHMARK_DURATION:-10}
1625
- BENCHMARK_WARMUP=${BENCHMARK_WARMUP:-3}
1726
- TUSK_SAMPLING_RATE=${TUSK_SAMPLING_RATE:-}
27+
- USE_MOCK_EXTERNALS=${USE_MOCK_EXTERNALS:-1}
28+
- MOCK_SERVER_BASE_URL=${MOCK_SERVER_BASE_URL:-http://mock-upstream:8081}
1829
working_dir: /app
30+
depends_on:
31+
- mock-upstream
1932
volumes:
2033
# Mount SDK source for hot reload (no rebuild needed for SDK changes)
2134
- ../../../..:/sdk

drift/instrumentation/django/e2e-tests/src/views.py

Lines changed: 96 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@
1010
from django.views.decorators.http import require_GET, require_http_methods, require_POST
1111
from opentelemetry import context as otel_context
1212

13+
from drift.instrumentation.e2e_common.external_http import (
14+
external_http_timeout_seconds,
15+
upstream_url,
16+
)
17+
18+
EXTERNAL_HTTP_TIMEOUT_SECONDS = external_http_timeout_seconds()
19+
1320

1421
def _run_with_context(ctx, fn, *args, **kwargs):
1522
"""Helper to run a function with OpenTelemetry context in a thread pool."""
@@ -31,12 +38,13 @@ def get_weather(request):
3138
"""Fetch weather data from external API."""
3239
try:
3340
response = requests.get(
34-
"https://api.open-meteo.com/v1/forecast",
41+
upstream_url("https://api.open-meteo.com/v1/forecast"),
3542
params={
3643
"latitude": 40.7128,
3744
"longitude": -74.0060,
3845
"current_weather": "true",
3946
},
47+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
4048
)
4149
weather = response.json()
4250

@@ -47,87 +55,142 @@ def get_weather(request):
4755
}
4856
)
4957
except Exception as e:
50-
return JsonResponse({"error": f"Failed to fetch weather: {str(e)}"}, status=500)
58+
return JsonResponse(
59+
{
60+
"location": "New York",
61+
"weather": {},
62+
"fallback": True,
63+
"error": f"Failed to fetch weather: {str(e)}",
64+
}
65+
)
5166

5267

5368
@require_GET
5469
def get_user(request, user_id: str):
5570
"""Fetch user data from external API with seed."""
5671
try:
57-
response = requests.get(f"https://randomuser.me/api/?seed={user_id}")
72+
response = requests.get(
73+
upstream_url("https://randomuser.me/api/"),
74+
params={"seed": user_id},
75+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
76+
)
5877
return JsonResponse(response.json())
5978
except Exception as e:
60-
return JsonResponse({"error": f"Failed to fetch user: {str(e)}"}, status=500)
79+
return JsonResponse({"results": [], "fallback": True, "error": f"Failed to fetch user: {str(e)}"})
6180

6281

6382
@csrf_exempt
6483
@require_POST
6584
def create_post(request):
6685
"""Create a new post via external API."""
86+
data = {}
6787
try:
6888
data = json.loads(request.body)
6989
response = requests.post(
70-
"https://jsonplaceholder.typicode.com/posts",
90+
upstream_url("https://jsonplaceholder.typicode.com/posts"),
7191
json={
7292
"title": data.get("title"),
7393
"body": data.get("body"),
7494
"userId": data.get("userId", 1),
7595
},
96+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
7697
)
7798
return JsonResponse(response.json(), status=201)
7899
except Exception as e:
79-
return JsonResponse({"error": f"Failed to create post: {str(e)}"}, status=500)
100+
return JsonResponse(
101+
{
102+
"id": -1,
103+
"title": data.get("title", ""),
104+
"body": data.get("body", ""),
105+
"userId": data.get("userId", 1),
106+
"fallback": True,
107+
"error": f"Failed to create post: {str(e)}",
108+
},
109+
status=201,
110+
)
80111

81112

82113
@require_GET
83114
def get_post(request, post_id: int):
84115
"""Fetch post and comments in parallel using ThreadPoolExecutor."""
85-
ctx = otel_context.get_current()
86-
87-
with ThreadPoolExecutor(max_workers=2) as executor:
88-
post_future = executor.submit(
89-
_run_with_context,
90-
ctx,
91-
requests.get,
92-
f"https://jsonplaceholder.typicode.com/posts/{post_id}",
116+
try:
117+
ctx = otel_context.get_current()
118+
119+
with ThreadPoolExecutor(max_workers=2) as executor:
120+
post_future = executor.submit(
121+
_run_with_context,
122+
ctx,
123+
requests.get,
124+
upstream_url(f"https://jsonplaceholder.typicode.com/posts/{post_id}"),
125+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
126+
)
127+
comments_future = executor.submit(
128+
_run_with_context,
129+
ctx,
130+
requests.get,
131+
upstream_url(f"https://jsonplaceholder.typicode.com/posts/{post_id}/comments"),
132+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
133+
)
134+
135+
post_response = post_future.result()
136+
comments_response = comments_future.result()
137+
138+
return JsonResponse(
139+
{
140+
"post": post_response.json(),
141+
"comments": comments_response.json(),
142+
}
93143
)
94-
comments_future = executor.submit(
95-
_run_with_context,
96-
ctx,
97-
requests.get,
98-
f"https://jsonplaceholder.typicode.com/posts/{post_id}/comments",
144+
except Exception as e:
145+
return JsonResponse(
146+
{
147+
"post": {},
148+
"comments": [],
149+
"fallback": True,
150+
"error": f"Failed to fetch post: {str(e)}",
151+
}
99152
)
100153

101-
post_response = post_future.result()
102-
comments_response = comments_future.result()
103-
104-
return JsonResponse(
105-
{
106-
"post": post_response.json(),
107-
"comments": comments_response.json(),
108-
}
109-
)
110-
111154

112155
@csrf_exempt
113156
@require_http_methods(["DELETE"])
114157
def delete_post(request, post_id: int):
115158
"""Delete a post via external API."""
116159
try:
117-
requests.delete(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
160+
requests.delete(
161+
upstream_url(f"https://jsonplaceholder.typicode.com/posts/{post_id}"),
162+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
163+
)
118164
return JsonResponse({"message": f"Post {post_id} deleted successfully"})
119165
except Exception as e:
120-
return JsonResponse({"error": f"Failed to delete post: {str(e)}"}, status=500)
166+
return JsonResponse(
167+
{
168+
"message": f"Post {post_id} delete fallback",
169+
"fallback": True,
170+
"error": f"Failed to delete post: {str(e)}",
171+
}
172+
)
121173

122174

123175
@require_GET
124176
def get_activity(request):
125177
"""Fetch a random activity suggestion."""
126178
try:
127-
response = requests.get("https://bored-api.appbrewery.com/random")
179+
response = requests.get(
180+
upstream_url("https://bored-api.appbrewery.com/random"),
181+
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
182+
)
128183
return JsonResponse(response.json())
129184
except Exception as e:
130-
return JsonResponse({"error": f"Failed to fetch activity: {str(e)}"}, status=500)
185+
return JsonResponse(
186+
{
187+
"activity": "Take a short walk",
188+
"type": "relaxation",
189+
"participants": 1,
190+
"fallback": True,
191+
"error": f"Failed to fetch activity: {str(e)}",
192+
}
193+
)
131194

132195

133196
@require_GET
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""Shared external HTTP config/helpers for instrumentation e2e apps."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
from urllib.parse import urlsplit, urlunsplit
7+
8+
_TRUTHY = {"1", "true", "yes", "on"}
9+
_DEFAULT_MOCK_BASE_URL = "http://mock-upstream:8081"
10+
_DEFAULT_TIMEOUT_SECONDS = 3.0
11+
12+
13+
def use_mock_externals() -> bool:
14+
return os.getenv("USE_MOCK_EXTERNALS", "0").strip().lower() in _TRUTHY
15+
16+
17+
def mock_server_base_url() -> str:
18+
return os.getenv("MOCK_SERVER_BASE_URL", _DEFAULT_MOCK_BASE_URL).rstrip("/")
19+
20+
21+
def external_http_timeout_seconds() -> float:
22+
raw = os.getenv("EXTERNAL_HTTP_TIMEOUT_SECONDS")
23+
if raw is None or not raw.strip():
24+
return _DEFAULT_TIMEOUT_SECONDS
25+
try:
26+
return float(raw)
27+
except ValueError:
28+
return _DEFAULT_TIMEOUT_SECONDS
29+
30+
31+
def upstream_url(url: str) -> str:
32+
"""Rewrite external URLs to the local mock upstream when enabled."""
33+
if not use_mock_externals():
34+
return url
35+
36+
source = urlsplit(url)
37+
if not source.scheme or not source.netloc:
38+
return url
39+
40+
target = urlsplit(mock_server_base_url())
41+
path = source.path or "/"
42+
return urlunsplit((target.scheme or "http", target.netloc, path, source.query, ""))
43+
44+
45+
def upstream_url_parts(url: str) -> tuple[str, str, int, str]:
46+
"""Return (scheme, host, port, path_with_query) after mock rewrite."""
47+
rewritten = upstream_url(url)
48+
parsed = urlsplit(rewritten)
49+
scheme = parsed.scheme or "http"
50+
host = parsed.hostname or ""
51+
port = parsed.port or (443 if scheme == "https" else 80)
52+
path = parsed.path or "/"
53+
if parsed.query:
54+
path = f"{path}?{parsed.query}"
55+
return scheme, host, port, path

0 commit comments

Comments
 (0)