Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .routes import auth, hosts, scans, content, scap_content, monitoring, users, audit, host_groups, scan_templates, webhooks, mfa
from .routes.system_settings_unified import router as system_settings_router
from .routes import credentials, api_keys, remediation_callback, integration_metrics, bulk_operations, compliance, rule_scanning, capabilities
from .routes import host_discovery, host_security_discovery
# Import security routes only if available
try:
from .routes import automated_fixes
Expand Down Expand Up @@ -451,6 +452,8 @@ async def metrics():
# app.include_router(terminal.router, tags=["Terminal"]) # Terminal module not available
app.include_router(compliance.router, prefix="/api/compliance", tags=["Compliance Intelligence"])
app.include_router(rule_scanning.router, prefix="/api", tags=["Rule-Specific Scanning"])
app.include_router(host_discovery.router, prefix="/api", tags=["Host Discovery"])
app.include_router(host_security_discovery.router, prefix="/api", tags=["Host Security Discovery"])

# Register security routes if available
if automated_fixes:
Expand Down
263 changes: 263 additions & 0 deletions backend/app/routes/host_security_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
"""
Host Security Discovery API Routes
Provides endpoints for discovering security infrastructure on hosts
"""
import logging
from typing import Dict, Any, List
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel

from ..database import get_db, Host
from ..auth import get_current_user
from ..services.host_security_discovery import HostSecurityDiscoveryService
from ..rbac import check_permission

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/host-security-discovery", tags=["Host Security Discovery"])


class SecurityDiscoveryResponse(BaseModel):
package_managers: Dict[str, Any]
service_manager: str
selinux_status: Any
apparmor_status: Any
firewall_services: Dict[str, Any]
security_tools: List[str]
discovery_timestamp: str
discovery_success: bool
discovery_errors: List[str]


class BulkSecurityDiscoveryRequest(BaseModel):
host_ids: List[str]


class BulkSecurityDiscoveryResponse(BaseModel):
total_hosts: int
successful_discoveries: int
failed_discoveries: int
results: Dict[str, SecurityDiscoveryResponse]
errors: Dict[str, str]


