Skip to content

Commit e1ec65e

Browse files
Extract inline Python from reusable workflows into scripts/ + add unit tests (#22)
Refactor only — no behavior change. Moves the Python embedded in three reusable workflows into proper .py files for syntax highlighting, linting, local debuggability, and unit testability. New scripts (mechanical extraction from inline YAML): - scripts/deploy_fabric_cicd.py — fabric-cicd two-phase deploy - scripts/run_fabric_etl.py — Fabric Job runner with name resolution and LRO polling - scripts/deploy_bulk.py — Bulk Import API deploy with payload build, POST, LRO polling Workflows updated to invoke 'python scripts/<name>.py' instead of 'python -c "..."'. Small helper functions extracted from each script's main() to enable testing of the pure logic that won't change in the upcoming bulk gap-bridging work: - deploy_fabric_cicd.py: remaining_types_for_phase2() - run_fabric_etl.py: find_item_id_by_name(), interpret_poll_response() - deploy_bulk.py: interpret_post_response() Unit tests covering the helpers and the existing pure functions: - tests/test_deploy_bulk.py — 19 tests - tests/test_deploy_fabric_cicd.py — 6 tests - tests/test_run_fabric_etl.py — 13 tests requirements-dev.txt: added requests, azure-identity, fabric-cicd so local imports resolve and the test runner can import the scripts. CI installs the same packages per workflow.
1 parent a480775 commit e1ec65e

10 files changed

Lines changed: 903 additions & 346 deletions

.github/workflows/reusable-deploy-bulk.yml

Lines changed: 1 addition & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -69,197 +69,4 @@ jobs:
6969
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
7070
FABRIC_WORKSPACE_ID: ${{ secrets.FABRIC_WORKSPACE_ID }}
7171
REPOSITORY_DIRECTORY: ${{ inputs.repository_directory }}
72-
run: |
73-
python -c "
74-
import base64
75-
import json
76-
import os
77-
import pathlib
78-
import sys
79-
import time
80-
import requests
81-
82-
# Polling configuration (decisions #15, #16)
83-
POLL_FALLBACK_SECONDS = 30
84-
POLL_FLOOR_SECONDS = 5
85-
POLL_TIMEOUT_SECONDS = 20 * 60
86-
TOKEN_REFRESH_EVERY_N_POLLS = 20
87-
88-
# Files to skip when building definitionParts[]. Two layers of exclusion:
89-
# 1. Named files: known files that should never be sent (parameter.yml is
90-
# fabric-cicd config; .gitkeep is a Git placeholder).
91-
# 2. Structural rule: item definitions always live inside *.<Type>/ folders,
92-
# so any file directly under repository_directory is excluded by
93-
# construction (handled in build_definition_parts).
94-
EXCLUDED_FILES = {'parameter.yml', '.gitkeep'}
95-
96-
tenant_id = os.environ['AZURE_TENANT_ID']
97-
client_id = os.environ['AZURE_CLIENT_ID']
98-
client_secret = os.environ['AZURE_CLIENT_SECRET']
99-
workspace_id = os.environ['FABRIC_WORKSPACE_ID']
100-
repo_dir = pathlib.Path(os.environ['REPOSITORY_DIRECTORY']).resolve()
101-
102-
103-
def acquire_token() -> str:
104-
token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token'
105-
resp = requests.post(
106-
token_url,
107-
data={
108-
'grant_type': 'client_credentials',
109-
'client_id': client_id,
110-
'client_secret': client_secret,
111-
'scope': 'https://api.fabric.microsoft.com/.default',
112-
},
113-
timeout=30,
114-
)
115-
if resp.status_code != 200:
116-
sys.exit(f'::error::Token acquisition failed: HTTP {resp.status_code} {resp.text}')
117-
token = resp.json()['access_token']
118-
# Mask the token in workflow logs (decision #10)
119-
print(f'::add-mask::{token}')
120-
return token
121-
122-
123-
def build_definition_parts() -> list:
124-
if not repo_dir.is_dir():
125-
sys.exit(f'::error::Repository directory not found: {repo_dir}')
126-
parts = []
127-
for f in sorted(repo_dir.rglob('*')):
128-
if not f.is_file():
129-
continue
130-
if f.name in EXCLUDED_FILES:
131-
continue
132-
# Item definitions live inside *.<Type>/ subfolders; anything at
133-
# the root of repository_directory cannot belong to an item.
134-
if f.parent == repo_dir:
135-
continue
136-
rel = '/' + f.relative_to(repo_dir).as_posix()
137-
parts.append({
138-
'path': rel,
139-
'payload': base64.b64encode(f.read_bytes()).decode('ascii'),
140-
'payloadType': 'InlineBase64',
141-
})
142-
if not parts:
143-
sys.exit(f'::error::No item definition files found under {repo_dir}')
144-
return parts
145-
146-
147-
def poll_lro(operation_id: str, headers: dict, initial_retry_after: int) -> None:
148-
base = 'https://api.fabric.microsoft.com/v1/operations'
149-
retry_after = max(initial_retry_after or POLL_FALLBACK_SECONDS, POLL_FLOOR_SECONDS)
150-
started = time.monotonic()
151-
poll_count = 0
152-
153-
while True:
154-
elapsed = time.monotonic() - started
155-
if elapsed > POLL_TIMEOUT_SECONDS:
156-
sys.exit(
157-
f'::error::LRO polling timed out after {POLL_TIMEOUT_SECONDS}s '
158-
f'(operation {operation_id})'
159-
)
160-
161-
time.sleep(retry_after)
162-
poll_count += 1
163-
164-
# Refresh token periodically for long-running operations
165-
# (mirrors the pattern in reusable-fabric-etl.yml).
166-
if poll_count > 0 and poll_count % TOKEN_REFRESH_EVERY_N_POLLS == 0:
167-
headers['Authorization'] = f'Bearer {acquire_token()}'
168-
169-
resp = requests.get(f'{base}/{operation_id}', headers=headers, timeout=30)
170-
if resp.status_code != 200:
171-
sys.exit(f'::error::Poll request failed: HTTP {resp.status_code} {resp.text}')
172-
173-
body = resp.json()
174-
status = body.get('status', 'Unknown')
175-
print(f'Poll {poll_count} (t+{int(elapsed)}s): status={status}')
176-
177-
if status == 'Succeeded':
178-
return
179-
if status in ('Failed', 'Undefined'):
180-
print(json.dumps(body, indent=2))
181-
sys.exit(f'::error::LRO ended with status: {status}')
182-
183-
# NotStarted or Running — keep polling. Honor Retry-After if present.
184-
retry_after = max(
185-
int(resp.headers.get('Retry-After', POLL_FALLBACK_SECONDS)),
186-
POLL_FLOOR_SECONDS,
187-
)
188-
189-
190-
def check_per_item_status(result: dict) -> None:
191-
details = result.get('importItemDefinitionsDetails', [])
192-
print(json.dumps(result, indent=2))
193-
if not details:
194-
sys.exit('::error::Result body has no importItemDefinitionsDetails')
195-
196-
failures = [
197-
d for d in details
198-
if d.get('operationStatus') in ('Failed', 'SucceededDespiteFailures')
199-
]
200-
if failures:
201-
summary = '\n'.join(
202-
f\" - {d.get('itemDisplayName')} ({d.get('itemType')}): \"
203-
f\"{d.get('operationStatus')}\"
204-
for d in failures
205-
)
206-
sys.exit(f'::error::{len(failures)} item(s) failed:\n{summary}')
207-
208-
print(f'All {len(details)} items deployed successfully.')
209-
210-
211-
# ---------- main flow ----------
212-
token = acquire_token()
213-
headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
214-
215-
parts = build_definition_parts()
216-
print(f'Built request body with {len(parts)} definition parts from {repo_dir}')
217-
218-
request_body = {
219-
'definitionParts': parts,
220-
'options': {'allowPairingByName': False},
221-
}
222-
223-
# Endpoint URL per the API reference page (the tutorial's URL is wrong).
224-
# https://learn.microsoft.com/en-us/rest/api/fabric/core/items/bulk-import-item-definitions(beta)
225-
api_url = (
226-
f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}'
227-
f'/items/bulkImportDefinitions?beta=true'
228-
)
229-
print(f'POST {api_url}')
230-
231-
post_resp = requests.post(api_url, headers=headers, json=request_body, timeout=120)
232-
233-
if post_resp.status_code == 200:
234-
# Sync path — result is in the response body directly.
235-
check_per_item_status(post_resp.json())
236-
sys.exit(0)
237-
238-
if post_resp.status_code == 202:
239-
# Async path — poll the LRO, then fetch the result.
240-
operation_id = post_resp.headers.get('x-ms-operation-id')
241-
if not operation_id:
242-
sys.exit('::error::202 response missing x-ms-operation-id header')
243-
244-
initial_retry = int(post_resp.headers.get('Retry-After', POLL_FALLBACK_SECONDS))
245-
print(f'202 Accepted, operation_id={operation_id}, initial Retry-After={initial_retry}s')
246-
247-
poll_lro(operation_id, headers, initial_retry)
248-
249-
result_resp = requests.get(
250-
f'https://api.fabric.microsoft.com/v1/operations/{operation_id}/result',
251-
headers=headers,
252-
timeout=30,
253-
)
254-
if result_resp.status_code != 200:
255-
sys.exit(
256-
f'::error::Failed to fetch operation result: '
257-
f'HTTP {result_resp.status_code} {result_resp.text}'
258-
)
259-
check_per_item_status(result_resp.json())
260-
sys.exit(0)
261-
262-
sys.exit(
263-
f'::error::Bulk import POST failed: HTTP {post_resp.status_code} {post_resp.text}'
264-
)
265-
"
72+
run: python scripts/deploy_bulk.py

