From 41aa3aab7b8d33c1b95627b482c3d42c94cdf575 Mon Sep 17 00:00:00 2001 From: Data Cloud Agents Team Date: Tue, 23 Jun 2026 01:48:55 -0700 Subject: [PATCH] feat: gcp-msaa-migration - a new skill for Airflow code migration It covers migration from Airflow 2.10.2+ to Airflow 2.11.1 and Airflow 3. PiperOrigin-RevId: 936509461 --- skills/gcp-msaa-migration/SKILL.md | 552 +++++++++++++++++++++++++++++ 1 file changed, 552 insertions(+) create mode 100644 skills/gcp-msaa-migration/SKILL.md diff --git a/skills/gcp-msaa-migration/SKILL.md b/skills/gcp-msaa-migration/SKILL.md new file mode 100644 index 0000000..ee11240 --- /dev/null +++ b/skills/gcp-msaa-migration/SKILL.md @@ -0,0 +1,552 @@ +--- +name: gcp-msaa-migration +description: Provides guidance for migrating Apache Airflow DAGs in Managed Service for Apache Airflow (MSAA; formerly Cloud Composer). Covers migration to Airflow 2.11.1 (MSAA Gen 2 and 3) and Airflow 3 (MSAA Gen 3), including environment inspection, GCS download/upload and scanning patterns for breaking changes. +license: Apache-2.0 +metadata: + version: v1 + publisher: google +--- + +# Managed Service for Apache Airflow (formerly Cloud Composer) Migration Guide + +This skill guides you through the process of adjusting Airflow DAGs from an +existing Managed Service for Apache Airflow (formerly Cloud Composer) +environment (or available locally) to make them compatible with **Airflow +2.11.1** (MSAA Gen 2 or 3) or **Airflow 3** (MSAA Gen 3). + +-------------------------------------------------------------------------------- + +## Phase 1: Discovery & Download + +Before making any changes, download the existing DAG files if explicitly +requested. Inspect the source environment to confirm source version only if +explicitly requested. 
+ +### 1.1 List and Inspect Source Environment (Only when requested) + +*Perform this step only if explicitly requested to do so.* Run the following +commands to list environments, get detailed configuration, and identify the +starting versions of your source environment (``) in region +``. + +1. **List Environments:** Identify the available Composer environments in your + project. + + ```bash + gcloud composer environments list \ + --locations= \ + --format="table(name,location,state)" + ``` + + *Note: Always use the `--locations` flag (plural) for listing. You can omit + `--locations` to list across all regions.* + +2. **Describe Environment:** Get the complete configuration details for a + specific environment. + + ```bash + gcloud composer environments describe \ + --location + ``` + + *Note: Always use the `--location` flag (singular) for describing a specific + environment.* + +3. **Get Specific Configuration Details:** Extract specific fields from the + environment description. + + * **Get Image Version:** + + ```bash + gcloud composer environments describe \ + --location \ + --format="value(config.softwareConfig.imageVersion)" + ``` + + * **Get PyPI Packages:** + + ```bash + gcloud composer environments describe \ + --location \ + --format="value(config.softwareConfig.pypiPackages)" + ``` + + * **Get GCS Bucket Path:** + + ```bash + gcloud composer environments describe \ + --location \ + --format="value(config.dagGcsPrefix)" + ``` + + *Expected Output:* `gs:///dags` + +4. **List Active DAGs:** Identify which DAGs are currently registered and + active in the source environment. + + ```bash + gcloud composer environments run \ + --location \ + dags list + ``` + +### 1.2 Download DAGs and Bucket Dependencies (only when requested) + +*Perform this step only if explicitly requested to do so.* Download DAG files +and any other dependency files/folders from the source environment GCS bucket to +a local workspace directory (`./migration_workspace`). + +1. 
**Download DAGs:** + + ```bash + mkdir -p ./migration_workspace/dags + gcloud storage cp -r gs:///dags/* ./migration_workspace/dags/ + ``` + +2. **Download Other Bucket Dependencies (if applicable):** + + ```bash + gcloud storage cp -r gs:/// ./migration_workspace/ + ``` + +> [!IMPORTANT] **Additional Environment Dependencies:** DAGs may also depend on +> Airflow Connections, Variables, or custom PyPI packages. Ensure these are +> identified and documented for the target environment setup. + +-------------------------------------------------------------------------------- + +## Phase 2: Target Version & Dependency Mapping + +### 2.1 Airflow 2.11.1+ Dependency Mapping + +If migrating to Airflow 2.11.1 (MSAA Gen 2) or Airflow 3, use the table below to +trace the version progression of key dependencies. The table covers changes +needed to get to Airflow 2.11.1. Take them into account when migrating from +Airflow 2 (earlier than 2.11.1) to Airflow 3. + +Image Version (Composer) | Airflow Version | Google Provider | SSH Provider | HTTP Provider | Breaking Changes Introduced in this Version (or since previous row) +:----------------------- | :-------------- | :-------------- | :----------- | :------------ | :------------------------------------------------------------------ +`2.10.0` | `2.10.2` | `10.26.0` | `3.14.0` | `4.13.3` | *Baseline for oldest fully documented source.* +`2.15.3` | `2.10.5` | `18.0.0` | `4.1.4` | `5.3.4` | **SSH 4.0.0:** Hook `timeout` removed; `get_conn()` context manager.
**HTTP 5.0.0:** `SimpleHttpOperator` -> `HttpOperator`.
**Google 11.0.0:** `BigQueryExecuteQueryOperator` removed.
**Google 12.0.0:** Legacy Data Pipeline operators removed.
**Google 13.0.0:** `AutoMLBatchPredictOperator` removed.
**Google 17.0.0:** `BigQueryCreateEmptyTableOperator` and `BigQueryCreateExternalTableOperator` removed; Life Sciences operators removed.
**Google 18.0.0:** Legacy DV360 operators removed. +`2.16.1` | `2.10.5` | `19.0.0` | `4.1.6` | `5.5.0` | **Google 19.0.0:** AutoML operators removed (use Vertex AI). +**`2.17.0` (Target)** | **`2.11.1`** | **`20.0.0`** | **`5.0.0`** | **`6.0.2`** | **SSH Provider 5.0.0:** `sshtunnel` removed (native tunneling).
**HTTP Provider 6.0.0:** JSON serialization.
**Google Provider 20.0.0:** ADLS Gen2 migration. + +### 2.2 Airflow 3 Migration + +If migrating to Airflow 3 (MSAA Gen 3), note that this is a major version +upgrade with significant changes, including: + +* Decoupled Task SDK (imports change from `airflow` to `airflow.sdk`). +* Removal of direct metadata DB access. +* Renaming of `Dataset` to `Asset`. +* Removal of SubDAGs and SLAs. +* Changes to context variables availability. + +Take into account all applicable changes within Airflow 2 (e.g. when migrating +from Airflow 2.10.2, apply changes needed to move to Airflow 2.11.1 and Airflow +3 migration changes on top of that). + +-------------------------------------------------------------------------------- + +## Phase 3: Analysis & Remediation (Scanning Downloaded Files) + +Run the scan commands from the root of your local workspace +(`./migration_workspace`). + +-------------------------------------------------------------------------------- + +### 3.1 Airflow 2.11.1 Core & Dependency checks + +Use these scans if migrating to Airflow 2.11.1+ (intermediate step when +migrating to Airflow 3). + +#### 3.1.1 Dataset Scheduling (Airflow 2.11.0) + +* **Change:** DAGs scheduled on datasets only trigger if events occur while + the DAG is unpaused. +* **Scan Command:** `grep -rn "Dataset(" ./dags` +* **Remediation:** Document that these DAGs must remain unpaused to catch + events, or plan manual triggers for catch-up. + +#### 3.1.2 HTML in Descriptions (Airflow 2.11.0) + +* **Change:** Raw HTML in DAG docs/params is escaped by default. +* **Scan Command:** `grep -rn -E + "doc_md.*<|doc_md.*>|description.*<|description.*>" ./dags` +* **Remediation:** Convert HTML to Markdown, or set + `AIRFLOW__WEBSERVER__ALLOW_RAW_HTML_DESCRIPTIONS=True` in target. + +#### 3.1.3 Teardown Tasks (Airflow 2.10.5) + +* **Change:** Teardowns always run when a DAG is marked failed. +* **Scan Command:** `grep -rn "as_teardown" ./dags` +* **Remediation:** Ensure teardown tasks are idempotent. 
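Because teardowns now run whenever a DAG is marked failed, the cleanup logic may execute more than once, or against resources that were already removed. The following is a minimal sketch of an idempotent cleanup callable using only the standard library. The function name `cleanup_scratch_dir` and the scratch-directory layout are hypothetical; in a DAG, the function body would live in whichever task is marked with `as_teardown`.

```python
import shutil
import tempfile
from pathlib import Path


def cleanup_scratch_dir(path: str) -> bool:
    """Remove a scratch directory; safe to call any number of times.

    Returns True if something was removed, False if there was nothing to do.
    """
    target = Path(path)
    if not target.exists():
        # Nothing was created, or an earlier teardown run already cleaned up.
        return False
    shutil.rmtree(target)
    return True


if __name__ == "__main__":
    scratch = tempfile.mkdtemp(prefix="dag_scratch_")
    print(cleanup_scratch_dir(scratch))  # first call removes the directory
    print(cleanup_scratch_dir(scratch))  # second call is a no-op
```

Returning a flag instead of raising on a missing directory is the design choice that makes repeated runs harmless.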
+ +#### 3.1.4 Pendulum 3 Upgrade (Airflow 2.11.0) + +* **Change:** `Period` renamed to `Interval`, testing helpers removed. +* **Scan Command (Code):** `grep -rn -E "pendulum\.Period|pendulum\.period" + ./dags` +* **Scan Command (Tests):** `grep -rn -E "\.test\(|set_test_now\(" ./tests + 2>/dev/null || true` +* **Remediation:** Replace `Period` with `Interval`, and `period(...)` with + `interval(...)`. + +-------------------------------------------------------------------------------- + +### 3.2 Path A: Airflow 2.11.1 Provider Package Scan + +#### 3.2.1 SSH Provider (SSH 4.0.0 & 5.0.0) + +* **Scan Command (Timeout):** `grep -rn "SSHHook" ./dags | grep "timeout"` +* **Scan Command (Context Manager):** `grep -rn "with SSHHook" ./dags` +* **Scan Command (Tunnel Attributes):** `grep -rn "\.get_tunnel" ./dags` +* **Remediation:** + * Replace `timeout` with `conn_timeout` in `SSHHook`. + * Replace `with hook as conn:` with `with hook.get_conn() as conn:`. + * Use `get_tunnel()` as context manager: `with hook.get_tunnel(...) as + tunnel:`. + +#### 3.2.2 HTTP Provider (HTTP 5.0.0 & 6.0.0) + +* **Scan Command:** `grep -rn "SimpleHttpOperator" ./dags` +* **Remediation:** Replace `SimpleHttpOperator` with `HttpOperator`. + +#### 3.2.3 Google Provider (v11 to v20) + +* **Scan Command (BigQuery query):** `grep -rn "BigQueryExecuteQueryOperator" + ./dags` + * *Remediation:* Replace with `BigQueryInsertJobOperator` (use + `configuration` dict). +* **Scan Command (BigQuery table):** `grep -rn -E + "BigQueryCreateEmptyTableOperator|BigQueryCreateExternalTableOperator" + ./dags` + * *Remediation:* Replace with `BigQueryCreateTableOperator` (use + `table_resource` dict). +* **Scan Command (AutoML):** `grep -rn -E + "AutoMLTrainModelOperator|AutoMLPredictOperator|AutoMLCreateDatasetOperator|AutoMLBatchPredictOperator" + ./dags` + * *Remediation:* Migrate to Vertex AI operators. 
+* **Scan Command (Dataflow):** `grep -rn -E + "CreateDataPipelineOperator|RunDataPipelineOperator" ./dags` + * *Remediation:* Replace with + `DataflowCreatePipelineOperator`/`RunPipelineOperator`. +* **Scan Command (Life Sciences):** `grep -rn + "LifeSciencesRunPipelineOperator" ./dags` + * *Remediation:* Migrate to Google Cloud Batch operators + (`BatchCreateJobOperator`). +* **Scan Command (ADLS to GCS):** `grep -rn "ADLSToGCSOperator" ./dags` + * *Remediation:* Ensure `file_system_name` is provided. + +-------------------------------------------------------------------------------- + +### 3.3 Airflow 3 Migration checks + +Use these scans if migrating to Airflow 3. + +#### 3.3.1 Direct Metadata Database Access (migration to Airflow 3) + +* **Change:** Direct access to the Airflow metadata database is not allowed. + Internal DB sessions and direct SQLAlchemy queries against Airflow internals + must be removed. +* **Scan Command:** + + ```bash + grep -rn -E "provide_session|airflow\.utils\.session" ./dags + ``` + +* **Remediation:** + + * Remove `@provide_session` decorators and internal ORM queries. + * Inspect localized task runtime state via `get_current_context()`. + * For interaction with Airflow metadata resources, use the **Airflow + Python Client**. + +#### 3.3.2 Reorganized Imports (migration to Airflow 3) + +* **Change:** Core authoring primitives are migrated to the decoupled SDK + interface, and standard operators are moved to the standard provider bundle. 
+* **Scan Command:** + + ```bash + grep -rn -E "airflow\.models\.dag|airflow\.DAG|airflow\.models\.baseoperator|airflow\.utils\.task_group|airflow\.datasets\.Dataset|airflow\.models\.variable|airflow\.operators\.bash|airflow\.operators\.python" ./dags + ``` + +* **Remediation:** Update imports according to the following mapping: + + Legacy Airflow 2 Import | Airflow 3 Target Import + :----------------------------------------- | :---------------------- + `airflow.models.dag.DAG` / `airflow.DAG` | `airflow.sdk.DAG` + `airflow.models.baseoperator.BaseOperator` | `airflow.sdk.BaseOperator` + `airflow.utils.task_group.TaskGroup` | `airflow.sdk.TaskGroup` + `airflow.datasets.Dataset` | `airflow.sdk.Asset` + `airflow.models.variable.Variable` | `airflow.sdk.Variable` + `airflow.operators.bash.BashOperator` | `airflow.providers.standard.operators.bash.BashOperator` + `airflow.operators.python.PythonOperator` | `airflow.providers.standard.operators.python.PythonOperator` + + *Note: Prefer standard decorators (e.g. `@dag`, `@task`) unless it is not + feasible.* + +#### 3.3.4 Asset Transition: Dataset to Asset (migration to Airflow 3) + +* **Change:** Data-aware scheduling targets are renamed from `Dataset` to + `Asset`. +* **Scan Command:** + + ```bash + grep -rn "Dataset(" ./dags + ``` + +* **Remediation:** + + * Update `schedule=[Dataset(...)]` to `schedule=[Asset(...)]`. + * Update task parameter endpoints (`outlets`/`inlets`) accordingly. + * Update imports to `from airflow.sdk import Asset`. + +#### 3.3.5 Deprecated Features: SubDAGs & SLAs (migration to Airflow 3) + +* **Change:** `SubDagOperator` and `sla` parameters are removed. +* **Scan Command (SubDAGs):** + + ```bash + grep -rn "SubDagOperator" ./dags + ``` + +* **Scan Command (SLAs):** + + ```bash + grep -rn "sla=" ./dags + ``` + +* **Remediation:** + + * **SubDAGs:** Refactor `SubDagOperator` uses into nested `TaskGroup`s. + * **SLAs:** Delete the `sla` parameter from operator calls and DAG + `default_args`. 
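The `grep` scans above also match comments and unrelated strings. As a complementary check, the sketch below parses a DAG file with Python's standard `ast` module and reports `SubDagOperator` references and `sla` settings with their line numbers. The function name `find_removed_features` and the sample source are illustrative assumptions, not part of any Airflow tooling, and attribute-style references such as `subdag.SubDagOperator` are not covered.

```python
import ast


def find_removed_features(source: str) -> list[tuple[int, str]]:
    """Return sorted (line, finding) pairs for SubDagOperator and sla usage."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom):
            if any(alias.name == "SubDagOperator" for alias in node.names):
                findings.append((node.lineno, "SubDagOperator import"))
        elif isinstance(node, ast.Name) and node.id == "SubDagOperator":
            findings.append((node.lineno, "SubDagOperator reference"))
        elif isinstance(node, ast.keyword) and node.arg == "sla":
            findings.append((node.value.lineno, "sla keyword argument"))
        elif isinstance(node, ast.Dict):
            for key in node.keys:
                if isinstance(key, ast.Constant) and key.value == "sla":
                    findings.append((key.lineno, "sla key in a dict"))
    return sorted(findings)


# Hypothetical legacy DAG snippet used only to exercise the scanner.
SAMPLE = '''
from airflow.operators.subdag import SubDagOperator
default_args = {"owner": "data", "sla": None}
section = SubDagOperator(task_id="section", subdag=None)
load = make_task(task_id="load", sla=None)
'''

if __name__ == "__main__":
    for line, finding in find_removed_features(SAMPLE):
        print(f"line {line}: {finding}")
```

The source is only parsed, never executed, so the scan works without Airflow installed.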
+ +#### 3.3.6 Deprecated Context Variables (migration to Airflow 3) + +* **Change:** Several context variables are deprecated or removed. +* **Scan Command:** + + ```bash + grep -rn -E "\bexecution_date\b|\bnext_execution_date\b|\bnext_ds\b|\bnext_ds_nodash\b|\bprev_execution_date\b|\bprev_ds\b|\bprev_ds_nodash\b|\byesterday_ds\b|\byesterday_ds_nodash\b|\btomorrow_ds\b|\btomorrow_ds_nodash\b|\bprev_execution_date_success\b|\bconf\b|\btriggering_dataset_events\b" ./dags + ``` + +* **Remediation:** Replace deprecated variables. Be defensive and secure the + code to handle cases where variables might be `None` (e.g., in manual or + asset-triggered runs). + + Airflow 2.x Variable | Airflow 3.0 Replacement / Workaround | Migration / Fallback Advice + :---------------------------- | :-------------------------------------------------------------------- | :-------------------------- + `execution_date` | `logical_date` | Replace with `logical_date`. Use `dag_run.run_id` for asset-triggered runs. + `next_execution_date` | `data_interval_end` | Replace with `data_interval_end`. Defaults to trigger time on manual runs. + `next_ds` | `{{ data_interval_end \| ds }}` | Replace with `{{ data_interval_end \| ds }}`. + `next_ds_nodash` | `{{ data_interval_end \| ds_nodash }}` | Replace with `{{ data_interval_end \| ds_nodash }}`. + `prev_execution_date` | `prev_data_interval_start_success` | Use `logical_date` for manual run fallback, or `prev_data_interval_start_success` for actual prior success. + `prev_ds` | `{{ prev_data_interval_start_success \| ds }}` | Use `ds` for manual fallback, or `{{ prev_data_interval_start_success \| ds }}`. + `prev_ds_nodash` | `{{ prev_data_interval_start_success \| ds_nodash }}` | Use `ds_nodash` for manual fallback, or `{{ prev_data_interval_start_success \| ds_nodash }}`. + `yesterday_ds` | `{{ macros.ds_add(ds, -1) }}` | Replace with `{{ macros.ds_add(ds, -1) }}`. 
+    `yesterday_ds_nodash`         | `{{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%Y%m%d") }}` | Replace with formatted `ds_add` macro.
+    `tomorrow_ds`                 | `{{ macros.ds_add(ds, 1) }}`                                          | Replace with `{{ macros.ds_add(ds, 1) }}`.
+    `tomorrow_ds_nodash`          | `{{ macros.ds_format(macros.ds_add(ds, 1), "%Y-%m-%d", "%Y%m%d") }}`  | Replace with formatted `ds_add` macro.
+    `prev_execution_date_success` | `prev_data_interval_start_success`                                    | Replace with `prev_data_interval_start_success`.
+    `conf`                        | None / Env Vars / Variables                                           | Removed for Task SDK isolation. Use env vars or Airflow Variables. User run config is in `dag_run.conf`.
+    `triggering_dataset_events`   | `triggering_asset_events`                                             | Replace with `triggering_asset_events`.
+
+#### 3.3.7 Default Configurations
+
+*   **Change:** The default for `catchup` changes from `True` to `False`, so
+    DAGs that omit the argument no longer backfill historical runs.
+*   **Scan Command:**
+
+    ```bash
+    grep -rn "DAG(" ./dags | grep -v "catchup"
+    ```
+
+*   **Remediation:** Set `catchup` explicitly on every DAG the scan reports:
+    `catchup=False` by default, and `catchup=True` only where historical runs
+    are strictly required.
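The `grep` pipeline above works line by line, so it also reports DAG definitions that pass `catchup` on a later line of a multi-line call. A stricter check can be sketched with the standard `ast` module. The helper name `dags_without_catchup` and the sample source are hypothetical, and the check only recognizes calls spelled `DAG(...)` or `dag(...)`, including attribute forms such as `models.DAG(...)`.

```python
import ast


def dags_without_catchup(source: str) -> list[int]:
    """Return line numbers of DAG(...) / dag(...) calls lacking `catchup`."""
    missing = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Accept both bare names (DAG, dag) and attribute access (models.DAG).
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name not in {"DAG", "dag"}:
            continue
        if not any(kw.arg == "catchup" for kw in node.keywords):
            missing.append(node.lineno)
    return sorted(missing)


# Hypothetical source: the first DAG sets catchup on a continuation line,
# the second omits it and would pick up the new default.
SAMPLE = '''
with DAG(
    dag_id="explicit",
    catchup=True,
) as a:
    pass

with DAG(dag_id="implicit") as b:
    pass
'''

if __name__ == "__main__":
    print(dags_without_catchup(SAMPLE))
```

A bare `@dag` decorator without parentheses is not a call node and is therefore not reported by this sketch.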
+
+--------------------------------------------------------------------------------
+
+### 3.4 Airflow 3.0 Conversion Examples
+
+#### Example 1: Basic DAG Imports & Assets
+
+**Legacy Airflow 2 Code:**
+
+```python
+from airflow import DAG
+from airflow.datasets import Dataset
+from airflow.operators.bash import BashOperator
+from datetime import datetime, timedelta
+
+with DAG(
+    dag_id="example_dag",
+    start_date=datetime(2023, 1, 1),
+    schedule=[Dataset("gcs://my-bucket/data.csv")],
+    catchup=True,
+) as dag:
+    task1 = BashOperator(
+        task_id="run_script",
+        bash_command="echo hello",
+        sla=timedelta(hours=1),
+    )
+```
+
+**Converted Airflow 3 Code:**
+
+```python
+from airflow.sdk import dag, task, Asset
+from datetime import datetime
+
+@dag(
+    dag_id="example_dag",
+    start_date=datetime(2023, 1, 1),
+    schedule=[Asset("gcs://my-bucket/data.csv")],
+    catchup=True,
+)
+def example_dag():
+    # The `sla` argument is dropped: SLAs are removed in Airflow 3.
+    @task.bash(task_id="run_script")
+    def run_script():
+        return "echo hello"
+
+    run_script()
+
+example_dag()
+```
+
+--------------------------------------------------------------------------------
+
+## Phase 4: Deployment & Verification
+
+*Perform deployment and verification steps only if explicitly requested to do
+so.*
+
+### 4.1 Static Verification (when migrating to Airflow 3)
+
+After applying code changes for Airflow 3, verify syntax correctness. If
+available in the development environment, run static lint checks:
+
+```bash
+ruff check {target_dag_file} --select AIR30
+```
+
+Resolve any reported deprecation warnings before finalization. If `ruff` is not
+available, recommend installing it.
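Where `ruff` is not installed yet, plain syntax errors can still be caught with the standard library before upload. This is only a sketch: it does not detect deprecated Airflow APIs, and the function name `syntax_errors` and the demo file names are hypothetical.

```python
import tempfile
from pathlib import Path


def syntax_errors(root: str) -> dict[str, str]:
    """Compile every .py file under `root`; map failing paths to the error."""
    errors = {}
    for path in sorted(Path(root).rglob("*.py")):
        try:
            # compile() parses the file without executing it.
            compile(path.read_text(encoding="utf-8"), str(path), "exec")
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors


if __name__ == "__main__":
    # Demo against a throwaway directory holding one valid and one broken file.
    with tempfile.TemporaryDirectory() as workspace:
        Path(workspace, "good_dag.py").write_text("x = 1\n", encoding="utf-8")
        Path(workspace, "bad_dag.py").write_text("def broken(:\n", encoding="utf-8")
        for file_path, message in syntax_errors(workspace).items():
            print(f"{file_path}: {message}")
```

Pointing `syntax_errors` at the local `dags` directory gives a quick pre-upload gate, while the lint step remains the check for Airflow 3 deprecations.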
+
+### 4.2 Deployment to MSAA
+
+#### 4.2.1 Get Target GCS Bucket Path (only when requested)
+
+```bash
+gcloud composer environments describe \
+    --location \
+    --format="value(config.dagGcsPrefix)"
+```
+
+*Expected Output:* `gs:///dags`
+
+#### 4.2.2 Upload Modified DAGs and Bucket Dependencies (only when requested)
+
+*Perform this step only if explicitly requested to do so.* Copy the modified
+DAGs and any backed-up bucket dependencies from your local workspace to the
+target GCS bucket. *If you skipped the inspection step, ensure you have the
+correct ``.*
+
+1.  **Upload DAGs:**
+
+    ```bash
+    gcloud storage cp -r ./migration_workspace/dags/* gs:///dags/
+    ```
+
+2.  **Upload Other Bucket Dependencies (if applicable):**
+
+    ```bash
+    gcloud storage cp -r ./migration_workspace/ gs:///
+    ```
+
+### 4.3 Verify DAGs via Airflow CLI
+
+*Perform this step only if explicitly requested to upload modified DAGs to a
+target environment (and after uploading).*
+
+You can verify that your DAGs have been successfully uploaded, parsed, and
+registered by the Airflow scheduler in the target environment using the Airflow
+CLI.
+
+1.  **List Registered DAGs:** Run the following command to list all DAGs
+    registered in the target environment. Verify that your migrated DAGs appear
+    in this list.
+
+    ```bash
+    gcloud composer environments run \
+        --location \
+        dags list
+    ```
+
+2.  **Check for Import Errors:** If some DAGs are missing from the list, or to
+    ensure there are no parsing issues, check for import errors:
+
+    ```bash
+    gcloud composer environments run \
+        --location \
+        dags list-import-errors
+    ```
+
+    *Expected Output:*
+
+    *   If there are no errors, the command will output `No data found`.
+    *   If there are errors, it will list the file path and the traceback of the
+        error.
+
+*Note: It may take a couple of minutes for the Airflow scheduler to parse the
+new files and for changes to be reflected in these commands.*
+
+### 4.4 Verify in Cloud Logging
+
+*Perform this step only if explicitly requested to upload modified DAGs to a
+target environment (and after uploading).* Monitor Cloud Logging for the target
+environment to detect any runtime errors or import errors.
+
+Run the following query in the **GCP Cloud Logging Console** (or via `gcloud
+logging read`):
+
+```query
+resource.type="cloud_composer_environment"
+resource.labels.environment_name=""
+log_id("airflow-scheduler")
+severity>=ERROR
+```
+
+--------------------------------------------------------------------------------
+
+## Appendix: Local Environment Verification
+
+If you want to verify your changes locally before deploying to the target
+environment, you can use the Composer Local Development CLI tool
+(`composer-dev`).
+
+### A.1 List and Describe Local Environments
+
+1.  **List Local Environments:** `composer-dev list`
+2.  **Describe Local Environment:** `composer-dev describe `
+
+### A.2 Identify Active Environment and DAGs Location
+
+*   **Active Environment:** Specified by ``.
+*   **DAGs Location:** Shown in `composer-dev describe` output under
+    `Dags directory`. Copy your migrated DAGs to this directory to test.
+
+### A.3 Verify DAGs and Check for Import Errors
+
+1.  **List Parsed DAGs:**
+
+    ```bash
+    composer-dev run-airflow-cmd dags list
+    ```
+
+2.  **Check for Import Errors:**
+
+    ```bash
+    composer-dev run-airflow-cmd dags list-import-errors
+    ```