
Long running task gets incorrectly killed in long-running DAG due to scheduler state mismatch #64658

@dabla

Description

Apache Airflow version

3.1.8

What happened and how to reproduce it?

We are experiencing an issue with long-running DAGs (>3 hours) where a task using `GenericTransfer` gets unexpectedly terminated. The task is actively processing data, but the scheduler incorrectly determines that the task is no longer in a running state and instructs it to terminate.

This appears to be a state desynchronization issue between the scheduler and the worker.

worker.log

    from datetime import datetime, timedelta

    from airflow.sdk import DAG
    from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer
    from airflow.providers.common.sql.operators.sql import (
        SQLExecuteQueryOperator,
        SQLValueCheckOperator,
    )

    source_conn_id = "sas.prod"
    dest_conn_id = "postgres.dev"

    with DAG(
        default_args={
            "owner": "dabla",
            "email_on_failure": True,
            "email_on_retry": False,
            "retry_delay": timedelta(minutes=10),
        },
        dag_id="load_table_stg_full_load",
        schedule=None,
        start_date=datetime(2026, 4, 2),
        catchup=False,
        max_active_runs=1,
    ):
        load_data_to_postgres = GenericTransfer(
            task_id="load_data_to_postgres",
            source_conn_id=source_conn_id,
            destination_conn_id=dest_conn_id,
            destination_table='sa."TABLE_STG"',
            sql="SELECT * FROM BI.TABLE",
            preoperator=[
                "sql/ddl/create_table_stg.sql",
                "sql/ddl/create_table.sql",
                'TRUNCATE TABLE sa."TABLE_STG";',
            ],
            insert_args={
                "commit_every": 10000,
                "autocommit": False,
                "executemany": True,
                "fast_executemany": True,
            },
            page_size=10000,
            paginated_sql_statement_clause="{0}(firstobs=%eval({2} + 1) obs=%eval({2} + {1})) ORDER BY ID;",
            retries=2,
        )

        get_source_row_count = SQLExecuteQueryOperator(
            task_id="get_source_row_count",
            conn_id=source_conn_id,
            sql="SELECT COUNT(*) AS row_count FROM BI.TABLE",
            output_processor=lambda result, _: result[0][0],
        )

        validate_target_row_count = SQLValueCheckOperator(
            task_id="validate_target_row_count_matches_source",
            conn_id=dest_conn_id,
            sql='SELECT COUNT(*) FROM sa."TABLE_STG"',
            pass_value=get_source_row_count.output,
        )

        load_data_to_postgres >> get_source_row_count >> validate_target_row_count
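For context, here is a minimal sketch of how we understand the `paginated_sql_statement_clause` placeholders to be expanded per page: `{0}` is the base SQL statement, `{1}` the page size, and `{2}` the row offset. The `paginated_statement` helper is hypothetical, purely to illustrate the `str.format` expansion, not GenericTransfer's actual internals:

```python
# Illustration (assumption) of how the pagination clause above is rendered
# per page: {0} = base SQL, {1} = page size, {2} = offset of the current page.
clause = "{0}(firstobs=%eval({2} + 1) obs=%eval({2} + {1})) ORDER BY ID;"
sql = "SELECT * FROM BI.TABLE"
page_size = 10000


def paginated_statement(offset: int) -> str:
    """Render the SAS-style pagination clause for one page (hypothetical helper)."""
    return clause.format(sql, page_size, offset)


# First page reads rows 1..10000, second page rows 10001..20000, and so on.
print(paginated_statement(0))
print(paginated_statement(10000))
```

Each paginated query is a separate round-trip to the source, which is why a full load of a large table keeps the task running for hours.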

What you think should happen instead?

The task should not be killed while it is still actively running and processing data.

Operating System

Fedora 5.3

Versions of Apache Airflow Providers

Latest provider versions compatible with Airflow 3.1.

Deployment

Official Apache Airflow Helm Chart

Deployment details

We have 3 scheduler instances on Kubernetes and use the CeleryExecutor with Celery workers.

Anything else?

We have also defined the following parameters in the hope of avoiding the issue, but they don't help:

    - name: AIRFLOW__CELERY__WORKER_SEND_TASK_EVENTS
      value: "true"
    - name: AIRFLOW__CELERY__TASK_SEND_SENT_EVENT
      value: "true"
    - name: CELERY_WORKER_SEND_TASK_EVENTS
      value: "1"
    - name: AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVALS
      value: "false"
    - name: AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE
      value: "20"
    - name: AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP
      value: "10"
    # If the last scheduler heartbeat happened more than [scheduler] scheduler_health_check_threshold seconds ago, the scheduler is considered unhealthy. This is used by the health check in the /health endpoint and by the airflow jobs check CLI for SchedulerJob.
    - name: AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD
      value: "30"
    # How often (in seconds) the scheduler expects a heartbeat from a non-deferrable task instance. Increase this for long-running non-deferrable tasks to avoid premature task termination.
    - name: AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC
      value: "300"
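To make the suspected failure mode concrete, here is a minimal sketch (an assumption, not Airflow's actual code) of the staleness check we believe is at play: if the task instance's last recorded heartbeat is older than the configured threshold, the scheduler concludes the task is no longer running and terminates it. The function and constant names are illustrative only:

```python
from datetime import datetime, timedelta, timezone

# Illustrative constant mirroring the env var we set above
# (AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=300).
TASK_INSTANCE_HEARTBEAT_SEC = 300


def is_considered_dead(last_heartbeat: datetime, now: datetime) -> bool:
    """Hypothetical staleness check: True when the last heartbeat is older
    than the threshold, i.e. when the scheduler would kill the task even
    though the worker may still be busy processing a large page of rows."""
    return now - last_heartbeat > timedelta(seconds=TASK_INSTANCE_HEARTBEAT_SEC)


now = datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc)
print(is_considered_dead(now - timedelta(seconds=120), now))   # fresh heartbeat
print(is_considered_dead(now - timedelta(seconds=3600), now))  # stale heartbeat
```

If heartbeats from the Celery worker are not reaching the scheduler's view of the task instance (the state desynchronization described above), even a healthy long-running task trips this check.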

Probably related to #48719 and #54479.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata
    Labels

    area:core, kind:bug (This is clearly a bug), needs-triage (label for new issues that we didn't triage yet)
