From 9e73356b050382bdbde9a1b3c4cb70e87f9850d5 Mon Sep 17 00:00:00 2001 From: Chit Boon Date: Sun, 10 May 2026 19:34:22 +0800 Subject: [PATCH 1/5] feat(dataflows): per-vendor cache scaffolding for news vendors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a small daily / weekly bucket cache used by the upcoming non-US news vendors (CLS, Cninfo, Eastmoney). Daily buckets for breaking news; weekly (ISO-week-Monday-keyed) buckets for Cninfo since disclosures spike at quarter-end. Standalone module — unused at this commit. The vendor wiring lands in a follow-up commit on this branch. --- tests/test_dataflow_cache.py | 105 ++++++++++ tradingagents/dataflows/dataflow_cache.py | 238 ++++++++++++++++++++++ 2 files changed, 343 insertions(+) create mode 100644 tests/test_dataflow_cache.py create mode 100644 tradingagents/dataflows/dataflow_cache.py diff --git a/tests/test_dataflow_cache.py b/tests/test_dataflow_cache.py new file mode 100644 index 00000000000..05fc31fcc32 --- /dev/null +++ b/tests/test_dataflow_cache.py @@ -0,0 +1,105 @@ +"""Tests for the cross-ticker dataflow cache. + +Pins: + 1. ``get_global_news`` is cached by date — second call same day reuses, even + when invoked from a different ticker context. + 2. Financial statements are cached by (ticker, fiscal_quarter) — same ticker + queried on different days within a quarter reuses; new quarter refetches. + 3. Non-cacheable methods (e.g. get_stock_data) fall through every time. +""" + +from datetime import date + +import pytest + +from tradingagents.dataflows import dataflow_cache as dc +from tradingagents.dataflows.config import set_config + + +@pytest.fixture(autouse=True) +def isolated_cache(tmp_path): + set_config({"data_cache_dir": str(tmp_path)}) + yield + + +def _counting_fetch(payload, counter): + def fetch(): + counter[0] += 1 + return payload + return fetch + + +def test_fiscal_quarter_end_buckets(): + assert dc.fiscal_quarter_end("2026-05-08") == "2026-03-31" + assert dc.fiscal_quarter_end("2026-07-15") == "2026-06-30" + assert dc.fiscal_quarter_end("2026-12-31") == "2026-12-31" + assert dc.fiscal_quarter_end("2026-01-02") == "2025-12-31" + + +def test_global_news_cache_reused_same_day(): + counter = [0] + args = ("2026-05-08", 7, 10) + fetch = _counting_fetch("global news payload", counter) + + a = dc.cached_call("get_global_news", args, {}, fetch) + b = dc.cached_call("get_global_news", args, {}, fetch) + + assert a == b == "global news payload" + assert counter[0] == 1, "Second call should hit the cache" + + +def test_global_news_cache_invalidates_on_new_day(): + counter = [0] + fetch = _counting_fetch("payload", counter) + + dc.cached_call("get_global_news", ("2026-05-08", 7, 10), {}, fetch) + dc.cached_call("get_global_news", ("2026-05-09", 7, 10), {}, fetch) + + assert counter[0] == 2 + + +def test_fundamentals_cache_reused_within_quarter(): + counter = [0] + fetch = _counting_fetch("fundamentals body", counter) + + # Two different days, same ticker, same calendar quarter (Q2 2026). + dc.cached_call("get_fundamentals", ("AAPL", "2026-04-15"), {}, fetch) + dc.cached_call("get_fundamentals", ("AAPL", "2026-05-08"), {}, fetch) + + assert counter[0] == 1 + + +def test_fundamentals_cache_separates_tickers(): + counter = [0] + fetch = _counting_fetch("body", counter) + + dc.cached_call("get_fundamentals", ("AAPL", "2026-05-08"), {}, fetch) + dc.cached_call("get_fundamentals", ("MSFT", "2026-05-08"), {}, fetch) + + assert counter[0] == 2 + + +def test_balance_sheet_cache_keyed_by_freq(): + counter = [0] + fetch = _counting_fetch("body", counter) + + dc.cached_call("get_balance_sheet", ("AAPL", "quarterly", "2026-05-08"), {}, fetch) + dc.cached_call("get_balance_sheet", ("AAPL", "annual", "2026-05-08"), {}, fetch) + + assert counter[0] == 2 + + +def test_uncacheable_method_falls_through(): + counter = [0] + fetch = _counting_fetch("body", counter) + + dc.cached_call("get_stock_data", ("AAPL", "2026-01-01", "2026-05-08"), {}, fetch) + dc.cached_call("get_stock_data", ("AAPL", "2026-01-01", "2026-05-08"), {}, fetch) + + assert counter[0] == 2, "Non-cacheable methods should always fetch" + + +def test_cache_key_for_returns_none_on_missing_args(): + assert dc.cache_key_for("get_global_news", (), {}) is None + assert dc.cache_key_for("get_fundamentals", ("AAPL",), {}) is None + assert dc.cache_key_for("nonexistent_method", ("AAPL",), {}) is None diff --git a/tradingagents/dataflows/dataflow_cache.py b/tradingagents/dataflows/dataflow_cache.py new file mode 100644 index 00000000000..4e1cc0fdcf5 --- /dev/null +++ b/tradingagents/dataflows/dataflow_cache.py @@ -0,0 +1,238 @@ +"""Cross-ticker on-disk cache for slow-changing dataflow results. + +Two access patterns this layer accelerates: + 1. **Same day, multiple tickers** — global / macro news is identical for + every ticker analysed on the same calendar day. Cache by date alone. + 2. **Same ticker, different days within a quarter** — financial statements + and the fundamentals overview only roll forward when a new quarterly + report is filed. Cache by (ticker, fiscal_quarter_end). + +This is distinct from the per-(ticker, trade_date) analyst report cache: +that one short-circuits the whole analyst, whereas this one short-circuits +individual data fetches and so still benefits a fresh run on a new +(ticker, date) pair. +""" + +from __future__ import annotations + +import hashlib +import logging +import os +from datetime import date, datetime, timedelta +from typing import Callable, Optional + +from .config import get_config + +logger = logging.getLogger(__name__) + +# Methods we cache here. Each entry maps to a key-builder defined below. +_CACHEABLE_METHODS = frozenset({ + "get_global_news", + "get_fundamentals", + "get_balance_sheet", + "get_cashflow", + "get_income_statement", +}) + + +def _cache_dir() -> str: + return os.path.join(get_config()["data_cache_dir"], "dataflow_cache") + + +def _path_for(key: str) -> str: + digest = hashlib.sha1(key.encode("utf-8")).hexdigest()[:20] + return os.path.join(_cache_dir(), f"{digest}.txt") + + +def _read(key: str) -> Optional[str]: + path = _path_for(key) + if not os.path.exists(path): + return None + try: + with open(path, "r", encoding="utf-8") as f: + return f.read() + except OSError as e: + logger.warning("dataflow cache read failed for %s: %s", key, e) + return None + + +def _write(key: str, value: str) -> None: + if not isinstance(value, str) or not value.strip(): + return + try: + os.makedirs(_cache_dir(), exist_ok=True) + with open(_path_for(key), "w", encoding="utf-8") as f: + f.write(value) + except OSError as e: + logger.warning("dataflow cache write failed for %s: %s", key, e) + + +def fiscal_quarter_end(date_str: str) -> str: + """Most recent calendar quarter end (Mar 31 / Jun 30 / Sep 30 / Dec 31) on + or before ``date_str``. Used as a cache bucket so financial statements + are fetched at most once per quarter per ticker. + """ + d = datetime.strptime(date_str, "%Y-%m-%d").date() + quarters = [(3, 31), (6, 30), (9, 30), (12, 31)] + candidates = [] + for year in (d.year - 1, d.year): + for m, day in quarters: + qe = date(year, m, day) + if qe <= d: + candidates.append(qe) + return max(candidates).isoformat() + + +def iso_week_monday(date_str: str) -> str: + """Monday of the ISO week containing ``date_str``. Bucket for sources + that change at most weekly (Cninfo regulatory filings). + """ + d = datetime.strptime(date_str, "%Y-%m-%d").date() + monday = d - timedelta(days=d.weekday()) + return monday.isoformat() + + +def cache_key_for(method: str, args: tuple, kwargs: dict) -> Optional[str]: + """Build a cache key for a vendor call, or return None if uncacheable. + + Returning None falls through to the live fetch with no caching, so a + new method always works even before its key-builder is added. + """ + try: + if method == "get_global_news": + curr_date = args[0] if args else kwargs.get("curr_date") + lookback = args[1] if len(args) > 1 else kwargs.get("look_back_days", 7) + limit = args[2] if len(args) > 2 else kwargs.get("limit", 5) + if not curr_date: + return None + return f"global_news::{curr_date}::{lookback}::{limit}" + + if method == "get_fundamentals": + ticker = args[0] if args else kwargs.get("ticker") + curr_date = args[1] if len(args) > 1 else kwargs.get("curr_date") + if not ticker or not curr_date: + return None + return f"fundamentals::{ticker}::{fiscal_quarter_end(curr_date)}" + + if method in ("get_balance_sheet", "get_cashflow", "get_income_statement"): + ticker = args[0] if args else kwargs.get("ticker") + freq = args[1] if len(args) > 1 else kwargs.get("freq", "quarterly") + curr_date = args[2] if len(args) > 2 else kwargs.get("curr_date") + if not ticker or not curr_date: + return None + return f"{method}::{ticker}::{freq}::{fiscal_quarter_end(curr_date)}" + except (IndexError, ValueError, KeyError, TypeError) as e: + logger.debug("dataflow cache key build failed for %s: %s", method, e) + return None + + return None + + +def is_cacheable(method: str) -> bool: + return method in _CACHEABLE_METHODS + + +def cached_call(method: str, args: tuple, kwargs: dict, fetch: Callable[[], object]) -> object: + """Lookup-or-fetch, returning the (possibly cached) value. + + Falls through to ``fetch()`` when: + - the method is not in the cacheable allowlist, + - the key builder cannot derive a stable key, + - the fetched value is not a string (we only cache strings here). + """ + if not is_cacheable(method): + return fetch() + + key = cache_key_for(method, args, kwargs) + if key is None: + return fetch() + + hit = _read(key) + if hit is not None: + return hit + + value = fetch() + if isinstance(value, str): + _write(key, value) + return value + + +def vendor_cache_key( + vendor: str, method: str, args: tuple, kwargs: dict +) -> Optional[str]: + """Per-vendor cache key for the news methods. + + Each news vendor caches independently so the multi-source merge for + HK / SH / SZ tickers reuses whatever is already hot. Bucketing: + + * Eastmoney / CLS / yfinance: daily — these surface fresh + editorial / flash content on a daily cadence. + * Cninfo: ISO-weekly — A-share regulatory filings move slowly enough + that re-fetching daily is wasteful. A buyback-progress filing today + doesn't materially change the disclosure list this week. + """ + if method == "get_news": + ticker = args[0] if args else kwargs.get("ticker") + end_date = args[2] if len(args) > 2 else kwargs.get("end_date") + if not ticker or not end_date: + return None + if vendor == "cninfo": + return f"cninfo::{ticker}::{iso_week_monday(end_date)}" + return f"{vendor}::news::{ticker}::{end_date}" + if method == "get_insider_transactions": + ticker = args[0] if args else kwargs.get("ticker") + if not ticker: + return None + return f"{vendor}::insider::{ticker}::{date.today().isoformat()}" + return None + + +def vendor_cached_call( + vendor: str, + method: str, + args: tuple, + kwargs: dict, + fetch: Callable[[], object], +) -> object: + """Try the per-vendor cache first; on miss, fetch and store. + + Falls through to ``fetch()`` when no stable key can be derived for + this vendor/method combo (e.g. ``get_global_news`` is already cached + by the legacy ``cached_call`` keyed on the date+window+limit triple, + no per-vendor split needed). + """ + key = vendor_cache_key(vendor, method, args, kwargs) + if key is None: + return fetch() + + hit = _read(key) + if hit is not None: + return hit + + value = fetch() + if isinstance(value, str): + # Skip-markers from a non-applicable vendor (e.g. Cninfo on an HK + # ticker) are intentionally NOT cached — they're cheap to regenerate + # and we don't want to lock them in if the user later configures a + # different vendor mapping. + stripped = value.strip() + is_skip = stripped.startswith("[") and "skip" in stripped.lower() and stripped.endswith("]") + if not is_skip: + _write(key, value) + return value + + +def clear_dataflow_cache() -> int: + """Delete every cached dataflow file. Returns the count removed.""" + base = _cache_dir() + if not os.path.isdir(base): + return 0 + removed = 0 + for name in os.listdir(base): + path = os.path.join(base, name) + try: + os.remove(path) + removed += 1 + except OSError: + pass + return removed From 57b90b56ca89e344cf3654bb35eb5c1853299a93 Mon Sep 17 00:00:00 2001 From: Chit Boon Date: Sun, 10 May 2026 19:42:49 +0800 Subject: [PATCH 2/5] feat(dataflows): yfinance ticker normaliser (.SH -> .SS) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds normalize_ticker_for_yfinance() and applies it in the yfinance news fetcher. Different platforms expose Shanghai tickers with either .SH or .SS — yfinance only accepts .SS, so user input from any source now resolves to a fetchable symbol. Also tightens get_global_news_yfinance from four broad queries to two high-signal queries. Roughly halves the macro-news fetch latency without losing distinct signal (the four were heavily overlapping). --- tradingagents/dataflows/stockstats_utils.py | 15 +++++++++++++++ tradingagents/dataflows/yfinance_news.py | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/tradingagents/dataflows/stockstats_utils.py b/tradingagents/dataflows/stockstats_utils.py index 260ef73cd35..d8b01291431 100644 --- a/tradingagents/dataflows/stockstats_utils.py +++ b/tradingagents/dataflows/stockstats_utils.py @@ -13,6 +13,21 @@ logger = logging.getLogger(__name__) +def normalize_ticker_for_yfinance(ticker: str) -> str: + """Translate exchange suffix variants yfinance does not accept. + + Different platforms use ``.SH`` or ``.SS`` interchangeably for the + Shanghai exchange; yfinance only accepts ``.SS``. We normalise here + so user input from any source resolves to a fetchable symbol. + """ + if not ticker or "." not in ticker: + return ticker + code, suffix = ticker.rsplit(".", 1) + if suffix.upper() == "SH": + return f"{code}.SS" + return ticker + + def yf_retry(func, max_retries=3, base_delay=2.0): """Execute a yfinance call with exponential backoff on rate limits. diff --git a/tradingagents/dataflows/yfinance_news.py b/tradingagents/dataflows/yfinance_news.py index 55c5d251264..2dacfe1f1c0 100644 --- a/tradingagents/dataflows/yfinance_news.py +++ b/tradingagents/dataflows/yfinance_news.py @@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta from .config import get_config -from .stockstats_utils import yf_retry +from .stockstats_utils import yf_retry, normalize_ticker_for_yfinance def _extract_article_data(article: dict) -> dict: @@ -69,7 +69,7 @@ def get_news_yfinance( """ article_limit = get_config()["news_article_limit"] try: - stock = yf.Ticker(ticker) + stock = yf.Ticker(normalize_ticker_for_yfinance(ticker)) news = yf_retry(lambda: stock.get_news(count=article_limit)) if not news: From 12b92a48b72051436667e7e58e7b240f3be647e4 Mon Sep 17 00:00:00 2001 From: Chit Boon Date: Sun, 10 May 2026 19:52:25 +0800 Subject: [PATCH 3/5] feat(dataflows): CLS, Cninfo, Eastmoney news vendors + auto-router MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds three news vendors covering HK and CN A-share (SH / SZ) tickers: * Eastmoney — editorial Chinese-language news (HK / SH / SZ) * CLS — Cailianshe flash-news stream (HK / SH / SZ) * Cninfo — official CSRC corporate disclosures (SH / SZ only) The vendors are wired into route_to_vendor via a ticker-suffix auto-router. When data_vendors.news_data = "auto" (now the default), the router dispatches: * .HK -> eastmoney + cls * .SS / .SH / .SZ -> eastmoney + cls + cninfo * everything else -> yfinance (existing behaviour) Multi-vendor calls run sequentially and concatenate non-empty results as Markdown blocks separated by horizontal rules. Per-vendor failures and "[Vendor skip — ...]" markers are dropped from the merge so the analyst sees whichever sources succeeded; if all sources fail or skip, we fall through to yfinance as a final safety net. Each vendor call goes through the per-vendor cache added in the preceding commit — daily buckets for editorial / flash news, weekly buckets (ISO-week-Monday-keyed) for Cninfo since disclosures spike at quarter-end. Adds akshare>=1.18 as a runtime dep (Eastmoney / Cninfo backends). Imported lazily inside the vendors so users analysing only US tickers pay no import cost. Tests cover routing, the multi-vendor merge, skip-marker filtering, cache hit/miss, and Cninfo's two known schema shapes (公告链接 vs announcementId+orgId). All vendor calls are mocked at the akshare import boundary — no live network. --- pyproject.toml | 5 + tests/test_cls_cninfo_news.py | 206 ++++++++++++++++++ tests/test_eastmoney_news.py | 159 ++++++++++++++ tests/test_news_caching.py | 136 ++++++++++++ tradingagents/dataflows/cls_news.py | 162 ++++++++++++++ tradingagents/dataflows/cninfo_disclosures.py | 166 ++++++++++++++ tradingagents/dataflows/eastmoney_news.py | 160 ++++++++++++++ tradingagents/dataflows/interface.py | 159 ++++++++++++-- tradingagents/default_config.py | 6 +- 9 files changed, 1138 insertions(+), 21 deletions(-) create mode 100644 tests/test_cls_cninfo_news.py create mode 100644 tests/test_eastmoney_news.py create mode 100644 tests/test_news_caching.py create mode 100644 tradingagents/dataflows/cls_news.py create mode 100644 tradingagents/dataflows/cninfo_disclosures.py create mode 100644 tradingagents/dataflows/eastmoney_news.py diff --git a/pyproject.toml b/pyproject.toml index 07cbbd3f75f..b603ad87990 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,11 @@ dependencies = [ "tqdm>=4.67.1", "typing-extensions>=4.14.0", "yfinance>=0.2.63", + # akshare wraps Eastmoney + Cninfo Chinese-market endpoints used by the + # auto-router for .HK / .SS / .SZ tickers. Imported lazily inside the + # vendor modules — only matters when a Chinese-market ticker is + # actually analysed. + "akshare>=1.18", ] [project.scripts] diff --git a/tests/test_cls_cninfo_news.py b/tests/test_cls_cninfo_news.py new file mode 100644 index 00000000000..fdd6dd2969e --- /dev/null +++ b/tests/test_cls_cninfo_news.py @@ -0,0 +1,206 @@ +"""Tests for CLS flash news + Cninfo disclosures + multi-source merge. + +Pins: + 1. CLS filters the global flash stream by ticker / company-name match + and applies the requested date window. + 2. CLS skips non-Asian-market tickers explicitly. + 3. Cninfo only handles SH/SZ; HK/US tickers get a labelled skip marker. + 4. Cninfo formats akshare's disclosure DataFrame into Markdown with + filing title / date / link. + 5. Auto-router resolves SS/SZ → [eastmoney, cls, cninfo] and HK → + [eastmoney, cls]. + 6. ``route_to_vendor`` calls every vendor in the resolved list and + concatenates non-empty results separated by ``---``. + 7. Pure skip-markers from a non-applicable vendor are dropped so the + LLM doesn't get padded with no-op noise. +""" + +from unittest.mock import patch + +import pandas as pd +import pytest + +from tradingagents.dataflows import cls_news as cls +from tradingagents.dataflows import cninfo_disclosures as cninfo +from tradingagents.dataflows import interface as iface +from tradingagents.dataflows.config import set_config + + +@pytest.fixture(autouse=True) +def isolated_cache(tmp_path): + set_config({ + "data_cache_dir": str(tmp_path), + "data_vendors": {"news_data": "auto"}, + "tool_vendors": {}, + }) + cls._NAME_CACHE.clear() + yield + + +def _cls_frame(rows): + return pd.DataFrame(rows, columns=["标题", "内容", "发布日期", "发布时间"]) + + +def _cninfo_frame(rows): + return pd.DataFrame(rows, columns=["代码", "简称", "公告标题", "公告时间", "公告链接"]) + + +# --- CLS --------------------------------------------------------------- + +def test_cls_matches_ticker_and_filters_dates(): + fake = _cls_frame([ + ["腾讯控股回购股份", "腾讯今日宣布回购计划。", "2026-05-08", "09:30:00"], + ["其他公司新闻", "与本案无关。", "2026-05-08", "10:00:00"], + ["腾讯Q1业绩", "营收增长。", "2024-01-01", "09:00:00"], # outside window + ]) + with patch("akshare.stock_info_global_cls", return_value=fake), \ + patch("tradingagents.dataflows.cls_news._resolve_company_short_name", return_value="腾讯"): + out = cls.get_news_cls("0700.HK", "2026-05-01", "2026-05-15") + + assert "腾讯控股回购股份" in out + assert "其他公司新闻" not in out + assert "腾讯Q1业绩" not in out # date filter + + +def test_cls_skips_non_asian_market(): + out = cls.get_news_cls("AAPL", "2026-05-01", "2026-05-10") + assert "skip" in out.lower() + + +def test_cls_handles_fetch_error(): + with patch("akshare.stock_info_global_cls", side_effect=RuntimeError("network")): + out = cls.get_news_cls("0700.HK", "2026-05-01", "2026-05-10") + assert "fetch failed" in out.lower() + + +# --- Cninfo ------------------------------------------------------------ + +def test_cninfo_formats_disclosure_frame(): + fake = _cninfo_frame([ + ["600519", "贵州茅台", "贵州茅台关于回购股份实施进展的公告", "2026-05-08", "https://cninfo.com.cn/x"], + ["600519", "贵州茅台", "贵州茅台关于召开业绩说明会的公告", "2026-04-29", "https://cninfo.com.cn/y"], + ]) + with patch("akshare.stock_zh_a_disclosure_report_cninfo", return_value=fake): + out = cninfo.get_disclosures_cninfo("600519.SS", "2026-04-01", "2026-05-10") + + assert "贵州茅台关于回购股份实施进展的公告" in out + assert "https://cninfo.com.cn/x" in out + assert "巨潮" in out + + +def test_cninfo_skips_hk_and_us(): + out = cninfo.get_disclosures_cninfo("0700.HK", "2026-05-01", "2026-05-10") + assert "skip" in out.lower() + out = cninfo.get_disclosures_cninfo("AAPL", "2026-05-01", "2026-05-10") + assert "skip" in out.lower() + + +def test_cninfo_handles_empty_result(): + with patch("akshare.stock_zh_a_disclosure_report_cninfo", return_value=pd.DataFrame()): + out = cninfo.get_disclosures_cninfo("600519.SS", "2026-05-01", "2026-05-10") + assert "No Cninfo filings" in out + + +@pytest.mark.parametrize("ann_id,org_id,ann_time", [ + ("20260508001234", "9900016138", "2026-05-08"), + ("20260101005678", "9900019999", "2026-01-01"), +]) +def test_cninfo_build_link_reconstructs_from_announcement_id(ann_id, org_id, ann_time): + """_build_link Path B: no 公告链接 column; reconstruct URL from announcementId + orgId + 公告时间.""" + fake = pd.DataFrame([{ + "代码": "600519", + "简称": "贵州茅台", + "公告标题": "测试公告", + "公告时间": ann_time, + "announcementId": ann_id, + "orgId": org_id, + }]) + with patch("akshare.stock_zh_a_disclosure_report_cninfo", return_value=fake): + out = cninfo.get_disclosures_cninfo("600519.SS", "2026-01-01", "2026-12-31") + + assert "cninfo.com.cn" in out + assert f"announcementId={ann_id}" in out + assert f"orgId={org_id}" in out + + +# --- Auto-router multi-source ---------------------------------------- + +def test_auto_router_returns_list_for_ss(): + vendors = iface._resolve_auto_vendors("get_news", ("600519.SS",)) + assert vendors == ["eastmoney", "cls", "cninfo"] + + +def test_auto_router_returns_list_for_sz(): + vendors = iface._resolve_auto_vendors("get_news", ("000001.SZ",)) + assert vendors == ["eastmoney", "cls", "cninfo"] + + +def test_auto_router_returns_list_for_hk(): + vendors = iface._resolve_auto_vendors("get_news", ("0700.HK",)) + assert vendors == ["eastmoney", "cls"] + + +def test_auto_router_us_stays_single_yfinance(): + assert iface._resolve_auto_vendors("get_news", ("AAPL",)) == ["yfinance"] + + +def test_auto_router_global_news_always_yfinance(): + assert iface._resolve_auto_vendors("get_global_news", ("2026-05-08", 7, 10)) == ["yfinance"] + + +def test_route_to_vendor_merges_multiple_sources_for_ss(): + """SS ticker fans out to eastmoney + cls + cninfo; results concatenate.""" + fake_em = _cls_frame([]) # placeholder, eastmoney mock returns string directly + iface_orig = iface.VENDOR_METHODS["get_news"].copy() + try: + iface.VENDOR_METHODS["get_news"]["eastmoney"] = lambda *a, **kw: "" + iface.VENDOR_METHODS["get_news"]["cls"] = lambda *a, **kw: "" + iface.VENDOR_METHODS["get_news"]["cninfo"] = lambda *a, **kw: "" + out = iface.route_to_vendor("get_news", "600519.SS", "2026-05-01", "2026-05-10") + finally: + iface.VENDOR_METHODS["get_news"] = iface_orig + + assert "" in out + assert "" in out + assert "" in out + # Sources separated by --- so the LLM sees clear demarcation. + assert "---" in out + + +def test_route_to_vendor_drops_pure_skip_markers(): + """If a vendor returns a [skip — ...] marker, it's omitted from the merge + so the analyst doesn't get prompts polluted with no-op noise.""" + iface_orig = iface.VENDOR_METHODS["get_news"].copy() + try: + iface.VENDOR_METHODS["get_news"]["eastmoney"] = lambda *a, **kw: "real eastmoney content" + iface.VENDOR_METHODS["get_news"]["cls"] = lambda *a, **kw: "[CLS skip — HK suffix routed elsewhere]" + iface.VENDOR_METHODS["get_news"]["cninfo"] = lambda *a, **kw: "real cninfo content" + out = iface.route_to_vendor("get_news", "600519.SS", "2026-05-01", "2026-05-10") + finally: + iface.VENDOR_METHODS["get_news"] = iface_orig + + assert "real eastmoney content" in out + assert "real cninfo content" in out + assert "CLS skip" not in out + + +def test_route_to_vendor_survives_per_vendor_failure(): + """One source raising must not block the others.""" + iface_orig = iface.VENDOR_METHODS["get_news"].copy() + try: + iface.VENDOR_METHODS["get_news"]["eastmoney"] = lambda *a, **kw: (_ for _ in ()).throw(RuntimeError("network")) + iface.VENDOR_METHODS["get_news"]["cls"] = lambda *a, **kw: "" + iface.VENDOR_METHODS["get_news"]["cninfo"] = lambda *a, **kw: "" + out = iface.route_to_vendor("get_news", "600519.SS", "2026-05-01", "2026-05-10") + finally: + iface.VENDOR_METHODS["get_news"] = iface_orig + + assert "" in out + assert "" in out + # No network error leaks to the analyst — failure is logged, not surfaced. + + +def test_auto_router_si_routes_to_yfinance(): + """SG (.SI) tickers must currently fall through to yfinance — the + SGX vendor is held back from this PR pending more end-to-end testing.""" + assert iface._resolve_auto_vendors("get_news", ("D05.SI",)) == ["yfinance"] diff --git a/tests/test_eastmoney_news.py b/tests/test_eastmoney_news.py new file mode 100644 index 00000000000..4c82af02826 --- /dev/null +++ b/tests/test_eastmoney_news.py @@ -0,0 +1,159 @@ +"""Tests for the Eastmoney news vendor + auto-router. + +Pins: + 1. Ticker-format conversion: ``.SS`` / ``.SZ`` strip suffix, ``.HK`` zero-pads to 5 digits. + 2. The news vendor formats Eastmoney's Chinese DataFrame into a + Markdown report covering title / source / publish date / link. + 3. Date filtering drops articles outside the requested window. + 4. The ``auto`` vendor in ``route_to_vendor`` routes ``.HK`` / ``.SS`` / + ``.SZ`` tickers to Eastmoney and US tickers to yfinance. + 5. ``get_global_news`` always goes to yfinance regardless of ``auto``. +""" + +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +from tradingagents.dataflows import eastmoney_news as em +from tradingagents.dataflows import interface as iface +from tradingagents.dataflows.config import set_config + + +@pytest.fixture(autouse=True) +def isolated_cache(tmp_path): + set_config({"data_cache_dir": str(tmp_path)}) + yield + + +def _fake_em_frame(rows): + return pd.DataFrame(rows, columns=["关键词", "新闻标题", "新闻内容", "发布时间", "文章来源", "新闻链接"]) + + +def test_to_eastmoney_symbol_a_share(): + sym = em._to_eastmoney_symbol("600519.SS") + assert sym == ("600519", "Shanghai A-share") + sym = em._to_eastmoney_symbol("000001.SZ") + assert sym == ("000001", "Shenzhen A-share") + + +def test_to_eastmoney_symbol_hk_zero_pads(): + sym = em._to_eastmoney_symbol("0700.HK") + assert sym == ("00700", "Hong Kong") + sym = em._to_eastmoney_symbol("9988.HK") + assert sym == ("09988", "Hong Kong") + + +def test_to_eastmoney_symbol_returns_none_for_unsupported(): + assert em._to_eastmoney_symbol("AAPL") is None + assert em._to_eastmoney_symbol("D05.SI") is None # Singapore not covered + assert em._to_eastmoney_symbol("") is None + + +def test_get_news_eastmoney_formats_dataframe(): + fake = _fake_em_frame([ + ["00700", "腾讯控股回购", "腾讯今日宣布回购计划。", "2026-05-08 09:00:00", "证券时报", "https://example.com/1"], + ["00700", "腾讯Q1财报", "营收增长10%。", "2026-05-06 14:00:00", "财新网", "https://example.com/2"], + ]) + + with patch("akshare.stock_news_em", return_value=fake): + out = em.get_news_eastmoney("0700.HK", "2026-05-01", "2026-05-10") + + assert "腾讯控股回购" in out + assert "腾讯Q1财报" in out + assert "证券时报" in out + assert "https://example.com/1" in out + assert "Hong Kong" in out + + +def test_get_news_eastmoney_filters_by_date(): + fake = _fake_em_frame([ + ["600519", "在窗口内", "x", "2026-05-08 09:00:00", "界面新闻", "https://example.com/1"], + ["600519", "窗口前", "y", "2026-04-01 09:00:00", "界面新闻", "https://example.com/2"], + ["600519", "窗口后", "z", "2026-06-01 09:00:00", "界面新闻", "https://example.com/3"], + ]) + + with patch("akshare.stock_news_em", return_value=fake): + out = em.get_news_eastmoney("600519.SS", "2026-05-01", "2026-05-15") + + assert "在窗口内" in out + assert "窗口前" not in out + assert "窗口后" not in out + + +def test_get_news_eastmoney_handles_unsupported_ticker(): + out = em.get_news_eastmoney("AAPL", "2026-05-01", "2026-05-10") + assert "[Eastmoney skip" in out + assert "AAPL" in out + + +def test_get_news_eastmoney_handles_fetch_error(): + with patch("akshare.stock_news_em", side_effect=RuntimeError("network down")): + out = em.get_news_eastmoney("0700.HK", "2026-05-01", "2026-05-10") + assert "Eastmoney fetch failed" in out + assert "network down" in out + + +def test_auto_routes_hk_to_eastmoney(): + assert iface._resolve_auto_vendor("get_news", ("0700.HK", "2026-05-01", "2026-05-10")) == "eastmoney" + + +def test_auto_routes_a_share_to_eastmoney(): + assert iface._resolve_auto_vendor("get_news", ("600519.SS", "2026-05-01", "2026-05-10")) == "eastmoney" + assert iface._resolve_auto_vendor("get_news", ("000001.SZ", "2026-05-01", "2026-05-10")) == "eastmoney" + + +def test_auto_routes_us_ticker_to_yfinance(): + assert iface._resolve_auto_vendor("get_news", ("AAPL", "2026-05-01", "2026-05-10")) == "yfinance" + + + +def test_auto_global_news_always_yfinance(): + """get_global_news has no ticker; auto must not crash and must route to yfinance.""" + assert iface._resolve_auto_vendor("get_global_news", ("2026-05-08", 7, 10)) == "yfinance" + + +def test_route_to_vendor_with_auto_calls_eastmoney_for_hk(tmp_path): + """End-to-end: when news_data='auto' and ticker is HK, the eastmoney vendor is invoked.""" + # Isolated cache dir so the per-vendor cache doesn't leak across runs. + set_config({ + "data_cache_dir": str(tmp_path), + "data_vendors": {"news_data": "auto"}, + "tool_vendors": {}, + }) + fake = _fake_em_frame([ + ["00700", "测试", "x", "2026-05-08 09:00:00", "test", "https://example.com"], + ]) + with patch("akshare.stock_news_em", return_value=fake) as m: + out = iface.route_to_vendor("get_news", "0700.HK", "2026-05-01", "2026-05-10") + # Multi-source merge calls eastmoney + cls (HK fans to both); we only + # care that eastmoney was reached at least once. + assert m.call_count >= 1 + assert "测试" in out + + +def test_route_to_vendor_with_auto_calls_yfinance_for_us(tmp_path): + """End-to-end: US ticker via auto goes to yfinance, not eastmoney. + + ``VENDOR_METHODS`` is a dict of function references captured at import + time, so patching the module attribute does not redirect the call. + Patch the dict entry directly instead. + """ + set_config({ + "data_cache_dir": str(tmp_path), + "data_vendors": {"news_data": "auto"}, + "tool_vendors": {}, + }) + yf_mock = MagicMock(return_value="") + ak_mock = MagicMock() + original_yf = iface.VENDOR_METHODS["get_news"]["yfinance"] + iface.VENDOR_METHODS["get_news"]["yfinance"] = yf_mock + try: + with patch("akshare.stock_news_em", ak_mock): + out = iface.route_to_vendor("get_news", "AAPL", "2026-05-01", "2026-05-10") + finally: + iface.VENDOR_METHODS["get_news"]["yfinance"] = original_yf + + yf_mock.assert_called_once() + ak_mock.assert_not_called() + assert out == "" diff --git a/tests/test_news_caching.py b/tests/test_news_caching.py new file mode 100644 index 00000000000..a411db11f91 --- /dev/null +++ b/tests/test_news_caching.py @@ -0,0 +1,136 @@ +"""Tests for per-vendor news caching in the auto-router. + +Pins: + 1. Eastmoney / CLS / yfinance bucket by day. + 2. Cninfo buckets by ISO week (filings move slower). + 3. Multi-source merge calls each vendor through the per-vendor cache, + so two consecutive runs for the same ticker hit the cache for every + vendor — no live network calls on the second run. + 4. A vendor that returns a skip-marker is NOT cached (cheap to regen, + don't lock in mis-routing). + 5. Per-vendor cache is independent — a CLS hit doesn't suppress a + fresh Cninfo fetch when only Cninfo's bucket has rolled over. +""" + +from unittest.mock import MagicMock + +import pytest + +from tradingagents.dataflows import dataflow_cache as dc +from tradingagents.dataflows import interface as iface +from tradingagents.dataflows.config import set_config + + +@pytest.fixture(autouse=True) +def isolated_cache(tmp_path): + set_config({ + "data_cache_dir": str(tmp_path), + "data_vendors": {"news_data": "auto"}, + "tool_vendors": {}, + }) + yield + + +def test_iso_week_monday_buckets(): + """Mon 4 May 2026 through Sun 10 May 2026 are the same ISO week.""" + assert dc.iso_week_monday("2026-05-04") == "2026-05-04" # Monday + assert dc.iso_week_monday("2026-05-08") == "2026-05-04" # Friday + assert dc.iso_week_monday("2026-05-10") == "2026-05-04" # Sunday — same ISO week + assert dc.iso_week_monday("2026-05-11") == "2026-05-11" # next Monday — new week + + +def test_vendor_cache_key_eastmoney_daily(): + key = dc.vendor_cache_key("eastmoney", "get_news", ("0700.HK", "2026-05-01", "2026-05-10"), {}) + assert key == "eastmoney::news::0700.HK::2026-05-10" + + +def test_vendor_cache_key_cninfo_weekly(): + key = dc.vendor_cache_key("cninfo", "get_news", ("600519.SS", "2026-05-01", "2026-05-10"), {}) + # 2026-05-10 is a Sunday → ISO Mon = 2026-05-04 + assert key == "cninfo::600519.SS::2026-05-04" + + +def test_vendor_cached_call_reuses_on_second_call(): + """Second invocation must NOT re-run fetch.""" + call_count = {"n": 0} + + def fetch(): + call_count["n"] += 1 + return "" + + a = dc.vendor_cached_call("eastmoney", "get_news", + ("0700.HK", "2026-05-01", "2026-05-10"), {}, fetch) + b = dc.vendor_cached_call("eastmoney", "get_news", + ("0700.HK", "2026-05-01", "2026-05-10"), {}, fetch) + assert a == b == "" + assert call_count["n"] == 1 + + +def test_vendor_cached_call_skips_caching_skip_markers(): + """Skip-markers must regenerate — cheap to recompute, harmful to lock in.""" + call_count = {"n": 0} + + def fetch(): + call_count["n"] += 1 + return "[Cninfo skip — A-share only]" + + dc.vendor_cached_call("cninfo", "get_news", + ("0700.HK", "2026-05-01", "2026-05-10"), {}, fetch) + dc.vendor_cached_call("cninfo", "get_news", + ("0700.HK", "2026-05-01", "2026-05-10"), {}, fetch) + assert call_count["n"] == 2, "skip markers should not be cached" + + +def test_vendor_caches_are_independent(): + """Eastmoney hot, Cninfo cold → only Cninfo fetches on second run.""" + em_calls = {"n": 0} + cninfo_calls = {"n": 0} + + def em_fetch(): + em_calls["n"] += 1 + return "" + + def cninfo_fetch(): + cninfo_calls["n"] += 1 + return "" + + args = ("600519.SS", "2026-05-01", "2026-05-10") + + # First pass: both fetch. + dc.vendor_cached_call("eastmoney", "get_news", args, {}, em_fetch) + dc.vendor_cached_call("cninfo", "get_news", args, {}, cninfo_fetch) + assert em_calls["n"] == 1 and cninfo_calls["n"] == 1 + + # Second pass: both cached. + dc.vendor_cached_call("eastmoney", "get_news", args, {}, em_fetch) + dc.vendor_cached_call("cninfo", "get_news", args, {}, cninfo_fetch) + assert em_calls["n"] == 1 and cninfo_calls["n"] == 1 + + +def test_route_to_vendor_caches_each_source_in_merge(): + """End-to-end: route_to_vendor for SH ticker calls 3 vendors first run, + then 0 vendors on the second (everything cached).""" + em = MagicMock(return_value="") + cls = MagicMock(return_value="") + cninfo = MagicMock(return_value="") + + iface_orig = iface.VENDOR_METHODS["get_news"].copy() + try: + iface.VENDOR_METHODS["get_news"]["eastmoney"] = em + iface.VENDOR_METHODS["get_news"]["cls"] = cls + iface.VENDOR_METHODS["get_news"]["cninfo"] = cninfo + + # First run: each vendor called once. + out1 = iface.route_to_vendor("get_news", "600519.SS", "2026-05-01", "2026-05-10") + assert em.call_count == 1 + assert cls.call_count == 1 + assert cninfo.call_count == 1 + + # Second run: zero new fetches. + out2 = iface.route_to_vendor("get_news", "600519.SS", "2026-05-01", "2026-05-10") + assert em.call_count == 1 + assert cls.call_count == 1 + assert cninfo.call_count == 1 + assert out1 == out2 + finally: + iface.VENDOR_METHODS["get_news"] = iface_orig diff --git a/tradingagents/dataflows/cls_news.py b/tradingagents/dataflows/cls_news.py new file mode 100644 index 00000000000..0261598ddba --- /dev/null +++ b/tradingagents/dataflows/cls_news.py @@ -0,0 +1,162 @@ +"""CLS (财联社) flash news for HK / Shanghai / Shenzhen tickers. + +CLS is the de-facto real-time market headline service for mainland China — +short, fast flashes that surface market-moving events minutes after they +break, distinct from Eastmoney's longer editorial articles. We pull the +global flash stream via akshare and filter by ticker code or company-name +mention, so the analyst sees both the breaking event and Eastmoney's +journalism explainer of it. + +Returned content is Chinese-language. The trading-agents pipeline reads +it natively (MiMo / Kimi / DeepSeek all handle Chinese without +translation overhead). +""" + +from __future__ import annotations + +import logging +from datetime import datetime +from typing import List, Optional + +from .eastmoney_news import _split_ticker # ticker-suffix parsing reused +from .stockstats_utils import normalize_ticker_for_yfinance + +logger = logging.getLogger(__name__) + + +# In-process cache: ticker -> short company name. Resolved via yfinance +# (already a hard dep) so we can match articles that name the issuer. +_NAME_CACHE: dict[str, Optional[str]] = {} + + +def _resolve_company_short_name(ticker: str) -> Optional[str]: + """One-shot ticker -> short Chinese-or-English issuer name. Best-effort. + + yfinance only accepts ``.SS`` for Shanghai; ``.SH`` (which Chinese + platforms commonly use) raises 404. We normalise here so users can + paste tickers in either convention without losing the name lookup. + """ + if ticker in _NAME_CACHE: + return _NAME_CACHE[ticker] + try: + import yfinance as yf + info = yf.Ticker(normalize_ticker_for_yfinance(ticker)).info or {} + name = info.get("shortName") or info.get("longName") + if isinstance(name, str): + for tail in (" Limited", " Ltd.", " Ltd", " Group", " Holdings", " Corporation", " Co"): + if name.endswith(tail): + name = name[: -len(tail)].strip() + _NAME_CACHE[ticker] = name + return name + except Exception as e: # pragma: no cover + logger.debug("CLS name lookup failed for %s: %s", ticker, e) + _NAME_CACHE[ticker] = None + return None + + +def _matches(haystack: str, needles: List[str]) -> bool: + haystack_l = haystack.lower() + return any(n and n.lower() in haystack_l for n in needles) + + +def get_news_cls( + ticker: str, + start_date: str, + end_date: str, +) -> str: + """Fetch CLS flash news mentioning the ticker. + + Args mirror :func:`get_news_yfinance` for drop-in compatibility with + the auto-router. Returns Markdown; on any error returns a labelled + marker rather than raising. + """ + parts = _split_ticker(ticker) + if parts is None: + return f"[CLS skip — {ticker} has no exchange suffix]" + code, suffix = parts + if suffix not in ("HK", "SS", "SH", "SZ"): + return f"[CLS skip — {suffix} suffix routed elsewhere]" + + try: + import akshare as ak + except ImportError: + return "[CLS unavailable: install `akshare` for Chinese-market flash news.]" + + try: + # Global flash stream, ~20 most-recent items. CLS doesn't expose a + # per-ticker filter, so we fetch the stream and match locally. + df = ak.stock_info_global_cls(symbol="全部") + except Exception as e: + logger.warning("CLS stock_info_global_cls failed: %s", e) + return f"[CLS fetch failed: {type(e).__name__}: {e}]" + + if df is None or df.empty: + return f"No CLS flash news available for {ticker}." + + # Build matching needles: ticker code, normalised forms, and company name. + needles: List[str] = [code, code.lstrip("0")] + if suffix == "HK": + # CLS articles often use the bare 5-digit Hong Kong code without leading zeros + needles.append(code.zfill(5)) + company = _resolve_company_short_name(ticker) + if company: + needles.append(company) + first_word = company.split()[0] if company.split() else "" + if first_word and first_word.upper() != code and len(first_word) >= 2: + needles.append(first_word) + + try: + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) + except ValueError: + start_dt = end_dt = None + + matches: List[dict] = [] + for _, row in df.iterrows(): + title = str(row.get("标题", "")).strip() + body = str(row.get("内容", "")).strip() + date_str = str(row.get("发布日期", "")).strip() + time_str = str(row.get("发布时间", "")).strip() + + if not title and not body: + continue + if not _matches(title + " " + body, needles): + continue + + published: Optional[datetime] = None + if date_str: + try: + published = datetime.strptime(f"{date_str} {time_str}".strip(), "%Y-%m-%d %H:%M:%S") + except ValueError: + try: + published = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + published = None + + if start_dt and end_dt and published is not None: + if not (start_dt <= published <= end_dt): + continue + + matches.append({ + "title": title, + "body": body, + "published": published, + }) + + if not matches: + return f"No CLS flash news matching {ticker} between {start_date} and {end_date}." + + blocks = [] + for m in matches[:10]: + block = f"### {m['title']}" + if m["published"]: + block += f" \n*Flash: {m['published'].strftime('%Y-%m-%d %H:%M')}*" + if m["body"]: + block += f"\n{m['body']}" + blocks.append(block) + + header = ( + f"## {ticker} flash news (CLS / 财联社), {start_date} to {end_date}\n" + f"*Real-time market-moving headlines. Source: cls.cn via akshare.*\n" + ) + return header + "\n\n".join(blocks) diff --git a/tradingagents/dataflows/cninfo_disclosures.py b/tradingagents/dataflows/cninfo_disclosures.py new file mode 100644 index 00000000000..41a2f3880cf --- /dev/null +++ b/tradingagents/dataflows/cninfo_disclosures.py @@ -0,0 +1,166 @@ +"""Cninfo (巨潮资讯) regulatory disclosures for Shanghai / Shenzhen tickers. + +Cninfo is the official CSRC-mandated disclosure platform — equivalent to +SEC EDGAR for the US market. Every material A-share filing (盈利预警 +profit warnings, 关联交易 related-party transactions, 重大资产重组 major +asset restructurings, 回购 buybacks, 高管变动 management changes) lands +here first; news outlets editorialize it afterward. + +For a Review-mode pre-trade decision this is the most authoritative +source we can include — the LLM gets the original filing titles and +links rather than a journalist's interpretation. + +Limitations: +- A-share only. HK has its own filing infrastructure (HKEX News) that + isn't exposed cleanly via akshare; for HK we lean on Eastmoney + CLS. +- Only filing titles + links are returned (akshare wrapper limitation); + the LLM can flag a filing as worth reading and the trader follows the + link for the full PDF. +""" + +from __future__ import annotations + +import logging +import time +from datetime import datetime +from typing import List, Optional + +from .eastmoney_news import _split_ticker + +logger = logging.getLogger(__name__) + + +def _build_link(row, code: str) -> str: + """Return the filing PDF link. + + akshare's wrapper currently emits a ``公告链接`` column directly, but + the underlying Cninfo API also exposes the link components + (``announcementId`` + ``orgId``) — and akshare has been observed + surfacing one schema or the other depending on ticker / pagination. + Construct the URL from whichever fields are present so a schema + drift doesn't silently lose data. + """ + link = str(row.get("公告链接", "")).strip() + if link: + return link + + ann_id = str(row.get("announcementId", "")).strip() + org_id = str(row.get("orgId", "")).strip() + ann_time = str(row.get("公告时间", "")).strip() + if ann_id and org_id: + return ( + f"http://www.cninfo.com.cn/new/disclosure/detail" + f"?stockCode={code}&announcementId={ann_id}" + f"&orgId={org_id}&announcementTime={ann_time}" + ) + return "" + + +def _fetch_cninfo_with_retry(symbol: str, start_compact: str, end_compact: str, max_retries: int = 1): + """Run akshare's Cninfo fetch with one retry on transient failures. + + The wrapper has been observed throwing ``KeyError`` or + "None of [Index([...])] are in the [columns]" when it hits a + pagination edge inside Cninfo's API. A single retry resolves + almost all of these; if it persists, we surface the error to the + caller so the analyst sees the data gap rather than silently + losing the source. + """ + import akshare as ak + last_exc: Optional[Exception] = None + for attempt in range(max_retries + 1): + try: + return ak.stock_zh_a_disclosure_report_cninfo( + symbol=symbol, + market="沪深京", + category="", + start_date=start_compact, + end_date=end_compact, + ) + except Exception as e: # akshare surfaces a wide variety of internal errors + last_exc = e + if attempt < max_retries: + logger.info( + "Cninfo fetch transient failure for %s (%s); retrying once", + symbol, type(e).__name__, + ) + time.sleep(1.0) + continue + assert last_exc is not None + raise last_exc + + +def get_disclosures_cninfo( + ticker: str, + start_date: str, + end_date: str, +) -> str: + """Fetch Cninfo regulatory disclosures for an A-share ticker. + + Args mirror the ``get_news`` interface so the auto-router can call + this alongside Eastmoney and CLS without special-casing. + + Returns Markdown; HK / non-A-share tickers get a labelled skip + marker so the auto-router can drop the result silently. + """ + parts = _split_ticker(ticker) + if parts is None: + return f"[Cninfo skip — {ticker} has no exchange suffix]" + code, suffix = parts + if suffix not in ("SS", "SH", "SZ"): + return f"[Cninfo skip — A-share only ({suffix} not covered)]" + + try: + import akshare as ak # noqa: F401 — ImportError guard; _fetch_cninfo_with_retry imports lazily. + except ImportError: + return "[Cninfo unavailable: install `akshare` for A-share regulatory filings.]" + + # Cninfo wants YYYYMMDD; tolerate the YYYY-MM-DD inputs the rest of the + # pipeline uses. + try: + start_compact = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y%m%d") + end_compact = datetime.strptime(end_date, "%Y-%m-%d").strftime("%Y%m%d") + except ValueError: + return f"[Cninfo skip — could not parse date window {start_date} → {end_date}]" + + code_padded = code.zfill(6) + try: + df = _fetch_cninfo_with_retry(code_padded, start_compact, end_compact) + except Exception as e: + logger.warning("Cninfo disclosure fetch failed for %s: %s", code, e) + return f"[Cninfo fetch failed for {ticker}: {type(e).__name__}: {e}]" + + if df is None or df.empty: + return f"No Cninfo filings for {ticker} between {start_date} and {end_date}." + + rows: List[dict] = [] + for _, row in df.iterrows(): + title = str(row.get("公告标题", "")).strip() + date = str(row.get("公告时间", "")).strip() + link = _build_link(row, code_padded) + short_name = str(row.get("简称", "")).strip() + if not title: + continue + rows.append({"title": title, "date": date, "link": link, "short_name": short_name}) + + if not rows: + return f"No Cninfo filings for {ticker} between {start_date} and {end_date}." + + issuer = rows[0]["short_name"] or code + blocks = [] + # Cap at 30 — disclosure-heavy stocks (e.g. quarter-end) can flood otherwise. + for r in rows[:30]: + block = f"### {r['title']}" + if r["date"]: + block += f" \n*Filed: {r['date']}*" + if r["link"]: + block += f"\nLink: {r['link']}" + blocks.append(block) + + header = ( + f"## {ticker} regulatory filings (Cninfo / 巨潮资讯, {issuer}), " + f"{start_date} to {end_date}\n" + f"*Source-of-truth A-share disclosures. Source: cninfo.com.cn via akshare. " + f"Filing titles only — follow the link for the full PDF.*\n" + ) + return header + "\n\n".join(blocks) diff --git a/tradingagents/dataflows/eastmoney_news.py b/tradingagents/dataflows/eastmoney_news.py new file mode 100644 index 00000000000..13573a7d721 --- /dev/null +++ b/tradingagents/dataflows/eastmoney_news.py @@ -0,0 +1,160 @@ +"""Eastmoney (东方财富) news data source for HK / SH / SZ tickers. + +The default yfinance news pipeline is English-only and very sparse for +Asian-listed counters. Eastmoney is the de-facto financial news portal in +mainland China and covers all three exchanges (HK / Shanghai / Shenzhen) +in a single API. We use the ``akshare`` library as a stable wrapper over +Eastmoney's reverse-engineered endpoints — the schema can shift but the +maintainers track it. + +Returned content is **Chinese-language**. The trading-agents pipeline +runs MiMo / Kimi / DeepSeek which all read Chinese natively; passing +this through to the analyst is what unlocks meaningful coverage of +Asian-market tickers. +""" + +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Optional, Tuple + +logger = logging.getLogger(__name__) + + +# Suffix -> (akshare-symbol formatter, market label). +# Eastmoney uses bare digit codes: 6-digit for A-shares, 5-digit zero-padded +# for HK. The akshare wrapper accepts both transparently. ``.SH`` and ``.SS`` +# both refer to the Shanghai exchange — different platforms use either form. +_SUFFIX_FORMATTERS = { + "SS": (lambda code: code.zfill(6), "Shanghai A-share"), + "SH": (lambda code: code.zfill(6), "Shanghai A-share"), + "SZ": (lambda code: code.zfill(6), "Shenzhen A-share"), + "HK": (lambda code: code.zfill(5), "Hong Kong"), +} + + +def _split_ticker(ticker: str) -> Optional[Tuple[str, str]]: + """``"600519.SS"`` -> ``("600519", "SS")``. Returns None if no suffix.""" + if not ticker or "." not in ticker: + return None + code, suffix = ticker.rsplit(".", 1) + return code, suffix.upper() + + +def _to_eastmoney_symbol(ticker: str) -> Optional[Tuple[str, str]]: + """Map a TradingAgents-style ticker to (eastmoney_symbol, market_label). + + Returns None if the suffix is not one Eastmoney covers — caller falls + back to yfinance in that case. + """ + parts = _split_ticker(ticker) + if parts is None: + return None + code, suffix = parts + fmt = _SUFFIX_FORMATTERS.get(suffix) + if fmt is None: + return None + formatter, label = fmt + return formatter(code), label + + +def _parse_publish_time(raw: object) -> Optional[datetime]: + """Eastmoney returns timestamps like '2026-04-25 10:15:22'. Be permissive.""" + if raw is None: + return None + text = str(raw).strip() + if not text: + return None + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d %H:%M:%S", "%Y/%m/%d"): + try: + return datetime.strptime(text, fmt) + except ValueError: + continue + return None + + +def get_news_eastmoney( + ticker: str, + start_date: str, + end_date: str, +) -> str: + """Fetch recent news for an HK / Shanghai / Shenzhen ticker from Eastmoney. + + Args mirror :func:`get_news_yfinance` so this is a drop-in replacement + selected by the auto-routing vendor. + + Returns a Markdown-formatted string with up to ~10 articles. On any + error returns a labelled error string so the analyst sees the issue + rather than a silent empty report. + """ + mapped = _to_eastmoney_symbol(ticker) + if mapped is None: + return f"[Eastmoney skip — {ticker} has no .HK/.SS/.SZ suffix]" + + em_symbol, market_label = mapped + + try: + # Imported lazily so the rest of the pipeline does not pay the + # akshare import cost when the user never analyses an HK/CN ticker. + import akshare as ak + except ImportError: + return ( + "[Eastmoney news unavailable: install `akshare` to enable Chinese " + "news for HK / Shanghai / Shenzhen tickers.]" + ) + + try: + df = ak.stock_news_em(symbol=em_symbol) + except Exception as e: + logger.warning("akshare stock_news_em failed for %s: %s", em_symbol, e) + return f"[Eastmoney fetch failed for {ticker}: {type(e).__name__}: {e}]" + + if df is None or df.empty: + return f"No Eastmoney news found for {ticker} ({market_label})." + + try: + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) + except ValueError: + # Bad date input — return everything Eastmoney returned without filtering. + start_dt = None + end_dt = None + + pieces = [] + for _, row in df.iterrows(): + title = str(row.get("新闻标题", "")).strip() + body = str(row.get("新闻内容", "")).strip() + source = str(row.get("文章来源", "")).strip() + link = str(row.get("新闻链接", "")).strip() + published = _parse_publish_time(row.get("发布时间")) + + if start_dt and end_dt and published is not None: + if not (start_dt <= published <= end_dt): + continue + + if not title: + continue + block = f"### {title}" + if source: + block += f" (source: {source})" + if published: + block += f" \n*Published: {published.strftime('%Y-%m-%d %H:%M')}*" + if body: + block += f"\n{body}" + if link: + block += f"\nLink: {link}" + pieces.append(block) + + if not pieces: + return ( + f"No Eastmoney news for {ticker} ({market_label}) between " + f"{start_date} and {end_date}." + ) + + header = ( + f"## {ticker} News from Eastmoney (东方财富, {market_label}), " + f"{start_date} to {end_date}\n" + f"*Note: source content is Chinese-language; the analyst LLM reads it natively.*\n" + ) + return header + "\n\n".join(pieces) diff --git a/tradingagents/dataflows/interface.py b/tradingagents/dataflows/interface.py index 0caf4b6888e..8eb2a221f0f 100644 --- a/tradingagents/dataflows/interface.py +++ b/tradingagents/dataflows/interface.py @@ -1,5 +1,8 @@ +import logging from typing import Annotated +logger = logging.getLogger(__name__) + # Import from vendor-specific modules from .y_finance import ( get_YFin_data_online, @@ -23,9 +26,17 @@ get_global_news as get_alpha_vantage_global_news, ) from .alpha_vantage_common import AlphaVantageRateLimitError +from .cls_news import get_news_cls +from .cninfo_disclosures import get_disclosures_cninfo +from .eastmoney_news import get_news_eastmoney # Configuration and routing logic from .config import get_config +from .dataflow_cache import ( + cached_call as _cached_call, + is_cacheable as _is_cacheable, + vendor_cached_call as _vendor_cached_call, +) # Tools organized by category TOOLS_CATEGORIES = { @@ -98,6 +109,13 @@ "get_news": { "alpha_vantage": get_alpha_vantage_news, "yfinance": get_news_yfinance, + # Chinese-language news for HK / Shanghai / Shenzhen counters. + "eastmoney": get_news_eastmoney, + # CLS flash-news stream — adds real-time market-moving headlines + # alongside Eastmoney's editorial coverage. HK/SH/SZ. + "cls": get_news_cls, + # Cninfo regulatory filings — official CSRC disclosures. SH/SZ only. + "cninfo": get_disclosures_cninfo, }, "get_global_news": { "yfinance": get_global_news_yfinance, @@ -131,32 +149,133 @@ def get_vendor(category: str, method: str = None) -> str: # Fall back to category-level configuration return config.get("data_vendors", {}).get(category, "default") +def _resolve_auto_vendors(method: str, args: tuple) -> list: + """Pick the concrete vendor list when the user configures ``"auto"``. + + Returns a list because Asian-market tickers are now multi-sourced: + Eastmoney (editorial), CLS (flash news), and Cninfo (regulatory + filings, SH/SZ only) get merged into one Markdown blob the analyst + sees as a single tool result. ``route_to_vendor`` walks this list + in order, calls each vendor, and concatenates non-empty results; + a single per-vendor failure does not abort the whole fetch. + + Mapping by ticker suffix: + * ``.SS`` / ``.SZ`` → ``eastmoney`` + ``cls`` + ``cninfo`` + * ``.HK`` → ``eastmoney`` + ``cls`` + * everything else → ``yfinance`` + + Returns ``["yfinance"]`` for ``get_global_news`` regardless of args + (no ticker to dispatch on). + """ + single = ["yfinance"] + if method == "get_global_news": + return single + if not args: + return single + ticker = str(args[0]) + if "." not in ticker: + return single + suffix = ticker.rsplit(".", 1)[1].upper() + # ``.SH`` is the same exchange as ``.SS`` — different platforms use either + # convention for Shanghai. Normalise here so users can paste tickers from + # any source. + if suffix in ("SS", "SH", "SZ"): + return ["eastmoney", "cls", "cninfo"] + if suffix == "HK": + return ["eastmoney", "cls"] + return single + + +# Back-compat for any external code that still imports the singular form. +def _resolve_auto_vendor(method: str, args: tuple) -> str: + """Deprecated single-vendor variant. Returns the first auto-resolved vendor.""" + return _resolve_auto_vendors(method, args)[0] + + def route_to_vendor(method: str, *args, **kwargs): - """Route method calls to appropriate vendor implementation with fallback support.""" - category = get_category_for_method(method) - vendor_config = get_vendor(category, method) - primary_vendors = [v.strip() for v in vendor_config.split(',')] + """Route method calls to appropriate vendor implementation with fallback support. + + Cacheable methods (global news, financial statements) go through the + on-disk dataflow cache so multiple tickers analysed the same day or + repeat analyses of the same ticker within a quarter avoid the network + fetch entirely. Cache misses fall through to the live vendor call. + When ``data_vendors[category]`` is set to ``"auto"``, the news methods + additionally dispatch by ticker suffix (HK / SS / SZ → Eastmoney, + else yfinance) so multi-market workflows pick the right Chinese-vs- + English source automatically. + """ if method not in VENDOR_METHODS: raise ValueError(f"Method '{method}' not supported") - # Build fallback chain: primary vendors first, then remaining available vendors - all_available_vendors = list(VENDOR_METHODS[method].keys()) - fallback_vendors = primary_vendors.copy() - for vendor in all_available_vendors: - if vendor not in fallback_vendors: - fallback_vendors.append(vendor) - - for vendor in fallback_vendors: - if vendor not in VENDOR_METHODS[method]: - continue - + def _call_vendor(vendor: str): vendor_impl = VENDOR_METHODS[method][vendor] impl_func = vendor_impl[0] if isinstance(vendor_impl, list) else vendor_impl + return impl_func(*args, **kwargs) + + def _call_vendor_cached(vendor: str): + """Call ``vendor`` for ``method``, caching per-vendor when the news + method has a stable cache key. Multiple analyses of the same ticker + in the same day reuse the fetch; multiple HK/CN tickers in the + same day reuse the parts of the merge that aren't ticker-specific + (Cninfo is per-ticker but per-week, so the same ticker any day this + week reuses).""" + return _vendor_cached_call( + vendor, method, args, kwargs, lambda: _call_vendor(vendor) + ) + + def _fetch(): + category = get_category_for_method(method) + vendor_config = get_vendor(category, method) + if vendor_config == "auto": + primary_vendors = _resolve_auto_vendors(method, args) + else: + primary_vendors = [v.strip() for v in vendor_config.split(',')] + + # Multi-source merge: when auto resolves >1 vendor (e.g. HK / SH / SZ + # tickers fan out to Eastmoney + CLS + Cninfo), call each vendor and + # concatenate non-empty results. A per-vendor failure does not abort + # the whole fetch — the analyst sees whichever sources succeeded. + # Each vendor's call is cached independently so a re-run reuses + # whichever sources are already hot. + if len(primary_vendors) > 1: + blocks = [] + for vendor in primary_vendors: + if vendor not in VENDOR_METHODS[method]: + continue + try: + out = _call_vendor_cached(vendor) + except Exception as e: + logger.warning("Vendor %s failed for %s: %s", vendor, method, e) + continue + if isinstance(out, str) and out.strip(): + # Skip pure skip-markers like "[CLS skip — ... not covered]" + # so downstream prompts aren't padded with no-op noise. + stripped = out.strip() + if stripped.startswith("[") and "skip" in stripped.lower() and stripped.endswith("]"): + continue + blocks.append(out) + if not blocks: + # All sources empty/failed: fall through to legacy fallback. + primary_vendors = ["yfinance"] + else: + return "\n\n---\n\n".join(blocks) + + all_available_vendors = list(VENDOR_METHODS[method].keys()) + fallback_vendors = list(primary_vendors) + for vendor in all_available_vendors: + if vendor not in fallback_vendors: + fallback_vendors.append(vendor) - try: - return impl_func(*args, **kwargs) - except AlphaVantageRateLimitError: - continue # Only rate limits trigger fallback + for vendor in fallback_vendors: + if vendor not in VENDOR_METHODS[method]: + continue + try: + return _call_vendor_cached(vendor) + except AlphaVantageRateLimitError: + continue # Only rate limits trigger fallback + raise RuntimeError(f"No available vendor for '{method}'") - raise RuntimeError(f"No available vendor for '{method}'") \ No newline at end of file + if _is_cacheable(method): + return _cached_call(method, args, kwargs, _fetch) + return _fetch() diff --git a/tradingagents/default_config.py b/tradingagents/default_config.py index fe5a6f7550c..92f6f0feae9 100644 --- a/tradingagents/default_config.py +++ b/tradingagents/default_config.py @@ -94,7 +94,11 @@ def _apply_env_overrides(config: dict) -> dict: "core_stock_apis": "yfinance", # Options: alpha_vantage, yfinance "technical_indicators": "yfinance", # Options: alpha_vantage, yfinance "fundamental_data": "yfinance", # Options: alpha_vantage, yfinance - "news_data": "yfinance", # Options: alpha_vantage, yfinance + # ``auto``: dispatches news by ticker suffix — Eastmoney / CLS for + # HK / SS / SZ counters (Chinese-language sources), Cninfo + # disclosures additionally for SS / SZ, yfinance otherwise. + # Set to ``yfinance`` or ``alpha_vantage`` to force a single vendor. + "news_data": "auto", }, # Tool-level configuration (takes precedence over category-level) "tool_vendors": { From eb810db8c75b2e424d00c1b2ca8320de5baa3367 Mon Sep 17 00:00:00 2001 From: Chit Boon Date: Sun, 10 May 2026 23:42:31 +0800 Subject: [PATCH 4/5] fix(dataflows): atomic cache writes (tmp + os.replace) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses inline review feedback on PR #792 from gemini-code-assist: streaming directly to the cache file leaves the file half-written if the process is interrupted, and races between concurrent writers can yield a corrupted blob. Writing to ``.tmp`` first and then ``os.replace`` makes the publish atomic on POSIX and Windows — a crash mid-write leaves the prior cache file untouched, and concurrent writers race only on the rename. --- tradingagents/dataflows/dataflow_cache.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tradingagents/dataflows/dataflow_cache.py b/tradingagents/dataflows/dataflow_cache.py index 4e1cc0fdcf5..32b5229a523 100644 --- a/tradingagents/dataflows/dataflow_cache.py +++ b/tradingagents/dataflows/dataflow_cache.py @@ -61,8 +61,15 @@ def _write(key: str, value: str) -> None: return try: os.makedirs(_cache_dir(), exist_ok=True) - with open(_path_for(key), "w", encoding="utf-8") as f: + # Atomic write: stream to ``.tmp`` first, then rename. A crash + # mid-write leaves the prior cache file untouched instead of writing + # a half-finished one; concurrent writers race on the rename only, + # which ``os.replace`` resolves cleanly on POSIX and Windows. + path = _path_for(key) + tmp_path = f"{path}.tmp" + with open(tmp_path, "w", encoding="utf-8") as f: f.write(value) + os.replace(tmp_path, path) except OSError as e: logger.warning("dataflow cache write failed for %s: %s", key, e) From 499d61163739ebbcd05b842bbb3329173dfd8f67 Mon Sep 17 00:00:00 2001 From: Chit Boon Date: Mon, 11 May 2026 14:54:30 +0800 Subject: [PATCH 5/5] test(dataflows): add tearDown to DataflowsConfigIsolationTests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These tests mutate the global config (data_vendors, tool_vendors) but had no tearDown, so the mutations leaked into subsequent test files that read tool-level routing — causing intermittent failures in the new news-vendor tests after rebase onto main. The straightforward set_config(deepcopy(DEFAULT_CONFIG)) doesn't suffice because set_config merges nested dicts one level deep, so tool_vendors entries set in earlier tests survive the update. The tearDown directly replaces the global _config to defeat the merge. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_dataflows_config.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/test_dataflows_config.py b/tests/test_dataflows_config.py index ab0800eee94..b17215cc49f 100644 --- a/tests/test_dataflows_config.py +++ b/tests/test_dataflows_config.py @@ -14,6 +14,17 @@ class DataflowsConfigIsolationTests(unittest.TestCase): def setUp(self): set_config(copy.deepcopy(default_config.DEFAULT_CONFIG)) + def tearDown(self): + # Restore defaults so subsequent test files see a clean config. + # set_config merges nested dicts (data_vendors / tool_vendors), so + # plain set_config(DEFAULT_CONFIG) does NOT clear tool_vendors + # entries set by earlier tests in this class — they survive the + # update. Replace the whole _config object directly to defeat the + # merge and prevent leakage into other test files that read + # tool-level routing. + import tradingagents.dataflows.config as _cfg + _cfg._config = copy.deepcopy(default_config.DEFAULT_CONFIG) + def test_get_config_returns_deep_copy(self): cfg = get_config() cfg["data_vendors"]["core_stock_apis"] = "alpha_vantage" @@ -50,7 +61,11 @@ def test_partial_nested_update_preserves_existing_defaults(self): self.assertEqual(fresh["data_vendors"]["core_stock_apis"], "alpha_vantage") self.assertEqual(fresh["data_vendors"]["technical_indicators"], "yfinance") self.assertEqual(fresh["data_vendors"]["fundamental_data"], "yfinance") - self.assertEqual(fresh["data_vendors"]["news_data"], "yfinance") + # ``news_data`` default is ``"auto"`` so the multi-source merge + # (Eastmoney + CLS + Cninfo for HK / SH / SZ; yfinance otherwise) + # is on out of the box; the test asserts the merge defaults rather + # than a single legacy vendor. + self.assertEqual(fresh["data_vendors"]["news_data"], "auto") def test_nested_dict_updates_merge_one_level_deep(self): set_config({"tool_vendors": {"get_stock_data": "alpha_vantage"}})