Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 1 addition & 194 deletions .github/workflows/reusable-deploy-bulk.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,197 +69,4 @@ jobs:
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
FABRIC_WORKSPACE_ID: ${{ secrets.FABRIC_WORKSPACE_ID }}
REPOSITORY_DIRECTORY: ${{ inputs.repository_directory }}
run: |
python -c "
import base64
import json
import os
import pathlib
import sys
import time
import requests

# Polling configuration (decisions #15, #16)
POLL_FALLBACK_SECONDS = 30
POLL_FLOOR_SECONDS = 5
POLL_TIMEOUT_SECONDS = 20 * 60
TOKEN_REFRESH_EVERY_N_POLLS = 20

# Files to skip when building definitionParts[]. Two layers of exclusion:
# 1. Named files: known files that should never be sent (parameter.yml is
# fabric-cicd config; .gitkeep is a Git placeholder).
# 2. Structural rule: item definitions always live inside *.<Type>/ folders,
# so any file directly under repository_directory is excluded by
# construction (handled in build_definition_parts).
EXCLUDED_FILES = {'parameter.yml', '.gitkeep'}

tenant_id = os.environ['AZURE_TENANT_ID']
client_id = os.environ['AZURE_CLIENT_ID']
client_secret = os.environ['AZURE_CLIENT_SECRET']
workspace_id = os.environ['FABRIC_WORKSPACE_ID']
repo_dir = pathlib.Path(os.environ['REPOSITORY_DIRECTORY']).resolve()


def acquire_token() -> str:
token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token'
resp = requests.post(
token_url,
data={
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret,
'scope': 'https://api.fabric.microsoft.com/.default',
},
timeout=30,
)
if resp.status_code != 200:
sys.exit(f'::error::Token acquisition failed: HTTP {resp.status_code} {resp.text}')
token = resp.json()['access_token']
# Mask the token in workflow logs (decision #10)
print(f'::add-mask::{token}')
return token


def build_definition_parts() -> list:
if not repo_dir.is_dir():
sys.exit(f'::error::Repository directory not found: {repo_dir}')
parts = []
for f in sorted(repo_dir.rglob('*')):
if not f.is_file():
continue
if f.name in EXCLUDED_FILES:
continue
# Item definitions live inside *.<Type>/ subfolders; anything at
# the root of repository_directory cannot belong to an item.
if f.parent == repo_dir:
continue
rel = '/' + f.relative_to(repo_dir).as_posix()
parts.append({
'path': rel,
'payload': base64.b64encode(f.read_bytes()).decode('ascii'),
'payloadType': 'InlineBase64',
})
if not parts:
sys.exit(f'::error::No item definition files found under {repo_dir}')
return parts


def poll_lro(operation_id: str, headers: dict, initial_retry_after: int) -> None:
base = 'https://api.fabric.microsoft.com/v1/operations'
retry_after = max(initial_retry_after or POLL_FALLBACK_SECONDS, POLL_FLOOR_SECONDS)
started = time.monotonic()
poll_count = 0

while True:
elapsed = time.monotonic() - started
if elapsed > POLL_TIMEOUT_SECONDS:
sys.exit(
f'::error::LRO polling timed out after {POLL_TIMEOUT_SECONDS}s '
f'(operation {operation_id})'
)

time.sleep(retry_after)
poll_count += 1

# Refresh token periodically for long-running operations
# (mirrors the pattern in reusable-fabric-etl.yml).
if poll_count > 0 and poll_count % TOKEN_REFRESH_EVERY_N_POLLS == 0:
headers['Authorization'] = f'Bearer {acquire_token()}'

resp = requests.get(f'{base}/{operation_id}', headers=headers, timeout=30)
if resp.status_code != 200:
sys.exit(f'::error::Poll request failed: HTTP {resp.status_code} {resp.text}')

body = resp.json()
status = body.get('status', 'Unknown')
print(f'Poll {poll_count} (t+{int(elapsed)}s): status={status}')

if status == 'Succeeded':
return
if status in ('Failed', 'Undefined'):
print(json.dumps(body, indent=2))
sys.exit(f'::error::LRO ended with status: {status}')

# NotStarted or Running — keep polling. Honor Retry-After if present.
retry_after = max(
int(resp.headers.get('Retry-After', POLL_FALLBACK_SECONDS)),
POLL_FLOOR_SECONDS,
)


