diff --git a/.github/workflows/reusable-deploy-bulk.yml b/.github/workflows/reusable-deploy-bulk.yml index a1e1f7d..3fc3da6 100644 --- a/.github/workflows/reusable-deploy-bulk.yml +++ b/.github/workflows/reusable-deploy-bulk.yml @@ -69,197 +69,4 @@ jobs: AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} FABRIC_WORKSPACE_ID: ${{ secrets.FABRIC_WORKSPACE_ID }} REPOSITORY_DIRECTORY: ${{ inputs.repository_directory }} - run: | - python -c " - import base64 - import json - import os - import pathlib - import sys - import time - import requests - - # Polling configuration (decisions #15, #16) - POLL_FALLBACK_SECONDS = 30 - POLL_FLOOR_SECONDS = 5 - POLL_TIMEOUT_SECONDS = 20 * 60 - TOKEN_REFRESH_EVERY_N_POLLS = 20 - - # Files to skip when building definitionParts[]. Two layers of exclusion: - # 1. Named files: known files that should never be sent (parameter.yml is - # fabric-cicd config; .gitkeep is a Git placeholder). - # 2. Structural rule: item definitions always live inside *./ folders, - # so any file directly under repository_directory is excluded by - # construction (handled in build_definition_parts). - EXCLUDED_FILES = {'parameter.yml', '.gitkeep'} - - tenant_id = os.environ['AZURE_TENANT_ID'] - client_id = os.environ['AZURE_CLIENT_ID'] - client_secret = os.environ['AZURE_CLIENT_SECRET'] - workspace_id = os.environ['FABRIC_WORKSPACE_ID'] - repo_dir = pathlib.Path(os.environ['REPOSITORY_DIRECTORY']).resolve() - - - def acquire_token() -> str: - token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token' - resp = requests.post( - token_url, - data={ - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret, - 'scope': 'https://api.fabric.microsoft.com/.default', - }, - timeout=30, - ) - if resp.status_code != 200: - sys.exit(f'::error::Token acquisition failed: HTTP {resp.status_code} {resp.text}') - token = resp.json()['access_token'] - # Mask the token in workflow logs (decision #10) - print(f'::add-mask::{token}') - return token - - - def build_definition_parts() -> list: - if not repo_dir.is_dir(): - sys.exit(f'::error::Repository directory not found: {repo_dir}') - parts = [] - for f in sorted(repo_dir.rglob('*')): - if not f.is_file(): - continue - if f.name in EXCLUDED_FILES: - continue - # Item definitions live inside *./ subfolders; anything at - # the root of repository_directory cannot belong to an item. - if f.parent == repo_dir: - continue - rel = '/' + f.relative_to(repo_dir).as_posix() - parts.append({ - 'path': rel, - 'payload': base64.b64encode(f.read_bytes()).decode('ascii'), - 'payloadType': 'InlineBase64', - }) - if not parts: - sys.exit(f'::error::No item definition files found under {repo_dir}') - return parts - - - def poll_lro(operation_id: str, headers: dict, initial_retry_after: int) -> None: - base = 'https://api.fabric.microsoft.com/v1/operations' - retry_after = max(initial_retry_after or POLL_FALLBACK_SECONDS, POLL_FLOOR_SECONDS) - started = time.monotonic() - poll_count = 0 - - while True: - elapsed = time.monotonic() - started - if elapsed > POLL_TIMEOUT_SECONDS: - sys.exit( - f'::error::LRO polling timed out after {POLL_TIMEOUT_SECONDS}s ' - f'(operation {operation_id})' - ) - - time.sleep(retry_after) - poll_count += 1 - - # Refresh token periodically for long-running operations - # (mirrors the pattern in reusable-fabric-etl.yml). - if poll_count > 0 and poll_count % TOKEN_REFRESH_EVERY_N_POLLS == 0: - headers['Authorization'] = f'Bearer {acquire_token()}' - - resp = requests.get(f'{base}/{operation_id}', headers=headers, timeout=30) - if resp.status_code != 200: - sys.exit(f'::error::Poll request failed: HTTP {resp.status_code} {resp.text}') - - body = resp.json() - status = body.get('status', 'Unknown') - print(f'Poll {poll_count} (t+{int(elapsed)}s): status={status}') - - if status == 'Succeeded': - return - if status in ('Failed', 'Undefined'): - print(json.dumps(body, indent=2)) - sys.exit(f'::error::LRO ended with status: {status}') - - # NotStarted or Running — keep polling. Honor Retry-After if present. - retry_after = max( - int(resp.headers.get('Retry-After', POLL_FALLBACK_SECONDS)), - POLL_FLOOR_SECONDS, - ) - - - def check_per_item_status(result: dict) -> None: - details = result.get('importItemDefinitionsDetails', []) - print(json.dumps(result, indent=2)) - if not details: - sys.exit('::error::Result body has no importItemDefinitionsDetails') - - failures = [ - d for d in details - if d.get('operationStatus') in ('Failed', 'SucceededDespiteFailures') - ] - if failures: - summary = '\n'.join( - f\" - {d.get('itemDisplayName')} ({d.get('itemType')}): \" - f\"{d.get('operationStatus')}\" - for d in failures - ) - sys.exit(f'::error::{len(failures)} item(s) failed:\n{summary}') - - print(f'All {len(details)} items deployed successfully.') - - - # ---------- main flow ---------- - token = acquire_token() - headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} - - parts = build_definition_parts() - print(f'Built request body with {len(parts)} definition parts from {repo_dir}') - - request_body = { - 'definitionParts': parts, - 'options': {'allowPairingByName': False}, - } - - # Endpoint URL per the API reference page (the tutorial's URL is wrong). - # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/bulk-import-item-definitions(beta) - api_url = ( - f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}' - f'/items/bulkImportDefinitions?beta=true' - ) - print(f'POST {api_url}') - - post_resp = requests.post(api_url, headers=headers, json=request_body, timeout=120) - - if post_resp.status_code == 200: - # Sync path — result is in the response body directly. - check_per_item_status(post_resp.json()) - sys.exit(0) - - if post_resp.status_code == 202: - # Async path — poll the LRO, then fetch the result. - operation_id = post_resp.headers.get('x-ms-operation-id') - if not operation_id: - sys.exit('::error::202 response missing x-ms-operation-id header') - - initial_retry = int(post_resp.headers.get('Retry-After', POLL_FALLBACK_SECONDS)) - print(f'202 Accepted, operation_id={operation_id}, initial Retry-After={initial_retry}s') - - poll_lro(operation_id, headers, initial_retry) - - result_resp = requests.get( - f'https://api.fabric.microsoft.com/v1/operations/{operation_id}/result', - headers=headers, - timeout=30, - ) - if result_resp.status_code != 200: - sys.exit( - f'::error::Failed to fetch operation result: ' - f'HTTP {result_resp.status_code} {result_resp.text}' - ) - check_per_item_status(result_resp.json()) - sys.exit(0) - - sys.exit( - f'::error::Bulk import POST failed: HTTP {post_resp.status_code} {post_resp.text}' - ) - " + run: python scripts/deploy_bulk.py diff --git a/.github/workflows/reusable-deploy-supported.yml b/.github/workflows/reusable-deploy-supported.yml index b46429b..a56eb40 100644 --- a/.github/workflows/reusable-deploy-supported.yml +++ b/.github/workflows/reusable-deploy-supported.yml @@ -66,66 +66,4 @@ jobs: REPOSITORY_DIRECTORY: ${{ inputs.repository_directory }} ENVIRONMENT: ${{ inputs.environment }} ITEM_TYPE_IN_SCOPE: ${{ inputs.item_type_in_scope }} - run: | - python -c " - import json - import os - from azure.identity import ClientSecretCredential - from fabric_cicd import FabricWorkspace, publish_all_items, unpublish_all_orphan_items - - credential = ClientSecretCredential( - tenant_id=os.environ['AZURE_TENANT_ID'], - client_id=os.environ['AZURE_CLIENT_ID'], - client_secret=os.environ['AZURE_CLIENT_SECRET'], - ) - - # If item_type_in_scope is provided as a JSON array (e.g., '["Notebook"]'), - # only those item types will be deployed. Otherwise, all types are in scope. - item_type_in_scope = None - raw = os.environ.get('ITEM_TYPE_IN_SCOPE', '').strip() - if raw: - item_type_in_scope = json.loads(raw) - - repo_dir = os.environ['REPOSITORY_DIRECTORY'] - workspace_id = os.environ['FABRIC_WORKSPACE_ID'] - environment = os.environ['ENVIRONMENT'] - - # Deployment uses a 2-phase approach to satisfy item dependencies. - # fabric-cicd caches workspace state once at the start of each - # publish_all_items() call, so items deployed within the same call - # are not visible to later items' logicalId or \$items resolution. - # - # Phase 1: Lakehouse + Ontology - # - Lakehouse must exist so parameter.yml \$items.Lakehouse rules resolve. - # - Ontology must exist so DataAgent's logicalId reference resolves. - # Phase 2: All remaining items (DataAgent, Notebook, SemanticModel, etc.). - # - # On subsequent deployments all items already exist and phases are - # idempotent — they simply update in place. - - phase1_types = ['Lakehouse', 'Ontology'] - - print('Phase 1: Deploying Lakehouse + Ontology...') - phase1_ws = FabricWorkspace( - repository_directory=repo_dir, - workspace_id=workspace_id, - environment=environment, - token_credential=credential, - item_type_in_scope=phase1_types, - ) - publish_all_items(phase1_ws) - - # Phase 2: Deploy all remaining item types. - phase1_set = set(phase1_types) - remaining = [t for t in (item_type_in_scope or []) if t not in phase1_set] or None - print('Phase 2: Deploying remaining items: ' + str(remaining or 'all') + '...') - workspace = FabricWorkspace( - repository_directory=repo_dir, - workspace_id=workspace_id, - environment=environment, - token_credential=credential, - item_type_in_scope=remaining, - ) - publish_all_items(workspace) - unpublish_all_orphan_items(workspace) - " + run: python scripts/deploy_fabric_cicd.py diff --git a/.github/workflows/reusable-fabric-etl.yml b/.github/workflows/reusable-fabric-etl.yml index 868d42a..23497be 100644 --- a/.github/workflows/reusable-fabric-etl.yml +++ b/.github/workflows/reusable-fabric-etl.yml @@ -68,92 +68,4 @@ jobs: ITEM_NAME: ${{ inputs.item_name }} ITEM_TYPE: ${{ inputs.item_type }} JOB_TYPE: ${{ inputs.job_type }} - run: | - python -c " - import os - import sys - import time - import requests - from azure.identity import ClientSecretCredential - - credential = ClientSecretCredential( - tenant_id=os.environ['AZURE_TENANT_ID'], - client_id=os.environ['AZURE_CLIENT_ID'], - client_secret=os.environ['AZURE_CLIENT_SECRET'], - ) - - token = credential.get_token('https://api.fabric.microsoft.com/.default').token - headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} - - workspace_id = os.environ['FABRIC_WORKSPACE_ID'] - item_name = os.environ['ITEM_NAME'] - item_type = os.environ['ITEM_TYPE'] - job_type = os.environ['JOB_TYPE'] - - # Resolve item ID by name using the List Items API - list_url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items?type={item_type}' - list_response = requests.get(list_url, headers=headers) - - if list_response.status_code != 200: - print(f'Failed to list items: {list_response.status_code} {list_response.text}') - sys.exit(1) - - items = list_response.json().get('value', []) - matched = [i for i in items if i['displayName'] == item_name] - - if not matched: - print(f'Item not found: {item_name} (type={item_type}) in workspace {workspace_id}') - print(f'Available items: {[i[\"displayName\"] for i in items]}') - sys.exit(1) - - item_id = matched[0]['id'] - print(f'Resolved {item_name} -> {item_id}') - - # Start the job — returns 202 Accepted with a Location header for polling - url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances?jobType={job_type}' - response = requests.post(url, headers=headers) - - if response.status_code not in (200, 202): - print(f'Failed to start job: {response.status_code} {response.text}') - sys.exit(1) - - location = response.headers.get('Location') - retry_after = int(response.headers.get('Retry-After', '30')) - print(f'Job started. Polling for completion...') - - # Poll for completion — respects Retry-After header from the API. - # max_polls * retry_after = maximum wait time (default: 120 * 30s = 60 min) - max_polls = 120 - for i in range(max_polls): - time.sleep(retry_after) - - # Re-acquire token in case of long-running jobs - if i > 0 and i % 20 == 0: - token = credential.get_token('https://api.fabric.microsoft.com/.default').token - headers['Authorization'] = f'Bearer {token}' - - poll_response = requests.get(location, headers=headers) - - if poll_response.status_code == 200: - result = poll_response.json() - status = result.get('status', 'Unknown') - print(f'Poll {i+1}: status={status}') - - if status == 'Completed': - print('Job completed successfully.') - sys.exit(0) - elif status in ('Failed', 'Cancelled', 'Deduped'): - print(f'Job ended with status: {status}') - failure_reason = result.get('failureReason', 'No failure reason provided') - print(f'Failure reason: {failure_reason}') - sys.exit(1) - elif poll_response.status_code == 202: - retry_after = int(poll_response.headers.get('Retry-After', '30')) - print(f'Poll {i+1}: still running...') - else: - print(f'Unexpected poll response: {poll_response.status_code} {poll_response.text}') - sys.exit(1) - - print('Timed out waiting for job to complete.') - sys.exit(1) - " + run: python scripts/run_fabric_etl.py diff --git a/requirements-dev.txt b/requirements-dev.txt index cc96599..059a4b6 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1 +1,8 @@ pytest>=8.0 + +# Runtime dependencies used by scripts/ — installed locally so editor and +# linters can resolve imports. CI also installs these per-workflow before +# invoking each script. +requests>=2.31 +azure-identity>=1.15 +fabric-cicd>=1.0.0,<2.0.0 diff --git a/scripts/deploy_bulk.py b/scripts/deploy_bulk.py new file mode 100644 index 0000000..207031d --- /dev/null +++ b/scripts/deploy_bulk.py @@ -0,0 +1,261 @@ +"""Deploy supported Fabric items via the Bulk Import Item Definitions API (Preview). + +Alternative to deploy_fabric_cicd.py. Uses the Fabric REST API's bulk import +endpoint instead of the fabric-cicd Python library. + +Invoked by .github/workflows/reusable-deploy-bulk.yml. Selected at orchestrator +level via the DEPLOY_METHOD repository variable. + +Required environment variables: + AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, + FABRIC_WORKSPACE_ID, REPOSITORY_DIRECTORY + +Known gaps vs deploy_fabric_cicd.py (intentional, documented): +- No parameter.yml find_replace / key_value_replace substitution +- No orphan cleanup (Bulk Import API only supports Create/Update, not Delete) +- No item_type_in_scope filter (deploys everything in repository_directory) + +API references: +- Bulk import: https://learn.microsoft.com/en-us/rest/api/fabric/core/items/bulk-import-item-definitions(beta) +- Long running ops: https://learn.microsoft.com/en-us/rest/api/fabric/articles/long-running-operation + +TODO: When the Bulk Import API graduates from Preview, drop the ?beta=true +query parameter and re-verify the endpoint URL. +""" + +from __future__ import annotations + +import base64 +import json +import os +import pathlib +import sys +import time + +import requests + +# Polling configuration +POLL_FALLBACK_SECONDS = 30 +POLL_FLOOR_SECONDS = 5 +POLL_TIMEOUT_SECONDS = 20 * 60 +TOKEN_REFRESH_EVERY_N_POLLS = 20 + +# Files to skip when building definitionParts[]. Two layers of exclusion: +# 1. Named files: known files that should never be sent (parameter.yml is +# fabric-cicd config; .gitkeep is a Git placeholder). +# 2. Structural rule: item definitions always live inside *./ folders, +# so any file directly under repository_directory is excluded by +# construction (handled in build_definition_parts). +EXCLUDED_FILES = {"parameter.yml", ".gitkeep"} + + +def acquire_token(tenant_id: str, client_id: str, client_secret: str) -> str: + token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" + resp = requests.post( + token_url, + data={ + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + "scope": "https://api.fabric.microsoft.com/.default", + }, + timeout=30, + ) + if resp.status_code != 200: + sys.exit(f"::error::Token acquisition failed: HTTP {resp.status_code} {resp.text}") + token = resp.json()["access_token"] + # Mask the token in workflow logs + print(f"::add-mask::{token}") + return token + + +def build_definition_parts(repo_dir: pathlib.Path) -> list[dict]: + if not repo_dir.is_dir(): + sys.exit(f"::error::Repository directory not found: {repo_dir}") + parts: list[dict] = [] + for f in sorted(repo_dir.rglob("*")): + if not f.is_file(): + continue + if f.name in EXCLUDED_FILES: + continue + # Item definitions live inside *./ subfolders; anything at + # the root of repository_directory cannot belong to an item. + if f.parent == repo_dir: + continue + rel = "/" + f.relative_to(repo_dir).as_posix() + parts.append({ + "path": rel, + "payload": base64.b64encode(f.read_bytes()).decode("ascii"), + "payloadType": "InlineBase64", + }) + if not parts: + sys.exit(f"::error::No item definition files found under {repo_dir}") + return parts + + +def poll_lro( + operation_id: str, + headers: dict, + initial_retry_after: int, + tenant_id: str, + client_id: str, + client_secret: str, +) -> None: + base = "https://api.fabric.microsoft.com/v1/operations" + retry_after = max(initial_retry_after or POLL_FALLBACK_SECONDS, POLL_FLOOR_SECONDS) + started = time.monotonic() + poll_count = 0 + + while True: + elapsed = time.monotonic() - started + if elapsed > POLL_TIMEOUT_SECONDS: + sys.exit( + f"::error::LRO polling timed out after {POLL_TIMEOUT_SECONDS}s " + f"(operation {operation_id})" + ) + + time.sleep(retry_after) + poll_count += 1 + + # Refresh token periodically for long-running operations + # (mirrors the pattern in run_fabric_etl.py). + if poll_count > 0 and poll_count % TOKEN_REFRESH_EVERY_N_POLLS == 0: + headers["Authorization"] = f"Bearer {acquire_token(tenant_id, client_id, client_secret)}" + + resp = requests.get(f"{base}/{operation_id}", headers=headers, timeout=30) + if resp.status_code != 200: + sys.exit(f"::error::Poll request failed: HTTP {resp.status_code} {resp.text}") + + body = resp.json() + status = body.get("status", "Unknown") + print(f"Poll {poll_count} (t+{int(elapsed)}s): status={status}") + + if status == "Succeeded": + return + if status in ("Failed", "Undefined"): + print(json.dumps(body, indent=2)) + sys.exit(f"::error::LRO ended with status: {status}") + + # NotStarted or Running — keep polling. Honor Retry-After if present. + retry_after = max( + int(resp.headers.get("Retry-After", POLL_FALLBACK_SECONDS)), + POLL_FLOOR_SECONDS, + ) + + +def check_per_item_status(result: dict) -> None: + details = result.get("importItemDefinitionsDetails", []) + print(json.dumps(result, indent=2)) + if not details: + sys.exit("::error::Result body has no importItemDefinitionsDetails") + + failures = [ + d for d in details + if d.get("operationStatus") in ("Failed", "SucceededDespiteFailures") + ] + if failures: + summary = "\n".join( + f" - {d.get('itemDisplayName')} ({d.get('itemType')}): " + f"{d.get('operationStatus')}" + for d in failures + ) + sys.exit(f"::error::{len(failures)} item(s) failed:\n{summary}") + + print(f"All {len(details)} items deployed successfully.") + + +def interpret_post_response( + status_code: int, + body: dict, + headers: dict, +) -> tuple[str, ...]: + """Pure decision function for a bulk-import POST response. + + Returns a tuple whose first element is the action to take. Callers branch + on the action and use the rest of the tuple for action-specific data. + + Returned actions: + + - ``("sync", body)`` — 200 OK, the result body contains + ``importItemDefinitionsDetails`` directly + - ``("async", operation_id, retry_after)`` — 202 Accepted, caller must + poll the LRO. ``retry_after`` is from the ``Retry-After`` header + - ``("missing_op_id",)`` — 202 Accepted but no ``x-ms-operation-id`` + header (malformed response from the service) + - ``("error", status_code)`` — unexpected status code, caller should fail + """ + if status_code == 200: + return ("sync", body) + if status_code == 202: + operation_id = headers.get("x-ms-operation-id") + if not operation_id: + return ("missing_op_id",) + retry_after = int(headers.get("Retry-After", POLL_FALLBACK_SECONDS)) + return ("async", operation_id, retry_after) + return ("error", status_code) + + +def main() -> None: + tenant_id = os.environ["AZURE_TENANT_ID"] + client_id = os.environ["AZURE_CLIENT_ID"] + client_secret = os.environ["AZURE_CLIENT_SECRET"] + workspace_id = os.environ["FABRIC_WORKSPACE_ID"] + repo_dir = pathlib.Path(os.environ["REPOSITORY_DIRECTORY"]).resolve() + + token = acquire_token(tenant_id, client_id, client_secret) + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + parts = build_definition_parts(repo_dir) + print(f"Built request body with {len(parts)} definition parts from {repo_dir}") + + request_body = { + "definitionParts": parts, + "options": {"allowPairingByName": False}, + } + + # Endpoint URL per the API reference page (the tutorial's URL is wrong). + # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/bulk-import-item-definitions(beta) + api_url = ( + f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}" + f"/items/bulkImportDefinitions?beta=true" + ) + print(f"POST {api_url}") + + post_resp = requests.post(api_url, headers=headers, json=request_body, timeout=120) + body = post_resp.json() if post_resp.status_code == 200 else {} + action = interpret_post_response(post_resp.status_code, body, post_resp.headers) + + if action[0] == "sync": + # Result is in the response body directly. + check_per_item_status(action[1]) + sys.exit(0) + + if action[0] == "async": + _, operation_id, initial_retry = action + print(f"202 Accepted, operation_id={operation_id}, initial Retry-After={initial_retry}s") + + poll_lro(operation_id, headers, initial_retry, tenant_id, client_id, client_secret) + + result_resp = requests.get( + f"https://api.fabric.microsoft.com/v1/operations/{operation_id}/result", + headers=headers, + timeout=30, + ) + if result_resp.status_code != 200: + sys.exit( + f"::error::Failed to fetch operation result: " + f"HTTP {result_resp.status_code} {result_resp.text}" + ) + check_per_item_status(result_resp.json()) + sys.exit(0) + + if action[0] == "missing_op_id": + sys.exit("::error::202 response missing x-ms-operation-id header") + + sys.exit( + f"::error::Bulk import POST failed: HTTP {post_resp.status_code} {post_resp.text}" + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/deploy_fabric_cicd.py b/scripts/deploy_fabric_cicd.py new file mode 100644 index 0000000..1efc69b --- /dev/null +++ b/scripts/deploy_fabric_cicd.py @@ -0,0 +1,104 @@ +"""Deploy supported Fabric items via the fabric-cicd library. + +Two-phase deployment to satisfy item dependencies: + +- Phase 1: Lakehouse + Ontology + - Lakehouse must exist so parameter.yml ``$items.Lakehouse`` rules resolve. + - Ontology must exist so DataAgent's logicalId reference resolves. +- Phase 2: All remaining items (DataAgent, Notebook, SemanticModel, etc.). + +fabric-cicd caches workspace state once at the start of each +``publish_all_items()`` call, so items deployed within the same call are not +visible to later items' logicalId or ``$items`` resolution. On subsequent +deployments all items already exist and phases are idempotent — they simply +update in place. + +Invoked by .github/workflows/reusable-deploy-supported.yml. + +Required environment variables: + AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, + FABRIC_WORKSPACE_ID, REPOSITORY_DIRECTORY, ENVIRONMENT +Optional: + ITEM_TYPE_IN_SCOPE — JSON array of item types to deploy. Defaults to all. +""" + +from __future__ import annotations + +import json +import os + +from azure.identity import ClientSecretCredential +from fabric_cicd import ( + FabricWorkspace, + publish_all_items, + unpublish_all_orphan_items, +) + +PHASE1_TYPES = ["Lakehouse", "Ontology"] + + +def remaining_types_for_phase2(item_type_in_scope: list[str] | None) -> list[str] | None: + """Compute Phase 2 scope by subtracting Phase 1 types from the user's filter. + + Returns ``None`` when: + + - ``item_type_in_scope`` is None or empty (deploy everything in Phase 2) + - The caller passed only Phase 1 types (nothing distinct left for Phase 2) + + Note: returning ``None`` means "all types" to ``FabricWorkspace``. So if a + caller explicitly listed only Phase 1 types (e.g., ``["Lakehouse", + "Ontology"]``), Phase 2 will deploy everything else too. This is a known + behavior of the env-var contract — callers wanting strict scoping should + include the non-Phase-1 types they want. + """ + if not item_type_in_scope: + return None + phase1_set = set(PHASE1_TYPES) + remaining = [t for t in item_type_in_scope if t not in phase1_set] + return remaining or None + + +def main() -> None: + credential = ClientSecretCredential( + tenant_id=os.environ["AZURE_TENANT_ID"], + client_id=os.environ["AZURE_CLIENT_ID"], + client_secret=os.environ["AZURE_CLIENT_SECRET"], + ) + + # If item_type_in_scope is provided as a JSON array (e.g., '["Notebook"]'), + # only those item types will be deployed. Otherwise, all types are in scope. + item_type_in_scope: list[str] | None = None + raw = os.environ.get("ITEM_TYPE_IN_SCOPE", "").strip() + if raw: + item_type_in_scope = json.loads(raw) + + repo_dir = os.environ["REPOSITORY_DIRECTORY"] + workspace_id = os.environ["FABRIC_WORKSPACE_ID"] + environment = os.environ["ENVIRONMENT"] + + print("Phase 1: Deploying Lakehouse + Ontology...") + phase1_ws = FabricWorkspace( + repository_directory=repo_dir, + workspace_id=workspace_id, + environment=environment, + token_credential=credential, + item_type_in_scope=PHASE1_TYPES, + ) + publish_all_items(phase1_ws) + + # Phase 2: Deploy all remaining item types. + remaining = remaining_types_for_phase2(item_type_in_scope) + print("Phase 2: Deploying remaining items: " + str(remaining or "all") + "...") + workspace = FabricWorkspace( + repository_directory=repo_dir, + workspace_id=workspace_id, + environment=environment, + token_credential=credential, + item_type_in_scope=remaining, + ) + publish_all_items(workspace) + unpublish_all_orphan_items(workspace) + + +if __name__ == "__main__": + main() diff --git a/scripts/run_fabric_etl.py b/scripts/run_fabric_etl.py new file mode 100644 index 0000000..79fc3d5 --- /dev/null +++ b/scripts/run_fabric_etl.py @@ -0,0 +1,162 @@ +"""Run a Fabric Notebook or Data Pipeline via the Fabric REST API. + +After items are deployed to a workspace, this script triggers an ETL job +(e.g., the Import_Patterns_Data notebook) to ingest and transform data. The +item is resolved by name at runtime via the Fabric List Items API, avoiding +the need to know item IDs ahead of time (which differ per workspace). The +Fabric Jobs API is asynchronous — this script starts the job, then polls the +Location header URL until the job completes, fails, or times out. + +Invoked by .github/workflows/reusable-fabric-etl.yml. + +Required environment variables: + AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, + FABRIC_WORKSPACE_ID, ITEM_NAME, ITEM_TYPE, JOB_TYPE + +API references: +- List items: https://learn.microsoft.com/en-us/rest/api/fabric/core/items/list-items +- Run job: https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/run-on-demand-item-job +""" + +from __future__ import annotations + +import os +import sys +import time + +import requests +from azure.identity import ClientSecretCredential + + +def find_item_id_by_name(items: list[dict], item_name: str) -> str: + """Return the ID of the item whose displayName matches item_name. + + Exits with code 1 if no match is found, printing the available item names + to aid debugging. If multiple items have the same display name, the first + one returned by the List Items API is used (Fabric does not enforce display + name uniqueness within a workspace). + """ + matched = [i for i in items if i["displayName"] == item_name] + if not matched: + print(f"Item not found: {item_name}") + print(f"Available items: {[i['displayName'] for i in items]}") + sys.exit(1) + return matched[0]["id"] + + +def interpret_poll_response( + status_code: int, + body: dict, + headers: dict, +) -> tuple[str, ...]: + """Pure decision function for a job-status poll response. + + Returns a tuple whose first element is the action to take. Callers branch + on the action and ignore the rest of the tuple for actions that don't + carry extra data. + + Returned actions: + + - ``("completed",)`` — job finished successfully (status_code 200, status + "Completed") + - ``("failed", final_status, failure_reason)`` — job ended in a terminal + failure state (status_code 200, status in {"Failed", "Cancelled", + "Deduped"}) + - ``("still_running", retry_after)`` — keep polling. ``retry_after`` is the + number of seconds to wait before the next poll, taken from the + ``Retry-After`` header when present (defaults to 30) + - ``("unexpected", status_code)`` — unrecognized response, caller should + fail the run + """ + if status_code == 200: + status = body.get("status", "Unknown") + if status == "Completed": + return ("completed",) + if status in ("Failed", "Cancelled", "Deduped"): + return ("failed", status, body.get("failureReason", "No failure reason provided")) + return ("still_running", 30) + if status_code == 202: + return ("still_running", int(headers.get("Retry-After", "30"))) + return ("unexpected", status_code) + + +def main() -> None: + credential = ClientSecretCredential( + tenant_id=os.environ["AZURE_TENANT_ID"], + client_id=os.environ["AZURE_CLIENT_ID"], + client_secret=os.environ["AZURE_CLIENT_SECRET"], + ) + + token = credential.get_token("https://api.fabric.microsoft.com/.default").token + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + workspace_id = os.environ["FABRIC_WORKSPACE_ID"] + item_name = os.environ["ITEM_NAME"] + item_type = os.environ["ITEM_TYPE"] + job_type = os.environ["JOB_TYPE"] + + # Resolve item ID by name using the List Items API + list_url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items?type={item_type}" + list_response = requests.get(list_url, headers=headers) + + if list_response.status_code != 200: + print(f"Failed to list items: {list_response.status_code} {list_response.text}") + sys.exit(1) + + items = list_response.json().get("value", []) + item_id = find_item_id_by_name(items, item_name) + print(f"Resolved {item_name} -> {item_id}") + + # Start the job — returns 202 Accepted with a Location header for polling + url = ( + f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}" + f"/items/{item_id}/jobs/instances?jobType={job_type}" + ) + response = requests.post(url, headers=headers) + + if response.status_code not in (200, 202): + print(f"Failed to start job: {response.status_code} {response.text}") + sys.exit(1) + + location = response.headers.get("Location") + retry_after = int(response.headers.get("Retry-After", "30")) + print("Job started. Polling for completion...") + + # Poll for completion — respects Retry-After header from the API. + # max_polls * retry_after = maximum wait time (default: 120 * 30s = 60 min) + max_polls = 120 + for i in range(max_polls): + time.sleep(retry_after) + + # Re-acquire token in case of long-running jobs + if i > 0 and i % 20 == 0: + token = credential.get_token("https://api.fabric.microsoft.com/.default").token + headers["Authorization"] = f"Bearer {token}" + + poll_response = requests.get(location, headers=headers) + body = poll_response.json() if poll_response.status_code == 200 else {} + action = interpret_poll_response(poll_response.status_code, body, poll_response.headers) + + if action[0] == "completed": + print(f"Poll {i + 1}: status=Completed") + print("Job completed successfully.") + sys.exit(0) + elif action[0] == "failed": + _, final_status, failure_reason = action + print(f"Poll {i + 1}: status={final_status}") + print(f"Job ended with status: {final_status}") + print(f"Failure reason: {failure_reason}") + sys.exit(1) + elif action[0] == "still_running": + retry_after = action[1] + print(f"Poll {i + 1}: still running...") + else: + print(f"Unexpected poll response: {poll_response.status_code} {poll_response.text}") + sys.exit(1) + + print("Timed out waiting for job to complete.") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_deploy_bulk.py b/tests/test_deploy_bulk.py new file mode 100644 index 0000000..f317970 --- /dev/null +++ b/tests/test_deploy_bulk.py @@ -0,0 +1,212 @@ +"""Tests for scripts/deploy_bulk.py. + +Covers the pure functions extracted from the bulk-import deployment script. +The orchestration in main() and the network-dependent poll loop are not +unit-tested here — those are validated end-to-end via the deploy workflows. +""" + +from __future__ import annotations + +import base64 +import pathlib +from unittest import mock + +import pytest + +from deploy_bulk import ( + EXCLUDED_FILES, + acquire_token, + build_definition_parts, + check_per_item_status, + interpret_post_response, +) + + +# ---------- build_definition_parts ---------- + + +def _make_item_file(repo_dir: pathlib.Path, item_path: str, content: bytes = b"x") -> None: + """Create a file under repo_dir/, ensuring parent dirs exist.""" + full = repo_dir / item_path + full.parent.mkdir(parents=True, exist_ok=True) + full.write_bytes(content) + + +def test_build_definition_parts_happy_path(tmp_path: pathlib.Path) -> None: + _make_item_file(tmp_path, "MyItem.Notebook/notebook-content.py", b"print('hi')") + parts = build_definition_parts(tmp_path) + assert len(parts) == 1 + assert parts[0]["path"] == "/MyItem.Notebook/notebook-content.py" + assert parts[0]["payloadType"] == "InlineBase64" + assert base64.b64decode(parts[0]["payload"]) == b"print('hi')" + + +def test_build_definition_parts_excludes_gitkeep(tmp_path: pathlib.Path) -> None: + (tmp_path / ".gitkeep").write_bytes(b"") + _make_item_file(tmp_path, "MyItem.Notebook/notebook-content.py") + parts = build_definition_parts(tmp_path) + paths = [p["path"] for p in parts] + assert "/.gitkeep" not in paths + assert "/MyItem.Notebook/notebook-content.py" in paths + + +def test_build_definition_parts_excludes_parameter_yml(tmp_path: pathlib.Path) -> None: + (tmp_path / "parameter.yml").write_bytes(b"find_replace: []") + _make_item_file(tmp_path, "MyItem.Notebook/notebook-content.py") + parts = build_definition_parts(tmp_path) + paths = [p["path"] for p in parts] + assert "/parameter.yml" not in paths + + +def test_build_definition_parts_excludes_root_level_files(tmp_path: pathlib.Path) -> None: + """Anything directly under repository_directory (not in *./) is skipped.""" + (tmp_path / "README.md").write_bytes(b"# notes") + _make_item_file(tmp_path, "MyItem.Notebook/notebook-content.py") + parts = build_definition_parts(tmp_path) + paths = [p["path"] for p in parts] + assert "/README.md" not in paths + assert "/MyItem.Notebook/notebook-content.py" in paths + + +def test_build_definition_parts_walks_recursively(tmp_path: pathlib.Path) -> None: + _make_item_file(tmp_path, "Item.Notebook/.platform", b"{}") + _make_item_file(tmp_path, "Item.Notebook/notebook-content.py", b"x") + _make_item_file( + tmp_path, "Item.SemanticModel/definition/tables/t.tmdl", b"table t" + ) + parts = build_definition_parts(tmp_path) + assert len(parts) == 3 + paths = sorted(p["path"] for p in parts) + assert paths == [ + "/Item.Notebook/.platform", + "/Item.Notebook/notebook-content.py", + "/Item.SemanticModel/definition/tables/t.tmdl", + ] + + +def test_build_definition_parts_empty_repo_exits(tmp_path: pathlib.Path) -> None: + with pytest.raises(SystemExit) as exc: + build_definition_parts(tmp_path) + assert "No item definition files found" in str(exc.value) + + +def test_build_definition_parts_missing_dir_exits(tmp_path: pathlib.Path) -> None: + with pytest.raises(SystemExit) as exc: + build_definition_parts(tmp_path / "does_not_exist") + assert "Repository directory not found" in str(exc.value) + + +def test_excluded_files_constant_includes_known_excludes() -> None: + """Sanity check on the constant — protects against accidental edits.""" + assert "parameter.yml" in EXCLUDED_FILES + assert ".gitkeep" in EXCLUDED_FILES + + +# ---------- check_per_item_status ---------- + + +def test_check_per_item_status_all_succeeded(capsys: pytest.CaptureFixture) -> None: + result = { + "importItemDefinitionsDetails": [ + {"itemDisplayName": "X", "itemType": "Notebook", "operationStatus": "Succeeded"}, + {"itemDisplayName": "Y", "itemType": "Report", "operationStatus": "Succeeded"}, + ] + } + check_per_item_status(result) # should not raise + assert "All 2 items deployed successfully" in capsys.readouterr().out + + +def test_check_per_item_status_one_failed_exits() -> None: + result = { + "importItemDefinitionsDetails": [ + {"itemDisplayName": "X", "itemType": "Notebook", "operationStatus": "Succeeded"}, + {"itemDisplayName": "Y", "itemType": "Report", "operationStatus": "Failed"}, + ] + } + with pytest.raises(SystemExit) as exc: + check_per_item_status(result) + assert "1 item(s) failed" in str(exc.value) + assert "Y (Report): Failed" in str(exc.value) + + +def test_check_per_item_status_succeeded_despite_failures_exits() -> None: + """SucceededDespiteFailures is treated as a failure for build-fail purposes.""" + result = { + "importItemDefinitionsDetails": [ + { + "itemDisplayName": "Z", + "itemType": "Notebook", + "operationStatus": "SucceededDespiteFailures", + }, + ] + } + with pytest.raises(SystemExit) as exc: + check_per_item_status(result) + assert "1 item(s) failed" in str(exc.value) + assert "SucceededDespiteFailures" in str(exc.value) + + +def test_check_per_item_status_missing_details_exits() -> None: + with pytest.raises(SystemExit) as exc: + check_per_item_status({}) + assert "no importItemDefinitionsDetails" in str(exc.value) + + +# ---------- interpret_post_response ---------- + + +def test_interpret_post_response_sync_200() -> None: + body = {"importItemDefinitionsDetails": [{"itemId": "abc"}]} + action = interpret_post_response(200, body, {}) + assert action == ("sync", body) + + +def test_interpret_post_response_async_202() -> None: + headers = {"x-ms-operation-id": "op-123", "Retry-After": "45"} + action = interpret_post_response(202, {}, headers) + assert action == ("async", "op-123", 45) + + +def test_interpret_post_response_async_202_default_retry_after() -> None: + """When Retry-After is missing, fall back to POLL_FALLBACK_SECONDS (30).""" + headers = {"x-ms-operation-id": "op-456"} + action = interpret_post_response(202, {}, headers) + assert action == ("async", "op-456", 30) + + +def test_interpret_post_response_missing_op_id() -> None: + """A 202 response without x-ms-operation-id is treated as malformed.""" + action = interpret_post_response(202, {}, {}) + assert action == ("missing_op_id",) + + +def test_interpret_post_response_unexpected_status() -> None: + action = interpret_post_response(500, {}, {}) + assert action == ("error", 500) + + +# ---------- acquire_token ---------- + + +def test_acquire_token_success(capsys: pytest.CaptureFixture) -> None: + fake_resp = mock.Mock(status_code=200) + fake_resp.json.return_value = {"access_token": "fake-token-xyz"} + with mock.patch("deploy_bulk.requests.post", return_value=fake_resp) as post: + token = acquire_token("tenant", "client", "secret") + assert token == "fake-token-xyz" + # Verify the call was shaped correctly + post.assert_called_once() + call_kwargs = post.call_args.kwargs + assert call_kwargs["data"]["grant_type"] == "client_credentials" + assert call_kwargs["data"]["scope"] == "https://api.fabric.microsoft.com/.default" + # Workflow log mask was emitted + assert "::add-mask::fake-token-xyz" in capsys.readouterr().out + + +def test_acquire_token_failure_exits() -> None: + fake_resp = mock.Mock(status_code=401, text="Unauthorized") + with mock.patch("deploy_bulk.requests.post", return_value=fake_resp): + with pytest.raises(SystemExit) as exc: + acquire_token("tenant", "client", "secret") + assert "Token acquisition failed" in str(exc.value) + assert "401" in str(exc.value) diff --git a/tests/test_deploy_fabric_cicd.py b/tests/test_deploy_fabric_cicd.py new file mode 100644 index 0000000..b471350 --- /dev/null +++ b/tests/test_deploy_fabric_cicd.py @@ -0,0 +1,49 @@ +"""Tests for scripts/deploy_fabric_cicd.py. + +Covers the Phase 2 scope-filter logic. The fabric-cicd library calls +themselves are not unit-tested — those are validated end-to-end via the +deploy workflows. +""" + +from __future__ import annotations + +from deploy_fabric_cicd import PHASE1_TYPES, remaining_types_for_phase2 + + +def test_remaining_types_none_input_returns_none() -> None: + """When item_type_in_scope is unset, Phase 2 deploys all types.""" + assert remaining_types_for_phase2(None) is None + + +def test_remaining_types_empty_list_returns_none() -> None: + """An empty list has the same effect as None.""" + assert remaining_types_for_phase2([]) is None + + +def test_remaining_types_no_overlap_with_phase1() -> None: + """When the user lists only non-Phase-1 types, return them unchanged.""" + assert remaining_types_for_phase2(["Notebook", "Report"]) == ["Notebook", "Report"] + + +def test_remaining_types_partial_overlap_strips_phase1() -> None: + """Phase 1 types are removed from the user's list.""" + result = remaining_types_for_phase2( + ["Lakehouse", "Notebook", "Ontology", "Report"] + ) + assert result == ["Notebook", "Report"] + + +def test_remaining_types_only_phase1_returns_none() -> None: + """Documented footgun: listing only Phase 1 types makes Phase 2 deploy all. + + The empty result is converted to None, and FabricWorkspace treats None as + 'all types in scope'. A caller wanting strict scoping must include the + non-Phase-1 types they actually want. + """ + assert remaining_types_for_phase2(["Lakehouse", "Ontology"]) is None + assert remaining_types_for_phase2(["Lakehouse"]) is None + + +def test_phase1_types_constant_unchanged() -> None: + """Pin the Phase 1 type list so accidental edits are caught.""" + assert PHASE1_TYPES == ["Lakehouse", "Ontology"] diff --git a/tests/test_run_fabric_etl.py b/tests/test_run_fabric_etl.py new file mode 100644 index 0000000..7cb81bd --- /dev/null +++ b/tests/test_run_fabric_etl.py @@ -0,0 +1,105 @@ +"""Tests for scripts/run_fabric_etl.py. + +Covers the pure helpers extracted from the ETL runner: item lookup by display +name and poll-response interpretation. The orchestration in main() and the +real polling loop are not unit-tested here — those are validated end-to-end +via the ETL workflows. +""" + +from __future__ import annotations + +import pytest + +from run_fabric_etl import find_item_id_by_name, interpret_poll_response + + +# ---------- find_item_id_by_name ---------- + + +def test_find_item_id_by_name_happy_path() -> None: + items = [ + {"id": "111", "displayName": "Other"}, + {"id": "222", "displayName": "Target"}, + ] + assert find_item_id_by_name(items, "Target") == "222" + + +def test_find_item_id_by_name_not_found_exits(capsys: pytest.CaptureFixture) -> None: + items = [{"id": "111", "displayName": "Other"}] + with pytest.raises(SystemExit): + find_item_id_by_name(items, "Missing") + captured = capsys.readouterr().out + assert "Item not found: Missing" in captured + assert "Other" in captured # available items listed + + +def test_find_item_id_by_name_empty_list_exits() -> None: + with pytest.raises(SystemExit): + find_item_id_by_name([], "Any") + + +def test_find_item_id_by_name_returns_first_when_duplicates() -> None: + """Documented behavior: if Fabric returns multiple matches, take the first.""" + items = [ + {"id": "first", "displayName": "Same"}, + {"id": "second", "displayName": "Same"}, + ] + assert find_item_id_by_name(items, "Same") == "first" + + +# ---------- interpret_poll_response ---------- + + +def test_interpret_poll_response_completed() -> None: + body = {"status": "Completed"} + assert interpret_poll_response(200, body, {}) == ("completed",) + + +def test_interpret_poll_response_failed_with_reason() -> None: + body = {"status": "Failed", "failureReason": "out of memory"} + assert interpret_poll_response(200, body, {}) == ("failed", "Failed", "out of memory") + + +def test_interpret_poll_response_failed_without_reason() -> None: + """When failureReason is missing, fall back to a sentinel string.""" + body = {"status": "Failed"} + assert interpret_poll_response(200, body, {}) == ( + "failed", + "Failed", + "No failure reason provided", + ) + + +def test_interpret_poll_response_cancelled_treated_as_failed() -> None: + body = {"status": "Cancelled"} + action = interpret_poll_response(200, body, {}) + assert action[0] == "failed" + assert action[1] == "Cancelled" + + +def test_interpret_poll_response_deduped_treated_as_failed() -> None: + body = {"status": "Deduped"} + action = interpret_poll_response(200, body, {}) + assert action[0] == "failed" + assert action[1] == "Deduped" + + +def test_interpret_poll_response_in_progress_200() -> None: + """200 with a non-terminal status means keep polling at default interval.""" + body = {"status": "InProgress"} + assert interpret_poll_response(200, body, {}) == ("still_running", 30) + + +def test_interpret_poll_response_202_with_retry_after() -> None: + """202 means keep polling; honor Retry-After if present.""" + headers = {"Retry-After": "45"} + assert interpret_poll_response(202, {}, headers) == ("still_running", 45) + + +def test_interpret_poll_response_202_default_retry_after() -> None: + """Missing Retry-After falls back to 30 seconds.""" + assert interpret_poll_response(202, {}, {}) == ("still_running", 30) + + +def test_interpret_poll_response_unexpected_status() -> None: + assert interpret_poll_response(500, {}, {}) == ("unexpected", 500)