Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions aperag/cache/application_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,39 @@ def application_cache_policy(namespace: str, *, enabled: bool | None = None) ->


async def get_application_cache() -> ApplicationCache:
"""Return the singleton :class:`ApplicationCache` for the running
event loop, lazily building it if needed.

Wave 6 #38 (per `feedback_announce_equals_landed.md` narrative-truth
invariant): when the running event loop changes (test process
starts a new loop, worker process reinitialises, etc.) the cached
instance — whose underlying ``async_redis.Redis`` client is bound
to the prior loop — is no longer usable. Pre-Wave-6 we silently
swapped it for a :class:`NoopApplicationCacheBackend`, which meant
callers paid LiteLLM/embedding cost on every request from then on
with no signal that caching had degraded.

Wave 6 fix: rebuild the cache on the new loop (re-establish the
real Redis client) and emit a WARN log + bump
``application_cache_metrics["application_runtime"]["loop_switch_rebuild"]``
so operators can observe loop-switch frequency. Rebuild can fail
(Redis unreachable on the new loop) — the fallback is still a
Noop backend, but only when Redis is actually broken, not when
the loop merely changed.
"""
global _async_cache, _async_cache_loop
loop = asyncio.get_running_loop()
if _async_cache is not None and _async_cache_loop is loop:
return _async_cache
if _async_cache is not None and _async_cache_loop is not loop:
_async_cache = ApplicationCache(
backend=NoopApplicationCacheBackend(),
default_policy=application_cache_policy("default", enabled=False),
logger.warning(
"Application cache rebuilding for new event loop: prior cache was bound "
"to a different loop and its Redis client cannot be reused. "
"Frequent loop switches indicate a worker / test setup issue."
)
_async_cache_loop = loop
return _async_cache
application_cache_metrics.increment("application_runtime", "loop_switch_rebuild")
_async_cache = None
_async_cache_loop = None

if not settings.cache_enabled:
_async_cache = ApplicationCache(
Expand Down
14 changes: 13 additions & 1 deletion aperag/indexing/reconciler.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,19 @@ async def reconcile_collection_summaries_hook(
)

def _reclaim_stale_and_claim_pending() -> list[tuple[str, str, int, str]]:
"""Sync DB-only worker. Returns list of claimed dispatch tuples."""
"""Sync DB-only worker. Returns list of claimed dispatch tuples.

Note (Wave 6 #34 utc_now audit): the ``utc_now`` calls in this
function are application-level wall-clock reads used to compute
lease expiry windows and stamp ``gmt_last_reconciled`` /
``gmt_updated`` for in-flight rows. They are intentionally
Python-side and distinct from the ORM-default
``server_default=CURRENT_TIMESTAMP`` migration done in Wave 5
P5B for ``_LineageEntityRow`` / ``_LineageRelationRow``: those
cover row-creation defaults; reconciler updates need a wall-
clock value materialised inside the worker process for lease
comparison logic, so server-side defaults do not apply here.
"""
from aperag.utils.utils import utc_now as _utc_now

claimed_dispatches: list[tuple[str, str, int, str]] = []
Expand Down
31 changes: 19 additions & 12 deletions aperag/indexing/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,24 @@ def delete_by_filter(self, *, document_id: str, parse_version: str) -> int:
def upsert_point(
self,
*,
chunk_id: str,
point_id: str,
embedding: list[float],
payload: dict[str, Any],
) -> None:
"""Idempotent point insert keyed on ``chunk_id`` (Qdrant point id)."""
"""Idempotent point insert keyed on ``point_id`` (Qdrant point id).

For the vector modality the caller passes the parser-emitted
``chunk_id`` as ``point_id`` (one chunk → one point). The
``chunk_id`` value is also written into ``payload["chunk_id"]``
by the worker so retrieval can echo it back; vector + fulltext
share that payload field for hybrid-dedup (§C.6).
"""


class InMemoryVectorBackend:
"""Process-local in-memory backend for unit tests.

Stores points in a dict keyed by ``chunk_id``. Implements the
Stores points in a dict keyed by ``point_id``. Implements the
:class:`VectorBackend` protocol so vector.sync can target it
transparently.
"""
Expand All @@ -95,22 +102,22 @@ def __init__(self) -> None:

def delete_by_filter(self, *, document_id: str, parse_version: str) -> int:
deleted = 0
for chunk_id in list(self._points):
payload = self._points[chunk_id].get("payload", {})
for point_id in list(self._points):
payload = self._points[point_id].get("payload", {})
if payload.get("document_id") == document_id and payload.get("parse_version") == parse_version:
self._points.pop(chunk_id)
self._points.pop(point_id)
deleted += 1
return deleted

def upsert_point(
self,
*,
chunk_id: str,
point_id: str,
embedding: list[float],
payload: dict[str, Any],
) -> None:
self._points[chunk_id] = {
"chunk_id": chunk_id,
self._points[point_id] = {
"point_id": point_id,
"embedding": list(embedding),
"payload": dict(payload),
}
Expand All @@ -126,10 +133,10 @@ def points_for_document(self, document_id: str, parse_version: str | None = None
if parse_version is not None and payload.get("parse_version") != parse_version:
continue
out.append(record)
return sorted(out, key=lambda r: r["chunk_id"])
return sorted(out, key=lambda r: r["point_id"])

def all_points(self) -> list[dict[str, Any]]:
return sorted(self._points.values(), key=lambda r: r["chunk_id"])
return sorted(self._points.values(), key=lambda r: r["point_id"])


def _placeholder_embedding(text: str, dim: int = SIMULATOR_EMBEDDING_DIM) -> list[float]:
Expand Down Expand Up @@ -229,7 +236,7 @@ async def sync(
"page_idx": chunk.get("page_idx"),
}
self._backend.upsert_point(
chunk_id=chunk_id,
point_id=chunk_id,
embedding=embedding,
payload=payload,
)
Expand Down
43 changes: 20 additions & 23 deletions aperag/indexing/worker_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,16 @@ class _QdrantPointBackend:
vision modalities consume.

All three modalities share the same Qdrant-shaped surface (delete
by ``(document_id, parse_version)`` filter, upsert by
``chunk_id``/``point_id``). One adapter class satisfies the three
Protocols structurally — no inheritance needed because the
Protocols are ``@runtime_checkable``.
by ``(document_id, parse_version)`` filter, upsert by ``point_id``).
One adapter class satisfies the three Protocols structurally — no
inheritance needed because the Protocols are ``@runtime_checkable``.

The ``point_id`` is the canonical name for the Qdrant point
identifier across all three modalities (per Wave 6 #34 schema
unification). Vector workers pass the parser-emitted ``chunk_id``
here; summary uses ``summary:<doc>:<v>``; vision uses
``vision:<doc>:<v>:<image_id>``. Each worker controls its own
payload — the adapter does not inject anything.
"""

def __init__(self, *, connector: Any) -> None:
Expand All @@ -120,43 +126,34 @@ def delete_by_filter(self, *, document_id: str, parse_version: str) -> int:
def upsert_point(
self,
*,
chunk_id: str | None = None,
point_id: str | None = None,
point_id: str,
embedding: list[float],
payload: dict[str, Any],
) -> None:
# Vector modality calls with ``chunk_id``; summary / vision
# modalities call with ``point_id``. Both end up as the
# underlying Qdrant point id.
#
# Qdrant only accepts unsigned-integer or UUID point ids. The
# T1.1 parser produces chunk ids of the form
# ``<sha-prefix>:<index>`` (e.g. ``f766a946575ec3b4:0000``)
# which Qdrant rejects with HTTP 400 "is not a valid point
# ID". Map the caller-supplied string id into a deterministic
# UUID5 so retries land on the same point and the upsert is
# idempotent — and stash the original id in the payload so
# the read path can still surface it to clients.
# idempotent. The caller's payload is forwarded verbatim — it
# already carries whatever modality-specific identifier the
# read path needs (vector keeps ``chunk_id`` in payload for
# hybrid-dedup; summary uses ``summary_text``; vision uses
# ``image_id``).
import uuid

from aperag.vectorstore.dto import VectorPoint

identifier = chunk_id if chunk_id is not None else point_id
if not identifier:
raise ValueError("upsert_point requires either chunk_id or point_id")
identifier = str(identifier)
qdrant_id = str(uuid.uuid5(uuid.NAMESPACE_OID, identifier))
merged_payload = dict(payload)
# Preserve the original id under a stable key so the read
# path can echo it back; ``chunk_id`` is what vector modality
# already writes so we don't overwrite it.
merged_payload.setdefault("chunk_id", identifier)
if not point_id:
raise ValueError("upsert_point requires point_id")
qdrant_id = str(uuid.uuid5(uuid.NAMESPACE_OID, str(point_id)))
self._connector.upsert(
[
VectorPoint(
id=qdrant_id,
vector=list(embedding),
payload=merged_payload,
payload=dict(payload),
)
]
)
Expand Down
12 changes: 6 additions & 6 deletions tests/integration/test_cleanup_fan_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def factory_closure(row: DocumentIndex):
pv_a = "doc-del-a-pv0001"[:16]
_insert_row(engine, document_id="doc-del", parse_version=pv_a, modality=Modality.VECTOR)
factory_backend.upsert_point(
chunk_id="chunk-fac",
point_id="chunk-fac",
embedding=[0.0] * 16,
payload={
"document_id": "doc-del",
Expand All @@ -150,7 +150,7 @@ async def factory_closure(row: DocumentIndex):
},
)
fallback_backend.upsert_point(
chunk_id="chunk-fallback",
point_id="chunk-fallback",
embedding=[0.0] * 16,
payload={
"document_id": "doc-del",
Expand Down Expand Up @@ -245,7 +245,7 @@ async def factory_closure(row: DocumentIndex):
backend = backends[(col, modality)]
if modality is Modality.VECTOR:
backend.upsert_point(
chunk_id=chunk_id,
point_id=chunk_id,
embedding=[0.0] * 16,
payload={
"document_id": document_id,
Expand Down Expand Up @@ -360,7 +360,7 @@ def test_orphan_parse_version_gc_uses_worker_factory(engine):
_set_updated_at(engine, old_id, _utcnow() - timedelta(hours=2))

backend.upsert_point(
chunk_id="chunk-orphan",
point_id="chunk-orphan",
embedding=[0.0] * 16,
payload={
"document_id": "doc-orphan",
Expand Down Expand Up @@ -415,7 +415,7 @@ def test_collection_deletion_cascade_uses_worker_factory(engine):
collection_id="col-doomed",
)
backend.upsert_point(
chunk_id="chunk-col",
point_id="chunk-col",
embedding=[0.0] * 16,
payload={
"document_id": "doc-col",
Expand Down Expand Up @@ -489,7 +489,7 @@ def test_workers_map_only_path_unchanged_for_existing_callers(engine):
pv = "compatparsever01"[:16]
_insert_row(engine, document_id="doc-compat", parse_version=pv, modality=Modality.VECTOR)
backend.upsert_point(
chunk_id="chunk-compat",
point_id="chunk-compat",
embedding=[0.0] * 16,
payload={
"document_id": "doc-compat",
Expand Down
Loading
Loading