diff --git a/e2e/src/pages/UserPreferencesExtensionPage.ts b/e2e/src/pages/UserPreferencesExtensionPage.ts
index 5298358..4b70602 100644
--- a/e2e/src/pages/UserPreferencesExtensionPage.ts
+++ b/e2e/src/pages/UserPreferencesExtensionPage.ts
@@ -15,4 +15,29 @@ export class UserPreferencesExtensionPage extends HostPanelExtensionPage {
     await expect(iframe.locator(`text=${config.falconUsername}`)).toBeVisible({ timeout: 10000 });
     await expect(iframe.getByRole('button', { name: /Save Preferences/i })).toBeVisible({ timeout: 10000 });
   }
+
+  /**
+   * Interact with the extension to save a preference, exercising the collection write path.
+   * Assumes the extension is already expanded and iframe is visible (call verifyExtensionRenders first).
+   */
+  async savePreference(): Promise<void> {
+    return this.withTiming(
+      async () => {
+        const iframe = this.page.frameLocator('iframe');
+
+        // Wait for Save button to confirm form is loaded
+        await expect(iframe.getByRole('button', { name: /Save Preferences/i })).toBeVisible({ timeout: 10000 });
+        this.logger.info('Extension form loaded');
+
+        // Click Save Preferences to write to the collection
+        await iframe.getByRole('button', { name: /Save Preferences/i }).click();
+        this.logger.info('Clicked Save Preferences');
+
+        // Verify success alert appears
+        await expect(iframe.locator('text=/Preferences saved successfully/i')).toBeVisible({ timeout: 10000 });
+        this.logger.success('Preferences saved successfully - collection write verified');
+      },
+      'Save preference via extension'
+    );
+  }
 }
diff --git a/e2e/src/pages/WorkflowsPage.ts b/e2e/src/pages/WorkflowsPage.ts
index b5d3d78..fc4be94 100644
--- a/e2e/src/pages/WorkflowsPage.ts
+++ b/e2e/src/pages/WorkflowsPage.ts
@@ -245,4 +245,81 @@ export class WorkflowsPage extends BasePage {
       `Execute and verify workflow: ${workflowName}`
     );
   }
+
+  /**
+   * Check the actual execution status by viewing the execution details.
+   * Navigates to the execution log, waits for the execution to complete,
+   * expands the execution row, and checks the status.
+   */
+  async verifyWorkflowExecutionCompleted(timeoutMs = 120000): Promise<void> {
+    return this.withTiming(
+      async () => {
+        this.logger.info('Checking workflow execution status in detail view');
+
+        // The "View" link opens in a new tab - capture it
+        const viewLink = this.page.getByRole('link', { name: /^view$/i });
+        await viewLink.waitFor({ state: 'visible', timeout: 10000 });
+
+        const [executionPage] = await Promise.all([
+          this.page.context().waitForEvent('page'),
+          viewLink.click(),
+        ]);
+
+        // Wait for the new tab to load (execution pages can be slow to render)
+        await executionPage.waitForLoadState('networkidle');
+        await executionPage.waitForLoadState('domcontentloaded');
+        this.logger.info('Execution page opened in new tab');
+
+        // Wait for "Execution status" to appear (proves execution details loaded)
+        const statusLabel = executionPage.getByText('Execution status');
+        await statusLabel.waitFor({ state: 'visible', timeout: 60000 });
+        this.logger.info('Execution details visible');
+
+        // Poll until execution reaches a terminal state
+        this.logger.info(`Waiting up to ${timeoutMs / 1000}s for execution to complete...`);
+
+        const startTime = Date.now();
+        while (Date.now() - startTime < timeoutMs) {
+          // Re-find status label each iteration (DOM recreated on reload)
+          const currentStatusLabel = executionPage.getByText('Execution status');
+          await currentStatusLabel.waitFor({ state: 'visible', timeout: 15000 });
+          const statusContainer = currentStatusLabel.locator('..');
+          const statusText = await statusContainer.textContent() || '';
+          const currentStatus = statusText.replace('Execution status', '').trim();
+          this.logger.info(`Current status: ${currentStatus}`);
+
+          if (currentStatus.toLowerCase().includes('failed')) {
+            // Capture error details
+            const pageContent = await executionPage.textContent('body') || '';
+            const messageMatch = pageContent.match(/"message":\s*"([^"]+)"/);
+
+            let errorMessage = 'Workflow action failed';
+            if (messageMatch) {
+              errorMessage = messageMatch[1];
+            }
+
+            await executionPage.close();
+            this.logger.error(`Workflow execution failed: ${errorMessage}`);
+            throw new Error(`Workflow execution failed: ${errorMessage}`);
+          }
+
+          if (!currentStatus.toLowerCase().includes('in progress')) {
+            // Terminal state that isn't "Failed"
+            await executionPage.close();
+            this.logger.success(`Workflow execution completed with status: ${currentStatus}`);
+            return;
+          }
+
+          await executionPage.waitForTimeout(5000);
+
+          // Reload to get updated status - the page doesn't auto-refresh
+          await executionPage.reload({ waitUntil: 'networkidle' });
+        }
+
+        await executionPage.close();
+        throw new Error('Workflow execution timed out - still in progress');
+      },
+      'Verify workflow execution completed'
+    );
+  }
 }
