Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tests/test_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_get_errors(self):
self.assertIn('error1', errors)
self.assertIn('error2', errors)

@patch('wifite.util.cleanup.subprocess.run')
@patch('wifite.util.process.subprocess.run')
def test_remove_iptables_rule(self, mock_run):
"""Test removing an iptables rule."""
mock_run.return_value = Mock(returncode=0)
Expand All @@ -155,7 +155,7 @@ def test_remove_iptables_rule(self, mock_run):
self.assertTrue(result)
mock_run.assert_called_once()

@patch('wifite.util.cleanup.subprocess.run')
@patch('wifite.util.process.subprocess.run')
def test_check_conflicting_processes(self, mock_run):
"""Test checking for conflicting processes."""
# Mock pgrep returning PIDs
Expand All @@ -166,7 +166,7 @@ def test_check_conflicting_processes(self, mock_run):
# Should find some processes
self.assertIsInstance(conflicting, list)

@patch('wifite.util.cleanup.subprocess.run')
@patch('wifite.util.process.subprocess.run')
def test_kill_orphaned_processes(self, mock_run):
"""Test killing orphaned processes."""
# Mock pgrep returning PIDs
Expand Down
190 changes: 95 additions & 95 deletions tests/test_wpa3_detection_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"""

import unittest
import time
from wifite.model.target import Target
from wifite.util.wpa3 import WPA3Detector

Expand Down Expand Up @@ -57,104 +56,105 @@ def setUp(self):
''
]

def test_cache_performance_improvement(self):
"""Benchmark cache performance improvement."""
target = Target(self.wpa3_transition_fields)
iterations = 1000

# Measure time without cache (fresh detection each time)
start_time = time.time()
for _ in range(iterations):
WPA3Detector.detect_wpa3_capability(target, use_cache=False)
no_cache_time = time.time() - start_time

# Set cache once
wpa3_info_dict = WPA3Detector.detect_wpa3_capability(target, use_cache=False)
def test_cache_short_circuits_detection(self):
"""Cached detection returns the stored result without recomputing.

