diff --git a/functions/host-details/test_main.py b/functions/host-details/test_main.py new file mode 100644 index 0000000..818f756 --- /dev/null +++ b/functions/host-details/test_main.py @@ -0,0 +1,90 @@ +import unittest +from unittest.mock import patch, MagicMock + +from crowdstrike.foundry.function import Request + + +def mock_handler(*args, **kwargs): + def identity(func): + return func + + return identity + + +class FnTestCase(unittest.TestCase): + def setUp(self): + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) + self.addCleanup(patcher.stop) + self.handler_patch = patcher.start() + + import importlib + import main + importlib.reload(main) + + @patch("main.Hosts") + def test_on_post_success(self, mock_hosts_class): + from main import on_post + + # Mock the Hosts instance and its response + mock_hosts_instance = MagicMock() + mock_hosts_class.return_value = mock_hosts_instance + mock_hosts_instance.get_device_details.return_value = { + "status_code": 200, + "body": { + "resources": [{ + "device_id": "test-host-123", + "hostname": "test-host", + "platform_name": "Windows" + }] + } + } + + request = Request() + request.body = { + "host_id": "test-host-123" + } + + response = on_post(request) + + self.assertEqual(response.code, 200) + self.assertIn("host_details", response.body) + self.assertEqual(response.body["host_details"]["device_id"], "test-host-123") + mock_hosts_instance.get_device_details.assert_called_once_with(ids="test-host-123") + + def test_on_post_missing_host_id(self): + from main import on_post + request = Request() + + response = on_post(request) + + self.assertEqual(response.code, 400) + self.assertEqual(len(response.errors), 1) + self.assertEqual(response.errors[0].message, "missing host_id from request body") + + @patch("main.Hosts") + def test_on_post_api_error(self, mock_hosts_class): + from main import on_post + + # Mock the Hosts instance to return an error + mock_hosts_instance = MagicMock() + mock_hosts_class.return_value = mock_hosts_instance + mock_hosts_instance.get_device_details.return_value = { + "status_code": 404, + "body": {"errors": [{"message": "Host not found"}]} + } + + request = Request() + request.body = { + "host_id": "nonexistent-host" + } + + response = on_post(request) + + self.assertEqual(response.code, 404) + self.assertEqual(len(response.errors), 1) + self.assertIn("Error retrieving host:", response.errors[0].message) + mock_hosts_instance.get_device_details.assert_called_once_with(ids="nonexistent-host") + + +if __name__ == "__main__": + unittest.main() diff --git a/functions/host-info/test_main.py b/functions/host-info/test_main.py new file mode 100644 index 0000000..0c7e564 --- /dev/null +++ b/functions/host-info/test_main.py @@ -0,0 +1,196 @@ +import unittest +from unittest.mock import patch, MagicMock + +from crowdstrike.foundry.function import Request + + +def mock_handler(*args, **kwargs): + def identity(func): + return func + + return identity + + +class FnTestCase(unittest.TestCase): + def setUp(self): + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) + self.addCleanup(patcher.stop) + self.handler_patch = patcher.start() + + import importlib + import main + importlib.reload(main) + + @patch("main.validate_host_id") + @patch("main.format_error_response") + def test_on_post_success(self, mock_format_error, mock_validate_host_id): + from main import on_post + + # Mock validation to return True for valid host ID + mock_validate_host_id.return_value = True + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "host_id": "valid-host-123" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 200) + self.assertEqual(response.body["host"], "valid-host-123") + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_host_id.call_count, 2) + mock_validate_host_id.assert_called_with("valid-host-123") + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Host ID: valid-host-123") + mock_logger.info.assert_any_call("Is valid? True") + + # Verify format_error_response was not called + mock_format_error.assert_not_called() + + @patch("main.validate_host_id") + @patch("main.format_error_response") + def test_on_post_invalid_host_id(self, mock_format_error, mock_validate_host_id): + from main import on_post + + # Mock validation to return False for invalid host ID + mock_validate_host_id.return_value = False + + # Mock error response formatting + mock_format_error.return_value = [{"code": 400, "message": "Invalid host ID format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "host_id": "invalid-host" + } + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response (default code is likely 400) + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid host ID format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_host_id.call_count, 2) + mock_validate_host_id.assert_called_with("invalid-host") + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid host ID format") + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Host ID: invalid-host") + mock_logger.info.assert_any_call("Is valid? False") + + @patch("main.validate_host_id") + @patch("main.format_error_response") + def test_on_post_missing_host_id(self, mock_format_error, mock_validate_host_id): + from main import on_post + + # Mock validation to return False for None host ID + mock_validate_host_id.return_value = False + + # Mock error response formatting + mock_format_error.return_value = [{"code": 400, "message": "Invalid host ID format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = {} # No host_id provided + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid host ID format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_host_id.call_count, 2) + mock_validate_host_id.assert_called_with(None) + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid host ID format") + + # Verify logger was called with None + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Host ID: None") + mock_logger.info.assert_any_call("Is valid? False") + + @patch("main.validate_host_id") + @patch("main.format_error_response") + def test_on_post_empty_host_id(self, mock_format_error, mock_validate_host_id): + from main import on_post + + # Mock validation to return False for empty string + mock_validate_host_id.return_value = False + + # Mock error response formatting + mock_format_error.return_value = [{"code": 400, "message": "Invalid host ID format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "host_id": "" + } + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid host ID format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_host_id.call_count, 2) + mock_validate_host_id.assert_called_with("") + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid host ID format") + + # Verify logger was called with empty string + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Host ID: ") + mock_logger.info.assert_any_call("Is valid? False") + + @patch("main.validate_host_id") + @patch("main.format_error_response") + def test_on_post_with_config(self, mock_format_error, mock_validate_host_id): + from main import on_post + + # Mock validation to return True + mock_validate_host_id.return_value = True + + # Create mock logger + mock_logger = MagicMock() + + # Test with config parameter + config = {"some_setting": "value"} + + request = Request() + request.body = { + "host_id": "test-host-456" + } + + response = on_post(request, config=config, logger=mock_logger) + + self.assertEqual(response.code, 200) + self.assertEqual(response.body["host"], "test-host-456") + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_host_id.call_count, 2) + mock_validate_host_id.assert_called_with("test-host-456") + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/functions/log-event/main.py b/functions/log-event/main.py index f7f4ef0..1037749 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -8,16 +8,16 @@ func = Function.instance() -@func.handler(method='POST', path='/log-event') +@func.handler(method="POST", path="/log-event") def on_post(request: Request) -> Response: # Validate request - if 'event_data' not in request.body: + if "event_data" not in request.body: return Response( code=400, - errors=[APIError(code=400, message='missing event_data')] + errors=[APIError(code=400, message="missing event_data")] ) - event_data = request.body['event_data'] + event_data = request.body["event_data"] try: # Store data in a collection @@ -47,7 +47,7 @@ def on_post(request: Request) -> Response: ) if response["status_code"] != 200: - error_message = response.get('error', {}).get('message', 'Unknown error') + error_message = response.get("error", {}).get("message", "Unknown error") return Response( code=response["status_code"], errors=[APIError( @@ -78,5 +78,5 @@ def on_post(request: Request) -> Response: ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/log-event/test_main.py b/functions/log-event/test_main.py new file mode 100644 index 0000000..781d3cb --- /dev/null +++ b/functions/log-event/test_main.py @@ -0,0 +1,189 @@ +import unittest +from unittest.mock import patch, MagicMock +import uuid +import time + +from crowdstrike.foundry.function import Request + + +def mock_handler(*args, **kwargs): + def identity(func): + return func + + return identity + + +class FnTestCase(unittest.TestCase): + def setUp(self): + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) + self.addCleanup(patcher.stop) + self.handler_patch = patcher.start() + + import importlib + import main + importlib.reload(main) + + @patch('main.APIHarnessV2') + @patch('main.uuid.uuid4') + @patch('main.time.time') + def test_on_post_success(self, mock_time, mock_uuid, mock_api_harness_class): + from main import on_post + + # Mock dependencies + mock_uuid.return_value = MagicMock() + mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") + mock_time.return_value = 1690123456 + + # Mock APIHarnessV2 instance + mock_api_instance = MagicMock() + mock_api_harness_class.return_value = mock_api_instance + + # Mock successful PutObject response + mock_api_instance.command.side_effect = [ + { # PutObject response + "status_code": 200, + "body": {"success": True} + }, + { # SearchObjects response + "status_code": 200, + "body": { + "resources": [{ + "event_id": "test-event-id-123", + "data": {"test": "data"}, + "timestamp": 1690123456 + }] + } + } + ] + + request = Request() + request.body = { + "event_data": {"test": "data", "message": "test event"} + } + + response = on_post(request) + + self.assertEqual(response.code, 200) + self.assertTrue(response.body["stored"]) + self.assertIn("metadata", response.body) + self.assertEqual(len(response.body["metadata"]), 1) + + # Verify API calls + self.assertEqual(mock_api_instance.command.call_count, 2) + + # Verify PutObject call + put_call = mock_api_instance.command.call_args_list[0] + self.assertEqual(put_call[0][0], "PutObject") + self.assertEqual(put_call[1]["collection_name"], "event_logs") + self.assertEqual(put_call[1]["object_key"], "test-event-id-123") + self.assertEqual(put_call[1]["body"]["event_id"], "test-event-id-123") + self.assertEqual(put_call[1]["body"]["data"], {"test": "data", "message": "test event"}) + self.assertEqual(put_call[1]["body"]["timestamp"], 1690123456) + + # Verify SearchObjects call + search_call = mock_api_instance.command.call_args_list[1] + self.assertEqual(search_call[0][0], "SearchObjects") + self.assertEqual(search_call[1]["filter"], "event_id:'test-event-id-123'") + self.assertEqual(search_call[1]["collection_name"], "event_logs") + + def test_on_post_missing_event_data(self): + from main import on_post + request = Request() + + response = on_post(request) + + self.assertEqual(response.code, 400) + self.assertEqual(len(response.errors), 1) + self.assertEqual(response.errors[0].message, "missing event_data") + + @patch('main.APIHarnessV2') + @patch('main.uuid.uuid4') + @patch('main.time.time') + def test_on_post_put_object_error(self, mock_time, mock_uuid, mock_api_harness_class): + from main import on_post + + # Mock dependencies + mock_uuid.return_value = MagicMock() + mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") + mock_time.return_value = 1690123456 + + # Mock APIHarnessV2 instance with error response + mock_api_instance = MagicMock() + mock_api_harness_class.return_value = mock_api_instance + mock_api_instance.command.return_value = { + "status_code": 500, + "error": {"message": "Internal server error"} + } + + request = Request() + request.body = { + "event_data": {"test": "data"} + } + + response = on_post(request) + + self.assertEqual(response.code, 500) + self.assertEqual(len(response.errors), 1) + self.assertIn("Failed to store event: Internal server error", response.errors[0].message) + + @patch('main.APIHarnessV2') + @patch('main.uuid.uuid4') + @patch('main.time.time') + def test_on_post_exception_handling(self, mock_time, mock_uuid, mock_api_harness_class): + from main import on_post + + # Mock dependencies + mock_uuid.return_value = MagicMock() + mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") + mock_time.return_value = 1690123456 + + # Mock APIHarnessV2 to raise an exception + mock_api_harness_class.side_effect = Exception("Connection failed") + + request = Request() + request.body = { + "event_data": {"test": "data"} + } + + response = on_post(request) + + self.assertEqual(response.code, 500) + self.assertEqual(len(response.errors), 1) + self.assertIn("Error saving collection: Connection failed", response.errors[0].message) + + @patch.dict('main.os.environ', {'APP_ID': 'test-app-123'}) + @patch('main.APIHarnessV2') + @patch('main.uuid.uuid4') + @patch('main.time.time') + def test_on_post_with_app_id_header(self, mock_time, mock_uuid, mock_api_harness_class): + from main import on_post + + # Mock dependencies + mock_uuid.return_value = MagicMock() + mock_uuid.return_value.__str__ = MagicMock(return_value="test-event-id-123") + mock_time.return_value = 1690123456 + + # Mock APIHarnessV2 instance + mock_api_instance = MagicMock() + mock_api_harness_class.return_value = mock_api_instance + mock_api_instance.command.side_effect = [ + {"status_code": 200, "body": {"success": True}}, + {"status_code": 200, "body": {"resources": []}} + ] + + request = Request() + request.body = { + "event_data": {"test": "data"} + } + + response = on_post(request) + + self.assertEqual(response.code, 200) + + # Verify that headers with APP_ID were passed to both API calls + for call in mock_api_instance.command.call_args_list: + self.assertEqual(call[1]["headers"], {"X-CS-APP-ID": "test-app-123"}) + + +if __name__ == "__main__": + unittest.main() diff --git a/functions/servicenow/test_main.py b/functions/servicenow/test_main.py new file mode 100644 index 0000000..339cab4 --- /dev/null +++ b/functions/servicenow/test_main.py @@ -0,0 +1,373 @@ +import unittest +from unittest.mock import patch, MagicMock + +from crowdstrike.foundry.function import Request + + +def mock_handler(*args, **kwargs): + def identity(func): + return func + + return identity + + +class FnTestCase(unittest.TestCase): + def setUp(self): + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) + self.addCleanup(patcher.stop) + self.handler_patch = patcher.start() + + import importlib + import main + importlib.reload(main) + + @patch("main.APIIntegrations") + def test_on_post_success_minimal_fields(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations instance + mock_api_instance = MagicMock() + mock_api_integrations_class.return_value = mock_api_instance + + # Mock successful ServiceNow response + mock_api_instance.execute_command_proxy.return_value = { + "status_code": 200, + "body": { + "result": { + "sys_id": "abc123def456", + "number": "INC0001234", + "state": "1", + "priority": "3", + "sys_created_on": "2025-07-23 13:30:00", + "assigned_to": "" + } + }, + "headers": { + "Location": "https://instance.service-now.com/api/now/table/incident/abc123def456" + } + } + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Test incident", + "description": "This is a test incident description" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 201) + self.assertEqual(response.body["incident_id"], "abc123def456") + self.assertEqual(response.body["incident_number"], "INC0001234") + self.assertEqual(response.body["state"], "1") + self.assertEqual(response.body["priority"], "3") + self.assertEqual(response.body["created_at"], "2025-07-23 13:30:00") + self.assertEqual(response.body["assigned_to"], "") + self.assertEqual(response.body["url"], "https://instance.service-now.com/api/now/table/incident/abc123def456") + + # Verify API call was made correctly + mock_api_instance.execute_command_proxy.assert_called_once_with( + definition_id="ServiceNow", + operation_id="POST__api_now_table_tablename", + params={ + "path": {"tableName": "incident"} + }, + request={ + "json": { + "short_description": "Test incident", + "description": "This is a test incident description", + "impact": "2", + "urgency": "2" + }, + "headers": { + "Accept": "application/json", + "Content-Type": "application/json" + } + } + ) + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + + @patch("main.APIIntegrations") + def test_on_post_success_all_fields(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations instance + mock_api_instance = MagicMock() + mock_api_integrations_class.return_value = mock_api_instance + + # Mock successful ServiceNow response + mock_api_instance.execute_command_proxy.return_value = { + "status_code": 200, + "body": { + "result": { + "sys_id": "xyz789abc123", + "number": "INC0001235", + "state": "2", + "priority": "1", + "sys_created_on": "2025-07-23 14:00:00", + "assigned_to": "john.doe" + } + }, + "headers": { + "Location": "https://instance.service-now.com/api/now/table/incident/xyz789abc123" + } + } + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Critical security incident", + "description": "Detailed description of security incident", + "impact": "1", + "urgency": "1", + "category": "Security", + "subcategory": "Malware", + "assignment_group": "Security Team", + "caller_id": "jane.smith" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 201) + self.assertEqual(response.body["incident_id"], "xyz789abc123") + + # Verify API call included all optional fields + call_args = mock_api_instance.execute_command_proxy.call_args + payload = call_args[1]["request"]["json"] + self.assertEqual(payload["impact"], "1") + self.assertEqual(payload["urgency"], "1") + self.assertEqual(payload["category"], "Security") + self.assertEqual(payload["subcategory"], "Malware") + self.assertEqual(payload["assignment_group"], "Security Team") + self.assertEqual(payload["caller_id"], "jane.smith") + + def test_on_post_missing_title(self): + from main import on_post + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "description": "Missing title field" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 400) + self.assertEqual(len(response.errors), 1) + self.assertEqual(response.errors[0].message, "Missing required fields: title and description") + + def test_on_post_missing_description(self): + from main import on_post + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Missing description field" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 400) + self.assertEqual(len(response.errors), 1) + self.assertEqual(response.errors[0].message, "Missing required fields: title and description") + + def test_on_post_missing_both_required_fields(self): + from main import on_post + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = {} + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 400) + self.assertEqual(len(response.errors), 1) + self.assertEqual(response.errors[0].message, "Missing required fields: title and description") + + @patch("main.APIIntegrations") + def test_on_post_servicenow_api_error(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations instance + mock_api_instance = MagicMock() + mock_api_integrations_class.return_value = mock_api_instance + + # Mock ServiceNow API error response + mock_api_instance.execute_command_proxy.return_value = { + "status_code": 400, + "error": { + "message": "Invalid field value" + } + } + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Test incident", + "description": "Test description" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 400) + self.assertEqual(len(response.errors), 1) + self.assertIn("ServiceNow integration error: Invalid field value", response.errors[0].message) + + @patch("main.APIIntegrations") + def test_on_post_servicenow_api_error_no_message(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations instance + mock_api_instance = MagicMock() + mock_api_integrations_class.return_value = mock_api_instance + + # Mock ServiceNow API error response without error message + mock_api_instance.execute_command_proxy.return_value = { + "status_code": 500 + } + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Test incident", + "description": "Test description" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 500) + self.assertEqual(len(response.errors), 1) + self.assertIn("ServiceNow integration error: Unknown error", response.errors[0].message) + + @patch("main.APIIntegrations") + def test_on_post_value_error_handling(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations instance + mock_api_instance = MagicMock() + mock_api_integrations_class.return_value = mock_api_instance + + # Mock response that will cause ValueError when accessing result fields + mock_api_instance.execute_command_proxy.return_value = { + "status_code": 200, + "body": { + "result": {} # Missing expected fields + }, + "headers": {} + } + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Test incident", + "description": "Test description" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 500) + self.assertEqual(len(response.errors), 1) + self.assertIn("Error creating incident:", response.errors[0].message) + + # Verify error was logged + mock_logger.error.assert_called() + + @patch("main.APIIntegrations") + def test_on_post_general_exception_handling(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations to raise an exception + mock_api_integrations_class.side_effect = Exception("Connection failed") + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Test incident", + "description": "Test description" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 500) + self.assertEqual(len(response.errors), 1) + self.assertIn("Error creating incident: Connection failed", response.errors[0].message) + + # Verify error was logged + mock_logger.error.assert_called() + + @patch("main.APIIntegrations") + def test_on_post_optional_fields_filtering(self, mock_api_integrations_class): + from main import on_post + + # Mock APIIntegrations instance + mock_api_instance = MagicMock() + mock_api_integrations_class.return_value = mock_api_instance + + # Mock successful ServiceNow response + mock_api_instance.execute_command_proxy.return_value = { + "status_code": 200, + "body": { + "result": { + "sys_id": "test123", + "number": "INC0001236", + "state": "1", + "priority": "2", + "sys_created_on": "2025-07-23 15:00:00", + "assigned_to": "" + } + }, + "headers": { + "Location": "https://test.service-now.com/incident/test123" + } + } + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "title": "Test incident", + "description": "Test description", + "impact": "3", + "category": "Hardware", + "subcategory": None, # This should be filtered out + "assignment_group": "", # This should be included (empty string is not None) + "caller_id": None # This should be filtered out + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 201) + + # Verify API call payload excludes None values but includes empty strings + call_args = mock_api_instance.execute_command_proxy.call_args + payload = call_args[1]["request"]["json"] + self.assertEqual(payload["impact"], "3") + self.assertEqual(payload["category"], "Hardware") + self.assertEqual(payload["assignment_group"], "") + self.assertNotIn("subcategory", payload) + self.assertNotIn("caller_id", payload) + + +if __name__ == "__main__": + unittest.main() diff --git a/functions/user-management/test_main.py b/functions/user-management/test_main.py new file mode 100644 index 0000000..e27e2ce --- /dev/null +++ b/functions/user-management/test_main.py @@ -0,0 +1,339 @@ +import unittest +from unittest.mock import patch, MagicMock + +from crowdstrike.foundry.function import Request + + +def mock_handler(*args, **kwargs): + def identity(func): + return func + + return identity + + +class FnTestCase(unittest.TestCase): + def setUp(self): + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) + self.addCleanup(patcher.stop) + self.handler_patch = patcher.start() + + import importlib + import main + importlib.reload(main) + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_success(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return True for valid email + mock_validate_email.return_value = True + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "email": "user@example.com" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 200) + self.assertEqual(response.body["email"], "user@example.com") + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with("user@example.com") + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Email: user@example.com") + mock_logger.info.assert_any_call("Is valid? True") + + # Verify format_error_response was not called + mock_format_error.assert_not_called() + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_invalid_email(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return False for invalid email + mock_validate_email.return_value = False + + # Mock error response formatting + mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "email": "invalid-email" + } + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with("invalid-email") + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid email format") + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Email: invalid-email") + mock_logger.info.assert_any_call("Is valid? False") + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_missing_email(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return False for None email + mock_validate_email.return_value = False + + # Mock error response formatting + mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = {} # No email provided + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with(None) + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid email format") + + # Verify logger was called with None + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Email: None") + mock_logger.info.assert_any_call("Is valid? False") + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_empty_email(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return False for empty string + mock_validate_email.return_value = False + + # Mock error response formatting + mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "email": "" + } + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with("") + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid email format") + + # Verify logger was called with empty string + self.assertEqual(mock_logger.info.call_count, 2) + mock_logger.info.assert_any_call("Email: ") + mock_logger.info.assert_any_call("Is valid? False") + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_valid_email_variations(self, mock_format_error, mock_validate_email): + from main import on_post + + # Test various valid email formats + valid_emails = [ + "test@example.com", + "user.name@domain.co.uk", + "admin+tag@company.org", + "123@numbers.net" + ] + + for email in valid_emails: + with self.subTest(email=email): + # Reset mocks for each iteration + mock_validate_email.reset_mock() + mock_format_error.reset_mock() + + # Mock validation to return True + mock_validate_email.return_value = True + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "email": email + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 200) + self.assertEqual(response.body["email"], email) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with(email) + + # Verify format_error_response was not called + mock_format_error.assert_not_called() + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_invalid_email_variations(self, mock_format_error, mock_validate_email): + from main import on_post + + # Test various invalid email formats + invalid_emails = [ + "not-an-email", + "@domain.com", + "user@", + "user..name@domain.com", + "user@domain", + "user name@domain.com" + ] + + for email in invalid_emails: + with self.subTest(email=email): + # Reset mocks for each iteration + mock_validate_email.reset_mock() + mock_format_error.reset_mock() + + # Mock validation to return False + mock_validate_email.return_value = False + mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] + + # Create mock logger + mock_logger = MagicMock() + + request = Request() + request.body = { + "email": email + } + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with(email) + + # Verify error formatting was called + mock_format_error.assert_called_once_with("Invalid email format") + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_with_config(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return True + mock_validate_email.return_value = True + + # Create mock logger + mock_logger = MagicMock() + + # Test with config parameter + config = {"user_domain": "company.com", "max_users": 1000} + + request = Request() + request.body = { + "email": "newuser@company.com" + } + + response = on_post(request, config=config, logger=mock_logger) + + self.assertEqual(response.code, 200) + self.assertEqual(response.body["email"], "newuser@company.com") + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with("newuser@company.com") + + # Verify logger was called + self.assertEqual(mock_logger.info.call_count, 2) + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_case_sensitivity(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return True + mock_validate_email.return_value = True + + # Create mock logger + mock_logger = MagicMock() + + # Test with mixed case email + request = Request() + request.body = { + "email": "User.Name@EXAMPLE.COM" + } + + response = on_post(request, config=None, logger=mock_logger) + + self.assertEqual(response.code, 200) + self.assertEqual(response.body["email"], "User.Name@EXAMPLE.COM") + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with("User.Name@EXAMPLE.COM") + + # Verify logger was called with the exact email + mock_logger.info.assert_any_call("Email: User.Name@EXAMPLE.COM") + + @patch("main.validate_email") + @patch("main.format_error_response") + def test_on_post_whitespace_handling(self, mock_format_error, mock_validate_email): + from main import on_post + + # Mock validation to return False for email with whitespace + mock_validate_email.return_value = False + mock_format_error.return_value = [{"code": 400, "message": "Invalid email format"}] + + # Create mock logger + mock_logger = MagicMock() + + # Test with email containing whitespace + request = Request() + request.body = { + "email": " user@example.com " + } + + response = on_post(request, config=None, logger=mock_logger) + + # Should return error response (assuming validation handles whitespace) + self.assertEqual(response.errors, [{"code": 400, "message": "Invalid email format"}]) + + # Verify validation was called twice (once for logging, once for condition) + self.assertEqual(mock_validate_email.call_count, 2) + mock_validate_email.assert_called_with(" user@example.com ") + + +if __name__ == "__main__": + unittest.main() diff --git a/ui/extensions/hello/.gitignore b/ui/extensions/hello/.gitignore index eaf3961..502e1a1 100644 --- a/ui/extensions/hello/.gitignore +++ b/ui/extensions/hello/.gitignore @@ -1,3 +1,4 @@ node_modules/ .DS_Store .idea/ +coverage/