Skip to content

Commit f86f732

Browse files
authored
fix: allow to create a complete version from a pipeline creation (HEXA-1661) (#1819)
1 parent f634020 commit f86f732

6 files changed

Lines changed: 243 additions & 34 deletions

File tree

backend/hexa/mcp/tools/pipelines.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,19 @@ def create_pipeline(
257257
return {"errors": [{"message": "source_code is required and cannot be empty"}]}
258258
zipfile_b64 = _build_zipfile_b64(source_code)
259259

260+
create_input = {
261+
"workspaceSlug": workspace_slug,
262+
"name": name,
263+
"description": description or None,
264+
"functionalType": functional_type or None,
265+
}
266+
if zipfile_b64 is not None:
267+
create_input["version"] = {"zipfile": zipfile_b64}
268+
260269
data = execute_graphql(
261270
user,
262271
"CreatePipeline",
263-
{
264-
"input": {
265-
"workspaceSlug": workspace_slug,
266-
"name": name,
267-
"description": description or None,
268-
"functionalType": functional_type or None,
269-
"zipfile": zipfile_b64,
270-
}
271-
},
272+
{"input": create_input},
272273
)
273274
if "errors" in data:
274275
return data

backend/hexa/pipelines/graphql/schema.graphql

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,20 @@ enum PipelineOrderBy {
421421
LAST_RUN_DATE_ASC
422422
}
423423

424+
"""
425+
Configures the first pipeline version, created atomically alongside the pipeline.
426+
Providing this sub-input signals that a first version should be created.
427+
"""
428+
input CreatePipelineVersionInput {
429+
zipfile: String! # Base64-encoded ZIP file containing the version code.
430+
name: String # The name of the first pipeline version.
431+
description: String # The description of the first pipeline version.
432+
externalLink: URL # The external link associated with the first pipeline version.
433+
parameters: [ParameterInput!] # The parameters of the first pipeline version (defaults to those parsed from the zipfile).
434+
timeout: Int # The timeout, in seconds, of the first pipeline version.
435+
config: JSON # The default configuration applied to runs of the first pipeline version.
436+
}
437+
424438
"""
425439
Represents the input for creating a pipeline.
426440
"""
@@ -432,7 +446,7 @@ input CreatePipelineInput {
432446
notebookPath: String # The path to the notebook file for the pipeline.
433447
functionalType: PipelineFunctionalType # The functional type of the pipeline.
434448
tags: [String!] # The names of tags to associate with the pipeline.
435-
zipfile: String # Optional base64-encoded ZIP file to upload as the first pipeline version.
449+
version: CreatePipelineVersionInput # Optional first pipeline version, created atomically with the pipeline.
436450
}
437451

438452
"""

backend/hexa/pipelines/schema/mutations.py

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,23 @@ def _parse_parameters_from_zipfile(zipfile_data: bytes) -> list:
5858
raise PipelineCodeParsingError(str(e))
5959

6060

61+
def _validate_pipeline_version_timeout(
62+
timeout: int | None, workspace: Workspace
63+
) -> None:
64+
if not timeout:
65+
return
66+
max_allowed_timeout = int(settings.PIPELINE_RUN_MAX_TIMEOUT)
67+
subscription = workspace and workspace.current_subscription
68+
if subscription and subscription.max_pipeline_timeout:
69+
max_allowed_timeout = min(
70+
max_allowed_timeout, subscription.max_pipeline_timeout
71+
)
72+
if timeout < 0 or timeout > max_allowed_timeout:
73+
raise InvalidTimeoutValueError(
74+
"Pipeline timeout value cannot be negative or greater than the maximum allowed value."
75+
)
76+
77+
6178
@pipelines_mutations.field("createPipeline")
6279
def resolve_create_pipeline(_, info, **kwargs):
6380
request: HttpRequest = info.context["request"]
@@ -106,14 +123,10 @@ def resolve_create_pipeline(_, info, **kwargs):
106123
pipeline.tags.set(tags)
107124

108125
version = None
109-
if input.get("zipfile"):
110-
zipfile_data = base64.b64decode(input["zipfile"].encode("ascii"))
111-
parameters = _parse_parameters_from_zipfile(zipfile_data)
112-
113-
version = pipeline.upload_new_version(
114-
user=request.user,
115-
zipfile=zipfile_data,
116-
parameters=parameters,
126+
version_input = input.get("version")
127+
if version_input is not None:
128+
version = _create_first_pipeline_version(
129+
request.user, workspace, pipeline, version_input
117130
)
118131

119132
event_properties = {
@@ -137,15 +150,42 @@ def resolve_create_pipeline(_, info, **kwargs):
137150
"errors": ["PIPELINE_CODE_PARSING_ERROR"],
138151
"details": str(e),
139152
}
153+
except PipelineDoesNotSupportParametersError:
154+
return {"success": False, "errors": ["PIPELINE_DOES_NOT_SUPPORT_PARAMETERS"]}
155+
except InvalidTimeoutValueError:
156+
return {"success": False, "errors": ["INVALID_TIMEOUT_VALUE"]}
140157

141158
return {
142159
"pipeline": pipeline,
143-
"pipelineVersion": version,
160+
"pipeline_version": version,
144161
"success": True,
145162
"errors": [],
146163
}
147164

148165

166+
def _create_first_pipeline_version(
167+
user: User, workspace: Workspace, pipeline: Pipeline, version_input: dict
168+
) -> PipelineVersion:
169+
zipfile_data = base64.b64decode(version_input["zipfile"].encode("ascii"))
170+
parameters = version_input.get("parameters") or _parse_parameters_from_zipfile(
171+
zipfile_data
172+
)
173+
174+
timeout = version_input.get("timeout")
175+
_validate_pipeline_version_timeout(timeout, workspace)
176+
177+
return pipeline.upload_new_version(
178+
user=user,
179+
name=version_input.get("name"),
180+
description=version_input.get("description"),
181+
external_link=version_input.get("external_link"),
182+
zipfile=zipfile_data,
183+
parameters=parameters,
184+
timeout=timeout,
185+
config=version_input.get("config"),
186+
)
187+
188+
149189
@pipelines_mutations.field("updatePipeline")
150190
def resolve_update_pipeline(_, info, **kwargs):
151191
request: HttpRequest = info.context["request"]
@@ -386,19 +426,7 @@ def resolve_upload_pipeline(_, info, **kwargs):
386426
"errors": ["PIPELINE_NOT_FOUND"],
387427
}
388428
try:
389-
max_allowed_timeout = int(settings.PIPELINE_RUN_MAX_TIMEOUT)
390-
subscription = pipeline.workspace and pipeline.workspace.current_subscription
391-
if subscription and subscription.max_pipeline_timeout:
392-
max_allowed_timeout = min(
393-
max_allowed_timeout, subscription.max_pipeline_timeout
394-
)
395-
396-
if input.get("timeout") and (
397-
input.get("timeout") < 0 or input.get("timeout") > max_allowed_timeout
398-
):
399-
raise InvalidTimeoutValueError(
400-
"Pipeline timeout value cannot be negative or greater than the maximum allowed value."
401-
)
429+
_validate_pipeline_version_timeout(input.get("timeout"), pipeline.workspace)
402430

403431
zipfile_data = base64.b64decode(input.get("zipfile").encode("ascii"))
404432
parameters = input.get("parameters")

backend/hexa/pipelines/tests/test_schema/test_pipelines.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,144 @@ def test_create_pipeline_notebook(self):
252252
pipeline = Pipeline.objects.filter_for_user(self.USER_ROOT).get()
253253
self.assertEqual(pipeline.type, PipelineType.NOTEBOOK)
254254

255+
CREATE_PIPELINE_WITH_VERSION_MUTATION = """
256+
mutation createPipeline($input: CreatePipelineInput!) {
257+
createPipeline(input: $input) {
258+
success
259+
errors
260+
pipeline { code name }
261+
pipelineVersion { name parameters { code } }
262+
}
263+
}
264+
"""
265+
266+
def test_create_pipeline_with_first_version(self):
267+
self.assertEqual(0, len(Pipeline.objects.all()))
268+
self.client.force_login(self.USER_ROOT)
269+
270+
r = self.run_query(
271+
self.CREATE_PIPELINE_WITH_VERSION_MUTATION,
272+
{
273+
"input": {
274+
"name": "AtomicPipeline",
275+
"workspaceSlug": self.WS1.slug,
276+
"version": {
277+
"zipfile": self.zip_pipeline_py,
278+
"name": "v1",
279+
},
280+
}
281+
},
282+
)
283+
284+
result = r["data"]["createPipeline"]
285+
self.assertTrue(result["success"])
286+
self.assertEqual(result["errors"], [])
287+
self.assertEqual(result["pipeline"]["name"], "AtomicPipeline")
288+
self.assertEqual(result["pipelineVersion"]["name"], "v1")
289+
self.assertEqual(
290+
[p["code"] for p in result["pipelineVersion"]["parameters"]],
291+
["file_path"],
292+
)
293+
pipeline = Pipeline.objects.filter_for_user(self.USER_ROOT).get()
294+
self.assertEqual(pipeline.versions.count(), 1)
295+
296+
def test_create_pipeline_with_first_version_negative_timeout(self):
297+
self.assertEqual(0, len(Pipeline.objects.all()))
298+
self.client.force_login(self.USER_ROOT)
299+
300+
r = self.run_query(
301+
self.CREATE_PIPELINE_WITH_VERSION_MUTATION,
302+
{
303+
"input": {
304+
"name": "AtomicPipeline",
305+
"workspaceSlug": self.WS1.slug,
306+
"version": {
307+
"zipfile": self.zip_pipeline_py,
308+
"timeout": -1,
309+
},
310+
}
311+
},
312+
)
313+
314+
self.assertEqual(
315+
r["data"]["createPipeline"]["errors"], ["INVALID_TIMEOUT_VALUE"]
316+
)
317+
self.assertFalse(r["data"]["createPipeline"]["success"])
318+
self.assertEqual(0, Pipeline.objects.all().count())
319+
320+
def test_create_pipeline_with_first_version_timeout_too_high(self):
321+
self.assertEqual(0, len(Pipeline.objects.all()))
322+
self.client.force_login(self.USER_ROOT)
323+
324+
r = self.run_query(
325+
self.CREATE_PIPELINE_WITH_VERSION_MUTATION,
326+
{
327+
"input": {
328+
"name": "AtomicPipeline",
329+
"workspaceSlug": self.WS1.slug,
330+
"version": {
331+
"zipfile": self.zip_pipeline_py,
332+
"timeout": int(settings.PIPELINE_RUN_MAX_TIMEOUT) + 1,
333+
},
334+
}
335+
},
336+
)
337+
338+
self.assertEqual(
339+
r["data"]["createPipeline"]["errors"], ["INVALID_TIMEOUT_VALUE"]
340+
)
341+
self.assertFalse(r["data"]["createPipeline"]["success"])
342+
self.assertEqual(0, Pipeline.objects.all().count())
343+
344+
def test_create_pipeline_with_first_version_bad_zipfile(self):
345+
self.assertEqual(0, len(Pipeline.objects.all()))
346+
self.client.force_login(self.USER_ROOT)
347+
348+
bad_zip = base64.b64encode(b"not a real zip").decode("ascii")
349+
r = self.run_query(
350+
self.CREATE_PIPELINE_WITH_VERSION_MUTATION,
351+
{
352+
"input": {
353+
"name": "AtomicPipeline",
354+
"workspaceSlug": self.WS1.slug,
355+
"version": {"zipfile": bad_zip},
356+
}
357+
},
358+
)
359+
360+
self.assertEqual(
361+
r["data"]["createPipeline"]["errors"], ["PIPELINE_CODE_PARSING_ERROR"]
362+
)
363+
self.assertFalse(r["data"]["createPipeline"]["success"])
364+
self.assertEqual(0, Pipeline.objects.all().count())
365+
366+
def test_create_pipeline_with_first_version_explicit_parameters(self):
367+
self.assertEqual(0, len(Pipeline.objects.all()))
368+
self.client.force_login(self.USER_ROOT)
369+
370+
r = self.run_query(
371+
self.CREATE_PIPELINE_WITH_VERSION_MUTATION,
372+
{
373+
"input": {
374+
"name": "AtomicPipeline",
375+
"workspaceSlug": self.WS1.slug,
376+
"version": {
377+
"zipfile": self.zip_pipeline_py,
378+
"parameters": pipelines_parameters_with_scalars,
379+
},
380+
}
381+
},
382+
)
383+
384+
self.assertTrue(r["data"]["createPipeline"]["success"])
385+
self.assertEqual(
386+
[
387+
p["code"]
388+
for p in r["data"]["createPipeline"]["pipelineVersion"]["parameters"]
389+
],
390+
[p["code"] for p in pipelines_parameters_with_scalars],
391+
)
392+
255393
def test_list_pipelines(self):
256394
self.assertEqual(0, len(PipelineRun.objects.all()))
257395
self._create_pipeline(name="Pipeline DHIS2")

frontend/schema.generated.graphql

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1042,8 +1042,8 @@ input CreatePipelineInput {
10421042
name: String!
10431043
notebookPath: String
10441044
tags: [String!]
1045+
version: CreatePipelineVersionInput
10451046
workspaceSlug: String!
1046-
zipfile: String
10471047
}
10481048

10491049
"""Represents the input for adding a recipient to a pipeline."""
@@ -1095,6 +1095,20 @@ type CreatePipelineTemplateVersionResult {
10951095
success: Boolean!
10961096
}
10971097

1098+
"""
1099+
Configures the first pipeline version, created atomically alongside the pipeline.
1100+
Providing this sub-input signals that a first version should be created.
1101+
"""
1102+
input CreatePipelineVersionInput {
1103+
config: JSON
1104+
description: String
1105+
externalLink: URL
1106+
name: String
1107+
parameters: [ParameterInput!]
1108+
timeout: Int
1109+
zipfile: String!
1110+
}
1111+
10981112
"""
10991113
The CreateTeamError enum represents the possible errors that can occur during the createTeam mutation.
11001114
"""

frontend/src/graphql/types.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1053,8 +1053,8 @@ export type CreatePipelineInput = {
10531053
name: Scalars['String']['input'];
10541054
notebookPath?: InputMaybe<Scalars['String']['input']>;
10551055
tags?: InputMaybe<Array<Scalars['String']['input']>>;
1056+
version?: InputMaybe<CreatePipelineVersionInput>;
10561057
workspaceSlug: Scalars['String']['input'];
1057-
zipfile?: InputMaybe<Scalars['String']['input']>;
10581058
};
10591059

10601060
/** Represents the input for adding a recipient to a pipeline. */
@@ -1106,6 +1106,20 @@ export type CreatePipelineTemplateVersionResult = {
11061106
success: Scalars['Boolean']['output'];
11071107
};
11081108

1109+
/**
1110+
* Configures the first pipeline version, created atomically alongside the pipeline.
1111+
* Providing this sub-input signals that a first version should be created.
1112+
*/
1113+
export type CreatePipelineVersionInput = {
1114+
config?: InputMaybe<Scalars['JSON']['input']>;
1115+
description?: InputMaybe<Scalars['String']['input']>;
1116+
externalLink?: InputMaybe<Scalars['URL']['input']>;
1117+
name?: InputMaybe<Scalars['String']['input']>;
1118+
parameters?: InputMaybe<Array<ParameterInput>>;
1119+
timeout?: InputMaybe<Scalars['Int']['input']>;
1120+
zipfile: Scalars['String']['input'];
1121+
};
1122+
11091123
/** The CreateTeamError enum represents the possible errors that can occur during the createTeam mutation. */
11101124
export enum CreateTeamError {
11111125
/** Indicates that a team with the same name already exists. */

0 commit comments

Comments
 (0)