Skip to content

Commit 0711647

Browse files
earayuclaude
andauthored
feat(celery Wave 6 #34): chunk_id schema canonical unification (#1738)
* feat(celery Wave 6 #34): chunk_id schema canonical unification Per docs/modularization/indexing-redesign-design-pack.md §K.11.1 #34 (huangheng T1 obs B + Wave 5 P5B chunk_id 5th item deferred). ## Changes aperag/indexing/vector.py - VectorBackend.upsert_point(chunk_id=) → upsert_point(point_id=) (canonical Qdrant naming, aligned with summary/vision protocols) - InMemoryVectorBackend record key renamed chunk_id → point_id - Vector worker callsite passes point_id=chunk["chunk_id"]; chunk_id remains in payload for hybrid-dedup with fulltext (§C.6 trade-off lock) aperag/indexing/worker_factory.py - _QdrantPointBackend.upsert_point(): single point_id keyword (drop pre-Wave-6 dual chunk_id|point_id transition shim) - Drop merged_payload.setdefault("chunk_id", identifier) — adapter no longer injects misleading chunk_id into summary/vision payloads (their points are not chunks); each modality controls its payload aperag/indexing/reconciler.py - utc_now audit comment: utc_now usage in reconciler is application-level wall-clock for lease comparison + gmt_updated stamps, distinct from Wave 5 P5B server_default=CURRENT_TIMESTAMP ORM-creation defaults. No further migration needed. tests/unit_test/indexing/test_chunk_id_schema_canonical.py (new) - 7 contract tests pin canonical naming: * VectorBackend / SummaryBackend / VisionBackend Protocols use point_id * InMemoryVectorBackend round-trips point_id at record level + chunk_id at payload level * Legacy chunk_id= keyword raises TypeError on InMemoryVectorBackend * _QdrantPointBackend.upsert_point uses single point_id param * _QdrantPointBackend does not inject chunk_id into payload * Parser chunks.jsonl chunk_id field naming preserved tests/integration/test_cleanup_fan_out.py tests/unit_test/indexing/test_t1_3_vector_fulltext.py tests/unit_test/indexing/test_t2_1_runtime.py tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py - backend.upsert_point(chunk_id=...) → point_id=... - record reads p["chunk_id"] → p["point_id"] / p["payload"]["chunk_id"] ## Production-readiness 三类 layer (per §K.11.4) - must-be-real: parser layer chunk_id field schema unified across vector/summary/vision backend protocol surfaces; remaining utc_now usage audited (reconciler.py only, application-level, distinct from ORM defaults Wave 5 P5B already migrated) - may-be-gated: vector worker still keeps chunk_id in payload for hybrid-dedup with fulltext (§C.6 contract preserved) - fully-resolves: huangheng T1 obs B + Wave 5 P5B chunk_id 5th item deferred (per spec §K.11.1 row 34) ## simple-stable 4 guardrail (per feedback_simple_stable_zero_maintenance.md) 1. 不无限扩范围 ✅ — scope limited to vector protocol + adapter, no cross-cutting touch 2. 功能做实 ✅ — real schema rename, no transitional shim left behind 3. 简单稳定 ✅ — drops dual-API polymorphism + drops hidden payload side-effect (setdefault chunk_id), each layer's contract is now self-evident from signature 4. 私有化免维护 ✅ — operator/dev reading code sees canonical naming without needing to track which arg name belongs to which modality ## hard-cut policy (per earayu2 msg=30c81478) No production data → schema breaking change applied directly. No backward-compat shim, no deprecation window. Test that the legacy chunk_id= keyword raises TypeError catches accidental regression. ## Pre-check (per K.11.5 Pattern 2) grep upsert_point across aperag/indexing/ + tests/ — all callsites audited, all callers migrated, contract test pins canonical name. ## Local gates - 152/152 indexing unit tests pass - 21/21 indexing integration tests pass (cleanup_fan_out + dispatch_with_parse + inline_mode_smoke + parse_async_roundtrip) - 7/7 new contract tests pass - ruff check + ruff format clean Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 6 #38): application cache cross-loop lazy-rebuild + WARN log Per docs/modularization/indexing-redesign-design-pack.md §K.11.1 #38 (huangheng PR #1734 sync point 7 + feedback_announce_equals_landed.md narrative-truth invariant). ## Problem (pre-Wave-6 narrative-truth violation) `aperag.cache.application_runtime.get_application_cache()` keyed the cached `ApplicationCache` instance on the running asyncio event loop (the Redis async client is loop-bound). When the running loop changed (test process restart, worker reinit, asyncio re-initialisation, etc.), the function silently swapped the cache for a `NoopApplicationCacheBackend` with `enabled=False`. From that point on the worker paid the full LiteLLM / embedding round-trip on every request, with no log line, no metric, and no operator-visible signal that caching had degraded to zero. ## Fix When the loop-switch branch fires: - emit a WARN log with operator-actionable phrasing - bump `application_cache_metrics["application_runtime"]["loop_switch_rebuild"]` (uses the existing ApplicationCacheMetrics instance — no new metrics infra) - reset `_async_cache=None` and fall through to the normal initialisation path (which re-establishes the real Redis client on the new loop) The Noop fallback is now reached only when Redis is genuinely unreachable on the new loop — never as a silent loop-switch downgrade. ## Tests (4 new) tests/unit_test/cache/test_application_cache.py - test_application_cache_rebuilds_on_loop_switch_instead_of_silent_noop: two `_run_in_new_loop()` calls produce distinct ApplicationCache instances, second wraps real `ApplicationRedisCacheBackend` (not `NoopApplicationCacheBackend`); metric counter incremented to 1; WARN log captured. - test_application_cache_does_not_increment_loop_switch_metric_on_first_build: first-ever build does not bump the loop-switch counter. - test_application_cache_repeat_call_in_same_loop_is_singleton_no_metric_bump: same-loop repeat returns identical singleton, no metric bump. - test_application_cache_falls_back_to_noop_when_redis_unreachable_on_new_loop: if Redis genuinely fails on the new loop, rebuild attempts the real client, falls back to Noop, but metric still records the loop switch (operators see the rebuild attempt + the Redis failure WARN that was already there). ## Production-readiness 三类 layer (per §K.11.4) - must-be-real: real WARN log + real metric counter + real Redis client rebuild on cross-loop call (no silent zero-cache) - may-be-gated: first cross-loop request may pay rebuild cost (re-ping Redis) — observable via metric, not silent - fully-resolves: huangheng PR #1734 sync point 7 + feedback_announce_equals_landed.md narrative-truth lock ## simple-stable 4 guardrail 1. 不无限扩范围 ✅ — single-function fix in application_runtime.py, reuses existing `application_cache_metrics` infra, no new public API 2. 功能做实 ✅ — real cache rebuild not a placeholder 3. 简单稳定 ✅ — fewer paths than before (one rebuild path replaces the silent-Noop branch), each branch logged 4. 私有化免维护 ✅ — operator can grep `loop_switch_rebuild` in metrics + log "rebuilding for new event loop" to diagnose worker setup issues ## Local gates - 43/43 cache unit tests pass (39 existing + 4 new) - ruff check + ruff format clean Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0edc82a commit 0711647

10 files changed

Lines changed: 452 additions & 55 deletions

aperag/cache/application_runtime.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,39 @@ def application_cache_policy(namespace: str, *, enabled: bool | None = None) ->
4343

4444

4545
async def get_application_cache() -> ApplicationCache:
46+
"""Return the singleton :class:`ApplicationCache` for the running
47+
event loop, lazily building it if needed.
48+
49+
Wave 6 #38 (per `feedback_announce_equals_landed.md` narrative-truth
50+
invariant): when the running event loop changes (test process
51+
starts a new loop, worker process reinitialises, etc.) the cached
52+
instance — whose underlying ``async_redis.Redis`` client is bound
53+
to the prior loop — is no longer usable. Pre-Wave-6 we silently
54+
swapped it for a :class:`NoopApplicationCacheBackend`, which meant
55+
callers paid LiteLLM/embedding cost on every request from then on
56+
with no signal that caching had degraded.
57+
58+
Wave 6 fix: rebuild the cache on the new loop (re-establish the
59+
real Redis client) and emit a WARN log + bump
60+
``application_cache_metrics["application_runtime"]["loop_switch_rebuild"]``
61+
so operators can observe loop-switch frequency. Rebuild can fail
62+
(Redis unreachable on the new loop) — the fallback is still a
63+
Noop backend, but only when Redis is actually broken, not when
64+
the loop merely changed.
65+
"""
4666
global _async_cache, _async_cache_loop
4767
loop = asyncio.get_running_loop()
4868
if _async_cache is not None and _async_cache_loop is loop:
4969
return _async_cache
5070
if _async_cache is not None and _async_cache_loop is not loop:
51-
_async_cache = ApplicationCache(
52-
backend=NoopApplicationCacheBackend(),
53-
default_policy=application_cache_policy("default", enabled=False),
71+
logger.warning(
72+
"Application cache rebuilding for new event loop: prior cache was bound "
73+
"to a different loop and its Redis client cannot be reused. "
74+
"Frequent loop switches indicate a worker / test setup issue."
5475
)
55-
_async_cache_loop = loop
56-
return _async_cache
76+
application_cache_metrics.increment("application_runtime", "loop_switch_rebuild")
77+
_async_cache = None
78+
_async_cache_loop = None
5779

5880
if not settings.cache_enabled:
5981
_async_cache = ApplicationCache(

aperag/indexing/reconciler.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,19 @@ async def reconcile_collection_summaries_hook(
325325
)
326326

327327
def _reclaim_stale_and_claim_pending() -> list[tuple[str, str, int, str]]:
328-
"""Sync DB-only worker. Returns list of claimed dispatch tuples."""
328+
"""Sync DB-only worker. Returns list of claimed dispatch tuples.
329+
330+
Note (Wave 6 #34 utc_now audit): the ``utc_now`` calls in this
331+
function are application-level wall-clock reads used to compute
332+
lease expiry windows and stamp ``gmt_last_reconciled`` /
333+
``gmt_updated`` for in-flight rows. They are intentionally
334+
Python-side and distinct from the ORM-default
335+
``server_default=CURRENT_TIMESTAMP`` migration done in Wave 5
336+
P5B for ``_LineageEntityRow`` / ``_LineageRelationRow``: those
337+
cover row-creation defaults; reconciler updates need a wall-
338+
clock value materialised inside the worker process for lease
339+
comparison logic, so server-side defaults do not apply here.
340+
"""
329341
from aperag.utils.utils import utc_now as _utc_now
330342

331343
claimed_dispatches: list[tuple[str, str, int, str]] = []

aperag/indexing/vector.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,24 @@ def delete_by_filter(self, *, document_id: str, parse_version: str) -> int:
7575
def upsert_point(
7676
self,
7777
*,
78-
chunk_id: str,
78+
point_id: str,
7979
embedding: list[float],
8080
payload: dict[str, Any],
8181
) -> None:
82-
"""Idempotent point insert keyed on ``chunk_id`` (Qdrant point id)."""
82+
"""Idempotent point insert keyed on ``point_id`` (Qdrant point id).
83+
84+
For the vector modality the caller passes the parser-emitted
85+
``chunk_id`` as ``point_id`` (one chunk → one point). The
86+
``chunk_id`` value is also written into ``payload["chunk_id"]``
87+
by the worker so retrieval can echo it back; vector + fulltext
88+
share that payload field for hybrid-dedup (§C.6).
89+
"""
8390

8491

8592
class InMemoryVectorBackend:
8693
"""Process-local in-memory backend for unit tests.
8794
88-
Stores points in a dict keyed by ``chunk_id``. Implements the
95+
Stores points in a dict keyed by ``point_id``. Implements the
8996
:class:`VectorBackend` protocol so vector.sync can target it
9097
transparently.
9198
"""
@@ -95,22 +102,22 @@ def __init__(self) -> None:
95102

96103
def delete_by_filter(self, *, document_id: str, parse_version: str) -> int:
97104
deleted = 0
98-
for chunk_id in list(self._points):
99-
payload = self._points[chunk_id].get("payload", {})
105+
for point_id in list(self._points):
106+
payload = self._points[point_id].get("payload", {})
100107
if payload.get("document_id") == document_id and payload.get("parse_version") == parse_version:
101-
self._points.pop(chunk_id)
108+
self._points.pop(point_id)
102109
deleted += 1
103110
return deleted
104111

105112
def upsert_point(
106113
self,
107114
*,
108-
chunk_id: str,
115+
point_id: str,
109116
embedding: list[float],
110117
payload: dict[str, Any],
111118
) -> None:
112-
self._points[chunk_id] = {
113-
"chunk_id": chunk_id,
119+
self._points[point_id] = {
120+
"point_id": point_id,
114121
"embedding": list(embedding),
115122
"payload": dict(payload),
116123
}
@@ -126,10 +133,10 @@ def points_for_document(self, document_id: str, parse_version: str | None = None
126133
if parse_version is not None and payload.get("parse_version") != parse_version:
127134
continue
128135
out.append(record)
129-
return sorted(out, key=lambda r: r["chunk_id"])
136+
return sorted(out, key=lambda r: r["point_id"])
130137

131138
def all_points(self) -> list[dict[str, Any]]:
132-
return sorted(self._points.values(), key=lambda r: r["chunk_id"])
139+
return sorted(self._points.values(), key=lambda r: r["point_id"])
133140

134141

135142
def _placeholder_embedding(text: str, dim: int = SIMULATOR_EMBEDDING_DIM) -> list[float]:
@@ -229,7 +236,7 @@ async def sync(
229236
"page_idx": chunk.get("page_idx"),
230237
}
231238
self._backend.upsert_point(
232-
chunk_id=chunk_id,
239+
point_id=chunk_id,
233240
embedding=embedding,
234241
payload=payload,
235242
)

aperag/indexing/worker_factory.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,16 @@ class _QdrantPointBackend:
9494
vision modalities consume.
9595
9696
All three modalities share the same Qdrant-shaped surface (delete
97-
by ``(document_id, parse_version)`` filter, upsert by
98-
``chunk_id``/``point_id``). One adapter class satisfies the three
99-
Protocols structurally — no inheritance needed because the
100-
Protocols are ``@runtime_checkable``.
97+
by ``(document_id, parse_version)`` filter, upsert by ``point_id``).
98+
One adapter class satisfies the three Protocols structurally — no
99+
inheritance needed because the Protocols are ``@runtime_checkable``.
100+
101+
The ``point_id`` is the canonical name for the Qdrant point
102+
identifier across all three modalities (per Wave 6 #34 schema
103+
unification). Vector workers pass the parser-emitted ``chunk_id``
104+
here; summary uses ``summary:<doc>:<v>``; vision uses
105+
``vision:<doc>:<v>:<image_id>``. Each worker controls its own
106+
payload — the adapter does not inject anything.
101107
"""
102108

103109
def __init__(self, *, connector: Any) -> None:
@@ -120,43 +126,34 @@ def delete_by_filter(self, *, document_id: str, parse_version: str) -> int:
120126
def upsert_point(
121127
self,
122128
*,
123-
chunk_id: str | None = None,
124-
point_id: str | None = None,
129+
point_id: str,
125130
embedding: list[float],
126131
payload: dict[str, Any],
127132
) -> None:
128-
# Vector modality calls with ``chunk_id``; summary / vision
129-
# modalities call with ``point_id``. Both end up as the
130-
# underlying Qdrant point id.
131-
#
132133
# Qdrant only accepts unsigned-integer or UUID point ids. The
133134
# T1.1 parser produces chunk ids of the form
134135
# ``<sha-prefix>:<index>`` (e.g. ``f766a946575ec3b4:0000``)
135136
# which Qdrant rejects with HTTP 400 "is not a valid point
136137
# ID". Map the caller-supplied string id into a deterministic
137138
# UUID5 so retries land on the same point and the upsert is
138-
# idempotent — and stash the original id in the payload so
139-
# the read path can still surface it to clients.
139+
# idempotent. The caller's payload is forwarded verbatim — it
140+
# already carries whatever modality-specific identifier the
141+
# read path needs (vector keeps ``chunk_id`` in payload for
142+
# hybrid-dedup; summary uses ``summary_text``; vision uses
143+
# ``image_id``).
140144
import uuid
141145

142146
from aperag.vectorstore.dto import VectorPoint
143147

144-
identifier = chunk_id if chunk_id is not None else point_id
145-
if not identifier:
146-
raise ValueError("upsert_point requires either chunk_id or point_id")
147-
identifier = str(identifier)
148-
qdrant_id = str(uuid.uuid5(uuid.NAMESPACE_OID, identifier))
149-
merged_payload = dict(payload)
150-
# Preserve the original id under a stable key so the read
151-
# path can echo it back; ``chunk_id`` is what vector modality
152-
# already writes so we don't overwrite it.
153-
merged_payload.setdefault("chunk_id", identifier)
148+
if not point_id:
149+
raise ValueError("upsert_point requires point_id")
150+
qdrant_id = str(uuid.uuid5(uuid.NAMESPACE_OID, str(point_id)))
154151
self._connector.upsert(
155152
[
156153
VectorPoint(
157154
id=qdrant_id,
158155
vector=list(embedding),
159-
payload=merged_payload,
156+
payload=dict(payload),
160157
)
161158
]
162159
)

tests/integration/test_cleanup_fan_out.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ async def factory_closure(row: DocumentIndex):
136136
pv_a = "doc-del-a-pv0001"[:16]
137137
_insert_row(engine, document_id="doc-del", parse_version=pv_a, modality=Modality.VECTOR)
138138
factory_backend.upsert_point(
139-
chunk_id="chunk-fac",
139+
point_id="chunk-fac",
140140
embedding=[0.0] * 16,
141141
payload={
142142
"document_id": "doc-del",
@@ -150,7 +150,7 @@ async def factory_closure(row: DocumentIndex):
150150
},
151151
)
152152
fallback_backend.upsert_point(
153-
chunk_id="chunk-fallback",
153+
point_id="chunk-fallback",
154154
embedding=[0.0] * 16,
155155
payload={
156156
"document_id": "doc-del",
@@ -245,7 +245,7 @@ async def factory_closure(row: DocumentIndex):
245245
backend = backends[(col, modality)]
246246
if modality is Modality.VECTOR:
247247
backend.upsert_point(
248-
chunk_id=chunk_id,
248+
point_id=chunk_id,
249249
embedding=[0.0] * 16,
250250
payload={
251251
"document_id": document_id,
@@ -360,7 +360,7 @@ def test_orphan_parse_version_gc_uses_worker_factory(engine):
360360
_set_updated_at(engine, old_id, _utcnow() - timedelta(hours=2))
361361

362362
backend.upsert_point(
363-
chunk_id="chunk-orphan",
363+
point_id="chunk-orphan",
364364
embedding=[0.0] * 16,
365365
payload={
366366
"document_id": "doc-orphan",
@@ -415,7 +415,7 @@ def test_collection_deletion_cascade_uses_worker_factory(engine):
415415
collection_id="col-doomed",
416416
)
417417
backend.upsert_point(
418-
chunk_id="chunk-col",
418+
point_id="chunk-col",
419419
embedding=[0.0] * 16,
420420
payload={
421421
"document_id": "doc-col",
@@ -489,7 +489,7 @@ def test_workers_map_only_path_unchanged_for_existing_callers(engine):
489489
pv = "compatparsever01"[:16]
490490
_insert_row(engine, document_id="doc-compat", parse_version=pv, modality=Modality.VECTOR)
491491
backend.upsert_point(
492-
chunk_id="chunk-compat",
492+
point_id="chunk-compat",
493493
embedding=[0.0] * 16,
494494
payload={
495495
"document_id": "doc-compat",

0 commit comments

Comments
 (0)