Skip to content

Commit 2451f6c

Browse files
committed
Store body_raw as well as a fix for bb report
1 parent 4af94ea commit 2451f6c

3 files changed

Lines changed: 95 additions & 1 deletion

File tree

aikido_zen/context/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from .extract_route_params import extract_route_params
1818
from ..helpers.headers import Headers
1919

20-
UINPUT_SOURCES = ["body", "cookies", "query", "headers", "xml", "route_params"]
20+
UINPUT_SOURCES = ["body", "body_raw", "cookies", "query", "headers", "xml", "route_params"]
2121
current_context = contextvars.ContextVar("current_context", default=None)
2222

2323
WSGI_SOURCES = ["django", "flask"]
@@ -79,6 +79,7 @@ def __reduce__(self):
7979
"remote_address": self.remote_address,
8080
"url": self.url,
8181
"body": self.body,
82+
"body_raw": self.body_raw,
8283
"headers": self.headers,
8384
"query": self.query,
8485
"cookies": self.cookies,
@@ -115,6 +116,7 @@ def set_body(self, body):
115116
def set_body_internal(self, body):
116117
"""Sets the body and checks if it's possibly JSON"""
117118
self.body = body
119+
self.body_raw = None
118120
if isinstance(self.body, (str, bytes)) and len(body) == 0:
119121
# Make sure that empty bodies like b"" don't get sent.
120122
self.body = None
@@ -125,6 +127,12 @@ def set_body_internal(self, body):
125127
try:
126128
parsed_body = json.loads(self.body)
127129
if parsed_body:
130+
# Save the raw decoded string so injection detection still works
131+
# against code that reads the body as raw bytes/string. json.loads
132+
# decodes unicode escapes (e.g. \u0023 -> #), creating a mismatch
133+
# between self.body and what reaches the sink if the application
134+
# reads request.data directly instead of request.json.
135+
self.body_raw = self.body.decode("utf-8", errors="replace")
128136
self.body = parsed_body
129137
return
130138
except (JSONDecodeError, ValueError):
@@ -139,6 +147,7 @@ def set_body_internal(self, body):
139147
# Might be JSON, but might not have been parsed correctly by server because of wrong headers
140148
parsed_body = json.loads(self.body)
141149
if parsed_body:
150+
self.body_raw = self.body
142151
self.body = parsed_body
143152

144153
def get_route_metadata(self):

aikido_zen/context/init_test.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def test_wsgi_context_1():
6262
"url": "https://example.com/hello",
6363
"query": {"user": ["JohnDoe"], "age": ["30", "35"]},
6464
"body": 123,
65+
"body_raw": None,
6566
"route": "/hello",
6667
"subdomains": [],
6768
"user": None,
@@ -94,6 +95,7 @@ def test_wsgi_context_2():
9495
"url": "http://localhost:8080/hello",
9596
"query": {"user": ["JohnDoe"], "age": ["30", "35"]},
9697
"body": {"test": True},
98+
"body_raw": None,
9799
"route": "/hello",
98100
"subdomains": [],
99101
"user": None,
@@ -317,3 +319,34 @@ def test_set_bytes_json_with_surrogate_bytes():
317319
context = Context(req=basic_wsgi_req, body=body, source="flask")
318320
assert isinstance(context.body, dict)
319321
assert context.body.get("username") == {"$regex": ".*"}
322+
323+
324+
def test_body_raw_set_when_bytes_json_parsed():
325+
# Regression: AIKIDO-FVRDOX5M — json.loads decodes unicode escapes (e.g. \u0023 -> #)
326+
# so self.body has '#' but the application's raw read still has '\u0023'.
327+
# body_raw must preserve the pre-decode string so detection finds the raw form.
328+
body = b'"\\u0023 payload"'
329+
context = Context(req=basic_wsgi_req, body=body, source="flask")
330+
assert context.body == "# payload"
331+
assert context.body_raw == '"\\u0023 payload"'
332+
333+
334+
def test_body_raw_set_when_string_json_parsed():
335+
# Same bypass via a string body (framework already decoded bytes before set_body).
336+
body = '"\\u0023 payload"'
337+
context = Context(req=basic_wsgi_req, body=body, source="flask")
338+
assert context.body == "# payload"
339+
assert context.body_raw == '"\\u0023 payload"'
340+
341+
342+
def test_body_raw_none_when_no_json_parsing():
343+
# When the body is not JSON-parsed, body_raw should remain None.
344+
context = Context(req=basic_wsgi_req, body=b"plain bytes", source="flask")
345+
assert context.body == "plain bytes"
346+
assert context.body_raw is None
347+
348+
349+
def test_body_raw_none_for_non_string_body():
350+
context = Context(req=basic_wsgi_req, body={"key": "value"}, source="flask")
351+
assert context.body == {"key": "value"}
352+
assert context.body_raw is None

aikido_zen/vulnerabilities/sql_injection/context_contains_sql_injection_test.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import pytest
22
from .context_contains_sql_injection import context_contains_sql_injection
3+
from aikido_zen.context import Context
4+
from aikido_zen.helpers.headers import Headers
35
import aikido_zen.test_utils as test_utils
46

57

@@ -30,3 +32,53 @@ def test_doesnt_crash_with_invalid_sql(invalid_input):
3032
dialect="mysql",
3133
)
3234
assert result == {}
35+
36+
37+
def _make_context_with_bytes_body(body_bytes):
38+
"""Create a minimal context whose body comes from raw bytes (as a real request would)."""
39+
ctx = Context.__new__(Context)
40+
ctx.cookies = {}
41+
ctx.headers = Headers()
42+
ctx.remote_address = "1.2.3.4"
43+
ctx.method = "POST"
44+
ctx.url = "http://localhost:5000/user"
45+
ctx.query = {}
46+
ctx.source = "flask"
47+
ctx.route = "/user"
48+
ctx.subdomains = []
49+
ctx.parsed_userinput = {}
50+
ctx.xml = {}
51+
ctx.outgoing_req_redirects = []
52+
ctx.user = None
53+
ctx.rate_limit_group = None
54+
ctx.executed_middleware = False
55+
ctx.protection_forced_off = False
56+
ctx.route_params = []
57+
ctx.set_body(body_bytes)
58+
return ctx
59+
60+
61+
def test_unicode_escape_sqli_bypass_via_bytes_body():
62+
# Regression: AIKIDO-FVRDOX5M — json.loads decodes \u0023 -> # so self.body has '#'
63+
# but the sink receives the raw decoded string with literal '\u0023'. Without
64+
# body_raw the firewall checks '#' against a query that has '\u0023' and misses it.
65+
raw_payload = b'"\\' + b"u0023 ' Union Select password From users -- x\""
66+
ctx = _make_context_with_bytes_body(raw_payload)
67+
68+
# body is the JSON-decoded form (with '#'); body_raw is the original decoded bytes string
69+
assert ctx.body.startswith("#")
70+
assert ctx.body_raw is not None
71+
assert "\\u0023" in ctx.body_raw
72+
73+
# The SQL query the application builds using request.data.decode() (raw bytes → string)
74+
user_id_raw = ctx.body_raw # what reaches the sink when app reads raw body
75+
sql = f"SELECT username FROM users WHERE id = '{user_id_raw}'"
76+
77+
result = context_contains_sql_injection(
78+
sql=sql,
79+
operation="pymysql.execute",
80+
context=ctx,
81+
dialect="mysql",
82+
)
83+
assert result != {}, "SQLi via unicode-escape bypass should be detected"
84+
assert result["source"] == "body_raw"

0 commit comments

Comments
 (0)