Skip to content

Commit 41a18a2

Browse files
committed
feat: Search annotations in pipeline run API
1 parent 78fdccf commit 41a18a2

6 files changed

Lines changed: 989 additions & 218 deletions

File tree

cloud_pipelines_backend/api_router.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,15 @@ def handle_item_already_exists_error(
125125
content={"message": str(exc)},
126126
)
127127

128+
@app.exception_handler(NotImplementedError)
129+
def handle_not_implemented_error(
130+
request: fastapi.Request, exc: NotImplementedError
131+
):
132+
return fastapi.responses.JSONResponse(
133+
status_code=501,
134+
content={"detail": str(exc)},
135+
)
136+
128137
get_user_details_dependency = fastapi.Depends(user_details_getter)
129138

130139
def get_user_name(

cloud_pipelines_backend/api_server_sql.py

Lines changed: 17 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
import base64
21
import dataclasses
32
import datetime
4-
import json
53
import logging
64
import typing
7-
from typing import Annotated, Any, Final, Optional
5+
from typing import Annotated, Any, Optional
86

97
from fastapi import HTTPException, Query
108
from pydantic import BaseModel
@@ -33,10 +31,8 @@ def _get_current_time() -> datetime.datetime:
3331
from . import backend_types_sql as bts
3432
from . import errors
3533
from .errors import ItemNotFoundError
36-
from .filter_query_models import FilterQuery
37-
38-
_PAGE_TOKEN_OFFSET_KEY: Final[str] = "offset"
39-
_PAGE_TOKEN_FILTER_KEY: Final[str] = "filter"
34+
from . import filter_query_sql
35+
from .filter_query_sql import PageToken
4036

4137

4238
# ==== PipelineJobService
@@ -183,29 +179,18 @@ def list(
183179
current_user: str | None = None,
184180
params: Annotated[ListPipelineRunsParams, Query()],
185181
) -> ListPipelineJobsResponse:
186-
if params.filter and params.filter_query:
187-
raise HTTPException(
188-
status_code=422,
189-
detail="Cannot use both 'filter' and 'filter_query'. Use one or the other.",
190-
)
191-
192-
if params.filter_query:
193-
FilterQuery.model_validate_json(params.filter_query)
194-
raise HTTPException(
195-
status_code=501,
196-
detail="filter_query is not yet implemented.",
182+
page_size = 10
183+
try:
184+
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
185+
filter_value=params.filter,
186+
filter_query_value=params.filter_query,
187+
page_token_value=params.page_token,
188+
current_user=current_user,
189+
page_size=page_size,
197190
)
191+
except filter_query_sql.MutuallyExclusiveFilterError as e:
192+
raise HTTPException(status_code=422, detail=str(e))
198193

199-
filter_value, offset = _resolve_filter_value(
200-
filter=params.filter,
201-
page_token=params.page_token,
202-
)
203-
where_clauses, next_page_filter_value = _build_filter_where_clauses(
204-
filter_value=filter_value,
205-
current_user=current_user,
206-
)
207-
208-
page_size = 10
209194
pipeline_runs = list(
210195
session.scalars(
211196
sql.select(bts.PipelineRun)
@@ -215,14 +200,10 @@ def list(
215200
.limit(page_size)
216201
).all()
217202
)
218-
next_page_offset = offset + page_size
219-
next_page_token_dict = {
220-
_PAGE_TOKEN_OFFSET_KEY: next_page_offset,
221-
_PAGE_TOKEN_FILTER_KEY: next_page_filter_value,
222-
}
223-
next_page_token = _encode_page_token(next_page_token_dict)
224-
if len(pipeline_runs) < page_size:
225-
next_page_token = None
203+
204+
next_page_token = (
205+
next_token.encode() if len(pipeline_runs) >= page_size else None
206+
)
226207

227208
return ListPipelineJobsResponse(
228209
pipeline_runs=[
@@ -363,82 +344,6 @@ def delete_annotation(
363344
session.commit()
364345

365346

366-
def _resolve_filter_value(
367-
*,
368-
filter: str | None,
369-
page_token: str | None,
370-
) -> tuple[str | None, int]:
371-
"""Decode page_token and return the effective (filter_value, offset).
372-
373-
If a page_token is present, its stored filter takes precedence over the
374-
raw filter parameter (the token carries the resolved filter forward across pages).
375-
"""
376-
page_token_dict = _decode_page_token(page_token)
377-
offset = page_token_dict.get(_PAGE_TOKEN_OFFSET_KEY, 0)
378-
if page_token:
379-
filter = page_token_dict.get(_PAGE_TOKEN_FILTER_KEY, None)
380-
return filter, offset
381-
382-
383-
def _build_filter_where_clauses(
384-
*,
385-
filter_value: str | None,
386-
current_user: str | None,
387-
) -> tuple[list[sql.ColumnElement], str | None]:
388-
"""Parse a filter string into SQLAlchemy WHERE clauses.
389-
390-
Returns (where_clauses, next_page_filter_value). The second value is the
391-
filter string with shorthand values resolved (e.g. "created_by:me" becomes
392-
"created_by:alice@example.com") so it can be embedded in the next page token.
393-
"""
394-
where_clauses: list[sql.ColumnElement] = []
395-
parsed_filter = _parse_filter(filter_value) if filter_value else {}
396-
for key, value in parsed_filter.items():
397-
if key == "_text":
398-
raise NotImplementedError("Text search is not implemented yet.")
399-
elif key == "created_by":
400-
if value == "me":
401-
if current_user is None:
402-
current_user = ""
403-
value = current_user
404-
# TODO: Maybe make this a bit more robust.
405-
# We need to change the filter since it goes into the next_page_token.
406-
filter_value = filter_value.replace(
407-
"created_by:me", f"created_by:{current_user}"
408-
)
409-
if value:
410-
where_clauses.append(bts.PipelineRun.created_by == value)
411-
else:
412-
where_clauses.append(bts.PipelineRun.created_by == None)
413-
else:
414-
raise NotImplementedError(f"Unsupported filter {filter_value}.")
415-
return where_clauses, filter_value
416-
417-
418-
def _decode_page_token(page_token: str) -> dict[str, Any]:
419-
return json.loads(base64.b64decode(page_token)) if page_token else {}
420-
421-
422-
def _encode_page_token(page_token_dict: dict[str, Any]) -> str:
423-
return (base64.b64encode(json.dumps(page_token_dict).encode("utf8"))).decode(
424-
"utf-8"
425-
)
426-
427-
428-
def _parse_filter(filter: str) -> dict[str, str]:
429-
# TODO: Improve
430-
parts = filter.strip().split()
431-
parsed_filter = {}
432-
for part in parts:
433-
key, sep, value = part.partition(":")
434-
if sep:
435-
parsed_filter[key] = value
436-
else:
437-
parsed_filter.setdefault("_text", "")
438-
parsed_filter["_text"] += part
439-
return parsed_filter
440-
441-
442347
# ========== ExecutionNodeApiService_Sql
443348

444349

cloud_pipelines_backend/database_ops.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,18 @@ def migrate_db(db_engine: sqlalchemy.Engine):
7777
for index in bts.ExecutionNode.__table__.indexes:
7878
if index.name == "ix_execution_node_container_execution_cache_key":
7979
index.create(db_engine, checkfirst=True)
80+
81+
# Covering index for annotation-based filter_query predicates.
82+
# Column order: (key, pipeline_run_id, value) enables B-tree seek on key,
83+
# ordered pipeline_run_id scan, and avoids table lookups for all predicates.
84+
annotation_index = sqlalchemy.Index(
85+
"ix_pipeline_run_annotation_key_run_id_value",
86+
bts.PipelineRunAnnotation.key,
87+
bts.PipelineRunAnnotation.pipeline_run_id,
88+
bts.PipelineRunAnnotation.value,
89+
)
90+
annotation_index.create(db_engine, checkfirst=True)
91+
92+
# TODO: I believe this is needed with the comments above?
93+
# Workaround for https://github.com/sqlalchemy/sqlalchemy/issues/12965
94+
bts.PipelineRunAnnotation.__table__.indexes.discard(annotation_index)

0 commit comments

Comments
 (0)