diff --git a/backend/api/routes/executions.py b/backend/api/routes/executions.py index abd2ec2..96dd85b 100644 --- a/backend/api/routes/executions.py +++ b/backend/api/routes/executions.py @@ -73,25 +73,6 @@ def initialize_execution(pipeline_id: str, context: Optional[Dict[str, Any]] = N return execution -def initialize_execution(pipeline_id: str, context: Optional[Dict[str, Any]] = None) -> Execution: -def initialize_execution(pipeline_id: str, context: Dict = None) -> Execution: - """Create and store a pending execution record.""" - from datetime import datetime - - execution_id = f"exec-{uuid.uuid4()}" - pipeline = pipelines_db[pipeline_id] - execution = Execution( - id=execution_id, - pipeline_id=pipeline_id, - status=ExecutionStatus.PENDING, - started_at=datetime.now(), - total_stages=len(pipeline.stages), - context=context or {} - ) - executions_db[execution_id] = execution - return execution - - async def execute_pipeline_background(pipeline_id: str, execution_id: str): """Background task to execute pipeline.""" try: @@ -115,11 +96,6 @@ async def execute_pipeline_background(pipeline_id: str, execution_id: str): result.error = existing.error result.id = execution_id - if execution_id in executions_db: - existing_context = executions_db[execution_id].context.copy() - result.context.update(existing_context) - - result.id = execution_id # Use the pre-assigned ID executions_db[execution_id] = result except Exception as e: if execution_id in executions_db: diff --git a/backend/api/schemas.py b/backend/api/schemas.py index 21a4b52..a87d8f4 100644 --- a/backend/api/schemas.py +++ b/backend/api/schemas.py @@ -118,7 +118,6 @@ class AirflowCallbackRequest(BaseModel): """Schema for processing Airflow run callbacks.""" execution_id: str callback_type: AirflowCallbackTypeSchema - callback_type: str = Field(description="success, failure, retry, running, cancelled") dag_id: str dag_run_id: str task_id: Optional[str] = None