Behavioural check (deterministic, not timing-based): when a target
already carries a ``wpa3_info`` cache, ``use_cache=True`` must return
that cached value verbatim, while ``use_cache=False`` must ignore the
cache and recompute from the target's encryption fields. We prove this
by seeding the cache with a sentinel that deliberately disagrees with
what the fields would produce.
"""
from wifite.util.wpa3 import WPA3Info
target.wpa3_info = WPA3Info.from_dict(wpa3_info_dict)

# Measure time with cache
start_time = time.time()
for _ in range(iterations):
WPA3Detector.detect_wpa3_capability(target, use_cache=True)
cache_time = time.time() - start_time

# Cache should be significantly faster
speedup = no_cache_time / cache_time if cache_time > 0 else float('inf')

print(f"\nCache Performance Benchmark ({iterations} iterations):")
print(f" Without cache: {no_cache_time:.4f}s")
print(f" With cache: {cache_time:.4f}s")
print(f" Speedup: {speedup:.2f}x")

# Cache should be at least 2x faster
self.assertGreater(speedup, 2.0,
f"Cache speedup {speedup:.2f}x is less than expected 2x")

def test_early_return_performance(self):
"""Benchmark early return optimization for WPA2-only targets."""
wpa2_target = Target(self.wpa2_only_fields)
wpa3_target = Target(self.wpa3_transition_fields)
iterations = 1000

# Measure WPA2-only detection (should be faster with early return)
start_time = time.time()
for _ in range(iterations):
WPA3Detector.detect_wpa3_capability(wpa2_target, use_cache=False)
wpa2_time = time.time() - start_time

# Measure WPA3 detection (full processing)
start_time = time.time()
for _ in range(iterations):
WPA3Detector.detect_wpa3_capability(wpa3_target, use_cache=False)
wpa3_time = time.time() - start_time

print(f"\nEarly Return Benchmark ({iterations} iterations):")
print(f" WPA2-only (early return): {wpa2_time:.4f}s")
print(f" WPA3 (full processing): {wpa3_time:.4f}s")
print(f" Ratio: {wpa3_time/wpa2_time:.2f}x")

# WPA2-only should be faster or similar (early return optimization)
# Allow some variance due to system load
self.assertLessEqual(wpa2_time, wpa3_time * 1.5,
"WPA2-only detection should benefit from early return")

def test_helper_method_cache_usage(self):
"""Benchmark helper methods using cache vs fresh detection."""
target = Target(self.wpa3_transition_fields)
iterations = 1000

# Without cache - helper methods trigger full detection
start_time = time.time()
for _ in range(iterations):
target.wpa3_info = None # Clear cache
WPA3Detector.identify_transition_mode(target)
WPA3Detector.check_pmf_status(target)
WPA3Detector.get_supported_sae_groups(target)
no_cache_time = time.time() - start_time

# With cache - helper methods use cached data
wpa3_info_dict = WPA3Detector.detect_wpa3_capability(target, use_cache=False)

# What a fresh (uncached) detection derives from the fields.
fresh = WPA3Detector.detect_wpa3_capability(target, use_cache=False)
self.assertTrue(fresh['has_wpa3'],
'Fixture should detect WPA3 from the encryption fields')

# Seed the cache with a sentinel that disagrees with the fields, so a
# cache hit is distinguishable from a recompute.
sentinel = WPA3Info.from_dict({
'has_wpa3': False,
'has_wpa2': True,
'is_transition': False,
'pmf_status': WPA3Detector.PMF_DISABLED,
'sae_groups': [],
'dragonblood_vulnerable': False,
})
target.wpa3_info = sentinel

# Cache hit: must return the sentinel, proving detection was skipped.
cached = WPA3Detector.detect_wpa3_capability(target, use_cache=True)
self.assertEqual(cached, sentinel.to_dict(),
'Cached path must return the stored wpa3_info verbatim')
self.assertFalse(cached['has_wpa3'],
'Cached path must not recompute from the fields')

# Cache bypass: must recompute and match the fresh detection.
recomputed = WPA3Detector.detect_wpa3_capability(target, use_cache=False)
self.assertEqual(recomputed, fresh,
'use_cache=False must ignore the cache and recompute')
self.assertTrue(recomputed['has_wpa3'])

def test_early_return_for_wpa2_only(self):
"""WPA2-only targets take the early-return branch (no WPA3 work).

Behavioural check (deterministic): a target whose fields advertise no
WPA3/SAE must short-circuit to the WPA2-only result shape, while a
WPA3 transition target must go through full detection. This exercises
the same early-return optimisation the old benchmark targeted, without
asserting on wall-clock time.
"""
wpa2 = WPA3Detector.detect_wpa3_capability(
Target(self.wpa2_only_fields), use_cache=False)
wpa3 = WPA3Detector.detect_wpa3_capability(
Target(self.wpa3_transition_fields), use_cache=False)

# Early-return branch: WPA2-only, not transition, no SAE groups.
self.assertFalse(wpa2['has_wpa3'])
self.assertFalse(wpa2['is_transition'])
self.assertEqual(wpa2['sae_groups'], [])
self.assertFalse(wpa2['dragonblood_vulnerable'])

# Full-detection branch still flags WPA3.
self.assertTrue(wpa3['has_wpa3'])

def test_helper_methods_use_cache(self):
"""Helper methods read cached wpa3_info instead of recomputing.

