Skip to content

Commit 006a268

Browse files
committed
refactor: 收敛健康检查与推荐修复集
1 parent 494b7aa commit 006a268

5 files changed

Lines changed: 396 additions & 220 deletions

File tree

coin_preprocess_internal/runner.py

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,29 @@ def _write_runtime_timestamp(output_dir: Path, data_date: str) -> None:
129129
type(exc).__name__,
130130
)
131131

132+
133+
def _persist_outputs(
134+
*,
135+
output_dir: Path,
136+
spot_dict: Dict[str, pd.DataFrame],
137+
swap_dict: Dict[str, pd.DataFrame],
138+
market_pivot_spot: Dict[str, pd.DataFrame],
139+
market_pivot_swap: Dict[str, pd.DataFrame],
140+
) -> None:
141+
"""统一写入预处理产物和 runtime timestamp。"""
142+
143+
payloads = {
144+
output_dir / OUTPUT_SPOT_DICT: spot_dict,
145+
output_dir / OUTPUT_SWAP_DICT: swap_dict,
146+
output_dir / OUTPUT_PIVOT_SPOT: market_pivot_spot,
147+
output_dir / OUTPUT_PIVOT_SWAP: market_pivot_swap,
148+
}
149+
_write_pickles_atomically(payloads)
150+
_write_runtime_timestamp(
151+
output_dir=output_dir,
152+
data_date=_resolve_output_data_date(spot_dict=spot_dict, swap_dict=swap_dict),
153+
)
154+
132155
def _has_relist_break(prev_time: pd.Timestamp, prev_close: float, next_time: pd.Timestamp, next_open: float) -> bool:
133156
"""判断边界处是否触发 relist 切段。"""
134157

@@ -372,23 +395,24 @@ def _try_tail_append_symbol(
372395
if tail_raw.empty:
373396
return False, set()
374397

375-
tail_min = pd.to_datetime(tail_raw["candle_begin_time"].min(), errors="coerce")
398+
tail_times = tail_raw["candle_begin_time"]
399+
tail_min = tail_times.min()
376400
if pd.isna(tail_min) or pd.Timestamp(tail_min) > last_time:
377401
# 尾部窗口没覆盖到旧边界,无法证明"只需追加",回退单 symbol 全量重算。
378402
return False, set()
379403

380404
overlap_snapshot = _build_overlap_snapshot(data_dict, keys)
381-
overlap_raw = tail_raw[pd.to_datetime(tail_raw["candle_begin_time"], errors="coerce") <= last_time]
405+
overlap_raw = tail_raw[tail_times <= last_time]
382406
if not _overlap_matches_existing(overlap_raw=overlap_raw, overlap_snapshot=overlap_snapshot, is_swap=is_swap):
383407
return False, set()
384408

385-
new_raw = tail_raw[pd.to_datetime(tail_raw["candle_begin_time"], errors="coerce") > last_time]
409+
new_raw = tail_raw[tail_times > last_time]
386410
if new_raw.empty:
387411
# mtime 变化但无新增行:保守回退单 symbol 全量重算,避免漏算。
388412
return False, set()
389413

390414
prev_close = _safe_float(active_frame["close"].iloc[-1], fallback=0.0)
391-
next_time = pd.Timestamp(pd.to_datetime(new_raw["candle_begin_time"].min(), errors="coerce"))
415+
next_time = pd.Timestamp(new_raw["candle_begin_time"].min())
392416
next_open_raw = new_raw.sort_values("candle_begin_time").iloc[0].get("open", pd.NA)
393417
next_open = _safe_float(next_open_raw, fallback=prev_close)
394418
if _has_relist_break(last_time, prev_close, next_time, next_open):
@@ -506,16 +530,12 @@ def _run_full_rebuild(spot_dir: Path, swap_dir: Path, output_dir: Path, mode: st
506530

507531
if progress_callback:
508532
progress_callback(detail="写入产物...")
509-
payloads = {
510-
output_dir / OUTPUT_SPOT_DICT: spot_dict,
511-
output_dir / OUTPUT_SWAP_DICT: swap_dict,
512-
output_dir / OUTPUT_PIVOT_SPOT: market_pivot_spot,
513-
output_dir / OUTPUT_PIVOT_SWAP: market_pivot_swap,
514-
}
515-
_write_pickles_atomically(payloads)
516-
_write_runtime_timestamp(
533+
_persist_outputs(
517534
output_dir=output_dir,
518-
data_date=_resolve_output_data_date(spot_dict=spot_dict, swap_dict=swap_dict),
535+
spot_dict=spot_dict,
536+
swap_dict=swap_dict,
537+
market_pivot_spot=market_pivot_spot,
538+
market_pivot_swap=market_pivot_swap,
519539
)
520540

521541
return PreprocessSummary(
@@ -664,16 +684,12 @@ def _run_incremental_patch(
664684

665685
if progress_callback:
666686
progress_callback(detail="写入产物...")
667-
payloads = {
668-
output_dir / OUTPUT_SPOT_DICT: spot_dict,
669-
output_dir / OUTPUT_SWAP_DICT: swap_dict,
670-
output_dir / OUTPUT_PIVOT_SPOT: market_pivot_spot,
671-
output_dir / OUTPUT_PIVOT_SWAP: market_pivot_swap,
672-
}
673-
_write_pickles_atomically(payloads)
674-
_write_runtime_timestamp(
687+
_persist_outputs(
675688
output_dir=output_dir,
676-
data_date=_resolve_output_data_date(spot_dict=spot_dict, swap_dict=swap_dict),
689+
spot_dict=spot_dict,
690+
swap_dict=swap_dict,
691+
market_pivot_spot=market_pivot_spot,
692+
market_pivot_swap=market_pivot_swap,
677693
)
678694

679695
changed_count = (

0 commit comments

Comments
 (0)