def check_per_item_status(result: dict) -> None:
details = result.get('importItemDefinitionsDetails', [])
print(json.dumps(result, indent=2))
if not details:
sys.exit('::error::Result body has no importItemDefinitionsDetails')

failures = [
d for d in details
if d.get('operationStatus') in ('Failed', 'SucceededDespiteFailures')
]
if failures:
summary = '\n'.join(
f\" - {d.get('itemDisplayName')} ({d.get('itemType')}): \"
f\"{d.get('operationStatus')}\"
for d in failures
)
sys.exit(f'::error::{len(failures)} item(s) failed:\n{summary}')

print(f'All {len(details)} items deployed successfully.')


# ---------- main flow ----------
token = acquire_token()
headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}

parts = build_definition_parts()
print(f'Built request body with {len(parts)} definition parts from {repo_dir}')

request_body = {
'definitionParts': parts,
'options': {'allowPairingByName': False},
}

# Endpoint URL per the API reference page (the tutorial's URL is wrong).
# https://learn.microsoft.com/en-us/rest/api/fabric/core/items/bulk-import-item-definitions(beta)
api_url = (
f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}'
f'/items/bulkImportDefinitions?beta=true'
)
print(f'POST {api_url}')

post_resp = requests.post(api_url, headers=headers, json=request_body, timeout=120)

if post_resp.status_code == 200:
# Sync path — result is in the response body directly.
check_per_item_status(post_resp.json())
sys.exit(0)

if post_resp.status_code == 202:
# Async path — poll the LRO, then fetch the result.
operation_id = post_resp.headers.get('x-ms-operation-id')
if not operation_id:
sys.exit('::error::202 response missing x-ms-operation-id header')

initial_retry = int(post_resp.headers.get('Retry-After', POLL_FALLBACK_SECONDS))
print(f'202 Accepted, operation_id={operation_id}, initial Retry-After={initial_retry}s')

poll_lro(operation_id, headers, initial_retry)

result_resp = requests.get(
f'https://api.fabric.microsoft.com/v1/operations/{operation_id}/result',
headers=headers,
timeout=30,
)
if result_resp.status_code != 200:
sys.exit(
f'::error::Failed to fetch operation result: '
f'HTTP {result_resp.status_code} {result_resp.text}'
)
check_per_item_status(result_resp.json())
sys.exit(0)

sys.exit(
f'::error::Bulk import POST failed: HTTP {post_resp.status_code} {post_resp.text}'
)
"
run: python scripts/deploy_bulk.py
64 changes: 1 addition & 63 deletions .github/workflows/reusable-deploy-supported.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,66 +66,4 @@ jobs:
REPOSITORY_DIRECTORY: ${{ inputs.repository_directory }}
ENVIRONMENT: ${{ inputs.environment }}
ITEM_TYPE_IN_SCOPE: ${{ inputs.item_type_in_scope }}
run: |
python -c "
import json
import os
from azure.identity import ClientSecretCredential
from fabric_cicd import FabricWorkspace, publish_all_items, unpublish_all_orphan_items

credential = ClientSecretCredential(
tenant_id=os.environ['AZURE_TENANT_ID'],
client_id=os.environ['AZURE_CLIENT_ID'],
client_secret=os.environ['AZURE_CLIENT_SECRET'],
)

# If item_type_in_scope is provided as a JSON array (e.g., '["Notebook"]'),
# only those item types will be deployed. Otherwise, all types are in scope.
item_type_in_scope = None
raw = os.environ.get('ITEM_TYPE_IN_SCOPE', '').strip()
if raw:
item_type_in_scope = json.loads(raw)

repo_dir = os.environ['REPOSITORY_DIRECTORY']
workspace_id = os.environ['FABRIC_WORKSPACE_ID']
environment = os.environ['ENVIRONMENT']

# Deployment uses a 2-phase approach to satisfy item dependencies.
# fabric-cicd caches workspace state once at the start of each
# publish_all_items() call, so items deployed within the same call
# are not visible to later items' logicalId or \$items resolution.
#
# Phase 1: Lakehouse + Ontology
# - Lakehouse must exist so parameter.yml \$items.Lakehouse rules resolve.
# - Ontology must exist so DataAgent's logicalId reference resolves.
# Phase 2: All remaining items (DataAgent, Notebook, SemanticModel, etc.).
#
# On subsequent deployments all items already exist and phases are
# idempotent — they simply update in place.

phase1_types = ['Lakehouse', 'Ontology']

