|
12 | 12 | # language governing permissions and limitations under the License. |
13 | 13 | from __future__ import absolute_import |
14 | 14 |
|
| 15 | +import subprocess |
| 16 | + |
15 | 17 | import pytest |
16 | 18 |
|
17 | 19 | from sagemaker.core import git_utils |
@@ -308,6 +310,129 @@ def test_git_clone_repo_blocks_url_encoded_attack(self): |
308 | 310 | git_utils.git_clone_repo(malicious_git_config, entry_point) |
309 | 311 | assert "Suspicious URL encoding detected" in str(error.value) |
310 | 312 |
|
| 313 | +class TestCredentialRedaction: |
| 314 | + """Test cases for credential redaction in clone error handling.""" |
| 315 | + |
| 316 | + def test_redact_token_from_url(self): |
| 317 | + """Test that a token embedded in an HTTPS URL is redacted.""" |
| 318 | + url = "https://ghp_SuperSecretToken123@github.com/user/repo.git" |
| 319 | + result = git_utils._redact_credentials_from_url(url) |
| 320 | + assert "ghp_SuperSecretToken123" not in result |
| 321 | + assert result == "https://<credentials-redacted>@github.com/user/repo.git" |
| 322 | + |
| 323 | + def test_redact_username_password_from_url(self): |
| 324 | + """Test that username:password embedded in an HTTPS URL is redacted.""" |
| 325 | + url = "https://myuser:mypassword@github.com/user/repo.git" |
| 326 | + result = git_utils._redact_credentials_from_url(url) |
| 327 | + assert "myuser" not in result |
| 328 | + assert "mypassword" not in result |
| 329 | + assert result == "https://<credentials-redacted>@github.com/user/repo.git" |
| 330 | + |
| 331 | + def test_redact_url_encoded_password(self): |
| 332 | + """Test that URL-encoded credentials are redacted.""" |
| 333 | + url = "https://user:p%40ss%20word@git-codecommit.us-east-1.amazonaws.com/v1/repos/myrepo" |
| 334 | + result = git_utils._redact_credentials_from_url(url) |
| 335 | + assert "p%40ss%20word" not in result |
| 336 | + assert "<credentials-redacted>" in result |
| 337 | + |
| 338 | + def test_no_redaction_without_credentials(self): |
| 339 | + """Test that URLs without credentials are unchanged.""" |
| 340 | + url = "https://github.com/user/repo.git" |
| 341 | + result = git_utils._redact_credentials_from_url(url) |
| 342 | + assert result == url |
| 343 | + |
| 344 | + def test_no_redaction_for_ssh_url(self): |
| 345 | + """Test that SSH URLs are not affected by redaction.""" |
| 346 | + url = "git@github.com:user/repo.git" |
| 347 | + result = git_utils._redact_credentials_from_url(url) |
| 348 | + assert result == url |
| 349 | + |
| 350 | + @pytest.fixture |
| 351 | + def mock_env(self, monkeypatch): |
| 352 | + """Set minimal env for subprocess calls.""" |
| 353 | + monkeypatch.setenv("PATH", "/usr/bin:/bin") |
| 354 | + |
| 355 | + def test_clone_failure_redacts_token(self, mock_env): |
| 356 | + """Test that CalledProcessError from a failed clone does not contain the token.""" |
| 357 | + from unittest.mock import patch |
| 358 | + |
| 359 | + token_url = "https://ghp_secret123@github.com/user/repo.git" |
| 360 | + with patch( |
| 361 | + "subprocess.check_call", |
| 362 | + side_effect=subprocess.CalledProcessError( |
| 363 | + 128, ["git", "clone", token_url, "/tmp/dest"] |
| 364 | + ), |
| 365 | + ): |
| 366 | + with pytest.raises(subprocess.CalledProcessError) as exc_info: |
| 367 | + git_utils._run_clone_command(token_url, "/tmp/dest") |
| 368 | + |
| 369 | + # The token must NOT appear anywhere in the re-raised exception |
| 370 | + assert "ghp_secret123" not in str(exc_info.value) |
| 371 | + assert "ghp_secret123" not in str(exc_info.value.cmd) |
| 372 | + assert "<credentials-redacted>" in str(exc_info.value.cmd) |
| 373 | + |
| 374 | + def test_clone_failure_redacts_username_password(self, mock_env): |
| 375 | + """Test that CalledProcessError from a failed clone does not contain username/password.""" |
| 376 | + from unittest.mock import patch |
| 377 | + |
| 378 | + cred_url = "https://admin:hunter2@github.com/org/repo.git" |
| 379 | + with patch( |
| 380 | + "subprocess.check_call", |
| 381 | + side_effect=subprocess.CalledProcessError( |
| 382 | + 128, ["git", "clone", cred_url, "/tmp/dest"] |
| 383 | + ), |
| 384 | + ): |
| 385 | + with pytest.raises(subprocess.CalledProcessError) as exc_info: |
| 386 | + git_utils._run_clone_command(cred_url, "/tmp/dest") |
| 387 | + |
| 388 | + assert "admin" not in str(exc_info.value.cmd) |
| 389 | + assert "hunter2" not in str(exc_info.value.cmd) |
| 390 | + assert "<credentials-redacted>" in str(exc_info.value.cmd) |
| 391 | + |
| 392 | + def test_clone_failure_redacts_codecommit_credentials(self, mock_env): |
| 393 | + """Test that CodeCommit HTTPS credentials are redacted on failure.""" |
| 394 | + from unittest.mock import patch |
| 395 | + |
| 396 | + cc_url = "https://user:pass@git-codecommit.us-east-1.amazonaws.com/v1/repos/myrepo" |
| 397 | + with patch( |
| 398 | + "subprocess.check_call", |
| 399 | + side_effect=subprocess.CalledProcessError( |
| 400 | + 128, ["git", "clone", cc_url, "/tmp/dest"] |
| 401 | + ), |
| 402 | + ): |
| 403 | + with pytest.raises(subprocess.CalledProcessError) as exc_info: |
| 404 | + git_utils._run_clone_command(cc_url, "/tmp/dest") |
| 405 | + |
| 406 | + assert "user:pass" not in str(exc_info.value.cmd) |
| 407 | + assert "<credentials-redacted>" in str(exc_info.value.cmd) |
| 408 | + |
| 409 | + def test_clone_failure_suppresses_exception_chain(self, mock_env): |
| 410 | + """Test that the original exception chain is suppressed (from None).""" |
| 411 | + from unittest.mock import patch |
| 412 | + |
| 413 | + token_url = "https://ghp_secret@github.com/user/repo.git" |
| 414 | + with patch( |
| 415 | + "subprocess.check_call", |
| 416 | + side_effect=subprocess.CalledProcessError( |
| 417 | + 128, ["git", "clone", token_url, "/tmp/dest"] |
| 418 | + ), |
| 419 | + ): |
| 420 | + with pytest.raises(subprocess.CalledProcessError) as exc_info: |
| 421 | + git_utils._run_clone_command(token_url, "/tmp/dest") |
| 422 | + |
| 423 | + # __cause__ should be None due to 'from None' |
| 424 | + assert exc_info.value.__cause__ is None |
| 425 | + |
| 426 | + def test_clone_success_no_exception(self, mock_env): |
| 427 | + """Test that successful clone does not raise.""" |
| 428 | + from unittest.mock import patch |
| 429 | + |
| 430 | + url = "https://ghp_token@github.com/user/repo.git" |
| 431 | + with patch("subprocess.check_call"): |
| 432 | + # Should not raise |
| 433 | + git_utils._run_clone_command(url, "/tmp/dest") |
| 434 | + |
| 435 | + |
311 | 436 | def test_sanitize_git_url_comprehensive_attack_scenarios(self): |
312 | 437 | attack_scenarios = [ |
313 | 438 | "https://USER@YOUR_NGROK_OR_LOCALHOST/malicious.git@github.com%25legit%25repo.git", |
|
0 commit comments