-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathtest_mcp_command.py
More file actions
315 lines (231 loc) · 9.99 KB
/
Copy pathtest_mcp_command.py
File metadata and controls
315 lines (231 loc) · 9.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import json
import os
import sys
from unittest.mock import AsyncMock, patch
import pytest
# Skip the whole module on interpreters older than 3.10; the guard runs before
# the mcp_command import that follows it.
if not sys.version_info >= (3, 10):
    pytest.skip('MCP requires Python 3.10+', allow_module_level=True)
from cycode.cli.apps.mcp.mcp_command import (
_sanitize_file_path,
_TempFilesManager,
)
# Module-wide marker: applies pytest.mark.anyio to every test collected from this file.
pytestmark = pytest.mark.anyio
@pytest.fixture
def anyio_backend() -> str:
    """Name the backend used for the anyio-marked tests in this module."""
    backend_name = 'asyncio'
    return backend_name
# --- _sanitize_file_path input validation ---
def test_sanitize_file_path_rejects_empty_string() -> None:
    """An empty string is refused with a ValueError mentioning 'non-empty string'."""
    empty_path = ''
    with pytest.raises(ValueError, match='non-empty string'):
        _sanitize_file_path(empty_path)
def test_sanitize_file_path_rejects_none() -> None:
    """None is refused with a ValueError mentioning 'non-empty string'."""
    missing_path = None
    with pytest.raises(ValueError, match='non-empty string'):
        _sanitize_file_path(missing_path)
def test_sanitize_file_path_rejects_non_string() -> None:
    """A non-string argument (here an int) is refused with the same ValueError."""
    not_a_path = 123
    with pytest.raises(ValueError, match='non-empty string'):
        _sanitize_file_path(not_a_path)
def test_sanitize_file_path_strips_null_bytes() -> None:
    """A NUL character embedded in the input must not appear in the sanitized result."""
    nul = '\x00'
    sanitized = _sanitize_file_path('foo/bar\x00baz.py')
    assert nul not in sanitized
def test_sanitize_file_path_passes_valid_path_through() -> None:
    """An ordinary relative path comes back equal to the input once both are normalized."""
    expected = os.path.normpath('src/main.py')
    actual = os.path.normpath(_sanitize_file_path('src/main.py'))
    assert actual == expected
# --- _TempFilesManager: path traversal prevention ---
#
# _sanitize_file_path delegates to pathvalidate which does NOT block
# path traversal (../ passes through). The real security boundary is
# the normpath containment check in _TempFilesManager.__enter__ (lines 136-139).
# These tests verify that the two layers together prevent escaping the temp dir.
def test_traversal_simple_dotdot_rejected() -> None:
    """A leading chain of ../ segments is dropped; only the harmless file is returned."""
    payload = {
        '../../../etc/passwd': 'malicious',
        'safe.py': 'ok',
    }
    with _TempFilesManager(payload, 'test-traversal') as created:
        assert len(created) == 1
        assert created[0].endswith('safe.py')
        # None of the returned paths may point at the traversal target.
        assert not any('/etc/passwd' in path for path in created)
def test_traversal_backslash_dotdot_rejected() -> None:
    """A Windows-style traversal written with backslashes is dropped; only safe.py is returned."""
    payload = {
        '..\\..\\windows\\system32\\config': 'malicious',
        'safe.py': 'ok',
    }
    with _TempFilesManager(payload, 'test-backslash') as created:
        assert len(created) == 1
        survivor = created[0]
        assert survivor.endswith('safe.py')
def test_traversal_embedded_dotdot_rejected() -> None:
    """A path that starts inside a folder but climbs out with ../ is dropped; only safe.py is returned."""
    payload = {
        'foo/../../../etc/passwd': 'malicious',
        'safe.py': 'ok',
    }
    with _TempFilesManager(payload, 'test-embedded') as created:
        assert len(created) == 1
        survivor = created[0]
        assert survivor.endswith('safe.py')
def test_traversal_absolute_path_rejected() -> None:
    """An absolute path yields no returned file; only the relative safe.py is returned."""
    payload = {
        '/etc/passwd': 'malicious',
        'safe.py': 'ok',
    }
    with _TempFilesManager(payload, 'test-absolute') as created:
        assert len(created) == 1
        survivor = created[0]
        assert survivor.endswith('safe.py')
def test_traversal_dotdot_only_rejected() -> None:
    """A bare '..' path must be rejected while the valid sibling file is kept."""
    files = {
        '..': 'malicious',
        'safe.py': 'ok',
    }
    with _TempFilesManager(files, 'test-bare-dotdot') as temp_files:
        assert len(temp_files) == 1
        # Pin WHICH entry survived, as the other traversal tests do; the count
        # alone does not show that the remaining file is the legitimate one.
        assert temp_files[0].endswith('safe.py')
