Skip to content

Commit 7a4f334

Browse files
committed
feat(subbrute): implement subdomain bruteforcing with wildcard detection
1 parent f0edaa8 commit 7a4f334

5 files changed

Lines changed: 513 additions & 0 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ share/python-wheels/
2727
MANIFEST
2828
*.txt
2929
!structure.txt
30+
!wordlist.txt
3031

3132
# PyInstaller
3233
# Usually these files are written by a python script from a template
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from rich.console import Console
2+
3+
class SubBruteConsole(Console):
4+
def __init__(self):
5+
super().__init__()
6+
self.total_subdomains = 0
7+
self.domain_stats = {}
8+
9+
def print_domain_start(self, domain):
10+
print(f"\r\033[K", end="")
11+
self.print(f"[cyan]Processing: {domain}[/cyan]")
12+
13+
def update_domain_stats(self, domain, count):
14+
self.domain_stats[domain] = count
15+
self.total_subdomains += count
16+
17+
def print_domain_complete(self, domain, count):
18+
print(f"\r\033[K", end="")
19+
self.print(f"[green]{domain}: {count} subdomains found[/green]")
20+
21+
def print_final_summary(self, output_file):
22+
print("\r\033[K", end="")
23+
self.print(f"\n[green]Total: [bold]{self.total_subdomains}[/bold] subdomains found")
24+
self.print(f"[green]Results saved to {output_file}[/green]")
25+
26+
def print_progress(self, current_domain, total_domains, tested_words, total_words):
27+
wordlist_percent = (tested_words / total_words) * 100 if total_words > 0 else 0
28+
domain_percent = (current_domain / total_domains) * 100 if total_domains > 0 else 0
29+
30+
print(f"\rDomain {current_domain}/{total_domains} ({domain_percent:.1f}%) | "
31+
f"{tested_words}/{total_words} words ({wordlist_percent:.1f}%)", end="", flush=True)
32+
33+
def print_error(self, message):
34+
print(f"\r\033[K", end="")
35+
self.print(f"[red]{message}[/red]")
36+
37+
def print_info(self, message):
38+
print(f"\r\033[K", end="")
39+
self.print(f"[blue]{message}[/blue]")
40+
41+
def print_found_subdomain(self, subdomain, ip):
42+
print(f"\r\033[K", end="")
43+
self.print(f"[yellow]Found:[/yellow] [bold]{subdomain}[/bold] -> [dim]{ip}[/dim]")
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import os
2+
from concurrent.futures import ThreadPoolExecutor, as_completed
3+
4+
from bugscanx.utils.common import get_input
5+
from .logger import SubBruteConsole
6+
from .utils import DomainValidator, SubdomainBruteforcer, CursorManager
7+
8+
9+
class SubBrute:
10+
def __init__(self, wordlist_path=None, max_workers=50, dns_timeout=3, enable_wildcard_filtering=True):
11+
self.console = SubBruteConsole()
12+
self.completed = 0
13+
self.cursor_manager = CursorManager()
14+
15+
if wordlist_path is None:
16+
current_dir = os.path.dirname(os.path.abspath(__file__))
17+
wordlist_path = os.path.join(current_dir, 'wordlist.txt')
18+
19+
self.bruteforcer = SubdomainBruteforcer(
20+
wordlist_path=wordlist_path,
21+
max_workers=max_workers,
22+
timeout=dns_timeout,
23+
enable_wildcard_filtering=enable_wildcard_filtering
24+
)
25+
26+
def _create_progress_callback(self, total_words):
27+
def progress_callback(event_type, *args):
28+
if event_type == 'progress':
29+
tested, found = args
30+
if tested % 10 == 0 or tested == total_words:
31+
self.console.print_progress(self.completed, self.total_domains, tested, total_words)
32+
elif event_type == 'wildcard_detected':
33+
wildcard_ips = args[0]
34+
self.console.print_info(f"Wildcard DNS detected! IPs: {', '.join(wildcard_ips)}")
35+
self.console.print_info("Results will be filtered to exclude wildcard responses")
36+
elif event_type == 'found':
37+
subdomain, ip = args
38+
self.console.print_found_subdomain(subdomain, ip)
39+
return progress_callback
40+
41+
@staticmethod
42+
def save_subdomains(subdomains, output_file):
43+
if not subdomains:
44+
return
45+
46+
with open(output_file, "a", encoding="utf-8") as f:
47+
f.write("\n".join(sorted(subdomains)) + "\n")
48+
49+
def process_domain(self, domain, output_file, total_words):
50+
if not DomainValidator.is_valid_domain(domain):
51+
self.completed += 1
52+
return set()
53+
54+
self.console.print_domain_start(domain)
55+
progress_callback = self._create_progress_callback(total_words)
56+
57+
try:
58+
subdomains = self.bruteforcer.bruteforce_domain(domain, progress_callback)
59+
self.console.update_domain_stats(domain, len(subdomains))
60+
self.console.print_domain_complete(domain, len(subdomains))
61+
self.save_subdomains(subdomains, output_file)
62+
except Exception as e:
63+
self.console.print_error(f"Error processing domain {domain}: {str(e)}")
64+
subdomains = set()
65+
66+
self.completed += 1
67+
return subdomains
68+
69+
def run(self, domains, output_file, max_concurrent_domains=1):
70+
if not domains:
71+
self.console.print_error("No valid domains provided")
72+
return set()
73+
74+
os.makedirs(os.path.dirname(output_file) or '.', exist_ok=True)
75+
76+
try:
77+
total_words = self.bruteforcer.load_wordlist()
78+
self.console.print_info(f"Loaded wordlist with {total_words} entries")
79+
except Exception as e:
80+
self.console.print_error(f"Failed to load wordlist: {str(e)}")
81+
return set()
82+
83+
self.completed = 0
84+
self.total_domains = len(domains)
85+
all_subdomains = set()
86+
87+
self.console.print_info(f"Starting subdomain bruteforce for {len(domains)} domain(s)")
88+
self.console.print_info(f"Using {self.bruteforcer.max_workers} concurrent threads per domain")
89+
90+
with self.cursor_manager:
91+
futures = []
92+
with ThreadPoolExecutor(max_workers=max_concurrent_domains) as executor:
93+
futures = [
94+
executor.submit(self.process_domain, domain, output_file, total_words)
95+
for domain in domains
96+
]
97+
98+
for future in as_completed(futures):
99+
try:
100+
result = future.result()
101+
all_subdomains.update(result)
102+
except Exception as e:
103+
self.console.print_error(f"Error processing domain: {str(e)}")
104+
105+
self.console.print_final_summary(output_file)
106+
return all_subdomains
107+
108+
109+
def main():
110+
domains = []
111+
112+
input_type = get_input("Select input mode", "choice",
113+
choices=["Manual", "File"])
114+
115+
if input_type == "Manual":
116+
domain_input = get_input("Enter domain(s)")
117+
domains = [d.strip() for d in domain_input.split(',')
118+
if DomainValidator.is_valid_domain(d.strip())]
119+
default_output = f"{domains[0]}_subdomains_bruteforce.txt" if domains else "subdomains_bruteforce.txt"
120+
else:
121+
file_path = get_input("Enter filename", "file")
122+
try:
123+
with open(file_path, 'r') as f:
124+
domains = [d.strip() for d in f.readlines()
125+
if DomainValidator.is_valid_domain(d.strip())]
126+
default_output = f"{file_path.rsplit('.', 1)[0]}_subdomains_bruteforce.txt"
127+
except Exception as e:
128+
print(f"Error reading file: {str(e)}")
129+
return
130+
131+
if not domains:
132+
print("No valid domains found!")
133+
return
134+
135+
output_file = get_input("Enter output filename", default=default_output)
136+
137+
use_custom_wordlist = get_input("Use custom wordlist?", "choice",
138+
choices=["No", "Yes"]) == "Yes"
139+
wordlist_path = get_input("Enter wordlist path", "file") if use_custom_wordlist else None
140+
141+
max_workers = int(get_input("Max concurrent threads per domain",
142+
default="50"))
143+
dns_timeout = int(get_input("DNS timeout in seconds",
144+
default="3"))
145+
146+
enable_wildcard_filtering = get_input("Enable wildcard filtering?", "choice",
147+
choices=["Yes", "No"]) == "Yes"
148+
149+
subbrute = SubBrute(
150+
wordlist_path=wordlist_path,
151+
max_workers=max_workers,
152+
dns_timeout=dns_timeout,
153+
enable_wildcard_filtering=enable_wildcard_filtering
154+
)
155+
156+
subbrute.run(domains, output_file, max_concurrent_domains=1)

0 commit comments

Comments
 (0)