55import platform
66import tempfile
77import shutil
8+ import ipaddress
89from datetime import datetime , timedelta
910from pathlib import Path
1011from typing import Dict , Set , List
@@ -28,9 +29,26 @@ def __init__(self, block_duration: int, whitelist: Set[str]):
2829 # macOS-specific: pfctl configuration
2930 if self .platform == 'darwin' :
3031 self ._init_macos_firewall ()
31-
32+
33+ def _is_valid_ip (self , ip : str ) -> bool :
34+ """Validate IPv4, IPv6, or CIDR ranges"""
35+ try :
36+ # Supports IPv4, IPv6 and CIDR (ip_network supports ranges)
37+ ipaddress .ip_network (ip , strict = False )
38+ return True
39+ except ValueError :
40+ return False
41+
42+
3243 def block_ip (self , ip : str , reason : str ) -> bool :
3344 """Block an IP address using the appropriate firewall system"""
45+
46+ # Validate IP before doing anything
47+ if not self ._is_valid_ip (ip ):
48+ self .logger .error (f"Invalid IP address format: { ip } " )
49+ print (f"{ Fore .YELLOW } ⚠ INVALID IP: { ip } { Style .RESET_ALL } " )
50+ return False
51+
3452 if self ._is_whitelisted (ip ):
3553 self .logger .info (f"IP { ip } is whitelisted, not blocking" )
3654 return False
@@ -55,9 +73,15 @@ def block_ip(self, ip: str, reason: str) -> bool:
5573 else :
5674 self .logger .debug (f"IP { ip } already blocked" )
5775 return True
58-
76+
5977 def unblock_ip (self , ip : str ) -> bool :
6078 """Manually unblock a specific IP address"""
79+
80+ # Validate IP
81+ if not self ._is_valid_ip (ip ):
82+ self .logger .error (f"Invalid IP address format (unblock): { ip } " )
83+ return False
84+
6185 with self .lock :
6286 if ip in self .blocked_ips :
6387 try :
@@ -78,9 +102,9 @@ def unblock_ip(self, ip: str) -> bool:
78102 else :
79103 self .logger .warning (f"IP { ip } was not blocked" )
80104 return False
81-
105+
106+
82107 def unblock_expired_ips (self ) -> List [str ]:
83- """Unblock IPs that have exceeded the block duration"""
84108 current_time = datetime .now ()
85109 block_duration = timedelta (seconds = self .block_duration )
86110 unblocked_ips = []
@@ -96,17 +120,15 @@ def unblock_expired_ips(self) -> List[str]:
96120 unblocked_ips .append (ip )
97121
98122 return unblocked_ips
99-
123+
100124 def _is_whitelisted (self , ip : str ) -> bool :
101- """Check if IP is in whitelist"""
102125 return ip in self .whitelist
103-
126+
104127 def _execute_block_command (self , ip : str ) -> bool :
105- """Execute platform-specific block command"""
106128 try :
107129 if self .platform == 'linux' :
108130 return self ._block_ip_linux (ip )
109- elif self .platform == 'darwin' : # macOS
131+ elif self .platform == 'darwin' :
110132 return self ._block_ip_macos (ip )
111133 elif self .platform == 'windows' :
112134 return self ._block_ip_windows (ip )
@@ -116,13 +138,12 @@ def _execute_block_command(self, ip: str) -> bool:
116138 except Exception as e :
117139 self .logger .error (f"Platform-specific blocking failed: { e } " )
118140 return False
119-
141+
120142 def _execute_unblock_command (self , ip : str ) -> bool :
121- """Execute platform-specific unblock command"""
122143 try :
123144 if self .platform == 'linux' :
124145 return self ._unblock_ip_linux (ip )
125- elif self .platform == 'darwin' : # macOS
146+ elif self .platform == 'darwin' :
126147 return self ._unblock_ip_macos (ip )
127148 elif self .platform == 'windows' :
128149 return self ._unblock_ip_windows (ip )
@@ -132,23 +153,52 @@ def _execute_unblock_command(self, ip: str) -> bool:
132153 except Exception as e :
133154 self .logger .error (f"Platform-specific unblocking failed: { e } " )
134155 return False
135-
156+
157+
136158 def _block_ip_linux (self , ip : str ) -> bool :
137- """Block IP using iptables on Linux"""
138- cmd = ['sudo' , 'iptables' , '-A' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
139- result = subprocess .run (cmd , check = True , capture_output = True , text = True )
140- return result .returncode == 0
141-
159+ """Block IPv4 or IPv6 using iptables / ip6tables"""
160+
161+ try :
162+ parsed = ipaddress .ip_network (ip , strict = False )
163+
164+ # IPv6
165+ if isinstance (parsed , ipaddress .IPv6Network ):
166+ cmd = ['sudo' , 'ip6tables' , '-A' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
167+
168+ # IPv4
169+ else :
170+ cmd = ['sudo' , 'iptables' , '-A' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
171+
172+ result = subprocess .run (cmd , check = True , capture_output = True , text = True )
173+ return result .returncode == 0
174+
175+ except Exception as e :
176+ self .logger .error (f"Linux block error for { ip } : { e } " )
177+ return False
178+
142179 def _unblock_ip_linux (self , ip : str ) -> bool :
143- """Unblock IP using iptables on Linux"""
144- cmd = ['sudo' , 'iptables' , '-D' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
145- result = subprocess .run (cmd , capture_output = True , text = True )
146- return result .returncode == 0
147-
180+ """Unblock IPv4 or IPv6"""
181+
182+ try :
183+ parsed = ipaddress .ip_network (ip , strict = False )
184+
185+ # IPv6
186+ if isinstance (parsed , ipaddress .IPv6Network ):
187+ cmd = ['sudo' , 'ip6tables' , '-D' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
188+ # IPv4
189+ else :
190+ cmd = ['sudo' , 'iptables' , '-D' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
191+
192+ result = subprocess .run (cmd , capture_output = True , text = True )
193+ return result .returncode == 0
194+
195+ except Exception as e :
196+ self .logger .error (f"Linux unblock error for { ip } : { e } " )
197+ return False
198+
199+
148200 def _init_macos_firewall (self ):
149- """Initialize macOS firewall (pfctl) - create table and rules"""
150201 try :
151- # Check if pfctl is available
152202 pfctl_path = shutil .which ('pfctl' )
153203 if not pfctl_path :
154204 self .logger .warning ("pfctl not found." )
@@ -157,7 +207,6 @@ def _init_macos_firewall(self):
157207 cmd = ['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'show' ]
158208 result = subprocess .run (cmd , capture_output = True , text = True )
159209 if result .returncode != 0 :
160- # if doesn't exist, create dummy
161210 subprocess .run (['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'add' , '127.0.0.1' ],
162211 capture_output = True , text = True )
163212 subprocess .run (['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'delete' , '127.0.0.1' ],
@@ -166,7 +215,6 @@ def _init_macos_firewall(self):
166215 cmd = ['sudo' , 'pfctl' , '-s' , 'info' ]
167216 result = subprocess .run (cmd , capture_output = True , text = True )
168217 if 'Status: Enabled' not in result .stdout :
169- # Try to enable pfctl (may require user interaction)
170218 self .logger .info ("Attempting to enable pfctl..." )
171219 subprocess .run (['sudo' , 'pfctl' , '-e' ], capture_output = True , text = True )
172220
@@ -177,21 +225,12 @@ def _init_macos_firewall(self):
177225 self .logger .error (f"Failed to initialize macOS firewall: { e } " )
178226
179227 def _reload_macos_rules (self ):
180- """Reload pfctl rules to ensure blocking rule is active"""
181228 try :
182-
183229 pf_conf_content = """# Simple Firewall - Dynamic IP Blocking Rules
184230# This file is managed by Simple Firewall
185- # DO NOT EDIT MANUALLY
186-
187- # Table for blocked IPs
188231table <blocked_ips> persist
189-
190- # Block all traffic from IPs in the blocked_ips table
191232block drop in quick from <blocked_ips> to any
192233block drop out quick from any to <blocked_ips>
193-
194- # Allow all other traffic (pass through)
195234pass in all
196235pass out all
197236"""
@@ -202,60 +241,36 @@ def _reload_macos_rules(self):
202241 f .write (pf_conf_content )
203242
204243 cmd = ['sudo' , 'pfctl' , '-f' , str (pf_conf_path )]
205- result = subprocess .run (cmd , capture_output = True , text = True )
206-
207- if result .returncode == 0 :
208- self .logger .debug ("pfctl rules loaded successfully" )
209- else :
210- self .logger .warning (f"pfctl rule loading returned: { result .returncode } " )
211- self .logger .debug (f"pfctl stderr: { result .stderr } " )
244+ subprocess .run (cmd , capture_output = True , text = True )
212245
213246 except Exception as e :
214247 self .logger .error (f"Failed to reload macOS firewall rules: { e } " )
215248
216249 def _block_ip_macos (self , ip : str ) -> bool :
217- """Block IP using pfctl on macOS"""
218250 try :
219- # First, add IP to a table
220251 cmd = ['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'add' , ip ]
221252 result = subprocess .run (cmd , capture_output = True , text = True )
222253
223254 if result .returncode == 0 :
224- self .logger .debug (f"Successfully added { ip } to blocked_ips table" )
225255 return True
226256 else :
227-
228- if 'already' in result .stderr .lower () or 'duplicate' in result .stderr .lower ():
229- self .logger .debug (f"IP { ip } already in blocked_ips table" )
257+ if 'already' in result .stderr .lower ():
230258 return True
231- else :
232- self .logger .error (f"Failed to add { ip } to blocked_ips table: { result .stderr } " )
233- return False
259+ return False
234260 except Exception as e :
235- self .logger .error (f"Exception while blocking IP { ip } on macOS: { e } " )
261+ self .logger .error (f"Exception while blocking IP macOS: { e } " )
236262 return False
237263
238264 def _unblock_ip_macos (self , ip : str ) -> bool :
239- """Unblock IP using pfctl on macOS"""
240265 try :
241266 cmd = ['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'delete' , ip ]
242267 result = subprocess .run (cmd , capture_output = True , text = True )
243- if result .returncode == 0 :
244- self .logger .debug (f"Successfully removed { ip } from blocked_ips table" )
245- return True
246- else :
247- if 'not found' in result .stderr .lower () or 'does not exist' in result .stderr .lower ():
248- self .logger .debug (f"IP { ip } not found in blocked_ips table (may have been already removed)" )
249- return True
250- else :
251- self .logger .warning (f"Failed to remove { ip } from blocked_ips table: { result .stderr } " )
252- return False
268+ return result .returncode == 0 or "not found" in result .stderr .lower ()
253269 except Exception as e :
254- self .logger .error (f"Exception while unblocking IP { ip } on macOS: { e } " )
270+ self .logger .error (f"Exception while unblocking IP macOS: { e } " )
255271 return False
256-
272+
257273 def _block_ip_windows (self , ip : str ) -> bool :
258- """Block IP using Windows Firewall (netsh)"""
259274 rule_name = f"SimpleFirewall_Block_{ ip .replace ('.' , '_' )} "
260275 cmd = [
261276 'netsh' , 'advfirewall' , 'firewall' , 'add' , 'rule' ,
@@ -266,68 +281,48 @@ def _block_ip_windows(self, ip: str) -> bool:
266281 ]
267282 try :
268283 result = subprocess .run (cmd , capture_output = True , text = True )
269- if result .returncode == 0 :
270- self .logger .debug (f"netsh add rule stdout: { result .stdout .strip ()} " )
271- return True
272- else :
273- self .logger .error (f"netsh add rule failed: rc={ result .returncode } stdout={ result .stdout .strip ()} stderr={ result .stderr .strip ()} " )
274- return False
284+ return result .returncode == 0
275285 except Exception as e :
276- self .logger .error (f"Exception when running netsh add rule: { e } " )
286+ self .logger .error (f"Exception running netsh add rule: { e } " )
277287 return False
278288
279289 def _unblock_ip_windows (self , ip : str ) -> bool :
280- """Unblock IP using Windows Firewall (netsh)"""
281290 rule_name = f"SimpleFirewall_Block_{ ip .replace ('.' , '_' )} "
282291 cmd = [
283292 'netsh' , 'advfirewall' , 'firewall' , 'delete' , 'rule' ,
284293 f'name={ rule_name } '
285294 ]
286295 try :
287296 result = subprocess .run (cmd , capture_output = True , text = True )
288- if result .returncode == 0 :
289- self .logger .debug (f"netsh delete rule stdout: { result .stdout .strip ()} " )
290- return True
291- else :
292- # Sometimes netsh returns 1 when rule not found; log and return False
293- self .logger .error (f"netsh delete rule failed: rc={ result .returncode } stdout={ result .stdout .strip ()} stderr={ result .stderr .strip ()} " )
294- return False
297+ return result .returncode == 0
295298 except Exception as e :
296- self .logger .error (f"Exception when running netsh delete rule: { e } " )
299+ self .logger .error (f"Exception running netsh delete rule: { e } " )
297300 return False
298301
299-
302+
300303 def get_blocked_ips (self ) -> Dict [str , str ]:
301- """Get currently blocked IPs with their block times"""
302304 with self .lock :
303- return {
304- ip : block_time .isoformat ()
305- for ip , block_time in self .blocked_ips .items ()
306- }
305+ return {ip : block_time .isoformat () for ip , block_time in self .blocked_ips .items ()}
307306
308307 def cleanup_all_blocks (self ) -> List [str ]:
309- """Remove all blocks (useful for shutdown)"""
310308 cleaned_ips = []
311309
312310 with self .lock :
313311 for ip in list (self .blocked_ips .keys ()):
314312 if self .unblock_ip (ip ):
315313 cleaned_ips .append (ip )
316314
317- # macOS-specific: Clean up the entire table on shutdown
315+
318316 if self .platform == 'darwin' and cleaned_ips :
319317 try :
320- cmd = ['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'flush' ]
321- result = subprocess .run (cmd , capture_output = True , text = True )
322- if result .returncode == 0 :
323- self .logger .info ("Cleaned up all blocked IPs from pfctl table" )
318+ subprocess .run (['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'flush' ],
319+ capture_output = True , text = True )
324320 except Exception as e :
325321 self .logger .warning (f"Failed to flush pfctl table: { e } " )
326322
327323 return cleaned_ips
328324
329325 def get_stats (self ) -> Dict [str , int ]:
330- """Get blocking statistics"""
331326 with self .lock :
332327 return {
333328 'currently_blocked' : len (self .blocked_ips ),
0 commit comments