Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 81 additions & 6 deletions aperag/vectorstore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@

import math
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Sequence
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, List, Sequence

from aperag.vectorstore.dto import (
QueryRequest,
Expand Down Expand Up @@ -185,6 +186,48 @@ def denormalize_threshold_to_native(distance: str, normalized: float) -> float:
raise ValueError(f"Unknown distance metric: {distance!r}")


@dataclass(frozen=True)
class VectorBackendCapabilities:
"""Static capability flags for a vector store backend (task #61 P1-V2 / P1-V3 / P1-V4).

Per task-61 spec v1 § 2.3 「允许差异但显式 declaration」: not every
backend supports every operation atomically / identically, so
callers (FE / API / MCP / capability-aware optimizers) need a
machine-readable declaration of what each adapter is actually
capable of, instead of guessing from the backend name.

These are **static** declarations — they describe what the backend
*can* do, independent of any runtime probe / fallback logic. A
runtime degradation surface (e.g. "PG connection pool exhausted →
graph search degraded to fulltext-only") is a separate concern and
intentionally NOT covered here (see architect msg=3163bb4b).

Each adapter exposes its capabilities via the
:attr:`VectorStoreConnector.BACKEND_CAPABILITIES` class-level
attribute. Callers (e.g. cuiwenbo task #87 P1-D3 collection
metadata Pydantic projection) read this static declaration and
surface it on the API.
"""

#: P1-V2 — does ``upsert(points)`` apply the entire batch
#: atomically? PGVector wraps the INSERT ON CONFLICT in a
#: ``engine.begin()`` transaction so a mid-batch failure rolls back
#: the whole batch (``True``). Qdrant's ``client.upsert(points,
#: wait=True)`` is best-effort per-point — a mid-batch failure can
#: leave some points written and others not (``False``). Callers
#: that need atomic semantics must chunk + verify on Qdrant; on
#: PGVector the semantics come for free.
supports_atomic_batch_upsert: bool

#: P1-V4 — does the backend support a "legacy" non-multitenant
#: physical layout (one collection per tenant, no payload-level
#: tenant filter)? Qdrant supports both legacy and multitenant
#: modes (``True``); PGVector is multitenant-only (``False``).
#: Legacy mode is preserved for migration rollback compatibility
#: only — new collections always use the multitenant layout.
supports_legacy_mode: bool


class VectorStoreConnector(ABC):
"""Abstract contract for per-tenant vector storage.

Expand All @@ -194,6 +237,11 @@ class VectorStoreConnector(ABC):
rest; unknown keys must never be a hard error.
"""

#: Static capability flags for this backend (task #61 P1-V2/V3/V4).
#: Each concrete subclass overrides with its actual values; see
#: :class:`VectorBackendCapabilities` for the per-flag contract.
BACKEND_CAPABILITIES: ClassVar[VectorBackendCapabilities]

def __init__(self, ctx: Dict[str, Any], **_kwargs: Any) -> None:
self.ctx = ctx

Expand All @@ -214,9 +262,22 @@ def ensure_collection(self) -> None:
"""Idempotently make sure the physical storage (Qdrant collection,
pgvector table, …) exists for this connector's shape.

Must be safe to call from many connectors concurrently: typical
implementations use ``CREATE IF NOT EXISTS`` / ``collection_exists
? no-op : create`` with module-level deduping caches.
Cross-adapter contract (task #61 P1-V1):

* **Idempotent** — repeat calls after first success are a no-op,
gated through the per-process ``_ENSURED_*`` cache.
* **Race-safe** — concurrent calls from multiple processes /
connectors must not all fail when the underlying CREATE
collides. PGVector relies on ``CREATE IF NOT EXISTS``; Qdrant
treats "already exists" responses on ``create_collection`` as
success.
* **Fail-loud** — any other failure (missing privilege, bad
config, transient DB outage) raises so the caller sees the
error rather than silently leaving an unusable connector.
* **Cache-not-poisoned-on-failure** — failed runs MUST NOT
populate the ``_ENSURED_*`` cache; the next call retries.
Otherwise a transient failure during boot would wedge the
connector for the rest of the process lifetime.
"""

@abstractmethod
Expand All @@ -242,8 +303,22 @@ def upsert(self, points: Sequence[VectorPoint]) -> List[str]:

Must inject the tenant guard into each point's storage so later
searches / deletes can filter by it. Returns the ids written (in
input order). Raises on write failure — callers treat the batch
as atomic per-point.
input order). Raises on write failure.

