Skip to content

Commit 6789217

Browse files
committed
feat: Search created by user in pipeline run API
1 parent 41a18a2 commit 6789217

8 files changed

Lines changed: 564 additions & 29 deletions

File tree

cloud_pipelines_backend/api_router.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,15 @@ def handle_item_already_exists_error(
125125
content={"message": str(exc)},
126126
)
127127

128+
@app.exception_handler(errors.FilterValidationError)
129+
def handle_filter_validation_error(
130+
request: fastapi.Request, exc: errors.FilterValidationError
131+
):
132+
return fastapi.responses.JSONResponse(
133+
status_code=422,
134+
content={"detail": str(exc)},
135+
)
136+
128137
@app.exception_handler(NotImplementedError)
129138
def handle_not_implemented_error(
130139
request: fastapi.Request, exc: NotImplementedError

cloud_pipelines_backend/api_server_sql.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ def _get_current_time() -> datetime.datetime:
3030
from . import component_structures as structures
3131
from . import backend_types_sql as bts
3232
from . import errors
33-
from .errors import ItemNotFoundError
33+
from .errors import InvalidAnnotationKeyError, ItemNotFoundError
3434
from . import filter_query_sql
35-
from .filter_query_sql import PageToken
35+
36+
_SYSTEM_KEY_RESERVED_MSG = f"Annotation keys starting with {filter_query_sql.SYSTEM_KEY_PREFIX!r} are reserved for system use."
3637

3738

3839
# ==== PipelineJobService
@@ -119,6 +120,19 @@ def create(
119120
},
120121
)
121122
session.add(pipeline_run)
123+
# Mirror created_by into the annotations table so it's searchable
124+
# via filter_query like any other annotation.
125+
if created_by is not None:
126+
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
127+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
128+
session.flush()
129+
session.add(
130+
bts.PipelineRunAnnotation(
131+
pipeline_run_id=pipeline_run.id,
132+
key=filter_query_sql.SystemKey.CREATED_BY,
133+
value=created_by,
134+
)
135+
)
122136
session.commit()
123137

124138
session.refresh(pipeline_run)
@@ -180,16 +194,13 @@ def list(
180194
params: Annotated[ListPipelineRunsParams, Query()],
181195
) -> ListPipelineJobsResponse:
182196
page_size = 10
183-
try:
184-
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
185-
filter_value=params.filter,
186-
filter_query_value=params.filter_query,
187-
page_token_value=params.page_token,
188-
current_user=current_user,
189-
page_size=page_size,
190-
)
191-
except filter_query_sql.MutuallyExclusiveFilterError as e:
192-
raise HTTPException(status_code=422, detail=str(e))
197+
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
198+
filter_value=params.filter,
199+
filter_query_value=params.filter_query,
200+
page_token_value=params.page_token,
201+
current_user=current_user,
202+
page_size=page_size,
203+
)
193204

