From 95b8d75dc8766e85ba13313dd437d18bdac6cdc4 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Tue, 31 Mar 2026 14:04:58 -0600 Subject: [PATCH 1/4] Migrate from APIHarnessV2 (Uber) to CustomStorage (Service Class) Replace the generic APIHarnessV2 uber-class with the purpose-built CustomStorage service class for all collection CRUD operations. This uses ext_headers at construction time instead of passing headers on every call, and calls methods directly (PutObject, GetObject, SearchObjects) instead of routing through .command(). --- functions/csv-import/main.py | 34 +++++++++++--------------- functions/log-event/main.py | 37 ++++++++++++---------------- functions/process-events/main.py | 42 +++++++++++++++----------------- 3 files changed, 49 insertions(+), 64 deletions(-) diff --git a/functions/csv-import/main.py b/functions/csv-import/main.py index 15cbcef..9ef26b2 100644 --- a/functions/csv-import/main.py +++ b/functions/csv-import/main.py @@ -15,7 +15,7 @@ import pandas as pd from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIHarnessV2 +from falconpy import CustomStorage FUNC = Function.instance() @@ -63,9 +63,8 @@ def import_csv_handler(request: Request, config: Dict[str, object] | None, logge def _process_import_request(request: Request, collection_name: str, logger: Logger) -> Response: """Process the import request and return response.""" - # Initialize API client and headers - api_client = APIHarnessV2() - headers = _get_headers() + # Initialize API client with app headers baked in + api_client = CustomStorage(ext_headers=_app_headers()) # Read CSV data csv_data_result = _read_csv_data(request, logger) @@ -77,7 +76,7 @@ def _process_import_request(request: Request, collection_name: str, logger: Logg transformed_records = _process_dataframe(df, source_filename, import_timestamp) # Import records to Collection with batch processing - import_results = batch_import_records(api_client, transformed_records, collection_name, headers) + import_results = batch_import_records(api_client, transformed_records, collection_name) return _create_success_response({ "df": df, @@ -89,12 +88,12 @@ def _process_import_request(request: Request, collection_name: str, logger: Logg }) -def _get_headers() -> Dict[str, str]: - """Get headers for API requests.""" - headers = {} - if os.environ.get("APP_ID"): - headers = {"X-CS-APP-ID": os.environ.get("APP_ID")} - return headers +def _app_headers() -> Dict[str, str]: + """Build app headers for CustomStorage construction.""" + app_id = os.environ.get("APP_ID") + if app_id: + return {"X-CS-APP-ID": app_id} + return {} def _read_csv_data(request: Request, logger: Logger) -> Dict[str, Any]: @@ -222,10 +221,9 @@ def validate_record(record: Dict[str, Any]) -> None: def batch_import_records( - api_client: APIHarnessV2, + api_client: CustomStorage, records: List[Dict[str, Any]], collection_name: str, - headers: Dict[str, str], batch_size: int = 50 ) -> Dict[str, int]: """Import records to Collection in batches with rate limiting.""" @@ -244,7 +242,6 @@ def batch_import_records( "api_client": api_client, "batch": batch, "collection_name": collection_name, - "headers": headers, "batch_number": i // batch_size + 1 } @@ -263,7 +260,6 @@ def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]: api_client = batch_context["api_client"] batch = batch_context["batch"] collection_name = batch_context["collection_name"] - headers = batch_context["headers"] batch_number = batch_context["batch_number"] success_count = 0 @@ -271,11 +267,9 @@ def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]: for record in batch: try: - response = api_client.command("PutObject", - body=record, - collection_name=collection_name, - object_key=record["event_id"], - headers=headers) + response = api_client.PutObject(body=record, + collection_name=collection_name, + object_key=record["event_id"]) if response["status_code"] == 200: success_count += 1 diff --git a/functions/log-event/main.py b/functions/log-event/main.py index f73924b..8b4166d 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -5,11 +5,19 @@ import uuid from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIHarnessV2 +from falconpy import CustomStorage FUNC = Function.instance() +def _app_headers() -> dict: + """Build app headers for CustomStorage construction.""" + app_id = os.environ.get("APP_ID") + if app_id: + return {"X-CS-APP-ID": app_id} + return {} + + @FUNC.handler(method="POST", path="/log-event") def on_post(request: Request) -> Response: """ @@ -40,22 +48,12 @@ def on_post(request: Request) -> Response: "timestamp": int(time.time()) } - # Allow setting APP_ID as an env variable for local testing - headers = {} - if os.environ.get("APP_ID"): - headers = { - "X-CS-APP-ID": os.environ.get("APP_ID") - } - - api_client = APIHarnessV2() + api_client = CustomStorage(ext_headers=_app_headers()) collection_name = "event_logs" - response = api_client.command("PutObject", - body=json_data, - collection_name=collection_name, - object_key=event_id, - headers=headers - ) + response = api_client.PutObject(body=json_data, + collection_name=collection_name, + object_key=event_id) if response["status_code"] != 200: error_message = response.get("error", {}).get("message", "Unknown error") @@ -68,12 +66,9 @@ def on_post(request: Request) -> Response: ) # Query the collection to retrieve the event by id - query_response = api_client.command("SearchObjects", - filter=f"event_id:'{event_id}'", - collection_name=collection_name, - limit=5, - headers=headers - ) + query_response = api_client.SearchObjects(filter=f"event_id:'{event_id}'", + collection_name=collection_name, + limit=5) return Response( body={ diff --git a/functions/process-events/main.py b/functions/process-events/main.py index 19d3171..7a07a8e 100644 --- a/functions/process-events/main.py +++ b/functions/process-events/main.py @@ -12,7 +12,7 @@ from typing import Dict, List, Any from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIHarnessV2 +from falconpy import CustomStorage FUNC = Function.instance() @@ -52,10 +52,7 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l def _initialize_workflow(request: Request, logger: Logger) -> Dict[str, Any]: """Initialize workflow context with API client and configuration.""" - api_client = APIHarnessV2() - headers = {} - if os.environ.get("APP_ID"): - headers = {"X-CS-APP-ID": os.environ.get("APP_ID")} + api_client = CustomStorage(ext_headers=_app_headers()) checkpoint_collection = "processing_checkpoints" workflow_id = request.body.get("workflow_id", "default") @@ -64,28 +61,32 @@ def _initialize_workflow(request: Request, logger: Logger) -> Dict[str, Any]: return { "api_client": api_client, - "headers": headers, "checkpoint_collection": checkpoint_collection, "workflow_id": workflow_id, "logger": logger } +def _app_headers() -> Dict[str, str]: + """Build app headers for CustomStorage construction.""" + app_id = os.environ.get("APP_ID") + if app_id: + return {"X-CS-APP-ID": app_id} + return {} + + def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: """Retrieve the last checkpoint for the workflow.""" api_client = workflow_context["api_client"] - headers = workflow_context["headers"] checkpoint_collection = workflow_context["checkpoint_collection"] workflow_id = workflow_context["workflow_id"] logger = workflow_context["logger"] # Retrieve the most recent checkpoint for this workflow - checkpoint_response = api_client.command("SearchObjects", - filter=f"workflow_id:'{workflow_id}'", - collection_name=checkpoint_collection, - sort="last_processed_timestamp.desc", - limit=1, - headers=headers) + checkpoint_response = api_client.SearchObjects(filter=f"workflow_id:'{workflow_id}'", + collection_name=checkpoint_collection, + sort="last_processed_timestamp.desc", + limit=1) logger.debug(f"checkpoint response: {checkpoint_response}") @@ -96,10 +97,8 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: logger.debug(f"last_checkpoint: {last_checkpoint}") # SearchObjects returns metadata, not actual objects, so use GetObject for details - object_details = api_client.command("GetObject", - collection_name=checkpoint_collection, - object_key=last_checkpoint["object_key"], - headers=headers) + object_details = api_client.GetObject(collection_name=checkpoint_collection, + object_key=last_checkpoint["object_key"]) # GetObject returns bytes; convert to JSON json_response = json.loads(object_details.decode("utf-8")) @@ -115,7 +114,6 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[str, Any]) -> Response: """Process events and update checkpoint.""" api_client = workflow_context["api_client"] - headers = workflow_context["headers"] checkpoint_collection = workflow_context["checkpoint_collection"] workflow_id = workflow_context["workflow_id"] logger = workflow_context["logger"] @@ -143,11 +141,9 @@ def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[ logger.debug(f"Sending data to PutObject: {checkpoint_update}") - api_client.command("PutObject", - body=checkpoint_update, - collection_name=checkpoint_collection, - object_key=f"checkpoint_{workflow_id}", - headers=headers) + api_client.PutObject(body=checkpoint_update, + collection_name=checkpoint_collection, + object_key=f"checkpoint_{workflow_id}") return Response( body={ From 6e4ee4cfb5529fd558fb8f7f72fce75aeb9d8de2 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Thu, 2 Apr 2026 15:53:42 -0600 Subject: [PATCH 2/4] Enhance e2e tests to verify workflow execution completion Add verifyWorkflowExecutionCompleted() that polls the execution status page until terminal state, catching function failures that were previously invisible. Add extension interaction test that saves preferences to exercise collection write path end-to-end. --- e2e/src/pages/UserPreferencesExtensionPage.ts | 25 ++++++ e2e/src/pages/WorkflowsPage.ts | 77 +++++++++++++++++++ e2e/tests/foundry.spec.ts | 16 +++- 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/e2e/src/pages/UserPreferencesExtensionPage.ts b/e2e/src/pages/UserPreferencesExtensionPage.ts index 5298358..4b70602 100644 --- a/e2e/src/pages/UserPreferencesExtensionPage.ts +++ b/e2e/src/pages/UserPreferencesExtensionPage.ts @@ -15,4 +15,29 @@ export class UserPreferencesExtensionPage extends HostPanelExtensionPage { await expect(iframe.locator(`text=${config.falconUsername}`)).toBeVisible({ timeout: 10000 }); await expect(iframe.getByRole('button', { name: /Save Preferences/i })).toBeVisible({ timeout: 10000 }); } + + /** + * Interact with the extension to save a preference, exercising the collection write path. + * Assumes the extension is already expanded and iframe is visible (call verifyExtensionRenders first). + */ + async savePreference(): Promise { + return this.withTiming( + async () => { + const iframe = this.page.frameLocator('iframe'); + + // Wait for Save button to confirm form is loaded + await expect(iframe.getByRole('button', { name: /Save Preferences/i })).toBeVisible({ timeout: 10000 }); + this.logger.info('Extension form loaded'); + + // Click Save Preferences to write to the collection + await iframe.getByRole('button', { name: /Save Preferences/i }).click(); + this.logger.info('Clicked Save Preferences'); + + // Verify success alert appears + await expect(iframe.locator('text=/Preferences saved successfully/i')).toBeVisible({ timeout: 10000 }); + this.logger.success('Preferences saved successfully - collection write verified'); + }, + 'Save preference via extension' + ); + } } diff --git a/e2e/src/pages/WorkflowsPage.ts b/e2e/src/pages/WorkflowsPage.ts index b5d3d78..fc4be94 100644 --- a/e2e/src/pages/WorkflowsPage.ts +++ b/e2e/src/pages/WorkflowsPage.ts @@ -245,4 +245,81 @@ export class WorkflowsPage extends BasePage { `Execute and verify workflow: ${workflowName}` ); } + + /** + * Check the actual execution status by viewing the execution details. + * Navigates to the execution log, waits for the execution to complete, + * expands the execution row, and checks the status. + */ + async verifyWorkflowExecutionCompleted(timeoutMs = 120000): Promise { + return this.withTiming( + async () => { + this.logger.info('Checking workflow execution status in detail view'); + + // The "View" link opens in a new tab - capture it + const viewLink = this.page.getByRole('link', { name: /^view$/i }); + await viewLink.waitFor({ state: 'visible', timeout: 10000 }); + + const [executionPage] = await Promise.all([ + this.page.context().waitForEvent('page'), + viewLink.click(), + ]); + + // Wait for the new tab to load (execution pages can be slow to render) + await executionPage.waitForLoadState('networkidle'); + await executionPage.waitForLoadState('domcontentloaded'); + this.logger.info('Execution page opened in new tab'); + + // Wait for "Execution status" to appear (proves execution details loaded) + const statusLabel = executionPage.getByText('Execution status'); + await statusLabel.waitFor({ state: 'visible', timeout: 60000 }); + this.logger.info('Execution details visible'); + + // Poll until execution reaches a terminal state + this.logger.info(`Waiting up to ${timeoutMs / 1000}s for execution to complete...`); + + const startTime = Date.now(); + while (Date.now() - startTime < timeoutMs) { + // Re-find status label each iteration (DOM recreated on reload) + const currentStatusLabel = executionPage.getByText('Execution status'); + await currentStatusLabel.waitFor({ state: 'visible', timeout: 15000 }); + const statusContainer = currentStatusLabel.locator('..'); + const statusText = await statusContainer.textContent() || ''; + const currentStatus = statusText.replace('Execution status', '').trim(); + this.logger.info(`Current status: ${currentStatus}`); + + if (currentStatus.toLowerCase().includes('failed')) { + // Capture error details + const pageContent = await executionPage.textContent('body') || ''; + const messageMatch = pageContent.match(/"message":\s*"([^"]+)"/); + + let errorMessage = 'Workflow action failed'; + if (messageMatch) { + errorMessage = messageMatch[1]; + } + + await executionPage.close(); + this.logger.error(`Workflow execution failed: ${errorMessage}`); + throw new Error(`Workflow execution failed: ${errorMessage}`); + } + + if (!currentStatus.toLowerCase().includes('in progress')) { + // Terminal state that isn't "Failed" + await executionPage.close(); + this.logger.success(`Workflow execution completed with status: ${currentStatus}`); + return; + } + + await executionPage.waitForTimeout(5000); + + // Reload to get updated status - the page doesn't auto-refresh + await executionPage.reload({ waitUntil: 'networkidle' }); + } + + await executionPage.close(); + throw new Error('Workflow execution timed out - still in progress'); + }, + 'Verify workflow execution completed' + ); + } } diff --git a/e2e/tests/foundry.spec.ts b/e2e/tests/foundry.spec.ts index 9877ed4..2324faf 100644 --- a/e2e/tests/foundry.spec.ts +++ b/e2e/tests/foundry.spec.ts @@ -1,4 +1,4 @@ -import { test, expect } from '../src/fixtures'; +import { test } from '../src/fixtures'; test.describe.configure({ mode: 'serial' }); @@ -9,6 +9,12 @@ test.describe('Collections Toolkit - E2E Tests', () => { await userPreferencesExtensionPage.verifyExtensionRenders(); }); + test('should save preferences via User Preferences extension', async ({ userPreferencesExtensionPage }) => { + await userPreferencesExtensionPage.navigateToExtension(); + await userPreferencesExtensionPage.verifyExtensionRenders(); + await userPreferencesExtensionPage.savePreference(); + }); + test('should render Collections CRUD extension', async ({ collectionsCRUDExtensionPage }) => { await collectionsCRUDExtensionPage.navigateToExtension(); await collectionsCRUDExtensionPage.verifyExtensionRenders(); @@ -16,22 +22,30 @@ test.describe('Collections Toolkit - E2E Tests', () => { // Workflow Tests - All workflows are self-contained (no external API credentials required) test('should execute Test log_event_handler function workflow', async ({ workflowsPage }) => { + test.setTimeout(180000); await workflowsPage.navigateToWorkflows(); await workflowsPage.executeAndVerifyWorkflow('Test log_event_handler function'); + await workflowsPage.verifyWorkflowExecutionCompleted(); }); test('should execute Test process_events_handler function workflow', async ({ workflowsPage }) => { + test.setTimeout(180000); await workflowsPage.navigateToWorkflows(); await workflowsPage.executeAndVerifyWorkflow('Test process_events_handler function'); + await workflowsPage.verifyWorkflowExecutionCompleted(); }); test('should execute Test user_preferences collection workflow', async ({ workflowsPage }) => { + test.setTimeout(180000); await workflowsPage.navigateToWorkflows(); await workflowsPage.executeAndVerifyWorkflow('Test user_preferences collection'); + await workflowsPage.verifyWorkflowExecutionCompleted(); }); test('should execute Paginate security_events collection workflow', async ({ workflowsPage }) => { + test.setTimeout(180000); await workflowsPage.navigateToWorkflows(); await workflowsPage.executeAndVerifyWorkflow('Paginate security_events collection'); + await workflowsPage.verifyWorkflowExecutionCompleted(); }); }); From 427f953931a76f4dddd8f846554e0de7ea076931 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Fri, 3 Apr 2026 11:45:01 -0600 Subject: [PATCH 3/4] Rename api_client to custom_storage for consistency across Foundry samples --- functions/csv-import/main.py | 14 +++++++------- functions/log-event/main.py | 6 +++--- functions/process-events/main.py | 18 +++++++++--------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/functions/csv-import/main.py b/functions/csv-import/main.py index 9ef26b2..1a48ccf 100644 --- a/functions/csv-import/main.py +++ b/functions/csv-import/main.py @@ -63,8 +63,8 @@ def import_csv_handler(request: Request, config: Dict[str, object] | None, logge def _process_import_request(request: Request, collection_name: str, logger: Logger) -> Response: """Process the import request and return response.""" - # Initialize API client with app headers baked in - api_client = CustomStorage(ext_headers=_app_headers()) + # Initialize custom storage with app headers baked in + custom_storage = CustomStorage(ext_headers=_app_headers()) # Read CSV data csv_data_result = _read_csv_data(request, logger) @@ -76,7 +76,7 @@ def _process_import_request(request: Request, collection_name: str, logger: Logg transformed_records = _process_dataframe(df, source_filename, import_timestamp) # Import records to Collection with batch processing - import_results = batch_import_records(api_client, transformed_records, collection_name) + import_results = batch_import_records(custom_storage, transformed_records, collection_name) return _create_success_response({ "df": df, @@ -221,7 +221,7 @@ def validate_record(record: Dict[str, Any]) -> None: def batch_import_records( - api_client: CustomStorage, + custom_storage: CustomStorage, records: List[Dict[str, Any]], collection_name: str, batch_size: int = 50 @@ -239,7 +239,7 @@ def batch_import_records( time.sleep(0.5) batch_context = { - "api_client": api_client, + "custom_storage": custom_storage, "batch": batch, "collection_name": collection_name, "batch_number": i // batch_size + 1 @@ -257,7 +257,7 @@ def batch_import_records( def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]: """Process a single batch of records.""" - api_client = batch_context["api_client"] + custom_storage = batch_context["custom_storage"] batch = batch_context["batch"] collection_name = batch_context["collection_name"] batch_number = batch_context["batch_number"] @@ -267,7 +267,7 @@ def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]: for record in batch: try: - response = api_client.PutObject(body=record, + response = custom_storage.PutObject(body=record, collection_name=collection_name, object_key=record["event_id"]) diff --git a/functions/log-event/main.py b/functions/log-event/main.py index 8b4166d..b874d7f 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -48,10 +48,10 @@ def on_post(request: Request) -> Response: "timestamp": int(time.time()) } - api_client = CustomStorage(ext_headers=_app_headers()) + custom_storage = CustomStorage(ext_headers=_app_headers()) collection_name = "event_logs" - response = api_client.PutObject(body=json_data, + response = custom_storage.PutObject(body=json_data, collection_name=collection_name, object_key=event_id) @@ -66,7 +66,7 @@ def on_post(request: Request) -> Response: ) # Query the collection to retrieve the event by id - query_response = api_client.SearchObjects(filter=f"event_id:'{event_id}'", + query_response = custom_storage.SearchObjects(filter=f"event_id:'{event_id}'", collection_name=collection_name, limit=5) diff --git a/functions/process-events/main.py b/functions/process-events/main.py index 7a07a8e..debfcdd 100644 --- a/functions/process-events/main.py +++ b/functions/process-events/main.py @@ -24,7 +24,7 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l _ = config try: - # Initialize API client and workflow + # Initialize custom storage and workflow workflow_context = _initialize_workflow(request, logger) # Get checkpoint data @@ -51,8 +51,8 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l def _initialize_workflow(request: Request, logger: Logger) -> Dict[str, Any]: - """Initialize workflow context with API client and configuration.""" - api_client = CustomStorage(ext_headers=_app_headers()) + """Initialize workflow context with custom storage and configuration.""" + custom_storage = CustomStorage(ext_headers=_app_headers()) checkpoint_collection = "processing_checkpoints" workflow_id = request.body.get("workflow_id", "default") @@ -60,7 +60,7 @@ def _initialize_workflow(request: Request, logger: Logger) -> Dict[str, Any]: logger.info(f"Processing workflow ID: {workflow_id}") return { - "api_client": api_client, + "custom_storage": custom_storage, "checkpoint_collection": checkpoint_collection, "workflow_id": workflow_id, "logger": logger @@ -77,13 +77,13 @@ def _app_headers() -> Dict[str, str]: def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: """Retrieve the last checkpoint for the workflow.""" - api_client = workflow_context["api_client"] + custom_storage = workflow_context["custom_storage"] checkpoint_collection = workflow_context["checkpoint_collection"] workflow_id = workflow_context["workflow_id"] logger = workflow_context["logger"] # Retrieve the most recent checkpoint for this workflow - checkpoint_response = api_client.SearchObjects(filter=f"workflow_id:'{workflow_id}'", + checkpoint_response = custom_storage.SearchObjects(filter=f"workflow_id:'{workflow_id}'", collection_name=checkpoint_collection, sort="last_processed_timestamp.desc", limit=1) @@ -97,7 +97,7 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: logger.debug(f"last_checkpoint: {last_checkpoint}") # SearchObjects returns metadata, not actual objects, so use GetObject for details - object_details = api_client.GetObject(collection_name=checkpoint_collection, + object_details = custom_storage.GetObject(collection_name=checkpoint_collection, object_key=last_checkpoint["object_key"]) # GetObject returns bytes; convert to JSON @@ -113,7 +113,7 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[str, Any]) -> Response: """Process events and update checkpoint.""" - api_client = workflow_context["api_client"] + custom_storage = workflow_context["custom_storage"] checkpoint_collection = workflow_context["checkpoint_collection"] workflow_id = workflow_context["workflow_id"] logger = workflow_context["logger"] @@ -141,7 +141,7 @@ def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[ logger.debug(f"Sending data to PutObject: {checkpoint_update}") - api_client.PutObject(body=checkpoint_update, + custom_storage.PutObject(body=checkpoint_update, collection_name=checkpoint_collection, object_key=f"checkpoint_{workflow_id}") From 8c5ce28a4f790d599111001ff5ebac0583398d69 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Thu, 16 Apr 2026 18:52:02 -0600 Subject: [PATCH 4/4] Fix parameter alignment in CustomStorage method calls Align continuation parameters under the opening parenthesis of PutObject, SearchObjects, and GetObject calls to match Python convention and the pre-existing style in this codebase. --- functions/csv-import/main.py | 4 ++-- functions/log-event/main.py | 8 ++++---- functions/process-events/main.py | 12 ++++++------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/functions/csv-import/main.py b/functions/csv-import/main.py index 1a48ccf..b0f9406 100644 --- a/functions/csv-import/main.py +++ b/functions/csv-import/main.py @@ -268,8 +268,8 @@ def _process_batch(batch_context: Dict[str, Any]) -> Dict[str, int]: for record in batch: try: response = custom_storage.PutObject(body=record, - collection_name=collection_name, - object_key=record["event_id"]) + collection_name=collection_name, + object_key=record["event_id"]) if response["status_code"] == 200: success_count += 1 diff --git a/functions/log-event/main.py b/functions/log-event/main.py index b874d7f..b1360d7 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -52,8 +52,8 @@ def on_post(request: Request) -> Response: collection_name = "event_logs" response = custom_storage.PutObject(body=json_data, - collection_name=collection_name, - object_key=event_id) + collection_name=collection_name, + object_key=event_id) if response["status_code"] != 200: error_message = response.get("error", {}).get("message", "Unknown error") @@ -67,8 +67,8 @@ def on_post(request: Request) -> Response: # Query the collection to retrieve the event by id query_response = custom_storage.SearchObjects(filter=f"event_id:'{event_id}'", - collection_name=collection_name, - limit=5) + collection_name=collection_name, + limit=5) return Response( body={ diff --git a/functions/process-events/main.py b/functions/process-events/main.py index debfcdd..1142736 100644 --- a/functions/process-events/main.py +++ b/functions/process-events/main.py @@ -84,9 +84,9 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: # Retrieve the most recent checkpoint for this workflow checkpoint_response = custom_storage.SearchObjects(filter=f"workflow_id:'{workflow_id}'", - collection_name=checkpoint_collection, - sort="last_processed_timestamp.desc", - limit=1) + collection_name=checkpoint_collection, + sort="last_processed_timestamp.desc", + limit=1) logger.debug(f"checkpoint response: {checkpoint_response}") @@ -98,7 +98,7 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]: # SearchObjects returns metadata, not actual objects, so use GetObject for details object_details = custom_storage.GetObject(collection_name=checkpoint_collection, - object_key=last_checkpoint["object_key"]) + object_key=last_checkpoint["object_key"]) # GetObject returns bytes; convert to JSON json_response = json.loads(object_details.decode("utf-8")) @@ -142,8 +142,8 @@ def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[ logger.debug(f"Sending data to PutObject: {checkpoint_update}") custom_storage.PutObject(body=checkpoint_update, - collection_name=checkpoint_collection, - object_key=f"checkpoint_{workflow_id}") + collection_name=checkpoint_collection, + object_key=f"checkpoint_{workflow_id}") return Response( body={