Skip to content

Commit 5a5675d

Browse files
committed
feat: Search pipeline run API uses pagination cursor
1 parent 300d652 commit 5a5675d

7 files changed

Lines changed: 329 additions & 142 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,26 +181,27 @@ def list(
181181
include_pipeline_names: bool = False,
182182
include_execution_stats: bool = False,
183183
) -> ListPipelineJobsResponse:
184-
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
184+
where_clauses = filter_query_sql.build_list_filters(
185185
filter_value=filter,
186186
filter_query_value=filter_query,
187-
page_token_value=page_token,
187+
cursor_value=page_token,
188188
current_user=current_user,
189-
page_size=_DEFAULT_PAGE_SIZE,
190189
)
191190

192191
pipeline_runs = list(
193192
session.scalars(
194193
sql.select(bts.PipelineRun)
195194
.where(*where_clauses)
196-
.order_by(bts.PipelineRun.created_at.desc())
197-
.offset(offset)
195+
.order_by(
196+
bts.PipelineRun.created_at.desc(),
197+
bts.PipelineRun.id.desc(),
198+
)
198199
.limit(_DEFAULT_PAGE_SIZE)
199200
).all()
200201
)
201202

202-
next_page_token = (
203-
next_token.encode() if len(pipeline_runs) >= _DEFAULT_PAGE_SIZE else None
203+
next_page_token = filter_query_sql.maybe_next_page_token(
204+
rows=pipeline_runs, page_size=_DEFAULT_PAGE_SIZE
204205
)
205206

206207
return ListPipelineJobsResponse(

cloud_pipelines_backend/backend_types_sql.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
IX_EXECUTION_NODE_CACHE_KEY: Final[str] = (
1414
"ix_execution_node_container_execution_cache_key"
1515
)
16+
IX_PR_CREATED_AT_DESC_ID_DESC: Final[str] = "ix_pr_created_at_desc_id_desc"
1617
IX_ANNOTATION_RUN_ID_KEY_VALUE: Final[str] = (
1718
"ix_pipeline_run_annotation_run_id_key_value"
1819
)
@@ -167,6 +168,11 @@ class PipelineRun(_TableBase):
167168
created_by,
168169
created_at.desc(),
169170
),
171+
sql.Index(
172+
IX_PR_CREATED_AT_DESC_ID_DESC,
173+
created_at.desc(),
174+
id.desc(),
175+
),
170176
)
171177

172178

cloud_pipelines_backend/database_ops.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8686
index.create(db_engine, checkfirst=True)
8787
break
8888

89+
for index in bts.PipelineRun.__table__.indexes:
90+
if index.name == bts.IX_PR_CREATED_AT_DESC_ID_DESC:
91+
index.create(db_engine, checkfirst=True)
92+
break
93+
8994
backfill_created_by_annotations(db_engine)
9095
backfill_pipeline_name_annotations(db_engine)
9196

cloud_pipelines_backend/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ class MutuallyExclusiveFilterError(ApiValidationError):
2222

2323
class InvalidAnnotationKeyError(ApiValidationError):
2424
pass
25+
26+
27+
class InvalidPageTokenError(ApiValidationError):
28+
pass

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import base64
2-
import dataclasses
31
import datetime
42
import json
53
import enum
@@ -39,26 +37,58 @@ class SystemKey(enum.StrEnum):
3937
}
4038

4139
# ---------------------------------------------------------------------------
42-
# PageToken
40+
# Cursor encode / decode
4341
# ---------------------------------------------------------------------------
4442

43+
CURSOR_SEPARATOR: Final[str] = "~"
4544

46-
@dataclasses.dataclass(kw_only=True)
47-
class PageToken:
48-
offset: int = 0
49-
filter: str | None = None
50-
filter_query: str | None = None
5145

