feat(celery Wave 2): runtime + cutover/quota + load test#1727
Merged
Conversation
Wave 2 foundation per design pack §E.2 + §I.2 + §I.3 + §F.5. Layers
the worker-pool / reconciler / cleanup runtime on top of Wave 1
modality workers; T2.2 (Bryce) wires commit_active + quota + bulkhead
on top of this.
Schema (alembic c2e8d5a1f3b9):
- ADD COLUMN collection_id VARCHAR(64) NULL — cleanup-worker GC scoping
- ADD COLUMN source_path TEXT NULL — modality.derive input path,
decouples orchestrator from canonical-layout filename assumptions
(vision modality reads non-canonical synthetic JSON in T1 / could
read PDF page extracts in T2.x without breaking layout)
- INDEX idx_document_index_v2_collection on collection_id
- Both nullable for back-compat with Wave 1 fixture rows; reconciler
logs + skips PENDING rows missing source_path
Orchestrator (aperag/indexing/orchestrator.py, ~570 LOC):
- BLPOP loop (WorkQueue Protocol + InMemoryWorkQueue test fixture +
RedisWorkQueue stub for production wire-in)
- Atomic claim: UPDATE WHERE id=$id AND status IN ('PENDING','FAILED')
→ RUNNING + last_heartbeat=now(); zero-row UPDATE = lost claim →
silent skip (concurrent worker won the race)
- Per-task heartbeat asyncio task (HEARTBEAT_INTERVAL_SECONDS=20)
bumps last_heartbeat; cancelled in finally; tolerates DB hiccups
- §I.2 retry backoff: 30s → 60s → 120s → 240s → 480s, capped at 5
retries; past-budget rows stay FAILED with retry_after=NULL
- §C.7 reschedule semantic: empty derive path → status back to
PENDING without burning retry_count (upstream not ready)
- 5 per-modality entrypoints (run_vector_worker / run_fulltext_worker
/ run_graph_worker / run_summary_worker / run_vision_worker) bind
the §E.2 concurrency caps (16/32/4/4/4)
Reconciler (aperag/indexing/reconciler.py, ~370 LOC):
- 30s loop with 4 idempotent scans
- (1) PENDING dispatch: push payloads to per-modality queue; does
NOT claim (orchestrator's job)
- (2) FAILED retry: flip elapsed-backoff FAILED → PENDING (retry_count
≤ MAX_RETRY_COUNT); past-budget rows stay FAILED
- (3) RUNNING reclaim: flip stale-heartbeat (>60s per §E.4) RUNNING
→ PENDING WITHOUT burning retry_count (worker death ≠ work failure)
- (4) Per-modality cutover (§F.3): for every ACTIVE-but-not-serving
row, demote prior is_serving row + promote new in single TX. The
§F.1 partial unique index guards against orchestrator-level bugs
leaving 2 rows is_serving=TRUE for the same (doc, modality)
Cleanup (aperag/indexing/cleanup.py, ~305 LOC):
- 5min loop, GC orphan parse_versions per §F.5
- Orphan = is_serving=FALSE AND superseded by newer parse_version
AND updated_at < now - 1hr cool-down (cool-down lets cutover
races settle before backend delete)
- Duck-typed backend delete (delete_by_filter for vector/summary/
vision; delete_by_query for fulltext); graph backend has no flat
delete (lineage co-mingled across docs per §D.3) → cleanup logs
warning + skips backend delete + still GCs DB row. Bryce's T2.2
follow-up extends graph with §D.3 lineage-aware cleanup hook
- Document-tombstone path (document.deleted_at) deferred to T2.2
cutover lane (touches document table outside indexing domain)
Tests (tests/unit_test/indexing/test_t2_1_runtime.py, 17 cases):
- Orchestrator: claim → derive → sync → finalize happy path; lost
claim silent skip; failure backoff (30s); §C.7 empty-derive
reschedule without retry burn
- §I.2 backoff schedule lock (30s/60s/120s/240s/480s + cap)
- Reconciler: PENDING dispatch idempotency + missing-source_path
skip; RUNNING reclaim preserves retry_count; FAILED retry flips
elapsed-only; cutover demotes prior + promotes new + per-modality
independence + partial-unique invariant survives
- Cleanup: orphan GC after cool-down + cool-down respect + graph
modality skip with warning (T2.2 follow-up flag)
- E2E smoke: PENDING → reconciler push → orchestrator process →
ACTIVE → reconciler cutover → is_serving=TRUE
- DispatchPayload dict + JSON round-trip (Redis serialization sanity)
Local pytest: 17/17 T2.1 + 64/64 Wave 1 indexing + 2/2 Phase 3 audit
all green (81/81).
Bryce can now layer T2.2 (commit_active helper + quota + bulkhead)
on this branch — orchestrator/reconciler/cleanup write-set is
disjoint from his planned commit_active.py / quota.py / limits.py.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…-test hardening
Per docs/modularization/indexing-redesign-design-pack.md §H.5 + §H.6
+ architect msg=8420f12a + ruling 2 simplification (msg=492315e8).
The Wave 2 runtime needs two layers of resource protection above
the per-modality worker pool: a quota layer that throttles upstream
LLM/embedding calls per (resource_class, tenant_scope_key), and a
bulkhead layer that places hard wall-time / size ceilings on every
worker call regardless of tenant. This commit lands both as
orchestrator-callable helpers; the orchestrator/reconciler integration
(invoke acquire() before LLM/embedding; wrap calls in
bulkhead_timeout) is chenyexuan's T2.1 follow-up scope per architect
msg=492315e8 ruling 2.
Surface (aperag/indexing/quota.py, ~411 lines):
* QuotaPolicy — frozen dataclass holding (capacity,
refill_rate_per_sec) with __post_init__ validation that both are
> 0. Standard token-bucket parameters; 60/1.0 matches §H.5
baseline of "60 LLM calls / minute sustained".
* QuotaPolicyRegistry — maps (resource_class, tenant_scope_key) →
QuotaPolicy with two-tier lookup: exact match first, then
("default" tenant_scope_key fallback for that resource class).
Raises KeyError if neither configured (worker hitting an
unconfigured resource class is a deployment bug; surface loud).
Resource classes are independent — declaring an "llm" default
does NOT implicitly cap "embedding".
* QuotaBackend — async Protocol every backend implements. Single
acquire() method that blocks until one token is granted,
respecting the refill rate.
* InMemoryQuotaBackend — Python token bucket with one asyncio.Lock
per (resource_class, tenant_scope_key) bucket key. Suitable for
the §L Tier-1 single-process deployment (INDEXING_MODE=inline)
and as the canonical correctness oracle for unit tests. Uses an
injectable clock fixture so tests can advance time
deterministically.
* RedisQuotaBackend — Redis-backed implementation with an atomic
Lua script (HMGET → refill math → HMSET in one round-trip) so
concurrent multi-process workers never overshoot the bucket
capacity. The Lua script returns (acquired, wait_seconds);
callers retry after asyncio.sleep(wait_seconds) when refused.
Uses crc32 slot hashing on tenant_scope_key to bound Redis-key
cardinality. Compatible with both sync and async redis-py
(await-if-awaitable shim).
Surface (aperag/indexing/limits.py, ~162 lines):
* LLM_CALL_TIMEOUT_SECONDS = 60.0 (§H.6)
* EMBEDDING_CALL_TIMEOUT_SECONDS = 30.0 (§H.6)
* UPLOAD_MAX_BYTES = 50 * 1024 * 1024 (§H.6)
* bulkhead_timeout(seconds, label=...) — async context manager
wrapping asyncio.timeout; logs the label on TimeoutError so
per-call telemetry distinguishes timeouts at "graph.derive.llm"
from "vector.derive.embedding" without scattering log strings
across modality workers.
* reject_if_oversize(content_length, label=...) — boundary-time
ValueError when over UPLOAD_MAX_BYTES; called by upload handlers
before parser allocates.
Tests (tests/unit_test/indexing/test_t2_2_quota_limits.py, 20 cases):
* QuotaPolicy validation — capacity / refill_rate must be > 0;
fractional values accepted.
* QuotaPolicyRegistry — exact-match wins over default; KeyError
when neither configured; per-resource-class default isolation.
* InMemoryQuotaBackend — initial bucket starts at capacity;
drained bucket blocks until refill (under fake clock + monkey-
patched asyncio.sleep, deterministic timing assertion); per-tenant
isolation; per-resource-class isolation; refill capped at
capacity after long idle (1hr → 3 tokens not 3600); default
fallback routes unknown tenant through shared policy.
* Bulkhead — bulkhead_timeout completes within budget vs raises
TimeoutError when exceeded; reject_if_oversize accepts at
boundary, rejects strictly over cap; constants pinned to design
pack values.
* RedisQuotaBackend — construction-only smoke (Protocol surface),
Lua-script invocation against fake script (acquire when token
available, retry-loop when wait_seconds returned). Real-Redis
integration deferred to T2.3 load-test infra (real Redis fixture).
Hardening for huangheng msg=2b20974b informational +
architect msg=8420f12a follow-up directive:
* tests/unit_test/indexing/test_t1_2_graph.py:_RaceProvocateurStore
now takes race_count parameter. The lock-protected test stays at
race_count=1 (no barrier; single-writer-at-a-time naturally).
The no-lock negative-control flips to race_count=2: an
asyncio.Event barrier holds both writers at the post-read /
pre-write window until BOTH have reached it, then releases. This
pins the race deterministically — the previous asyncio.sleep(0)
was scheduler-dependent and the test flaked under heavy CI load
(huangheng observed 1/2 fail in a full-suite run). Verified
5/5 deterministic passes locally post-fix.
aperag/indexing/__init__.py — re-export the T2.2 quota + limits
surfaces so the indexing package surface stays uniform across the
3 wave layers.
Lint + tests:
* uvx ruff check + ruff format --check across aperag/indexing/ +
tests/unit_test/indexing/: clean.
* pytest tests/unit_test/indexing/ + tests/unit_test/test_phase3_reexport_audit.py:
101 passed, 0 failed (62 pre-existing Wave 1+2 + 20 new T2.2 +
modified race-test path that now passes 5/5 deterministically).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…nup dispatch Three architect rulings (msg=492315e8) applied as a follow-up on top of T2.1 commit 7f51d44: **Ruling 1 — §F.3 cutover MUST be single-TX in worker, not split.** My earlier T2.1 implementation split §F.3 into: - worker writes statement 1 (status=ACTIVE) on sync success - reconciler.reconcile_cutover() writes statements 2+3 (demote+promote) ~30s later on its next cycle That's a drift from the §F.3 explicit lock "Three statements in one transaction" + introduces an ACTIVE-but-not-is_serving inconsistency window the spec disallows in §F.4. Fix: - aperag/indexing/orchestrator.py: replace _finalize_active with _finalize_active_with_cutover — runs all 3 §F.3 statements (status=ACTIVE → demote prior is_serving for (doc, modality) → promote new) inside the same session.begin() block. The §F.1 partial unique index (uniq_document_index_v2_serving) is honoured naturally because statement 2 (demote-FALSE) precedes statement 3 (promote-TRUE) — index never sees two TRUE rows mid-TX. - aperag/indexing/reconciler.py: remove reconcile_cutover() entirely + drop from run_reconcile_loop (now 3 scans: PENDING dispatch + FAILED retry + RUNNING reclaim). Module docstring updated to reference the orchestrator-side cutover with explicit pointer to architect ruling msg=492315e8 Ruling 1. - aperag/indexing/__init__.py: remove reconcile_cutover from exports. - tests: 3 reconciler-cutover tests rewritten as 3 orchestrator-side cutover tests (process_one_task happy path → ACTIVE+is_serving=TRUE in one TX; partial unique invariant; per-modality independence). E2E smoke updated — no longer needs a separate reconcile_cutover step after orchestrator completion. **Ruling 3 — graph cleanup MUST land in T2.1, not punt to T2.2.** T2.2 lane is quota + bulkhead per Ruling 2 simplification; cleanup is squarely chenyexuan's lane. Earlier T2.1 logged a WARN + skipped graph backend cleanup as a "T2.2 follow-up" — that was a scope leak. Fix in aperag/indexing/cleanup.py: - Two distinct entry points with different graph semantics: (A) cleanup_orphan_parse_versions — orphan parse_v GC. For graph, this is a backend no-op because §D.3.6 sync supersede already removed old parse_version's lineage members when the new parse_version was written (per amended §D.3.2 canonical PR #1725 head a0a4799 — sync clears by document_id, not parse_v). Counted under "graph_noop" for telemetry; DB row still GC'd. (B) cleanup_for_deleted_documents — caller-driven, runs when a document is removed. For non-graph: flat backend delete per (document_id, parse_version). For graph: invoke the lineage cleanup loop on the worker's underlying LineageGraphStore via EntityLock (Wave 1 conventions exposed as _store + _entity_lock). Per-document dedup so multiple parse_version rows for the same doc share one lineage cleanup call. - _is_graph_worker uses Modality.GRAPH check (no graph.py import) to avoid pulling Nebula/Neo4j extras into the cleanup module. - _flat_backend_delete_callable renamed for clarity (was _backend_delete_callable); only walks worker._backend (vector / fulltext / summary / vision); explicitly does NOT walk _store. - _cleanup_graph_lineage_for_document implements §D.3 lineage cleanup at the storage layer using duck-typed Wave 1 conventions (find_entity_ids_with_lineage / remove_entity_lineage_member / optional gc_entity_if_orphan + symmetric relation cleanup). Each entity is serialized through entity_lock.acquire() so a concurrent graph sync cannot race. Tests: - test_cleanup_orphan_parse_version_for_graph_is_backend_noop — asserts graph_noop counter increments + backend delete is NOT called + DB row is still GC'd - test_cleanup_for_deleted_documents_removes_non_graph_backend_per_parse_version — vector path: delete_by_filter per parse_version + rows dropped - test_cleanup_for_deleted_documents_calls_graph_lineage_cleanup_once_per_doc — graph path: lineage cleanup runs once per doc regardless of how many parse_version rows; entity GC + relation cleanup invoked - test_cleanup_for_deleted_documents_handles_empty_input - _StubLineageGraphStore + _StubAsyncLock + _GraphLikeWorker test fixtures — duck-typed stand-ins for GraphModalityWorker so we don't need graph extras to test cleanup Local pytest: full indexing suite + Phase 3 audit gate = 104 passed, 0 failed (Wave 1 + T2.1 + T2.2 + T2.1 follow-up all green together on rebased branch on Bryce's commit 057409f). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Per docs/modularization/indexing-redesign-design-pack.md §E.3 +
architect msg=8420f12a Wave 2 acceptance gate. Closes the third and
final task of Wave 2: a load test that proves the full Wave 1 + 2
runtime stack — parser → object_store → orchestrator (claim, derive,
sync, atomic 3-statement cutover per architect ruling msg=492315e8) →
all five modality workers — converges 100 concurrent documents to
``is_serving=TRUE`` for every (doc, modality) pair.
Surface (tests/load/test_100_doc_burst.py, ~530 lines):
* test_100_doc_burst_all_modalities_serving_within_budget — the
canonical Wave 2 gate. Runs against in-memory backends + SQLite
StaticPool so it completes in ~5 s on a developer laptop while
exercising the same orchestrator + cutover path that production
multi-process Redis-backed workers run. Asserts:
- ALL DOC_COUNT × len(Modality) = 500 rows reach is_serving=TRUE
- elapsed wall time < BURST_BUDGET_SECONDS (60s ceiling) — a
regression introducing per-process serialization (e.g., a
hot-path mutex bottlenecking all five modalities) trips this
- every serving row is at status=ACTIVE
- §F.1 partial unique invariant held (no duplicate (doc, modality)
pairs at is_serving=TRUE)
- §J SLI gauges/counters present per modality:
* indexing.index_lag_seconds gauge ≤ 30-min production SLO
* indexing.index_failure_total = 0
* indexing.index_success_total = 500
- InMemoryWorkQueue depth = 0 post-drain for every modality
Marked @pytest.mark.slow so the standard PR-gate suite skips it
(-m "not slow"); the nightly CI job runs -m slow.
* test_smoke_5_doc_burst_all_modalities_serving — same shape with 5
docs and no timing budget. Runs in ~1s, stays in default PR-gate
suite to catch regressions that break the run loop / cutover
entirely (vs just the timing budget).
Implementation notes:
* Per-modality drains run sequentially in the test — SQLite under
StaticPool serializes write transactions anyway, so an
asyncio.gather across modalities offers no real speedup while
introducing flaky cutover-TX races on heavily loaded CI runners.
Production multi-process workers do gather across modalities, but
their DB connections are independent so the gather pattern doesn't
translate cleanly to a single-connection SQLite test.
* Vision modality requires a JSON list source (per
VisionModality.derive contract); the seed phase writes an empty
image-records list under collections/<cid>/documents/<did>/source/
images.json so vision derive completes cleanly without doing real
image extraction. Other modalities all consume chunks.jsonl
(vector + fulltext shared, summary, graph).
* Graph extractor stub returns one EntityRecord per chunk —
deterministic + cheap. Real LLM extraction is out of scope for the
Wave 2 acceptance gate (it's about scheduling + cutover, not
extraction quality).
* doc_lag_starts captures monotonic timestamps at seed time so the
per-(doc, modality) lag emission reflects actual pipeline latency
rather than only the worker's processing time.
Lint + tests:
* uvx ruff check + ruff format --check across aperag/indexing/ +
tests/unit_test/indexing/ + tests/load/: clean.
* pytest tests/unit_test/indexing/ + tests/load/ +
tests/unit_test/test_phase3_reexport_audit.py:
106 passed, 0 failed (62 Wave 1 + 21 T2.1 follow-up + 20 T2.2
+ 2 T2.3 new + Phase 3 audit). 3 consecutive runs all clean.
Wave 2 PR #1727 status (ready for huangheng integrated CR):
* T2.1 (chenyexuan 7f51d44 + follow-up 836fe54)
* T2.2 (Bryce 057409f)
* T2.3 (this commit)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
3 tasks
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
…trieval-modality discriminator (#1728) Bryce T3.2 implementation (PR #1727 follow-up commit 5325788 on chenyexuan/celery-wave3-cutover) flagged a name collision: D10.h already locked SearchResultMetadata.modality as the content-modality field (Literal["text", "image"]). v3 design pack §G.5 narrative re-used `modality` for "which retrieval modality served this hit" (Literal["vector", "fulltext", "graph", "summary", "vision"]), which would shadow the D10.h field. Architect ack of Bryce's chosen disambiguation: rename the new field to `index_modality`. D10.h `modality` (content) preserved. SearchResultMetadata gains three new fields total: parse_version / index_modality / index_state_per_modality. The two are orthogonal — a hit can be (index_modality="vector", modality="text") or (index_modality="vision", modality="image"). Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
8 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Wave 2 mega-PR per architect lock (msg=8420f12a): runtime (worker pool + reconciler + cleanup) + cutover/quota/bulkhead + 100-doc burst load test, accumulated on a single branch for one-shot CR + verdict.
Scope (per §K Wave 2 acceptance):
aperag/indexing/orchestrator.py(BLPOP loop + asyncio semaphore + heartbeat + 5 per-modality entrypoints) +aperag/indexing/reconciler.py(30s loop: PENDING dispatch + FAILED retry + RUNNING reclaim + per-modality cutover) +aperag/indexing/cleanup.py(5min loop: orphan parse_version GC) + alembic schema columns (collection_id,source_path)Design pack canonical: PR #1725 head
a0a47994(§D.3.2 + §J.1 amendments already canonical).T2.1 implementation highlights
§I.2 retry backoff lock — exponential 30s → 60s → 120s → 240s → 480s, capped at 5 retries; past-budget rows stay FAILED (
retry_after=NULL) for operator triage.§E.4 stale-heartbeat reclaim — 60s threshold; reclaimed rows go PENDING WITHOUT burning
retry_count(worker death ≠ work failure).§F.3 per-modality cutover — runs as part of reconciler; demote prior
is_servingrow + promote new in single TX. The §F.1 partial unique index (uniq_document_index_v2_serving) guards against orchestrator-level bugs leaving 2 rows is_serving=TRUE for the same(doc, modality).§C.7 reschedule semantic — empty derive path → status back to PENDING without burning
retry_count(upstream not ready).§F.5 orphan GC — superseded parse_version + 1hr cool-down + duck-typed backend delete (
delete_by_filterfor vector/summary/vision;delete_by_queryfor fulltext). Graph backend skipped with WARN (T2.2 §D.3 lineage cleanup follow-up); DB row still GC'd.Schema migration (alembic
c2e8d5a1f3b9):ADD COLUMN collection_id VARCHAR(64) NULL— cleanup-worker GC scopingADD COLUMN source_path TEXT NULL— modality.derive input path; decouples orchestrator from canonical-layout filename assumptions (vision modality reads non-canonical synthetic JSON in T1 / could read PDF page extracts in T2.x without breaking layout)INDEX idx_document_index_v2_collection on collection_idBranch coordination
This PR stays open and accumulates commits from chenyexuan + Bryce + finisher. The orchestrator / reconciler / cleanup write-set is disjoint from Bryce's planned
commit_active.py/quota.py/limits.py. Once T2.2 + T2.3 land, the PR flips toin_reviewfor @huangheng's full Wave 2 step 0+ / step 0 / step 0''' / cross-lane caller sweep CR + verdict.Wave 2 follow-up tracked in this PR (per architect msg=8420f12a): flaky
test_nebula_race_without_lock_loses_a_writerrace-window hardening — Bryce will addasyncio.sleep(0)between RMW in_RaceProvocateurStoreas part of T2.2 commit (or wherever convenient).Test plan
uv run python -m pytest tests/unit_test/indexing/ tests/unit_test/test_phase3_reexport_audit.py)🤖 Generated with Claude Code