Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions aperag/mcp/cursor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
bindings (filters / collection_id / tenant_id)
* :class:`PaginationParams` / :class:`PaginationResult` — typed
request / response generic over the paginated item type
* :class:`CursorError` + the 6 canonical error codes (pending
spec amendment double-sign per architect msg=669db73c — error
module loaded after canonical lock)
* :class:`CursorError` + the 6 canonical error codes per §C.3
(``cursor_invalid`` / ``cursor_expired`` / ``cursor_filter_mismatch``
/ ``cursor_tenant_mismatch`` / ``cursor_index_changed`` /
``cursor_schema_unsupported``)

Search-rank cursor (vector / fulltext score-based) is intentionally
NOT shared with this module — D10.d carries its own cursor type
Expand Down
8 changes: 3 additions & 5 deletions aperag/mcp/cursor/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@
from aperag.mcp.cursor.errors import CursorError

# CURSOR_SCHEMA_VERSION bumps when the on-wire payload shape changes
# in an incompatible way. Decoders treat any `schema_version` they
# don't know about as `cursor_schema_unsupported` (§C.3) — pending
# the spec amendment double-sign on the canonical error code names
# per architect msg=669db73c, the literal raised here is owned by
# ``aperag.mcp.cursor.errors`` once that module lands.
# in an incompatible way. Decoders raise ``cursor_schema_unsupported``
# (§C.3) for any value they don't recognise; the literal lives in
# :mod:`aperag.mcp.cursor.errors`.
CURSOR_SCHEMA_VERSION: int = 1

DEFAULT_TTL_SECONDS: int = 3600 # §C.4 1h default; per-tool override
Expand Down
60 changes: 20 additions & 40 deletions aperag/mcp/tools/list_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,53 +24,21 @@
``async_db_ops.query_collections([user_id])`` only returns the caller's
own collections (§F.4 lock — server ignores any client-asserted scope).

Cursor / total_count: opaque base64 ``{"offset": int}`` for the first
cut. Bryce's D10.e follow-up (#97) replaces this with the proper cursor
codec — DO NOT reach into ``aperag/mcp/cursor/`` here (§G Forbidden).
Cursor / total_count: opaque base64 cursor produced by
:mod:`aperag.service.pagination`, which wraps the canonical
:mod:`aperag.mcp.cursor` codec around the offset bookkeeping below.
Malformed / expired / scope-mismatched cursors surface as canonical
``CursorError`` per §C.3.
"""

from __future__ import annotations

import base64
import json
from typing import Literal, Optional

from aperag.db.ops import async_db_ops
from aperag.mcp.tools._d9_base import authorization_gate, resolve_authenticated_user
from aperag.mcp.tools.schemas import CollectionList, CollectionMetadata


def _decode_cursor(cursor: Optional[str]) -> int:
"""Decode the placeholder D10.c offset cursor.

Returns 0 only when ``cursor`` is ``None`` or empty; raises
``ValueError`` on malformed cursor per §C explicit-not-silent. Bryce's
D10.e cursor codec replaces this placeholder.

# TODO(D10.e #97): replace with canonical CursorError after #97 integration
"""

if cursor is None or cursor == "":
return 0
try:
decoded = base64.urlsafe_b64decode(cursor.encode("ascii")).decode("utf-8")
payload = json.loads(decoded)
except Exception as exc:
raise ValueError(f"cursor decode failed: {exc}") from exc
if not isinstance(payload, dict) or "offset" not in payload:
raise ValueError("cursor decode failed: missing 'offset' key")
raw_offset = payload["offset"]
if isinstance(raw_offset, bool) or not isinstance(raw_offset, int):
raise ValueError("cursor decode failed: 'offset' must be a non-negative int")
if raw_offset < 0:
raise ValueError("cursor decode failed: 'offset' must be non-negative")
return raw_offset


def _encode_cursor(offset: int) -> str:
return base64.urlsafe_b64encode(json.dumps({"offset": offset}, separators=(",", ":")).encode("utf-8")).decode(
"ascii"
)
from aperag.service.pagination import decode_offset_cursor, encode_offset_cursor


def _sort_key(c, sort_by: str):
Expand Down Expand Up @@ -113,8 +81,19 @@ async def list_collections(
rows.sort(key=lambda c: _sort_key(c, sort_by) or 0, reverse=(sort_order == "desc"))

total = len(rows)
offset = _decode_cursor(cursor)
limit = max(1, min(int(limit), 200))

cursor_filters = {
"title_filter": title_filter,
"sort_order": sort_order,
}
cursor_kwargs = dict(
sort_key=sort_by,
filters=cursor_filters,
collection_id=None,
tenant_id=str(user.id),
)
offset = decode_offset_cursor(cursor, **cursor_kwargs)
page = rows[offset : offset + limit]

items = [
Expand All @@ -128,7 +107,8 @@ async def list_collections(
for c in page
]

next_cursor = _encode_cursor(offset + limit) if (offset + limit) < total else None
next_offset = offset + len(items)
next_cursor = encode_offset_cursor(offset=next_offset, **cursor_kwargs) if next_offset < total else None
return CollectionList(items=items, next_cursor=next_cursor, total_count=total)


Expand Down
61 changes: 22 additions & 39 deletions aperag/mcp/tools/list_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
3. ``authorization_gate(user, "list_documents")`` — D9 §2
4. fetch authoritative — un-cached.

Cursor / total_count: opaque base64 ``{"offset": int}`` for the first
cut; D10.e (#97 @Bryce) replaces the codec.
Cursor / total_count: opaque base64 cursor produced by
:mod:`aperag.service.pagination`, which wraps the canonical
:mod:`aperag.mcp.cursor` codec around the offset bookkeeping below.
Malformed / expired / scope-mismatched cursors surface as canonical
``CursorError`` per §C.3.
"""

