Skip to content

Commit 7360f55

Browse files
committed
refactor: Simplify file handling functions and improve user input prompts in file_toolkit
1 parent: bd81d23 · commit: 7360f55

1 file changed

Lines changed: 113 additions & 141 deletions

File tree

bugscanx/modules/others/file_toolkit.py

Lines changed: 113 additions & 141 deletions
Original file line number | Diff line number | Diff line change
@@ -25,10 +25,12 @@ def write_lines(file_path, lines):
2525
print(f"[red] Error writing to file {file_path}: {e}[/red]")
2626
return False
2727

28-
def split_file(file_path, parts):
28+
def split_file():
29+
file_path = get_input("Enter filename", "file")
30+
parts = int(get_input("Number of parts", "number"))
2931
lines = read_lines(file_path)
3032
if not lines:
31-
return []
33+
return
3234

3335
lines_per_file = len(lines) // parts
3436
file_base = os.path.splitext(file_path)[0]
@@ -42,85 +44,12 @@ def split_file(file_path, parts):
4244
if write_lines(part_file, lines[start_idx:end_idx]):
4345
created_files.append((part_file, len(lines[start_idx:end_idx])))
4446

45-
return created_files
46-
47-
def merge_files(directory, files_to_merge, output_file):
48-
output_path = os.path.join(directory, output_file)
49-
lines = []
50-
for filename in files_to_merge:
51-
file_path = os.path.join(directory, filename)
52-
lines.extend(read_lines(file_path))
53-
return write_lines(output_path, lines), len(lines)
54-
55-
def extract_domains_and_ips(content):
56-
domain_pattern = r'\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,6}\b'
57-
ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
58-
59-
domains = set(re.findall(domain_pattern, '\n'.join(content)))
60-
ips = set(re.findall(ip_pattern, '\n'.join(content)))
61-
62-
return sorted(domains), sorted(ips)
63-
64-
def get_root_domains(subdomains):
65-
return sorted({'.'.join(d.split('.')[-2:]) for d in subdomains if len(d.split('.')) >= 2})
66-
67-
def separate_by_extension(domains):
68-
extensions_dict = defaultdict(list)
69-
for domain in domains:
70-
ext = domain.split('.')[-1].lower()
71-
extensions_dict[ext].append(domain)
72-
return extensions_dict
73-
74-
def filter_by_keywords(domains, keywords):
75-
return [d for d in domains if any(k in d.lower() for k in keywords)]
76-
77-
def resolve_domain(domain):
78-
try:
79-
ip = socket.gethostbyname_ex(domain.strip())[2][0]
80-
return domain, ip
81-
except (socket.gaierror, socket.timeout):
82-
return domain, None
83-
84-
def convert_cidr_to_ips(cidr):
85-
try:
86-
network = ipaddress.ip_network(cidr.strip(), strict=False)
87-
return [str(ip) for ip in network.hosts()]
88-
except ValueError as e:
89-
print(f"[red] Invalid CIDR range: {cidr} - {str(e)}[/red]")
90-
return []
91-
92-
def resolve_domains_to_ips(domains):
93-
ip_addresses = set()
94-
resolved_count = failed_count = 0
95-
socket.setdefaulttimeout(1)
96-
97-
with Progress(SpinnerColumn(), *Progress.get_default_columns(), TimeElapsedColumn()) as progress:
98-
task = progress.add_task("[yellow]Resolving", total=len(domains))
99-
100-
with ThreadPoolExecutor(max_workers=100) as executor:
101-
futures = [executor.submit(resolve_domain, domain) for domain in domains]
102-
for future in as_completed(futures):
103-
domain, ip = future.result()
104-
if ip:
105-
ip_addresses.add(ip)
106-
resolved_count += 1
107-
else:
108-
failed_count += 1
109-
progress.update(task, advance=1)
110-
111-
return sorted(ip_addresses), resolved_count, failed_count
112-
113-
def handle_split_file():
114-
file_path = get_input("Enter filename", "file")
115-
parts = int(get_input("Number of parts", "number"))
116-
created_files = split_file(file_path, parts)
117-
11847
if created_files:
11948
print(f"[green] Split '{os.path.basename(file_path)}' into {len(created_files)} parts:[/green]")
12049
for file_path, line_count in created_files:
12150
print(f"[green] - {os.path.basename(file_path)}: {line_count} lines[/green]")
12251

123-
def handle_merge_files():
52+
def merge_files():
12453
directory = get_input("Enter directory path", default=os.getcwd())
12554