Deterministic check: seed the cache with a sentinel that disagrees
with the target's fields, then confirm each helper returns the cached
value. Clearing the cache makes them recompute from the fields.
"""
from wifite.util.wpa3 import WPA3Info
target.wpa3_info = WPA3Info.from_dict(wpa3_info_dict)

start_time = time.time()
for _ in range(iterations):
WPA3Detector.identify_transition_mode(target)
WPA3Detector.check_pmf_status(target)
WPA3Detector.get_supported_sae_groups(target)
cache_time = time.time() - start_time

speedup = no_cache_time / cache_time if cache_time > 0 else float('inf')

print(f"\nHelper Method Cache Benchmark ({iterations} iterations):")
print(f" Without cache: {no_cache_time:.4f}s")
print(f" With cache: {cache_time:.4f}s")
print(f" Speedup: {speedup:.2f}x")

# Cache should provide significant speedup
self.assertGreater(speedup, 2.0,
f"Helper method cache speedup {speedup:.2f}x is less than expected")

target = Target(self.wpa3_transition_fields)

sentinel = WPA3Info.from_dict({
'has_wpa3': True,
'has_wpa2': True,
'is_transition': False, # disagrees with the fields
'pmf_status': WPA3Detector.PMF_REQUIRED,
'sae_groups': [21], # not the default [19]
'dragonblood_vulnerable': False,
})
target.wpa3_info = sentinel

# Cache hit: helpers return the sentinel's values.
self.assertEqual(WPA3Detector.identify_transition_mode(target),
sentinel.is_transition)
self.assertEqual(WPA3Detector.check_pmf_status(target),
sentinel.pmf_status)
self.assertEqual(WPA3Detector.get_supported_sae_groups(target),
sentinel.sae_groups)

# Cache cleared: helpers recompute from the fields (transition mode).
target.wpa3_info = None
self.assertTrue(WPA3Detector.identify_transition_mode(target))


if __name__ == '__main__':
Expand Down
14 changes: 14 additions & 0 deletions wifite/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ def __init__(self, configuration):
# Hack: Check for -v before parsing args;
# so we know which commands to display.
self.verbose = '-v' in sys.argv or '-hv' in sys.argv or '-vh' in sys.argv
# Activate Kalidroid MiniTerminal output mode as early as possible (before
# any option-echo prints) so the whole run streams scrollback-friendly
# output to the Kalidroid app's MiniTerminal.
if '--kalidroid' in sys.argv:
Color.kalidroid = True
self.config = configuration
self.args = self.get_arguments()

Expand Down Expand Up @@ -270,6 +275,15 @@ def _add_global_args(self, glob):
help=self._verbose(
'Write debug log to {C}[path]{W} (implies {C}-vv{W} minimum verbosity)'))

glob.add_argument('--kalidroid',
action='store_true',
default=False,
dest='kalidroid',
help=self._verbose(
'Emit {C}MiniTerminal{W}-friendly output for the {C}Kalidroid{W} app: '
'flatten carriage-return progress redraws into discrete lines and '
'skip clear-line escapes (default: {G}off{W})'))

glob.add_argument('-i',
action='store',
dest='interface',
Expand Down
2 changes: 1 addition & 1 deletion wifite/attack/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ..config import Configuration
from ..model.target import WPSState
from ..util.color import Color
from ..util.logger import log_info, log_debug
from ..util.logger import log_info
from ..util.wpa3_tools import WPA3ToolChecker
from ..util.memory import MemoryMonitor, get_infinite_monitor

Expand Down
2 changes: 0 additions & 2 deletions wifite/attack/attack_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
from ..util.color import Color
from ..tools.tshark import TsharkMonitor
from ..util.process import Process
import os
import time
import re

# TUI imports (optional)
try:
Expand Down
11 changes: 4 additions & 7 deletions wifite/attack/eviltwin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import re
import time
import signal
from typing import Optional, List
from typing import TYPE_CHECKING
from enum import Enum

if TYPE_CHECKING:
from ..util.session import EvilTwinAttackState

from ..model.attack import Attack
from ..model.eviltwin_result import CrackResultEvilTwin
from ..config import Configuration
Expand All @@ -25,7 +28,6 @@
from ..util.cleanup import CleanupManager
from ..util.adaptive_deauth import AdaptiveDeauthManager
from ..util.process import Process
from ..tools.aireplay import Aireplay


class AttackState(Enum):
Expand Down Expand Up @@ -251,7 +253,6 @@ def _get_interface_assignment(self):
# This is a fallback for when the attack is run directly
import contextlib
with contextlib.suppress(ImportError, AttributeError):
from ..wifite import Wifite
# Note: This is a fallback and may not always work
# The preferred approach is to set interface_assignment before calling run()
log_debug('EvilTwin', 'No interface assignment available, will use single interface mode')
Expand Down Expand Up @@ -1472,7 +1473,6 @@ def _send_deauth(self, client_mac: str, count: int = 5):
count: Number of deauth packets to send
"""
try:
from ..tools.aireplay import Aireplay
from ..util.process import Process