194205
pipeline_runs = list(
195206
session.scalars(
@@ -309,6 +320,8 @@ def set_annotation(
309320
user_name: str | None = None,
310321
skip_user_check: bool = False,
311322
):
323+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
324+
raise InvalidAnnotationKeyError(_SYSTEM_KEY_RESERVED_MSG)
312325
pipeline_run = session.get(bts.PipelineRun, id)
313326
if not pipeline_run:
314327
raise ItemNotFoundError(f"Pipeline run {id} not found.")
@@ -331,6 +344,8 @@ def delete_annotation(
331344
user_name: str | None = None,
332345
skip_user_check: bool = False,
333346
):
347+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
348+
raise InvalidAnnotationKeyError(_SYSTEM_KEY_RESERVED_MSG)
334349
pipeline_run = session.get(bts.PipelineRun, id)
335350
if not pipeline_run:
336351
raise ItemNotFoundError(f"Pipeline run {id} not found.")

cloud_pipelines_backend/database_ops.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import sqlalchemy
2+
from sqlalchemy import orm
23

34
from . import backend_types_sql as bts
5+
from . import filter_query_sql
46

57

68
def create_db_engine_and_migrate_db(
@@ -89,6 +91,36 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8991
)
9092
annotation_index.create(db_engine, checkfirst=True)
9193

92-
# TODO: I believe this is needed with the comments above?
9394
# Workaround for https://github.com/sqlalchemy/sqlalchemy/issues/12965
9495
bts.PipelineRunAnnotation.__table__.indexes.discard(annotation_index)
96+
97+
backfill_created_by_annotations(db_engine)
98+
99+
100+
def backfill_created_by_annotations(db_engine: sqlalchemy.Engine):
101+
"""Copy pipeline_run.created_by into pipeline_run_annotation so
102+
annotation-based search works for created_by.
103+
104+
Idempotent -- skips rows that already have the annotation.
105+
"""
106+
with orm.Session(db_engine) as session:
107+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
108+
["pipeline_run_id", "key", "value"],
109+
sqlalchemy.select(
110+
bts.PipelineRun.id,
111+
sqlalchemy.literal(filter_query_sql.SystemKey.CREATED_BY),
112+
bts.PipelineRun.created_by,
113+
).where(
114+
bts.PipelineRun.created_by.isnot(None),
115+
# NOT EXISTS makes the backfill idempotent
116+
~sqlalchemy.exists(
117+
sqlalchemy.select(bts.PipelineRunAnnotation.pipeline_run_id).where(
118+
bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id,
119+
bts.PipelineRunAnnotation.key
120+
== filter_query_sql.SystemKey.CREATED_BY,
121+
)
122+
),
123+
),
124+
)
125+
session.execute(stmt)
126+
session.commit()

cloud_pipelines_backend/errors.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,17 @@ class ItemAlreadyExistsError(Exception):
88

99
class PermissionError(Exception):
1010
pass
11+
12+
13+
class FilterValidationError(Exception):
14+
"""Base for all filter/annotation validation errors -> 422."""
15+
16+
pass
17+
18+
19+
class MutuallyExclusiveFilterError(FilterValidationError):
20+
pass
21+
22+
23+
class InvalidAnnotationKeyError(FilterValidationError):
24+
pass

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 138 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,44 @@
11
import base64
22
import dataclasses
33
import json
4+
from enum import StrEnum
5+
from typing import Final
46

57
import sqlalchemy as sql
68

79
from . import backend_types_sql as bts
10+
from .errors import (
11+
InvalidAnnotationKeyError,
12+
MutuallyExclusiveFilterError,
13+
)
814
from .filter_query_models import (
915
AndPredicate,
1016
FilterQuery,
1117
KeyExistsPredicate,
1218
NotPredicate,
1319
OrPredicate,
20+
Predicate,
1421
ValueContainsPredicate,
22+
ValueEquals,
1523
ValueEqualsPredicate,
1624
ValueInPredicate,
1725
)
1826

27+
SYSTEM_KEY_PREFIX: Final[str] = "system/"
28+
29+
30+
class SystemKey(StrEnum):
31+
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
32+
33+
34+
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[SystemKey, set[type]] = {
35+
SystemKey.CREATED_BY: {
36+
KeyExistsPredicate,
37+
ValueEqualsPredicate,
38+
ValueInPredicate,
39+
},
40+
}
41+
1942
# ---------------------------------------------------------------------------
2043
# PageToken
2144
# ---------------------------------------------------------------------------
@@ -39,8 +62,89 @@ def decode(cls, token: str | None) -> "PageToken":
3962
return cls(**json.loads(base64.b64decode(token)))
4063

4164

42-
class MutuallyExclusiveFilterError(ValueError):
43-
pass
65+
# ---------------------------------------------------------------------------
66+
# SystemKey validation and resolution
67+
# ---------------------------------------------------------------------------
68+
69+
70+
def _get_predicate_key(*, predicate: Predicate) -> str | None:
71+
"""Extract the annotation key from a leaf predicate, or None for logical operators."""
72+
match predicate:
73+
case KeyExistsPredicate():
74+
return predicate.key_exists.key
75+
case ValueEqualsPredicate():
76+
return predicate.value_equals.key
77+
case ValueContainsPredicate():
78+
return predicate.value_contains.key
79+
case ValueInPredicate():
80+
return predicate.value_in.key
81+
case _:
82+
return None
83+
84+
85+
def _check_predicate_allowed(*, predicate: Predicate) -> None:
86+
"""Raise if a system key is used with an unsupported predicate type."""
87+
key = _get_predicate_key(predicate=predicate)
88+
if key is None:
89+
return
90+
91+
try:
92+
system_key = SystemKey(key)
93+
except ValueError:
94+
return
95+
96+
supported = SYSTEM_KEY_SUPPORTED_PREDICATES.get(system_key, set())
97+
if type(predicate) not in supported:
98+
raise InvalidAnnotationKeyError(
99+
f"Predicate {type(predicate).__name__} is not supported "
100+
f"for system key {system_key!r}. "
101+
f"Supported: {[t.__name__ for t in supported]}"
102+
)
103+
104+
105+
def _resolve_system_key_value(
106+
*,
107+
key: str,
108+
value: str,
109+
current_user: str | None,
110+
) -> str:
111+
"""Resolve special placeholder values for system keys."""
112+
if key == SystemKey.CREATED_BY and value == "me":
113+
return current_user if current_user is not None else ""
114+
return value
115+
116+
117+
def _maybe_resolve_system_values(
118+
*,
119+
predicate: ValueEqualsPredicate,
120+
current_user: str | None,
121+
) -> ValueEqualsPredicate:
122+
"""Resolve special values in a ValueEqualsPredicate."""
123+
key = predicate.value_equals.key
124+
value = predicate.value_equals.value
125+
resolved = _resolve_system_key_value(
126+
key=key,
127+
value=value,
128+
current_user=current_user,
129+
)
130+
if resolved != value:
131+
return ValueEqualsPredicate(value_equals=ValueEquals(key=key, value=resolved))
132+
return predicate
133+
134+
135+
def _validate_and_resolve_predicate(
136+
*,
137+
predicate: Predicate,
138+
current_user: str | None,
139+
) -> Predicate:
140+
"""Validate system key support, then resolve special values."""
141+
_check_predicate_allowed(predicate=predicate)
142+
if isinstance(predicate, ValueEqualsPredicate):
143+
return _maybe_resolve_system_values(
144+
predicate=predicate,
145+
current_user=current_user,
146+
)
147+
return predicate
44148

45149

46150
# ---------------------------------------------------------------------------
@@ -79,7 +183,12 @@ def build_list_filters(
79183

80184
if filter_query_value:
81185
parsed = FilterQuery.model_validate_json(filter_query_value)
82-
where_clauses.append(filter_query_to_where_clause(filter_query=parsed))
186+
where_clauses.append(
187+
filter_query_to_where_clause(
188+
filter_query=parsed,
189+
current_user=current_user,
190+
)
191+
)
83192

84193
next_page_token = PageToken(
85194
offset=offset + page_size,
@@ -93,10 +202,13 @@ def build_list_filters(
93202
def filter_query_to_where_clause(
94203
*,
95204
filter_query: FilterQuery,
205+
current_user: str | None = None,
96206
) -> sql.ColumnElement:
97207
predicates = filter_query.and_ or filter_query.or_
98208
is_and = filter_query.and_ is not None
99-
clauses = [_predicate_to_clause(predicate=p) for p in predicates]
209+
clauses = [
210+
_predicate_to_clause(predicate=p, current_user=current_user) for p in predicates
211+
]
100212
return sql.and_(*clauses) if is_and else sql.or_(*clauses)
101213

102214

@@ -161,17 +273,35 @@ def _build_filter_where_clauses(
161273

162274
def _predicate_to_clause(
163275
*,
164-
predicate,
276+
predicate: Predicate,
277+
current_user: str | None = None,
165278
) -> sql.ColumnElement:
279+
predicate = _validate_and_resolve_predicate(
280+
predicate=predicate,
281+
current_user=current_user,
282+
)
283+
166284
match predicate:
167285
case AndPredicate():
168286
return sql.and_(
169-
*[_predicate_to_clause(predicate=p) for p in predicate.and_]
287+
*[
288+
_predicate_to_clause(predicate=p, current_user=current_user)
289+
for p in predicate.and_
290+
]
170291
)
171292
case OrPredicate():
172-
return sql.or_(*[_predicate_to_clause(predicate=p) for p in predicate.or_])
293+
return sql.or_(
294+
*[
295+
_predicate_to_clause(predicate=p, current_user=current_user)
296+
for p in predicate.or_
297+
]
298+
)
173299
case NotPredicate():
174-
return sql.not_(_predicate_to_clause(predicate=predicate.not_))
300+
return sql.not_(
301+
_predicate_to_clause(
302+
predicate=predicate.not_, current_user=current_user
303+
)
304+
)
175305
case KeyExistsPredicate():
176306
return _key_exists_to_clause(predicate=predicate)
177307
case ValueEqualsPredicate():

0 commit comments

Comments
 (0)