1+ """
2+ Host Security Discovery API Routes
3+ Provides endpoints for discovering security infrastructure on hosts
4+ """
5+ import logging
6+ from typing import Dict , Any , List
7+ from uuid import UUID
8+
9+ from fastapi import APIRouter , Depends , HTTPException , status
10+ from sqlalchemy .orm import Session
11+ from pydantic import BaseModel
12+
13+ from ..database import get_db , Host
14+ from ..auth import get_current_user
15+ from ..services .host_security_discovery import HostSecurityDiscoveryService
16+ from ..rbac import check_permission
17+
18+ logger = logging .getLogger (__name__ )
19+
20+ router = APIRouter (prefix = "/host-security-discovery" , tags = ["Host Security Discovery" ])
21+
22+
23+ class SecurityDiscoveryResponse (BaseModel ):
24+ package_managers : Dict [str , Any ]
25+ service_manager : str
26+ selinux_status : Any
27+ apparmor_status : Any
28+ firewall_services : Dict [str , Any ]
29+ security_tools : List [str ]
30+ discovery_timestamp : str
31+ discovery_success : bool
32+ discovery_errors : List [str ]
33+
34+
35+ class BulkSecurityDiscoveryRequest (BaseModel ):
36+ host_ids : List [str ]
37+
38+
39+ class BulkSecurityDiscoveryResponse (BaseModel ):
40+ total_hosts : int
41+ successful_discoveries : int
42+ failed_discoveries : int
43+ results : Dict [str , SecurityDiscoveryResponse ]
44+ errors : Dict [str , str ]
45+
46+
47+ @router .post ("/hosts/{host_id}/security-discovery" , response_model = SecurityDiscoveryResponse )
48+ async def discover_host_security_infrastructure (
49+ host_id : str ,
50+ current_user = Depends (get_current_user ),
51+ db : Session = Depends (get_db )
52+ ):
53+ """
54+ Discover security infrastructure and configurations on a specific host
55+
56+ Args:
57+ host_id: UUID of the host to discover security information for
58+
59+ Returns:
60+ SecurityDiscoveryResponse containing discovered security information
61+ """
62+ # Check permissions
63+ check_permission (current_user , "hosts:read" )
64+
65+ try :
66+ # Convert string UUID to UUID object
67+ host_uuid = UUID (host_id )
68+
69+ # Get host from database
70+ host = db .query (Host ).filter (Host .id == host_uuid ).first ()
71+ if not host :
72+ raise HTTPException (
73+ status_code = status .HTTP_404_NOT_FOUND ,
74+ detail = f"Host with ID { host_id } not found"
75+ )
76+
77+ # Perform security discovery
78+ security_service = HostSecurityDiscoveryService ()
79+ discovery_results = security_service .discover_security_infrastructure (host )
80+
81+ # Convert datetime to string for JSON serialization
82+ discovery_results ['discovery_timestamp' ] = discovery_results ['discovery_timestamp' ].isoformat ()
83+
84+ logger .info (f"Security discovery completed for host { host .hostname } : "
85+ f"Found { len (discovery_results ['package_managers' ])} package managers, "
86+ f"SELinux: { discovery_results ['selinux_status' ]} , "
87+ f"AppArmor: { discovery_results ['apparmor_status' ]} " )
88+
89+ return SecurityDiscoveryResponse (** discovery_results )
90+
91+ except ValueError as e :
92+ raise HTTPException (
93+ status_code = status .HTTP_400_BAD_REQUEST ,
94+ detail = f"Invalid host ID format: { str (e )} "
95+ )
96+ except Exception as e :
97+ logger .error (f"Security discovery failed for host { host_id } : { str (e )} " )
98+ raise HTTPException (
99+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
100+ detail = f"Security discovery failed: { str (e )} "
101+ )
102+
103+
104+ @router .post ("/bulk-security-discovery" , response_model = BulkSecurityDiscoveryResponse )
105+ async def bulk_discover_security_infrastructure (
106+ request : BulkSecurityDiscoveryRequest ,
107+ current_user = Depends (get_current_user ),
108+ db : Session = Depends (get_db )
109+ ):
110+ """
111+ Discover security infrastructure for multiple hosts in bulk
112+
113+ Args:
114+ request: BulkSecurityDiscoveryRequest containing list of host IDs
115+
116+ Returns:
117+ BulkSecurityDiscoveryResponse with results for all hosts
118+ """
119+ # Check permissions
120+ check_permission (current_user , "hosts:read" )
121+
122+ if not request .host_ids :
123+ raise HTTPException (
124+ status_code = status .HTTP_400_BAD_REQUEST ,
125+ detail = "No host IDs provided"
126+ )
127+
128+ if len (request .host_ids ) > 50 : # Limit bulk operations
129+ raise HTTPException (
130+ status_code = status .HTTP_400_BAD_REQUEST ,
131+ detail = "Too many hosts requested. Maximum 50 hosts per bulk operation."
132+ )
133+
134+ logger .info (f"Starting bulk security discovery for { len (request .host_ids )} hosts" )
135+
136+ results = {}
137+ errors = {}
138+ successful_discoveries = 0
139+ failed_discoveries = 0
140+
141+ security_service = HostSecurityDiscoveryService ()
142+
143+ for host_id in request .host_ids :
144+ try :
145+ # Convert string UUID to UUID object
146+ host_uuid = UUID (host_id )
147+
148+ # Get host from database
149+ host = db .query (Host ).filter (Host .id == host_uuid ).first ()
150+ if not host :
151+ errors [host_id ] = f"Host with ID { host_id } not found"
152+ failed_discoveries += 1
153+ continue
154+
155+ # Perform security discovery
156+ discovery_results = security_service .discover_security_infrastructure (host )
157+
158+ # Convert datetime to string for JSON serialization
159+ discovery_results ['discovery_timestamp' ] = discovery_results ['discovery_timestamp' ].isoformat ()
160+
161+ results [host_id ] = SecurityDiscoveryResponse (** discovery_results )
162+
163+ if discovery_results ['discovery_success' ]:
164+ successful_discoveries += 1
165+ else :
166+ failed_discoveries += 1
167+
168+ except ValueError as e :
169+ errors [host_id ] = f"Invalid host ID format: { str (e )} "
170+ failed_discoveries += 1
171+ except Exception as e :
172+ logger .error (f"Security discovery failed for host { host_id } : { str (e )} " )
173+ errors [host_id ] = f"Security discovery failed: { str (e )} "
174+ failed_discoveries += 1
175+
176+ logger .info (f"Bulk security discovery completed: { successful_discoveries } successful, "
177+ f"{ failed_discoveries } failed out of { len (request .host_ids )} total hosts" )
178+
179+ return BulkSecurityDiscoveryResponse (
180+ total_hosts = len (request .host_ids ),
181+ successful_discoveries = successful_discoveries ,
182+ failed_discoveries = failed_discoveries ,
183+ results = results ,
184+ errors = errors
185+ )
186+
187+
188+ @router .get ("/hosts/{host_id}/security-summary" )
189+ async def get_host_security_summary (
190+ host_id : str ,
191+ current_user = Depends (get_current_user ),
192+ db : Session = Depends (get_db )
193+ ):
194+ """
195+ Get a quick security summary for a host without running full discovery
196+
197+ Args:
198+ host_id: UUID of the host
199+
200+ Returns:
201+ Security summary based on existing host data
202+ """
203+ # Check permissions
204+ check_permission (current_user , "hosts:read" )
205+
206+ try :
207+ # Convert string UUID to UUID object
208+ host_uuid = UUID (host_id )
209+
210+ # Get host from database
211+ host = db .query (Host ).filter (Host .id == host_uuid ).first ()
212+ if not host :
213+ raise HTTPException (
214+ status_code = status .HTTP_404_NOT_FOUND ,
215+ detail = f"Host with ID { host_id } not found"
216+ )
217+
218+ # Generate security summary based on existing host information
219+ summary = {
220+ "host_id" : str (host .id ),
221+ "hostname" : host .hostname ,
222+ "os_family" : host .os_family ,
223+ "os_version" : host .os_version ,
224+ "architecture" : host .architecture ,
225+ "last_os_detection" : host .last_os_detection .isoformat () if host .last_os_detection else None ,
226+ "auth_method" : host .auth_method ,
227+ "security_recommendations" : []
228+ }
229+
230+ # Add security recommendations based on OS family
231+ if host .os_family :
232+ if "rhel" in host .os_family .lower () or "centos" in host .os_family .lower () or "fedora" in host .os_family .lower ():
233+ summary ["security_recommendations" ].extend ([
234+ "Consider enabling SELinux if not already active" ,
235+ "Ensure firewalld is configured properly" ,
236+ "Keep system updated with dnf/yum"
237+ ])
238+ elif "ubuntu" in host .os_family .lower () or "debian" in host .os_family .lower ():
239+ summary ["security_recommendations" ].extend ([
240+ "Consider configuring AppArmor profiles" ,
241+ "Ensure UFW firewall is configured" ,
242+ "Keep system updated with apt"
243+ ])
244+ elif "suse" in host .os_family .lower ():
245+ summary ["security_recommendations" ].extend ([
246+ "Configure AppArmor or SELinux as appropriate" ,
247+ "Ensure firewall is configured" ,
248+ "Keep system updated with zypper"
249+ ])
250+
251+ return summary
252+
253+ except ValueError as e :
254+ raise HTTPException (
255+ status_code = status .HTTP_400_BAD_REQUEST ,
256+ detail = f"Invalid host ID format: { str (e )} "
257+ )
258+ except Exception as e :
259+ logger .error (f"Failed to get security summary for host { host_id } : { str (e )} " )
260+ raise HTTPException (
261+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
262+ detail = f"Failed to get security summary: { str (e )} "
263+ )
0 commit comments