Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions backend/api/routes/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,6 @@ def initialize_execution(pipeline_id: str, context: Optional[Dict[str, Any]] = N
return execution


def initialize_execution(pipeline_id: str, context: Optional[Dict[str, Any]] = None) -> Execution:
def initialize_execution(pipeline_id: str, context: Dict = None) -> Execution:
"""Create and store a pending execution record."""
from datetime import datetime

execution_id = f"exec-{uuid.uuid4()}"
pipeline = pipelines_db[pipeline_id]
execution = Execution(
id=execution_id,
pipeline_id=pipeline_id,
status=ExecutionStatus.PENDING,
started_at=datetime.now(),
total_stages=len(pipeline.stages),
context=context or {}
)
executions_db[execution_id] = execution
return execution


async def execute_pipeline_background(pipeline_id: str, execution_id: str):
"""Background task to execute pipeline."""
try:
Expand All @@ -115,11 +96,6 @@ async def execute_pipeline_background(pipeline_id: str, execution_id: str):
result.error = existing.error

result.id = execution_id
if execution_id in executions_db:
existing_context = executions_db[execution_id].context.copy()
result.context.update(existing_context)

result.id = execution_id # Use the pre-assigned ID
executions_db[execution_id] = result
except Exception as e:
if execution_id in executions_db:
Expand Down
1 change: 0 additions & 1 deletion backend/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class AirflowCallbackRequest(BaseModel):
"""Schema for processing Airflow run callbacks."""
execution_id: str
callback_type: AirflowCallbackTypeSchema
callback_type: str = Field(description="success, failure, retry, running, cancelled")
dag_id: str
dag_run_id: str
task_id: Optional[str] = None
Expand Down