Skip to content

Commit 8b803ab

Browse files
initial pyJWT instrumentation
1 parent 5d97f8e commit 8b803ab

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

drift/core/drift_sdk.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,15 @@ def _init_auto_instrumentations(self) -> None:
481481
except Exception as e:
482482
logger.debug(f"Socket instrumentation initialization failed: {e}")
483483

484+
# PyJWT instrumentation for JWT verification bypass
485+
try:
486+
from ..instrumentation.pyjwt import PyJWTInstrumentation
487+
488+
_ = PyJWTInstrumentation(mode=self.mode)
489+
logger.debug("PyJWT instrumentation registered (REPLAY mode)")
490+
except Exception as e:
491+
logger.debug(f"PyJWT instrumentation registration failed: {e}")
492+
484493
def create_env_vars_snapshot(self) -> None:
485494
"""Create a span capturing all environment variables.
486495
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""PyJWT instrumentation for REPLAY mode."""
2+
3+
from .instrumentation import PyJWTInstrumentation
4+
5+
__all__ = ["PyJWTInstrumentation"]
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""PyJWT instrumentation for REPLAY mode.
2+
3+
Patches PyJWT to disable all verification during test replay:
4+
1. _merge_options - returns all verification options as False
5+
2. _verify_signature - no-op (defense in depth)
6+
3. _validate_claims - no-op (defense in depth)
7+
8+
Only active in REPLAY mode.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import logging
14+
from types import ModuleType
15+
16+
from ...core.types import TuskDriftMode
17+
from ..base import InstrumentationBase
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class PyJWTInstrumentation(InstrumentationBase):
23+
"""Patches PyJWT to disable verification in REPLAY mode."""
24+
25+
def __init__(
26+
self, mode: TuskDriftMode = TuskDriftMode.DISABLED, enabled: bool = True
27+
) -> None:
28+
self.mode = mode
29+
should_enable = enabled and mode == TuskDriftMode.REPLAY
30+
31+
super().__init__(
32+
name="PyJWTInstrumentation",
33+
module_name="jwt",
34+
supported_versions="*",
35+
enabled=should_enable,
36+
)
37+
38+
def patch(self, module: ModuleType) -> None:
39+
if self.mode != TuskDriftMode.REPLAY:
40+
return
41+
42+
self._patch_merge_options()
43+
self._patch_signature_verification()
44+
self._patch_claim_validation()
45+
logger.debug("[PyJWTInstrumentation] All patches applied")
46+
47+
def _patch_signature_verification(self) -> None:
48+
"""No-op signature verification."""
49+
try:
50+
from jwt import api_jws
51+
52+
def patched_verify_signature(self, *args, **kwargs):
53+
logger.debug("[PyJWTInstrumentation] _verify_signature called - skipping verification")
54+
return None
55+
56+
api_jws.PyJWS._verify_signature = patched_verify_signature
57+
logger.debug("[PyJWTInstrumentation] Patched PyJWS._verify_signature")
58+
except Exception as e:
59+
logger.warning(f"[PyJWTInstrumentation] Failed to patch _verify_signature: {e}")
60+
61+
def _patch_claim_validation(self) -> None:
62+
"""No-op claim validation."""
63+
try:
64+
from jwt import api_jwt
65+
66+
def patched_validate_claims(self, *args, **kwargs):
67+
logger.debug("[PyJWTInstrumentation] _validate_claims called - skipping validation")
68+
return None
69+
70+
api_jwt.PyJWT._validate_claims = patched_validate_claims
71+
logger.debug("[PyJWTInstrumentation] Patched PyJWT._validate_claims")
72+
except Exception as e:
73+
logger.warning(f"[PyJWTInstrumentation] Failed to patch _validate_claims: {e}")
74+
75+
def _patch_merge_options(self) -> None:
76+
"""Patch _merge_options to always return disabled verification options."""
77+
try:
78+
from jwt import api_jwt
79+
80+
disabled_options = {
81+
"verify_signature": False,
82+
"verify_exp": False,
83+
"verify_nbf": False,
84+
"verify_iat": False,
85+
"verify_aud": False,
86+
"verify_iss": False,
87+
"verify_sub": False,
88+
"verify_jti": False,
89+
"require": [],
90+
"strict_aud": False,
91+
}
92+
93+
def patched_merge_options(self, options=None):
94+
logger.debug("[PyJWTInstrumentation] _merge_options called - returning disabled options")
95+
return disabled_options
96+
97+
api_jwt.PyJWT._merge_options = patched_merge_options
98+
logger.debug("[PyJWTInstrumentation] Patched PyJWT._merge_options")
99+
except Exception as e:
100+
logger.warning(f"[PyJWTInstrumentation] Failed to patch _merge_options: {e}")

0 commit comments

Comments
 (0)