.github/workflows/reusable-deploy-supported.yml

Lines changed: 1 addition & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -66,66 +66,4 @@ jobs:
6666
REPOSITORY_DIRECTORY: ${{ inputs.repository_directory }}
6767
ENVIRONMENT: ${{ inputs.environment }}
6868
ITEM_TYPE_IN_SCOPE: ${{ inputs.item_type_in_scope }}
69-
run: |
70-
python -c "
71-
import json
72-
import os
73-
from azure.identity import ClientSecretCredential
74-
from fabric_cicd import FabricWorkspace, publish_all_items, unpublish_all_orphan_items
75-
76-
credential = ClientSecretCredential(
77-
tenant_id=os.environ['AZURE_TENANT_ID'],
78-
client_id=os.environ['AZURE_CLIENT_ID'],
79-
client_secret=os.environ['AZURE_CLIENT_SECRET'],
80-
)
81-
82-
# If item_type_in_scope is provided as a JSON array (e.g., '["Notebook"]'),
83-
# only those item types will be deployed. Otherwise, all types are in scope.
84-
item_type_in_scope = None
85-
raw = os.environ.get('ITEM_TYPE_IN_SCOPE', '').strip()
86-
if raw:
87-
item_type_in_scope = json.loads(raw)
88-
89-
repo_dir = os.environ['REPOSITORY_DIRECTORY']
90-
workspace_id = os.environ['FABRIC_WORKSPACE_ID']
91-
environment = os.environ['ENVIRONMENT']
92-
93-
# Deployment uses a 2-phase approach to satisfy item dependencies.
94-
# fabric-cicd caches workspace state once at the start of each
95-
# publish_all_items() call, so items deployed within the same call
96-
# are not visible to later items' logicalId or \$items resolution.
97-
#
98-
# Phase 1: Lakehouse + Ontology
99-
# - Lakehouse must exist so parameter.yml \$items.Lakehouse rules resolve.
100-
# - Ontology must exist so DataAgent's logicalId reference resolves.
101-
# Phase 2: All remaining items (DataAgent, Notebook, SemanticModel, etc.).
102-
#
103-
# On subsequent deployments all items already exist and phases are
104-
# idempotent — they simply update in place.
105-
106-
phase1_types = ['Lakehouse', 'Ontology']
107-
108-
print('Phase 1: Deploying Lakehouse + Ontology...')
109-
phase1_ws = FabricWorkspace(
110-
repository_directory=repo_dir,
111-
workspace_id=workspace_id,
112-
environment=environment,
113-
token_credential=credential,
114-
item_type_in_scope=phase1_types,
115-
)
116-
publish_all_items(phase1_ws)
117-
118-
# Phase 2: Deploy all remaining item types.
119-
phase1_set = set(phase1_types)
120-
remaining = [t for t in (item_type_in_scope or []) if t not in phase1_set] or None
121-
print('Phase 2: Deploying remaining items: ' + str(remaining or 'all') + '...')
122-
workspace = FabricWorkspace(
123-
repository_directory=repo_dir,
124-
workspace_id=workspace_id,
125-
environment=environment,
126-
token_credential=credential,
127-
item_type_in_scope=remaining,
128-
)
129-
publish_all_items(workspace)
130-
unpublish_all_orphan_items(workspace)
131-
"
69+
run: python scripts/deploy_fabric_cicd.py

