diff --git a/aikido_zen/sources/functions/request_handler.py b/aikido_zen/sources/functions/request_handler.py index 33d37ea09..c94accb19 100644 --- a/aikido_zen/sources/functions/request_handler.py +++ b/aikido_zen/sources/functions/request_handler.py @@ -106,7 +106,7 @@ def post_response(status_code): if cache.is_bypassed_ip(context.remote_address): return - attack_wave = attack_wave_detector_store.is_attack_wave(context) + attack_wave = attack_wave_detector_store.is_attack_wave(context, status_code) if attack_wave: cache.stats.on_detected_attack_wave(blocked=False) diff --git a/aikido_zen/storage/attack_wave_detector_store.py b/aikido_zen/storage/attack_wave_detector_store.py index e256f53d0..cee82278d 100644 --- a/aikido_zen/storage/attack_wave_detector_store.py +++ b/aikido_zen/storage/attack_wave_detector_store.py @@ -10,9 +10,9 @@ def __init__(self): self._detector = AttackWaveDetector() self._lock = threading.RLock() # Reentrant lock for thread safety - def is_attack_wave(self, context: Context) -> bool: + def is_attack_wave(self, context: Context, status_code: int) -> bool: with self._lock: - return self._detector.is_attack_wave(context) + return self._detector.is_attack_wave(context, status_code) def get_samples_for_ip(self, ip: str): with self._lock: diff --git a/aikido_zen/storage/attack_wave_detector_store_test.py b/aikido_zen/storage/attack_wave_detector_store_test.py index 095f3962e..4fc274567 100644 --- a/aikido_zen/storage/attack_wave_detector_store_test.py +++ b/aikido_zen/storage/attack_wave_detector_store_test.py @@ -37,16 +37,16 @@ def test_is_attack_wave_basic_functionality(): return_value=True, ): # Should return False for first few calls - assert not store.is_attack_wave(context) - assert not store.is_attack_wave(context) + assert not store.is_attack_wave(context, 404) + assert not store.is_attack_wave(context, 404) # Call 12 more times to get to 14 total (still below threshold) for _ in range(12): - result = store.is_attack_wave(context) + result = store.is_attack_wave(context, 404) assert not result # The 15th call should trigger attack wave detection and return True - assert store.is_attack_wave(context) + assert store.is_attack_wave(context, 404) def test_is_attack_wave_different_ips(): @@ -62,25 +62,25 @@ def test_is_attack_wave_different_ips(): ): # Call multiple times for different IPs for _ in range(10): - store.is_attack_wave(context1) - store.is_attack_wave(context2) + store.is_attack_wave(context1, 404) + store.is_attack_wave(context2, 404) # Neither should trigger attack wave yet - assert not store.is_attack_wave(context1) - assert not store.is_attack_wave(context2) + assert not store.is_attack_wave(context1, 404) + assert not store.is_attack_wave(context2, 404) def test_is_attack_wave_none_context(): """Test handling of None context""" store = AttackWaveDetectorStore() - assert not store.is_attack_wave(None) + assert not store.is_attack_wave(None, 404) def test_is_attack_wave_no_ip_in_context(): """Test handling of context with no IP address""" store = AttackWaveDetectorStore() context = test_utils.generate_context(ip=None) - assert not store.is_attack_wave(context) + assert not store.is_attack_wave(context, 404) def test_thread_safety_multiple_threads(): @@ -94,7 +94,7 @@ def worker(ip_suffix, result_list): """Worker function that calls is_attack_wave multiple times""" context = test_utils.generate_context(ip=f"192.168.1.{ip_suffix}") for _ in range(5): - result = store.is_attack_wave(context) + result = store.is_attack_wave(context, 404) result_list.append((context.remote_address, result)) time.sleep(0.001) # Small delay to simulate real usage @@ -127,7 +127,7 @@ def worker(result_list): """Worker function that calls is_attack_wave for the same IP""" context = test_utils.generate_context(ip="10.0.0.1") for _ in range(10): - result = store.is_attack_wave(context) + result = store.is_attack_wave(context, 404) with lock: result_list.append(result) time.sleep(0.001) @@ -161,13 +161,13 @@ def test_attack_wave_cooldown(): ): # Call 14 times to get close to threshold for _ in range(14): - store.is_attack_wave(context) + store.is_attack_wave(context, 404) # The 15th call should trigger attack wave detection and return True - assert store.is_attack_wave(context) + assert store.is_attack_wave(context, 404) # Subsequent calls should return False due to cooldown - assert not store.is_attack_wave(context) + assert not store.is_attack_wave(context, 404) def test_attack_wave_time_frame(): @@ -182,10 +182,10 @@ def test_attack_wave_time_frame(): ): # Make some calls for _ in range(5): - store.is_attack_wave(context) + store.is_attack_wave(context, 404) # Should not trigger attack wave yet - assert not store.is_attack_wave(context) + assert not store.is_attack_wave(context, 404) # Wait for the time frame to expire (60 seconds) # We can't actually wait 60 seconds in a test, but we can verify the behavior @@ -232,7 +232,7 @@ def worker(worker_id): try: for i in range(10): context = test_utils.generate_context(ip=f"192.168.{worker_id}.{i}") - result = store.is_attack_wave(context) + result = store.is_attack_wave(context, 404) results.append((worker_id, context.remote_address, result)) except Exception as e: results.append((worker_id, "error", str(e))) @@ -266,7 +266,7 @@ def test_samples_tracking_in_store(): ): # Make a few requests for i in range(3): - store.is_attack_wave(context) + store.is_attack_wave(context, 404) # Check that samples are being tracked (should have 1 unique sample) samples = store.get_samples_for_ip(context.remote_address) @@ -295,7 +295,7 @@ def test_samples_structure_and_content(): ): # Make enough requests to trigger attack wave for i in range(15): - store.is_attack_wave(context) + store.is_attack_wave(context, 404) # Get samples samples = store.get_samples_for_ip(context.remote_address) @@ -323,7 +323,7 @@ def test_samples_json_serialization(): ): # Make enough requests to trigger attack wave for i in range(15): - store.is_attack_wave(context) + store.is_attack_wave(context, 404) # Get samples samples = store.get_samples_for_ip(context.remote_address) @@ -356,8 +356,8 @@ def test_samples_with_different_contexts(): ): # Make requests for both contexts for i in range(15): - store.is_attack_wave(context1) - store.is_attack_wave(context2) + store.is_attack_wave(context1, 404) + store.is_attack_wave(context2, 404) # Get samples for each IP samples1 = store.get_samples_for_ip(context1.remote_address) @@ -424,7 +424,7 @@ def create_context_with_url(ip, url, method="GET"): # Make enough requests to trigger attack wave for j in range(15): - store.is_attack_wave(context) + store.is_attack_wave(context, 404) # Check a few IPs to verify sample structure for i in range(5): @@ -446,7 +446,7 @@ def test_mock_detector_integration(mock_detector_class): store = AttackWaveDetectorStore() context = test_utils.generate_context() - # Should use the mocked detector - result = store.is_attack_wave(context) + # Should use the mocked detector (default status_code=404) + result = store.is_attack_wave(context, 404) assert result is True - mock_detector.is_attack_wave.assert_called_once_with(context) + mock_detector.is_attack_wave.assert_called_once_with(context, 404) diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py b/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py index 5897c6cb9..9fd296f99 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py @@ -34,7 +34,7 @@ def __init__( time_to_live_in_ms=self.attack_wave_time_frame, ) - def is_attack_wave(self, context: Context) -> bool: + def is_attack_wave(self, context: Context, status_code: int) -> bool: """ Function gets called with context to check if there is an attack wave request. """ @@ -45,7 +45,7 @@ def is_attack_wave(self, context: Context) -> bool: if self.sent_events_map.get(ip) is not None: return False - if not is_web_scanner(context): + if not is_web_scanner(context, status_code): return False # Increment suspicious requests count -> there is a new or first suspicious request diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector_test.py index 3c192ce9b..efd9c85f5 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector_test.py @@ -20,13 +20,13 @@ def mock_get_unixtime_ms(monotonic=True, mock_time=0): def test_no_context(): detector = new_attack_wave_detector() - assert not detector.is_attack_wave(None) + assert not detector.is_attack_wave(None, 404) def test_no_ip_address_in_context(): detector = new_attack_wave_detector() context = test_utils.generate_context(ip=None) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) def test_a_web_scanner(): @@ -38,15 +38,15 @@ def test_a_web_scanner(): "aikido_zen.vulnerabilities.attack_wave_detection.attack_wave_detector.is_web_scanner", return_value=True, ): - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) # Is true because the threshold is 6 - assert detector.is_attack_wave(context) + assert detector.is_attack_wave(context, 404) # False again because event should have been sent last time - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) def test_non_web_scanner(): @@ -60,7 +60,7 @@ def test_non_web_scanner(): ): # Should return False even after multiple calls because it's not a web scanner for _ in range(10): - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) def test_a_web_scanner_with_delays(): @@ -74,10 +74,10 @@ def test_a_web_scanner_with_delays(): return_value=True, ): detector = new_attack_wave_detector() - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) with patch( "aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms", @@ -86,11 +86,11 @@ def test_a_web_scanner_with_delays(): "aikido_zen.vulnerabilities.attack_wave_detection.attack_wave_detector.is_web_scanner", return_value=True, ): - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) # Is true because the threshold is 6 - assert detector.is_attack_wave(context) + assert detector.is_attack_wave(context, 404) # False again because event should have been sent last time - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) with patch( "aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms", @@ -100,12 +100,12 @@ def test_a_web_scanner_with_delays(): return_value=True, ): # Still false because minimum time between events is 1 hour - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) with patch( "aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms", @@ -115,12 +115,12 @@ def test_a_web_scanner_with_delays(): return_value=True, ): # Should resend event after 1 hour - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert detector.is_attack_wave(context, 404) def test_unique_samples_only(): @@ -138,12 +138,12 @@ def test_unique_samples_only(): ): # Make enough requests to trigger attack wave (threshold is 6) for i in range(6): - detector.is_attack_wave(context1) + detector.is_attack_wave(context1, 404) # Make a few more identical requests with different context objects for i in range(3): - detector.is_attack_wave(context2) - detector.is_attack_wave(context3) + detector.is_attack_wave(context2, 404) + detector.is_attack_wave(context3, 404) # Should have only 1 unique sample despite 9 identical requests samples = detector.get_samples_for_ip(context1.remote_address) @@ -167,9 +167,9 @@ def test_unique_samples_with_different_methods(): ): # Make enough requests to trigger attack wave for each method for i in range(2): # 2 requests per method = 6 total - detector.is_attack_wave(context_get) - detector.is_attack_wave(context_post) - detector.is_attack_wave(context_put) + detector.is_attack_wave(context_get, 404) + detector.is_attack_wave(context_post, 404) + detector.is_attack_wave(context_put, 404) # Should have 3 unique samples (one for each method) samples = detector.get_samples_for_ip(context_get.remote_address) @@ -230,7 +230,7 @@ def create_context_with_url(url, method="GET"): for context in unique_contexts: # Need to make 6 requests per context to trigger attack wave for i in range(6): - detector.is_attack_wave(context) + detector.is_attack_wave(context, 404) # Check samples for the first IP - should have 1 sample samples = detector.get_samples_for_ip(unique_contexts[0].remote_address) @@ -256,7 +256,7 @@ def test_samples_structure(): ): # Make enough requests to trigger attack wave for i in range(6): - detector.is_attack_wave(context) + detector.is_attack_wave(context, 404) # Get the samples samples = detector.get_samples_for_ip(context.remote_address) @@ -316,17 +316,17 @@ def create_context_with_url(url, method="GET"): context_htaccess.remote_address = "1.1.1.3" for i in range(6): - detector.is_attack_wave(context_env) + detector.is_attack_wave(context_env, 404) for i in range(6): - detector.is_attack_wave(context_git) + detector.is_attack_wave(context_git, 404) for i in range(6): - detector.is_attack_wave(context_htaccess) + detector.is_attack_wave(context_htaccess, 404) # Add many duplicate requests for the first context (same IP) for i in range(10): - detector.is_attack_wave(context_env) + detector.is_attack_wave(context_env, 404) # Should have 1 unique sample for the first IP samples = detector.get_samples_for_ip(context_env.remote_address) @@ -352,7 +352,7 @@ def test_samples_tracking(): ): # Make a few requests for i in range(3): - detector.is_attack_wave(context) + detector.is_attack_wave(context, 404) # Check that samples are being tracked (should have only 1 unique sample) samples = detector.get_samples_for_ip(context.remote_address) @@ -362,7 +362,7 @@ def test_samples_tracking(): # Make more requests to exceed the sample limit for i in range(10): - detector.is_attack_wave(context) + detector.is_attack_wave(context, 404) # Should still have only the most recent samples (limited to 10) samples = detector.get_samples_for_ip(context.remote_address) @@ -380,7 +380,7 @@ def test_clear_samples(): ): # Make some requests for i in range(5): - detector.is_attack_wave(context) + detector.is_attack_wave(context, 404) # Verify samples exist (should have only 1 unique sample) samples = detector.get_samples_for_ip(context.remote_address) @@ -405,10 +405,10 @@ def test_a_slow_web_scanner_that_triggers_in_the_second_interval(): return_value=True, ): detector = new_attack_wave_detector() - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) with patch( "aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms", @@ -417,12 +417,12 @@ def test_a_slow_web_scanner_that_triggers_in_the_second_interval(): "aikido_zen.vulnerabilities.attack_wave_detection.attack_wave_detector.is_web_scanner", return_value=True, ): - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert detector.is_attack_wave(context, 404) def test_a_slow_web_scanner_that_triggers_in_the_third_interval(): @@ -436,10 +436,10 @@ def test_a_slow_web_scanner_that_triggers_in_the_third_interval(): return_value=True, ): detector = new_attack_wave_detector() - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) with patch( "aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms", @@ -449,10 +449,10 @@ def test_a_slow_web_scanner_that_triggers_in_the_third_interval(): return_value=True, ): # Still false because minimum time between events is 1 hour - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) with patch( "aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms", @@ -462,9 +462,9 @@ def test_a_slow_web_scanner_that_triggers_in_the_third_interval(): return_value=True, ): # Should resend event after 1 hour - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert not detector.is_attack_wave(context) - assert detector.is_attack_wave(context) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert not detector.is_attack_wave(context, 404) + assert detector.is_attack_wave(context, 404) diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py index eacf23454..3794cfbe6 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py @@ -16,15 +16,31 @@ "sqlitedb", "sqlite3db", } + +# Extensions that belong to other platforms (e.g. PHP, Java). +# A 200 response may mean the Python app is proxying to that backend, +# so we only count these as scan hits when the response is 404. +foreign_extensions = { + "php", + "php3", + "php4", + "php5", + "phtml", + "java", + "jsp", + "jspx", +} + filenames = {name.lower() for name in file_names} directories = {name.lower() for name in directory_names} -def is_web_scan_path(path: str) -> bool: +def is_web_scan_path(path: str, status_code: int = 404) -> bool: """ is_web_scan_path gets the current route and wants to determine whether it's a test by some web scanner. Checks filename if it exists (list of suspicious filenames & list of supsicious extensions) Checks all other segments for suspicious directories + Foreign-platform extensions (php, jsp, etc.) are only counted when status_code is 404. """ normalized = path.lower() segments = normalized.split("/") @@ -40,6 +56,8 @@ def is_web_scan_path(path: str) -> bool: ext = filename.split(".")[-1] if ext in file_extensions: return True + if ext in foreign_extensions and status_code == 404: + return True for directory in segments: if directory in directories: diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py index ee9d13fa1..2b7acbe1d 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py @@ -8,30 +8,41 @@ def test_is_web_scan_path(): - assert is_web_scan_path("/.env") - assert is_web_scan_path("/test/.env") - assert is_web_scan_path("/test/.env.bak") - assert is_web_scan_path("/.git/config") - assert is_web_scan_path("/.aws/config") - assert is_web_scan_path("/some/path/.git/test") - assert is_web_scan_path("/some/path/.gitlab-ci.yml") - assert is_web_scan_path("/some/path/.github/workflows/test.yml") - assert is_web_scan_path("/.travis.yml") - assert is_web_scan_path("/../example/") - assert is_web_scan_path("/./test") - assert is_web_scan_path("/Cargo.lock") - assert is_web_scan_path("/System32/test") + assert is_web_scan_path("/.env", 404) + assert is_web_scan_path("/test/.env", 404) + assert is_web_scan_path("/test/.env.bak", 404) + assert is_web_scan_path("/.git/config", 404) + assert is_web_scan_path("/.aws/config", 404) + assert is_web_scan_path("/some/path/.git/test", 404) + assert is_web_scan_path("/some/path/.gitlab-ci.yml", 404) + assert is_web_scan_path("/some/path/.github/workflows/test.yml", 404) + assert is_web_scan_path("/.travis.yml", 404) + assert is_web_scan_path("/../example/", 404) + assert is_web_scan_path("/./test", 404) + assert is_web_scan_path("/Cargo.lock", 404) + assert is_web_scan_path("/System32/test", 404) def test_is_not_web_scan_path(): - assert not is_web_scan_path("/test/file.txt") - assert not is_web_scan_path("/some/route/to/file.txt") - assert not is_web_scan_path("/some/route/to/file.json") - assert not is_web_scan_path("/en") - assert not is_web_scan_path("/") - assert not is_web_scan_path("/test/route") - assert not is_web_scan_path("/static/file.css") - assert not is_web_scan_path("/static/file.a461f56e.js") + assert not is_web_scan_path("/test/file.txt", 404) + assert not is_web_scan_path("/some/route/to/file.txt", 404) + assert not is_web_scan_path("/some/route/to/file.json", 404) + assert not is_web_scan_path("/en", 404) + assert not is_web_scan_path("/", 404) + assert not is_web_scan_path("/test/route", 404) + assert not is_web_scan_path("/static/file.css", 404) + assert not is_web_scan_path("/static/file.a461f56e.js", 404) + + +def test_foreign_extensions_404(): + assert is_web_scan_path("/admin.php", 404) + assert is_web_scan_path("/app.jsp", 404) + + +def test_foreign_extensions_non_404(): + assert not is_web_scan_path("/admin.php", 200) + assert not is_web_scan_path("/admin.php", 301) + assert not is_web_scan_path("/app.jsp", 200) def test_no_duplicates_in_file_names(): diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py index e1638f89e..44c80b822 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py @@ -10,10 +10,10 @@ ) -def is_web_scanner(context: Context) -> bool: +def is_web_scanner(context: Context, status_code: int) -> bool: if context.method and is_web_scan_method(context.method): return True - if context.route and is_web_scan_path(context.route): + if context.route and is_web_scan_path(context.route, status_code): return True if query_params_contain_dangerous_strings(context): return True diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_benchmark_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_benchmark_test.py index 9da02a694..8758e2214 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_benchmark_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_benchmark_test.py @@ -32,11 +32,11 @@ def test_performance(): iterations = 25_000 start = time.perf_counter_ns() for _ in range(iterations): - is_web_scanner(get_test_context("/wp-config.php", "GET", {"test": "1"})) + is_web_scanner(get_test_context("/wp-config.php", "GET", {"test": "1"}), 404) is_web_scanner( get_test_context("/vulnerable", "GET", {"test": "1'; DROP TABLE users; --"}) ) - is_web_scanner(get_test_context("/", "GET", {"test": "1"})) + is_web_scanner(get_test_context("/", "GET", {"test": "1"}), 404) end = time.perf_counter_ns() total_time_ms = (end - start) / 1_000_000 diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py index eb03a44f4..d5eb97e5e 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py @@ -22,22 +22,32 @@ def get_test_context(path="/", method="GET", query=None): def test_is_web_scanner(): - assert is_web_scanner(get_test_context("/wp-config.php", "GET")) - assert is_web_scanner(get_test_context("/.env", "GET")) - assert is_web_scanner(get_test_context("/test/.env.bak", "GET")) - assert is_web_scanner(get_test_context("/.git/config", "GET")) - assert is_web_scanner(get_test_context("/.aws/config", "GET")) - assert is_web_scanner(get_test_context("/../secret", "GET")) - assert is_web_scanner(get_test_context("/", "BADMETHOD")) - assert is_web_scanner(get_test_context("/", "GET", {"test": "SELECT * FROM admin"})) - assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"})) + assert is_web_scanner(get_test_context("/wp-config.php", "GET"), 404) + assert is_web_scanner(get_test_context("/.env", "GET"), 404) + assert is_web_scanner(get_test_context("/test/.env.bak", "GET"), 404) + assert is_web_scanner(get_test_context("/.git/config", "GET"), 404) + assert is_web_scanner(get_test_context("/.aws/config", "GET"), 404) + assert is_web_scanner(get_test_context("/../secret", "GET"), 404) + assert is_web_scanner(get_test_context("/", "BADMETHOD"), 404) + assert is_web_scanner( + get_test_context("/", "GET", {"test": "SELECT * FROM admin"}), 404 + ) + assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"}), 404) def test_is_not_web_scanner(): - assert not is_web_scanner(get_test_context("graphql", "POST")) - assert not is_web_scanner(get_test_context("/api/v1/users", "GET")) - assert not is_web_scanner(get_test_context("/public/index.html", "GET")) - assert not is_web_scanner(get_test_context("/static/js/app.js", "GET")) - assert not is_web_scanner(get_test_context("/uploads/image.png", "GET")) - assert not is_web_scanner(get_test_context("/", "GET", {"test": "1'"})) - assert not is_web_scanner(get_test_context("/", "GET", {"test": "abcd"})) + assert not is_web_scanner(get_test_context("graphql", "POST"), 404) + assert not is_web_scanner(get_test_context("/api/v1/users", "GET"), 404) + assert not is_web_scanner(get_test_context("/public/index.html", "GET"), 404) + assert not is_web_scanner(get_test_context("/static/js/app.js", "GET"), 404) + assert not is_web_scanner(get_test_context("/uploads/image.png", "GET"), 404) + assert not is_web_scanner(get_test_context("/", "GET", {"test": "1'"}), 404) + assert not is_web_scanner(get_test_context("/", "GET", {"test": "abcd"}), 404) + + +def test_foreign_extension_only_on_404(): + assert is_web_scanner(get_test_context("/admin.php", "GET"), 404) + assert not is_web_scanner(get_test_context("/admin.php", "GET"), 200) + assert not is_web_scanner(get_test_context("/admin.php", "GET"), 301) + assert is_web_scanner(get_test_context("/app.jsp", "GET"), 404) + assert not is_web_scanner(get_test_context("/app.jsp", "GET"), 200)