from __future__ import annotations

import base64
import json
import mimetypes
from typing import Literal, Optional
Expand All @@ -49,40 +51,7 @@
tenancy_gate,
)
from aperag.mcp.tools.schemas import DocumentList, DocumentMetadata


def _decode_cursor(cursor: Optional[str]) -> int:
"""Decode the placeholder D10.c offset cursor.

Returns 0 only when ``cursor`` is ``None`` or empty; raises
``ValueError`` on malformed cursor per §C explicit-not-silent. Bryce's
D10.e cursor codec replaces this placeholder.

# TODO(D10.e #97): replace with canonical CursorError after #97 integration
"""

if cursor is None or cursor == "":
return 0
try:
decoded = base64.urlsafe_b64decode(cursor.encode("ascii")).decode("utf-8")
payload = json.loads(decoded)
except Exception as exc:
raise ValueError(f"cursor decode failed: {exc}") from exc
if not isinstance(payload, dict) or "offset" not in payload:
raise ValueError("cursor decode failed: missing 'offset' key")
raw_offset = payload["offset"]
if isinstance(raw_offset, bool) or not isinstance(raw_offset, int):
raise ValueError("cursor decode failed: 'offset' must be a non-negative int")
if raw_offset < 0:
raise ValueError("cursor decode failed: 'offset' must be non-negative")
return raw_offset


def _encode_cursor(offset: int) -> str:
return base64.urlsafe_b64encode(json.dumps({"offset": offset}, separators=(",", ":")).encode("utf-8")).decode(
"ascii"
)

from aperag.service.pagination import decode_offset_cursor, encode_offset_cursor