@router.post("/hosts/{host_id}/security-discovery", response_model=SecurityDiscoveryResponse)
async def discover_host_security_infrastructure(
host_id: str,
current_user=Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Discover security infrastructure and configurations on a specific host

Args:
host_id: UUID of the host to discover security information for

Returns:
SecurityDiscoveryResponse containing discovered security information
"""
# Check permissions
check_permission(current_user, "hosts:read")

Check failure

Code scanning / CodeQL

Wrong number of arguments in a call

Call to [function check_permission](1) with too few arguments; should be no fewer than 3.

Copilot Autofix

AI 10 months ago

The check_permission function is being called on line 63 with only two arguments, but the function signature requires at least three (most likely: the user, the permission string, and a resource/context object). To fix this, the appropriate third argument that matches the permission check context should be provided. Based on standard RBAC patterns and the context of this route (acting on a particular host), the host object (which represents the resource to access) should be passed in as the third argument. The local variable host is already defined before line 63, after retrieving it from the database, so the fix is to pass host as the third argument.

Change line 63 in backend/app/routes/host_security_discovery.py from:

check_permission(current_user, "hosts:read")

to:

check_permission(current_user, "hosts:read", host)

This fix requires no further changes, as all arguments are now correctly supplied and the function's behaviour remains intact.


Suggested changeset 1
backend/app/routes/host_security_discovery.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/backend/app/routes/host_security_discovery.py b/backend/app/routes/host_security_discovery.py
--- a/backend/app/routes/host_security_discovery.py
+++ b/backend/app/routes/host_security_discovery.py
@@ -60,7 +60,7 @@
         SecurityDiscoveryResponse containing discovered security information
     """
     # Check permissions
-    check_permission(current_user, "hosts:read")
+    check_permission(current_user, "hosts:read", host)
     
     try:
         # Convert string UUID to UUID object
EOF
@@ -60,7 +60,7 @@
SecurityDiscoveryResponse containing discovered security information
"""
# Check permissions
check_permission(current_user, "hosts:read")
check_permission(current_user, "hosts:read", host)

try:
# Convert string UUID to UUID object
Copilot is powered by AI and may make mistakes. Always verify output.

try:
# Convert string UUID to UUID object
host_uuid = UUID(host_id)

# Get host from database
host = db.query(Host).filter(Host.id == host_uuid).first()
if not host:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Host with ID {host_id} not found"
)

# Perform security discovery
security_service = HostSecurityDiscoveryService()
discovery_results = security_service.discover_security_infrastructure(host)

# Convert datetime to string for JSON serialization
discovery_results['discovery_timestamp'] = discovery_results['discovery_timestamp'].isoformat()

logger.info(f"Security discovery completed for host {host.hostname}: "
f"Found {len(discovery_results['package_managers'])} package managers, "
f"SELinux: {discovery_results['selinux_status']}, "
f"AppArmor: {discovery_results['apparmor_status']}")

return SecurityDiscoveryResponse(**discovery_results)

except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid host ID format: {str(e)}"
)
except Exception as e:
logger.error(f"Security discovery failed for host {host_id}: {str(e)}")

Check failure

Code scanning / CodeQL

Log Injection

This log entry depends on a [user-provided value](1).

Copilot Autofix

AI 10 months ago

To fix this issue, all user-supplied data logged in this file (specifically host_id) must be sanitized before being written to the logs. The best approach here is to strip or remove any newline characters (\r, \n) and carriage returns, which can be used for log entry injection. This can be done using .replace('\n', '').replace('\r', '') on the relevant string(s).

Specifically, in backend/app/routes/host_security_discovery.py, line 97 is:

logger.error(f"Security discovery failed for host {host_id}: {str(e)}")

You should update it so that host_id is sanitized prior to use in the log entry:

  • Just before line 97, assign a sanitized version:
    • sanitized_host_id = host_id.replace('\n', '').replace('\r', '')
    • Use this value in the log entry.

No additional methods or imports are required.


Suggested changeset 1
backend/app/routes/host_security_discovery.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/backend/app/routes/host_security_discovery.py b/backend/app/routes/host_security_discovery.py
--- a/backend/app/routes/host_security_discovery.py
+++ b/backend/app/routes/host_security_discovery.py
@@ -94,7 +94,8 @@
             detail=f"Invalid host ID format: {str(e)}"
         )
     except Exception as e:
-        logger.error(f"Security discovery failed for host {host_id}: {str(e)}")
+        sanitized_host_id = host_id.replace('\n', '').replace('\r', '')
+        logger.error(f"Security discovery failed for host {sanitized_host_id}: {str(e)}")
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=f"Security discovery failed: {str(e)}"
EOF
@@ -94,7 +94,8 @@
detail=f"Invalid host ID format: {str(e)}"
)
except Exception as e:
logger.error(f"Security discovery failed for host {host_id}: {str(e)}")
sanitized_host_id = host_id.replace('\n', '').replace('\r', '')
logger.error(f"Security discovery failed for host {sanitized_host_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Security discovery failed: {str(e)}"
Copilot is powered by AI and may make mistakes. Always verify output.
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Security discovery failed: {str(e)}"
)


@router.post("/bulk-security-discovery", response_model=BulkSecurityDiscoveryResponse)
async def bulk_discover_security_infrastructure(
request: BulkSecurityDiscoveryRequest,
current_user=Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Discover security infrastructure for multiple hosts in bulk

Args:
request: BulkSecurityDiscoveryRequest containing list of host IDs

Returns:
BulkSecurityDiscoveryResponse with results for all hosts
"""
# Check permissions
check_permission(current_user, "hosts:read")

Check failure

Code scanning / CodeQL

Wrong number of arguments in a call

Call to [function check_permission](1) with too few arguments; should be no fewer than 3.

Copilot Autofix

AI 10 months ago

To fix the problem, we need to supply the correct number of arguments to the check_permission function on line 120 of backend/app/routes/host_security_discovery.py. Specifically, we must determine what the third required parameter to check_permission should be. Often, RBAC functions take as parameters: the user, the permission string, and some subject (such as an object, resource, or context) being checked.

Since this is an endpoint authorizing bulk discovery across multiple hosts (as given by the request object), the third argument could plausibly be the db session (for context), or even the Host object, or a list of resources. However, since we are at the bulk level, and both current_user and db are available, it is most likely that db is expected as the third parameter. We'll add db as the third argument to the function call unless the signature is known to require something else.

Edit the code at line 120 so the function call is: check_permission(current_user, "hosts:read", db). No import or method changes are required.


Suggested changeset 1
backend/app/routes/host_security_discovery.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/backend/app/routes/host_security_discovery.py b/backend/app/routes/host_security_discovery.py
--- a/backend/app/routes/host_security_discovery.py
+++ b/backend/app/routes/host_security_discovery.py
@@ -117,7 +117,7 @@
         BulkSecurityDiscoveryResponse with results for all hosts
     """
     # Check permissions
-    check_permission(current_user, "hosts:read")
+    check_permission(current_user, "hosts:read", db)
     
     if not request.host_ids:
         raise HTTPException(
EOF
@@ -117,7 +117,7 @@
BulkSecurityDiscoveryResponse with results for all hosts
"""
# Check permissions
check_permission(current_user, "hosts:read")
check_permission(current_user, "hosts:read", db)

if not request.host_ids:
raise HTTPException(
Copilot is powered by AI and may make mistakes. Always verify output.

if not request.host_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No host IDs provided"
)

if len(request.host_ids) > 50: # Limit bulk operations
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Too many hosts requested. Maximum 50 hosts per bulk operation."
)