12655
if get_confirm(" Merge all txt files?"):
@@ -134,30 +63,18 @@ def handle_merge_files():
13463
return
13564

13665
output_file = get_input("Enter output filename")
137-
success, total_lines = merge_files(directory, files_to_merge, output_file)
66+
output_path = os.path.join(directory, output_file)
67+
lines = []
68+
for filename in files_to_merge:
69+
file_path = os.path.join(directory, filename)
70+
lines.extend(read_lines(file_path))
13871

139-
if success:
72+
if write_lines(output_path, lines):
14073
print(f"[green] Successfully merged {len(files_to_merge)} files into '{output_file}'[/green]")
141-
print(f"[green] - Total lines: {total_lines}[/green]")
74+
print(f"[green] - Total lines: {len(lines)}[/green]")
14275
print(f"[green] - Output location: {directory}[/green]")
14376

144-
def handle_remove_duplicate_domains():
145-
file_path = get_input("Enter filename", "file")
146-
lines = read_lines(file_path)
147-
148-
if not lines:
149-
return
150-
151-
unique_lines = sorted(set(lines))
152-
duplicates_removed = len(lines) - len(unique_lines)
153-
154-
if write_lines(file_path, unique_lines):
155-
print(f"[green] Successfully removed duplicates from '{os.path.basename(file_path)}':[/green]")
156-
print(f"[green] - Original count: {len(lines)} lines[/green]")
157-
print(f"[green] - Unique count: {len(unique_lines)} lines[/green]")
158-
print(f"[green] - Duplicates removed: {duplicates_removed} lines[/green]")
159-
160-
def handle_txt_cleaner():
77+
def clean_file():
16178
input_file = get_input("Enter filename", "file")
16279
domain_output_file = get_input("Enter domains output filename")
16380
ip_output_file = get_input("Enter IP output filename")
@@ -166,7 +83,11 @@ def handle_txt_cleaner():
16683
if not content:
16784
return
16885

169-
domains, ips = extract_domains_and_ips(content)
86+
domain_pattern = r'\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,6}\b'
87+
ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
88+
89+
domains = sorted(set(re.findall(domain_pattern, '\n'.join(content))))
90+
ips = sorted(set(re.findall(ip_pattern, '\n'.join(content))))
17091

17192
domains_success = write_lines(domain_output_file, domains)
17293
ips_success = write_lines(ip_output_file, ips)
@@ -178,90 +99,136 @@ def handle_txt_cleaner():
17899
if ips_success:
179100
print(f"[green] - Extracted {len(ips)} unique IP addresses to '{os.path.basename(ip_output_file)}'[/green]")
180101

181-
def handle_convert_subdomains_to_domains():
102+
def remove_duplicates():
103+
file_path = get_input("Enter filename", "file")
104+
lines = read_lines(file_path)
105+
if not lines:
106+
return
107+
108+
unique_lines = sorted(set(lines))
109+
duplicates_removed = len(lines) - len(unique_lines)
110+
111+
if write_lines(file_path, unique_lines):
112+
print(f"[green] Successfully removed duplicates from '{os.path.basename(file_path)}':[/green]")
113+
print(f"[green] - Original count: {len(lines)} lines[/green]")
114+
print(f"[green] - Unique count: {len(unique_lines)} lines[/green]")
115+
print(f"[green] - Duplicates removed: {duplicates_removed} lines[/green]")
116+
117+
def convert_subdomains_to_domains():
182118
file_path = get_input("Enter filename", "file")
183119
output_file = get_input("Enter output filename")
184120

185121
subdomains = read_lines(file_path)
186122
if not subdomains:
187123
return
188124

189-
root_domains = get_root_domains(subdomains)
125+
root_domains = sorted({'.'.join(d.split('.')[-2:]) for d in subdomains if len(d.split('.')) >= 2})
190126

191127
if write_lines(output_file, root_domains):
192128
print(f"[green] Successfully converted subdomains to root domains:[/green]")
193129
print(f"[green] - Input subdomains: {len(subdomains)}[/green]")
194130
print(f"[green] - Unique root domains: {len(root_domains)}[/green]")
195131
print(f"[green] - Output file: '{os.path.basename(output_file)}'[/green]")
196132

197-
def handle_separate_domains_by_extension():
133+
def filter_by_tlds():
198134
file_path = get_input("Enter filename", "file")
199-
extensions_input = get_input("Enter extensions (comma-separated) or 'all'")
135+
tlds_input = get_input("Enter TLDs ", instruction="(e.g. com, org)")
200136