.github/workflows/reusable-fabric-etl.yml

Lines changed: 1 addition & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -68,92 +68,4 @@ jobs:
6868
ITEM_NAME: ${{ inputs.item_name }}
6969
ITEM_TYPE: ${{ inputs.item_type }}
7070
JOB_TYPE: ${{ inputs.job_type }}
71-
run: |
72-
python -c "
73-
import os
74-
import sys
75-
import time
76-
import requests
77-
from azure.identity import ClientSecretCredential
78-
79-
credential = ClientSecretCredential(
80-
tenant_id=os.environ['AZURE_TENANT_ID'],
81-
client_id=os.environ['AZURE_CLIENT_ID'],
82-
client_secret=os.environ['AZURE_CLIENT_SECRET'],
83-
)
84-
85-
token = credential.get_token('https://api.fabric.microsoft.com/.default').token
86-
headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
87-
88-
workspace_id = os.environ['FABRIC_WORKSPACE_ID']
89-
item_name = os.environ['ITEM_NAME']
90-
item_type = os.environ['ITEM_TYPE']
91-
job_type = os.environ['JOB_TYPE']
92-
93-
# Resolve item ID by name using the List Items API
94-
list_url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items?type={item_type}'
95-
list_response = requests.get(list_url, headers=headers)
96-
97-
if list_response.status_code != 200:
98-
print(f'Failed to list items: {list_response.status_code} {list_response.text}')
99-
sys.exit(1)
100-
101-
items = list_response.json().get('value', [])
102-
matched = [i for i in items if i['displayName'] == item_name]
103-
104-
if not matched:
105-
print(f'Item not found: {item_name} (type={item_type}) in workspace {workspace_id}')
106-
print(f'Available items: {[i[\"displayName\"] for i in items]}')
107-
sys.exit(1)
108-
109-
item_id = matched[0]['id']
110-
print(f'Resolved {item_name} -> {item_id}')
111-
112-
# Start the job — returns 202 Accepted with a Location header for polling
113-
url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances?jobType={job_type}'
114-
response = requests.post(url, headers=headers)
115-
116-
if response.status_code not in (200, 202):
117-
print(f'Failed to start job: {response.status_code} {response.text}')
118-
sys.exit(1)
119-
120-
location = response.headers.get('Location')
121-
retry_after = int(response.headers.get('Retry-After', '30'))
122-
print(f'Job started. Polling for completion...')
123-
124-
# Poll for completion — respects Retry-After header from the API.
125-
# max_polls * retry_after = maximum wait time (default: 120 * 30s = 60 min)
126-
max_polls = 120
127-
for i in range(max_polls):
128-
time.sleep(retry_after)
129-
130-
# Re-acquire token in case of long-running jobs
131-
if i > 0 and i % 20 == 0:
132-
token = credential.get_token('https://api.fabric.microsoft.com/.default').token
133-
headers['Authorization'] = f'Bearer {token}'
134-
135-
poll_response = requests.get(location, headers=headers)
136-
137-
if poll_response.status_code == 200:
138-
result = poll_response.json()
139-
status = result.get('status', 'Unknown')
140-
print(f'Poll {i+1}: status={status}')
141-
142-
if status == 'Completed':
143-
print('Job completed successfully.')
144-
sys.exit(0)
145-
elif status in ('Failed', 'Cancelled', 'Deduped'):
146-
print(f'Job ended with status: {status}')
147-
failure_reason = result.get('failureReason', 'No failure reason provided')
148-
print(f'Failure reason: {failure_reason}')
149-
sys.exit(1)
150-
elif poll_response.status_code == 202:
151-
retry_after = int(poll_response.headers.get('Retry-After', '30'))
152-
print(f'Poll {i+1}: still running...')
153-
else:
154-
print(f'Unexpected poll response: {poll_response.status_code} {poll_response.text}')
155-
sys.exit(1)
156-
157-
print('Timed out waiting for job to complete.')
158-
sys.exit(1)
159-
"
71+
run: python scripts/run_fabric_etl.py

requirements-dev.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
11
pytest>=8.0
2+
3+
# Runtime dependencies used by scripts/ — installed locally so editor and
4+
# linters can resolve imports. CI also installs these per-workflow before
5+
# invoking each script.
6+
requests>=2.31
7+
azure-identity>=1.15
8+
fabric-cicd>=1.0.0,<2.0.0

0 commit comments

Comments
 (0)