Skip to content

Commit df3a560

Browse files
muhammad-ali-e, claude, chandrasekharan-zipstack, kirtimanmishrazipstack
authored
UN-3211 [FEAT] HTTP session lifecycle management for workers API clients (#1782)
* UN-3211 [FEAT] HTTP session lifecycle management for workers API clients

  - Add _owns_session flag to prevent singleton shared session from being closed by individual clients
  - Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools
  - Add idempotent close() and __del__ destructor to BaseAPIClient
  - Add try/finally cleanup in api-deployment and callback tasks
  - Add on_worker_process_shutdown hook and early-return guard in postrun
  - Add 25 unit tests for session lifecycle behavior

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3211 [FIX] Address CodeRabbit review: log reset failures, thread-safe counter

  - Log warning instead of silently swallowing exceptions in reset_singleton()
  - Add threading.Lock around task counter increment for thread safety with threads/gevent/eventlet pools

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3211 [FIX] Address CodeRabbit review round 2: optimize lock scope, document thread-safety

  - Move WorkerConfig() instantiation outside lock in increment_task_counter()
  - Remove redundant _task_counter=0 (already done inside reset_singleton)
  - Document thread-safety caveat in reset_singleton() docstring
  - Log close failures in task cleanup instead of silently swallowing

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3211 [FIX] Address CodeRabbit review round 3: test real on_task_postrun handler

  Tests now call the real worker.on_task_postrun() signal handler instead of simulating the guard logic inline, catching divergence if the handler's guard, try/except, or import path changes.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3211 [FIX] Optimize singleton reset with cached threshold and improve close() logging

  Cache the singleton_reset_task_threshold to avoid re-importing WorkerConfig on every task increment. Promote api_client.close() failure logs from debug to warning for better production visibility. Update tests to reset cached threshold.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3211 [FIX] Improve API execution error handling and clean up worker imports

  Refactor api-deployment tasks to handle setup failures early with proper cleanup, move shared imports to module level in worker.py, and fix type annotations in client_factory.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com>
Co-authored-by: Kirtiman Mishra <110175055+kirtimanmishrazipstack@users.noreply.github.com>
1 parent dcb226b commit df3a560

12 files changed

Lines changed: 1063 additions & 324 deletions

File tree

workers/api-deployment/tasks.py

Lines changed: 39 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -168,13 +168,30 @@ def _unified_api_execution(
168168
Returns:
169169
Execution result dictionary
170170
"""
171+
# Set up execution context - exit early if setup fails
172+
organization_id = schema_name
171173
try:
172-
# Set up execution context using shared utilities
173-
organization_id = schema_name
174-
config, api_client = WorkerExecutionContext.setup_execution_context(
174+
_, api_client = WorkerExecutionContext.setup_execution_context(
175175
organization_id, execution_id, workflow_id
176176
)
177+
except Exception as e:
178+
logger.error(f"Failed to setup execution context: {e}")
179+
# Clean up StateStore since setup_execution_context may have
180+
# partially populated it before failing
181+
try:
182+
from shared.infrastructure.context import StateStore
177183

184+
StateStore.clear_all()
185+
except Exception:
186+
logger.debug("StateStore cleanup also failed during early exit")
187+
return {
188+
"execution_id": execution_id,
189+
"status": "ERROR",
190+
"error": str(e),
191+
"files_processed": 0,
192+
}
193+
194+
try:
178195
# Log task start with standardized format
179196
WorkerExecutionContext.log_task_start(
180197
f"unified_api_execution_{task_type}",
@@ -216,12 +233,12 @@ def _unified_api_execution(
216233
schema_name=organization_id,
217234
workflow_id=workflow_id,
218235
execution_id=execution_id,
219-
hash_values_of_files=converted_files, # Changed parameter name
236+
hash_values_of_files=converted_files,
220237
scheduled=scheduled,
221238
execution_mode=execution_mode,
222239
pipeline_id=pipeline_id,
223240
use_file_history=use_file_history,
224-
task_id=task_instance.request.id, # Add required task_id
241+
task_id=task_instance.request.id,
225242
**kwargs,
226243
)
227244

@@ -233,25 +250,14 @@ def _unified_api_execution(
233250
f"files_processed={len(converted_files)}",
234251
)
235252

236-
# CRITICAL: Clean up StateStore to prevent data leaks between tasks
237-
try:
238-
from shared.infrastructure.context import StateStore
239-
240-
StateStore.clear_all()
241-
logger.debug("🧹 Cleaned up StateStore context to prevent data leaks")
242-
except Exception as cleanup_error:
243-
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")
244-
245253
return result
246254

247255
except Exception as e:
248256
logger.error(f"API execution failed: {e}")
249257

250-
# Handle execution error with standardized pattern
251-
if "api_client" in locals():
252-
WorkerExecutionContext.handle_execution_error(
253-
api_client, execution_id, e, logger, f"api_execution_{task_type}"
254-
)
258+
WorkerExecutionContext.handle_execution_error(
259+
api_client, execution_id, e, logger, f"api_execution_{task_type}"
260+
)
255261

256262
# Log completion with error
257263
WorkerExecutionContext.log_task_completion(
@@ -261,26 +267,27 @@ def _unified_api_execution(
261267
f"error={str(e)}",
262268
)
263269

264-
# CRITICAL: Clean up StateStore to prevent data leaks between tasks (error path)
265-
try:
266-
from shared.infrastructure.context import StateStore
267-
268-
StateStore.clear_all()
269-
logger.debug(
270-
"🧹 Cleaned up StateStore context to prevent data leaks (error path)"
271-
)
272-
except Exception as cleanup_error:
273-
logger.warning(
274-
f"Failed to cleanup StateStore context on error: {cleanup_error}"
275-
)
276-
277270
return {
278271
"execution_id": execution_id,
279272
"status": "ERROR",
280273
"error": str(e),
281274
"files_processed": 0,
282275
}
283276

277+
finally:
278+
try:
279+
api_client.close()
280+
except Exception as e:
281+
logger.warning("api_client.close() failed during cleanup: %s", e)
282+
283+
# Clean up StateStore to prevent data leaks between tasks
284+
try:
285+
from shared.infrastructure.context import StateStore
286+
287+
StateStore.clear_all()
288+
except Exception as cleanup_error:
289+
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")
290+
284291

285292
@app.task(
286293
bind=True,

0 commit comments

Comments
 (0)