Skip to content

Commit 6d96b5f

Browse files
committed
feat: Search pipeline name in pipeline run API
1 parent a02aa4e commit 6d96b5f

6 files changed

Lines changed: 1110 additions & 116 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from . import backend_types_sql as bts
1111
from . import component_structures as structures
12+
from . import database_ops
1213
from . import errors
1314
from . import filter_query_sql
1415

@@ -113,19 +114,15 @@ def create(
113114
},
114115
)
115116
session.add(pipeline_run)
116-
# Mirror created_by into the annotations table so it's searchable
117-
# via filter_query like any other annotation.
118-
if created_by is not None:
119-
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
120-
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
121-
session.flush()
122-
session.add(
123-
bts.PipelineRunAnnotation(
124-
pipeline_run_id=pipeline_run.id,
125-
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
126-
value=created_by,
127-
)
128-
)
117+
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
118+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
119+
session.flush()
120+
_mirror_system_annotations(
121+
session=session,
122+
pipeline_run_id=pipeline_run.id,
123+
created_by=created_by,
124+
pipeline_name=pipeline_name,
125+
)
129126
session.commit()
130127

131128
session.refresh(pipeline_run)
@@ -244,12 +241,9 @@ def _create_pipeline_run_response(
244241
bts.ExecutionNode, pipeline_run.root_execution_id
245242
)
246243
if execution_node:
247-
task_spec = structures.TaskSpec.from_json_dict(
248-
execution_node.task_spec
244+
pipeline_name = database_ops.get_pipeline_name_from_task_spec(
245+
task_spec_dict=execution_node.task_spec
249246
)
250-
component_spec = task_spec.component_ref.spec
251-
if component_spec:
252-
pipeline_name = component_spec.name
253247
response.pipeline_name = pipeline_name
254248
if include_execution_stats:
255249
execution_status_stats = self._calculate_execution_status_stats(
@@ -1153,6 +1147,32 @@ def list_secrets(
11531147
]
11541148

11551149

1150+
def _mirror_system_annotations(
    *,
    session: orm.Session,
    pipeline_run_id: bts.IdType,
    created_by: str | None,
    pipeline_name: str | None,
) -> None:
    """Copy selected pipeline run fields into system annotations.

    Each non-empty field becomes one ``PipelineRunAnnotation`` row added to
    ``session`` so that filter_query search can match it like any other
    annotation. Nothing is flushed or committed here; the caller owns the
    transaction.

    Args:
        session: Session the new annotation rows are added to.
        pipeline_run_id: ID of the run the annotations belong to. It must
            already be populated, because it is used as the FK value.
        created_by: Creator of the run; skipped when ``None`` or empty.
        pipeline_name: Name of the pipeline; skipped when ``None`` or empty.
    """
    system_keys = filter_query_sql.PipelineRunAnnotationSystemKey
    mirrored_fields = (
        (system_keys.CREATED_BY, created_by),
        (system_keys.NAME, pipeline_name),
    )
    for annotation_key, annotation_value in mirrored_fields:
        # Falsy values (None and "") produce no annotation row.
        if not annotation_value:
            continue
        session.add(
            bts.PipelineRunAnnotation(
                pipeline_run_id=pipeline_run_id,
                key=annotation_key,
                value=annotation_value,
            )
        )
1174+
1175+
11561176
def _recursively_create_all_executions_and_artifacts_root(
11571177
session: orm.Session,
11581178
root_task_spec: structures.TaskSpec,

cloud_pipelines_backend/database_ops.py

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
from typing import Any
2+
13
import sqlalchemy
24
from sqlalchemy import orm
35

46
from . import backend_types_sql as bts
7+
from . import component_structures as structures
58
from . import filter_query_sql
69

710

@@ -87,6 +90,7 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8790
break
8891

8992
_backfill_pipeline_run_created_by_annotations(db_engine=db_engine)
93+
_backfill_pipeline_run_name_annotations(db_engine=db_engine)
9094

9195

9296
def _is_pipeline_run_annotation_key_already_backfilled(
@@ -106,6 +110,27 @@ def _is_pipeline_run_annotation_key_already_backfilled(
106110
).scalar()
107111

108112

113+
def get_pipeline_name_from_task_spec(
114+
*,
115+
task_spec_dict: dict[str, Any],
116+
) -> str | None:
117+
"""Extract pipeline name from a task_spec dict via component_ref.spec.name.
118+
119+
Traversal path:
120+
task_spec_dict -> TaskSpec -> component_ref -> spec -> name
121+
122+
Returns None if any step in the chain is missing or parsing fails.
123+
"""
124+
try:
125+
task_spec = structures.TaskSpec.from_json_dict(task_spec_dict)
126+
except Exception:
127+
return None
128+
spec = task_spec.component_ref.spec
129+
if spec is None:
130+
return None
131+
return spec.name or None
132+
133+
109134
def _backfill_pipeline_run_created_by_annotations(
110135
*,
111136
db_engine: sqlalchemy.Engine,
@@ -142,3 +167,232 @@ def _backfill_pipeline_run_created_by_annotations(
142167
)
143168
session.execute(stmt)
144169
session.commit()
170+
171+
172+
def _backfill_pipeline_names_from_extra_data(
    *,
    session: orm.Session,
) -> None:
    """Phase 1: bulk-insert name annotations from extra_data['pipeline_name'].

    Issues a single INSERT ... SELECT, roughly:

        INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
        SELECT id, <NAME system key>, <extra_data.pipeline_name as string>
        FROM pipeline_run
        WHERE <extra_data.pipeline_name as string> IS NOT NULL

    Row selection:
        extra_data = {"pipeline_name": "my-pipeline"} -> row, "my-pipeline"
        extra_data = {"pipeline_name": ""}            -> row, ""
        extra_data = NULL                             -> no row
        extra_data = {}                               -> no row (key absent)
        extra_data = {"pipeline_name": null}          -> no row

    NOTE(review): the last case relies on the backend turning a JSON null
    into SQL NULL when the value is extracted as a string. MySQL's
    JSON_UNQUOTE may return the text 'null' instead -- confirm if MySQL is
    a supported backend.

    The statement is executed on ``session`` but not committed; the caller
    owns the transaction.
    """
    name_from_extra_data = bts.PipelineRun.extra_data["pipeline_name"].as_string()
    annotation_key = sqlalchemy.literal(
        filter_query_sql.PipelineRunAnnotationSystemKey.NAME,
    )

    # Only runs whose extracted name is non-NULL produce an annotation row.
    runs_with_name = sqlalchemy.select(
        bts.PipelineRun.id,
        annotation_key,
        name_from_extra_data,
    ).where(
        name_from_extra_data.isnot(None),
    )

    insert_annotations = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
        ["pipeline_run_id", "key", "value"],
        runs_with_name,
    )
    session.execute(insert_annotations)
209+
210+
211+
def _backfill_pipeline_names_from_component_spec(
    *,
    session: orm.Session,
) -> None:
    """Phase 2: bulk fallback for runs that still lack a name annotation.

    The name is read from the run's root ExecutionNode via the JSON path

        task_spec -> 'componentRef' -> 'spec' ->> 'name'

    and inserted with a single INSERT ... SELECT built in four steps:

    1. INNER JOIN execution_node ON execution_node.id =
       pipeline_run.root_execution_id. Runs whose root execution node does
       not exist are dropped.
    2. Anti-join: LEFT JOIN pipeline_run_annotation on (run id, key = NAME
       system key) and keep only rows where that join found nothing. Runs
       that already have a name annotation are dropped; annotations under
       other keys do not count as a match.
    3. Keep only rows where the extracted name IS NOT NULL. An empty
       string is not NULL and is therefore kept.
    4. INSERT one (pipeline_run_id, key, value) row per surviving run.

    Worked example (key = NAME system key):

        run 1: spec.name = "A", already has a name annotation -> skipped (2)
        run 2: spec = null                                    -> skipped (3)
        run 3: spec.name = "", only a user/custom_tag row     -> inserted ""
        run 4: spec.name = "B", no annotations                -> inserted "B"
        run 5: root_execution_id points at a missing node     -> skipped (1)

    The JSON path is rendered per dialect by SQLAlchemy, e.g.
    JSON_EXTRACT(task_spec, '$.componentRef.spec.name') on SQLite,
    JSON_UNQUOTE(JSON_EXTRACT(...)) on MySQL, and the ``->``/``->>``
    operators on PostgreSQL. A NULL at any depth (task_spec NULL,
    componentRef missing, spec null, name missing) is expected to produce
    SQL NULL and be filtered out by step 3.

    The statement is executed on ``session`` but not committed; the caller
    owns the transaction.
    """
    name_key = filter_query_sql.PipelineRunAnnotationSystemKey.NAME
    spec_name = bts.ExecutionNode.task_spec[
        ("componentRef", "spec", "name")
    ].as_string()
    # Aliased so the annotation table can be joined for the anti-join while
    # the un-aliased entity remains the INSERT target.
    name_annotation = orm.aliased(bts.PipelineRunAnnotation)

    root_node_matches = bts.ExecutionNode.id == bts.PipelineRun.root_execution_id
    name_annotation_matches = sqlalchemy.and_(
        name_annotation.pipeline_run_id == bts.PipelineRun.id,
        name_annotation.key == name_key,
    )

    runs_missing_name = (
        sqlalchemy.select(
            bts.PipelineRun.id,
            sqlalchemy.literal(str(name_key)),
            spec_name,
        )
        # Step 1: attach the root execution node.
        .join(bts.ExecutionNode, root_node_matches)
        # Step 2: anti-join against existing name annotations.
        .outerjoin(name_annotation, name_annotation_matches)
        .where(
            name_annotation.pipeline_run_id.is_(None),
            # Step 3: drop runs whose JSON path yields NULL.
            spec_name.isnot(None),
        )
    )

    # Step 4: one annotation row per surviving run.
    insert_annotations = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
        ["pipeline_run_id", "key", "value"],
        runs_missing_name,
    )
    session.execute(insert_annotations)
358+
359+
360+
def _backfill_pipeline_run_name_annotations(
    *,
    db_engine: sqlalchemy.Engine,
) -> None:
    """Backfill pipeline_run_annotation rows that carry pipeline names.

    The "already backfilled?" check and both insert phases share one
    session and are committed once, so either both phases become permanent
    or neither does.

    NOTE(review): sharing one transaction keeps the two phases atomic, but
    whether it also prevents two processes starting at the same time from
    both passing the check depends on the database's isolation level --
    confirm.

    The backfill is skipped entirely when the check reports that the NAME
    system key has already been backfilled.

    Phase 1 -- _backfill_pipeline_names_from_extra_data:
        Bulk SQL insert from extra_data['pipeline_name'].

    Phase 2 -- _backfill_pipeline_names_from_component_spec:
        Bulk SQL fallback for runs Phase 1 did not cover (extra_data is
        NULL or lacks the key). Reads the name via the JSON path
        task_spec -> componentRef -> spec -> name.

    Annotation creation rules (same for both phases):
        Creates row: any non-NULL string, including the empty string ""
        Skips row:   NULL at any depth in the JSON path

    Args:
        db_engine: Engine the backfill session is opened on.
    """
    name_key = filter_query_sql.PipelineRunAnnotationSystemKey.NAME
    with orm.Session(db_engine) as session:
        already_backfilled = _is_pipeline_run_annotation_key_already_backfilled(
            session=session,
            key=name_key,
        )
        if not already_backfilled:
            # Phase 1 rows are executed but not yet committed ...
            _backfill_pipeline_names_from_extra_data(session=session)
            # ... so Phase 2, running in the same transaction, sees them
            # and its anti-join skips those runs.
            _backfill_pipeline_names_from_component_spec(session=session)
            # Single commit: both phases land together.
            session.commit()

0 commit comments

Comments
 (0)