本文只说明 解析任务 Pipeline:src/core/pipeline/parse_task/ 如何承接 Java 通过 MQ 投递的 parse_task 消息,并把一次文档入库任务收敛为 document_parse_pipeline 的终态。
召回链路是另一条独立 Pipeline,见 recall_pipeline.md。
与本文互补:
- 解析任务端到端流程、状态语义、失败码 → parse_task_pipeline.md
- 分块策略 → chunking.md
- 向量化存储 → vectorization.md
- MQ 集成 → mq.md
解析 Pipeline 负责把"一次解析任务"的副作用收敛到单一编排入口 ParseTaskPipeline,包括日志、对象存储、分块、dense 向量化、预分词、ES 入库、sparse 向量化、状态落库与终态通知。
设计上遵循三条规则:
- 解析主编排保持薄:
ParseTaskPipeline只做消息分流、幂等屏障、上下文校验、重试 CAS 与兜底异常收敛。 - 阶段执行类化:六阶段执行委托给
stages/子包的StagePipeline;首次执行与用户侧重试共用同一条阶段链路。 - 副作用边界清晰:阶段状态写入和通知由
Stage模板统一处理;解析、分片、向量化、预分词、ES、sparse 等底层操作集中在StageServices,不直接写阶段状态、不发通知。
src/core/pipeline/
├── __init__.py # 对外门面:ParseTaskPipeline / RecallPipeline 等
├── parse_task/ # 解析任务 Pipeline
│ ├── pipeline.py # ParseTaskPipeline:首次/重试分流 + StagePipeline 调度
│ ├── constants.py # 解析任务状态和用户提示文案
│ ├── error_codes.py # ParseFailureCode + build_failure_reason
│ ├── models.py # ParsePipelineResult / PipelineStatus
│ ├── log_repository.py # document_parsed_log 仓储
│ ├── notifier.py # parse_result MQ 通知与兜底
│ ├── source.py # 对象存储下载、Markdown 上传、MinerU URL 构造
│ ├── temp_workspace.py # PARSE_TEMP_DIR 清理、临时文件分配、safe_unlink
│ ├── validator.py # 前置校验、MQ 重投、中断状态收敛、重试校验
│ ├── _utils.py # 子包内部共享工具
│ ├── stages/ # 六阶段类化编排
│ │ ├── base.py # Stage 抽象基类 + StagePipeline 编排器
│ │ ├── context.py # StageContext / StageOutcome
│ │ ├── services.py # StageServices + PreprocessorProtocol
│ │ ├── cleaning.py # 下载 -> 解析 -> 上传 Markdown
│ │ ├── chunking.py # 分片 / 重试反查完整 chunk truth set
│ │ ├── vectorizing.py # dense 向量化
│ │ ├── pretokenize.py # 文件级预分词 plan
│ │ ├── es_indexing.py # ES 文档级全量重建
│ │ └── sparse_vectorizing.py# sparse 向量化,最终翻转 pipeline_status=SUCCESS
│ └── post_process/ # document_parse_pipeline 状态机仓储
│ ├── constants.py # PIPELINE_STATUS_* / STAGE_STATUS_* / POST_PROCESS_STAGE_*
│ ├── models.py # PostProcessStageResult / PostProcessResult
│ └── repository.py # ParsePipelineRepository
└── recall/ # 召回 Pipeline,见 recall_pipeline.md
post_process/ 嵌在 parse_task/ 下,是因为 document_parse_pipeline 行由解析任务创建和驱动,生命周期与 document_parsed_log 1:1 绑定。它不是独立顶层 Pipeline,而是解析 Pipeline 的内部状态机仓储。
ParseTaskConsumer
-> ParseTaskMessage.parse_msg()
-> ParseTaskPipeline.execute(payload)
-> _run(payload, db)
├── is_retry=True
│ -> validate_retry_context
│ -> mark_superseded(old_pipeline, new_task_id)
│ -> create_for_retry + create_with_inherited_state
│ -> StagePipeline.run(ctx)
└── is_retry=False
-> log_repository.create(payload)
-> guard.handle_duplicate / guard.validate
-> StagePipeline.run(ctx)
StagePipeline 是唯一的六阶段执行链:
CleaningStage
-> ChunkingStage
-> VectorizingStage
-> PretokenizeStage
-> EsIndexingStage
-> SparseVectorizingStage
ParseTaskPipeline._build_stage_pipeline() 每次执行时从当前协作者重新装配 StagePipeline,便于单测在构造后替换 fake repository、notifier 或 services。
| 层 | 角色 | 职责 |
|---|---|---|
ParseTaskPipeline |
薄编排 | 首次/重试分流、幂等屏障、上下文校验、重试 CAS、未归类异常兜底 |
StagePipeline |
阶段循环 | 按固定顺序执行六阶段,遇到 finalized 结果立即返回 |
Stage 子类 |
单阶段模板 | mark_started -> run -> mark_success;失败时 mark_failed + notify failed;继承 SUCCESS 时走 on_skip |
StageServices |
底层操作集合 | 解析、分片、dense 向量化、预分词、ES 写入、sparse 向量化、chunk 反查 |
ParsePipelineRepository |
状态仓储 | 写 document_parse_pipeline 整体状态、阶段状态、耗时、失败原因和恢复入口 |
核心协作者:
| 协作者 | 输入依赖 | 主要职责 | 副作用 |
|---|---|---|---|
ParseLogRepository |
ParsePipelineRepository |
document_parsed_log 创建、查询、解析产物快照写入;首次创建时同步生成 document_parse_pipeline 行 |
MySQL |
ParseSourceIO |
BaseObjectStorage |
源文件下载、Markdown 上传、MinerU URL 构造、判断是否跳过下载 | OSS |
ParseResultNotifier |
MQService, ParseLogRepository, ParsePipelineRepository |
发送 parse_result 终态消息;通知失败时兜底落库 |
MQ + MySQL |
ParseTaskGuard |
ParseLogRepository, ParsePipelineRepository, ParseResultNotifier |
MQ 消息一致性校验、重复 task_id 终态补发、非终态 pipeline 中断收敛、重试上下文校验 | 通过依赖产生副作用 |
document_parse_pipeline.pipeline_status 是整体任务状态的权威单源。阶段顺序由 POST_PROCESS_STAGE_ORDER 固定:
CLEANING
-> CHUNKING
-> VECTORIZING
-> PRETOKENIZE
-> ES_INDEXING
-> SPARSE_VECTORIZING
| 阶段 | 主要动作 | 成功入口 | 失败入口 |
|---|---|---|---|
CLEANING |
下载源文件或构造 MinerU URL,解析文件,上传 Markdown,写解析产物快照 | mark_cleaning_success |
mark_cleaning_failed |
CHUNKING |
基于 Markdown / ParseResult 分片,批量写入 kb_document_chunk truth set |
mark_chunking_success |
mark_chunking_failed |
VECTORIZING |
消费已落库 chunk 写 dense 向量;重试时补做 PENDING + FAILED chunk |
mark_vectorizing_success |
mark_vectorizing_failed |
PRETOKENIZE |
构建文件级内存 FilePostIndexPlan,不持久化、不写 chunk 状态 |
mark_pretokenize_success |
mark_pretokenize_failed |
ES_INDEXING |
文档级前置删除、全量写入 ES、失败时 best-effort 清理半成品 | mark_es_success |
mark_es_failed |
SPARSE_VECTORIZING |
对 dense-success chunk 做 sparse 向量化,是整体成功的最后一段 | mark_sparse_vectorizing_success |
mark_sparse_vectorizing_failed |
整体状态翻转:
PENDING -> PROCESSING:首个mark_<stage>_started触发。* -> SUCCESS:仅mark_sparse_vectorizing_success翻转;mark_es_success不代表整体成功。* -> FAILED:任一阶段mark_<stage>_failed触发,同时写failed_stage、recover_from_stage、failure_reason、finished_at。
用户侧重试通过 MQ payload 的 is_retry=true 进入解析 Pipeline 的重试分支。Python 侧不自动重试、不计数、不设上限。
重试准备阶段:
ParseTaskGuard.validate_retry_context(payload, db)校验旧任务存在、旧 pipeline 为FAILED、recover_from_stage可用、未被其他重试接班。ParsePipelineRepository.mark_superseded(old_pipeline, new_task_id)使用UPDATE ... WHERE superseded_by_task_id IS NULL做 CAS 仲裁。ParseLogRepository.create_for_retry(...)与ParsePipelineRepository.create_with_inherited_state(...)创建新 log 与新 pipeline,继承旧 pipeline 已成功阶段。- 进入同一条
StagePipeline.run(ctx);继承SUCCESS的阶段跳过,从首个非 SUCCESS 阶段恢复。
关键恢复语义:
- 从
CLEANING恢复时重新下载、解析并上传 Markdown。 CHUNKING被跳过时,ChunkingStage.on_skip会从 MySQL 反查当前文档完整 chunk truth set,语义等价于首次分片输出。- dense / sparse 按各自 SQL 状态决定补做范围。
- ES 阶段始终按文档级全量重建,不按
es_status补做子集。
解析 Pipeline 不持有"怎么按 settings 造具体基础设施"的装配细节。相关逻辑归属各自模块:
| 工厂入口 | 位置 | 用途 |
|---|---|---|
create_chunking_engine() |
src/core/splitter/factory.py |
按 CHUNKING_* 配置组装 ChunkingEngine |
create_system_embedding_client() / LazyEmbeddingClient |
src/core/splitter/factory.py |
按 SYSTEM_LLM_* 配置构造或延迟构造 embedding 客户端 |
compose_vector_storage_facade() |
src/core/vector_storage/factory.py |
装配 VectorStorageFacade |
StorageFactory.get_storage() |
src/services/storage/factory.py |
按配置返回对象存储实现 |
- 在
post_process/constants.py增加阶段常量、状态字段映射,并调整POST_PROCESS_STAGE_ORDER。 - 给
document_parse_pipeline增加xxx_status/xxx_duration_ms字段,写 Alembic migration。 - 在
ParsePipelineRepository增加mark_xxx_started/mark_xxx_success/mark_xxx_failed。 - 在
stages/下增加新的Stage子类,并接入build_stage_pipeline()。 - 若阶段需要底层能力,先放进
StageServices,保持 Stage 只写状态模板和阶段特例。 - 同步 parse_task_pipeline.md、../api/schemas/mysql.md,必要时同步对外契约文档。
构造 ParseTaskPipeline 时传入替身即可。测试里也可以替换 _notifier、_services 等协作者后再调用 _build_stage_pipeline(),因为每次执行都会重新装配阶段链。
只动 src/services/storage/:实现 BaseObjectStorage,让 StorageFactory 按配置返回新实例。ParseSourceIO 不需要感知具体后端。
| 测试目标 | 推荐入口 |
|---|---|
| 解析 Pipeline 编排骨架 | tests/unit/core/pipeline/test_parse_task_pipeline.py |
| 解析 Pipeline ES 阶段语义 | tests/unit/core/pipeline/test_parse_task_pipeline_es.py |
document_parse_pipeline 仓储 |
tests/unit/core/pipeline/test_post_process_repository.py |
| MQ 端到端集成 | tests/integration/core/mq/test_kafka_parse_task_pipeline_integration.py |
测试时优先替换协作者或 StageServices 方法,不要在 MQ consumer 中拼接业务流程,也不要通过 patch 大量私有方法绕过阶段模板。
- 解析成功通知必须晚于 Markdown、分片、dense 向量化、预分词、ES 入库和 sparse 向量化全部完成。
- 新增外部系统调用时,先判断归属
StageServices、独立协作者还是下游模块工厂,不直接塞进ParseTaskPipeline._run。 document_parse_pipeline.pipeline_status是整体成功/失败的权威单源。migrations/db.sql是 0001 baseline 冻结快照;schema 演进只通过 ORM + Alembic migration。