Commit 057409f

earayu and claude committed
feat(celery T2.2): tenant-scoped quota + bulkhead limits + flaky race-test hardening
Per docs/modularization/indexing-redesign-design-pack.md §H.5 + §H.6 + architect msg=8420f12a + ruling 2 simplification (msg=492315e8).

The Wave 2 runtime needs two layers of resource protection above the per-modality worker pool: a quota layer that throttles upstream LLM/embedding calls per (resource_class, tenant_scope_key), and a bulkhead layer that places hard wall-time / size ceilings on every worker call regardless of tenant. This commit lands both as orchestrator-callable helpers; the orchestrator/reconciler integration (invoke acquire() before LLM/embedding; wrap calls in bulkhead_timeout) is chenyexuan's T2.1 follow-up scope per architect msg=492315e8 ruling 2.

Surface (aperag/indexing/quota.py, ~411 lines):

* QuotaPolicy: frozen dataclass holding (capacity, refill_rate_per_sec) with __post_init__ validation that both are > 0. Standard token-bucket parameters; capacity 60 with refill 1.0/sec matches the §H.5 baseline of "60 LLM calls / minute sustained".
* QuotaPolicyRegistry: maps (resource_class, tenant_scope_key) → QuotaPolicy with a two-tier lookup: exact match first, then the "default" tenant_scope_key fallback for that resource class. Raises KeyError if neither is configured (a worker hitting an unconfigured resource class is a deployment bug, so it should surface loudly). Resource classes are independent: declaring an "llm" default does NOT implicitly cap "embedding".
* QuotaBackend: async Protocol that every backend implements. Single acquire() method that blocks until one token is granted, respecting the refill rate.
* InMemoryQuotaBackend: Python token bucket with one asyncio.Lock per (resource_class, tenant_scope_key) bucket key. Suitable for the §L Tier-1 single-process deployment (INDEXING_MODE=inline) and as the canonical correctness oracle for unit tests. Uses an injectable clock fixture so tests can advance time deterministically.
* RedisQuotaBackend: Redis-backed implementation with an atomic Lua script (HMGET → refill math → HMSET in one round-trip) so concurrent multi-process workers never overshoot the bucket capacity. The Lua script returns (acquired, wait_seconds); callers retry after asyncio.sleep(wait_seconds) when refused. Uses crc32 slot hashing on tenant_scope_key to bound Redis-key cardinality. Compatible with both sync and async redis-py (await-if-awaitable shim).

Surface (aperag/indexing/limits.py, ~162 lines):

* LLM_CALL_TIMEOUT_SECONDS = 60.0 (§H.6)
* EMBEDDING_CALL_TIMEOUT_SECONDS = 30.0 (§H.6)
* UPLOAD_MAX_BYTES = 50 * 1024 * 1024 (§H.6)
* bulkhead_timeout(seconds, label=...): async context manager wrapping asyncio.timeout; logs the label on TimeoutError so per-call telemetry distinguishes timeouts at "graph.derive.llm" from "vector.derive.embedding" without scattering log strings across modality workers.
* reject_if_oversize(content_length, label=...): raises ValueError at the upload boundary when content_length is over UPLOAD_MAX_BYTES; called by upload handlers before the parser allocates.

Tests (tests/unit_test/indexing/test_t2_2_quota_limits.py, 20 cases):

* QuotaPolicy validation: capacity / refill_rate must be > 0; fractional values accepted.
* QuotaPolicyRegistry: exact match wins over default; KeyError when neither is configured; per-resource-class default isolation.
* InMemoryQuotaBackend: initial bucket starts at capacity; drained bucket blocks until refill (under fake clock + monkey-patched asyncio.sleep, deterministic timing assertion); per-tenant isolation; per-resource-class isolation; refill capped at capacity after long idle (1hr → 3 tokens not 3600); default fallback routes unknown tenant through shared policy.
* Bulkhead: bulkhead_timeout completes within budget vs raises TimeoutError when exceeded; reject_if_oversize accepts at boundary, rejects strictly over cap; constants pinned to design pack values.
* RedisQuotaBackend: construction-only smoke (Protocol surface), Lua-script invocation against fake script (acquire when token available, retry-loop when wait_seconds returned). Real-Redis integration deferred to T2.3 load-test infra (real Redis fixture).

Hardening for huangheng msg=2b20974b informational + architect msg=8420f12a follow-up directive:

* tests/unit_test/indexing/test_t1_2_graph.py:_RaceProvocateurStore now takes a race_count parameter. The lock-protected test stays at race_count=1 (no barrier; the lock already serialises writers). The no-lock negative control flips to race_count=2: an asyncio.Event barrier holds both writers at the post-read / pre-write window until BOTH have reached it, then releases. This pins the race deterministically; the previous asyncio.sleep(0) was scheduler-dependent and the test flaked under heavy CI load (huangheng observed 1/2 fail in a full-suite run). Verified 5/5 deterministic passes locally post-fix.

aperag/indexing/__init__.py: re-export the T2.2 quota + limits surfaces so the indexing package surface stays uniform across the 3 wave layers.

Lint + tests:

* uvx ruff check + ruff format --check across aperag/indexing/ + tests/unit_test/indexing/: clean.
* pytest tests/unit_test/indexing/ + tests/unit_test/test_phase3_reexport_audit.py: 101 passed, 0 failed (62 pre-existing Wave 1+2 + 20 new T2.2 + modified race-test path that now passes 5/5 deterministically).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
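The quota behaviour described in the commit message (token bucket per (resource_class, tenant_scope_key), exact-match-then-"default" policy lookup, injectable clock) can be sketched roughly as below. This is a minimal illustration, not the code in aperag/indexing/quota.py, which is not shown on this page: the names `Policy`, `FakeClock`, `TokenBuckets`, and `demo` are hypothetical, and holding the per-bucket lock while sleeping is a simplifying assumption.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Policy:
    """Hypothetical stand-in for QuotaPolicy (token-bucket parameters)."""

    capacity: float
    refill_rate_per_sec: float

    def __post_init__(self) -> None:
        if self.capacity <= 0 or self.refill_rate_per_sec <= 0:
            raise ValueError("capacity and refill_rate_per_sec must be > 0")


class FakeClock:
    """Manually advanced clock so refill timing is deterministic in tests."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class TokenBuckets:
    """Hypothetical in-memory backend: one bucket and one lock per key."""

    def __init__(self, policies, clock, sleep=asyncio.sleep) -> None:
        self._policies = policies  # {(resource_class, tenant): Policy}
        self._clock = clock
        self._sleep = sleep
        self._state = {}  # key -> (tokens, last_refill_time)
        self._locks = {}

    def _policy_for(self, resource_class: str, tenant: str) -> Policy:
        exact = self._policies.get((resource_class, tenant))
        if exact is not None:
            return exact
        # Falls back to the per-resource-class default; KeyError if absent.
        return self._policies[(resource_class, "default")]

    async def acquire(self, resource_class: str, tenant: str) -> None:
        policy = self._policy_for(resource_class, tenant)
        key = (resource_class, tenant)
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            while True:
                now = self._clock()
                tokens, last = self._state.get(key, (policy.capacity, now))
                # Refill for the elapsed time, never above capacity.
                tokens = min(
                    policy.capacity,
                    tokens + (now - last) * policy.refill_rate_per_sec,
                )
                if tokens >= 1.0:
                    self._state[key] = (tokens - 1.0, now)
                    return
                self._state[key] = (tokens, now)
                await self._sleep((1.0 - tokens) / policy.refill_rate_per_sec)


async def demo() -> float:
    clock = FakeClock()

    async def fake_sleep(seconds: float) -> None:
        clock.now += seconds  # advance fake time instead of really waiting

    buckets = TokenBuckets({("llm", "default"): Policy(3, 1.0)}, clock, fake_sleep)
    for _ in range(3):
        await buckets.acquire("llm", "tenant-a")  # bucket starts full
    await buckets.acquire("llm", "tenant-a")  # drained: waits for one refill
    await buckets.acquire("llm", "tenant-b")  # separate bucket, no wait
    return clock.now  # fake seconds spent waiting


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing the clock and sleep function in as parameters is what lets the timing assertion run without real waiting; a production backend would presumably default to a monotonic clock.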
1 parent 7f51d44 commit 057409f

5 files changed

Lines changed: 1091 additions & 17 deletions

aperag/indexing/__init__.py

Lines changed: 28 additions & 0 deletions
@@ -54,6 +54,13 @@
     parse_kg_jsonl,
     serialize_kg_jsonl,
 )
+from aperag.indexing.limits import (
+    EMBEDDING_CALL_TIMEOUT_SECONDS,
+    LLM_CALL_TIMEOUT_SECONDS,
+    UPLOAD_MAX_BYTES,
+    bulkhead_timeout,
+    reject_if_oversize,
+)
 from aperag.indexing.models import DocumentIndex, IndexStatus, Modality
 from aperag.indexing.object_store import (
     InMemoryObjectStore,
@@ -108,6 +115,14 @@
     parse_document,
     read_chunks,
 )
+from aperag.indexing.quota import (
+    DEFAULT_TENANT_FALLBACK,
+    InMemoryQuotaBackend,
+    QuotaBackend,
+    QuotaPolicy,
+    QuotaPolicyRegistry,
+    RedisQuotaBackend,
+)
 from aperag.indexing.reconciler import (
     HEARTBEAT_STALE_SECONDS,
     RECONCILE_BATCH_SIZE,
@@ -235,4 +250,17 @@
     "find_orphan_parse_versions",
     "cleanup_orphan_parse_versions",
     "run_cleanup_loop",
+    # Quota (T2.2 §H.5)
+    "DEFAULT_TENANT_FALLBACK",
+    "QuotaPolicy",
+    "QuotaPolicyRegistry",
+    "QuotaBackend",
+    "InMemoryQuotaBackend",
+    "RedisQuotaBackend",
+    # Bulkhead limits (T2.2 §H.6)
+    "LLM_CALL_TIMEOUT_SECONDS",
+    "EMBEDDING_CALL_TIMEOUT_SECONDS",
+    "UPLOAD_MAX_BYTES",
+    "bulkhead_timeout",
+    "reject_if_oversize",
 ]

aperag/indexing/limits.py

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
+# Copyright 2025 ApeCloud, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Bulkhead limits — celery T2.2.
+
+Per ``docs/modularization/indexing-redesign-design-pack.md`` §H.6, every
+worker process applies tenant-blind hard ceilings on three resources:
+
+* **LLM call timeout** — 60 seconds. A long-running graph LLM call
+  is the most expensive operation per §E.4 and the most likely to
+  hang on rate-limit / retry storms; a 60s ceiling keeps a single
+  stuck call from monopolising the worker slot.
+* **Embedding call timeout** — 30 seconds. Embeddings are smaller +
+  cheaper than LLM completions, and a longer wait usually means an
+  upstream-saturated provider rather than a single slow request.
+  The shorter ceiling lets the orchestrator's retry / backpressure
+  loop kick in faster.
+* **Upload size cap** — 50 MB. Documents above this size will not
+  fit through the parser → chunks → embedding pipeline within the
+  per-document SLO; rejecting at upload boundary is cheaper than
+  hitting an OOM mid-parse.
+
+These are §H.6 "defense in depth" limits — they apply *in addition*
+to the §H.5 per-tenant quota gates in :mod:`aperag.indexing.quota`.
+A quota acquire might let an LLM call through; the bulkhead timeout
+still kicks in if the call itself stalls. The two layers compose.
+
+The module exposes the constants, plus :func:`bulkhead_timeout` —
+an async context manager that wraps :func:`asyncio.timeout` so
+worker code can lock the timeout in one line. Centralising the
+context manager here means a future change to the timeout strategy
+(e.g., per-modality overrides, structured cancellation telemetry)
+lands in one place rather than scattered across modality workers.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections.abc import AsyncIterator
+from contextlib import asynccontextmanager
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------
+# §H.6 hard ceilings — overridable per deployment via env / config but
+# *not* per tenant. These are bulkheads, not quotas; quotas live in
+# :mod:`aperag.indexing.quota`.
+# ---------------------------------------------------------------------
+
+
+LLM_CALL_TIMEOUT_SECONDS: float = 60.0
+"""Maximum wall time for a single LLM completion call.
+
+The graph modality's entity / relation extraction dominates this
+budget; vector / fulltext / summary / vision modalities never approach
+it. A worker that exceeds the budget surfaces a :class:`TimeoutError`
+which the orchestrator's failure-handling path treats as a transient
+failure (retry with §I.2 backoff) rather than a permanent one.
+"""
+
+EMBEDDING_CALL_TIMEOUT_SECONDS: float = 30.0
+"""Maximum wall time for a single embedding API call.
+
+Shorter than the LLM ceiling because embeddings are cheaper + more
+predictable; a long wait usually means provider-side saturation that
+the §H.5 quota system or the orchestrator's backoff should handle
+rather than the worker holding a slot open.
+"""
+
+UPLOAD_MAX_BYTES: int = 50 * 1024 * 1024
+"""Maximum source-document size accepted at the upload boundary.
+
+The parser → chunks → embedding pipeline assumes per-document body
+fits in single-process memory plus a few hundred MB of headroom for
+LLM context windows. Documents above this size should be split at
+ingestion before they enter the pipeline.
+"""
+
+
+# ---------------------------------------------------------------------
+# Async context manager wrapper — keeps callers' code one-liner.
+# ---------------------------------------------------------------------
+
+
+@asynccontextmanager
+async def bulkhead_timeout(seconds: float, *, label: str | None = None) -> AsyncIterator[None]:
+    """Run the inner block with a hard wall-time ceiling.
+
+    On timeout, the ``asyncio.timeout`` context manager cancels the
+    inner task and surfaces a :class:`TimeoutError`. The caller
+    typically catches this in the modality worker's ``derive`` /
+    ``sync`` and re-raises after recording the failure metric — see
+    :func:`aperag.indexing.observability.emit_index_failure`.
+
+    ``label`` is included in the timeout log line so a flood of
+    timeouts in one modality is identifiable from a single metric.
+    Callers should pass a stable identifier such as
+    ``"graph.derive.llm_extraction"``.
+
+    Example::
+
+        async with bulkhead_timeout(LLM_CALL_TIMEOUT_SECONDS, label="graph.derive.llm"):
+            await llm_client.complete(prompt)
+    """
+    try:
+        async with asyncio.timeout(seconds):
+            yield
+    except TimeoutError:
+        logger.warning(
+            "bulkhead timeout — %s exceeded %.1fs",
+            label or "<unlabeled>",
+            seconds,
+        )
+        raise
+
+
+# ---------------------------------------------------------------------
+# Boundary check for upload size.
+# ---------------------------------------------------------------------
+
+
+def reject_if_oversize(content_length: int, *, label: str | None = None) -> None:
+    """Raise :class:`ValueError` if ``content_length`` exceeds the §H.6
+    upload ceiling.
+
+    Surface the cap at the upload boundary rather than mid-parser so
+    a 100 MB PDF does not trigger an OOM after the parser has already
+    started loading it. The caller is the upload-handler request
+    path, *not* the modality workers (they trust the boundary).
+    """
+    if content_length > UPLOAD_MAX_BYTES:
+        logger.warning(
+            "upload rejected — %s exceeds %d bytes (got %d)",
+            label or "<unlabeled>",
+            UPLOAD_MAX_BYTES,
+            content_length,
+        )
+        raise ValueError(
+            f"upload exceeds {UPLOAD_MAX_BYTES} byte ceiling: {content_length} bytes ({label or 'document'})"
+        )
+
+
+__all__ = [
+    "LLM_CALL_TIMEOUT_SECONDS",
+    "EMBEDDING_CALL_TIMEOUT_SECONDS",
+    "UPLOAD_MAX_BYTES",
+    "bulkhead_timeout",
+    "reject_if_oversize",
+]
