Skip to content

Commit 2f594ff

Browse files
committed
feat: orchestrator 门控优先读缓存 API 日期,5 分钟内跳过 HTTP
1 parent 12b90d8 commit 2f594ff

2 files changed

Lines changed: 119 additions & 17 deletions

File tree

quantclass_sync_internal/orchestrator.py

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
validate_run_mode,
2626
)
2727
from .constants import (
28+
API_DATE_CACHE_TTL_SECONDS,
2829
BUSINESS_DAY_ONLY_PRODUCTS,
2930
PREPROCESS_PRODUCT,
3031
PREPROCESS_TRIGGER_PRODUCTS,
@@ -85,6 +86,7 @@
8586
)
8687
from .status_store import (
8788
export_status_json,
89+
load_api_latest_dates,
8890
load_product_status,
8991
normalize_data_date,
9092
open_status_db,
@@ -483,6 +485,7 @@ def _resolve_requested_dates_for_plan(
483485
t_product_start: float,
484486
catch_up_to_latest: bool = False,
485487
lock: Optional[threading.Lock] = None,
488+
api_date_cache: Optional[Dict[str, Tuple[str, str]]] = None,
486489
) -> Tuple[List[str], bool]:
487490
"""
488491
解析单产品执行日期列表,并处理 timestamp 门控。
@@ -498,23 +501,46 @@ def _resolve_requested_dates_for_plan(
498501
return [requested_date_for_plan], False
499502

500503
product_name = normalize_product_name(plan.name)
501-
try:
502-
# 1) 读取 API 可用日期列表(latest)
503-
api_latest_candidates = get_latest_times(
504-
api_base=command_ctx.api_base.rstrip("/"),
505-
product=product_name,
506-
hid=hid,
507-
headers=headers,
508-
)
509-
except Exception as exc:
510-
# latest 获取失败时保持 fail-open,继续执行旧兜底路径。
511-
log_info(
512-
f"[{plan.name}] timestamp 门控异常,回退执行更新。",
513-
event="PRODUCT_PLAN",
514-
decision="fallback_run",
515-
error=str(exc),
516-
)
517-
return [requested_date_for_plan], False
504+
505+
# 缓存检查:check_updates 已查过且未过期时跳过 HTTP
506+
cache_hit = False
507+
api_latest_candidates: List[str] = []
508+
if api_date_cache:
509+
cached = api_date_cache.get(product_name) or api_date_cache.get(plan.name)
510+
if cached:
511+
cached_date, checked_at_str = cached
512+
if "T" in checked_at_str: # 旧日期格式无法比较,视为过期
513+
try:
514+
checked_at = datetime.strptime(checked_at_str, "%Y-%m-%dT%H:%M:%S")
515+
age_seconds = (datetime.now() - checked_at).total_seconds()
516+
if age_seconds < API_DATE_CACHE_TTL_SECONDS:
517+
log_info(
518+
f"[{plan.name}] 使用缓存 API 日期 {cached_date}{int(age_seconds)}s 前查询)",
519+
event="PRODUCT_PLAN", decision="cache_hit",
520+
)
521+
api_latest_candidates = [cached_date]
522+
cache_hit = True
523+
except ValueError:
524+
pass # 解析失败,回退 HTTP
525+
526+
if not cache_hit:
527+
try:
528+
# 1) 读取 API 可用日期列表(latest)
529+
api_latest_candidates = get_latest_times(
530+
api_base=command_ctx.api_base.rstrip("/"),
531+
product=product_name,
532+
hid=hid,
533+
headers=headers,
534+
)
535+
except Exception as exc:
536+
# latest 获取失败时保持 fail-open,继续执行旧兜底路径。
537+
log_info(
538+
f"[{plan.name}] timestamp 门控异常,回退执行更新。",
539+
event="PRODUCT_PLAN",
540+
decision="fallback_run",
541+
error=str(exc),
542+
)
543+
return [requested_date_for_plan], False
518544

519545
# latest 语义保持原样:这里不做业务日裁剪,只做规范化和去重排序。
520546
api_latest_candidates = _normalize_date_queue(
@@ -896,6 +922,9 @@ def _execute_plans(
896922
has_error = False
897923
t_run_start = time.time()
898924

925+
# 加载 API 日期缓存(check_updates 写入的),新鲜时跳过门控 HTTP
926+
_api_date_cache = load_api_latest_dates(report_dir_path(command_ctx.data_root))
927+
899928
# stop-on-error 要求严格顺序控制,强制串行
900929
effective_workers = max(1, max_workers) if not command_ctx.stop_on_error else 1
901930
# 保护共享状态的互斥锁(串行时无竞争,开销可忽略)
@@ -922,6 +951,7 @@ def _run_one_plan(plan: ProductPlan) -> Tuple[bool, float, SyncStats, str, str]:
922951
t_product_start=t_product_start,
923952
catch_up_to_latest=catch_up_to_latest,
924953
lock=_lock,
954+
api_date_cache=_api_date_cache,
925955
)
926956
with _lock:
927957
report.phase_plan_seconds += max(0.0, time.time() - t_plan_phase)

tests/test_update_catchup.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ def fake_resolve_requested_dates_for_plan(
530530
t_product_start: float,
531531
catch_up_to_latest: bool = False,
532532
lock=None,
533+
**kwargs,
533534
) -> tuple[list[str], bool]:
534535
if plan.name == "stock-trading-data":
535536
return ([], False)
@@ -589,6 +590,7 @@ def fake_resolve_requested_dates_for_plan(
589590
t_product_start: float,
590591
catch_up_to_latest: bool = False,
591592
lock=None,
593+
**kwargs,
592594
) -> tuple[list[str], bool]:
593595
return (["2026-02-09"], False)
594596

@@ -627,5 +629,75 @@ def fake_process_product(
627629
self.assertEqual(["error"], [item.status for item in report.products])
628630

629631

632+
class TestApiDateCache(unittest.TestCase):
    """Check that _resolve_requested_dates_for_plan prefers a fresh API date cache."""

    # Timestamp layout used for the "checked at" field of a cache entry.
    _CHECKED_AT_FORMAT = "%Y-%m-%dT%H:%M:%S"

    def setUp(self):
        # Every test works against its own throwaway data root.
        self._tmpdir = tempfile.TemporaryDirectory()
        self.root = Path(self._tmpdir.name)

    def tearDown(self):
        self._tmpdir.cleanup()

    def _ctx(self):
        """Build a CommandContext rooted at the temporary directory."""
        return CommandContext(
            run_id="test",
            data_root=self.root,
            dry_run=False,
            stop_on_error=False,
        )

    def _plan(self, name="stock-trading-data"):
        """Return the first plan produced by build_product_plan for ``name``."""
        plans = build_product_plan([name])
        return plans[0]

    def _report(self):
        """Create a new report with mode set to "network"."""
        return _new_report("test", mode="network")

    def _resolve(self, api_date_cache):
        """Invoke the function under test with fixed arguments and the given cache."""
        return _resolve_requested_dates_for_plan(
            plan=self._plan(),
            command_ctx=self._ctx(),
            hid="hid",
            headers={"api-key": "k"},
            requested_date_time="",
            force_update=False,
            report=self._report(),
            t_product_start=time.time(),
            api_date_cache=api_date_cache,
        )

    @patch("quantclass_sync_internal.orchestrator.should_skip_by_timestamp", return_value=True)
    @patch("quantclass_sync_internal.orchestrator.get_latest_times")
    def test_cache_hit_skips_http(self, mock_get_latest, mock_skip):
        """get_latest_times is not called when the cache entry is fresh."""
        from datetime import datetime

        checked_just_now = datetime.now().strftime(self._CHECKED_AT_FORMAT)
        fresh_cache = {"stock-trading-data": ("2026-03-18", checked_just_now)}

        # Unpacking is kept so the two-element return shape is still exercised.
        _queue, _skipped = self._resolve(fresh_cache)

        mock_get_latest.assert_not_called()

    @patch("quantclass_sync_internal.orchestrator.should_skip_by_timestamp", return_value=True)
    @patch("quantclass_sync_internal.orchestrator.get_latest_times", return_value=["2026-03-18"])
    def test_cache_expired_falls_through(self, mock_get_latest, mock_skip):
        """An expired cache entry falls back to the HTTP lookup."""
        from datetime import datetime, timedelta
        from quantclass_sync_internal.constants import API_DATE_CACHE_TTL_SECONDS

        # Place the check time one minute beyond the TTL so the entry is stale.
        stale_moment = datetime.now() - timedelta(seconds=API_DATE_CACHE_TTL_SECONDS + 60)
        stale_cache = {
            "stock-trading-data": (
                "2026-03-18",
                stale_moment.strftime(self._CHECKED_AT_FORMAT),
            ),
        }

        self._resolve(stale_cache)

        mock_get_latest.assert_called_once()

    @patch("quantclass_sync_internal.orchestrator.should_skip_by_timestamp", return_value=True)
    @patch("quantclass_sync_internal.orchestrator.get_latest_times", return_value=["2026-03-18"])
    def test_cache_miss_falls_through(self, mock_get_latest, mock_skip):
        """A product missing from the cache falls back to the HTTP lookup."""
        self._resolve({})

        mock_get_latest.assert_called_once()
700+
701+
630702
if __name__ == "__main__":
631703
unittest.main()

0 commit comments

Comments
 (0)