|
| 1 | +# Copyright 2025 ApeCloud, Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +"""task #17 sub-task #22 — multi-document concurrent upload e2e load test. |
| 16 | +
|
| 17 | +Acceptance gate for the API/Worker hard-cut deployment: |
| 18 | +- API pod must remain responsive (auth + collection list latency |
| 19 | + unaffected) while N documents flow through the indexing pipeline. |
| 20 | +- All N documents must reach ``ACTIVE`` for the enabled modalities |
| 21 | + within a wall-time budget that proves worker throughput is not |
| 22 | + bottlenecked by API request handling. |
| 23 | +- DB connection count must stay under the configured pool budget |
| 24 | + (asserted via PG ``pg_stat_activity`` count from a side channel — |
| 25 | + out of scope for this script; see ``tests/load/test_pool_budget.py`` |
| 26 | + in @Planetegg's lane). |
| 27 | +
|
| 28 | +Skipped by default; runs against a deployed ApeRAG stack. Set |
| 29 | +``RUN_TASK_17_E2E=1`` and the e2e_pytest config env vars (model |
| 30 | +providers + base url) to run. |
| 31 | +
|
| 32 | +Owners: @Planetegg main task #22 / @chenyexuan co-implementer for |
| 33 | +this script. Cross-reviewed with @huangzhangshu (task #18 deployment |
| 34 | +gates) and @ziang (task #19 cleanup SoT). |
| 35 | +""" |
| 36 | + |
| 37 | +from __future__ import annotations |
| 38 | + |
| 39 | +import concurrent.futures |
| 40 | +import os |
| 41 | +import time |
| 42 | +import uuid |
| 43 | +from http import HTTPStatus |
| 44 | + |
| 45 | +import httpx |
| 46 | +import pytest |
| 47 | + |
| 48 | +from tests.e2e_pytest.config import ( |
| 49 | + API_BASE_URL, |
| 50 | + EMBEDDING_MODEL_CUSTOM_PROVIDER, |
| 51 | + EMBEDDING_MODEL_NAME, |
| 52 | + EMBEDDING_MODEL_PROVIDER, |
| 53 | +) |
| 54 | + |
| 55 | +# --------------------------------------------------------------------- |
| 56 | +# Tunables — keep low for local-dev defaults; CI / ops override via env |
| 57 | +# so the same script powers both quick smoke and 100-doc burst runs. |
| 58 | +# --------------------------------------------------------------------- |
| 59 | + |
| 60 | +DOC_COUNT: int = int(os.getenv("TASK_17_E2E_DOC_COUNT", "20")) |
| 61 | +"""Number of documents to upload concurrently per run.""" |
| 62 | + |
| 63 | +UPLOAD_CONCURRENCY: int = int(os.getenv("TASK_17_E2E_UPLOAD_CONCURRENCY", "10")) |
| 64 | +"""How many in-flight upload requests to keep against the API. The API |
| 65 | +must keep ``/health/live`` and ``/api/v2/auth/user`` stable while this |
| 66 | +load runs — that is the cascading-failure-mode the hard cut prevents.""" |
| 67 | + |
| 68 | +POLL_BUDGET_SECONDS: float = float(os.getenv("TASK_17_E2E_POLL_BUDGET_SECONDS", "300")) |
| 69 | +"""Wall-time ceiling from upload-confirm to all-modalities-ACTIVE. |
| 70 | +Production SLO is 30 minutes for 100 docs (graph LLM extraction |
| 71 | +dominates). The default 5 min budget here covers DOC_COUNT=20 with |
| 72 | +embedding + fulltext + graph_facts enabled. Override for larger runs.""" |
| 73 | + |
| 74 | +POLL_INTERVAL_SECONDS: float = float(os.getenv("TASK_17_E2E_POLL_INTERVAL_SECONDS", "5")) |
| 75 | + |
| 76 | +API_HEALTH_PROBE_INTERVAL_SECONDS: float = 2.0 |
| 77 | +"""How often to sample ``/health/live`` while documents are indexing. |
| 78 | +Any single sample > 500ms or any non-200 response constitutes a hard |
| 79 | +cut regression — the API must never be blocked by worker pressure |
| 80 | +once the deployments are split.""" |
| 81 | + |
| 82 | +API_HEALTH_PROBE_LATENCY_BUDGET_SECONDS: float = 0.5 |
| 83 | + |
| 84 | +REQUIRES_DEPLOYMENT_RUN_TOKEN = "RUN_TASK_17_E2E" |
| 85 | + |
| 86 | +pytestmark = pytest.mark.skipif( |
| 87 | + os.getenv(REQUIRES_DEPLOYMENT_RUN_TOKEN) != "1", |
| 88 | + reason=( |
| 89 | + f"deployment-aware e2e; set {REQUIRES_DEPLOYMENT_RUN_TOKEN}=1 + " |
| 90 | + "tests/e2e_pytest config env vars to run against a live stack." |
| 91 | + ), |
| 92 | +) |
| 93 | + |
| 94 | + |
| 95 | +# --------------------------------------------------------------------- |
| 96 | +# Fixtures |
| 97 | +# --------------------------------------------------------------------- |
| 98 | + |
| 99 | + |
| 100 | +@pytest.fixture |
| 101 | +def concurrent_collection(client): |
| 102 | + """A collection with the modalities exercised by the burst run. |
| 103 | +
|
| 104 | + Vector + fulltext + graph_facts cover the three pipelines that |
| 105 | + contend for the worker pool; vision + summary are off by default |
| 106 | + so a CI run without GPU / vision provider can still exercise the |
| 107 | + deployment gate. |
| 108 | + """ |
| 109 | + payload = { |
| 110 | + "title": f"task17 burst {uuid.uuid4().hex[:8]}", |
| 111 | + "type": "document", |
| 112 | + "config": { |
| 113 | + "source": "system", |
| 114 | + "enable_vector": True, |
| 115 | + "enable_fulltext": True, |
| 116 | + "enable_knowledge_graph": True, |
| 117 | + "enable_summary": False, |
| 118 | + "enable_vision": False, |
| 119 | + "embedding": { |
| 120 | + "model": EMBEDDING_MODEL_NAME, |
| 121 | + "model_service_provider": EMBEDDING_MODEL_PROVIDER, |
| 122 | + "custom_llm_provider": EMBEDDING_MODEL_CUSTOM_PROVIDER, |
| 123 | + }, |
| 124 | + }, |
| 125 | + } |
| 126 | + resp = client.post("/api/v2/collections", json=payload) |
| 127 | + assert resp.status_code == HTTPStatus.OK, resp.text |
| 128 | + coll = resp.json() |
| 129 | + yield coll |
| 130 | + client.delete(f"/api/v2/collections/{coll['id']}") |
| 131 | + |
| 132 | + |
| 133 | +# --------------------------------------------------------------------- |
| 134 | +# Helpers |
| 135 | +# --------------------------------------------------------------------- |
| 136 | + |
| 137 | + |
| 138 | +def _build_doc_body(idx: int) -> bytes: |
| 139 | + """Each document is unique enough that vector / fulltext / graph |
| 140 | + extraction must actually run — pure repeat content would let |
| 141 | + parse_version dedup short-circuit the worker path.""" |
| 142 | + return ( |
| 143 | + f"# task #17 burst document {idx}\n\n" |
| 144 | + f"This synthetic document number {idx} carries unique content so " |
| 145 | + f"the indexing pipeline cannot collapse parse_version dedup. " |
| 146 | + f"It mentions Alice-{idx} talking to Bob-{idx} about Project-{idx} " |
| 147 | + f"so the graph extractor produces at least one entity triple per doc.\n\n" |
| 148 | + f"## Section\n\n" |
| 149 | + f"Paragraph two of document {idx}, with content suitable for " |
| 150 | + f"chunking-window splitter.\n" |
| 151 | + ).encode("utf-8") |
| 152 | + |
| 153 | + |
| 154 | +def _upload_one(client: httpx.Client, collection_id: str, idx: int) -> str: |
| 155 | + files = {"files": (f"task17-burst-{idx:04d}.txt", _build_doc_body(idx), "text/plain")} |
| 156 | + resp = client.post(f"/api/v2/collections/{collection_id}/documents", files=files) |
| 157 | + assert resp.status_code == HTTPStatus.OK, f"upload {idx} failed: {resp.text}" |
| 158 | + items = resp.json()["items"] |
| 159 | + assert len(items) == 1 |
| 160 | + return items[0]["id"] |
| 161 | + |
| 162 | + |
| 163 | +def _all_indexes_active(item: dict, *, require_graph: bool) -> bool: |
| 164 | + """A document is fully indexed once every enabled modality has |
| 165 | + transitioned to ACTIVE. Graph is gated by ``require_graph`` so |
| 166 | + collections without graph still exercise the same poll loop.""" |
| 167 | + if item.get("vector_index_status") != "ACTIVE": |
| 168 | + return False |
| 169 | + if item.get("fulltext_index_status") != "ACTIVE": |
| 170 | + return False |
| 171 | + if require_graph and item.get("graph_index_status") != "ACTIVE": |
| 172 | + return False |
| 173 | + return True |
| 174 | + |
| 175 | + |
| 176 | +def _probe_health_during_burst(probes: list[tuple[float, float, int]], stop_token: dict) -> None: |
| 177 | + """Background sampler: hits ``/health/live`` every 2s while the |
| 178 | + document burst runs and records (timestamp, latency, status_code). |
| 179 | + Run this in a worker thread; main thread sets ``stop_token['stop']`` |
| 180 | + when the burst finishes.""" |
| 181 | + base = API_BASE_URL.rstrip("/") |
| 182 | + with httpx.Client(timeout=2.0) as probe: |
| 183 | + while not stop_token.get("stop"): |
| 184 | + t0 = time.monotonic() |
| 185 | + try: |
| 186 | + resp = probe.get(f"{base}/health/live") |
| 187 | + latency = time.monotonic() - t0 |
| 188 | + probes.append((time.time(), latency, resp.status_code)) |
| 189 | + except httpx.RequestError: |
| 190 | + latency = time.monotonic() - t0 |
| 191 | + probes.append((time.time(), latency, -1)) |
| 192 | + time.sleep(API_HEALTH_PROBE_INTERVAL_SECONDS) |
| 193 | + |
| 194 | + |
| 195 | +# --------------------------------------------------------------------- |
| 196 | +# Test |
| 197 | +# --------------------------------------------------------------------- |
| 198 | + |
| 199 | + |
| 200 | +def test_concurrent_doc_upload_indexes_under_budget(client, concurrent_collection): |
| 201 | + """Multi-document burst: upload + confirm + wait-for-ACTIVE. |
| 202 | +
|
| 203 | + Asserts: |
| 204 | + - All ``DOC_COUNT`` documents reach ACTIVE for vector / fulltext |
| 205 | + / graph_facts within ``POLL_BUDGET_SECONDS``. |
| 206 | + - ``/health/live`` stays 200 with p95 latency under |
| 207 | + ``API_HEALTH_PROBE_LATENCY_BUDGET_SECONDS`` throughout the burst |
| 208 | + — this is the hard cut acceptance gate (API not blocked by |
| 209 | + worker pressure once the deployments are split). |
| 210 | + - No upload returns 5xx or times out at the API entry point. |
| 211 | + """ |
| 212 | + collection_id = concurrent_collection["id"] |
| 213 | + |
| 214 | + # ---- Phase 1: parallel uploads ---- |
| 215 | + upload_started = time.monotonic() |
| 216 | + document_ids: list[str] = [] |
| 217 | + with concurrent.futures.ThreadPoolExecutor(max_workers=UPLOAD_CONCURRENCY) as pool: |
| 218 | + futures = [pool.submit(_upload_one, client, collection_id, i) for i in range(DOC_COUNT)] |
| 219 | + for fut in concurrent.futures.as_completed(futures): |
| 220 | + document_ids.append(fut.result()) |
| 221 | + upload_elapsed = time.monotonic() - upload_started |
| 222 | + assert len(document_ids) == DOC_COUNT |
| 223 | + # Upload itself must not take embarrassingly long — the API path |
| 224 | + # writes intent rows + enqueues, no heavy work. 30s/20-docs is a |
| 225 | + # generous ceiling that catches API-side regressions. |
| 226 | + assert upload_elapsed < 30.0, ( |
| 227 | + f"upload phase took {upload_elapsed:.1f}s for {DOC_COUNT} docs — API write path regressed?" |
| 228 | + ) |
| 229 | + |
| 230 | + # ---- Phase 2: poll for indexing completion + sample API health ---- |
| 231 | + health_probes: list[tuple[float, float, int]] = [] |
| 232 | + stop_token = {"stop": False} |
| 233 | + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as probe_pool: |
| 234 | + probe_pool.submit(_probe_health_during_burst, health_probes, stop_token) |
| 235 | + |
| 236 | + deadline = time.monotonic() + POLL_BUDGET_SECONDS |
| 237 | + active_doc_ids: set[str] = set() |
| 238 | + last_seen: dict[str, dict] = {} |
| 239 | + while time.monotonic() < deadline and len(active_doc_ids) < DOC_COUNT: |
| 240 | + resp = client.get( |
| 241 | + f"/api/v2/collections/{collection_id}/documents", |
| 242 | + params={"page_size": DOC_COUNT * 2}, |
| 243 | + ) |
| 244 | + assert resp.status_code == HTTPStatus.OK, resp.text |
| 245 | + for item in resp.json()["items"]: |
| 246 | + last_seen[item["id"]] = item |
| 247 | + if _all_indexes_active(item, require_graph=True): |
| 248 | + active_doc_ids.add(item["id"]) |
| 249 | + if len(active_doc_ids) < DOC_COUNT: |
| 250 | + time.sleep(POLL_INTERVAL_SECONDS) |
| 251 | + |
| 252 | + stop_token["stop"] = True |
| 253 | + |
| 254 | + # ---- Phase 3: assertions ---- |
| 255 | + missing = [did for did in document_ids if did not in active_doc_ids] |
| 256 | + assert not missing, ( |
| 257 | + f"{len(missing)}/{DOC_COUNT} documents did not reach ACTIVE within " |
| 258 | + f"{POLL_BUDGET_SECONDS}s budget. Stuck states: " |
| 259 | + f"{[(did, last_seen.get(did, {})) for did in missing[:3]]}" |
| 260 | + ) |
| 261 | + |
| 262 | + # Health probe gate: all samples must be 200 + under the latency |
| 263 | + # budget. The hard cut deployment promise is that worker pressure |
| 264 | + # never blocks the API; if any probe sample violates this, the |
| 265 | + # deployment split has not actually isolated the runtimes. |
| 266 | + failed_probes = [p for p in health_probes if p[2] != 200] |
| 267 | + assert not failed_probes, ( |
| 268 | + f"/health/live returned non-200 during burst: {[(t, lat, code) for t, lat, code in failed_probes[:5]]}" |
| 269 | + ) |
| 270 | + slow_probes = [p for p in health_probes if p[1] > API_HEALTH_PROBE_LATENCY_BUDGET_SECONDS] |
| 271 | + assert len(slow_probes) <= max(1, len(health_probes) // 20), ( |
| 272 | + f"/health/live latency violated {API_HEALTH_PROBE_LATENCY_BUDGET_SECONDS}s " |
| 273 | + f"budget on {len(slow_probes)}/{len(health_probes)} samples — " |
| 274 | + "API event loop is being blocked by worker activity." |
| 275 | + ) |
0 commit comments