Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
930cf20
feat(celery T3.1 commit 1/5): alembic migration — drop legacy + ALTER…
earayu Apr 26, 2026
9aef2a7
feat(celery T3.1 commit 2/5): dispatcher.py + cleanup path C (collect…
earayu Apr 26, 2026
5325788
feat(celery T3.2 + T3.3): SearchResultMetadata §G.5 + private-deploy …
earayu Apr 26, 2026
c941526
feat(celery T3.1 commit 3/5): Config.INDEXING_MODE + FastAPI lifespan…
earayu Apr 26, 2026
c44a2de
test(celery T3.3 follow-up): add vision-only inline mode smoke
earayu Apr 26, 2026
e602f1d
feat(celery T3.1 commit 4b/5 step 1): move extract_keywords helper to…
earayu Apr 26, 2026
39aad24
feat(celery T3.1 commit 4a): migrate 7 production callers to §F.1 schema
earayu Apr 26, 2026
a076a13
feat(celery T3.1 commit 4b/5 step 2): Pattern A/B/C migration of 6 kn…
earayu Apr 26, 2026
5583e63
feat(celery T3.1 commit 5 Part 1): inline processing_lease helpers + …
earayu Apr 27, 2026
94c1d2c
feat(celery T3.1 commit 5 Part 2 chunk 1a): inline CollectionSummaryC…
earayu Apr 27, 2026
5b691db
feat(celery T3.1 commit 5 Part 2 chunk 1b): simplify task bodies + Pa…
earayu Apr 27, 2026
4173af4
feat(celery T3.1 commit 5 Part 2 chunk 2): hard-delete legacy Celery …
earayu Apr 27, 2026
d254dd6
feat(celery T3.1 commit 5 Part 2 chunk 3): wire new-API + final grep …
earayu Apr 27, 2026
5d50ca5
fix(celery T3.1 alembic drift): promote DocumentIndex.{collection_id,…
earayu Apr 27, 2026
144c3f1
fix(celery T3.1 e2e): purge existing triple before INSERT in rebuild …
earayu Apr 27, 2026
143e045
fix(celery T3.1 e2e): drop celery service refs in e2e runners + CI wo…
earayu Apr 27, 2026
579b32a
fix(celery T3.1 worker_factory): replace _placeholder_worker_factory …
earayu Apr 27, 2026
9b5ba76
fix(celery T3.1 view-model): align Document per-modality status Liter…
earayu Apr 27, 2026
e1f2325
fix(celery T3.1 evaluation cross-loop): run_evaluation_run as corouti…
earayu Apr 27, 2026
30b3489
fix(celery T3.1 evaluation hurl): relax timing-sensitive assertions f…
earayu Apr 27, 2026
8ca396f
fix(celery T3.1 parser-wiring): sync invoke parse_document before dis…
earayu Apr 27, 2026
a11df3c
fix(celery T3.1 qdrant id): UUID5-derive Qdrant point id from chunk_i…
earayu Apr 27, 2026
4b0eaf3
fix(celery T3.1 graph-gating): explicit Wave 3 gate for graph modalit…
earayu Apr 27, 2026
6858842
fix(celery T3.1 closing): align fulltext writer fields to retrieval s…
earayu Apr 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/e2e-http-smoke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
if: failure()
run: |
docker compose -f docker-compose.yml ps || true
docker compose -f docker-compose.yml logs --no-color api celeryworker celerybeat postgres redis qdrant es || true
docker compose -f docker-compose.yml logs --no-color api postgres redis qdrant es || true

- name: Stop Compose stack
if: always()
Expand Down Expand Up @@ -170,7 +170,7 @@ jobs:
if: failure()
run: |
docker compose -f docker-compose.yml ps || true
docker compose -f docker-compose.yml logs --no-color api celeryworker celerybeat postgres redis qdrant es || true
docker compose -f docker-compose.yml logs --no-color api postgres redis qdrant es || true

- name: Stop Compose stack
if: always()
Expand Down
19 changes: 6 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ help:
@printf " make stack-logs Tail stack logs\n\n"
@printf "Services\n"
@printf " make serve-api Run backend API locally\n"
@printf " make serve-worker Run celery worker locally\n"
@printf " make serve-beat Run celery beat locally\n"
@printf " make serve-flower Run flower locally\n"
@printf " make serve-web Run frontend locally\n\n"
@printf "Tests\n"
@printf " make test-all Run unit + integration + pytest E2E suites\n"
Expand Down Expand Up @@ -171,19 +168,15 @@ stack-logs:
##################################################

# Local development services
.PHONY: serve-api serve-web serve-worker serve-flower serve-beat
# Wave 3 T3.1 chunk 3: ``serve-worker`` / ``serve-beat`` / ``serve-flower``
# targets removed alongside the Celery infrastructure deletion. The
# in-process ``aperag.indexing`` runtime (worker pool + reconciler +
# cleanup loops) is spawned by the FastAPI lifespan when ``serve-api``
# starts, so no separate worker / beat / monitoring command is needed.
.PHONY: serve-api serve-web
serve-api: db-migrate
uvicorn aperag.app:app --host 0.0.0.0 --log-config scripts/uvicorn-log-config.yaml

serve-worker:
celery -A config.celery worker -B -l INFO --pool=threads --concurrency=16

serve-beat:
celery -A config.celery beat -l INFO

serve-flower:
celery -A config.celery flower --conf/flowerconfig.py

serve-web:
cd ./web && yarn dev

Expand Down
125 changes: 122 additions & 3 deletions aperag/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio # noqa: E402

from aperag.config import settings
from aperag.observability import (
bind_observability_context,
Expand Down Expand Up @@ -205,9 +207,126 @@ async def initialize_user_quota(self, user_id: str) -> None:


async def combined_lifespan(app: FastAPI):
    """Combined lifespan manager for the API + MCP server + indexing runtime.

    The indexing runtime (Wave 3 T3.1 wire-in) launches the per-modality
    worker pool + reconciler + cleanup loop only when
    ``settings.indexing_mode == "async"``. In ``inline`` mode the
    upload-side ``dispatch_indexing(mode=INLINE)`` runs derive + sync +
    cutover within the request coroutine, so no background workers are
    needed (per design pack §L Tier-1 deployment).

    The runtime is started as background asyncio tasks (not subprocesses)
    so a single FastAPI process owns its workers — matches the §E.2
    "one Python process per modality" architecture for the in-process
    deployment topology. Tier-3 horizontal scale-out runs separate
    worker processes; that wiring lives in a future ops launcher.

    Side effects:
        * ``app.state.indexing_queue`` / ``app.state.indexing_engine`` are
          set to the live queue / engine in async mode and to ``None``
          otherwise.
        * ``set_runtime`` is called with an ``IndexingRuntime`` in async
          mode and with ``None`` otherwise.

    On exit the shutdown event is set and every background task is awaited;
    any exception a task finished with is logged instead of being dropped.

    Args:
        app: The FastAPI application whose lifespan is being managed.

    Yields:
        Control to the running application, inside ``mcp_app.lifespan(app)``.
    """
    import logging

    indexing_runtime_tasks: list[asyncio.Task[None]] = []
    indexing_shutdown: asyncio.Event | None = None

    if settings.indexing_mode == "async":
        # Lazy imports — the indexing runtime symbols are only imported
        # when this branch runs, which keeps ``aperag/app.py`` module
        # import cheap and confines the import surface to the wired
        # branch. They are all resolved up front so that an import
        # failure surfaces before any background task has been spawned.
        from aperag.config import sync_engine
        from aperag.indexing import (
            InMemoryWorkQueue,
            run_cleanup_loop,
            run_fulltext_worker,
            run_graph_worker,
            run_reconcile_loop,
            run_summary_worker,
            run_vector_worker,
            run_vision_worker,
        )
        from aperag.indexing.runtime import IndexingRuntime, set_runtime
        from aperag.indexing.worker_factory import ProductionWorkerFactory

        indexing_shutdown = asyncio.Event()
        # Single process-local InMemoryWorkQueue is the default
        # transport for the in-process topology. Tier-3 production
        # swaps this for a Redis-backed WorkQueue (RPUSH / BLPOP) by
        # injecting via app state at deploy time — Wave 3 follow-up.
        queue = InMemoryWorkQueue()
        engine = sync_engine

        # Worker factory — per-task lazy construction. The async
        # worker entrypoints (``run_*_worker``) call this closure on
        # every BLPOP'd payload to materialise the concrete
        # :class:`ModalityWorker` for that ``(collection, modality)``
        # pair. ``ProductionWorkerFactory`` resolves the collection
        # row, picks the right backend (Qdrant / Elasticsearch +
        # configured embedder / completion model), and constructs the
        # worker — all backed by the existing build helpers
        # (``get_collection_embedding_service_sync`` /
        # ``get_vector_db_connector`` / ``get_object_store``) so this
        # is composition, not re-implementation. Construction failures
        # raise :class:`WorkerFactoryError`; the orchestrator runner
        # catches that and finalises the row FAILED so §I.2 retry
        # picks it up. Per architect msg=7782ebe0.
        worker_factory = ProductionWorkerFactory(engine=engine)

        worker_kwargs = dict(
            engine=engine,
            queue=queue,
            worker_factory=worker_factory,
            shutdown=indexing_shutdown,
        )
        # One background task per modality, spawned in a fixed order.
        # Tasks are named so shutdown-time log lines identify the loop.
        modality_entrypoints = (
            ("vector", run_vector_worker),
            ("fulltext", run_fulltext_worker),
            ("graph", run_graph_worker),
            ("summary", run_summary_worker),
            ("vision", run_vision_worker),
        )
        for modality, run_worker in modality_entrypoints:
            indexing_runtime_tasks.append(
                asyncio.create_task(
                    run_worker(**worker_kwargs),
                    name=f"indexing-{modality}-worker",
                )
            )
        indexing_runtime_tasks.append(
            asyncio.create_task(
                run_reconcile_loop(
                    engine=engine,
                    queue=queue,
                    shutdown=indexing_shutdown,
                ),
                name="indexing-reconcile-loop",
            )
        )
        indexing_runtime_tasks.append(
            asyncio.create_task(
                run_cleanup_loop(
                    engine=engine,
                    workers={},  # T3.3 follow-up: pass concrete worker registry
                    shutdown=indexing_shutdown,
                ),
                name="indexing-cleanup-loop",
            )
        )

        # Stash on app state so request handlers can dispatch via the
        # same queue / engine the workers consume.
        app.state.indexing_queue = queue
        app.state.indexing_engine = engine

        # Service-layer callers (aperag/domains/**) consume the same
        # triple through the process-wide IndexingRuntime singleton —
        # they don't have a Request handle for app.state. Workers map
        # is empty in the async-default deployment; T3.3 follow-up
        # populates concrete factories per modality.
        set_runtime(IndexingRuntime(engine=engine, queue=queue, workers={}))
    else:
        app.state.indexing_queue = None
        app.state.indexing_engine = None
        from aperag.indexing.runtime import set_runtime

        set_runtime(None)

    try:
        async with mcp_app.lifespan(app):
            yield
    finally:
        if indexing_shutdown is not None:
            indexing_shutdown.set()
        if indexing_runtime_tasks:
            # Wait for every worker / reconciler / cleanup loop to return
            # after the shutdown signal rather than cancelling them, so a
            # SIGTERM does not abort mid-task.
            # NOTE(review): this wait is unbounded — nothing here enforces
            # a grace window. If shutdown must be time-boxed, wrap the
            # gather in a timeout and cancel the stragglers.
            outcomes = await asyncio.gather(*indexing_runtime_tasks, return_exceptions=True)
            shutdown_logger = logging.getLogger(__name__)
            for task, outcome in zip(indexing_runtime_tasks, outcomes):
                # ``return_exceptions=True`` hands failures back as values;
                # report them so a crashed loop is not silently discarded.
                if isinstance(outcome, BaseException) and not isinstance(outcome, asyncio.CancelledError):
                    shutdown_logger.error(
                        "Indexing runtime task %s exited with an error",
                        task.get_name(),
                        exc_info=outcome,
                    )


# Create the main FastAPI app with combined lifespan
Expand Down
Loading
Loading