Skip to content

Commit 331fe0f

Browse files
committed
feat: Search annotations in pipeline run API
1 parent fdbdc71 commit 331fe0f

7 files changed

Lines changed: 1032 additions & 211 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 10 additions & 110 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,5 @@
1-
import base64
21
import dataclasses
32
import datetime
4-
import json
53
import logging
64
import typing
75
from typing import Any, Final, Optional
@@ -12,7 +10,7 @@
1210
from . import backend_types_sql as bts
1311
from . import component_structures as structures
1412
from . import errors
15-
from . import filter_query_models
13+
from . import filter_query_sql
1614

1715
if typing.TYPE_CHECKING:
1816
from cloud_pipelines.orchestration.storage_providers import (
@@ -69,8 +67,6 @@ class ListPipelineJobsResponse:
6967

7068
class PipelineRunsApiService_Sql:
7169
_PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name"
72-
_PAGE_TOKEN_OFFSET_KEY: Final[str] = "offset"
73-
_PAGE_TOKEN_FILTER_KEY: Final[str] = "filter"
7470
_DEFAULT_PAGE_SIZE: Final[int] = 10
7571

7672
def create(
@@ -173,22 +169,12 @@ def list(
173169
include_pipeline_names: bool = False,
174170
include_execution_stats: bool = False,
175171
) -> ListPipelineJobsResponse:
176-
if filter and filter_query:
177-
raise errors.ApiValidationError(
178-
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
179-
)
180-
181-
if filter_query:
182-
filter_query_models.FilterQuery.model_validate_json(filter_query)
183-
raise NotImplementedError("filter_query is not yet implemented.")
184-
185-
filter_value, offset = _resolve_filter_value(
186-
filter=filter,
187-
page_token=page_token,
188-
)
189-
where_clauses, next_page_filter_value = _build_filter_where_clauses(
190-
filter_value=filter_value,
172+
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
173+
filter_value=filter,
174+
filter_query_value=filter_query,
175+
page_token_value=page_token,
191176
current_user=current_user,
177+
page_size=self._DEFAULT_PAGE_SIZE,
192178
)
193179

194180
pipeline_runs = list(
@@ -200,14 +186,10 @@ def list(
200186
.limit(self._DEFAULT_PAGE_SIZE)
201187
).all()
202188
)
203-
next_page_offset = offset + self._DEFAULT_PAGE_SIZE
204-
next_page_token_dict = {
205-
self._PAGE_TOKEN_OFFSET_KEY: next_page_offset,
206-
self._PAGE_TOKEN_FILTER_KEY: next_page_filter_value,
207-
}
208-
next_page_token = _encode_page_token(next_page_token_dict)
209-
if len(pipeline_runs) < self._DEFAULT_PAGE_SIZE:
210-
next_page_token = None
189+
190+
next_page_token = (
191+
next_token if len(pipeline_runs) >= self._DEFAULT_PAGE_SIZE else None
192+
)
211193

212194
return ListPipelineJobsResponse(
213195
pipeline_runs=[
@@ -348,88 +330,6 @@ def delete_annotation(
348330
session.commit()
349331

350332

351-
def _resolve_filter_value(
352-
*,
353-
filter: str | None,
354-
page_token: str | None,
355-
) -> tuple[str | None, int]:
356-
"""Decode page_token and return the effective (filter_value, offset).
357-
358-
If a page_token is present, its stored filter takes precedence over the
359-
raw filter parameter (the token carries the resolved filter forward across pages).
360-
"""
361-
page_token_dict = _decode_page_token(page_token)
362-
offset = page_token_dict.get(
363-
PipelineRunsApiService_Sql._PAGE_TOKEN_OFFSET_KEY,
364-
0,
365-
)
366-
if page_token:
367-
filter = page_token_dict.get(
368-
PipelineRunsApiService_Sql._PAGE_TOKEN_FILTER_KEY,
369-
None,
370-
)
371-
return filter, offset
372-
373-
374-
def _build_filter_where_clauses(
375-
*,
376-
filter_value: str | None,
377-
current_user: str | None,
378-
) -> tuple[list[sql.ColumnElement], str | None]:
379-
"""Parse a filter string into SQLAlchemy WHERE clauses.
380-
381-
Returns (where_clauses, next_page_filter_value). The second value is the
382-
filter string with shorthand values resolved (e.g. "created_by:me" becomes
383-
"created_by:alice@example.com") so it can be embedded in the next page token.
384-
"""
385-
where_clauses: list[sql.ColumnElement] = []
386-
parsed_filter = _parse_filter(filter_value) if filter_value else {}
387-
for key, value in parsed_filter.items():
388-
if key == "_text":
389-
raise NotImplementedError("Text search is not implemented yet.")
390-
elif key == "created_by":
391-
if value == "me":
392-
if current_user is None:
393-
current_user = ""
394-
value = current_user
395-
# TODO: Maybe make this a bit more robust.
396-
# We need to change the filter since it goes into the next_page_token.
397-
filter_value = filter_value.replace(
398-
"created_by:me", f"created_by:{current_user}"
399-
)
400-
if value:
401-
where_clauses.append(bts.PipelineRun.created_by == value)
402-
else:
403-
where_clauses.append(bts.PipelineRun.created_by == None)
404-
else:
405-
raise NotImplementedError(f"Unsupported filter {filter_value}.")
406-
return where_clauses, filter_value
407-
408-
409-
def _decode_page_token(page_token: str) -> dict[str, Any]:
410-
return json.loads(base64.b64decode(page_token)) if page_token else {}
411-
412-
413-
def _encode_page_token(page_token_dict: dict[str, Any]) -> str:
414-
return (base64.b64encode(json.dumps(page_token_dict).encode("utf8"))).decode(
415-
"utf-8"
416-
)
417-
418-
419-
def _parse_filter(filter: str) -> dict[str, str]:
420-
# TODO: Improve
421-
parts = filter.strip().split()
422-
parsed_filter = {}
423-
for part in parts:
424-
key, sep, value = part.partition(":")
425-
if sep:
426-
parsed_filter[key] = value
427-
else:
428-
parsed_filter.setdefault("_text", "")
429-
parsed_filter["_text"] += part
430-
return parsed_filter
431-
432-
433333
# ========== ExecutionNodeApiService_Sql
434334

435335

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import datetime
33
import enum
44
import typing
5-
from typing import Any
5+
from typing import Any, Final
66

77
import sqlalchemy as sql
88
from sqlalchemy import orm
@@ -64,7 +64,7 @@ def generate_unique_id() -> str:
6464

6565
# # Needed to put a union type into DB
6666
# class SqlIOTypeStruct(_BaseModel):
67-
# type: structures.TypeSpecType
67+
# type: structures.TypeSpecType
6868
# No. We'll represent TypeSpecType as name:str + properties:dict
6969
# Supported cases:
7070
# * type: "name"
@@ -294,6 +294,9 @@ class ExecutionToAncestorExecutionLink(_TableBase):
294294
# So we need to jump through extra hoops to make the relationship many-to-many again.
295295
class ExecutionNode(_TableBase):
296296
__tablename__ = "execution_node"
297+
_IX_EXECUTION_NODE_CACHE_KEY: Final[str] = (
298+
"ix_execution_node_container_execution_cache_key"
299+
)
297300
id: orm.Mapped[IdType] = orm.mapped_column(
298301
primary_key=True, init=False, insert_default=generate_unique_id
299302
)
@@ -491,6 +494,9 @@ class ContainerExecution(_TableBase):
491494

492495
class PipelineRunAnnotation(_TableBase):
493496
__tablename__ = "pipeline_run_annotation"
497+
_IX_ANNOTATION_RUN_ID_KEY_VALUE: Final[str] = (
498+
"ix_pipeline_run_annotation_pipeline_run_id_key_value"
499+
)
494500
pipeline_run_id: orm.Mapped[IdType] = orm.mapped_column(
495501
sql.ForeignKey(PipelineRun.id),
496502
primary_key=True,
@@ -500,6 +506,15 @@ class PipelineRunAnnotation(_TableBase):
500506
key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True)
501507
value: orm.Mapped[str | None] = orm.mapped_column(default=None)
502508

509+
__table_args__ = (
510+
sql.Index(
511+
_IX_ANNOTATION_RUN_ID_KEY_VALUE,
512+
"pipeline_run_id",
513+
"key",
514+
"value",
515+
),
516+
)
517+
503518

504519
class Secret(_TableBase):
505520
__tablename__ = "secret"

cloud_pipelines_backend/component_structures.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import pydantic.alias_generators
5555
from pydantic.dataclasses import dataclass as pydantic_dataclasses
5656

57-
5857
# PrimitiveTypes = Union[str, int, float, bool]
5958
PrimitiveTypes = str
6059

cloud_pipelines_backend/database_ops.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
7575
# Or we need to avoid calling the Index constructor.
7676

7777
for index in bts.ExecutionNode.__table__.indexes:
78-
if index.name == "ix_execution_node_container_execution_cache_key":
78+
if index.name == bts.ExecutionNode._IX_EXECUTION_NODE_CACHE_KEY:
7979
index.create(db_engine, checkfirst=True)
80+
break
81+
82+
for index in bts.PipelineRunAnnotation.__table__.indexes:
83+
if index.name == bts.PipelineRunAnnotation._IX_ANNOTATION_RUN_ID_KEY_VALUE:
84+
index.create(db_engine, checkfirst=True)
85+
break

0 commit comments

Comments
 (0)