def test_traversal_all_malicious_raises() -> None:
    """When every supplied path is a traversal attempt, entering the manager raises ValueError."""
    only_bad_paths = {
        '../../../etc/passwd': 'malicious',
        '../../shadow': 'also malicious',
    }
    # The manager is built inside the raises() scope so construction and entry are both covered.
    with pytest.raises(ValueError, match='No valid files'), _TempFilesManager(only_bad_paths, 'test-all-malicious'):
        pass
def test_all_created_files_are_inside_temp_dir() -> None:
    """Each path returned by the manager lies beneath manager.temp_base_dir."""

    def canonical(path: str) -> str:
        # Normalise separators, redundant segments and case so prefixes compare reliably.
        return os.path.normcase(os.path.normpath(path))

    payload = {
        'a.py': 'aaa',
        'sub/b.py': 'bbb',
        'sub/deep/c.py': 'ccc',
    }
    manager = _TempFilesManager(payload, 'test-containment')
    with manager as created:
        base = canonical(manager.temp_base_dir)
        required_prefix = base + os.sep
        for tf in created:
            assert canonical(tf).startswith(required_prefix), f'{tf} escaped temp dir {base}'
def test_mixed_valid_and_traversal_only_creates_valid() -> None:
    """With good and bad paths interleaved, only the two good files are returned, both inside the temp dir."""
    payload = {
        '../escape.py': 'bad',
        'legit.py': 'good',
        'foo/../../escape2.py': 'bad',
        'src/app.py': 'good',
    }
    manager = _TempFilesManager(payload, 'test-mixed')
    with manager as created:
        root = os.path.normcase(os.path.normpath(manager.temp_base_dir))
        required_prefix = root + os.sep
        assert len(created) == 2
        for path in created:
            canonical = os.path.normcase(os.path.normpath(path))
            assert canonical.startswith(required_prefix)
        created_names = {os.path.basename(path) for path in created}
        assert 'legit.py' in created_names
        assert 'app.py' in created_names
# --- _TempFilesManager: general functionality ---
def test_temp_files_manager_creates_files() -> None:
    """Both requested files exist on disk while the manager is active."""
    payload = {
        'test1.py': 'print("hello")',
        'subdir/test2.js': 'console.log("world")',
    }
    with _TempFilesManager(payload, 'test-call-id') as created:
        assert len(created) == 2
        assert all(os.path.exists(path) for path in created)
def test_temp_files_manager_writes_correct_content() -> None:
    """The file created by the manager holds exactly the content it was given."""
    files = {'hello.py': 'print("hello world")'}
    # Read with an explicit encoding so the check does not depend on the
    # platform's locale default (the content is plain ASCII).
    with _TempFilesManager(files, 'test-content') as temp_files, open(temp_files[0], encoding='utf-8') as f:
        assert f.read() == 'print("hello world")'
def test_temp_files_manager_cleans_up_on_exit() -> None:
    """The temp base directory exists inside the with-block and is gone after it."""
    manager = _TempFilesManager({'cleanup.py': 'code'}, 'test-cleanup')
    with manager as created:
        # Capture the directory now so it can still be checked after exit.
        base_dir = manager.temp_base_dir
        assert os.path.exists(base_dir)
        assert len(created) == 1
    still_there = os.path.exists(base_dir)
    assert not still_there
def test_temp_files_manager_empty_path_raises() -> None:
    """A mapping whose only key is the empty string leaves no valid files, so ValueError is raised."""
    only_empty_key = {'': 'empty path'}
    with pytest.raises(ValueError, match='No valid files'), _TempFilesManager(only_empty_key, 'test-empty-path'):
        pass
def test_temp_files_manager_preserves_subdirectory_structure() -> None:
    """Files given with nested relative paths are both returned, with their base names intact."""
    payload = {
        'src/main.py': 'main',
        'src/utils/helper.py': 'helper',
    }
    with _TempFilesManager(payload, 'test-dirs') as created:
        assert len(created) == 2
        created_names = {os.path.basename(path) for path in created}
        assert 'main.py' in created_names
        assert 'helper.py' in created_names
# --- _run_cycode_command (async) ---
@pytest.mark.anyio
async def test_run_cycode_command_returns_dict() -> None:
    """A subprocess with a non-zero exit code and only stderr output yields a dict with an 'error' key."""
    from cycode.cli.apps.mcp.mcp_command import _run_cycode_command

    # Stand-in for the spawned process: empty stdout, some stderr, exit code 1.
    fake_process = AsyncMock(returncode=1)
    fake_process.communicate.return_value = (b'', b'error output')

    with patch('asyncio.create_subprocess_exec', return_value=fake_process):
        outcome = await _run_cycode_command('--invalid-flag-for-test')

    assert isinstance(outcome, dict)
    assert 'error' in outcome
