
Commit a95f355

earayu and claude authored
refactor(phase8 #40 v2): extract indexing orchestration helpers (Option D, rebased on #1669) (#1668)
Per 符炫炜 D4 (refined) canonical msg=71e37a49 + Option I scope lock msg=287c8512 + clarification msg=8dc7fe50: keep thin @app.task wrappers (chain/chord composition + chord callback contract require Celery-registered tasks) but extract their orchestration business logic into a pure sync helpers module so it can be reasoned about and unit-tested without a Celery worker.

This is the rebase of v1 (commit 67196dd, dropped) onto post-#1669 main (`fcc3b2d` Phase 3 indexing task domainization).

Changes from v1:
- helper file moved from `aperag/tasks/orchestration_helpers.py` to `aperag/domains/indexing/orchestration.py` (co-located with the indexing domain Celery tasks now living in `aperag/domains/indexing/tasks.py` post-#1669)
- pre-existing `aperag/tasks/{reconciler,scheduler,collection,document,models,processing_lease,utils}.py` (~2428 LOC of cross-domain task scheduling infra) untouched per Option I scope
- target Celery wrappers refactored in their new home `aperag/domains/indexing/tasks.py` (was `config/celery_tasks.py` pre-#1669)

Scope (Option D):
- New module `aperag/domains/indexing/orchestration.py` with 5 pure sync helpers:
  - `is_skipped_payload(payload)`: public mirror of the previous private check
  - `build_dispatched_workflow_result(async_result)`: handoff payload
  - `build_index_workflow_chord(...)`: pure orchestration; builds `chord(group(parallel_index_tasks), completion_callback)` without calling `.apply_async()` (caller dispatches)
  - `aggregate_workflow_results(...)`: pure logic; classifies successful / failed / skipped, derives TaskStatus, builds WorkflowResult
  - `build_workflow_failure_result(...)`: uniform failure path
- `aperag/domains/indexing/tasks.py`: 4 Celery wrappers (`trigger_create/delete/update_indexes_workflow` + `notify_workflow_complete`) reduced to thin call-site
- Removed now-dead private helpers `_is_skipped_payload` and `_build_dispatched_workflow_result`
- Removed unused imports: `chord`, `group`, `IndexTaskResult`, `TaskStatus`, `WorkflowResult` from indexing tasks.py top-level

Invariants preserved (per Option D scope):
- chain/chord composition unchanged (`chain(parse, trigger).delay()` and `chord(group(...), notify_workflow_complete.s(...))` shape identical)
- task name strings unchanged via explicit `name=` kwarg in `@current_app.task` decorator (`config.celery_tasks.trigger_*`, `config.celery_tasks.notify_workflow_complete`); Celery beat / queue routing / DB task_id references intact
- `aperag/tasks/reconciler.py` / `aperag/tasks/scheduler.py` etc. untouched (fire-and-forget semantics preserved per Option I)
- `BaseIndexTask` base class still attached to `notify_workflow_complete`
- @current_app.task decorators retained (chord callback contract requires broker-registered task)

Gates:
- ruff check aperag/domains/indexing/orchestration.py aperag/domains/indexing/tasks.py: clean
- pytest tests/unit_test/test_modularization_boundaries.py -x -q: 20/20 pass
- pytest tests/unit_test/ --ignore=objectstore --deselect=<pre-existing listUserQuotas FE adapter test from PR #1664>: 577 pass / 29 skip
- grep `chord(` + `notify_workflow_complete.s(` shows composition preserved in indexing/tasks.py

Ghost-check: §1 5-Layer whitelist intact. No DB schema, API, or behavior changes. No Celery primitive removed.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
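As a rough illustration of what "unit-tested without a Celery worker" can mean here, the sketch below copies the two smallest helpers from this commit's diff and calls them directly. `FakeAsyncResult` is a hypothetical stand-in that is not part of the commit; it assumes the helper reads only the `.id` attribute, which matches the helper body shown in the diff.

```python
from dataclasses import dataclass
from typing import Any


def is_skipped_payload(payload: Any) -> bool:
    # Same check as in the diff: only a dict carrying status == "skipped" counts.
    return isinstance(payload, dict) and payload.get("status") == "skipped"


def build_dispatched_workflow_result(async_result) -> dict:
    # Same shape as in the diff: a small JSON-serializable handoff payload.
    return {"status": "dispatched", "workflow_id": async_result.id}


@dataclass
class FakeAsyncResult:
    """Hypothetical stand-in for a Celery AsyncResult; only ``id`` is used."""

    id: str


# No broker or worker is involved in any of these calls.
handoff = build_dispatched_workflow_result(FakeAsyncResult(id="wf-123"))
skip_checks = [
    is_skipped_payload(p)
    for p in ({"status": "skipped"}, {"status": "ok"}, None, "skipped")
]
```

Because neither helper touches Celery, a plain object with an `id` attribute is enough to exercise the handoff payload, and non-dict inputs such as `None` or a bare string are simply reported as not skipped.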
1 parent 6bcc0ef commit a95f355

2 files changed

Lines changed: 246 additions & 165 deletions


Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
# Copyright 2025 ApeCloud, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pure orchestration helpers for indexing Celery workflow tasks.

These functions encapsulate the orchestration / aggregation logic that
used to live inside the ``trigger_*_workflow`` and
``notify_workflow_complete`` Celery tasks in this domain. They are
extracted as plain sync helpers so the logic can be reasoned about,
unit-tested, and reused without spinning up a Celery worker.

Per Phase 8 D4 (refined) canonical:
- Thin Celery task wrappers in ``aperag/domains/indexing/tasks.py``
  keep their ``@app.task`` decorators (chain/chord composition + chord
  callback contract require broker-registered tasks) but delegate
  their bodies to these helpers.
- chain/chord composition, reconciler/scheduler call sites, task name
  strings, and beat schedule entries are unchanged.
"""

from __future__ import annotations

import logging
from typing import Any, List

from celery import chord, group

from aperag.tasks.models import IndexTaskResult, TaskStatus, WorkflowResult

logger = logging.getLogger(__name__)


def is_skipped_payload(payload: Any) -> bool:
    """A payload is "skipped" when carrying the sentinel ``status == "skipped"``.

    Public mirror of the previous private ``_is_skipped_payload`` helper.
    """
    return isinstance(payload, dict) and payload.get("status") == "skipped"


def build_dispatched_workflow_result(async_result) -> dict:
    """Return a small, JSON-serializable handoff payload for downstream tracking."""
    return {
        "status": "dispatched",
        "workflow_id": async_result.id,
    }


def build_index_workflow_chord(
    *,
    document_id: str,
    index_types: List[str],
    per_index_signature_factory,
    completion_callback_signature,
):
    """Build a ``chord(group(parallel_index_tasks), completion_callback)``.

    Pure orchestration: no I/O, no broker call. The caller decides when
    to ``.apply_async()`` the returned chord.

    Args:
        document_id: Document being indexed (for logging only).
        index_types: List of index types to fan out to.
        per_index_signature_factory: Callable ``(index_type) -> Signature``
            that produces a Celery signature for a single index_type.
            Encapsulates the per-task arguments (e.g. ``parsed_data``).
        completion_callback_signature: Celery signature for the chord
            callback (typically ``notify_workflow_complete.s(...)``).

    Returns:
        A ``celery.canvas.chord`` object ready to be ``.apply_async()``.
    """
    parallel_tasks = group([per_index_signature_factory(index_type) for index_type in index_types])
    workflow_chord = chord(parallel_tasks, completion_callback_signature)
    logger.debug(
        "Built index workflow chord for document %s with %d parallel index tasks",
        document_id,
        len(index_types),
    )
    return workflow_chord


def aggregate_workflow_results(
    *,
    index_results: List[dict],
    document_id: str,
    operation: str,
    index_types: List[str],
) -> WorkflowResult:
    """Aggregate per-index results from a chord body into a ``WorkflowResult``.

    Pure logic: parses/normalizes ``IndexTaskResult`` dicts, classifies
    them into successful / failed / skipped, derives an overall
    ``TaskStatus``, and constructs the final ``WorkflowResult`` payload.

    No I/O, no broker call. Safe to call without a running Celery worker.
    """
    successful_tasks: List[str] = []
    failed_tasks: List[str] = []
    skipped_tasks: List[str] = []
    normalized_results: List[IndexTaskResult] = []

    for result_dict in index_results:
        if isinstance(result_dict, dict) and result_dict.get("status") == "skipped":
            skipped_tasks.append(result_dict.get("index_type", "unknown"))
            continue
        try:
            result = IndexTaskResult.from_dict(result_dict)
            normalized_results.append(result)
            if result.success:
                successful_tasks.append(result.index_type)
            else:
                failed_tasks.append(f"{result.index_type}: {result.error}")
        except Exception as e:
            failed_tasks.append(f"unknown: {str(e)}")

    if not failed_tasks:
        status = TaskStatus.SUCCESS
        processed_indexes = successful_tasks if successful_tasks else skipped_tasks
        status_message = (
            f"Document {document_id} {operation} COMPLETED SUCCESSFULLY! "
            f"Processed indexes: {', '.join(processed_indexes)}"
        )
        if skipped_tasks:
            status_message += f". Skipped: {', '.join(skipped_tasks)}"
        logger.info(status_message)
    elif successful_tasks:
        status = TaskStatus.PARTIAL_SUCCESS
        status_message = (
            f"Document {document_id} {operation} COMPLETED with WARNINGS. "
            f"Success: {', '.join(successful_tasks)}. Failures: {'; '.join(failed_tasks)}"
        )
        if skipped_tasks:
            status_message += f". Skipped: {', '.join(skipped_tasks)}"
        logger.warning(status_message)
    else:
        status = TaskStatus.FAILED
        status_message = f"Document {document_id} {operation} FAILED. All tasks failed: {'; '.join(failed_tasks)}"
        logger.error(status_message)

    return WorkflowResult(
        workflow_id=f"{document_id}_{operation}",
        document_id=document_id,
        operation=operation,
        status=status,
        message=status_message,
        successful_indexes=successful_tasks,
        failed_indexes=[f.split(":")[0] for f in failed_tasks],
        total_indexes=len(index_types),
        index_results=normalized_results,
    )


def build_workflow_failure_result(
    *,
    document_id: str,
    operation: str,
    index_types: List[str],
    error_message: str,
) -> WorkflowResult:
    """Construct a uniform failure ``WorkflowResult`` for the unexpected path
    in ``notify_workflow_complete``."""
    return WorkflowResult(
        workflow_id=f"{document_id}_{operation}",
        document_id=document_id,
        operation=operation,
        status=TaskStatus.FAILED,
        message=error_message,
        successful_indexes=[],
        failed_indexes=index_types,
        total_indexes=len(index_types),
        index_results=[],
    )
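
The classification rules inside `aggregate_workflow_results` can be checked in isolation. The sketch below is a condensed re-statement of that branching, not the module itself: `TaskStatus` and `IndexTaskResult` are hypothetical stand-ins for the `aperag.tasks.models` classes (their real fields and enum values are not shown in this diff), `classify` is an illustrative name, and the index type names are made up.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Tuple


class TaskStatus(Enum):
    """Stand-in enum; the member values are assumed, not taken from aperag."""

    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILED = "failed"


@dataclass
class IndexTaskResult:
    """Stand-in exposing only the attributes the helper reads."""

    success: bool
    index_type: str
    error: Optional[str] = None

    @classmethod
    def from_dict(cls, data: dict) -> "IndexTaskResult":
        # Raises KeyError/TypeError on malformed input; the caller counts that as a failure.
        return cls(success=data["success"], index_type=data["index_type"], error=data.get("error"))


def classify(index_results: List[dict]) -> Tuple[TaskStatus, List[str], List[str], List[str]]:
    """Mirror the successful / failed / skipped split and the status derivation."""
    successful: List[str] = []
    failed: List[str] = []
    skipped: List[str] = []
    for item in index_results:
        if isinstance(item, dict) and item.get("status") == "skipped":
            skipped.append(item.get("index_type", "unknown"))
            continue
        try:
            result = IndexTaskResult.from_dict(item)
        except Exception as exc:
            failed.append(f"unknown: {exc}")
            continue
        if result.success:
            successful.append(result.index_type)
        else:
            failed.append(f"{result.index_type}: {result.error}")
    if not failed:
        status = TaskStatus.SUCCESS  # also reached when everything was skipped
    elif successful:
        status = TaskStatus.PARTIAL_SUCCESS
    else:
        status = TaskStatus.FAILED
    return status, successful, failed, skipped


all_ok = classify([
    {"success": True, "index_type": "vector"},
    {"status": "skipped", "index_type": "graph"},
])
mixed = classify([
    {"success": True, "index_type": "vector"},
    {"success": False, "index_type": "fulltext", "error": "timeout"},
])
all_bad = classify([{"success": False, "index_type": "vector", "error": "boom"}, None])
only_skipped = classify([{"status": "skipped", "index_type": "graph"}])
```

One property worth noticing: the status depends only on whether the failed list is empty, so a chord body in which every task was skipped is reported as `SUCCESS`, and a malformed entry such as `None` is recorded as an `unknown` failure rather than raising.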
