@@ -67,7 +67,7 @@ class TelemetryEvent(Base):
6767 id = Column(BigInteger, primary_key = True , autoincrement = True )
6868 event_type = Column(String(64 ), nullable = False ) # e.g. 'graph_extraction.window'
6969 ts = Column(DateTime(timezone = True ), nullable = False , default = lambda : datetime.now(UTC ))
70- collection_id = Column(String(24 ), nullable = False )
70+ collection_id = Column(String(24 ), nullable = True ) # per Weston msg=a95b2546 NIT: nullable to accommodate P1 worker lane event 没 collection scope (e.g. global queue depth) — P0 graph extraction event 强制 set, P1 worker lane event 可空
7171 document_id = Column(String(24 ), nullable = True ) # window/document event 必填; future P1 worker lane event 可空
7272 document_index_id = Column(String(32 ), nullable = True ) # per ziang msg=785625f5: 串联同 doc 多次 retry
7373 parse_version = Column(String(32 ), nullable = True )
@@ -97,6 +97,7 @@ class WindowExtractionAttrs(BaseModel):
9797 model_id: Optional[str ] # e.g. 'gpt-4o-mini' / 'qwen2.5-72b'
9898 provider: Optional[str ] # e.g. 'openai' / 'qwen'
9999 timeout_seconds: Optional[int ]
100+ chunks_truncated: bool = False # set True when len(input chunk_ids) > 128 (per Weston msg=a95b2546 BLOCKER 2)
100101 # 注: NOT 含 prompt_text / completion_text / chunk_text / entity_description / error_message — privacy gate
101102
102103class DocumentExtractionAttrs (BaseModel ):
@@ -112,13 +113,18 @@ class DocumentExtractionAttrs(BaseModel):
112113 # 注: NOT 含 error_message_list / failed_window_details (privacy gate)
113114```
114115
115- ** Privacy invariant (Layer 4 boundary AST gate enforce)** : ` attrs ` JSON dict 不含:
116- - ` chunk_text ` / ` chunk_content ` / ` chunk.text ` / ` chunk.content `
117- - ` query_text ` / ` user_query `
118- - ` entity_description ` / ` description_text `
119- - ` prompt_text ` / ` completion_text ` / ` llm_response `
120- - ` error_message ` / ` traceback ` / ` repr(exc) ` (per huangzhangshu msg=171acb55)
121- - 仅允许: ID list / count / duration / status enum / error_type whitelist enum / model_id / provider
116+ **Privacy invariant (enforced by the Layer 4 boundary AST gate; per Weston msg=a95b2546 BLOCKER 1 修订 — 数据流约束, NOT 全文 grep)**: gate 必须钉住「forbidden field 不能 flow INTO `TelemetryEvent.attrs` / `WindowExtractionAttrs` / `DocumentExtractionAttrs` / `telemetry_emit(attrs=...)` 参数」,**而不是**要求 indexing 代码全文 zero match — `aperag/indexing/fulltext.py` / `vision.py` / `summary.py` / `parser.py` 以及 graph_extractor 本身都必须读 chunk text 来抽取实体, 全文 zero match 会误伤这些合法路径。
117+
118+ ` attrs ` payload ** 不含** (data-flow constraint):
119+ - ` chunk_text ` / ` chunk_content ` / ` chunk.text ` / ` chunk.content ` — 仅允许 ` chunk_ids ` (ID list)
120+ - ` query_text ` / ` user_query ` — 不允许进 attrs
121+ - ` entity_description ` / ` description_text ` — 仅允许 ` entity_count ` (count)
122+ - ` prompt_text ` / ` completion_text ` / ` llm_response ` — 仅允许 ` llm_token_count ` / ` model_id ` / ` provider `
123+ - ` error_message ` / ` traceback ` / ` repr(exc) ` (per huangzhangshu msg=171acb55) — 仅允许 ` error_type: str ` whitelist enum
124+
125+ 仅允许: ID list / count / duration / status enum / error_type whitelist enum / model_id / provider / Pydantic Field-typed primitives
126+
127+ **Boundary AST gate 实施**: 扫描范围 = `aperag/telemetry/**` (全文, telemetry module 自身) + producer call sites (`aperag/indexing/graph_extractor.py::_extract_one_window` 函数 body + `aperag/indexing/worker_factory.py::_build_graph_facts_worker` + `GraphModalityWorker.sync` 函数 body),**仅 scan 进入 `telemetry_emit(attrs=...)` / `WindowExtractionAttrs(...)` / `DocumentExtractionAttrs(...)` 调用的 keyword 参数, 以及 `attrs.update(...)` / `attrs[k]=v` 赋值** 的 expression — AST data-flow analysis 确保 forbidden attribute read (e.g. `chunk.text`, `entity.description`) 不出现在这些 expression boundary 内。allowlist = Pydantic Field-typed schema (typed payload only; 不允许 untyped `dict.update`)。
122128
123129**chunk_ids cardinality cap (per Weston msg=22e6df03 BLOCKER 1)**: `chunk_ids` list 带 `max_length=128` 上限 (Pydantic validator); producer 在构造 attrs 之前先截断为前 128 个 (`chunk_ids[:128]`), 并在原始长度超过 128 时置 `chunks_truncated: bool` flag 为 True — 防止 window 含上百个 chunk 时 attrs payload 无界膨胀。
124130
@@ -169,8 +175,11 @@ async def _extract_one_window(self, chunk_ids, ...) -> tuple[WindowExtractResult
169175 status = ' failed' if not isinstance (exc, LLMTimeoutError) else ' timeout' ,
170176 error_type = classify_error(exc), # whitelist classifier
171177 )
172- # re-raise 让现有 retry/halt logic 处理
173- raise
178+ # per huangzhangshu msg=a563d88d implementation detail:
179+ # 用 structured exception 把 WindowExtractionStats 带给 caller,
180+ # 保 caller 的 outer finally 能可靠累加 windows_failed/windows_timeout
181+ # + entity_count/relation_count (即使失败也可能有 partial entities)
182+ raise WindowExtractionFailed(stats = stats) from exc
174183 finally :
175184 # emit telemetry (best-effort, fail-safe per § 3.1.3 Layer 2)
176185 try :
@@ -186,6 +195,7 @@ async def _extract_one_window(self, chunk_ids, ...) -> tuple[WindowExtractResult
186195 attrs = WindowExtractionAttrs(
187196 chunk_ids = chunk_ids[:128 ], # cardinality cap
188197 chunk_count = len (chunk_ids),
198+ chunks_truncated = len (chunk_ids) > 128 , # per Weston msg=a95b2546 BLOCKER 2
189199 entity_count = stats.entity_count,
190200 relation_count = stats.relation_count,
191201 llm_call_count = stats.llm_call_count,
@@ -200,37 +210,61 @@ async def _extract_one_window(self, chunk_ids, ...) -> tuple[WindowExtractResult
200210 return result, stats
201211```
202212
203- ** Document event** (` aperag/indexing/worker_factory._build_graph_facts_worker ` 外层 / ` GraphModalityWorker.sync ` 末尾, per ziang msg=785625f5 接入点修正 — 不存在独立 graph_facts_worker.py 文件):
213+ **Document event** (`aperag/indexing/worker_factory._build_graph_facts_worker` 外层 / `GraphModalityWorker.sync` 的 outer try/finally; per ziang msg=785625f5 接入点修正 — 不存在独立的 graph_facts_worker.py 文件; per Weston msg=a95b2546 BLOCKER 3 — outer try/finally 保证 document summary 不论 success/failed/timeout 都 emit exactly once):
214+
204215``` python
205216class GraphModalityWorker :
206217 async def sync (self , ...):
207- # 在 run 内累加 counters (per Weston BLOCKER 2: 不实时 aggregate from telemetry events)
218+ # 在 run 内累加 counters (per Weston msg=22e6df03 BLOCKER 2: 不实时 aggregate from telemetry events)
208219 doc_stats = DocumentExtractionRunCounters(
209220 chunks_total = 0 , windows_total = 0 ,
210221 windows_success = 0 , windows_failed = 0 , windows_timeout = 0 ,
211222 entities_total = 0 , relations_total = 0 ,
212223 wall_time_start_ms = int (time.monotonic() * 1000 ),
213224 )
214- # ... existing extraction loop, 每 window 累加 doc_stats counters from WindowExtractionStats
215- for window_chunks in ... :
216- try :
217- _, win_stats = await graph_extractor._extract_one_window(window_chunks, ... )
218- doc_stats.windows_total += 1
219- if win_stats.status == ' success' :
220- doc_stats.windows_success += 1
221- elif win_stats.status == ' timeout' :
222- doc_stats.windows_timeout += 1
223- else :
224- doc_stats.windows_failed += 1
225- doc_stats.entities_total += win_stats.entity_count
226- doc_stats.relations_total += win_stats.relation_count
227- except Exception :
228- doc_stats.windows_total += 1
229- doc_stats.windows_failed += 1
230- # re-raise per existing logic
231-
232- # emit document summary at end of sync (best-effort, fail-safe)
225+ doc_status: Literal[' success' , ' failed' ] = ' success'
226+
227+ # OUTER try/finally guarantees document summary emit exactly once
228+ # regardless of success / window failure re-raise / timeout / crash
229+ # (per Weston msg=a95b2546 BLOCKER 3 — sample 把 emit 放 end of sync
230+ # 在 first window failure re-raise 时永远到不了, 必须 outer try/finally
231+ # cover, telemetry failure 仍不污染 indexing task state per § 3.1.3 Class 2 fail-safe)
233232 try :
233+ # ... existing extraction loop, 每 window 累加 doc_stats counters from WindowExtractionStats
234+ for window_chunks in ... :
235+ try :
236+ _, win_stats = await graph_extractor._extract_one_window(window_chunks, ... )
237+ doc_stats.windows_total += 1
238+ if win_stats.status == ' success' :
239+ doc_stats.windows_success += 1
240+ elif win_stats.status == ' timeout' :
241+ doc_stats.windows_timeout += 1
242+ else :
243+ doc_stats.windows_failed += 1
244+ doc_stats.entities_total += win_stats.entity_count
245+ doc_stats.relations_total += win_stats.relation_count
246+ except WindowExtractionFailed as exc:
247+ # per huangzhangshu msg=a563d88d: structured exception
248+ # carries WindowExtractionStats — caller 累加 actual stats
249+ # 不依赖窗口外推 (failed window 的 duration_ms / partial entity_count
250+ # 仍 captured + summary windows_failed/timeout 准确)
251+ doc_stats.windows_total += 1
252+ if exc.stats.status == ' timeout' :
253+ doc_stats.windows_timeout += 1
254+ else :
255+ doc_stats.windows_failed += 1
256+ doc_stats.entities_total += exc.stats.entity_count
257+ doc_stats.relations_total += exc.stats.relation_count
258+ doc_status = ' failed'
259+ # re-raise per existing logic — outer try/finally 仍 emit summary
260+ raise exc.__cause__ if exc.__cause__ else exc
261+ except Exception :
262+ doc_status = ' failed'
263+ raise # re-raise to preserve existing indexing worker task semantics
264+ finally :
265+ # emit document summary best-effort, ALWAYS once regardless of
266+ # success / partial failure / outer raise (per Weston BLOCKER 3)
267+ try :
234268 telemetry_emit(
235269 event_type = ' graph_extraction.document' ,
236270 collection_id = self .collection_id,
0 commit comments