print('Phase 1: Deploying Lakehouse + Ontology...')
phase1_ws = FabricWorkspace(
repository_directory=repo_dir,
workspace_id=workspace_id,
environment=environment,
token_credential=credential,
item_type_in_scope=phase1_types,
)
publish_all_items(phase1_ws)

# Phase 2: Deploy all remaining item types.
phase1_set = set(phase1_types)
remaining = [t for t in (item_type_in_scope or []) if t not in phase1_set] or None
print('Phase 2: Deploying remaining items: ' + str(remaining or 'all') + '...')
workspace = FabricWorkspace(
repository_directory=repo_dir,
workspace_id=workspace_id,
environment=environment,
token_credential=credential,
item_type_in_scope=remaining,
)
publish_all_items(workspace)
unpublish_all_orphan_items(workspace)
"
run: python scripts/deploy_fabric_cicd.py
90 changes: 1 addition & 89 deletions .github/workflows/reusable-fabric-etl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,92 +68,4 @@ jobs:
ITEM_NAME: ${{ inputs.item_name }}
ITEM_TYPE: ${{ inputs.item_type }}
JOB_TYPE: ${{ inputs.job_type }}
run: |
python -c "
import os
import sys
import time
import requests
from azure.identity import ClientSecretCredential

credential = ClientSecretCredential(
tenant_id=os.environ['AZURE_TENANT_ID'],
client_id=os.environ['AZURE_CLIENT_ID'],
client_secret=os.environ['AZURE_CLIENT_SECRET'],
)

token = credential.get_token('https://api.fabric.microsoft.com/.default').token
headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}

workspace_id = os.environ['FABRIC_WORKSPACE_ID']
item_name = os.environ['ITEM_NAME']
item_type = os.environ['ITEM_TYPE']
job_type = os.environ['JOB_TYPE']

# Resolve item ID by name using the List Items API
list_url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items?type={item_type}'
list_response = requests.get(list_url, headers=headers)

if list_response.status_code != 200:
print(f'Failed to list items: {list_response.status_code} {list_response.text}')
sys.exit(1)

items = list_response.json().get('value', [])
matched = [i for i in items if i['displayName'] == item_name]

if not matched:
print(f'Item not found: {item_name} (type={item_type}) in workspace {workspace_id}')
print(f'Available items: {[i[\"displayName\"] for i in items]}')
sys.exit(1)

item_id = matched[0]['id']
print(f'Resolved {item_name} -> {item_id}')

# Start the job — returns 202 Accepted with a Location header for polling
url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances?jobType={job_type}'
response = requests.post(url, headers=headers)

if response.status_code not in (200, 202):
print(f'Failed to start job: {response.status_code} {response.text}')
sys.exit(1)

location = response.headers.get('Location')
retry_after = int(response.headers.get('Retry-After', '30'))
print(f'Job started. Polling for completion...')

# Poll for completion — respects Retry-After header from the API.
# max_polls * retry_after = maximum wait time (default: 120 * 30s = 60 min)
max_polls = 120
for i in range(max_polls):
time.sleep(retry_after)

# Re-acquire token in case of long-running jobs
if i > 0 and i % 20 == 0:
token = credential.get_token('https://api.fabric.microsoft.com/.default').token
headers['Authorization'] = f'Bearer {token}'

poll_response = requests.get(location, headers=headers)

if poll_response.status_code == 200:
result = poll_response.json()
status = result.get('status', 'Unknown')
print(f'Poll {i+1}: status={status}')

if status == 'Completed':
print('Job completed successfully.')
sys.exit(0)
elif status in ('Failed', 'Cancelled', 'Deduped'):
print(f'Job ended with status: {status}')
failure_reason = result.get('failureReason', 'No failure reason provided')
print(f'Failure reason: {failure_reason}')
sys.exit(1)
elif poll_response.status_code == 202:
retry_after = int(poll_response.headers.get('Retry-After', '30'))
print(f'Poll {i+1}: still running...')
else:
print(f'Unexpected poll response: {poll_response.status_code} {poll_response.text}')
sys.exit(1)

print('Timed out waiting for job to complete.')
sys.exit(1)
"
run: python scripts/run_fabric_etl.py
7 changes: 7 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
pytest>=8.0

# Runtime dependencies used by scripts/ — installed locally so editor and
# linters can resolve imports. CI also installs these per-workflow before
# invoking each script.
requests>=2.31
azure-identity>=1.15
fabric-cicd>=1.0.0,<2.0.0
Loading