- 完整性:覆盖日线行情、估值指标、复权因子、财报(6类)、分红、十大股东、治理风险数据
- 增量更新:所有数据支持增量获取,避免重复拉取
- 数据源解耦:通过 Protocol 抽象,支持多种数据源无缝切换
- 因子预计算:截面因子 + 时序因子全量预计算,筛选时零计算
- 时间边界:快照生成严格按公告日期过滤,杜绝前视偏差
- 性能优化:PyArrow predicate pushdown、lru_cache、ThreadPoolExecutor 并行 I/O
┌─────────────────────────────────────────────────────┐
│ Consumer Layer (只读) │
│ api.py (查询接口, lru_cache) │
│ snapshot.py (时点快照, ThreadPoolExecutor 并行 I/O) │
├─────────────────────────────────────────────────────┤
│ Computation Layer │
│ factor_store.py (因子预计算 + _CachedApi 批量缓存) │
├─────────────────────────────────────────────────────┤
│ Orchestration Layer │
│ updater.py (DataUpdater: 增量编排) │
├─────────────────────────────────────────────────────┤
│ Persistence Layer │
│ storage.py (Parquet I/O: 月分区/股票分区, │
│ predicate pushdown via PyArrow filters) │
├─────────────────────────────────────────────────────┤
│ Abstraction Layer │
│ provider.py (DataProvider Protocol + Registry) │
├─────────────────────────────────────────────────────┤
│ Implementation Layer │
│ tushare/provider.py (Tushare Pro API 封装) │
└─────────────────────────────────────────────────────┘
@runtime_checkable
class DataProvider(Protocol):
@property
def name(self) -> str: ...
# --- 基础数据 ---
def fetch_stock_list(self) -> pd.DataFrame: ...
def fetch_trade_calendar(self, start_date, end_date) -> pd.DataFrame: ...
# --- 日线市场数据 (按日期批量) ---
def fetch_daily_bulk(self, trade_date) -> pd.DataFrame: ...
def fetch_adj_factor_bulk(self, trade_date) -> pd.DataFrame: ...
def fetch_daily_indicator_bulk(self, trade_date) -> pd.DataFrame: ...
# --- 财报数据 (按股票) ---
def fetch_balancesheet(self, ts_code) -> pd.DataFrame: ...
def fetch_income(self, ts_code) -> pd.DataFrame: ...
def fetch_cashflow(self, ts_code) -> pd.DataFrame: ...
def fetch_financial_indicator(self, ts_code) -> pd.DataFrame: ...
def fetch_dividend(self, ts_code) -> pd.DataFrame: ...
def fetch_top10_holders(self, ts_code) -> pd.DataFrame: ...
def fetch_disclosure_date(self, end_date) -> pd.DataFrame: ...
# --- 治理与风险数据 (按股票) ---
def fetch_fina_audit(self, ts_code) -> pd.DataFrame: ...
def fetch_fina_mainbz(self, ts_code) -> pd.DataFrame: ...
def fetch_pledge_stat(self, ts_code) -> pd.DataFrame: ...
def fetch_stk_holdernumber(self, ts_code) -> pd.DataFrame: ...
def fetch_stk_holdertrade(self, ts_code) -> pd.DataFrame: ...
def fetch_share_float(self, ts_code) -> pd.DataFrame: ...
def fetch_repurchase(self, ts_code) -> pd.DataFrame: ...# 注册
register("tushare", TushareProvider())
# 获取(懒加载:首次调用时自动实例化 Tushare)
provider = get_provider() # 默认 provider
provider = get_provider("tushare") # 指定 provider
# 切换默认
set_default("akshare")Provider 实现位于 src/data/tushare/provider.py(子目录,非单文件),包含所有 Tushare Pro API 调用封装。
实现 DataProvider 协议即可:
class AKShareProvider:
@property
def name(self) -> str:
return "akshare"
def fetch_stock_list(self) -> pd.DataFrame:
# 返回 ts_code, name, industry, list_status, list_date
...data/
├── tushare/ # 市场数据根目录
│ ├── basic/ # 基础数据(全量覆盖)
│ │ ├── stock_list.parquet
│ │ └── trade_calendar.parquet
│ ├── daily/ # 日线数据(月分区)
│ │ ├── raw/ # OHLCV
│ │ │ ├── 2019-01.parquet
│ │ │ └── ...
│ │ ├── indicator/ # PE/PB/DV/市值
│ │ ├── adj_factor/ # 复权因子
│ │ ├── factors/ # 预计算截面因子
│ │ └── ts_factors/ # 预计算时序因子
│ │ └── latest.parquet # (单文件,每股票一行)
│ └── financial/ # 财报数据(按股票分区)
│ ├── balancesheet/
│ ├── income/
│ ├── cashflow/
│ ├── fina_indicator/
│ ├── dividend/
│ ├── top10_holders/
│ ├── top10_floatholders/
│ ├── disclosure_date/
│ ├── fina_audit/ # 审计意见
│ ├── fina_mainbz/ # 主营业务构成
│ ├── pledge_stat/ # 股权质押
│ ├── stk_holdernumber/ # 股东户数
│ ├── stk_holdertrade/ # 高管增减持
│ ├── share_float/ # 限售解禁
│ └── repurchase/ # 回购
└── snapshots/ # 生成的时点快照
| 数据类型 | 分区方式 | 分区键 | 写入模式 |
|---|---|---|---|
| 股票列表 | 单文件 | — | overwrite |
| 交易日历 | 单文件 | — | overwrite |
| 日线行情 | 按月 | YYYY-MM |
merge (ts_code + trade_date) |
| 日线指标 | 按月 | YYYY-MM |
merge (ts_code + trade_date) |
| 复权因子 | 按月 | YYYY-MM |
merge (ts_code + trade_date) |
| 截面因子 | 按月 | YYYY-MM |
merge (ts_code + trade_date) |
| 时序因子 | 单文件 | latest |
merge (ts_code) |
| 财报数据 | 按股票 | ts_code |
merge (ts_code + end_date) |
| 治理风险数据 | 按股票 | ts_code |
merge (依数据类型) |
增量更新的核心:新数据与旧数据合并,按 merge_on 列去重,保留最新。
# storage.save() with mode='merge'
existing = load_one(...)
combined = pd.concat([existing, new_df])
combined = combined.drop_duplicates(subset=merge_on, keep='last')
combined = combined.sort_values(merge_on)
combined.to_parquet(path, compression='zstd')storage.load_range() 使用 PyArrow filters 实现谓词下推,避免全量加载月分区文件:
# 按 ts_code 过滤时,只读取匹配的行组
filters = [('ts_code', '=', ts_code)] if ts_code else None
table = pq.read_table(path, filters=filters)class DataUpdater:
def __init__(self, provider_name=None):
self.provider = get_provider(provider_name)
# 自动检测最新日期,只拉取增量
def update_daily(self, start_date=None, end_date=None):
if start_date is None:
start_date = storage.get_latest_date('daily', 'raw') # 从断点续传
def daily_update(self):
"""日常一键更新"""
self.update_stock_list()
self.update_daily()
self.update_daily_indicator()
self.update_factors()
def full_update(self, market_start, financial_codes):
"""全量初始化"""
self.init_basic()
self.update_daily(market_start)
self.update_daily_indicator(market_start)
self.update_financials(financial_codes)
self.update_factors()
self.update_ts_factors(financial_codes)# 日常增量(行情 + 指标 + 截面因子)
python -m src.engine.launcher data daily-update
# 全量初始化
python -m src.engine.launcher data full-update 2020-01-01 601288.SH 000001.SZ
# 单独更新
python -m src.engine.launcher data update-daily
python -m src.engine.launcher data update-indicator
python -m src.engine.launcher data update-financials 601288.SH 000001.SZ
python -m src.engine.launcher data update-factors
python -m src.engine.launcher data update-ts-factors
python -m src.engine.launcher data recalc-factors
python -m src.engine.launcher data recalc-ts-factors| 截面因子 (Cross-Section) | 时序因子 (Time-Series) | |
|---|---|---|
| 计算粒度 | 每个交易日 × 每只股票 | 每只股票(一次) |
| 输入 | 当日全市场 indicator DataFrame | 单只股票的历史财报 |
| 输出 | Series(与输入 index 对齐) |
float 标量 |
| 存储 | daily/factors/{YYYY-MM}.parquet |
daily/ts_factors/latest.parquet |
| 行数 | 每月 ~5000 × ~22 交易日 = ~110,000 行 | ~5500 行(每股票一行) |
| 更新策略 | 增量:从最新日期续算 | 增量:跳过已有值的股票 |
| 典型因子 | dv, ep, market_cap_yi | profit_growth_5y, roe_avg_3y, dividend_years |
时序因子计算需要读取每只股票的财报。直接调用 api.get_income(ts_code) 会逐股票加载 Parquet 文件,5000+ 股票耗时过长。
_CachedApi 解决方案:一次性将所有财报文件加载到内存,按 {data_type: {ts_code: DataFrame}} 组织,后续查询直接命中内存缓存。
所有上层模块通过 api.py 读取数据,不直接操作 storage。
api.py 使用 functools.lru_cache 缓存高频读取的元数据:
@functools.lru_cache(maxsize=2)
def _load_stock_list() -> pd.DataFrame: ...
@functools.lru_cache(maxsize=1)
def _load_trade_calendar() -> pd.DataFrame: ...# 市场数据
api.get_daily(start_date, end_date, ts_code=None)
api.get_daily_adjusted(start_date, end_date, adjust='qfq') # 前/后复权
api.get_daily_indicator(start_date, end_date, ts_code=None)
# 财报
api.get_income(ts_code, end_date=None)
api.get_balancesheet(ts_code, end_date=None)
api.get_cashflow(ts_code, end_date=None)
api.get_financial_indicator(ts_code, end_date=None)
api.get_dividend(ts_code)
api.get_top10_holders(ts_code, end_date=None)
# 治理风险
api.get_fina_audit(ts_code, end_date=None)
api.get_fina_mainbz(ts_code, end_date=None)
api.get_pledge_stat(ts_code, end_date=None)
api.get_stk_holdernumber(ts_code, end_date=None)
# 预计算因子
api.get_factors(start_date, end_date, ts_code=None) # 截面因子
api.get_ts_factors(ts_code=None) # 时序因子
api.get_daily_indicator_with_factors(...) # 指标 + 因子合并
# 元信息
api.get_stock_list()
api.get_stock_name(ts_code)
api.get_trade_dates(start_date, end_date)
api.get_data_status()为 Agent 分析提供严格符合时间边界的数据上下文。快照包含截止日期之前已公开的所有信息,不包含任何未来信息。
@dataclass
class StockSnapshot:
ts_code: str
stock_name: str
cutoff_date: str
generated_at: str
industry: str = '' # 行业分类
area: str = '' # 地区
list_date: str = '' # 上市日期
# 行情数据 (trade_date <= cutoff_date)
price_history: pd.DataFrame # ~3年日线
daily_indicators: pd.DataFrame # PE/PB/DV/市值
# 基本面数据 (ann_date <= cutoff_date)
balancesheet: pd.DataFrame
income: pd.DataFrame
cashflow: pd.DataFrame
fina_indicator: pd.DataFrame
dividend: pd.DataFrame
top10_holders: pd.DataFrame
top10_floatholders: pd.DataFrame
# 治理与风险数据
fina_audit: pd.DataFrame # 审计意见
fina_mainbz: pd.DataFrame # 主营业务构成
pledge_stat: pd.DataFrame # 股权质押统计
stk_holdernumber: pd.DataFrame # 股东户数变化
stk_holdertrade: pd.DataFrame # 高管增减持
share_float: pd.DataFrame # 限售解禁
repurchase: pd.DataFrame # 回购
# 元数据
latest_report_period: str
data_sources: List[str]
warnings: List[str]create_snapshot() 使用 ThreadPoolExecutor 并行加载各类数据,显著减少快照生成时间:
with ThreadPoolExecutor() as executor:
futures = {
'price': executor.submit(api.get_daily, start, cutoff, ts_code=ts_code),
'indicator': executor.submit(api.get_daily_indicator, start, cutoff, ts_code=ts_code),
'income': executor.submit(api.get_income, ts_code),
'balancesheet': executor.submit(api.get_balancesheet, ts_code),
# ... 其他数据类型并行加载
}财报数据的过滤不是按报告期(end_date),而是按实际公告日期:
优先级:
1. f_ann_date(首次公告日) ← 最准确
2. ann_date(公告日) ← 次选
3. disclosure_date 表关联 ← 第三选
4. end_date + 6个月保守估计 ← 最后兜底
snapshot_to_markdown(snapshot, blind_mode=True) 输出中:
- 隐藏公司名称和代码
- 十大股东按属性分类(国有/机构/自然人/其他法人),不显示名称
- 标注"盲测模式:请仅基于数据做出判断"
- 保留行业分类(从财务数据可推断,不构成信息泄露)