diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 76a77d1..1a04875 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -4,13 +4,9 @@ on: push: paths: - '**.py' - branches: - - main pull_request: paths: - '**.py' - branches: - - main jobs: analyze: diff --git a/functions/hello/main.py b/functions/hello/main.py index d190134..0c42b8c 100644 --- a/functions/hello/main.py +++ b/functions/hello/main.py @@ -1,11 +1,21 @@ +"""Main module for the hello function handler.""" + from crowdstrike.foundry.function import Function, Request, Response, APIError -func = Function.instance() +FUNC = Function.instance() -# Handler hello -@func.handler(method="POST", path="/hello") +@FUNC.handler(method="POST", path="/hello") def on_post(request: Request) -> Response: + """ + Handle POST requests to /hello endpoint. + + Args: + request: The incoming request object containing the request body. + + Returns: + Response: JSON response with greeting or error message. + """ # # Replace the following example code with your handler code # @@ -29,4 +39,4 @@ def on_post(request: Request) -> Response: if __name__ == "__main__": - func.run() + FUNC.run() diff --git a/functions/hello/test_main.py b/functions/hello/test_main.py index 8e9dc50..9dc2a5b 100644 --- a/functions/hello/test_main.py +++ b/functions/hello/test_main.py @@ -1,10 +1,17 @@ +"""Test module for the hello function handler.""" + +import importlib import unittest from unittest.mock import patch from crowdstrike.foundry.function import Request +import main + + +def mock_handler(*_args, **_kwargs): + """Mock handler decorator for testing.""" -def mock_handler(*args, **kwargs): def identity(func): return func @@ -12,31 +19,32 @@ def identity(func): class FnTestCase(unittest.TestCase): + """Test case class for function handler tests.""" + def setUp(self): + """Set up test fixtures before each test method.""" patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() - import importlib - import main importlib.reload(main) def test_on_post_success(self): - from main import on_post + """Test successful POST request with valid name in body.""" request = Request() request.body = { "name": "Test User" } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 200) self.assertEqual(response.body["greeting"], "Hello Test User! It is nice to see you.") def test_on_post_missing_name(self): - from main import on_post + """Test POST request with missing name in body returns error.""" request = Request() - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) self.assertEqual(response.errors[0].message, "missing name from request body") diff --git a/functions/host-details/main.py b/functions/host-details/main.py index 9e77a69..32a5da7 100644 --- a/functions/host-details/main.py +++ b/functions/host-details/main.py @@ -1,12 +1,23 @@ +"""Main module for the host-details function handler.""" + from crowdstrike.foundry.function import Function, Request, Response, APIError # Import service collection you'd like to use from falconpy import Hosts -func = Function.instance() +FUNC = Function.instance() -@func.handler(method="POST", path="/host-details") +@FUNC.handler(method="POST", path="/host-details") def on_post(request: Request) -> Response: + """ + Handle POST requests to /host-details endpoint. + + Args: + request: The incoming request object containing the request body. + + Returns: + Response: JSON response with host details or error message. + """ # Validate request if "host_id" not in request.body: return Response( @@ -37,4 +48,4 @@ def on_post(request: Request) -> Response: if __name__ == "__main__": - func.run() + FUNC.run() diff --git a/functions/host-details/test_main.py b/functions/host-details/test_main.py index 818f756..32cd550 100644 --- a/functions/host-details/test_main.py +++ b/functions/host-details/test_main.py @@ -1,10 +1,17 @@ +"""Test module for the host-details function handler.""" + +import importlib import unittest from unittest.mock import patch, MagicMock from crowdstrike.foundry.function import Request +import main + + +def mock_handler(*_args, **_kwargs): + """Mock handler decorator for testing.""" -def mock_handler(*args, **kwargs): def identity(func): return func @@ -12,19 +19,19 @@ def identity(func): class FnTestCase(unittest.TestCase): + """Test case class for function handler tests.""" + def setUp(self): + """Set up test fixtures before each test method.""" patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() - import importlib - import main importlib.reload(main) @patch("main.Hosts") def test_on_post_success(self, mock_hosts_class): - from main import on_post - + """Test successful POST request with valid host_id in body.""" # Mock the Hosts instance and its response mock_hosts_instance = MagicMock() mock_hosts_class.return_value = mock_hosts_instance @@ -44,7 +51,7 @@ def test_on_post_success(self, mock_hosts_class): "host_id": "test-host-123" } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 200) self.assertIn("host_details", response.body) @@ -52,10 +59,10 @@ def test_on_post_success(self, mock_hosts_class): mock_hosts_instance.get_device_details.assert_called_once_with(ids="test-host-123") def test_on_post_missing_host_id(self): - from main import on_post + """Test POST request with missing host_id in body returns error.""" request = Request() - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) @@ -63,8 +70,7 @@ def test_on_post_missing_host_id(self): @patch("main.Hosts") def test_on_post_api_error(self, mock_hosts_class): - from main import on_post - + """Test POST request when API returns an error.""" # Mock the Hosts instance to return an error mock_hosts_instance = MagicMock() mock_hosts_class.return_value = mock_hosts_instance @@ -78,7 +84,7 @@ def test_on_post_api_error(self, mock_hosts_class): "host_id": "nonexistent-host" } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 404) self.assertEqual(len(response.errors), 1) diff --git a/functions/host-info/main.py b/functions/host-info/main.py index 2e29aae..88e8488 100755 --- a/functions/host-info/main.py +++ b/functions/host-info/main.py @@ -1,15 +1,28 @@ +"""Main module for the host-info function handler.""" + from logging import Logger -from typing import Dict +from typing import Dict, Optional from crowdstrike.foundry.function import Function, Request, Response from utils import validate_host_id, format_error_response -func = Function.instance() +FUNC = Function.instance() + + +@FUNC.handler(method="POST", path="/host-info") +def on_post(request: Request, _config: Optional[Dict[str, object]], logger: Logger) -> Response: + """ + Handle POST requests to /host-info endpoint. + Args: + request: The incoming request object containing the request body. + _config: Configuration dictionary (unused). + logger: Logger instance for logging. -@func.handler(method="POST", path="/host-info") -def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: + Returns: + Response: JSON response with host info or error message. + """ host_id = request.body.get("host_id") logger.info(f"Host ID: {host_id}") @@ -27,4 +40,4 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) if __name__ == "__main__": - func.run() + FUNC.run() diff --git a/functions/host-info/test_main.py b/functions/host-info/test_main.py index 0c7e564..c85b38d 100644 --- a/functions/host-info/test_main.py +++ b/functions/host-info/test_main.py @@ -1,10 +1,17 @@ +"""Test module for the host-info function handler.""" + +import importlib import unittest from unittest.mock import patch, MagicMock from crowdstrike.foundry.function import Request +import main + + +def mock_handler(*_args, **_kwargs): + """Mock handler decorator for testing.""" -def mock_handler(*args, **kwargs): def identity(func): return func @@ -12,20 +19,20 @@ def identity(func): class FnTestCase(unittest.TestCase): + """Test case class for function handler tests.""" + def setUp(self): + """Set up test fixtures before each test method.""" patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() - import importlib - import main importlib.reload(main) @patch("main.validate_host_id") @patch("main.format_error_response") def test_on_post_success(self, mock_format_error, mock_validate_host_id): - from main import on_post - + """Test successful POST request with valid host_id in body.""" # Mock validation to return True for valid host ID mock_validate_host_id.return_value = True @@ -37,7 +44,7 @@ def test_on_post_success(self, mock_format_error, mock_validate_host_id): "host_id": "valid-host-123" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 200) self.assertEqual(response.body["host"], "valid-host-123") @@ -57,8 +64,7 @@ def test_on_post_success(self, mock_format_error, mock_validate_host_id): @patch("main.validate_host_id") @patch("main.format_error_response") def test_on_post_invalid_host_id(self, mock_format_error, mock_validate_host_id): - from main import on_post - + """Test POST request with invalid host_id returns error.""" # Mock validation to return False for invalid host ID mock_validate_host_id.return_value = False @@ -73,7 +79,7 @@ def test_on_post_invalid_host_id(self, mock_format_error, mock_validate_host_id) "host_id": "invalid-host" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response (default code is likely 400) self.assertEqual(response.errors, [{"code": 400, "message": "Invalid host ID format"}]) @@ -93,8 +99,7 @@ def test_on_post_invalid_host_id(self, mock_format_error, mock_validate_host_id) @patch("main.validate_host_id") @patch("main.format_error_response") def test_on_post_missing_host_id(self, mock_format_error, mock_validate_host_id): - from main import on_post - + """Test POST request with missing host_id returns error.""" # Mock validation to return False for None host ID mock_validate_host_id.return_value = False @@ -107,7 +112,7 @@ def test_on_post_missing_host_id(self, mock_format_error, mock_validate_host_id) request = Request() request.body = {} # No host_id provided - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response self.assertEqual(response.errors, [{"code": 400, "message": "Invalid host ID format"}]) @@ -127,8 +132,7 @@ def test_on_post_missing_host_id(self, mock_format_error, mock_validate_host_id) @patch("main.validate_host_id") @patch("main.format_error_response") def test_on_post_empty_host_id(self, mock_format_error, mock_validate_host_id): - from main import on_post - + """Test POST request with empty host_id returns error.""" # Mock validation to return False for empty string mock_validate_host_id.return_value = False @@ -143,7 +147,7 @@ def test_on_post_empty_host_id(self, mock_format_error, mock_validate_host_id): "host_id": "" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response self.assertEqual(response.errors, [{"code": 400, "message": "Invalid host ID format"}]) @@ -162,9 +166,8 @@ def test_on_post_empty_host_id(self, mock_format_error, mock_validate_host_id): @patch("main.validate_host_id") @patch("main.format_error_response") - def test_on_post_with_config(self, mock_format_error, mock_validate_host_id): - from main import on_post - + def test_on_post_with_config(self, _mock_format_error, mock_validate_host_id): + """Test POST request with config parameter.""" # Mock validation to return True mock_validate_host_id.return_value = True @@ -179,7 +182,7 @@ def test_on_post_with_config(self, mock_format_error, mock_validate_host_id): "host_id": "test-host-456" } - response = on_post(request, config=config, logger=mock_logger) + response = main.on_post(request, _config=config, logger=mock_logger) self.assertEqual(response.code, 200) self.assertEqual(response.body["host"], "test-host-456") diff --git a/functions/host-info/utils.py b/functions/host-info/utils.py index 3c43144..75765e8 100644 --- a/functions/host-info/utils.py +++ b/functions/host-info/utils.py @@ -1,3 +1,5 @@ +"""Utility functions for host-info function handler.""" + import json import re from typing import Dict, Any, Optional diff --git a/functions/log-event/main.py b/functions/log-event/main.py index 1037749..f73924b 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -1,3 +1,5 @@ +"""Main module for the log-event function handler.""" + import os import time import uuid @@ -5,11 +7,20 @@ from crowdstrike.foundry.function import Function, Request, Response, APIError from falconpy import APIHarnessV2 -func = Function.instance() +FUNC = Function.instance() -@func.handler(method="POST", path="/log-event") +@FUNC.handler(method="POST", path="/log-event") def on_post(request: Request) -> Response: + """ + Handle POST requests to /log-event endpoint. + + Args: + request: The incoming request object containing the request body. + + Returns: + Response: JSON response with event storage result or error message. + """ # Validate request if "event_data" not in request.body: return Response( @@ -23,7 +34,7 @@ def on_post(request: Request) -> Response: # Store data in a collection # This assumes you've already created a collection named "event_logs" event_id = str(uuid.uuid4()) - json = { + json_data = { "event_id": event_id, "data": event_data, "timestamp": int(time.time()) @@ -40,7 +51,7 @@ def on_post(request: Request) -> Response: collection_name = "event_logs" response = api_client.command("PutObject", - body=json, + body=json_data, collection_name=collection_name, object_key=event_id, headers=headers @@ -71,7 +82,7 @@ def on_post(request: Request) -> Response: }, code=200 ) - except Exception as e: + except (ConnectionError, ValueError, KeyError) as e: return Response( code=500, errors=[APIError(code=500, message=f"Error saving collection: {str(e)}")] @@ -79,4 +90,4 @@ def on_post(request: Request) -> Response: if __name__ == "__main__": - func.run() + FUNC.run() diff --git a/functions/log-event/test_main.py b/functions/log-event/test_main.py index 781d3cb..967ad86 100644 --- a/functions/log-event/test_main.py +++ b/functions/log-event/test_main.py @@ -1,12 +1,17 @@ +"""Test module for the log-event function handler.""" + +import importlib import unittest from unittest.mock import patch, MagicMock -import uuid -import time from crowdstrike.foundry.function import Request +import main + + +def mock_handler(*_args, **_kwargs): + """Mock handler decorator for testing.""" -def mock_handler(*args, **kwargs): def identity(func): return func @@ -14,21 +19,21 @@ def identity(func): class FnTestCase(unittest.TestCase): + """Test case class for function handler tests.""" + def setUp(self): + """Set up test fixtures before each test method.""" patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() - import importlib - import main importlib.reload(main) @patch('main.APIHarnessV2') @patch('main.uuid.uuid4') @patch('main.time.time') def test_on_post_success(self, mock_time, mock_uuid, mock_api_harness_class): - from main import on_post - + """Test successful POST request with valid event_data in body.""" # Mock dependencies mock_uuid.return_value = MagicMock() mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") @@ -61,7 +66,7 @@ def test_on_post_success(self, mock_time, mock_uuid, mock_api_harness_class): "event_data": {"test": "data", "message": "test event"} } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 200) self.assertTrue(response.body["stored"]) @@ -87,10 +92,10 @@ def test_on_post_success(self, mock_time, mock_uuid, mock_api_harness_class): self.assertEqual(search_call[1]["collection_name"], "event_logs") def test_on_post_missing_event_data(self): - from main import on_post + """Test POST request with missing event_data returns error.""" request = Request() - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) @@ -100,8 +105,7 @@ def test_on_post_missing_event_data(self): @patch('main.uuid.uuid4') @patch('main.time.time') def test_on_post_put_object_error(self, mock_time, mock_uuid, mock_api_harness_class): - from main import on_post - + """Test POST request when PutObject API returns an error.""" # Mock dependencies mock_uuid.return_value = MagicMock() mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") @@ -120,7 +124,7 @@ def test_on_post_put_object_error(self, mock_time, mock_uuid, mock_api_harness_c "event_data": {"test": "data"} } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 500) self.assertEqual(len(response.errors), 1) @@ -130,22 +134,21 @@ def test_on_post_put_object_error(self, mock_time, mock_uuid, mock_api_harness_c @patch('main.uuid.uuid4') @patch('main.time.time') def test_on_post_exception_handling(self, mock_time, mock_uuid, mock_api_harness_class): - from main import on_post - + """Test POST request when an exception is raised.""" # Mock dependencies mock_uuid.return_value = MagicMock() mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") mock_time.return_value = 1690123456 # Mock APIHarnessV2 to raise an exception - mock_api_harness_class.side_effect = Exception("Connection failed") + mock_api_harness_class.side_effect = ConnectionError("Connection failed") request = Request() request.body = { "event_data": {"test": "data"} } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 500) self.assertEqual(len(response.errors), 1) @@ -156,8 +159,7 @@ def test_on_post_exception_handling(self, mock_time, mock_uuid, mock_api_harness @patch('main.uuid.uuid4') @patch('main.time.time') def test_on_post_with_app_id_header(self, mock_time, mock_uuid, mock_api_harness_class): - from main import on_post - + """Test POST request with APP_ID environment variable set.""" # Mock dependencies mock_uuid.return_value = MagicMock() mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") @@ -176,7 +178,7 @@ def test_on_post_with_app_id_header(self, mock_time, mock_uuid, mock_api_harness "event_data": {"test": "data"} } - response = on_post(request) + response = main.on_post(request) self.assertEqual(response.code, 200) diff --git a/functions/servicenow/main.py b/functions/servicenow/main.py index e03369d..58c6a6e 100644 --- a/functions/servicenow/main.py +++ b/functions/servicenow/main.py @@ -1,17 +1,27 @@ +"""Main module for the ServiceNow function handler.""" + from logging import Logger -from typing import Dict +from typing import Dict, Optional from crowdstrike.foundry.function import Function, Request, Response, APIError from falconpy import APIIntegrations -func = Function.instance() +FUNC = Function.instance() -@func.handler(method="POST", path="/ticket") -def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: +@FUNC.handler(method="POST", path="/ticket") +def on_post(request: Request, _config: Optional[Dict[str, object]], logger: Logger) -> Response: """ Create an incident ticket in ServiceNow using the Table API. + Args: + request: The incoming request object containing the request body. + _config: Configuration dictionary (unused). + logger: Logger instance for logging. + + Returns: + Response: JSON response with incident details or error message. + Required fields in request body: - title: Short description of the incident - description: Detailed description of the incident @@ -101,15 +111,15 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) }, code=201 if response["status_code"] == 200 else response["status_code"] ) - except ValueError as v: - # ValueError can be thrown if one of the fields accessed in the response does not exist + except (ValueError, KeyError) as v: + # ValueError/KeyError can be thrown if one of the fields accessed in the response does not exist logger.error(f"Error processing ServiceNow response: {str(v)}", exc_info=True) return Response( code=500, errors=[APIError(code=500, message=f"Error creating incident: {str(v)}")] ) - except Exception as e: - # Catch-all for unexpected errors + except (ConnectionError, TimeoutError) as e: + # Specific network-related errors logger.error(f"Error creating ServiceNow incident: {str(e)}", exc_info=True) return Response( code=500, @@ -118,4 +128,4 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) if __name__ == "__main__": - func.run() + FUNC.run() diff --git a/functions/servicenow/test_main.py b/functions/servicenow/test_main.py index 339cab4..8c51ba4 100644 --- a/functions/servicenow/test_main.py +++ b/functions/servicenow/test_main.py @@ -1,10 +1,17 @@ +"""Test module for the ServiceNow function handler.""" + +import importlib import unittest from unittest.mock import patch, MagicMock from crowdstrike.foundry.function import Request +import main + + +def mock_handler(*_args, **_kwargs): + """Mock handler decorator for testing.""" -def mock_handler(*args, **kwargs): def identity(func): return func @@ -12,19 +19,19 @@ def identity(func): class FnTestCase(unittest.TestCase): + """Test case class for function handler tests.""" + def setUp(self): + """Set up test fixtures before each test method.""" patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() - import importlib - import main importlib.reload(main) @patch("main.APIIntegrations") def test_on_post_success_minimal_fields(self, mock_api_integrations_class): - from main import on_post - + """Test successful POST request with minimal required fields.""" # Mock APIIntegrations instance mock_api_instance = MagicMock() mock_api_integrations_class.return_value = mock_api_instance @@ -56,7 +63,7 @@ def test_on_post_success_minimal_fields(self, mock_api_integrations_class): "description": "This is a test incident description" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 201) self.assertEqual(response.body["incident_id"], "abc123def456") @@ -93,8 +100,7 @@ def test_on_post_success_minimal_fields(self, mock_api_integrations_class): @patch("main.APIIntegrations") def test_on_post_success_all_fields(self, mock_api_integrations_class): - from main import on_post - + """Test successful POST request with all optional fields.""" # Mock APIIntegrations instance mock_api_instance = MagicMock() mock_api_integrations_class.return_value = mock_api_instance @@ -132,7 +138,7 @@ def test_on_post_success_all_fields(self, mock_api_integrations_class): "caller_id": "jane.smith" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 201) self.assertEqual(response.body["incident_id"], "xyz789abc123") @@ -148,8 +154,7 @@ def test_on_post_success_all_fields(self, mock_api_integrations_class): self.assertEqual(payload["caller_id"], "jane.smith") def test_on_post_missing_title(self): - from main import on_post - + """Test POST request with missing title field returns error.""" # Create mock logger mock_logger = MagicMock() @@ -158,15 +163,14 @@ def test_on_post_missing_title(self): "description": "Missing title field" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) self.assertEqual(response.errors[0].message, "Missing required fields: title and description") def test_on_post_missing_description(self): - from main import on_post - + """Test POST request with missing description field returns error.""" # Create mock logger mock_logger = MagicMock() @@ -175,22 +179,21 @@ def test_on_post_missing_description(self): "title": "Missing description field" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) self.assertEqual(response.errors[0].message, "Missing required fields: title and description") def test_on_post_missing_both_required_fields(self): - from main import on_post - + """Test POST request with both required fields missing returns error.""" # Create mock logger mock_logger = MagicMock() request = Request() request.body = {} - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) @@ -198,8 +201,7 @@ def test_on_post_missing_both_required_fields(self): @patch("main.APIIntegrations") def test_on_post_servicenow_api_error(self, mock_api_integrations_class): - from main import on_post - + """Test POST request when ServiceNow API returns an error.""" # Mock APIIntegrations instance mock_api_instance = MagicMock() mock_api_integrations_class.return_value = mock_api_instance @@ -221,7 +223,7 @@ def test_on_post_servicenow_api_error(self, mock_api_integrations_class): "description": "Test description" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) @@ -229,8 +231,7 @@ def test_on_post_servicenow_api_error(self, mock_api_integrations_class): @patch("main.APIIntegrations") def test_on_post_servicenow_api_error_no_message(self, mock_api_integrations_class): - from main import on_post - + """Test POST request when ServiceNow API returns error without message.""" # Mock APIIntegrations instance mock_api_instance = MagicMock() mock_api_integrations_class.return_value = mock_api_instance @@ -249,7 +250,7 @@ def test_on_post_servicenow_api_error_no_message(self, mock_api_integrations_cla "description": "Test description" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 500) self.assertEqual(len(response.errors), 1) @@ -257,8 +258,7 @@ def test_on_post_servicenow_api_error_no_message(self, mock_api_integrations_cla @patch("main.APIIntegrations") def test_on_post_value_error_handling(self, mock_api_integrations_class): - from main import on_post - + """Test POST request when ValueError is raised during response processing.""" # Mock APIIntegrations instance mock_api_instance = MagicMock() mock_api_integrations_class.return_value = mock_api_instance @@ -281,7 +281,7 @@ def test_on_post_value_error_handling(self, mock_api_integrations_class): "description": "Test description" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 500) self.assertEqual(len(response.errors), 1) @@ -292,10 +292,9 @@ def test_on_post_value_error_handling(self, mock_api_integrations_class): @patch("main.APIIntegrations") def test_on_post_general_exception_handling(self, mock_api_integrations_class): - from main import on_post - + """Test POST request when general exception is raised.""" # Mock APIIntegrations to raise an exception - mock_api_integrations_class.side_effect = Exception("Connection failed") + mock_api_integrations_class.side_effect = ConnectionError("Connection failed") # Create mock logger mock_logger = MagicMock() @@ -306,7 +305,7 @@ def test_on_post_general_exception_handling(self, mock_api_integrations_class): "description": "Test description" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 500) self.assertEqual(len(response.errors), 1) @@ -317,8 +316,7 @@ def test_on_post_general_exception_handling(self, mock_api_integrations_class): @patch("main.APIIntegrations") def test_on_post_optional_fields_filtering(self, mock_api_integrations_class): - from main import on_post - + """Test POST request with optional fields filtering (None values excluded).""" # Mock APIIntegrations instance mock_api_instance = MagicMock() mock_api_integrations_class.return_value = mock_api_instance @@ -355,7 +353,7 @@ def test_on_post_optional_fields_filtering(self, mock_api_integrations_class): "caller_id": None # This should be filtered out } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 201) diff --git a/functions/user-management/main.py b/functions/user-management/main.py index 3c633d8..bfea6a7 100755 --- a/functions/user-management/main.py +++ b/functions/user-management/main.py @@ -1,15 +1,28 @@ +"""Main module for the user-management function handler.""" + from logging import Logger -from typing import Dict +from typing import Dict, Optional from crowdstrike.foundry.function import Function, Request, Response from utils import validate_email, format_error_response -func = Function.instance() +FUNC = Function.instance() + + +@FUNC.handler(method="POST", path="/create-user") +def on_post(request: Request, _config: Optional[Dict[str, object]], logger: Logger) -> Response: + """ + Handle POST requests to /create-user endpoint. + Args: + request: The incoming request object containing the request body. + _config: Configuration dictionary (unused). + logger: Logger instance for logging. -@func.handler(method="POST", path="/create-user") -def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: + Returns: + Response: JSON response with user info or error message. + """ email = request.body.get("email") logger.info(f"Email: {email}") @@ -27,4 +40,4 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) if __name__ == "__main__": - func.run() + FUNC.run() diff --git a/functions/user-management/test_main.py b/functions/user-management/test_main.py index e27e2ce..d1c6a18 100644 --- a/functions/user-management/test_main.py +++ b/functions/user-management/test_main.py @@ -1,10 +1,17 @@ +"""Test module for the user-management function handler.""" + +import importlib import unittest from unittest.mock import patch, MagicMock from crowdstrike.foundry.function import Request +import main + + +def mock_handler(*_args, **_kwargs): + """Mock handler decorator for testing.""" -def mock_handler(*args, **kwargs): def identity(func): return func @@ -12,20 +19,20 @@ def identity(func): class FnTestCase(unittest.TestCase): + """Test case class for function handler tests.""" + def setUp(self): + """Set up test fixtures before each test method.""" patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() - import importlib - import main importlib.reload(main) @patch("main.validate_email") @patch("main.format_error_response") def test_on_post_success(self, mock_format_error, mock_validate_email): - from main import on_post - + """Test successful POST request with valid email in body.""" # Mock validation to return True for valid email mock_validate_email.return_value = True @@ -37,7 +44,7 @@ def test_on_post_success(self, mock_format_error, mock_validate_email): "email": "user@example.com" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 200) self.assertEqual(response.body["email"], "user@example.com") @@ -57,8 +64,7 @@ def test_on_post_success(self, mock_format_error, mock_validate_email): @patch("main.validate_email") @patch("main.format_error_response") def test_on_post_invalid_email(self, mock_format_error, mock_validate_email): - from main import on_post - + """Test POST request with invalid email returns error.""" # Mock validation to return False for invalid email mock_validate_email.return_value = False @@ -73,7 +79,7 @@ def test_on_post_invalid_email(self, mock_format_error, mock_validate_email): "email": "invalid-email" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) @@ -93,8 +99,7 @@ def test_on_post_invalid_email(self, mock_format_error, mock_validate_email): @patch("main.validate_email") @patch("main.format_error_response") def test_on_post_missing_email(self, mock_format_error, mock_validate_email): - from main import on_post - + """Test POST request with missing email returns error.""" # Mock validation to return False for None email mock_validate_email.return_value = False @@ -107,7 +112,7 @@ def test_on_post_missing_email(self, mock_format_error, mock_validate_email): request = Request() request.body = {} # No email provided - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) @@ -127,8 +132,7 @@ def test_on_post_missing_email(self, mock_format_error, mock_validate_email): @patch("main.validate_email") @patch("main.format_error_response") def test_on_post_empty_email(self, mock_format_error, mock_validate_email): - from main import on_post - + """Test POST request with empty email returns error.""" # Mock validation to return False for empty string mock_validate_email.return_value = False @@ -143,7 +147,7 @@ def test_on_post_empty_email(self, mock_format_error, mock_validate_email): "email": "" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) @@ -163,8 +167,7 @@ def test_on_post_empty_email(self, mock_format_error, mock_validate_email): @patch("main.validate_email") @patch("main.format_error_response") def test_on_post_valid_email_variations(self, mock_format_error, mock_validate_email): - from main import on_post - + """Test POST request with various valid email formats.""" # Test various valid email formats valid_emails = [ "test@example.com", @@ -190,7 +193,7 @@ def test_on_post_valid_email_variations(self, mock_format_error, mock_validate_e "email": email } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 200) self.assertEqual(response.body["email"], email) @@ -204,9 +207,8 @@ def test_on_post_valid_email_variations(self, mock_format_error, mock_validate_e @patch("main.validate_email") @patch("main.format_error_response") - def test_on_post_invalid_email_variations(self, mock_format_error, mock_validate_email): - from main import on_post - + def test_on_post_invalid_email_variations(self, _mock_format_error, mock_validate_email): + """Test POST request with various invalid email formats.""" # Test various invalid email formats invalid_emails = [ "not-an-email", @@ -221,11 +223,11 @@ def test_on_post_invalid_email_variations(self, mock_format_error, mock_validate with self.subTest(email=email): # Reset mocks for each iteration mock_validate_email.reset_mock() - mock_format_error.reset_mock() + _mock_format_error.reset_mock() # Mock validation to return False mock_validate_email.return_value = False - mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] + _mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] # Create mock logger mock_logger = MagicMock() @@ -235,7 +237,7 @@ def test_on_post_invalid_email_variations(self, mock_format_error, mock_validate "email": email } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) @@ -245,13 +247,12 @@ def test_on_post_invalid_email_variations(self, mock_format_error, mock_validate mock_validate_email.assert_called_with(email) # Verify error formatting was called - mock_format_error.assert_called_once_with("Invalid email format") + _mock_format_error.assert_called_once_with("Invalid email format") @patch("main.validate_email") @patch("main.format_error_response") - def test_on_post_with_config(self, mock_format_error, mock_validate_email): - from main import on_post - + def test_on_post_with_config(self, _mock_format_error, mock_validate_email): + """Test POST request with config parameter.""" # Mock validation to return True mock_validate_email.return_value = True @@ -266,7 +267,7 @@ def test_on_post_with_config(self, mock_format_error, mock_validate_email): "email": "newuser@company.com" } - response = on_post(request, config=config, logger=mock_logger) + response = main.on_post(request, _config=config, logger=mock_logger) self.assertEqual(response.code, 200) self.assertEqual(response.body["email"], "newuser@company.com") @@ -280,9 +281,8 @@ def test_on_post_with_config(self, mock_format_error, mock_validate_email): @patch("main.validate_email") @patch("main.format_error_response") - def test_on_post_case_sensitivity(self, mock_format_error, mock_validate_email): - from main import on_post - + def test_on_post_case_sensitivity(self, _mock_format_error, mock_validate_email): + """Test POST request with mixed case email.""" # Mock validation to return True mock_validate_email.return_value = True @@ -295,7 +295,7 @@ def test_on_post_case_sensitivity(self, mock_format_error, mock_validate_email): "email": "User.Name@EXAMPLE.COM" } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) self.assertEqual(response.code, 200) self.assertEqual(response.body["email"], "User.Name@EXAMPLE.COM") @@ -310,8 +310,7 @@ def test_on_post_case_sensitivity(self, mock_format_error, mock_validate_email): @patch("main.validate_email") @patch("main.format_error_response") def test_on_post_whitespace_handling(self, mock_format_error, mock_validate_email): - from main import on_post - + """Test POST request with email containing whitespace.""" # Mock validation to return False for email with whitespace mock_validate_email.return_value = False mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] @@ -325,7 +324,7 @@ def test_on_post_whitespace_handling(self, mock_format_error, mock_validate_emai "email": " user@example.com " } - response = on_post(request, config=None, logger=mock_logger) + response = main.on_post(request, _config=None, logger=mock_logger) # Should return error response (assuming validation handles whitespace) self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) diff --git a/functions/user-management/utils.py b/functions/user-management/utils.py index 3c43144..2d302b8 100644 --- a/functions/user-management/utils.py +++ b/functions/user-management/utils.py @@ -1,3 +1,5 @@ +"""Utility functions for user-management function handler.""" + import json import re from typing import Dict, Any, Optional @@ -14,6 +16,8 @@ def validate_host_id(host_id: str) -> bool: def validate_email(email: str) -> bool: """Validate email format.""" + if not email or not isinstance(email, str): + return False pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return bool(re.match(pattern, email))