Skip to content

Commit 9b0355f

Browse files
committed
feat: Search pipeline run API uses pagination cursor
1 parent 0c95a90 commit 9b0355f

7 files changed

Lines changed: 330 additions & 142 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,26 +182,27 @@ def list(
182182
include_pipeline_names: bool = False,
183183
include_execution_stats: bool = False,
184184
) -> ListPipelineJobsResponse:
185-
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
185+
where_clauses = filter_query_sql.build_list_filters(
186186
filter_value=filter,
187187
filter_query_value=filter_query,
188-
page_token_value=page_token,
188+
cursor_value=page_token,
189189
current_user=current_user,
190-
page_size=_DEFAULT_PAGE_SIZE,
191190
)
192191

193192
pipeline_runs = list(
194193
session.scalars(
195194
sql.select(bts.PipelineRun)
196195
.where(*where_clauses)
197-
.order_by(bts.PipelineRun.created_at.desc())
198-
.offset(offset)
196+
.order_by(
197+
bts.PipelineRun.created_at.desc(),
198+
bts.PipelineRun.id.desc(),
199+
)
199200
.limit(_DEFAULT_PAGE_SIZE)
200201
).all()
201202
)
202203

203-
next_page_token = (
204-
next_token.encode() if len(pipeline_runs) >= _DEFAULT_PAGE_SIZE else None
204+
next_page_token = filter_query_sql.maybe_next_page_token(
205+
rows=pipeline_runs, page_size=_DEFAULT_PAGE_SIZE
205206
)
206207

207208
return ListPipelineJobsResponse(

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
IX_EXECUTION_NODE_CACHE_KEY: Final[str] = (
1414
"ix_execution_node_container_execution_cache_key"
1515
)
16+
IX_PR_CREATED_AT_DESC_ID_DESC: Final[str] = "ix_pr_created_at_desc_id_desc"
1617
IX_ANNOTATION_RUN_ID_KEY_VALUE: Final[str] = (
1718
"ix_pipeline_run_annotation_run_id_key_value"
1819
)
@@ -167,6 +168,11 @@ class PipelineRun(_TableBase):
167168
created_by,
168169
created_at.desc(),
169170
),
171+
sql.Index(
172+
IX_PR_CREATED_AT_DESC_ID_DESC,
173+
created_at.desc(),
174+
id.desc(),
175+
),
170176
)
171177

172178

cloud_pipelines_backend/database_ops.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
9292
index.create(db_engine, checkfirst=True)
9393
break
9494

95+
for index in bts.PipelineRun.__table__.indexes:
96+
if index.name == bts.IX_PR_CREATED_AT_DESC_ID_DESC:
97+
index.create(db_engine, checkfirst=True)
98+
break
99+
95100
backfill_created_by_annotations(db_engine=db_engine)
96101
backfill_pipeline_name_annotations(db_engine=db_engine)
97102

cloud_pipelines_backend/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ class MutuallyExclusiveFilterError(ApiValidationError):
2222

2323
class InvalidAnnotationKeyError(ApiValidationError):
2424
pass
25+
26+
27+
class InvalidPageTokenError(ApiValidationError):
28+
pass

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import base64
2-
import dataclasses
31
import datetime
42
import json
53
import enum
@@ -39,26 +37,58 @@ class SystemKey(enum.StrEnum):
3937
}
4038

4139
# ---------------------------------------------------------------------------
42-
# PageToken
40+
# Cursor encode / decode
4341
# ---------------------------------------------------------------------------
4442

43+
CURSOR_SEPARATOR: Final[str] = "~"
4544

46-
@dataclasses.dataclass(kw_only=True)
47-
class PageToken:
48-
offset: int = 0
49-
filter: str | None = None
50-
filter_query: str | None = None
5145