**Batch atomicity** is **backend-specific** and declared on
:attr:`BACKEND_CAPABILITIES.supports_atomic_batch_upsert`
(task #61 P1-V2):

* PGVector wraps the bulk INSERT ON CONFLICT in
``engine.begin()`` so a mid-batch failure rolls back the
entire batch (``supports_atomic_batch_upsert=True``).
* Qdrant's ``client.upsert(points, wait=True)`` is best-effort
per-point — a mid-batch failure can leave a partial write
(``supports_atomic_batch_upsert=False``).

Callers that need atomic-batch semantics must read the
capability flag and chunk + verify when ``False``; on
``True``-declaring backends the semantics come for free.
"""

@abstractmethod
Expand Down
27 changes: 27 additions & 0 deletions aperag/vectorstore/pgvector_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@

from aperag.vectorstore.base import (
UnsupportedFilterError,
VectorBackendCapabilities,
VectorStoreConnector,
denormalize_threshold_to_native,
normalize_score,
Expand Down Expand Up @@ -254,6 +255,21 @@ def _walk(self, flt: VectorFilter) -> str:
return "(" + " AND ".join(parts) + ")"
if isinstance(flt, Or):
parts = [self._walk(p) for p in flt.parts]
# task #61 P1-V3 defense-in-depth: ``Or.__post_init__``
# already rejects empty ``parts`` at DSL construction so
# this list is normally non-empty. The translator-level
# guard catches future refactors that bypass the DSL
# constructor (e.g. ``dataclasses.replace(or_node, parts=())``)
# before they reach pgvector. An empty Or in SQL would
# collapse to ``()`` which is a syntax error — but in
# principle could degrade to a vacuous "always true" via
# some future translator change. Symmetric with the Qdrant
# ``Or`` translator guard for cross-adapter parity.
if not parts:
raise UnsupportedFilterError(
"pgvector: Or filter has zero translatable parts; "
"an empty Or is a vacuous disjunction (task #61 P1-V3)."
)
return "(" + " OR ".join(parts) + ")"
if isinstance(flt, Not):
return f"NOT ({self._walk(flt.inner)})"
Expand Down Expand Up @@ -325,6 +341,17 @@ def _vector_literal(vec: Sequence[float]) -> str:
class PgvectorVectorStoreConnector(VectorStoreConnector):
"""pgvector implementation of ``VectorStoreConnector``."""

#: Static capability declaration (task #61 P1-V2 / P1-V4).
#: ``upsert`` wraps the bulk INSERT ON CONFLICT in a SQLAlchemy
#: ``engine.begin()`` transaction so a mid-batch failure rolls back
#: the whole batch — atomic. Legacy mode is not supported on
#: pgvector (would require one table per tenant; defeats the point
#: of sharing PG with the main ApeRAG DB).
BACKEND_CAPABILITIES = VectorBackendCapabilities(
supports_atomic_batch_upsert=True,
supports_legacy_mode=False,
)

def __init__(self, ctx: Dict[str, Any], **kwargs: Any) -> None:
super().__init__(ctx, **kwargs)

Expand Down
70 changes: 65 additions & 5 deletions aperag/vectorstore/qdrant_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

from aperag.vectorstore.base import (
UnsupportedFilterError,
VectorBackendCapabilities,
VectorStoreConnector,
denormalize_threshold_to_native,
normalize_score,
Expand Down Expand Up @@ -271,6 +272,21 @@ def _translate_filter(flt: Optional[VectorFilter]) -> Optional[rest.Filter]:
if isinstance(flt, Or):
subs = [_translate_filter(p) for p in flt.parts]
subs = [s for s in subs if s is not None]
# task #61 P1-V3 defense-in-depth: ``Or.__post_init__`` already
# rejects empty ``parts`` at DSL construction, so this list is
# normally non-empty. The translator-level guard catches future
# refactors that bypass the DSL constructor (e.g.
# ``dataclasses.replace(or_node, parts=())``) before they reach
# Qdrant. Without this, ``rest.Filter(should=[])`` is a vacuous
# disjunction that Qdrant treats as "match everything" — a
# silent data-correctness footgun. Cross-adapter parity with
# the pgvector ``_SqlFilter._walk`` Or-empty guard.
if not subs:
raise UnsupportedFilterError(
"qdrant: Or filter has zero translatable parts after pruning; "
"an empty Or is a vacuous disjunction that would match every "
"point in the collection (task #61 P1-V3)."
)
return rest.Filter(should=subs)
if isinstance(flt, Not):
sub = _translate_filter(flt.inner)
Expand Down Expand Up @@ -416,6 +432,18 @@ def _extract_vector(p: Any) -> Optional[List[float]]:
class QdrantVectorStoreConnector(VectorStoreConnector):
"""Qdrant implementation of ``VectorStoreConnector``."""

#: Static capability declaration (task #61 P1-V2 / P1-V4).
#: Qdrant's ``client.upsert(points, wait=True)`` is best-effort
#: per-point — a mid-batch failure can leave a partial write — so
#: ``supports_atomic_batch_upsert=False``. The legacy
#: (``multitenant=False``) layout is supported, where each ApeRAG
#: collection gets its own physical Qdrant collection; new
#: deployments default to multitenant.
BACKEND_CAPABILITIES = VectorBackendCapabilities(
supports_atomic_batch_upsert=False,
supports_legacy_mode=True,
)

def __init__(self, ctx: Dict[str, Any], **kwargs: Any) -> None:
super().__init__(ctx, **kwargs)

Expand Down Expand Up @@ -713,11 +741,43 @@ def retrieve(
)
for p in raw
]
# Represent "no vector requested" as empty list rather than None
# to keep VectorPoint.__post_init__ happy (vector must be list).
if not self.multitenant:
return out
return [p for p in out if p.payload.get(TENANT_PAYLOAD_KEY) == self._tenant.id]
# task #61 P1-V4 defense-in-depth — payload-level tenant
# filter applied with **mode-specific semantics** (per Weston
# msg=910cad66 BLOCKER catch on initial commit: a uniform
# "no payload key → pass through" branch leaked stray ``{}``
# payload rows in the shared multitenant collection to every
# tenant on ``retrieve(ids=...)``):
#
# * Multitenant mode (shared physical collection): payload
# key is the **primary** isolation layer. STRICT — a row
# must carry ``TENANT_PAYLOAD_KEY`` matching this tenant.
# No "no payload key → pass through" branch here, because
# in the shared collection a missing key would expose the
# row to every tenant, not just this one. ``upsert()``
# stamps the key on every point we write, so the only way
# a missing-key row reaches the collection is tooling
# drift / migration drift, exactly the case this gate is
# meant to catch.
# * Legacy mode (per-tenant physical collection): the
# collection name is the primary isolation layer
# (``collection_name == tenant_id``); the payload-level
# filter is belt-and-braces. PERMISSIVE — a row that
# doesn't carry the payload key still passes through
# (typical legacy data shape pre-multitenant), but a stray
# foreign-tenant payload gets dropped (catches tooling
# drift / migration mistakes).
#
# Lesson #14 multi-iteration cleanup family — legacy mode is
# preserved for migration rollback only; a future PR can drop
# the mode entirely once telemetry confirms zero production
# usage.
if self.multitenant:
return [p for p in out if p.payload.get(TENANT_PAYLOAD_KEY) == self._tenant.id]
return [
p
for p in out
if TENANT_PAYLOAD_KEY not in p.payload or p.payload.get(TENANT_PAYLOAD_KEY) == self._tenant.id
]

# ================================================================ delete
def delete(self, ids: Sequence[str]) -> None:
Expand Down
102 changes: 102 additions & 0 deletions tests/unit_test/vectorstore/test_backend_capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2026 ApeCloud, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Static capability declaration tests (task #61 P1-V2 / P1-V4).

Each :class:`VectorStoreConnector` subclass declares its
``BACKEND_CAPABILITIES`` class-level attribute so callers
(API / FE / capability-aware optimizers) can read a machine-readable
declaration of what the adapter actually does — instead of guessing
from the backend name.

These tests pin the static values so:

1. A future refactor that drops a flag declaration on a concrete
subclass fails fast (e.g. removing ``BACKEND_CAPABILITIES`` from
:class:`PgvectorVectorStoreConnector` makes the flag undefined).
2. The cross-adapter capability matrix surfaces in code review as a
single test file — consumers (cuiwenbo task #87 P1-D3 collection
metadata Pydantic projection) read these values verbatim, so any
change to the behaviour they describe must update both the adapter
docstring + this test in the same PR.
"""

from __future__ import annotations

from aperag.vectorstore.base import VectorBackendCapabilities, VectorStoreConnector
from aperag.vectorstore.pgvector_connector import PgvectorVectorStoreConnector
from aperag.vectorstore.qdrant_connector import QdrantVectorStoreConnector

# ---------------------------------------------------------------------
# Shape — ensure both adapters declare the attribute and it's the
# right type.
# ---------------------------------------------------------------------


def test_pgvector_declares_backend_capabilities():
caps = PgvectorVectorStoreConnector.BACKEND_CAPABILITIES
assert isinstance(caps, VectorBackendCapabilities)


def test_qdrant_declares_backend_capabilities():
caps = QdrantVectorStoreConnector.BACKEND_CAPABILITIES
assert isinstance(caps, VectorBackendCapabilities)


def test_abstract_base_does_not_set_concrete_capabilities():
""":class:`VectorStoreConnector` is abstract — only concrete
subclasses declare a value. Keeping the base class assignment
absent means a future subclass that forgets to declare gets a
``AttributeError`` at the call site, not a silent default."""
# ``BACKEND_CAPABILITIES`` is a ``ClassVar`` annotation on the base
# class without a value, so it doesn't actually exist on the base.
assert "BACKEND_CAPABILITIES" not in VectorStoreConnector.__dict__


# ---------------------------------------------------------------------
# Capability matrix values — pinned by spec § 2.3 + task #83 P1-V*
# implementation. cuiwenbo task #87 P1-D3 reads these values for the
# collection metadata Pydantic projection, so changes here must be
# coordinated with that PR.
# ---------------------------------------------------------------------


def test_pgvector_supports_atomic_batch_upsert():
"""PGVector wraps the bulk INSERT ON CONFLICT in
``engine.begin()`` so a mid-batch failure rolls back the entire
batch (task #61 P1-V2)."""
assert PgvectorVectorStoreConnector.BACKEND_CAPABILITIES.supports_atomic_batch_upsert is True


def test_qdrant_does_not_support_atomic_batch_upsert():
"""Qdrant ``client.upsert(points, wait=True)`` is best-effort
per-point — a mid-batch failure can leave some points written and
others not (task #61 P1-V2). Callers needing atomic-batch
semantics must chunk + verify."""
assert QdrantVectorStoreConnector.BACKEND_CAPABILITIES.supports_atomic_batch_upsert is False


def test_pgvector_does_not_support_legacy_mode():
"""PGVector is multitenant-only — a per-tenant table layout would
require dropping the shared-PG topology entirely (task #61 P1-V4)."""
assert PgvectorVectorStoreConnector.BACKEND_CAPABILITIES.supports_legacy_mode is False


def test_qdrant_supports_legacy_mode():
"""Qdrant supports both legacy (``multitenant=False``,
one-collection-per-tenant) and multitenant
(``multitenant=True``, shared-collection + payload filter)
layouts, controlled by the ``multitenant`` ctx flag (task #61
P1-V4). New deployments default to multitenant."""
assert QdrantVectorStoreConnector.BACKEND_CAPABILITIES.supports_legacy_mode is True
Loading
Loading