Skip to content

Commit e084e13

Browse files
committed
feat: 同步前并发预取 API 日期,消除 Plan 阶段串行瓶颈
1 parent 98e5b5e commit e084e13

3 files changed

Lines changed: 215 additions & 2 deletions

File tree

quantclass_sync_internal/orchestrator.py

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
report_dir_path,
9595
should_skip_by_timestamp,
9696
status_json_path,
97+
update_api_latest_dates,
9798
upsert_product_status,
9899
write_local_timestamp,
99100
)
@@ -889,6 +890,108 @@ def _maybe_run_coin_preprocess(
889890
# 无论哪个分支退出,统一在此累加后处理阶段耗时
890891
report.phase_postprocess_seconds += max(0.0, time.time() - phase_start)
891892

893+
def _prefetch_api_dates(
    products: List[str],
    api_base: str,
    hid: str,
    headers: Dict[str, str],
    log_dir: Path,
    max_workers: int = 8,
    prefetch_timeout: float = 30.0,
) -> Dict[str, Tuple[str, str]]:
    """Concurrently prefetch the latest API date of each product.

    Products whose cache entry is younger than ``API_DATE_CACHE_TTL_SECONDS``
    are not queried.  Products whose query fails are skipped without raising;
    per the original design note, the Plan stage is expected to fall back to
    a per-product HTTP query for them.

    Args:
        products: Product names to look up (duplicates are queried once).
        api_base: Base URL handed to ``get_latest_time``.
        hid: Account identifier handed to ``get_latest_time``.
        headers: HTTP headers handed to ``get_latest_time``.
        log_dir: Directory holding the persisted API date cache.
        max_workers: Upper bound on worker threads; values below 1 are
            treated as 1.
        prefetch_timeout: Overall time budget, in seconds, for collecting
            results before the remaining queries are abandoned.

    Returns:
        Mapping of product name to ``(date_str, checked_at)``: the cache as
        loaded from ``log_dir`` overlaid with the freshly fetched dates.
    """
    import threading
    from concurrent.futures import TimeoutError as _FuturesTimeoutError
    from concurrent.futures import as_completed as _as_completed

    # 1. Load the existing cache and work out which products need a query.
    existing_cache = load_api_latest_dates(log_dir)
    now = datetime.now()

    def _is_fresh(entry: Optional[Tuple[str, str]]) -> bool:
        """Return True when *entry* has a parseable, unexpired timestamp."""
        if not entry:
            return False
        _, checked_at_str = entry
        try:
            if "T" not in checked_at_str:
                return False
            checked_at = datetime.strptime(checked_at_str, "%Y-%m-%dT%H:%M:%S")
        except (TypeError, ValueError):
            # Malformed timestamp: treat the entry as stale and re-query.
            return False
        return (now - checked_at).total_seconds() < API_DATE_CACHE_TTL_SECONDS

    # dict.fromkeys de-duplicates while keeping the caller's order, so a
    # product listed twice costs only one HTTP request.
    uncached = [
        product
        for product in dict.fromkeys(products)
        if not _is_fresh(existing_cache.get(product))
    ]

    if not uncached:
        log_info(
            f"[预取] 全部 {len(products)} 个产品缓存命中,跳过 HTTP",
            event="PREFETCH", decision="all_cached",
        )
        return existing_cache

    # 2. Query the cache misses concurrently.
    log_info(
        f"[预取] 并发查询 {len(uncached)}/{len(products)} 个产品",
        event="PREFETCH", decision="fetching",
    )
    fetched: Dict[str, str] = {}
    abort_event = threading.Event()
    t_start = time.time()

    def _fetch_one(product: str) -> Tuple[str, Optional[str]]:
        """Query one product; a 401/403 response aborts the whole prefetch."""
        if abort_event.is_set():
            return product, None
        try:
            return product, get_latest_time(api_base, product, hid, headers)
        except FatalRequestError as exc:
            # Authentication failures will repeat for every product, so stop
            # the not-yet-started queries instead of hammering the API.
            if getattr(exc, "status_code", None) in (401, 403):
                abort_event.set()
            return product, None
        except Exception:
            # Best effort: a failed product is simply left out of the result.
            return product, None

    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least 1.
    effective_workers = max(1, min(max_workers, len(uncached)))
    executor = ThreadPoolExecutor(max_workers=effective_workers)
    try:
        futures = [executor.submit(_fetch_one, p) for p in uncached]
        for future in _as_completed(futures, timeout=prefetch_timeout):
            try:
                product, date_str = future.result()
                if date_str:
                    fetched[product] = date_str
            except Exception:
                pass
            if abort_event.is_set():
                log_info(
                    "[预取] 认证失败,中止剩余查询",
                    event="PREFETCH", decision="auth_abort",
                )
                break
    except (TimeoutError, _FuturesTimeoutError):
        # Before Python 3.11 concurrent.futures.TimeoutError is a distinct
        # class from the builtin TimeoutError, so both must be listed.
        log_info("[预取] 超时,放弃剩余查询", event="PREFETCH", decision="timeout")
    finally:
        # Do not block on in-flight requests; drop the ones still queued.
        executor.shutdown(wait=False, cancel_futures=True)

    elapsed = time.time() - t_start
    log_info(
        f"[预取] 完成,成功 {len(fetched)}/{len(uncached)},耗时 {elapsed:.1f}s",
        event="PREFETCH", decision="done",
    )

    # 3. Persist, then return the in-memory merge rather than re-reading the
    #    file, so the result does not depend on concurrent writers.
    if fetched:
        try:
            update_api_latest_dates(log_dir, fetched)
        except Exception as exc:
            # Persistence is best effort; the in-memory result is still valid
            # for this run, but the failure should be visible in the log.
            log_info(
                f"[预取] 缓存写入失败,仅使用内存结果: {exc}",
                event="PREFETCH", decision="persist_failed",
            )

    # Existing entries are copied as-is, including stale ones whose refresh
    # failed.  NOTE(review): this assumes consumers re-check ``checked_at``
    # against the TTL before trusting an entry -- confirm against the caller.
    checked_at_now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    merged: Dict[str, Tuple[str, str]] = dict(existing_cache)
    for product, date_str in fetched.items():
        merged[product] = (date_str, checked_at_now)
    return merged
993+
994+
892995
def _execute_plans(
893996
plans: Sequence[ProductPlan],
894997
command_ctx: CommandContext,
@@ -922,8 +1025,15 @@ def _execute_plans(
9221025
has_error = False
9231026
t_run_start = time.time()
9241027

925-
# 加载 API 日期缓存(check_updates 写入的),新鲜时跳过门控 HTTP
926-
_api_date_cache = load_api_latest_dates(report_dir_path(command_ctx.data_root))
1028+
# 并发预取所有产品的 API 最新日期,写入缓存供 Plan 阶段命中(替代单次 load_api_latest_dates)
1029+
product_names = [normalize_product_name(p.name) for p in plans]
1030+
_api_date_cache = _prefetch_api_dates(
1031+
products=product_names,
1032+
api_base=command_ctx.api_base.rstrip("/"),
1033+
hid=hid,
1034+
headers=headers,
1035+
log_dir=report_dir_path(command_ctx.data_root),
1036+
)
9271037

9281038
# stop-on-error 要求严格顺序控制,强制串行
9291039
effective_workers = max(1, max_workers) if not command_ctx.stop_on_error else 1

tests/test_integration.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
open_status_db,
3333
load_product_status,
3434
read_local_timestamp_date,
35+
report_dir_path,
3536
resolve_runtime_paths,
3637
write_local_timestamp,
38+
PRODUCT_LAST_STATUS_FILE,
3739
)
3840
from quantclass_sync_internal.file_sync import sync_known_product
3941
from quantclass_sync_internal.constants import TIMESTAMP_FILE_NAME
@@ -172,6 +174,10 @@ def _run_execute_plans(
172174
"""
173175
report = _new_report("test-integ-001", mode="network")
174176

177+
# 清除 API 日期缓存,确保每次调用模拟独立运行(_prefetch_api_dates 不命中旧缓存)
178+
cache_file = report_dir_path(ctx.data_root) / PRODUCT_LAST_STATUS_FILE
179+
cache_file.unlink(missing_ok=True)
180+
175181
save_file_mock = make_save_file_mock(date_to_content)
176182

177183
with (

tests/test_update_catchup.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,5 +699,102 @@ def test_cache_miss_falls_through(self, mock_get_latest, mock_skip):
699699
mock_get_latest.assert_called_once()
700700

701701

702+
class TestPrefetchApiDates(unittest.TestCase):
    """Tests for the concurrent API-date prefetch, ``_prefetch_api_dates``."""

    def setUp(self):
        # Each test gets its own throw-away data root with an empty log dir.
        self._tmpdir = tempfile.TemporaryDirectory()
        self.root = Path(self._tmpdir.name)
        self.log_dir = self.root / ".quantclass_sync" / "log"
        self.log_dir.mkdir(parents=True)

    def tearDown(self):
        self._tmpdir.cleanup()

    def _prefetch(self, products):
        """Run the prefetch for *products* against this test's log dir."""
        from quantclass_sync_internal.orchestrator import _prefetch_api_dates

        return _prefetch_api_dates(
            products=products,
            api_base="http://fake",
            hid="hid",
            headers={},
            log_dir=self.log_dir,
        )

    @patch("quantclass_sync_internal.orchestrator.get_latest_time")
    def test_fetches_uncached_products(self, mock_get):
        """With no cache at all, every product is queried."""
        mock_get.return_value = "2026-03-18"

        result = self._prefetch(["prod-a", "prod-b"])

        self.assertEqual(mock_get.call_count, 2)
        self.assertIn("prod-a", result)
        self.assertIn("prod-b", result)

    @patch("quantclass_sync_internal.orchestrator.get_latest_time")
    def test_skips_fresh_cache(self, mock_get):
        """A fresh cache entry suppresses the HTTP query."""
        from quantclass_sync_internal.status_store import update_api_latest_dates

        # Seed a fresh cache entry before running the prefetch.
        update_api_latest_dates(self.log_dir, {"prod-a": "2026-03-18"})

        result = self._prefetch(["prod-a"])

        mock_get.assert_not_called()
        self.assertIn("prod-a", result)

    @patch("quantclass_sync_internal.orchestrator.get_latest_time")
    def test_partial_cache_only_fetches_missing(self, mock_get):
        """On a partial cache hit, only the missing product is queried."""
        from quantclass_sync_internal.status_store import update_api_latest_dates

        mock_get.return_value = "2026-03-17"
        update_api_latest_dates(self.log_dir, {"prod-a": "2026-03-18"})

        result = self._prefetch(["prod-a", "prod-b"])

        # Only prod-b required a query.
        self.assertEqual(mock_get.call_count, 1)
        self.assertIn("prod-a", result)
        self.assertIn("prod-b", result)

    @patch(
        "quantclass_sync_internal.orchestrator.get_latest_time",
        side_effect=Exception("network"),
    )
    def test_failure_returns_partial_cache(self, mock_get):
        """A failed prefetch returns the existing cache without raising."""
        result = self._prefetch(["prod-a"])

        # Nothing was cached beforehand, so the result is an empty mapping.
        self.assertEqual(result, {})

    @patch("quantclass_sync_internal.orchestrator.get_latest_time")
    def test_expired_cache_refetches(self, mock_get):
        """An expired cache entry is queried again."""
        import json
        from datetime import datetime, timedelta

        from quantclass_sync_internal.constants import API_DATE_CACHE_TTL_SECONDS

        # Hand-write a cache entry that is one minute past its TTL.
        stale_stamp = datetime.now() - timedelta(
            seconds=API_DATE_CACHE_TTL_SECONDS + 60
        )
        payload = {
            "prod-a": {
                "date_time": "2026-03-17",
                "checked_at": stale_stamp.strftime("%Y-%m-%dT%H:%M:%S"),
                "source": "api_check",
            }
        }
        (self.log_dir / "product_last_status.json").write_text(json.dumps(payload))
        mock_get.return_value = "2026-03-18"

        result = self._prefetch(["prod-a"])

        mock_get.assert_called_once()  # the stale entry forced a re-query
        self.assertEqual(result["prod-a"][0], "2026-03-18")  # new date returned
797+
798+
702799
if __name__ == "__main__":
703800
unittest.main()

0 commit comments

Comments
 (0)