Skip to content

Commit 7d1ec22

Browse files
committed
Chore: Add unit and integ tests for already upgraded search functionality in jumpstart code
1 parent b839617 commit 7d1ec22

2 files changed

Lines changed: 95 additions & 0 deletions

File tree

sagemaker-core/tests/integ/jumpstart/test_search_integ.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,19 @@ def test_search_public_hub_models_all_args():
6666

6767
assert isinstance(results, list)
6868
assert all(isinstance(m, HubContent) for m in results)
69+
70+
71+
@pytest.mark.serial
72+
@pytest.mark.integ
73+
def test_search_public_hub_models_safe_from_injection():
74+
"""Integration test to verify malicious queries don't execute code."""
75+
# This would have executed code with the old eval() implementation
76+
malicious_query = "__import__('os').system('echo test')"
77+
78+
# Should safely return empty results without executing code
79+
results = search_public_hub_models(malicious_query)
80+
81+
# Verify it returns a list (even if empty) and doesn't crash
82+
assert isinstance(results, list)
83+
# Should not match any models since it's not a valid filter expression
84+
assert len(results) == 0

sagemaker-core/tests/unit/jumpstart/test_search_unit.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,85 @@ def test_filter_match(query, keywords, expected):
4747
assert f.match(keywords) == expected
4848

4949

50+
@pytest.mark.parametrize(
51+
"malicious_query,keywords",
52+
[
53+
# Code injection attempts that would work with eval()
54+
("__import__('os').system('echo pwned')", ["test"]),
55+
("exec('import os; os.system(\"ls\")')", ["test"]),
56+
("eval('1+1')", ["test"]),
57+
("__builtins__.__import__('os').system('ls')", ["test"]),
58+
# Attribute access attempts
59+
("keywords.__class__.__bases__[0].__subclasses__()", ["test"]),
60+
# Lambda injection
61+
("(lambda: __import__('os').system('ls'))()", ["test"]),
62+
# Dict/list comprehension injection
63+
("[x for x in ().__class__.__bases__[0].__subclasses__()]", ["test"]),
64+
# Function call injection
65+
("open('/etc/passwd').read()", ["test"]),
66+
# Module access
67+
("sys.exit()", ["test"]),
68+
("os.system('ls')", ["test"]),
69+
],
70+
)
71+
def test_filter_blocks_code_injection(malicious_query, keywords):
72+
"""Test that malicious code injection attempts are safely handled."""
73+
f = _Filter(malicious_query)
74+
# Should not execute code, just return False for non-matching patterns
75+
result = f.match(keywords)
76+
assert isinstance(result, bool)
77+
# The filter should safely fail to match rather than execute code
78+
assert result is False
79+
80+
81+
@pytest.mark.parametrize(
82+
"injection_query",
83+
[
84+
# Various eval-based injection patterns
85+
"'; __import__('os').system('ls'); '",
86+
"\"; exec('import os'); \"",
87+
"') or __import__('os').system('ls') or ('",
88+
# Nested injection attempts
89+
"test AND (__import__('os').system('ls'))",
90+
"NOT (__import__('subprocess').call(['ls']))",
91+
# String escape attempts
92+
"test' + str(__import__('os').system('ls')) + '",
93+
],
94+
)
95+
def test_filter_injection_variants(injection_query):
96+
"""Test various code injection patterns are blocked."""
97+
f = _Filter(injection_query)
98+
result = f.match(["test", "keyword"])
99+
assert isinstance(result, bool)
100+
# Should not raise exceptions or execute code
101+
assert result in [True, False]
102+
103+
104+
def test_filter_no_eval_execution():
105+
"""Verify that expressions are parsed safely without eval()."""
106+
# This would execute code if eval() was used
107+
dangerous_expr = "__import__('sys').exit(1)"
108+
f = _Filter(dangerous_expr)
109+
110+
# Should not crash the program or execute the exit
111+
result = f.match(["test"])
112+
assert result is False
113+
114+
115+
def test_filter_safe_ast_parsing():
116+
"""Test that the filter uses AST parsing instead of eval()."""
117+
f = _Filter("test AND keyword")
118+
119+
# Verify AST is created
120+
assert f._ast is None # Not parsed yet
121+
f.match(["test", "keyword"])
122+
assert f._ast is not None # AST created after first match
123+
124+
# Verify it's an AST node, not a string for eval
125+
from sagemaker.core.jumpstart.search import _ExpressionNode
126+
assert isinstance(f._ast, _ExpressionNode)
127+
128+
50129
def test_search_public_hub_models():
51130
mock_models = [
52131
HubContent(

0 commit comments

Comments
 (0)