Skip to content

Commit 67196dd

Browse files
earayuclaude
andcommitted
refactor(phase8 #40): extract Celery orchestration helpers (Option D)
Per 符炫炜 D4 (refined) canonical msg=71e37a49 — Approve Option D: keep thin @app.task wrappers (chain/chord composition + chord callback contract require Celery-registered tasks) but extract their orchestration business logic into a pure sync helpers module so it can be reasoned about and unit-tested without a Celery worker. Scope: - New module `aperag/tasks/orchestration_helpers.py` with: - `is_skipped_payload(payload)` — public mirror of the private check - `build_dispatched_workflow_result(async_result)` — handoff payload - `build_index_workflow_chord(...)` — pure orchestration; builds `chord(group(parallel_index_tasks), completion_callback)` without calling `.apply_async()` (caller dispatches) - `aggregate_workflow_results(...)` — pure logic; classifies successful / failed / skipped, derives TaskStatus, builds WorkflowResult - `build_workflow_failure_result(...)` — uniform failure path - `config/celery_tasks.py` — `trigger_create/delete/update_indexes_workflow` + `notify_workflow_complete` reduced to thin wrappers calling helpers. - Removed now-dead private helpers `_is_skipped_payload` and `_build_dispatched_workflow_result` (replaced by public versions). - Removed unused imports: `chord`, `group`, `IndexTaskResult`, `TaskStatus`, `WorkflowResult` from celery_tasks.py top-level. Invariants preserved (per Option D scope): - chain/chord composition unchanged (`chain(parse, trigger).delay()` and `chord(group(...), notify_workflow_complete.s(...))` shape identical) - task name strings unchanged (Celery beat / queue routing / DB task_id references intact) - reconciler.py / scheduler.py call sites untouched (fire-and-forget semantics preserved) - BaseIndexTask base class still attached to notify_workflow_complete - @app.task decorators retained (chord callback contract requires broker-registered task) Gates: - ruff check aperag/tasks/orchestration_helpers.py config/celery_tasks.py: clean - pytest tests/unit_test/test_modularization_boundaries.py -x -q: 20/20 pass - pytest tests/unit_test/ --ignore=objectstore --deselect=<pre-existing listUserQuotas FE adapter test from PR #1664>: 577 pass / 29 skip - grep `chord(` + `notify_workflow_complete` shows composition preserved at line 911/917/939/945/977/983 of celery_tasks.py Net: +62/-165 in celery_tasks.py + new helpers module (~180 LOC). Ghost-check: §1 5-Layer whitelist intact. No DB schema, API, or behavior changes. No Celery primitive removed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent cd5f19d commit 67196dd

2 files changed

Lines changed: 247 additions & 165 deletions

File tree

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# Copyright 2025 ApeCloud, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Pure orchestration helpers for Celery indexing workflow tasks.
16+
17+
These functions encapsulate the orchestration / aggregation logic that
18+
used to live inside ``config.celery_tasks.trigger_*_workflow`` and
19+
``notify_workflow_complete``. They are extracted as plain sync helpers
20+
so the logic can be reasoned about, unit-tested, and reused without
21+
spinning up a Celery worker.
22+
23+
Per Phase 8 D4 (refined) canonical:
24+
- Thin Celery task wrappers in ``config/celery_tasks.py`` keep their
25+
``@app.task`` decorators (chain/chord composition requires it) but
26+
delegate their bodies to these helpers.
27+
- chain/chord composition, reconciler/scheduler call sites, task name
28+
strings, and beat schedule entries are unchanged.
29+
"""
30+
31+
from __future__ import annotations
32+
33+
import logging
34+
from typing import Any, List
35+
36+
from celery import chord, group
37+
38+
from aperag.tasks.models import IndexTaskResult, TaskStatus, WorkflowResult
39+
40+
logger = logging.getLogger(__name__)
41+
42+
43+
def is_skipped_payload(payload: Any) -> bool:
44+
"""Public mirror of ``config.celery_tasks._is_skipped_payload``.
45+
46+
A payload is considered "skipped" when it is a dict carrying the
47+
sentinel ``status == "skipped"`` produced by upstream parse stages
48+
(e.g. ownership-lost / record-not-found).
49+
"""
50+
return isinstance(payload, dict) and payload.get("status") == "skipped"
51+
52+
53+
def build_dispatched_workflow_result(async_result) -> dict:
54+
"""Return a small, JSON-serializable handoff payload for downstream tracking."""
55+
return {
56+
"status": "dispatched",
57+
"workflow_id": async_result.id,
58+
}
59+
60+
61+
def build_index_workflow_chord(
62+
*,
63+
document_id: str,
64+
index_types: List[str],
65+
per_index_signature_factory,
66+
completion_callback_signature,
67+
):
68+
"""Build a ``chord(group(parallel_index_tasks), completion_callback)``.
69+
70+
Pure orchestration: no I/O, no broker call. The caller decides when
71+
to ``.apply_async()`` the returned chord.
72+
73+
Args:
74+
document_id: Document being indexed (for logging only).
75+
index_types: List of index types to fan out to.
76+
per_index_signature_factory: Callable ``(index_type) -> Signature``
77+
that produces a Celery signature for a single index_type.
78+
Encapsulates the per-task arguments (e.g. ``parsed_data``).
79+
completion_callback_signature: Celery signature for the chord
80+
callback (typically ``notify_workflow_complete.s(...)``).
81+
82+
Returns:
83+
A ``celery.canvas.chord`` object ready to be ``.apply_async()``.
84+
"""
85+
parallel_tasks = group([per_index_signature_factory(index_type) for index_type in index_types])
86+
workflow_chord = chord(parallel_tasks, completion_callback_signature)
87+
logger.debug(
88+
"Built index workflow chord for document %s with %d parallel index tasks",
89+
document_id,
90+
len(index_types),
91+
)
92+
return workflow_chord
93+
94+
95+
def aggregate_workflow_results(
96+
*,
97+
index_results: List[dict],
98+
document_id: str,
99+
operation: str,
100+
index_types: List[str],
101+
) -> WorkflowResult:
102+
"""Aggregate per-index results from a chord body into a ``WorkflowResult``.
103+
104+
Pure logic: parses/normalizes ``IndexTaskResult`` dicts, classifies
105+
them into successful / failed / skipped, derives an overall
106+
``TaskStatus``, and constructs the final ``WorkflowResult`` payload.
107+
108+
No I/O, no broker call. Safe to call without a running Celery worker.
109+
"""
110+
successful_tasks: List[str] = []
111+
failed_tasks: List[str] = []
112+
skipped_tasks: List[str] = []
113+
normalized_results: List[IndexTaskResult] = []
114+
115+
for result_dict in index_results:
116+
if isinstance(result_dict, dict) and result_dict.get("status") == "skipped":
117+
skipped_tasks.append(result_dict.get("index_type", "unknown"))
118+
continue
119+
try:
120+
result = IndexTaskResult.from_dict(result_dict)
121+
normalized_results.append(result)
122+
if result.success:
123+
successful_tasks.append(result.index_type)
124+
else:
125+
failed_tasks.append(f"{result.index_type}: {result.error}")
126+
except Exception as e:
127+
failed_tasks.append(f"unknown: {str(e)}")
128+
129+
if not failed_tasks:
130+
status = TaskStatus.SUCCESS
131+
processed_indexes = successful_tasks if successful_tasks else skipped_tasks
132+
status_message = (
133+
f"Document {document_id} {operation} COMPLETED SUCCESSFULLY! "
134+
f"Processed indexes: {', '.join(processed_indexes)}"
135+
)
136+
if skipped_tasks:
137+
status_message += f". Skipped: {', '.join(skipped_tasks)}"
138+
logger.info(status_message)
139+
elif successful_tasks:
140+
status = TaskStatus.PARTIAL_SUCCESS
141+
status_message = (
142+
f"Document {document_id} {operation} COMPLETED with WARNINGS. "
143+
f"Success: {', '.join(successful_tasks)}. Failures: {'; '.join(failed_tasks)}"
144+
)
145+
if skipped_tasks:
146+
status_message += f". Skipped: {', '.join(skipped_tasks)}"
147+
logger.warning(status_message)
148+
else:
149+
status = TaskStatus.FAILED
150+
status_message = f"Document {document_id} {operation} FAILED. All tasks failed: {'; '.join(failed_tasks)}"
151+
logger.error(status_message)
152+
153+
return WorkflowResult(
154+
workflow_id=f"{document_id}_{operation}",
155+
document_id=document_id,
156+
operation=operation,
157+
status=status,
158+
message=status_message,
159+
successful_indexes=successful_tasks,
160+
failed_indexes=[f.split(":")[0] for f in failed_tasks],
161+
total_indexes=len(index_types),
162+
index_results=normalized_results,
163+
)
164+
165+
166+
def build_workflow_failure_result(
167+
*,
168+
document_id: str,
169+
operation: str,
170+
index_types: List[str],
171+
error_message: str,
172+
) -> WorkflowResult:
173+
"""Construct a uniform failure ``WorkflowResult`` for the unexpected path
174+
in ``notify_workflow_complete``."""
175+
return WorkflowResult(
176+
workflow_id=f"{document_id}_{operation}",
177+
document_id=document_id,
178+
operation=operation,
179+
status=TaskStatus.FAILED,
180+
message=error_message,
181+
successful_indexes=[],
182+
failed_indexes=index_types,
183+
total_indexes=len(index_types),
184+
index_results=[],
185+
)

0 commit comments

Comments
 (0)