logger.info(f"Starting bulk security discovery for {len(request.host_ids)} hosts")

results = {}
errors = {}
successful_discoveries = 0
failed_discoveries = 0

security_service = HostSecurityDiscoveryService()

for host_id in request.host_ids:
try:
# Convert string UUID to UUID object
host_uuid = UUID(host_id)

# Get host from database
host = db.query(Host).filter(Host.id == host_uuid).first()
if not host:
errors[host_id] = f"Host with ID {host_id} not found"
failed_discoveries += 1
continue

# Perform security discovery
discovery_results = security_service.discover_security_infrastructure(host)

# Convert datetime to string for JSON serialization
discovery_results['discovery_timestamp'] = discovery_results['discovery_timestamp'].isoformat()

results[host_id] = SecurityDiscoveryResponse(**discovery_results)

if discovery_results['discovery_success']:
successful_discoveries += 1
else:
failed_discoveries += 1

except ValueError as e:
errors[host_id] = f"Invalid host ID format: {str(e)}"
failed_discoveries += 1
except Exception as e:
logger.error(f"Security discovery failed for host {host_id}: {str(e)}")

Check failure

Code scanning / CodeQL

Log Injection

This log entry depends on a [user-provided value](1).

Copilot Autofix

AI 10 months ago

To fix this log injection vulnerability, any potentially user-controlled string (here, host_id) should be sanitized before being interpolated into a log entry. Specifically, newline characters should be removed or replaced, to prevent log forging attacks. The best way is to replace occurrences of both \r and \n in host_id with empty strings (effectively removing any newlines). This should be done immediately before logging. Only lines where host_id is logged as part of an error should be modified; regular UUID processing does not require further changes unless it too logs unsanitized values. No new dependencies are required; Python str.replace is sufficient.

