refactor(phase8 #40): Celery orchestration helpers (Option D — keep thin wrapper, extract pure logic) #1668

Merged
earayu merged 1 commit into main from chenyexuan/phase8-celery-sync-helper on Apr 25, 2026

earayu (Collaborator) commented on Apr 25, 2026

Summary

Phase 8 task #40 — extract pure-sync orchestration logic from 4 Celery tasks (trigger_create/delete/update_indexes_workflow + notify_workflow_complete) into aperag/tasks/orchestration_helpers.py, while keeping thin @app.task wrappers (chain/chord composition + chord callback contract require Celery-registered tasks).

Per 符炫炜 D4 (refined) canonical msg=71e37a49 — Approve Option D.

Scope

New module: aperag/tasks/orchestration_helpers.py (~180 LOC)

  • is_skipped_payload(payload) — public mirror of the previous private check
  • build_dispatched_workflow_result(async_result) — handoff payload constructor
  • build_index_workflow_chord(...) — pure orchestration; builds chord(group(parallel_index_tasks), completion_callback) without calling .apply_async() (caller dispatches)
  • aggregate_workflow_results(...) — pure aggregation; classifies successful / failed / skipped, derives TaskStatus, returns WorkflowResult
  • build_workflow_failure_result(...) — uniform failure path
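
As an illustration of what "pure aggregation" can look like, here is a minimal sketch of a helper in the spirit of aggregate_workflow_results. The type definitions, the per-task payload keys (`index_type`, `success`, `skipped`), and the status derivation rule are all assumptions made for this example; the real helper and the project's own `TaskStatus` / `WorkflowResult` may differ.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class TaskStatus(str, Enum):
    # Hypothetical stand-in for the project's TaskStatus.
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILED = "failed"


@dataclass
class WorkflowResult:
    # Hypothetical stand-in for the project's WorkflowResult.
    document_id: str
    status: TaskStatus
    successful: list[str] = field(default_factory=list)
    failed: list[str] = field(default_factory=list)
    skipped: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {
            "document_id": self.document_id,
            "status": self.status.value,
            "successful": list(self.successful),
            "failed": list(self.failed),
            "skipped": list(self.skipped),
        }


def aggregate_workflow_results(
    document_id: str, index_results: list[dict[str, Any]]
) -> WorkflowResult:
    """Classify per-index results and derive an overall status.

    No Celery import and no broker call: the function only inspects the
    plain dicts that the chord hands to its callback.
    """
    successful: list[str] = []
    failed: list[str] = []
    skipped: list[str] = []
    for result in index_results:
        index_type = str(result.get("index_type", "unknown"))
        if result.get("skipped"):
            skipped.append(index_type)
        elif result.get("success"):
            successful.append(index_type)
        else:
            failed.append(index_type)

    # Assumed rule: no failures -> SUCCESS; failures alongside at least
    # one success -> PARTIAL_SUCCESS; failures only -> FAILED.
    if not failed:
        status = TaskStatus.SUCCESS
    elif successful:
        status = TaskStatus.PARTIAL_SUCCESS
    else:
        status = TaskStatus.FAILED
    return WorkflowResult(document_id, status, successful, failed, skipped)
```

Because the function takes and returns plain data, a unit test can call it directly with a list of dicts and assert on `.to_dict()`, with no worker or broker involved.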

config/celery_tasks.py (-103 net LOC)

  • trigger_create_indexes_workflow (line ~898) → thin wrapper: is_skipped_payload check + build_index_workflow_chord(...) + apply_async + build_dispatched_workflow_result
  • trigger_delete_indexes_workflow (line ~948) → same shape, no skip check
  • trigger_update_indexes_workflow (line ~984) → same as create
  • notify_workflow_complete (line ~1030) → thin wrapper: aggregate_workflow_results(...) + .to_dict(); failure path uses build_workflow_failure_result(...)
  • Removed dead private helpers: _is_skipped_payload, _build_dispatched_workflow_result
  • Removed unused imports: chord, group, IndexTaskResult, TaskStatus, WorkflowResult

Invariants preserved (per Option D scope)

  • ✅ chain/chord composition unchanged: chain(parse_document_task.s(), trigger_*.s()) + chord(group(create_index_task.s(...)), notify_workflow_complete.s(...)) shape identical
  • ✅ task name strings unchanged (Celery beat / queue routing / DB task_id references intact)
  • reconciler.py / scheduler.py call sites untouched (fire-and-forget semantics preserved)
  • BaseIndexTask base class still attached to notify_workflow_complete
  • @current_app.task decorators retained (chord callback contract requires broker-registered task)

Why not literal "convert to sync" (D4 letter)

Discussed in #40 thread (msg=24313706 + msg=67f2af93 + msg=71e37a49):

  • Chord callback notify_workflow_complete MUST be a Celery task: Celery dispatches it via the broker
  • trigger_* are part of chain(parse, trigger): chain steps must be broker-dispatched
  • Eliminating the decorators would require either (a) blocking the caller on parse_document_task.delay().get(), which defeats the reconciler.py fire-and-forget semantics, or (b) restructuring into a pattern that has no clean Celery primitive

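符炫炜 confirmed that Option D matches the real intent of D4 (msg=71e37a49): "decouple the business/orchestration logic from the Celery decorators so that it is unit-testable, can be reasoned about, and is not bound to a worker context".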

Test plan

  • uv run ruff check aperag/tasks/orchestration_helpers.py config/celery_tasks.py: clean
  • uv run pytest tests/unit_test/test_modularization_boundaries.py -x -q: 20/20 pass
  • uv run pytest tests/unit_test/ --ignore=objectstore --deselect=<pre-existing listUserQuotas FE adapter test from PR #1664>: 577 pass / 29 skip
  • grep chord( + notify_workflow_complete in config/celery_tasks.py shows composition preserved
  • Celery worker integration test (defer to deploy smoke; helpers are pure logic with no broker call)

Pre-existing issue not introduced by this PR

tests/unit_test/test_web_typed_api_contract.py::test_phase1_fe_complete_identity_auth_admin_audit_adapter_boundary fails on listUserQuotas — this is the pre-existing baseline failure documented in Bryce msg=c1a4bd86 (PR #1665) caused by FE PR #1664 trim. Independent of this PR's diff.

Diff stats

2 files changed, 247 insertions(+), 165 deletions(-)

Ghost-check

§1 5-Layer whitelist intact. No DB schema, API, or behavior changes. No Celery primitive removed. Pure refactor: same wire shape, logic relocated for testability.

Review

Per Phase 8 cadence (Weston blocker-level minimal CR + 符炫炜 canonical drift). Option D scope already locked by 符炫炜 msg=71e37a49 + PM msg=829dddd6.

…on D, rebased on #1669)

Per 符炫炜 D4 (refined) canonical msg=71e37a49 + Option I scope lock
msg=287c8512 + clarification msg=8dc7fe50 — keep thin @app.task wrappers
(chain/chord composition + chord callback contract require Celery-
registered tasks) but extract their orchestration business logic into
a pure sync helpers module so it can be reasoned about and unit-tested
without a Celery worker.

This is the rebase of v1 (commit 67196dd, dropped) onto post-#1669 main
(`fcc3b2d` Phase 3 indexing task domainization). Changes from v1:
- helper file moved from `aperag/tasks/orchestration_helpers.py` to
  `aperag/domains/indexing/orchestration.py` (co-located with the
  indexing domain Celery tasks now living in
  `aperag/domains/indexing/tasks.py` post-#1669)
- pre-existing `aperag/tasks/{reconciler,scheduler,collection,
  document,models,processing_lease,utils}.py` (~2428 LOC of cross-
  domain task scheduling infra) untouched per Option I scope
- target Celery wrappers refactored in their new home
  `aperag/domains/indexing/tasks.py` (was `config/celery_tasks.py`
  pre-#1669)

Scope (Option D):
- New module `aperag/domains/indexing/orchestration.py` with 5 pure
  sync helpers:
  - `is_skipped_payload(payload)` — public mirror of the previous
    private check
  - `build_dispatched_workflow_result(async_result)` — handoff payload
  - `build_index_workflow_chord(...)` — pure orchestration; builds
    `chord(group(parallel_index_tasks), completion_callback)` without
    calling `.apply_async()` (caller dispatches)
  - `aggregate_workflow_results(...)` — pure logic; classifies
    successful / failed / skipped, derives TaskStatus, builds
    WorkflowResult
  - `build_workflow_failure_result(...)` — uniform failure path
- `aperag/domains/indexing/tasks.py` — 4 Celery wrappers
  (`trigger_create/delete/update_indexes_workflow` +
  `notify_workflow_complete`) reduced to thin call-site
- Removed now-dead private helpers `_is_skipped_payload` and
  `_build_dispatched_workflow_result`
- Removed unused imports: `chord`, `group`, `IndexTaskResult`,
  `TaskStatus`, `WorkflowResult` from indexing tasks.py top-level

Invariants preserved (per Option D scope):
- chain/chord composition unchanged (`chain(parse, trigger).delay()`
  and `chord(group(...), notify_workflow_complete.s(...))` shape
  identical)
- task name strings unchanged via explicit `name=` kwarg in
  `@current_app.task` decorator (`config.celery_tasks.trigger_*`,
  `config.celery_tasks.notify_workflow_complete`) — Celery beat /
  queue routing / DB task_id references intact
- `aperag/tasks/reconciler.py` / `aperag/tasks/scheduler.py` etc.
  untouched (fire-and-forget semantics preserved per Option I)
- `BaseIndexTask` base class still attached to
  `notify_workflow_complete`
- @current_app.task decorators retained (chord callback contract
  requires broker-registered task)

Gates:
- ruff check aperag/domains/indexing/orchestration.py
  aperag/domains/indexing/tasks.py: clean
- pytest tests/unit_test/test_modularization_boundaries.py -x -q:
  20/20 pass
- pytest tests/unit_test/ --ignore=objectstore --deselect=<pre-existing
  listUserQuotas FE adapter test from PR #1664>: 577 pass / 29 skip
- grep `chord(` + `notify_workflow_complete.s(` shows composition
  preserved in indexing/tasks.py

Ghost-check: §1 5-Layer whitelist intact. No DB schema, API, or
behavior changes. No Celery primitive removed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu force-pushed the chenyexuan/phase8-celery-sync-helper branch from 67196dd to 15eeb99 on April 25, 2026 04:32
earayu merged commit a95f355 into main on Apr 25, 2026
0 of 2 checks passed
earayu deleted the chenyexuan/phase8-celery-sync-helper branch on April 25, 2026 04:35