@pytest.mark.anyio
async def test_run_cycode_command_parses_json_output() -> None:
    """JSON on stdout with exit code 0 is decoded and returned as the result dict."""
    from cycode.cli.apps.mcp.mcp_command import _run_cycode_command

    # Stand-in for the spawned process: JSON on stdout, empty stderr, exit code 0.
    fake_process = AsyncMock(returncode=0)
    fake_process.communicate.return_value = (b'{"status": "ok"}', b'')

    with patch('asyncio.create_subprocess_exec', return_value=fake_process):
        outcome = await _run_cycode_command('version')

    expected = {'status': 'ok'}
    assert outcome == expected
@pytest.mark.anyio
async def test_run_cycode_command_handles_invalid_json() -> None:
    """Stdout that is not valid JSON is reported through the 'Failed to parse JSON output' error."""
    from cycode.cli.apps.mcp.mcp_command import _run_cycode_command

    # Stand-in for the spawned process: malformed stdout, empty stderr, exit code 0.
    fake_process = AsyncMock(returncode=0)
    fake_process.communicate.return_value = (b'not json{', b'')

    with patch('asyncio.create_subprocess_exec', return_value=fake_process):
        outcome = await _run_cycode_command('version')

    reported_error = outcome['error']
    assert reported_error == 'Failed to parse JSON output'
@pytest.mark.anyio
async def test_run_cycode_command_timeout() -> None:
    """A communicate() call that outlasts the given timeout yields an error dict mentioning 'timeout'."""
    import asyncio

    from cycode.cli.apps.mcp.mcp_command import _run_cycode_command

    async def outlasts_timeout() -> tuple[bytes, bytes]:
        # Sleeps far longer than the 0.001 s timeout passed to the command below.
        await asyncio.sleep(10)
        return (b'', b'')

    fake_process = AsyncMock()
    fake_process.communicate = outlasts_timeout

    with patch('asyncio.create_subprocess_exec', return_value=fake_process):
        outcome = await _run_cycode_command('status', timeout=0.001)

    assert isinstance(outcome, dict)
    assert 'error' in outcome
    error_text = outcome['error'].lower()
    assert 'timeout' in error_text
# --- _cycode_scan_tool ---
@pytest.mark.anyio
async def test_cycode_scan_tool_no_files() -> None:
    """An empty files mapping produces a JSON reply whose 'error' mentions 'No files provided'."""
    from cycode.cli.apps.mcp.mcp_command import _cycode_scan_tool
    from cycode.cli.cli_types import ScanTypeOption

    no_files = {}
    raw_reply = await _cycode_scan_tool(ScanTypeOption.SECRET, no_files)
    reply = json.loads(raw_reply)

    assert 'error' in reply
    assert 'No files provided' in reply['error']
@pytest.mark.anyio
async def test_cycode_scan_tool_invalid_files() -> None:
    """A files mapping whose only path is the empty string produces a JSON reply with an 'error' key."""
    from cycode.cli.apps.mcp.mcp_command import _cycode_scan_tool
    from cycode.cli.cli_types import ScanTypeOption

    unusable_files = {'': 'content'}
    raw_reply = await _cycode_scan_tool(ScanTypeOption.SECRET, unusable_files)
    reply = json.loads(raw_reply)

    assert 'error' in reply
# --- _create_mcp_server ---
def test_create_mcp_server() -> None:
    """The factory returns a server object whose name is 'cycode'."""
    from cycode.cli.apps.mcp.mcp_command import _create_mcp_server

    host, port = '127.0.0.1', 8000
    server = _create_mcp_server(host, port)

    assert server is not None
    assert server.name == 'cycode'
def test_create_mcp_server_registers_tools() -> None:
    """The created server has the status tool and all four scan tools registered."""
    from cycode.cli.apps.mcp.mcp_command import _create_mcp_server

    server = _create_mcp_server('127.0.0.1', 8000)

    # NOTE(review): this reads underscore-prefixed attributes of the server object;
    # confirm whether a public accessor for the registered tools is available.
    registered = {tool.name for tool in server._tool_manager._tools.values()}

    for expected_tool in (
        'cycode_status',
        'cycode_secret_scan',
        'cycode_sca_scan',
        'cycode_iac_scan',
        'cycode_sast_scan',
    ):
        assert expected_tool in registered