Skip to content

Commit ad4e725

Browse files
authored
fix: pipeline push should rollback on failure (HEXA-1661) (#391)
1 parent fcd6ede commit ad4e725

8 files changed

Lines changed: 433 additions & 48 deletions

File tree

openhexa/cli/api.py

Lines changed: 98 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,47 @@ def get_pipeline_from_code(pipeline_code: str) -> dict[str, typing.Any]:
295295
return data["pipelineByCode"]
296296

297297

298-
def create_pipeline(pipeline_name: str, functional_type: str = None, tags: list[str] = None):
299-
"""Create a pipeline using the API."""
298+
def _build_pipeline_version_input(
299+
pipeline,
300+
pipeline_directory_path: str | Path,
301+
name: str = None,
302+
description: str = None,
303+
external_link: str = None,
304+
) -> dict:
305+
"""Build the GraphQL input to create a new pipeline version."""
306+
zip_file = generate_zip_file(pipeline_directory_path.absolute())
307+
308+
if settings.debug:
309+
# Write zip_file to disk for debugging
310+
with open("pipeline.zip", "wb") as debug_file:
311+
debug_file.write(zip_file.read())
312+
zip_file.seek(0)
313+
314+
return {
315+
"name": name,
316+
"description": description,
317+
"externalLink": external_link,
318+
"zipfile": base64.b64encode(zip_file.read()).decode("ascii"),
319+
"parameters": [p.to_dict() for p in pipeline.parameters],
320+
"timeout": pipeline.timeout,
321+
}
322+
323+
324+
def create_pipeline(
325+
pipeline_name: str,
326+
pipeline_directory_path: str | Path = None,
327+
version_name: str = None,
328+
version_description: str = None,
329+
version_external_link: str = None,
330+
functional_type: str = None,
331+
tags: list[str] = None,
332+
):
333+
"""Create a pipeline, optionally with its first version in a single atomic call.
334+
335+
When ``pipeline_directory_path`` is provided, the directory is zipped and sent as
336+
the first version (atomic on the backend). The returned dict then carries both the
337+
``pipeline`` and the ``pipelineVersion`` keys; otherwise only ``pipeline`` is set.
338+
"""
300339
if settings.current_workspace is None:
301340
raise NoActiveWorkspaceError
302341

@@ -311,27 +350,64 @@ def create_pipeline(pipeline_name: str, functional_type: str = None, tags: list[
311350
if tags:
312351
input_data["tags"] = tags
313352

353+
if pipeline_directory_path is not None:
354+
pipeline = get_pipeline(pipeline_directory_path.absolute())
355+
input_data["version"] = _build_pipeline_version_input(
356+
pipeline,
357+
pipeline_directory_path,
358+
name=version_name,
359+
description=version_description,
360+
external_link=version_external_link,
361+
)
362+
314363
data = graphql(
315364
"""
316-
mutation createPipeline($input: CreatePipelineInput!) {
317-
createPipeline(input: $input) {
318-
success
319-
errors
320-
pipeline {
321-
id
322-
code
323-
name
365+
mutation createPipeline($input: CreatePipelineInput!) {
366+
createPipeline(input: $input) {
367+
success
368+
errors
369+
pipeline {
370+
id
371+
code
372+
permissions {
373+
createTemplateVersion {
374+
isAllowed
375+
}
376+
}
377+
template {
378+
id
379+
code
380+
name
381+
}
382+
}
383+
pipelineVersion {
384+
id
385+
versionName
386+
pipeline {
387+
id
388+
code
389+
permissions {
390+
createTemplateVersion {
391+
isAllowed
392+
}
393+
}
394+
template {
395+
id
396+
code
397+
name
398+
}
399+
}
400+
}
401+
}
324402
}
325-
}
326-
}
327-
""",
403+
""",
328404
{"input": input_data},
329405
)
330406

331407
if not data["createPipeline"]["success"]:
332408
raise Exception(data["createPipeline"]["errors"])
333409

334-
return data["createPipeline"]["pipeline"]
410+
return data["createPipeline"]
335411

336412

337413
def download_pipeline_sourcecode(pipeline_code, output_path: Path = None, force_overwrite=False):
@@ -628,27 +704,18 @@ def upload_pipeline(
628704
if settings.current_workspace is None:
629705
raise NoActiveWorkspaceError
630706

631-
directory = pipeline_directory_path.absolute()
632-
pipeline = get_pipeline(directory)
633-
zip_file = generate_zip_file(directory)
634-
635-
if settings.debug:
636-
# Write zip_file to disk for debugging
637-
with open("pipeline.zip", "wb") as debug_file:
638-
debug_file.write(zip_file.read())
639-
zip_file.seek(0)
640-
641-
base64_content = base64.b64encode(zip_file.read()).decode("ascii")
707+
pipeline = get_pipeline(pipeline_directory_path.absolute())
642708

643709
input_data = {
644710
"workspaceSlug": settings.current_workspace,
645711
"code": target_pipeline_code,
646-
"name": name,
647-
"description": description,
648-
"externalLink": link,
649-
"zipfile": base64_content,
650-
"parameters": [p.to_dict() for p in pipeline.parameters],
651-
"timeout": pipeline.timeout,
712+
**_build_pipeline_version_input(
713+
pipeline,
714+
pipeline_directory_path,
715+
name=name,
716+
description=description,
717+
external_link=link,
718+
),
652719
}
653720

654721
if functional_type or pipeline.functional_type:

openhexa/cli/cli.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -461,20 +461,30 @@ def pipelines_push(
461461
click.confirm(confirmation_message, default=True, abort=True)
462462

463463
normalized_tags = [normalize_tag(t) for t in tag] if tag else []
464-
selected_pipeline = selected_pipeline or create_pipeline(
465-
pipeline.name, functional_type=functional_type, tags=normalized_tags
466-
)
467464
uploaded_pipeline_version = None
468465
try:
469-
uploaded_pipeline_version = upload_pipeline(
470-
selected_pipeline["code"],
471-
path,
472-
name,
473-
description=description,
474-
link=link,
475-
functional_type=functional_type,
476-
tags=normalized_tags,
477-
)
466+
if selected_pipeline:
467+
uploaded_pipeline_version = upload_pipeline(
468+
selected_pipeline["code"],
469+
path,
470+
name,
471+
description=description,
472+
link=link,
473+
functional_type=functional_type,
474+
tags=normalized_tags,
475+
)
476+
else:
477+
create_result = create_pipeline(
478+
pipeline.name,
479+
path,
480+
version_name=name,
481+
version_description=description,
482+
version_external_link=link,
483+
functional_type=functional_type,
484+
tags=normalized_tags,
485+
)
486+
uploaded_pipeline_version = create_result["pipelineVersion"]
487+
selected_pipeline = create_result["pipeline"]
478488
version_url = click.style(
479489
f"{settings.public_api_url}/workspaces/{workspace}/pipelines/{selected_pipeline['code']}",
480490
fg="bright_blue",

openhexa/graphql/graphql_client/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
CreateAccessmodProjectError,
110110
CreateAccessmodProjectMemberError,
111111
CreateAccessmodZonalStatisticsError,
112+
CreateAssistantConversationError,
112113
CreateBucketFolderError,
113114
CreateConnectionError,
114115
CreateDatasetError,
@@ -156,6 +157,7 @@
156157
DHIS2MetadataType,
157158
DisableTwoFactorError,
158159
EnableTwoFactorError,
160+
FileEncoding,
159161
FileSampleStatus,
160162
FileType,
161163
GenerateChallengeError,
@@ -171,6 +173,7 @@
171173
LaunchAccessmodAnalysisError,
172174
LaunchNotebookServerError,
173175
LinkDatasetError,
176+
LinkedObjectType,
174177
LoginError,
175178
MembershipRole,
176179
MessagePriority,
@@ -233,6 +236,7 @@
233236
UpdateWorkspaceMemberError,
234237
UpgradePipelineVersionFromTemplateError,
235238
VerifyDeviceError,
239+
WebappOperationScope,
236240
WebappType,
237241
WorkspaceInvitationStatus,
238242
WorkspaceMembershipRole,
@@ -275,6 +279,7 @@
275279
CreateAccessmodProjectInput,
276280
CreateAccessmodProjectMemberInput,
277281
CreateAccessmodZonalStatisticsInput,
282+
CreateAssistantConversationInput,
278283
CreateBucketFolderInput,
279284
CreateConnectionInput,
280285
CreateDatasetInput,
@@ -286,9 +291,11 @@
286291
CreatePipelineInput,
287292
CreatePipelineRecipientInput,
288293
CreatePipelineTemplateVersionInput,
294+
CreatePipelineVersionInput,
289295
CreateTeamInput,
290296
CreateWebappInput,
291297
CreateWorkspaceInput,
298+
DatasetVersionFileContentInput,
292299
DeclineWorkspaceInvitationInput,
293300
DeleteAccessmodAnalysisInput,
294301
DeleteAccessmodFilesetInput,
@@ -392,6 +399,7 @@
392399
UpgradePipelineVersionFromTemplateInput,
393400
UploadPipelineInput,
394401
VerifyDeviceInput,
402+
WebappFileInput,
395403
WebappSourceInput,
396404
WorkspaceInvitationInput,
397405
WorkspacePermissionInput,
@@ -541,6 +549,8 @@
541549
"CreateAccessmodProjectMemberInput",
542550
"CreateAccessmodZonalStatisticsError",
543551
"CreateAccessmodZonalStatisticsInput",
552+
"CreateAssistantConversationError",
553+
"CreateAssistantConversationInput",
544554
"CreateBucketFolderError",
545555
"CreateBucketFolderInput",
546556
"CreateConnection",
@@ -580,6 +590,7 @@
580590
"CreatePipelineTemplateVersionCreatePipelineTemplateVersionPipelineTemplate",
581591
"CreatePipelineTemplateVersionError",
582592
"CreatePipelineTemplateVersionInput",
593+
"CreatePipelineVersionInput",
583594
"CreateTeamError",
584595
"CreateTeamInput",
585596
"CreateTemplateVersionPermissionReason",
@@ -609,6 +620,7 @@
609620
"DatasetDatasetVersionsItems",
610621
"DatasetDatasetVersionsItemsCreatedBy",
611622
"DatasetDatasetWorkspace",
623+
"DatasetVersionFileContentInput",
612624
"Datasets",
613625
"DatasetsDatasets",
614626
"DatasetsDatasetsItems",
@@ -683,6 +695,7 @@
683695
"DisableTwoFactorInput",
684696
"EnableTwoFactorError",
685697
"EnableTwoFactorInput",
698+
"FileEncoding",
686699
"FileSampleStatus",
687700
"FileType",
688701
"GenerateChallengeError",
@@ -732,6 +745,7 @@
732745
"LaunchNotebookServerInput",
733746
"LinkDatasetError",
734747
"LinkDatasetInput",
748+
"LinkedObjectType",
735749
"LogPipelineMessageInput",
736750
"LoginError",
737751
"LoginInput",
@@ -912,6 +926,8 @@
912926
"UploadPipelineUploadPipeline",
913927
"VerifyDeviceError",
914928
"VerifyDeviceInput",
929+
"WebappFileInput",
930+
"WebappOperationScope",
915931
"WebappSourceInput",
916932
"WebappType",
917933
"Workspace",

0 commit comments

Comments
 (0)