201137
domains = read_lines(file_path)
202138
if not domains:
203139
return
204140

205-
extensions_dict = separate_by_extension(domains)
141+
tld_dict = defaultdict(list)
142+
for domain in domains:
143+
parts = domain.split('.')
144+
if len(parts) > 1:
145+
tld = parts[-1].lower()
146+
tld_dict[tld].append(domain)
147+
206148
base_name = os.path.splitext(file_path)[0]
207-
target_extensions = [ext.strip() for ext in extensions_input.lower().split(',')] if extensions_input.lower() != 'all' else list(extensions_dict.keys())
149+
target_tlds = [tld.strip().lstrip('.').lower() for tld in tlds_input.split(',')] if tlds_input.lower() != 'all' else list(tld_dict.keys())
208150

209151
success_count = 0
210-
print(f"[green] Separating domains by extension from '{os.path.basename(file_path)}':[/green]")
152+
print(f"[green]Filtering domains by TLDs from '{os.path.basename(file_path)}':[/green]")
211153

212-
for ext in target_extensions:
213-
if ext in extensions_dict:
214-
ext_file = f"{base_name}_{ext}.txt"
215-
if write_lines(ext_file, sorted(extensions_dict[ext])):
154+
for tld in target_tlds:
155+
if (tld in tld_dict):
156+
tld_file = f"{base_name}_{tld}.txt"
157+
if write_lines(tld_file, sorted(tld_dict[tld])):
216158
success_count += 1
217-
print(f"[green] - Created '{os.path.basename(ext_file)}' with {len(extensions_dict[ext])} domains[/green]")
159+
print(f"[green]- Created '{os.path.basename(tld_file)}' with {len(tld_dict[tld])} domains[/green]")
218160
else:
219-
print(f"[yellow] - No domains found with .{ext} extension[/yellow]")
220-
221-
if success_count > 0:
222-
print(f"[green] Successfully created {success_count} files based on domain extensions[/green]")
161+
print(f"[yellow]- No domains found with .{tld} TLD[/yellow]")
223162

224-
def handle_filter_by_keywords():
163+
def filter_by_keywords():
225164
file_path = get_input("Enter filename", "file")
226-
keywords = [k.strip().lower() for k in get_input("Enter keywords (comma-separated)").split(',')]
165+
keywords = [k.strip().lower() for k in get_input("Enter keyword(s)").split(',')]
227166
output_file = get_input("Enter output filename")
228167

229168
lines = read_lines(file_path)
230169
if not lines:
231170
return
232171

233-
filtered_domains = filter_by_keywords(lines, keywords)
172+
filtered_lines = [line for line in lines if any(k in line.lower() for k in keywords)]
234173

235-
if write_lines(output_file, filtered_domains):
236-
print(f"[green] Successfully filtered domains by keywords:[/green]")
237-
print(f"[green] - Input domains: {len(lines)}[/green]")
238-
print(f"[green] - Matched domains: {len(filtered_domains)}[/green]")
174+
if write_lines(output_file, filtered_lines):
175+
print(f"[green] Successfully filtered content by keywords:[/green]")
176+
print(f"[green] - Input lines: {len(lines)}[/green]")
177+
print(f"[green] - Matched lines: {len(filtered_lines)}[/green]")
239178
print(f"[green] - Keywords used: {', '.join(keywords)}[/green]")
240179
print(f"[green] - Output file: '{os.path.basename(output_file)}'[/green]")
241180

242-
def handle_cidr_to_ip():
181+
def cidr_to_ip():
243182
cidr_input = get_input("Enter CIDR range")
244183
output_file = get_input("Enter output filename")
245184

246-
ip_addresses = convert_cidr_to_ips(cidr_input)
185+
try:
186+
network = ipaddress.ip_network(cidr_input.strip(), strict=False)
187+
ip_addresses = [str(ip) for ip in network.hosts()]
188+
except ValueError as e:
189+
print(f"[red] Invalid CIDR range: {cidr_input} - {str(e)}[/red]")
190+
return
247191

248192
if ip_addresses and write_lines(output_file, ip_addresses):
249193
print(f"[green] Successfully converted CIDR to IP addresses:[/green]")
250194
print(f"[green] - CIDR range: {cidr_input}[/green]")
251195
print(f"[green] - Total IPs: {len(ip_addresses)}[/green]")
252196
print(f"[green] - Output file: '{os.path.basename(output_file)}'[/green]")
253197