Edit the logging statement on line 172 to use a sanitized version of host_id. Optionally, sanitize the logging on line 169 as well if desired (because it too includes user input in errors, though not as a log line but in the errors dict; it's not strictly a log write, but for completeness same technique can be applied).


Suggested changeset 1
backend/app/routes/host_security_discovery.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/backend/app/routes/host_security_discovery.py b/backend/app/routes/host_security_discovery.py
--- a/backend/app/routes/host_security_discovery.py
+++ b/backend/app/routes/host_security_discovery.py
@@ -169,7 +169,8 @@
             errors[host_id] = f"Invalid host ID format: {str(e)}"
             failed_discoveries += 1
         except Exception as e:
-            logger.error(f"Security discovery failed for host {host_id}: {str(e)}")
+            safe_host_id = host_id.replace('\r', '').replace('\n', '')
+            logger.error(f"Security discovery failed for host {safe_host_id}: {str(e)}")
             errors[host_id] = f"Security discovery failed: {str(e)}"
             failed_discoveries += 1
     
EOF
@@ -169,7 +169,8 @@
errors[host_id] = f"Invalid host ID format: {str(e)}"
failed_discoveries += 1
except Exception as e:
logger.error(f"Security discovery failed for host {host_id}: {str(e)}")
safe_host_id = host_id.replace('\r', '').replace('\n', '')
logger.error(f"Security discovery failed for host {safe_host_id}: {str(e)}")
errors[host_id] = f"Security discovery failed: {str(e)}"
failed_discoveries += 1

Copilot is powered by AI and may make mistakes. Always verify output.
errors[host_id] = f"Security discovery failed: {str(e)}"
failed_discoveries += 1

logger.info(f"Bulk security discovery completed: {successful_discoveries} successful, "
f"{failed_discoveries} failed out of {len(request.host_ids)} total hosts")

return BulkSecurityDiscoveryResponse(
total_hosts=len(request.host_ids),
successful_discoveries=successful_discoveries,
failed_discoveries=failed_discoveries,
results=results,
errors=errors
)


@router.get("/hosts/{host_id}/security-summary")
async def get_host_security_summary(
host_id: str,
current_user=Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get a quick security summary for a host without running full discovery

Args:
host_id: UUID of the host

Returns:
Security summary based on existing host data
"""
# Check permissions
check_permission(current_user, "hosts:read")

Check failure

Code scanning / CodeQL

Wrong number of arguments in a call

Call to [function check_permission](1) with too few arguments; should be no fewer than 3.

Copilot Autofix

AI 10 months ago

To fix the problem, supply the required third argument to check_permission in the get_host_security_summary function. Since there is a pre-existing database session object named db, and RBAC permission checks very often require this parameter for database queries or context, it is logical to pass db as the third argument.

Edit the call at line 204 in backend/app/routes/host_security_discovery.py within the get_host_security_summary endpoint:

  • Change check_permission(current_user, "hosts:read") to check_permission(current_user, "hosts:read", db).

No further changes, imports, or method definitions are necessary, as db is already being provided to the function as a dependency.


Suggested changeset 1
backend/app/routes/host_security_discovery.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/backend/app/routes/host_security_discovery.py b/backend/app/routes/host_security_discovery.py
--- a/backend/app/routes/host_security_discovery.py
+++ b/backend/app/routes/host_security_discovery.py
@@ -201,7 +201,7 @@
         Security summary based on existing host data
     """
     # Check permissions
-    check_permission(current_user, "hosts:read")
+    check_permission(current_user, "hosts:read", db)
     
     try:
         # Convert string UUID to UUID object
EOF
@@ -201,7 +201,7 @@
Security summary based on existing host data
"""
# Check permissions
check_permission(current_user, "hosts:read")
check_permission(current_user, "hosts:read", db)

try:
# Convert string UUID to UUID object
Copilot is powered by AI and may make mistakes. Always verify output.

try:
# Convert string UUID to UUID object
host_uuid = UUID(host_id)

# Get host from database
host = db.query(Host).filter(Host.id == host_uuid).first()
if not host:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Host with ID {host_id} not found"
)

# Generate security summary based on existing host information
summary = {
"host_id": str(host.id),
"hostname": host.hostname,
"os_family": host.os_family,
"os_version": host.os_version,
"architecture": host.architecture,
"last_os_detection": host.last_os_detection.isoformat() if host.last_os_detection else None,
"auth_method": host.auth_method,
"security_recommendations": []
}

# Add security recommendations based on OS family
if host.os_family:
if "rhel" in host.os_family.lower() or "centos" in host.os_family.lower() or "fedora" in host.os_family.lower():
summary["security_recommendations"].extend([
"Consider enabling SELinux if not already active",
"Ensure firewalld is configured properly",
"Keep system updated with dnf/yum"
])
elif "ubuntu" in host.os_family.lower() or "debian" in host.os_family.lower():
summary["security_recommendations"].extend([
"Consider configuring AppArmor profiles",
"Ensure UFW firewall is configured",
"Keep system updated with apt"
])
elif "suse" in host.os_family.lower():
summary["security_recommendations"].extend([
"Configure AppArmor or SELinux as appropriate",
"Ensure firewall is configured",
"Keep system updated with zypper"
])

return summary

except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid host ID format: {str(e)}"
)
except Exception as e:
logger.error(f"Failed to get security summary for host {host_id}: {str(e)}")

Check failure

Code scanning / CodeQL

Log Injection

This log entry depends on a [user-provided value](1).

Copilot Autofix

AI 10 months ago

To remediate this, we should sanitize the host_id variable before including it in log entries. Specifically, we should strip out all line breaks (\n, \r) to prevent users from injecting new log entries. The recommended approach is to define a helper function inside the file (since we don’t see any external sanitizer imported) that removes problematic characters from user input. Whenever logging host_id, pass it through this sanitizer first.

Changes to make in backend/app/routes/host_security_discovery.py:

  • Add a simple sanitizer function (e.g., sanitize_for_log) above the route definition.
  • Update the logger.error line on 259 to use the sanitized version of host_id (do not log the raw user input).
Suggested changeset 1
backend/app/routes/host_security_discovery.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/backend/app/routes/host_security_discovery.py b/backend/app/routes/host_security_discovery.py
--- a/backend/app/routes/host_security_discovery.py
+++ b/backend/app/routes/host_security_discovery.py
@@ -17,6 +17,11 @@
 
 logger = logging.getLogger(__name__)
 
+def sanitize_for_log(value: str) -> str:
+    if value is None:
+        return ''
+    # Remove carriage returns and line feeds
+    return value.replace('\r\n', '').replace('\n', '').replace('\r', '')
 router = APIRouter(prefix="/host-security-discovery", tags=["Host Security Discovery"])
 
 
@@ -256,7 +261,7 @@
             detail=f"Invalid host ID format: {str(e)}"
         )
     except Exception as e:
-        logger.error(f"Failed to get security summary for host {host_id}: {str(e)}")
+        logger.error(f"Failed to get security summary for host {sanitize_for_log(host_id)}: {str(e)}")
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=f"Failed to get security summary: {str(e)}"
EOF
@@ -17,6 +17,11 @@

logger = logging.getLogger(__name__)

def sanitize_for_log(value: str) -> str:
if value is None:
return ''
# Remove carriage returns and line feeds
return value.replace('\r\n', '').replace('\n', '').replace('\r', '')
router = APIRouter(prefix="/host-security-discovery", tags=["Host Security Discovery"])


@@ -256,7 +261,7 @@
detail=f"Invalid host ID format: {str(e)}"
)
except Exception as e:
logger.error(f"Failed to get security summary for host {host_id}: {str(e)}")
logger.error(f"Failed to get security summary for host {sanitize_for_log(host_id)}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get security summary: {str(e)}"
Copilot is powered by AI and may make mistakes. Always verify output.
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get security summary: {str(e)}"
)
Loading
Loading