# Verify deauth interface is available
Expand Down Expand Up @@ -2074,7 +2074,6 @@ def restore_state_from_session(self, state: 'EvilTwinAttackState') -> bool:
True if state was restored successfully, False otherwise
"""
try:
from ..util.session import EvilTwinAttackState

# Restore configuration
self.interface_ap = state.interface_ap or self.interface_ap
Expand Down Expand Up @@ -2137,7 +2136,6 @@ def is_attack_running() -> bool:
Returns:
True if an attack is running, False otherwise
"""
import subprocess

try:
# Check for hostapd processes with wifite config
Expand Down Expand Up @@ -2168,7 +2166,6 @@ def cleanup_orphaned_processes(self) -> None:
other processes that may have been left running from a previous
interrupted attack.
"""
import subprocess

log_info('EvilTwin', 'Checking for orphaned processes')

Expand Down
6 changes: 3 additions & 3 deletions wifite/attack/pmkid.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# Check for native PMKID availability
try:
from ..native.pmkid import ScapyPMKID, PMKIDResult as NativePMKIDResult
from ..native.pmkid import ScapyPMKID
NATIVE_PMKID_AVAILABLE = ScapyPMKID.is_available()
except BaseException:
NATIVE_PMKID_AVAILABLE = False
Expand Down Expand Up @@ -565,7 +565,7 @@ def crack_pmkid_file(self, pmkid_file):
def _handle_pmkid_crack_success(self, key, pmkid_file):
# Successfully cracked.
if self.view:
self.view.add_log(f"Successfully cracked PMKID!")
self.view.add_log("Successfully cracked PMKID!")
self.view.add_log(f"Password: {mask_sensitive(key)}")
self.view.update_progress({
'progress': 1.0,
Expand Down Expand Up @@ -633,7 +633,7 @@ def capture_pmkid_native(self):
def on_pmkid_captured(result):
log_info('AttackPMKID', f'Native capture found PMKID: {result.pmkid[:16]}...')
if self.view:
self.view.add_log(f'PMKID captured!')
self.view.add_log('PMKID captured!')

# Use ScapyPMKID capture
result = ScapyPMKID.capture(
Expand Down
1 change: 0 additions & 1 deletion wifite/attack/portal/credential_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from typing import Optional, Callable, Dict, List, Tuple
from queue import Queue, Empty, Full
from dataclasses import dataclass
from datetime import datetime

from ...util.logger import log_info, log_error, log_warning, log_debug

Expand Down
2 changes: 1 addition & 1 deletion wifite/attack/portal/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import os
import threading
import time
from http.server import HTTPServer, ThreadingHTTPServer, BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
from typing import Optional, Callable, Dict, Any
import socket
Expand Down
3 changes: 1 addition & 2 deletions wifite/attack/portal/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

import html
import os
from typing import Dict, Any, Optional

from ...util.logger import log_info, log_error, log_warning, log_debug
from ...util.logger import log_error, log_debug


class TemplateRenderer:
Expand Down
Loading
Loading