254-
def handle_domains_to_ip():
198+
def domains_to_ip():
255199
file_path = get_input("Enter filename", "file")
256200
output_file = get_input("Enter output filename")
257201

258202
domains = read_lines(file_path)
259203
if not domains:
260204
return
205+
206+
ip_addresses = set()
207+
resolved_count = failed_count = 0
208+
socket.setdefaulttimeout(1)
209+
210+
with Progress(SpinnerColumn(), *Progress.get_default_columns(), TimeElapsedColumn()) as progress:
211+
task = progress.add_task("[yellow]Resolving", total=len(domains))
261212

262-
ip_addresses, resolved_count, failed_count = resolve_domains_to_ips(domains)
213+
with ThreadPoolExecutor(max_workers=100) as executor:
214+
def resolve_domain(domain):
215+
try:
216+
ip = socket.gethostbyname_ex(domain.strip())[2][0]
217+
return domain, ip
218+
except (socket.gaierror, socket.timeout):
219+
return domain, None
220+
221+
futures = [executor.submit(resolve_domain, domain) for domain in domains]
222+
for future in as_completed(futures):
223+
domain, ip = future.result()
224+
if ip:
225+
ip_addresses.add(ip)
226+
resolved_count += 1
227+
else:
228+
failed_count += 1
229+
progress.update(task, advance=1)
263230

264-
if ip_addresses and write_lines(output_file, ip_addresses):
231+
if ip_addresses and write_lines(output_file, sorted(ip_addresses)):
265232
print(f"[green] Successfully resolved domains to IP addresses:[/green]")
266233
print(f"[green] - Input domains: {len(domains)}[/green]")
267234
print(f"[green] - Successfully resolved: {resolved_count}[/green]")
@@ -273,22 +240,27 @@ def handle_domains_to_ip():
273240

274241
def main():
275242
options = {
276-
"1": ("SPLIT FILE", handle_split_file, "bold cyan"),
277-
"2": ("MERGE FILES", handle_merge_files, "bold blue"),
278-
"3": ("CLEAN FILE", handle_txt_cleaner, "bold cyan"),
279-
"4": ("REMOVE DUPLICATES", handle_remove_duplicate_domains, "bold yellow"),
280-
"5": ("EXTRACT ROOT DOMAINS", handle_convert_subdomains_to_domains, "bold magenta"),
281-
"6": ("FILTER BY EXTENSIONS", handle_separate_domains_by_extension, "bold magenta"),
282-
"7": ("FILTER BY KEYWORDS", handle_filter_by_keywords, "bold yellow"),
283-
"8": ("CIDR TO IP", handle_cidr_to_ip, "bold green"),
284-
"9": ("DOMAINS TO IP", handle_domains_to_ip, "bold blue"),
243+
"1": ("SPLIT FILE", split_file, "bold cyan"),
244+
"2": ("MERGE FILES", merge_files, "bold blue"),
245+
"3": ("CLEAN FILE", clean_file, "bold cyan"),
246+
"4": ("REMOVE DUPLICATES", remove_duplicates, "bold yellow"),
247+
"5": ("SUBDOMAIN TO DOMAINS", convert_subdomains_to_domains, "bold magenta"),
248+
"6": ("FILTER BY TLD", filter_by_tlds, "bold magenta"),
249+
"7": ("FILTER BY KEYWORD", filter_by_keywords, "bold yellow"),
250+
"8": ("CIDR TO IP", cidr_to_ip, "bold green"),
251+
"9": ("DOMAIN TO IP", domains_to_ip, "bold blue"),
285252
"0": ("BACK", lambda: None, "bold red")
286253
}
287254

288-
print("\n".join(f"[{color}] [{key}] {desc}" for key, (desc, _, color) in options.items()))
289-
choice = input("\n \033[36m[-] Your Choice: \033[0m")
290-
291-
if choice in options:
292-
options[choice][1]()
293-
if choice == '0':
294-
return
255+
while True:
256+
print("\n".join(f"[{color}] [{key}] {desc}" for key, (desc, _, color) in options.items()))
257+
choice = input("\n \033[36m[-] Your Choice: \033[0m").strip()
258+
259+
if not choice or choice not in options:
260+
from bugscanx import text_ascii
261+
text_ascii("FILE TOOLKIT")
262+
continue
263+
264+
if choice in options:
265+
options[choice][1]()
266+
break

0 commit comments

Comments
 (0)