diff --git a/e2e/tests/foundry.spec.ts b/e2e/tests/foundry.spec.ts
index 9877ed4..2324faf 100644
--- a/e2e/tests/foundry.spec.ts
+++ b/e2e/tests/foundry.spec.ts
@@ -1,4 +1,4 @@
-import { test, expect } from '../src/fixtures';
+import { test } from '../src/fixtures';
 
 test.describe.configure({ mode: 'serial' });
 
@@ -9,6 +9,12 @@ test.describe('Collections Toolkit - E2E Tests', () => {
     await userPreferencesExtensionPage.verifyExtensionRenders();
   });
 
+  test('should save preferences via User Preferences extension', async ({ userPreferencesExtensionPage }) => {
+    await userPreferencesExtensionPage.navigateToExtension();
+    await userPreferencesExtensionPage.verifyExtensionRenders();
+    await userPreferencesExtensionPage.savePreference();
+  });
+
   test('should render Collections CRUD extension', async ({ collectionsCRUDExtensionPage }) => {
     await collectionsCRUDExtensionPage.navigateToExtension();
     await collectionsCRUDExtensionPage.verifyExtensionRenders();
@@ -16,22 +22,30 @@ test.describe('Collections Toolkit - E2E Tests', () => {
 
   // Workflow Tests - All workflows are self-contained (no external API credentials required)
   test('should execute Test log_event_handler function workflow', async ({ workflowsPage }) => {
+    test.setTimeout(180000);
     await workflowsPage.navigateToWorkflows();
     await workflowsPage.executeAndVerifyWorkflow('Test log_event_handler function');
+    await workflowsPage.verifyWorkflowExecutionCompleted();
   });
 
   test('should execute Test process_events_handler function workflow', async ({ workflowsPage }) => {
+    test.setTimeout(180000);
     await workflowsPage.navigateToWorkflows();
     await workflowsPage.executeAndVerifyWorkflow('Test process_events_handler function');
+    await workflowsPage.verifyWorkflowExecutionCompleted();
   });
 
   test('should execute Test user_preferences collection workflow', async ({ workflowsPage }) => {
+    test.setTimeout(180000);
     await workflowsPage.navigateToWorkflows();
     await workflowsPage.executeAndVerifyWorkflow('Test user_preferences collection');
+    await workflowsPage.verifyWorkflowExecutionCompleted();
   });
 
   test('should execute Paginate security_events collection workflow', async ({ workflowsPage }) => {
+    test.setTimeout(180000);
     await workflowsPage.navigateToWorkflows();
     await workflowsPage.executeAndVerifyWorkflow('Paginate security_events collection');
+    await workflowsPage.verifyWorkflowExecutionCompleted();
   });
 });
diff --git a/functions/csv-import/main.py b/functions/csv-import/main.py
index 15cbcef..b0f9406 100644
--- a/functions/csv-import/main.py
+++ b/functions/csv-import/main.py
@@ -15,7 +15,7 @@
 import pandas as pd
 
 from crowdstrike.foundry.function import Function, Request, Response, APIError
-from falconpy import APIHarnessV2
+from falconpy import CustomStorage
 
 FUNC = Function.instance()
 
@@ -63,9 +63,8 @@ def import_csv_handler(request: Request, config: Dict[str, object] | None, logge
 
 def _process_import_request(request: Request, collection_name: str, logger: Logger) -> Response:
     """Process the import request and return response."""
-    # Initialize API client and headers
-    api_client = APIHarnessV2()
-    headers = _get_headers()
+    # Initialize custom storage with app headers baked in
+    custom_storage = CustomStorage(ext_headers=_app_headers())
 
     # Read CSV data
     csv_data_result = _read_csv_data(request, logger)
@@ -77,7 +76,7 @@ def _process_import_request(request: Request, collection_name: str, logger: Logg
     transformed_records = _process_dataframe(df, source_filename, import_timestamp)
 
     # Import records to Collection with batch processing
-    import_results = batch_import_records(api_client, transformed_records, collection_name, headers)
+    import_results = batch_import_records(custom_storage, transformed_records, collection_name)
 
     return _create_success_response({
         "df": df,
@@ -89,12 +88,12 @@ def _process_import_request(request: Request, collection_name: str, logger: Logg
     })
 
 
-def _get_headers() -> Dict[str, str]:
-    """Get headers for API requests."""
-    headers = {}
-    if os.environ.get("APP_ID"):
-        headers = {"X-CS-APP-ID": os.environ.get("APP_ID")}
-    return headers
+def _app_headers() -> Dict[str, str]:
+    """Build app headers for CustomStorage construction."""
+    app_id = os.environ.get("APP_ID")
+    if app_id:
+        return {"X-CS-APP-ID": app_id}
+    return {}
 
 
 def _read_csv_data(request: Request, logger: Logger) -> Dict[str, Any]:
@@ -222,10 +221,9 @@ def validate_record(record: Dict[str, Any]) -> None:
 
 
 def batch_import_records(
-    api_client: APIHarnessV2,
+    custom_storage: CustomStorage,
     records: List[Dict[str, Any]],
     collection_name: str,
-    headers: Dict[str, str],
     batch_size: int = 50
 ) -> Dict[str, int]:
     """Import records to Collection in batches with rate limiting."""
@@ -241,10 +239,9 @@ def batch_import_records(
             time.sleep(0.5)
 
         batch_context = {
-            "api_client": api_client,
+            "custom_storage": custom_storage,
             "batch": batch,
             "collection_name": collection_name,
-            "headers": headers,
             "batch_number": i // batch_size + 1
         }
 
@@ -260,10 +257,9 @@ def batch_import_records(
 
 def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]:
     """Process a single batch of records."""
-    api_client = batch_context["api_client"]
+    custom_storage = batch_context["custom_storage"]
     batch = batch_context["batch"]
     collection_name = batch_context["collection_name"]
-    headers = batch_context["headers"]
     batch_number = batch_context["batch_number"]
 
     success_count = 0
@@ -271,11 +267,9 @@ def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]:
 
     for record in batch:
         try:
-            response = api_client.command("PutObject",
-                                          body=record,
-                                          collection_name=collection_name,
-                                          object_key=record["event_id"],
-                                          headers=headers)
+            response = custom_storage.PutObject(body=record,
+                                                collection_name=collection_name,
+                                                object_key=record["event_id"])
 
             if response["status_code"] == 200:
                 success_count += 1
diff --git a/functions/log-event/main.py b/functions/log-event/main.py
index f73924b..b1360d7 100644
--- a/functions/log-event/main.py
+++ b/functions/log-event/main.py
@@ -5,11 +5,19 @@
 import uuid
 
 from crowdstrike.foundry.function import Function, Request, Response, APIError
-from falconpy import APIHarnessV2
+from falconpy import CustomStorage
 
 FUNC = Function.instance()
 
 
+def _app_headers() -> dict:
+    """Build app headers for CustomStorage construction."""
+    app_id = os.environ.get("APP_ID")
+    if app_id:
+        return {"X-CS-APP-ID": app_id}
+    return {}
+
+
 @FUNC.handler(method="POST", path="/log-event")
 def on_post(request: Request) -> Response:
     """
@@ -40,22 +48,12 @@
         "timestamp": int(time.time())
     }
 
-    # Allow setting APP_ID as an env variable for local testing
-    headers = {}
-    if os.environ.get("APP_ID"):
-        headers = {
-            "X-CS-APP-ID": os.environ.get("APP_ID")
-        }
-
-    api_client = APIHarnessV2()
+    custom_storage = CustomStorage(ext_headers=_app_headers())
 
     collection_name = "event_logs"
 
-    response = api_client.command("PutObject",
-                                  body=json_data,
-                                  collection_name=collection_name,
-                                  object_key=event_id,
-                                  headers=headers
-                                  )
+    response = custom_storage.PutObject(body=json_data,
+                                        collection_name=collection_name,
+                                        object_key=event_id)
     if response["status_code"] != 200:
         error_message = response.get("error", {}).get("message", "Unknown error")
@@ -68,12 +66,9 @@
         )
 
     # Query the collection to retrieve the event by id
-    query_response = api_client.command("SearchObjects",
-                                        filter=f"event_id:'{event_id}'",
-                                        collection_name=collection_name,
-                                        limit=5,
-                                        headers=headers
-                                        )
+    query_response = custom_storage.SearchObjects(filter=f"event_id:'{event_id}'",
+                                                  collection_name=collection_name,
+                                                  limit=5)
 
     return Response(
         body={
diff --git a/functions/process-events/main.py b/functions/process-events/main.py
index 19d3171..1142736 100644
--- a/functions/process-events/main.py
+++ b/functions/process-events/main.py
@@ -12,7 +12,7 @@
 from typing import Dict, List, Any
 
 from crowdstrike.foundry.function import Function, Request, Response, APIError
-from falconpy import APIHarnessV2
+from falconpy import CustomStorage
 
 FUNC = Function.instance()
 
@@ -24,7 +24,7 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l
     _ = config
 
     try:
-        # Initialize API client and workflow
+        # Initialize custom storage and workflow
         workflow_context = _initialize_workflow(request, logger)
 
         # Get checkpoint data
@@ -51,11 +51,8 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l
 
 
 def _initialize_workflow(request: Request, logger: Logger) -> Dict[str, Any]:
-    """Initialize workflow context with API client and configuration."""
-    api_client = APIHarnessV2()
-    headers = {}
-    if os.environ.get("APP_ID"):
-        headers = {"X-CS-APP-ID": os.environ.get("APP_ID")}
+    """Initialize workflow context with custom storage and configuration."""
+    custom_storage = CustomStorage(ext_headers=_app_headers())
 
     checkpoint_collection = "processing_checkpoints"
     workflow_id = request.body.get("workflow_id", "default")
@@ -63,29 +60,33 @@
     logger.info(f"Processing workflow ID: {workflow_id}")
 
     return {
-        "api_client": api_client,
-        "headers": headers,
+        "custom_storage": custom_storage,
         "checkpoint_collection": checkpoint_collection,
         "workflow_id": workflow_id,
         "logger": logger
     }
 
 
+def _app_headers() -> Dict[str, str]:
+    """Build app headers for CustomStorage construction."""
+    app_id = os.environ.get("APP_ID")
+    if app_id:
+        return {"X-CS-APP-ID": app_id}
+    return {}
+
+
 def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]:
     """Retrieve the last checkpoint for the workflow."""
-    api_client = workflow_context["api_client"]
-    headers = workflow_context["headers"]
+    custom_storage = workflow_context["custom_storage"]
     checkpoint_collection = workflow_context["checkpoint_collection"]
     workflow_id = workflow_context["workflow_id"]
     logger = workflow_context["logger"]
 
     # Retrieve the most recent checkpoint for this workflow
-    checkpoint_response = api_client.command("SearchObjects",
-                                             filter=f"workflow_id:'{workflow_id}'",
-                                             collection_name=checkpoint_collection,
-                                             sort="last_processed_timestamp.desc",
-                                             limit=1,
-                                             headers=headers)
+    checkpoint_response = custom_storage.SearchObjects(filter=f"workflow_id:'{workflow_id}'",
+                                                       collection_name=checkpoint_collection,
+                                                       sort="last_processed_timestamp.desc",
+                                                       limit=1)
 
     logger.debug(f"checkpoint response: {checkpoint_response}")
 
@@ -96,10 +97,8 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]:
     logger.debug(f"last_checkpoint: {last_checkpoint}")
 
     # SearchObjects returns metadata, not actual objects, so use GetObject for details
-    object_details = api_client.command("GetObject",
-                                        collection_name=checkpoint_collection,
-                                        object_key=last_checkpoint["object_key"],
-                                        headers=headers)
+    object_details = custom_storage.GetObject(collection_name=checkpoint_collection,
+                                              object_key=last_checkpoint["object_key"])
 
     # GetObject returns bytes; convert to JSON
     json_response = json.loads(object_details.decode("utf-8"))
@@ -114,8 +113,7 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]:
 
 def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[str, Any]) -> Response:
     """Process events and update checkpoint."""
-    api_client = workflow_context["api_client"]
-    headers = workflow_context["headers"]
+    custom_storage = workflow_context["custom_storage"]
     checkpoint_collection = workflow_context["checkpoint_collection"]
     workflow_id = workflow_context["workflow_id"]
     logger = workflow_context["logger"]
@@ -143,11 +141,9 @@ def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[
 
     logger.debug(f"Sending data to PutObject: {checkpoint_update}")
 
-    api_client.command("PutObject",
-                       body=checkpoint_update,
-                       collection_name=checkpoint_collection,
-                       object_key=f"checkpoint_{workflow_id}",
-                       headers=headers)
+    custom_storage.PutObject(body=checkpoint_update,
+                             collection_name=checkpoint_collection,
+                             object_key=f"checkpoint_{workflow_id}")
 
     return Response(
         body={