52-
def encode(self) -> str:
53-
return base64.b64encode(
54-
json.dumps(dataclasses.asdict(self)).encode("utf-8")
55-
).decode("utf-8")
46+
def encode_cursor(created_at: datetime.datetime, run_id: str) -> str:
47+
"""Encode the last row's position as a tilde-separated cursor string.
5648
57-
@classmethod
58-
def decode(cls, token: str | None) -> "PageToken":
59-
if not token:
60-
return cls()
61-
return cls(**json.loads(base64.b64decode(token)))
49+
The created_at from PipelineRun is naive UTC (no UtcDateTime decorator on
50+
this column). We stamp it as UTC here so the cursor string is
51+
timezone-explicit for readability and correctness.
52+
decode_cursor() normalizes back to naive UTC for DB comparison.
53+
"""
54+
if created_at.tzinfo is None:
55+
created_at = created_at.replace(tzinfo=datetime.timezone.utc)
56+
return f"{created_at.isoformat()}{CURSOR_SEPARATOR}{run_id}"
57+
58+
59+
def decode_cursor(cursor: str | None) -> tuple[datetime.datetime, str] | None:
60+
"""Parse a tilde-separated cursor string into (created_at, run_id).
61+
62+
Returns None for empty/missing cursors. Raises InvalidPageTokenError
63+
for unrecognized formats (e.g. legacy base64 tokens).
64+
"""
65+
if not cursor:
66+
return None
67+
if CURSOR_SEPARATOR not in cursor:
68+
raise errors.InvalidPageTokenError(
69+
f"Unrecognized page_token format. "
70+
f"Expected 'created_at~id' cursor. token={cursor[:20]}... (truncated)"
71+
)
72+
# maxsplit=1: split on first ~ only, so run_id can safely contain ~
73+
created_at_str, run_id = cursor.split(CURSOR_SEPARATOR, 1)
74+
created_at = datetime.datetime.fromisoformat(created_at_str)
75+
# Normalize to naive UTC to match DB storage format (PipelineRun.created_at
76+
# is plain DateTime, not UtcDateTime -- stores/returns naive datetimes).
77+
if created_at.tzinfo is not None:
78+
created_at = created_at.astimezone(datetime.timezone.utc).replace(tzinfo=None)
79+
return created_at, run_id
80+
81+
82+
def maybe_next_page_token(
83+
*,
84+
rows: list[bts.PipelineRun],
85+
page_size: int,
86+
) -> str | None:
87+
"""Return a cursor token for the next page, or None if this is the last page."""
88+
if len(rows) < page_size:
89+
return None
90+
last = rows[page_size - 1]
91+
return encode_cursor(last.created_at, last.id)
6292

6393

6494
# ---------------------------------------------------------------------------
@@ -159,26 +189,15 @@ def build_list_filters(
159189
*,
160190
filter_value: str | None,
161191
filter_query_value: str | None,
162-
page_token_value: str | None,
192+
cursor_value: str | None,
163193
current_user: str | None,
164-
page_size: int,
165-
) -> tuple[list[sql.ColumnElement], int, PageToken]:
166-
"""Resolve pagination token, legacy filter, and filter_query into WHERE clauses.
167-
168-
Returns (where_clauses, offset, next_page_token).
169-
"""
194+
) -> list[sql.ColumnElement]:
195+
"""Build WHERE clauses from filters and cursor."""
170196
if filter_value and filter_query_value:
171197
raise errors.MutuallyExclusiveFilterError(
172198
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
173199
)
174200

175-
page_token = PageToken.decode(page_token_value)
176-
offset = page_token.offset
177-
filter_value = page_token.filter if page_token_value else filter_value
178-
filter_query_value = (
179-
page_token.filter_query if page_token_value else filter_query_value
180-
)
181-
182201
if filter_value:
183202
filter_query_value = _convert_legacy_filter_to_filter_query(
184203
filter_value=filter_value,
@@ -194,13 +213,18 @@ def build_list_filters(
194213
)
195214
)
196215

197-
next_page_token = PageToken(
198-
offset=offset + page_size,
199-
filter=None,
200-
filter_query=filter_query_value,
201-
)
216+
cursor = decode_cursor(cursor_value)
217+
if cursor:
218+
cursor_created_at, cursor_id = cursor
219+
where_clauses.append(
220+
sql.tuple_(bts.PipelineRun.created_at, bts.PipelineRun.id)
221+
< sql.tuple_(
222+
sql.literal(cursor_created_at),
223+
sql.literal(cursor_id),
224+
)
225+
)
202226

203-
return where_clauses, offset, next_page_token
227+
return where_clauses
204228

205229

