Commit fdad09e

earayu and claude committed
feat(celery Wave 5 P2 chunk 4): vision callsite rewrite + chunk 4b gate self-disable verify
Per §G.2.5.1 spec amend, final piece: rewire `_build_vision_worker._embed` to call `EmbeddingService.embed_image(image_bytes, alt_text)` (chunk 1) with the actual image bytes the parser persisted (chunk 2 wrote `derived/parse_<v>/vision/images/<image_id>.<ext>` plus a JSONL descriptor at `vision/source.jsonl`). The chunk 4b vision gate self-disables when the operator flips `Model.supports_multimodal_embedding=True` (chunk 3).

`aperag/indexing/worker_factory.py`:
* `_embed(image_id, alt_text, image_bytes=None)`: when `image_bytes` is provided, route to `embedding_service.embed_image(image_bytes, alt_text)`. `None` falls back to the legacy text-concat path so the T1 simulator and the tests that hand the worker synthetic JSON keep working.
* Gate-raise message reframed: drops the "Wave 4 wiring" phrasing (the Wave 5 wiring has now landed) and names the typed `Model.supports_multimodal_embedding` flag so an operator can fix the config directly.

`aperag/indexing/vision.py`:
* `VisionModality.derive` accepts both source-path formats: the legacy single-JSON-array shape (T1 simulator / pre-Wave-5 tests) and the new JSONL-with-image-path shape (parser chunk 2 output). Format detection is by first non-whitespace byte (`[` → JSON array, anything else → JSONL).
* `_load_image_bytes(record)` reads the descriptor's `image_path` through the object store. A missing blob logs a warning and returns `None` (the embedder still runs on the alt-text/id placeholder digest), so a partial parser write doesn't block the whole derive cycle.
* `_placeholder_embedding(..., image_bytes=None)` mirrors the new embedder signature; the placeholder ignores the bytes.
* Embedder Protocol is widened to `(image_id, alt_text, image_bytes=None)`.

`tests/integration/test_full_indexing_pipeline.py`:
* Renamed Layer 1 `test_phase1_vision_modality_raises_wave4_wiring_gate` → `..._gate_raises_when_embedder_not_multimodal`. Asserts that the reframed message names `multimodal embedding model` and the `supports_multimodal_embedding` flag.
* New positive-path Layer 1 `test_phase1_vision_modality_gate_self_disables_when_embedder_multimodal`: with `is_multimodal()=True`, the factory builds a vision worker without raising. This pins the chunk 4 gate-self-disable contract.
* Layer 2 e2e assertion: the vision modality may be ACTIVE (when the CI fixture has a multimodal embedder configured) or FAILED with a gate marker that includes `supports_multimodal_embedding`. The OR-on-marker tolerance is kept for the transition state.

`tests/unit_test/indexing/test_t1_4_summary_vision.py`: 3 new tests covering the JSONL descriptor with `image_path` (bytes loaded and forwarded), the missing-blob graceful fallback, and legacy JSON-array backward compatibility.

Production-readiness, three categories:
- must-be-real: real `embed_image` callsite + real bytes load from the parser's descriptor
- may-be-gated: legacy text-concat fallback + simulator JSON format preserved for tests
- fully-resolves: §G.2.5.1 spec items 1+2+3 all wired end-to-end + chunk 4b vision gate self-disable verify (Wave 5 P2 closure)

Wave 5 P2 (T7 multimodal vision-LLM 3-item bundle) closed. The chunk 4b vision gate self-disables when an operator configures a multimodal embedder; default-off behaviour is preserved for text-only collections.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
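
The format-detection rule described in the message can be illustrated with a small standalone sketch. `parse_descriptor` is a hypothetical free function written for this illustration (the commit implements the rule as a method on `VisionModality`), and the sample records and paths are made up:

```python
import json


def parse_descriptor(body: bytes) -> list[dict]:
    """Hypothetical helper: decode either descriptor shape into a list of records."""
    text = body.decode("utf-8")
    if text.lstrip().startswith("["):
        # Legacy shape: a single JSON array holding every record.
        records = json.loads(text)
        if not isinstance(records, list):
            raise ValueError("expected a JSON list of image records")
        return [r for r in records if isinstance(r, dict)]
    # New shape: one JSON object per non-blank line (JSONL).
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        if not isinstance(record, dict):
            raise ValueError("expected JSONL records to be objects")
        records.append(record)
    return records


# Illustrative inputs only; the image paths are invented for this sketch.
legacy = b'  [{"image_id": "img-1", "alt_text": "a chart"}]'
jsonl = (
    b'{"image_id": "img-1", "image_path": "vision/images/img-1.png"}\n'
    b"\n"
    b'{"image_id": "img-2", "image_path": "vision/images/img-2.png"}\n'
)
legacy_records = parse_descriptor(legacy)
jsonl_records = parse_descriptor(jsonl)
```

Leading whitespace before `[` still selects the array branch, and blank lines inside a JSONL body are skipped rather than treated as malformed records.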
1 parent 03ff9c5 commit fdad09e

4 files changed

Lines changed: 334 additions & 47 deletions

aperag/indexing/vision.py

Lines changed: 87 additions & 13 deletions
@@ -112,7 +112,21 @@ def points_for_document(self, document_id: str, parse_version: str | None = None
         return sorted(out, key=lambda r: r["point_id"])


-def _placeholder_embedding(image_id: str, alt_text: str, dim: int = SIMULATOR_VISION_EMBEDDING_DIM) -> list[float]:
+def _placeholder_embedding(
+    image_id: str,
+    alt_text: str,
+    image_bytes: bytes | None = None,
+    dim: int = SIMULATOR_VISION_EMBEDDING_DIM,
+) -> list[float]:
+    """Deterministic synthetic embedding for tests + the simulator.
+
+    Wave 5 P2 chunk 4: the optional ``image_bytes`` parameter mirrors
+    the production embedder signature (real multimodal embedders consume
+    bytes). The placeholder ignores it — the digest is over
+    ``image_id|alt_text`` so re-running the simulator produces the same
+    vector for the same record.
+    """
+    del image_bytes  # placeholder ignores actual bytes
     digest = hashlib.sha256(f"{image_id}|{alt_text}".encode("utf-8")).digest()
     repeat = (dim + len(digest) - 1) // len(digest)
     expanded = (digest * repeat)[:dim]
@@ -144,11 +158,21 @@ async def derive(
     ) -> DeriveResult:
         """Extract images, run vision-LLM, persist manifest atomically.

-        T1 simulator contract: ``source_path`` is a JSON file in the
-        object store containing ``[{"image_id": ..., "alt_text": ...,
-        "page_idx": int|None, "bbox": [...]|None}, ...]``. The real
-        T2.x pipeline replaces this with PDF extraction; the
-        ``manifest.jsonl`` schema is the contract that must not change.
+        Two source-path formats are supported (the production parser
+        wrote the second one starting Wave 5 P2 chunk 2):
+
+        * **Legacy simulator** (single JSON array): a JSON list at
+          ``source_path`` holding ``[{"image_id": ..., "alt_text": ...,
+          "page_idx": int|None, "bbox": [...]|None}, ...]``. No image
+          bytes — the embedder gets ``image_bytes=None`` and falls back
+          to the alt-text/id placeholder digest.
+        * **Wave 5 production descriptor** (JSONL with ``image_path``):
+          one record per line at ``derived/parse_<v>/vision/source.jsonl``
+          with ``{"image_id", "image_path", "mime_type", "alt_text",
+          "page_idx", "bbox"}``. The worker loads each image's bytes
+          from ``image_path`` and hands them to the embedder so a real
+          multimodal embedding model can produce a real visual vector
+          (Wave 5 P2 chunk 4 callsite rewrite).
         """
         body = read_or_none(self._store, source_path)
         if body is None:
@@ -158,12 +182,7 @@ async def derive(
             )
             return DeriveResult(derived_artifact_path="")

-        try:
-            image_records = json.loads(body)
-        except json.JSONDecodeError as exc:
-            raise ValueError(f"vision.derive expected JSON list at {source_path}, got non-JSON: {exc}") from exc
-        if not isinstance(image_records, list):
-            raise ValueError(f"vision.derive expected JSON list of image records, got {type(image_records).__name__}")
+        image_records = self._parse_descriptor(body, source_path=source_path)

         parts = source_path.split("/")
         try:
@@ -179,7 +198,8 @@ async def derive(
         for record in image_records:
             image_id = record["image_id"]
             alt_text = record.get("alt_text", "")
-            embedding = self._embedder(image_id, alt_text)
+            image_bytes = self._load_image_bytes(record)
+            embedding = self._embedder(image_id, alt_text, image_bytes=image_bytes)
             entry = {
                 "image_id": image_id,
                 "alt_text": alt_text,
@@ -251,6 +271,60 @@ async def sync(
             payload=payload,
         )

+    def _parse_descriptor(self, body: bytes, *, source_path: str) -> list[dict]:
+        """Decode the source-image descriptor file.
+
+        Tolerates both the legacy single-JSON-array shape and the
+        Wave 5 P2 chunk 2 JSONL-with-image-path shape (parser writes
+        the latter). The first byte of body picks the format: ``[``
+        means a JSON array, anything else is JSONL.
+        """
+        text = body.decode("utf-8")
+        stripped = text.lstrip()
+        if stripped.startswith("["):
+            try:
+                records = json.loads(text)
+            except json.JSONDecodeError as exc:
+                raise ValueError(f"vision.derive expected JSON list at {source_path}, got non-JSON: {exc}") from exc
+            if not isinstance(records, list):
+                raise ValueError(f"vision.derive expected JSON list of image records, got {type(records).__name__}")
+            return [record for record in records if isinstance(record, dict)]
+        records: list[dict] = []
+        for line in text.splitlines():
+            if not line.strip():
+                continue
+            try:
+                record = json.loads(line)
+            except json.JSONDecodeError as exc:
+                raise ValueError(f"vision.derive expected JSONL at {source_path}, malformed line: {exc}") from exc
+            if not isinstance(record, dict):
+                raise ValueError(f"vision.derive expected JSONL records to be objects, got {type(record).__name__}")
+            records.append(record)
+        return records
+
+    def _load_image_bytes(self, record: dict) -> bytes | None:
+        """Load image bytes from the descriptor's ``image_path`` if
+        present.
+
+        Returns ``None`` for legacy simulator records (no ``image_path``
+        field) so the embedder falls back to its non-bytes path. A
+        missing-blob error logs but doesn't raise — the embedder still
+        runs with ``image_bytes=None`` so a partial parser write doesn't
+        block the whole derive cycle.
+        """
+        image_path = record.get("image_path")
+        if not image_path:
+            return None
+        body = read_or_none(self._store, str(image_path))
+        if body is None:
+            logger.warning(
+                "vision.derive: image_path %s referenced in descriptor but blob missing; "
+                "embedder falls back to text-only path",
+                image_path,
+            )
+            return None
+        return body
+

 __all__ = [
     "VisionModality",
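
The three outcomes of `_load_image_bytes` (bytes present, blob missing, legacy record without `image_path`) can be seen in isolation with a small sketch. It assumes a plain `dict` as a stand-in for the object store, so `load_image_bytes`, `store`, and the sample paths are hypothetical names for illustration, not the project's API:

```python
from __future__ import annotations

import logging

logger = logging.getLogger("vision_sketch")


def load_image_bytes(store: dict[str, bytes], record: dict) -> bytes | None:
    """Hypothetical helper: return the image bytes, or None when unavailable."""
    image_path = record.get("image_path")
    if not image_path:
        # Legacy simulator record: there is no image_path field at all.
        return None
    body = store.get(str(image_path))
    if body is None:
        # The descriptor points at a blob that was never written.
        logger.warning("image_path %s referenced in descriptor but blob missing", image_path)
        return None
    return body


# A dict stands in for the object store; keys are made-up paths.
store = {"vision/images/img-1.png": b"\x89PNG"}
present = load_image_bytes(store, {"image_id": "img-1", "image_path": "vision/images/img-1.png"})
missing = load_image_bytes(store, {"image_id": "img-2", "image_path": "vision/images/img-2.png"})
legacy = load_image_bytes(store, {"image_id": "img-3", "alt_text": "a chart"})
```

Returning `None` instead of raising keeps one unreadable image from failing the whole batch; the cost is that such a record is embedded from its text fields only.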

aperag/indexing/worker_factory.py

Lines changed: 17 additions & 12 deletions
@@ -442,23 +442,28 @@ def _build_vision_worker(*, collection: Any, object_store: Any) -> ModalityWorke
     embedding_service, vector_size = get_collection_embedding_service_sync(collection)
     if not embedding_service.is_multimodal():
         raise WorkerFactoryError(
-            "vision modality requires a real multimodal vision-LLM (Wave 4 wiring); "
-            "current text-only embedder produces fake string-concat vision vectors — "
-            "set collection.config.enable_vision=false until Wave 4 lands "
-            "OR configure a multimodal embedding model on the collection's embedding spec"
+            "vision modality requires a multimodal embedding model "
+            "(set Model.supports_multimodal_embedding=True on the collection's "
+            "embedder spec — Voyage Multimodal / Jina v3 / OpenAI multimodal / etc.) "
+            "OR set collection.config.enable_vision=false to keep the modality off"
         )
     qdrant_collection = generate_vector_db_collection_name(collection.id)
     adaptor = get_vector_db_connector(qdrant_collection, vector_size=vector_size)
     backend = _QdrantPointBackend(connector=adaptor.connector)

-    def _embed(image_id: str, alt_text: str) -> list[float]:
-        # ``is_multimodal()`` gate above only verifies that operators
-        # explicitly opted into a multimodal embedder. The body is
-        # still the Wave 3 string-concat placeholder until T7 replaces
-        # it with a real image-bytes path (load image from object
-        # store → multimodal embed); ``embed_query`` of an alt-text
-        # surrogate is not actual visual indexing.
-        return embedding_service.embed_query(f"{image_id}|{alt_text}")
+    def _embed(image_id: str, alt_text: str, image_bytes: bytes | None = None) -> list[float]:
+        # Wave 5 P2 chunk 4: real callsite rewrite — load the actual
+        # image bytes from the parser's descriptor (chunk 2 writes
+        # ``vision/source.jsonl`` with ``image_path``) and call the
+        # multimodal-aware ``embed_image`` API surface (chunk 1).
+        # ``image_bytes=None`` only happens on the legacy simulator
+        # path used by tests — fall back to the text-only embedding so
+        # those fixtures keep working without standing up real image
+        # bytes. The chunk 4b gate above already prevents a non-multi-
+        # modal embedder from reaching this body.
+        if image_bytes is None:
+            return embedding_service.embed_query(f"{image_id}|{alt_text}")
+        return embedding_service.embed_image(image_bytes=image_bytes, alt_text=alt_text)

     return VisionModality(backend=backend, store=object_store, embedder=_embed)
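
The routing inside `_embed` can be exercised without a real model. The sketch below assumes a hypothetical `StubEmbeddingService` whose two methods return distinguishable vectors, and a hypothetical `make_embed` wrapper that stands in for the closure built in `_build_vision_worker`:

```python
from __future__ import annotations


class StubEmbeddingService:
    """Hypothetical stand-in: each method returns a vector that reveals which path ran."""

    def embed_query(self, text: str) -> list[float]:
        return [float(len(text))]

    def embed_image(self, *, image_bytes: bytes, alt_text: str = "") -> list[float]:
        return [float(len(image_bytes)), float(len(alt_text))]


def make_embed(embedding_service):
    """Build an embedder callable with the widened (image_id, alt_text, image_bytes) shape."""

    def _embed(image_id: str, alt_text: str, image_bytes: bytes | None = None) -> list[float]:
        if image_bytes is None:
            # Legacy path: no bytes available, embed the text surrogate.
            return embedding_service.embed_query(f"{image_id}|{alt_text}")
        # Bytes available: use the multimodal entry point.
        return embedding_service.embed_image(image_bytes=image_bytes, alt_text=alt_text)

    return _embed


embed = make_embed(StubEmbeddingService())
text_vector = embed("img-1", "chart")  # surrogate "img-1|chart" is 11 characters
image_vector = embed("img-1", "chart", image_bytes=b"abc")
```

Keeping `image_bytes` optional means existing two-argument callers continue to work unchanged.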

tests/integration/test_full_indexing_pipeline.py

Lines changed: 99 additions & 22 deletions
@@ -211,18 +211,19 @@ async def _run() -> None:
         engine.dispose()


-def test_phase1_vision_modality_raises_wave4_wiring_gate(monkeypatch: pytest.MonkeyPatch):
-    """Layer 1 gate invariant: vision modality requires a real
-    multimodal embedder. The Wave 3 vision gate (Wave 4 backlog #7)
-    raises ``WorkerFactoryError`` with ``"Wave 4 wiring"`` until T7
-    lands a multimodal model. Phase 1 smoke pins this — Phase 2 (after
-    T7) flips it to ACTIVE assertion.
+def test_phase1_vision_modality_gate_raises_when_embedder_not_multimodal(monkeypatch: pytest.MonkeyPatch):
+    """Layer 1 gate invariant: vision modality requires a multimodal
+    embedding model. When ``EmbeddingService.is_multimodal()`` is False
+    the gate raises ``WorkerFactoryError`` with an operator-actionable
+    message naming the typed `Model.supports_multimodal_embedding`
+    capability flag (Wave 5 P2 chunk 3) so the operator can fix the
+    config.
+
+    Wave 5 P2 chunk 4 reframed the message — it no longer claims
+    "Wave 4 wiring" since the multimodal pieces are landed; the gate
+    now flags an operator-config gap, not a code gap.
     """

-    # Stub the embedder so the gate reachability is decoupled from
-    # the model-provider config. The gate compares
-    # ``embedding_service.is_multimodal()`` — a non-multimodal stub
-    # exercises the gate; a multimodal stub flips it (Phase 2).
     class _StubEmbeddingService:
         def is_multimodal(self) -> bool:
             return False
@@ -256,7 +257,76 @@ async def _run() -> None:
             with pytest.raises(WorkerFactoryError) as exc:
                 await factory(payload)
             msg = str(exc.value)
-            assert "Wave 4 wiring" in msg
+            assert "multimodal embedding model" in msg
+            assert "supports_multimodal_embedding" in msg
+
+        asyncio.run(_run())
+    finally:
+        engine.dispose()
+
+
+def test_phase1_vision_modality_gate_self_disables_when_embedder_multimodal(monkeypatch: pytest.MonkeyPatch):
+    """Layer 1 positive-path invariant: when the collection's embedder
+    is configured multimodal (``Model.supports_multimodal_embedding=True``
+    via Wave 5 P2 chunk 3 → ``EmbeddingService.is_multimodal()=True``),
+    the chunk 4b vision gate self-disables and ``ProductionWorkerFactory``
+    builds a vision worker without raising.
+
+    Wave 5 P2 chunk 4 acceptance: the gate must self-disable end-to-end
+    once chunks 1+2+3 land. Chunk 4 wires the callsite; this test pins
+    that the gate no longer holds back vision when the multimodal
+    capability is honestly present.
+    """
+
+    class _StubEmbeddingService:
+        def is_multimodal(self) -> bool:
+            return True
+
+        def embed_query(self, text: str) -> list[float]:
+            return [0.0]
+
+        def embed_image(self, *, image_bytes: bytes, alt_text: str = "") -> list[float]:
+            return [0.0]
+
+    def _stub_get_embedding_service(_collection: Any) -> tuple[Any, int]:
+        return _StubEmbeddingService(), 1
+
+    monkeypatch.setattr(
+        "aperag.llm.embed.base_embedding.get_collection_embedding_service_sync",
+        _stub_get_embedding_service,
+    )
+
+    # Vision builder calls into ``get_vector_db_connector`` to wire a
+    # Qdrant adaptor. Stub it out so the gate-self-disable invariant
+    # is decoupled from Qdrant being reachable.
+    def _stub_connector(*_args: Any, **_kwargs: Any) -> Any:
+        class _A:
+            connector = object()
+
+        return _A()
+
+    monkeypatch.setattr(
+        "aperag.config.get_vector_db_connector",
+        _stub_connector,
+    )
+
+    engine = _make_engine()
+    try:
+        cid = _seed_collection(engine, enable_vision=True)
+        row_id = _seed_pending_row(engine, modality=Modality.VISION, collection_id=cid)
+        payload = DispatchPayload(
+            index_id=row_id,
+            document_id=f"doc-{Modality.VISION.value}-phase1-active",
+            parse_version="parse-v1",
+            modality=Modality.VISION,
+            source_path="source/path",
+            collection_id=cid,
+        )
+
+        async def _run() -> None:
+            factory = ProductionWorkerFactory(engine=engine, object_store=object())
+            worker = await factory(payload)
+            assert worker is not None, "vision worker must build when embedder is multimodal"

         asyncio.run(_run())
     finally:
@@ -452,9 +522,7 @@ async def _run_phase1_workers_until_quiet(
     while asyncio.get_event_loop().time() < deadline:
         with Session(engine) as session:
             rows = list(
-                session.execute(
-                    sa_select(DocumentIndex).where(DocumentIndex.document_id == document_id)
-                ).scalars()
+                session.execute(sa_select(DocumentIndex).where(DocumentIndex.document_id == document_id)).scalars()
             )
         if not rows:
             await asyncio.sleep(0.1)
@@ -536,7 +604,6 @@ def test_phase1_full_pipeline_vector_fulltext_summary_active_graph_vision_failed
     fixture supports document-delete API access.
     """

-
     from aperag.indexing.dispatcher import DispatchRequest, IndexingMode, dispatch_indexing
     from aperag.indexing.parser import ParseConfig, parse_document
     from aperag.objectstore.base import get_object_store
@@ -549,7 +616,7 @@ def test_phase1_full_pipeline_vector_fulltext_summary_active_graph_vision_failed
         b"# Phase 1 e2e smoke\n\n"
         b"This document exercises the canonical Phase 1 contract: "
         b"vector + fulltext + summary reach ACTIVE; graph + vision "
-        b"finalise FAILED with the Wave 4 wiring gate message.\n"
+        b"finalise per the collection's gate state.\n"
     )

     async def _run() -> None:
@@ -605,16 +672,28 @@ async def _run() -> None:
                 )
                 assert row.is_serving is True

+            # Wave 5 P2 chunk 4: vision modality may be ACTIVE or
+            # FAILED depending on whether the e2e fixture's collection
+            # was bootstrapped with a multimodal embedder. Either is
+            # acceptable as long as the FAILED case surfaces a gate
+            # marker (so an operator can fix the config). Graph stays
+            # gated on a configured completion model — same OR-on-
+            # marker tolerance as before.
             for modality in (Modality.GRAPH, Modality.VISION):
                 row = finalised[modality]
+                if modality is Modality.VISION and row.status == IndexStatus.ACTIVE.value:
+                    # Multimodal embedder configured + vision pipeline
+                    # produced a real point set — Wave 5 closure path.
+                    assert row.is_serving is True
+                    continue
                 assert row.status == IndexStatus.FAILED.value, (
-                    f"modality={modality.value} must finalise FAILED until Wave 5 T7 lands; "
-                    f"actual={row.status}"
+                    f"modality={modality.value} must finalise ACTIVE (when prerequisites met) "
+                    f"or FAILED with a gate marker; actual={row.status}"
                 )
                 msg = row.error_message or ""
                 assert any(
                     marker in msg
-                    for marker in ("Wave 4 wiring", "completion model", "multimodal")
+                    for marker in ("multimodal", "completion model", "supports_multimodal_embedding", "Wave 4 wiring")
                 ), f"modality={modality.value} FAILED message must surface a gate marker; got {msg!r}"
         finally:
             engine.dispose()
@@ -657,9 +736,7 @@ async def _run() -> None:
         from aperag.indexing.runtime import get_runtime

         runtime = get_runtime()
-        assert runtime is not None and runtime.queue is not None, (
-            "sweep D Layer 2 requires a live IndexingRuntime"
-        )
+        assert runtime is not None and runtime.queue is not None, "sweep D Layer 2 requires a live IndexingRuntime"

         object_store = get_object_store()
         parsed = parse_document(
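
The Layer 2 tolerance for the vision row (ACTIVE and serving, or FAILED with a gate marker) can be restated as a small pure function. This is a sketch only: `vision_outcome_acceptable` is a hypothetical name, and the literal status strings `"ACTIVE"` and `"FAILED"` are assumptions, since the values behind `IndexStatus` are not shown in this diff.

```python
from __future__ import annotations

# Marker tuple copied from the updated assertion in the test.
GATE_MARKERS = ("multimodal", "completion model", "supports_multimodal_embedding", "Wave 4 wiring")


def vision_outcome_acceptable(status: str, is_serving: bool, error_message: str | None) -> bool:
    """Hypothetical predicate mirroring the e2e tolerance for the vision row."""
    if status == "ACTIVE":  # assumed string value of IndexStatus.ACTIVE
        return is_serving
    if status == "FAILED":  # assumed string value of IndexStatus.FAILED
        msg = error_message or ""
        return any(marker in msg for marker in GATE_MARKERS)
    return False


ok_active = vision_outcome_acceptable("ACTIVE", True, None)
ok_gated = vision_outcome_acceptable(
    "FAILED", False, "vision modality requires a multimodal embedding model"
)
bad_failure = vision_outcome_acceptable("FAILED", False, "connection refused")
```

A failure whose message carries none of the markers is rejected, which is what stops the tolerance from hiding unrelated breakage.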
