@@ -24,7 +24,7 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l
2424 _ = config
2525
2626 try :
27- # Initialize API client and workflow
27+ # Initialize custom storage and workflow
2828 workflow_context = _initialize_workflow (request , logger )
2929
3030 # Get checkpoint data
@@ -51,16 +51,16 @@ def process_events_handler(request: Request, config: Dict[str, object] | None, l
5151
5252
5353def _initialize_workflow (request : Request , logger : Logger ) -> Dict [str , Any ]:
54- """Initialize workflow context with API client and configuration."""
55- api_client = CustomStorage (ext_headers = _app_headers ())
54+ """Initialize workflow context with custom storage and configuration."""
55+ custom_storage = CustomStorage (ext_headers = _app_headers ())
5656
5757 checkpoint_collection = "processing_checkpoints"
5858 workflow_id = request .body .get ("workflow_id" , "default" )
5959
6060 logger .info (f"Processing workflow ID: { workflow_id } " )
6161
6262 return {
63- "api_client " : api_client ,
63+ "custom_storage " : custom_storage ,
6464 "checkpoint_collection" : checkpoint_collection ,
6565 "workflow_id" : workflow_id ,
6666 "logger" : logger
@@ -77,13 +77,13 @@ def _app_headers() -> Dict[str, str]:
7777
7878def _get_checkpoint (workflow_context : Dict [str , Any ]) -> Dict [str , Any ]:
7979 """Retrieve the last checkpoint for the workflow."""
80- api_client = workflow_context ["api_client " ]
80+ custom_storage = workflow_context ["custom_storage " ]
8181 checkpoint_collection = workflow_context ["checkpoint_collection" ]
8282 workflow_id = workflow_context ["workflow_id" ]
8383 logger = workflow_context ["logger" ]
8484
8585 # Retrieve the most recent checkpoint for this workflow
86- checkpoint_response = api_client .SearchObjects (filter = f"workflow_id:'{ workflow_id } '" ,
86+ checkpoint_response = custom_storage .SearchObjects (filter = f"workflow_id:'{ workflow_id } '" ,
8787 collection_name = checkpoint_collection ,
8888 sort = "last_processed_timestamp.desc" ,
8989 limit = 1 )
@@ -97,7 +97,7 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]:
9797 logger .debug (f"last_checkpoint: { last_checkpoint } " )
9898
9999 # SearchObjects returns metadata, not actual objects, so use GetObject for details
100- object_details = api_client .GetObject (collection_name = checkpoint_collection ,
100+ object_details = custom_storage .GetObject (collection_name = checkpoint_collection ,
101101 object_key = last_checkpoint ["object_key" ])
102102
103103 # GetObject returns bytes; convert to JSON
@@ -113,7 +113,7 @@ def _get_checkpoint(workflow_context: Dict[str, Any]) -> Dict[str, Any]:
113113
114114def _process_and_update (workflow_context : Dict [str , Any ], checkpoint_data : Dict [str , Any ]) -> Response :
115115 """Process events and update checkpoint."""
116- api_client = workflow_context ["api_client " ]
116+ custom_storage = workflow_context ["custom_storage " ]
117117 checkpoint_collection = workflow_context ["checkpoint_collection" ]
118118 workflow_id = workflow_context ["workflow_id" ]
119119 logger = workflow_context ["logger" ]
@@ -141,7 +141,7 @@ def _process_and_update(workflow_context: Dict[str, Any], checkpoint_data: Dict[
141141
142142 logger .debug (f"Sending data to PutObject: { checkpoint_update } " )
143143
144- api_client .PutObject (body = checkpoint_update ,
144+ custom_storage .PutObject (body = checkpoint_update ,
145145 collection_name = checkpoint_collection ,
146146 object_key = f"checkpoint_{ workflow_id } " )
147147
0 commit comments