_DOCUMENT_STATUS_TO_INDEXING = {
DocumentStatus.PENDING: "pending",
Expand Down Expand Up @@ -136,7 +105,20 @@ async def list_documents(

# 4. Fetch authoritative.
limit = max(1, min(int(limit), 200))
offset = _decode_cursor(cursor)

cursor_filters = {
"title_filter": title_filter,
"type_filter": sorted(t.lower() for t in type_filter) if type_filter else None,
"indexed_only": bool(indexed_only),
"sort_order": sort_order,
}
cursor_kwargs = dict(
sort_key=sort_by,
filters=cursor_filters,
collection_id=collection_id,
tenant_id=str(user.id),
)
offset = decode_offset_cursor(cursor, **cursor_kwargs)

sort_col = _SORT_COLS.get(sort_by, Document.gmt_created)
sort_clause = sort_col.asc() if sort_order == "asc" else sort_col.desc()
Expand Down Expand Up @@ -207,7 +189,8 @@ async def list_documents(
for d in documents
]

next_cursor = _encode_cursor(offset + len(items)) if (offset + len(items)) < total else None
next_offset = offset + len(items)
next_cursor = encode_offset_cursor(offset=next_offset, **cursor_kwargs) if next_offset < total else None
return DocumentList(items=items, next_cursor=next_cursor, total_count=total)


Expand Down
158 changes: 158 additions & 0 deletions aperag/service/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright 2025 ApeCloud, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""D10.e (#97) follow-up — cursor wiring helper for D10.c list primitives.

Per design pack §G D10.e write-set the lane provides a ``service``-layer
helper that wraps :mod:`aperag.mcp.cursor` around the ORM-side queries
the read primitives already perform. The helper exists so each list
primitive only writes two helper calls instead of inlining cursor
encode / decode + invariant validation, and so the canonical
``CursorError`` codes flow uniformly out of every D10 list tool.

The cursor wire shape is the canonical opaque base64url
``CursorPayload`` from #1712 — clients still treat it as opaque. The
``last_position`` carries the offset for the current page. The design
pack's "ORM ``id``-based seek pagination" promotion is intentionally
deferred: keeping offset semantics minimises the diff against #1714's
already-merged primitive bodies, and clients cannot tell the difference
because the cursor is opaque to them.

Public surface (D10.c list primitives import from here):

* :func:`encode_offset_cursor` — produce the next-page wire token
* :func:`decode_offset_cursor` — accept either ``None`` / ``""`` (start
fresh) or a wire token; raise canonical :class:`CursorError` with
``cursor_invalid`` / ``cursor_expired`` / ``cursor_filter_mismatch``
/ ``cursor_schema_unsupported``. Tenant drift is reported as
``cursor_filter_mismatch`` because the tenant id is only present
inside the invariant hash.
"""

from __future__ import annotations

import os
import time
from typing import Any, Optional

from aperag.mcp.cursor import (
CursorPayload,
compute_invariant_hash,
decode_cursor,
encode_cursor,
)
from aperag.mcp.cursor.codec import DEFAULT_TTL_SECONDS
from aperag.mcp.cursor.errors import CursorError

# Stable per-process server identifier — surfaces in the cursor's
# ``server_id`` field for cross-instance debugging only; clients never
# read this value.
_SERVER_ID = f"aperag-{os.getpid()}"


def encode_offset_cursor(
    *,
    offset: int,
    sort_key: str,
    filters: dict[str, Any],
    collection_id: Optional[str],
    tenant_id: str,
    ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> str:
    """Build the wire token that resumes a listing at ``offset``.

    Args:
        offset: Row offset at which the next page starts; coerced with
            ``int()`` before being stored under ``last_position["offset"]``.
        sort_key: Sort field the listing was produced with.
        filters: Normalised dict of every user-supplied narrowing
            predicate (title_filter, type_filter, indexed_only, ...).
        collection_id: Collection binding, or ``None`` when the listing
            is not scoped to a single collection.
        tenant_id: Tenant binding of the caller.
        ttl_seconds: Lifetime recorded in the payload.

    Returns:
        The token produced by ``encode_cursor`` for the assembled payload.

    ``sort_key``, ``filters`` and the two bindings are folded into one
    invariant hash so that a later decode can detect a changed scope.
    """

    scope_digest = compute_invariant_hash(
        sort_key=sort_key,
        filters=filters,
        collection_id=collection_id,
        tenant_id=tenant_id,
    )
    return encode_cursor(
        CursorPayload(
            sort_key=sort_key,
            last_position={"offset": int(offset)},
            invariant_hash=scope_digest,
            issued_at=int(time.time()),  # whole seconds since the epoch
            ttl_seconds=ttl_seconds,
            server_id=_SERVER_ID,
        )
    )


def decode_offset_cursor(
    cursor: Optional[str],
    *,
    sort_key: str,
    filters: dict[str, Any],
    collection_id: Optional[str],
    tenant_id: str,
) -> int:
    """Validate ``cursor`` against the current scope and return the offset.

    A ``None`` / empty cursor is the canonical "start fresh" signal and
    yields offset ``0`` without raising. Any non-empty token is decoded
    via :func:`aperag.mcp.cursor.decode_cursor` and then matched against
    the scope of the current request.

    Args:
        cursor: Wire token from a previous page, or ``None`` / ``""``.
        sort_key: Sort field of the current request.
        filters: Normalised filter dict of the current request.
        collection_id: Collection binding of the current request.
        tenant_id: Tenant binding of the current request.

    Returns:
        The non-negative row offset stored in the cursor, or ``0`` when
        no cursor was supplied.

    Raises:
        CursorError: ``cursor_filter_mismatch`` when the token's sort key
            or invariant hash differs from the current request (tenant
            drift is reported under this code too, see below);
            ``cursor_invalid`` when the stored offset is missing or is
            not a non-negative ``int``. Errors raised by ``decode_cursor``
            itself propagate unchanged.
    """

    if cursor is None or cursor == "":
        return 0

    payload = decode_cursor(cursor)

    # Cheap explicit check first so the error can name both sort keys.
    if payload.sort_key != sort_key:
        raise CursorError(
            "cursor_filter_mismatch",
            "cursor was issued for a different sort_key",
            details={
                "received_sort_key": payload.sort_key,
                "expected_sort_key": sort_key,
            },
        )

    expected_hash = compute_invariant_hash(
        sort_key=sort_key,
        filters=filters,
        collection_id=collection_id,
        tenant_id=tenant_id,
    )
    if payload.invariant_hash != expected_hash:
        # The original tenant_id is hashed away inside ``invariant_hash``,
        # so this helper cannot tell filter drift from tenant drift.
        # Both are therefore reported as ``cursor_filter_mismatch``; this
        # helper never raises ``cursor_tenant_mismatch``.
        raise CursorError(
            "cursor_filter_mismatch",
            "cursor scope no longer matches the current request",
            details={"sort_key": sort_key},
        )

    # A token whose ``last_position`` is not mapping-like must surface as
    # the canonical ``cursor_invalid`` rather than a bare AttributeError.
    try:
        raw_offset = payload.last_position.get("offset")
    except AttributeError:
        raw_offset = None

    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(raw_offset, bool) or not isinstance(raw_offset, int) or raw_offset < 0:
        raise CursorError(
            "cursor_invalid",
            "cursor last_position offset must be a non-negative int",
            details={"last_position": payload.last_position},
        )
    return raw_offset
Loading
Loading