206230
def filter_query_to_where_clause(

tests/test_api_server_sql.py

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ def test_list_pagination(self, session_factory, service):
182182
)
183183
assert len(page1.pipeline_runs) == 10
184184
assert page1.next_page_token is not None
185+
assert "~" in page1.next_page_token
185186

186187
with session_factory() as session:
187188
page2 = service.list(
@@ -191,6 +192,69 @@ def test_list_pagination(self, session_factory, service):
191192
assert len(page2.pipeline_runs) == 2
192193
assert page2.next_page_token is None
193194

195+
def test_list_cursor_pagination_order(self, session_factory, service):
196+
for i in range(5):
197+
_create_run(
198+
session_factory,
199+
service,
200+
root_task=_make_task_spec(f"pipeline-{i}"),
201+
)
202+
203+
with session_factory() as session:
204+
result = service.list(session=session)
205+
dates = [r.created_at for r in result.pipeline_runs]
206+
assert dates == sorted(dates, reverse=True)
207+
208+
def test_list_cursor_pagination_no_overlap(self, session_factory, service):
209+
for i in range(12):
210+
_create_run(
211+
session_factory,
212+
service,
213+
root_task=_make_task_spec(f"pipeline-{i}"),
214+
)
215+
216+
with session_factory() as session:
217+
page1 = service.list(session=session)
218+
with session_factory() as session:
219+
page2 = service.list(session=session, page_token=page1.next_page_token)
220+
page1_ids = {r.id for r in page1.pipeline_runs}
221+
page2_ids = {r.id for r in page2.pipeline_runs}
222+
assert page1_ids.isdisjoint(page2_ids)
223+
224+
def test_list_cursor_pagination_stable_under_inserts(
225+
self, session_factory, service
226+
):
227+
for i in range(12):
228+
_create_run(
229+
session_factory,
230+
service,
231+
root_task=_make_task_spec(f"pipeline-{i}"),
232+
)
233+
234+
with session_factory() as session:
235+
page1 = service.list(session=session)
236+
page1_ids = {r.id for r in page1.pipeline_runs}
237+
238+
_create_run(
239+
session_factory,
240+
service,
241+
root_task=_make_task_spec("pipeline-new"),
242+
)
243+
244+
with session_factory() as session:
245+
page2 = service.list(session=session, page_token=page1.next_page_token)
246+
page2_ids = {r.id for r in page2.pipeline_runs}
247+
assert page1_ids.isdisjoint(page2_ids)
248+
assert len(page2.pipeline_runs) == 2
249+
250+
def test_list_invalid_page_token_raises(self, session_factory, service):
251+
"""page_token without ~ raises InvalidPageTokenError (422)."""
252+
with session_factory() as session:
253+
with pytest.raises(
254+
errors.InvalidPageTokenError, match="Unrecognized page_token"
255+
):
256+
service.list(session=session, page_token="not-a-cursor")
257+
194258
def test_list_filter_unsupported(self, session_factory, service):
195259
with session_factory() as session:
196260
with pytest.raises(NotImplementedError, match="Unsupported filter"):
@@ -1254,7 +1318,7 @@ def test_list_filter_query_time_range_offset_timezone(
12541318
returned_ids = {r.id for r in result.pipeline_runs}
12551319
assert returned_ids == {run_b.id, run_c.id}
12561320

1257-
def test_pagination_preserves_filter_query(self, session_factory, service):
1321+
def test_pagination_with_filter_query(self, session_factory, service):
12581322
for _ in range(12):
12591323
run = _create_run(
12601324
session_factory,
@@ -1278,14 +1342,13 @@ def test_pagination_preserves_filter_query(self, session_factory, service):
12781342
)
12791343
assert len(page1.pipeline_runs) == 10
12801344
assert page1.next_page_token is not None
1281-
1282-
decoded = filter_query_sql.PageToken.decode(page1.next_page_token)
1283-
assert decoded.filter_query == fq
1345+
assert "~" in page1.next_page_token
12841346

12851347
with session_factory() as session:
12861348
page2 = service.list(
12871349
session=session,
12881350
page_token=page1.next_page_token,
1351+
filter_query=fq,
12891352
)
12901353
assert len(page2.pipeline_runs) == 2
12911354
assert page2.next_page_token is None

0 commit comments

Comments
 (0)