refactor(phase8 #40): Celery orchestration helpers (Option D — keep thin wrapper, extract pure logic)#1668
Merged
Merged
Conversation
…on D, rebased on #1669) Per 符炫炜 D4 (refined) canonical msg=71e37a49 + Option I scope lock msg=287c8512 + clarification msg=8dc7fe50 — keep thin @app.task wrappers (chain/chord composition + chord callback contract require Celery- registered tasks) but extract their orchestration business logic into a pure sync helpers module so it can be reasoned about and unit-tested without a Celery worker. This is the rebase of v1 (commit 67196dd, dropped) onto post-#1669 main (`fcc3b2d` Phase 3 indexing task domainization). Changes from v1: - helper file moved from `aperag/tasks/orchestration_helpers.py` to `aperag/domains/indexing/orchestration.py` (co-located with the indexing domain Celery tasks now living in `aperag/domains/indexing/tasks.py` post-#1669) - pre-existing `aperag/tasks/{reconciler,scheduler,collection, document,models,processing_lease,utils}.py` (~2428 LOC of cross- domain task scheduling infra) untouched per Option I scope - target Celery wrappers refactored in their new home `aperag/domains/indexing/tasks.py` (was `config/celery_tasks.py` pre-#1669) Scope (Option D): - New module `aperag/domains/indexing/orchestration.py` with 5 pure sync helpers: - `is_skipped_payload(payload)` — public mirror of the previous private check - `build_dispatched_workflow_result(async_result)` — handoff payload - `build_index_workflow_chord(...)` — pure orchestration; builds `chord(group(parallel_index_tasks), completion_callback)` without calling `.apply_async()` (caller dispatches) - `aggregate_workflow_results(...)` — pure logic; classifies successful / failed / skipped, derives TaskStatus, builds WorkflowResult - `build_workflow_failure_result(...)` — uniform failure path - `aperag/domains/indexing/tasks.py` — 4 Celery wrappers (`trigger_create/delete/update_indexes_workflow` + `notify_workflow_complete`) reduced to thin call-site - Removed now-dead private helpers `_is_skipped_payload` and `_build_dispatched_workflow_result` - Removed unused imports: `chord`, `group`, `IndexTaskResult`, `TaskStatus`, `WorkflowResult` from indexing tasks.py top-level Invariants preserved (per Option D scope): - chain/chord composition unchanged (`chain(parse, trigger).delay()` and `chord(group(...), notify_workflow_complete.s(...))` shape identical) - task name strings unchanged via explicit `name=` kwarg in `@current_app.task` decorator (`config.celery_tasks.trigger_*`, `config.celery_tasks.notify_workflow_complete`) — Celery beat / queue routing / DB task_id references intact - `aperag/tasks/reconciler.py` / `aperag/tasks/scheduler.py` etc. untouched (fire-and-forget semantics preserved per Option I) - `BaseIndexTask` base class still attached to `notify_workflow_complete` - @current_app.task decorators retained (chord callback contract requires broker-registered task) Gates: - ruff check aperag/domains/indexing/orchestration.py aperag/domains/indexing/tasks.py: clean - pytest tests/unit_test/test_modularization_boundaries.py -x -q: 20/20 pass - pytest tests/unit_test/ --ignore=objectstore --deselect=<pre-existing listUserQuotas FE adapter test from PR #1664>: 577 pass / 29 skip - grep `chord(` + `notify_workflow_complete.s(` shows composition preserved in indexing/tasks.py Ghost-check: §1 5-Layer whitelist intact. No DB schema, API, or behavior changes. No Celery primitive removed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
67196dd to
15eeb99
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Phase 8 task #40 — extract pure-sync orchestration logic from 4 Celery tasks (
trigger_create/delete/update_indexes_workflow+notify_workflow_complete) intoaperag/tasks/orchestration_helpers.py, while keeping thin@app.taskwrappers (chain/chord composition + chord callback contract require Celery-registered tasks).Per 符炫炜 D4 (refined) canonical msg=71e37a49 — Approve Option D.
Scope
New module:
aperag/tasks/orchestration_helpers.py(~180 LOC)is_skipped_payload(payload)— public mirror of the previous private checkbuild_dispatched_workflow_result(async_result)— handoff payload constructorbuild_index_workflow_chord(...)— pure orchestration; buildschord(group(parallel_index_tasks), completion_callback)without calling.apply_async()(caller dispatches)aggregate_workflow_results(...)— pure aggregation; classifies successful / failed / skipped, derivesTaskStatus, returnsWorkflowResultbuild_workflow_failure_result(...)— uniform failure pathconfig/celery_tasks.py(-103 net LOC)trigger_create_indexes_workflow(line ~898) → thin wrapper:is_skipped_payloadcheck +build_index_workflow_chord(...)+apply_async+build_dispatched_workflow_resulttrigger_delete_indexes_workflow(line ~948) → same shape, no skip checktrigger_update_indexes_workflow(line ~984) → same as createnotify_workflow_complete(line ~1030) → thin wrapper:aggregate_workflow_results(...)+.to_dict(); failure path usesbuild_workflow_failure_result(...)_is_skipped_payload,_build_dispatched_workflow_resultchord,group,IndexTaskResult,TaskStatus,WorkflowResultInvariants preserved (per Option D scope)
chain(parse_document_task.s(), trigger_*.s())+chord(group(create_index_task.s(...)), notify_workflow_complete.s(...))shape identicalreconciler.py/scheduler.pycall sites untouched (fire-and-forget semantics preserved)BaseIndexTaskbase class still attached tonotify_workflow_complete@current_app.taskdecorators retained (chord callback contract requires broker-registered task)Why not literal "convert to sync" (D4 letter)
Discussed in #40 thread (msg=24313706 + msg=67f2af93 + msg=71e37a49):
notify_workflow_completeMUST be Celery task — Celery dispatches it via the brokertrigger_*are part ofchain(parse, trigger)— chain steps must be broker-dispatchedparse_document_task.delay().get()(defeatsreconciler.pyfire-and-forget) or (b) restructure to a pattern with no clean Celery primitive符炫炜 confirmed Option D = D4 real intent (msg=71e37a49): "把业务/orchestration 逻辑从 Celery 装饰器里解耦,让它可单测、可 reasoning、不绑定 worker context".
Test plan
uv run ruff check aperag/tasks/orchestration_helpers.py config/celery_tasks.py— cleanuv run pytest tests/unit_test/test_modularization_boundaries.py -x -q— 20/20 passuv run pytest tests/unit_test/ --ignore=objectstore --deselect=<pre-existing listUserQuotas FE adapter test from PR #1664>— 577 pass / 29 skipchord(+notify_workflow_completeinconfig/celery_tasks.pyshows composition preservedPre-existing issue not introduced by this PR
tests/unit_test/test_web_typed_api_contract.py::test_phase1_fe_complete_identity_auth_admin_audit_adapter_boundaryfails onlistUserQuotas— this is the pre-existing baseline failure documented in Bryce msg=c1a4bd86 (PR #1665) caused by FE PR #1664 trim. Independent of this PR's diff.Diff stats
2 files changed, 247 insertions(+), 165 deletions(-)Ghost-check
§1 5-Layer whitelist intact. No DB schema, API, or behavior changes. No Celery primitive removed. Pure refactor: same wire shape, logic relocated for testability.
Review
Per Phase 8 cadence (Weston blocker-level minimal CR + 符炫炜 canonical drift). Option D scope already locked by 符炫炜 msg=71e37a49 + PM msg=829dddd6.