Skip to content

Commit ca4dd61

Browse files
committed
feat: Add lineage service for synthesis task
1 parent 5f824ad commit ca4dd61

2 files changed

Lines changed: 55 additions & 1 deletion

File tree

runtime/datamate-python/app/module/generation/interface/generation_api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,8 +496,10 @@ async def export_synthesis_task_to_dataset(
496496
- 仅写入文件,不再创建数据集。
497497
"""
498498
exporter = SynthesisDatasetExporter(db)
499+
generation = GenerationService(db)
499500
try:
500501
dataset = await exporter.export_task_to_dataset(task_id, dataset_id)
502+
await generation.add_synthesis_to_graph(db, task_id, dataset_id)
501503
except SynthesisExportError as e:
502504
logger.error(
503505
"Failed to export synthesis task %s to dataset %s: %s",

runtime/datamate-python/app/module/generation/service/generation_service.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88
from sqlalchemy import select
99
from sqlalchemy.ext.asyncio import AsyncSession
1010

11+
from app.db.models.base_entity import LineageNode, LineageEdge
1112
from app.db.models.data_synthesis import (
1213
DataSynthInstance,
1314
DataSynthesisFileInstance,
1415
DataSynthesisChunkInstance,
1516
SynthesisData,
1617
)
17-
from app.db.models.dataset_management import DatasetFiles
18+
from app.db.models.dataset_management import DatasetFiles, Dataset
1819
from app.db.session import logger
1920
from app.module.generation.schema.generation import Config, SyntheConfig
2021
from app.module.generation.service.prompt import (
@@ -26,6 +27,8 @@
2627
from app.module.shared.util.model_chat import extract_json_substring
2728
from app.module.shared.llm import LLMFactory
2829
from app.module.system.service.common_service import get_model_by_id
30+
from app.module.shared.common.lineage import LineageService
31+
from app.module.shared.schema import NodeType, EdgeType
2932

3033

3134
def _filter_docs(split_docs, chunk_size):
@@ -657,3 +660,52 @@ async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> No
657660
file_task.processed_chunks = new_value
658661
await self.db.commit()
659662
await self.db.refresh(file_task)
663+
664+
async def add_synthesis_to_graph(self, db: AsyncSession, task_id: str, dest_dataset_id: str) -> None:
    """Record data-synthesis lineage: source dataset -> synthesized dataset.

    Builds one lineage node for the source dataset, one for the destination
    dataset, and a DATA_SYNTHESIS edge between them that carries the task's
    name and description, then hands them to ``LineageService.generate_graph``
    and commits.

    This is best-effort: any exception is logged and swallowed so that a
    lineage failure does not propagate to the caller.

    Args:
        db: Session used for the source-dataset lookup and passed to
            ``LineageService``.
        task_id: Primary key of the ``DataSynthInstance`` being exported.
        dest_dataset_id: Primary key of the ``Dataset`` receiving the output.
    """
    # NOTE(review): this method reads and commits through ``self.db`` but
    # queries and builds the LineageService with ``db``. The visible caller
    # passes the same session for both - confirm before passing distinct ones.
    try:
        # Load the task and the destination dataset.
        task = await self.db.get(DataSynthInstance, task_id)
        dst_dataset = await self.db.get(Dataset, dest_dataset_id)
        if not task or not dst_dataset:
            logger.warning("Missing task or destination dataset for lineage graph")
            return

        # Resolve the source dataset through the task's file instances.
        # ``limit(1)`` means only one source dataset is recorded, even if the
        # task's files come from several datasets.
        src_dataset_result = await db.execute(
            select(DatasetFiles.dataset_id)
            .join(DataSynthesisFileInstance, DatasetFiles.id == DataSynthesisFileInstance.source_file_id)
            .where(DataSynthesisFileInstance.synthesis_instance_id == task_id)
            .limit(1)
        )
        src_dataset_id = src_dataset_result.scalar_one_or_none()
        # Skip the lookup when no id was found instead of calling get() with None.
        src_dataset = await self.db.get(Dataset, src_dataset_id) if src_dataset_id else None
        if src_dataset is None:
            # Without a source there is no edge to draw; previously this path
            # failed with AttributeError on ``src_dataset.id``.
            logger.warning(f"Missing source dataset for lineage graph of synthesis task {task_id}")
            return

        src_node = LineageNode(
            id=src_dataset.id,
            node_type=NodeType.DATASET.value,
            name=src_dataset.name,
            description=src_dataset.description,
        )
        dest_node = LineageNode(
            id=dst_dataset.id,
            node_type=NodeType.DATASET.value,
            name=dst_dataset.name,
            description=dst_dataset.description,
        )
        synthesis_edge = LineageEdge(
            process_id=task_id,
            name=task.name,
            edge_type=EdgeType.DATA_SYNTHESIS.value,
            description=task.description,
            from_node_id=src_dataset.id,
            to_node_id=dst_dataset.id,
        )

        # Capture the names before committing so the log line below does not
        # depend on ORM attribute state after the commit.
        src_name = src_dataset.name
        dst_name = dst_dataset.name

        # Generate the lineage graph.
        lineage_service = LineageService(db=db)
        await lineage_service.generate_graph(src_node, synthesis_edge, dest_node)
        await self.db.commit()

        # The original referenced the undefined name ``dest_dataset`` here,
        # which raised NameError after a successful commit and was then
        # reported as a failure by the handler below.
        logger.info(f"Added synthesis lineage: {src_name} -> {dst_name}")
    except Exception as exc:
        logger.error(f"Failed to add synthesis lineage: {exc}")

0 commit comments

Comments
 (0)