Skip to content

Commit 7369ffc

Browse files
committed
feat: Search annotations in pipeline run API
1 parent fdbdc71 commit 7369ffc

7 files changed

Lines changed: 952 additions & 209 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 12 additions & 108 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,5 @@
1-
import base64
21
import dataclasses
32
import datetime
4-
import json
53
import logging
64
import typing
75
from typing import Any, Final, Optional
@@ -12,7 +10,7 @@
1210
from . import backend_types_sql as bts
1311
from . import component_structures as structures
1412
from . import errors
15-
from . import filter_query_models
13+
from . import filter_query_sql
1614

1715
if typing.TYPE_CHECKING:
1816
from cloud_pipelines.orchestration.storage_providers import (
@@ -173,22 +171,12 @@ def list(
173171
include_pipeline_names: bool = False,
174172
include_execution_stats: bool = False,
175173
) -> ListPipelineJobsResponse:
176-
if filter and filter_query:
177-
raise errors.ApiValidationError(
178-
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
179-
)
180-
181-
if filter_query:
182-
filter_query_models.FilterQuery.model_validate_json(filter_query)
183-
raise NotImplementedError("filter_query is not yet implemented.")
184-
185-
filter_value, offset = _resolve_filter_value(
186-
filter=filter,
187-
page_token=page_token,
188-
)
189-
where_clauses, next_page_filter_value = _build_filter_where_clauses(
190-
filter_value=filter_value,
174+
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
175+
filter_value=filter,
176+
filter_query_value=filter_query,
177+
page_token_value=page_token,
191178
current_user=current_user,
179+
page_size=self._DEFAULT_PAGE_SIZE,
192180
)
193181

194182
pipeline_runs = list(
@@ -200,14 +188,12 @@ def list(
200188
.limit(self._DEFAULT_PAGE_SIZE)
201189
).all()
202190
)
203-
next_page_offset = offset + self._DEFAULT_PAGE_SIZE
204-
next_page_token_dict = {
205-
self._PAGE_TOKEN_OFFSET_KEY: next_page_offset,
206-
self._PAGE_TOKEN_FILTER_KEY: next_page_filter_value,
207-
}
208-
next_page_token = _encode_page_token(next_page_token_dict)
209-
if len(pipeline_runs) < self._DEFAULT_PAGE_SIZE:
210-
next_page_token = None
191+
192+
next_page_token = (
193+
next_token.encode()
194+
if len(pipeline_runs) >= self._DEFAULT_PAGE_SIZE
195+
else None
196+
)
211197

212198
return ListPipelineJobsResponse(
213199
pipeline_runs=[
@@ -348,88 +334,6 @@ def delete_annotation(
348334
session.commit()
349335

350336

351-
def _resolve_filter_value(
352-
*,
353-
filter: str | None,
354-
page_token: str | None,
355-
) -> tuple[str | None, int]:
356-
"""Decode page_token and return the effective (filter_value, offset).
357-
358-
If a page_token is present, its stored filter takes precedence over the
359-
raw filter parameter (the token carries the resolved filter forward across pages).
360-
"""
361-
page_token_dict = _decode_page_token(page_token)
362-
offset = page_token_dict.get(
363-
PipelineRunsApiService_Sql._PAGE_TOKEN_OFFSET_KEY,
364-
0,
365-
)
366-
if page_token:
367-
filter = page_token_dict.get(
368-
PipelineRunsApiService_Sql._PAGE_TOKEN_FILTER_KEY,
369-
None,
370-
)
371-
return filter, offset
372-
373-
374-
def _build_filter_where_clauses(
375-
*,
376-
filter_value: str | None,
377-
current_user: str | None,
378-
) -> tuple[list[sql.ColumnElement], str | None]:
379-
"""Parse a filter string into SQLAlchemy WHERE clauses.
380-
381-
Returns (where_clauses, next_page_filter_value). The second value is the
382-
filter string with shorthand values resolved (e.g. "created_by:me" becomes
383-
"created_by:alice@example.com") so it can be embedded in the next page token.
384-
"""
385-
where_clauses: list[sql.ColumnElement] = []
386-
parsed_filter = _parse_filter(filter_value) if filter_value else {}
387-
for key, value in parsed_filter.items():
388-
if key == "_text":
389-
raise NotImplementedError("Text search is not implemented yet.")
390-
elif key == "created_by":
391-
if value == "me":
392-
if current_user is None:
393-
current_user = ""
394-
value = current_user
395-
# TODO: Maybe make this a bit more robust.
396-
# We need to change the filter since it goes into the next_page_token.
397-
filter_value = filter_value.replace(
398-
"created_by:me", f"created_by:{current_user}"
399-
)
400-
if value:
401-
where_clauses.append(bts.PipelineRun.created_by == value)
402-
else:
403-
where_clauses.append(bts.PipelineRun.created_by == None)
404-
else:
405-
raise NotImplementedError(f"Unsupported filter {filter_value}.")
406-
return where_clauses, filter_value
407-
408-
409-
def _decode_page_token(page_token: str) -> dict[str, Any]:
410-
return json.loads(base64.b64decode(page_token)) if page_token else {}
411-
412-
413-
def _encode_page_token(page_token_dict: dict[str, Any]) -> str:
414-
return (base64.b64encode(json.dumps(page_token_dict).encode("utf8"))).decode(
415-
"utf-8"
416-
)
417-
418-
419-
def _parse_filter(filter: str) -> dict[str, str]:
420-
# TODO: Improve
421-
parts = filter.strip().split()
422-
parsed_filter = {}
423-
for part in parts:
424-
key, sep, value = part.partition(":")
425-
if sep:
426-
parsed_filter[key] = value
427-
else:
428-
parsed_filter.setdefault("_text", "")
429-
parsed_filter["_text"] += part
430-
return parsed_filter
431-
432-
433337
# ========== ExecutionNodeApiService_Sql
434338

435339

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 18 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,14 +2,21 @@
22
import datetime
33
import enum
44
import typing
5-
from typing import Any
5+
from typing import Any, Final
66

77
import sqlalchemy as sql
88
from sqlalchemy import orm
99
from sqlalchemy.ext import mutable
1010

1111
IdType: typing.TypeAlias = str
1212

13+
IX_EXECUTION_NODE_CACHE_KEY: Final[str] = (
14+
"ix_execution_node_container_execution_cache_key"
15+
)
16+
IX_ANNOTATION_RUN_ID_KEY_VALUE: Final[str] = (
17+
"ix_pipeline_run_annotation_run_id_key_value"
18+
)
19+
1320

1421
class ContainerExecutionStatus(str, enum.Enum):
1522
INVALID = "INVALID" # Compatibility with Vertex AI CustomJob
@@ -64,7 +71,7 @@ def generate_unique_id() -> str:
6471

6572
# # Needed to put a union type into DB
6673
# class SqlIOTypeStruct(_BaseModel):
67-
# type: structures.TypeSpecType
74+
# type: structures.TypeSpecType
6875
# No. We'll represent TypeSpecType as name:str + properties:dict
6976
# Supported cases:
7077
# * type: "name"
@@ -500,6 +507,15 @@ class PipelineRunAnnotation(_TableBase):
500507
key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True)
501508
value: orm.Mapped[str | None] = orm.mapped_column(default=None)
502509

510+
__table_args__ = (
511+
sql.Index(
512+
IX_ANNOTATION_RUN_ID_KEY_VALUE,
513+
"pipeline_run_id",
514+
"key",
515+
"value",
516+
),
517+
)
518+
503519

504520
class Secret(_TableBase):
505521
__tablename__ = "secret"

cloud_pipelines_backend/component_structures.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,6 @@
5454
import pydantic.alias_generators
5555
from pydantic.dataclasses import dataclass as pydantic_dataclasses
5656

57-
5857
# PrimitiveTypes = Union[str, int, float, bool]
5958
PrimitiveTypes = str
6059

cloud_pipelines_backend/database_ops.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,5 +75,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
7575
# Or we need to avoid calling the Index constructor.
7676

7777
for index in bts.ExecutionNode.__table__.indexes:
78-
if index.name == "ix_execution_node_container_execution_cache_key":
78+
if index.name == bts.IX_EXECUTION_NODE_CACHE_KEY:
7979
index.create(db_engine, checkfirst=True)
80+
break
81+
82+
for index in bts.PipelineRunAnnotation.__table__.indexes:
83+
if index.name == bts.IX_ANNOTATION_RUN_ID_KEY_VALUE:
84+
index.create(db_engine, checkfirst=True)
85+
break

0 commit comments

Comments
 (0)