52-
def encode(self) -> str:
53-
return base64.b64encode(
54-
json.dumps(dataclasses.asdict(self)).encode("utf-8")
55-
).decode("utf-8")
46+
def encode_cursor(created_at: datetime.datetime, run_id: str) -> str:
47+
"""Encode the last row's position as a tilde-separated cursor string.
5648
57-
@classmethod
58-
def decode(cls, token: str | None) -> "PageToken":
59-
if not token:
60-
return cls()
61-
return cls(**json.loads(base64.b64decode(token)))
49+
The created_at from PipelineRun is naive UTC (no UtcDateTime decorator on
50+
this column). We stamp it as UTC here so the cursor string is
51+
timezone-explicit for readability and correctness.
52+
decode_cursor() normalizes back to naive UTC for DB comparison.
53+
"""
54+
if created_at.tzinfo is None:
55+
created_at = created_at.replace(tzinfo=datetime.timezone.utc)
56+
return f"{created_at.isoformat()}{CURSOR_SEPARATOR}{run_id}"
57+
58+
59+
def decode_cursor(cursor: str | None) -> tuple[datetime.datetime, str] | None:
60+
"""Parse a tilde-separated cursor string into (created_at, run_id).
61+
62+
Returns None for empty/missing cursors. Raises InvalidPageTokenError
63+
for unrecognized formats (e.g. legacy base64 tokens).
64+
"""
65+
if not cursor:
66+
return None
67+
if CURSOR_SEPARATOR not in cursor:
68+
raise errors.InvalidPageTokenError(
69+
f"Unrecognized page_token format. "
70+
f"Expected 'created_at~id' cursor. token={cursor[:20]}... (truncated)"
71+
)
72+
# maxsplit=1: split on first ~ only, so run_id can safely contain ~
73+
created_at_str, run_id = cursor.split(CURSOR_SEPARATOR, 1)
74+
created_at = datetime.datetime.fromisoformat(created_at_str)
75+
# Normalize to naive UTC to match DB storage format (PipelineRun.created_at
76+
# is plain DateTime, not UtcDateTime -- stores/returns naive datetimes).
77+
if created_at.tzinfo is not None:
78+
created_at = created_at.astimezone(datetime.timezone.utc).replace(tzinfo=None)
79+
return created_at, run_id
80+
81+
82+
def maybe_next_page_token(
83+
*,
84+
rows: list[bts.PipelineRun],
85+
page_size: int,
86+
) -> str | None:
87+
"""Return a cursor token for the next page, or None if this is the last page."""
88+
if len(rows) < page_size:
89+
return None
90+
last = rows[page_size - 1]
91+
return encode_cursor(last.created_at, last.id)
6292

6393

6494
# ---------------------------------------------------------------------------
@@ -159,26 +189,15 @@ def build_list_filters(
159189
*,
160190
filter_value: str | None,
161191
filter_query_value: str | None,
162-
page_token_value: str | None,
192+
cursor_value: str | None,
163193
current_user: str | None,
164-
page_size: int,
165-
) -> tuple[list[sql.ColumnElement], int, PageToken]:
166-
"""Resolve pagination token, legacy filter, and filter_query into WHERE clauses.
167-
168-
Returns (where_clauses, offset, next_page_token).
169-
"""
194+
) -> list[sql.ColumnElement]:
195+
"""Build WHERE clauses from filters and cursor."""
170196
if filter_value and filter_query_value:
171197
raise errors.MutuallyExclusiveFilterError(
172198
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
173199
)
174200

175-
page_token = PageToken.decode(page_token_value)
176-
offset = page_token.offset
177-
filter_value = page_token.filter if page_token_value else filter_value
178-
filter_query_value = (
179-
page_token.filter_query if page_token_value else filter_query_value
180-
)
181-
182201
if filter_value:
183202
filter_query_value = _convert_legacy_filter_to_filter_query(
184203
filter_value=filter_value,
@@ -194,13 +213,18 @@ def build_list_filters(
194213
)
195214
)
196215

197-
next_page_token = PageToken(
198-
offset=offset + page_size,
199-
filter=None,
200-
filter_query=filter_query_value,
201-
)
216+
cursor = decode_cursor(cursor_value)
217+
if cursor:
218+
cursor_created_at, cursor_id = cursor
219+
where_clauses.append(
220+
sql.tuple_(bts.PipelineRun.created_at, bts.PipelineRun.id)
221+
< sql.tuple_(
222+
sql.literal(cursor_created_at),
223+
sql.literal(cursor_id),
224+
)
225+
)
202226

203-
return where_clauses, offset, next_page_token
227+
return where_clauses
204228

205229

