Skip to content

Commit 4dbe1a1

Browse files
committed
fix: preserve sync provenance in update cache
1 parent 843b553 commit 4dbe1a1

4 files changed

Lines changed: 150 additions & 16 deletions

File tree

quantclass_sync_internal/data_query.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ def get_products_overview(
176176
last_reason = last.get("reason_code", "")
177177
last_source = last.get("source", "")
178178
last_date = _parse_date(last.get("date_time", ""))
179+
cached_api_date = _parse_date(last.get("api_date_time", ""))
180+
cached_api_anchor = _parse_date(last.get("api_checked_at", "")) or cached_api_date
179181
freshness_anchor = _parse_date(last.get("checked_at", "")) or last_date
180182
last_result_fresh = (
181183
freshness_anchor is not None
@@ -187,7 +189,7 @@ def get_products_overview(
187189
if api_date is not None:
188190
# 同一最新日期已在同步阶段确认无有效输出时,不再把它计为待更新。
189191
if (
190-
last_source == "sync"
192+
last_status == "skipped"
191193
and last_reason == REASON_NO_VALID_OUTPUT
192194
and last_result_fresh
193195
and last_date == api_date
@@ -199,14 +201,16 @@ def get_products_overview(
199201
else:
200202
# 用缓存的 API 日期作为参考,避免周末/假日误报落后;
201203
# 缓存超过宽限期或无缓存时降级回 today,提示可能有新数据。
202-
# 仅排除明确来自同步结果(source="sync")的 date_time;
203-
# 旧安装缺少 source 字段时保持兼容,仍沿用原缓存逻辑。
204-
cached_api_date = last_date if last_source != "sync" else None
205-
cache_fresh = (
206-
cached_api_date is not None
207-
and freshness_anchor is not None
208-
and (today - freshness_anchor).days <= _STALE_GRACE_DAYS
209-
)
204+
# 新格式优先读取专用 API 缓存字段;旧格式继续兼容非 sync 来源的 date_time。
205+
if cached_api_date is not None and cached_api_anchor is not None:
206+
cache_fresh = (today - cached_api_anchor).days <= _STALE_GRACE_DAYS
207+
else:
208+
cached_api_date = last_date if last_source != "sync" else None
209+
cache_fresh = (
210+
cached_api_date is not None
211+
and freshness_anchor is not None
212+
and (today - freshness_anchor).days <= _STALE_GRACE_DAYS
213+
)
210214
# 无有效缓存时,A 股产品用交易日历找最近交易日,避免周末误报
211215
if _is_a_stock_product(product) and trading_calendar:
212216
trading_ref = _last_trading_day(trading_calendar, today)

quantclass_sync_internal/status_store.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -523,14 +523,27 @@ def update_api_latest_dates(log_dir: Path, api_latest_dates: Dict[str, object])
523523
continue
524524
latest_date = candidates[-1]
525525
if product in existing:
526-
existing[product]["date_time"] = latest_date
527526
existing[product]["api_dates"] = candidates
528-
existing[product]["checked_at"] = checked_at
529-
existing[product]["source"] = _SOURCE_API_CHECK
527+
existing[product]["api_date_time"] = latest_date
528+
existing[product]["api_checked_at"] = checked_at
529+
has_sync_payload = any(
530+
bool(existing[product].get(field))
531+
for field in ("status", "reason_code", "error")
532+
)
533+
if has_sync_payload:
534+
existing[product]["source"] = _SOURCE_SYNC
535+
if not has_sync_payload and existing[product].get("source") != _SOURCE_SYNC:
536+
existing[product]["date_time"] = latest_date
537+
existing[product]["checked_at"] = checked_at
538+
existing[product]["source"] = _SOURCE_API_CHECK
530539
else:
531540
existing[product] = {
532541
"status": "", "reason_code": "", "error": "",
533-
"date_time": latest_date, "api_dates": candidates, "checked_at": checked_at,
542+
"date_time": latest_date,
543+
"checked_at": checked_at,
544+
"api_date_time": latest_date,
545+
"api_checked_at": checked_at,
546+
"api_dates": candidates,
534547
"source": _SOURCE_API_CHECK,
535548
}
536549
with atomic_temp_path(status_path, tag="last_status") as tmp:
@@ -557,11 +570,18 @@ def load_api_latest_dates(log_dir: Path) -> Dict[str, Tuple[List[str], str]]:
557570
for product, info in data.items():
558571
if not isinstance(info, dict):
559572
continue
560-
# 只读取 check_updates 写入的记录,排除同步结果
573+
candidates = _normalize_api_date_candidates(
574+
info.get("api_dates", info.get("api_date_time", info.get("date_time")))
575+
)
576+
checked_at = info.get("api_checked_at") or info.get("checked_at")
577+
# 新格式:只要显式存在 api_checked_at / api_date_time,即视为 API 检查缓存。
578+
if info.get("api_checked_at") or info.get("api_date_time"):
579+
if candidates and isinstance(checked_at, str):
580+
result[product] = (candidates, checked_at)
581+
continue
582+
# 旧格式:只读取 source=="api_check" 的记录,排除同步结果。
561583
if info.get("source") != _SOURCE_API_CHECK:
562584
continue
563-
candidates = _normalize_api_date_candidates(info.get("api_dates", info.get("date_time")))
564-
checked_at = info.get("checked_at")
565585
if candidates and isinstance(checked_at, str):
566586
result[product] = (candidates, checked_at)
567587
return result

tests/test_data_query.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
get_run_detail,
1515
get_run_history,
1616
)
17+
from quantclass_sync_internal.status_store import update_api_latest_dates
1718

1819

1920
class TestDaysBehind(unittest.TestCase):
@@ -376,6 +377,65 @@ def test_overview_same_api_date_after_no_valid_output_is_not_pending(self):
376377
self.assertEqual(overview[0]["days_behind"], 0)
377378
self.assertEqual(overview[0]["status_color"], "green")
378379

380+
def test_overview_same_api_date_after_check_updates_keeps_no_valid_output_suppressed(self):
381+
"""check_updates 写入 API 缓存后,仍保留 same-date no_valid_output 抑制。"""
382+
self._write_timestamp("coin-cap", "2026-03-10")
383+
self._write_product_last_status({
384+
"coin-cap": {
385+
"status": "skipped",
386+
"reason_code": "no_valid_output",
387+
"error": "同步未产生可用输出,已跳过状态推进。",
388+
"date_time": "2026-03-11",
389+
"checked_at": "2026-03-13T09:00:00",
390+
"source": "sync",
391+
}
392+
})
393+
update_api_latest_dates(self.log_dir, {"coin-cap": "2026-03-11"})
394+
395+
import unittest.mock
396+
with unittest.mock.patch(
397+
"quantclass_sync_internal.data_query.report_dir_path",
398+
return_value=self.log_dir,
399+
):
400+
overview = get_products_overview(
401+
self.data_root,
402+
["coin-cap"],
403+
today=date(2026, 3, 13),
404+
api_latest_dates={"coin-cap": "2026-03-11"},
405+
)
406+
407+
self.assertEqual(overview[0]["days_behind"], 0)
408+
self.assertEqual(overview[0]["status_color"], "green")
409+
410+
def test_overview_suppresses_same_date_no_valid_output_even_if_source_was_polluted(self):
411+
"""旧状态文件 source 被污染为 api_check 时,same-date no_valid_output 仍应抑制待更新。"""
412+
self._write_timestamp("coin-cap", "2026-03-10")
413+
self._write_product_last_status({
414+
"coin-cap": {
415+
"status": "skipped",
416+
"reason_code": "no_valid_output",
417+
"error": "同步未产生可用输出,已跳过状态推进。",
418+
"date_time": "2026-03-11",
419+
"checked_at": "2026-03-13T09:00:00",
420+
"source": "api_check",
421+
}
422+
})
423+
424+
import unittest.mock
425+
with unittest.mock.patch(
426+
"quantclass_sync_internal.data_query.report_dir_path",
427+
return_value=self.log_dir,
428+
):
429+
overview = get_products_overview(
430+
self.data_root,
431+
["coin-cap"],
432+
today=date(2026, 3, 13),
433+
api_latest_dates={"coin-cap": "2026-03-11"},
434+
)
435+
436+
self.assertEqual(overview[0]["days_behind"], 0)
437+
self.assertEqual(overview[0]["status_color"], "green")
438+
379439
def test_overview_api_latest_dates_earlier_than_local(self):
380440
"""API 日期早于本地日期时 behind=0(max(0, diff) 截断)。"""
381441
self._write_timestamp("stock-trading-data", "2026-03-14")

tests/test_status_store.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,56 @@ def test_source_residual_after_sync_overwrite(self):
250250
cache_after = load_api_latest_dates(log_dir)
251251
self.assertNotIn("prod-a", cache_after)
252252

253+
def test_api_check_does_not_overwrite_sync_provenance(self):
254+
"""已有 sync 结果时,check_updates 只补 API 缓存,不覆盖同步来源。"""
255+
with tempfile.TemporaryDirectory() as tmpdir:
256+
log_dir = Path(tmpdir)
257+
report = _new_report("test", mode="network")
258+
_append_result(
259+
report,
260+
product="prod-a",
261+
status="skipped",
262+
reason_code="no_valid_output",
263+
error="同步未产生可用输出,已跳过状态推进。",
264+
date_time="2026-03-18",
265+
)
266+
_update_product_last_status(log_dir, report)
267+
268+
update_api_latest_dates(log_dir, {"prod-a": "2026-03-18"})
269+
270+
status = json.loads((log_dir / "product_last_status.json").read_text(encoding="utf-8"))
271+
entry = status["prod-a"]
272+
self.assertEqual(entry["source"], _SOURCE_SYNC)
273+
self.assertEqual(entry["reason_code"], "no_valid_output")
274+
self.assertEqual(entry["status"], "skipped")
275+
self.assertEqual(entry["api_dates"], ["2026-03-18"])
276+
self.assertEqual(entry["api_date_time"], "2026-03-18")
277+
self.assertIn("T", entry["api_checked_at"])
278+
279+
cache = load_api_latest_dates(log_dir)
280+
self.assertIn("prod-a", cache)
281+
282+
def test_api_check_heals_old_polluted_source_to_sync(self):
283+
"""旧版本把 source 污染成 api_check 时,下次检查更新应自愈回 sync。"""
284+
with tempfile.TemporaryDirectory() as tmpdir:
285+
log_dir = Path(tmpdir)
286+
status_path = log_dir / "product_last_status.json"
287+
status_path.write_text(json.dumps({
288+
"prod-a": {
289+
"status": "skipped",
290+
"reason_code": "no_valid_output",
291+
"error": "同步未产生可用输出,已跳过状态推进。",
292+
"date_time": "2026-03-18",
293+
"checked_at": "2026-03-18T10:00:00",
294+
"source": _SOURCE_API_CHECK,
295+
}
296+
}), encoding="utf-8")
297+
298+
update_api_latest_dates(log_dir, {"prod-a": "2026-03-18"})
299+
300+
status = json.loads(status_path.read_text(encoding="utf-8"))
301+
self.assertEqual(status["prod-a"]["source"], _SOURCE_SYNC)
302+
253303

254304
if __name__ == "__main__":
255305
unittest.main()

0 commit comments

Comments
 (0)