206230
def filter_query_to_where_clause(

tests/test_api_server_sql.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ def test_list_pagination(self, session_factory, service):
182182
)
183183
assert len(page1.pipeline_runs) == 10
184184
assert page1.next_page_token is not None
185+
assert "~" in page1.next_page_token
185186

186187
with session_factory() as session:
187188
page2 = service.list(
@@ -191,6 +192,70 @@ def test_list_pagination(self, session_factory, service):
191192
assert len(page2.pipeline_runs) == 2
192193
assert page2.next_page_token is None
193194

195+
def test_list_cursor_pagination_order(self, session_factory, service):
196+
for i in range(5):
197+
_create_run(
198+
session_factory,
199+
service,
200+
root_task=_make_task_spec(f"pipeline-{i}"),
201+
)
202+
203+
with session_factory() as session:
204+
result = service.list(session=session)
205+
206+
dates = [r.created_at for r in result.pipeline_runs]
207+
assert dates == sorted(dates, reverse=True)
208+
209+
def test_list_cursor_pagination_no_overlap(self, session_factory, service):
210+
for i in range(12):
211+
_create_run(
212+
session_factory,
213+
service,
214+
root_task=_make_task_spec(f"pipeline-{i}"),
215+
)
216+
217+
with session_factory() as session:
218+
page1 = service.list(session=session)
219+
with session_factory() as session:
220+
page2 = service.list(session=session, page_token=page1.next_page_token)
221+
page1_ids = {r.id for r in page1.pipeline_runs}
222+
page2_ids = {r.id for r in page2.pipeline_runs}
223+
assert page1_ids.isdisjoint(page2_ids)
224+
225+
def test_list_cursor_pagination_stable_under_inserts(
226+
self, session_factory, service
227+
):
228+
for i in range(12):
229+
_create_run(
230+
session_factory,
231+
service,
232+
root_task=_make_task_spec(f"pipeline-{i}"),
233+
)
234+
235+
with session_factory() as session:
236+
page1 = service.list(session=session)
237+
page1_ids = {r.id for r in page1.pipeline_runs}
238+
239+
_create_run(
240+
session_factory,
241+
service,
242+
root_task=_make_task_spec("pipeline-new"),
243+
)
244+
245+
with session_factory() as session:
246+
page2 = service.list(session=session, page_token=page1.next_page_token)
247+
page2_ids = {r.id for r in page2.pipeline_runs}
248+
assert page1_ids.isdisjoint(page2_ids)
249+
assert len(page2.pipeline_runs) == 2
250+
251+
def test_list_invalid_page_token_raises(self, session_factory, service):
252+
"""page_token without ~ raises InvalidPageTokenError (422)."""
253+
with session_factory() as session:
254+
with pytest.raises(
255+
errors.InvalidPageTokenError, match="Unrecognized page_token"
256+
):
257+
service.list(session=session, page_token="not-a-cursor")
258+
194259
def test_list_filter_unsupported(self, session_factory, service):
195260
with session_factory() as session:
196261
with pytest.raises(NotImplementedError, match="Unsupported filter"):
@@ -1254,7 +1319,7 @@ def test_list_filter_query_time_range_offset_timezone(
12541319
returned_ids = {r.id for r in result.pipeline_runs}
12551320
assert returned_ids == {run_b.id, run_c.id}
12561321

1257-
def test_pagination_preserves_filter_query(self, session_factory, service):
1322+
def test_pagination_with_filter_query(self, session_factory, service):
12581323
for _ in range(12):
12591324
run = _create_run(
12601325
session_factory,
@@ -1278,14 +1343,13 @@ def test_pagination_preserves_filter_query(self, session_factory, service):
12781343
)
12791344
assert len(page1.pipeline_runs) == 10
12801345
assert page1.next_page_token is not None
1281-
1282-
decoded = filter_query_sql.PageToken.decode(page1.next_page_token)
1283-
assert decoded.filter_query == fq
1346+
assert "~" in page1.next_page_token
12841347

12851348
with session_factory() as session:
12861349
page2 = service.list(
12871350
session=session,
12881351
page_token=page1.next_page_token,
1352+
filter_query=fq,
12891353
)
12901354
assert len(page2.pipeline_runs) == 2
12911355
assert page2.next_page_token is None

0 commit comments

Comments
 (0)