Skip to content

Commit f06cf91

Browse files
authored
Develop/lineage plus (#313)
* feat: Add lineage service for knowledge base * feat: Add lineage service for ratio task * feat: Add lineage service for synthesis task * feat: Add lineage service for KnowledgeBaseService * feat: Add lineage service for KnowledgeBaseService
1 parent 08f791f commit f06cf91

5 files changed

Lines changed: 181 additions & 5 deletions

File tree

backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/application/KnowledgeBaseService.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,22 @@
22

33
import com.baomidou.mybatisplus.core.metadata.IPage;
44
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
5+
import com.datamate.common.domain.enums.EdgeType;
6+
import com.datamate.common.domain.enums.NodeType;
7+
import com.datamate.common.domain.model.LineageEdge;
8+
import com.datamate.common.domain.model.LineageNode;
9+
import com.datamate.common.domain.service.LineageService;
510
import com.datamate.common.infrastructure.exception.BusinessException;
611
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
712
import com.datamate.common.interfaces.PagedResponse;
813
import com.datamate.common.interfaces.PagingQuery;
914
import com.datamate.common.setting.domain.entity.ModelConfig;
1015
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
1116
import com.datamate.common.setting.infrastructure.client.ModelClient;
17+
import com.datamate.datamanagement.domain.model.dataset.Dataset;
18+
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
19+
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
20+
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
1221
import com.datamate.rag.indexer.domain.model.FileStatus;
1322
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
1423
import com.datamate.rag.indexer.domain.model.RagChunk;
@@ -36,7 +45,10 @@
3645

3746
import java.util.Collections;
3847
import java.util.List;
48+
import java.util.Objects;
3949
import java.util.Optional;
50+
import java.util.Set;
51+
import java.util.stream.Collectors;
4052

4153
/**
4254
* 知识库服务类
@@ -51,7 +63,10 @@ public class KnowledgeBaseService {
5163
private final RagFileRepository ragFileRepository;
5264
private final ApplicationEventPublisher eventPublisher;
5365
private final ModelConfigRepository modelConfigRepository;
66+
private final DatasetRepository datasetRepository;
67+
private final DatasetFileRepository datasetFileRepository;
5468
private final MilvusService milvusService;
69+
private final LineageService lineageService;
5570

5671
/**
5772
* 创建知识库
@@ -151,6 +166,7 @@ public void addFiles(AddFilesReq request) {
151166
}).toList();
152167
ragFileRepository.saveBatch(ragFiles, 100);
153168
eventPublisher.publishEvent(new DataInsertedEvent(knowledgeBase, request));
169+
updateLineageGraph(knowledgeBase, request.getFiles());
154170
}
155171

156172
public PagedResponse<RagFile> listFiles(String knowledgeBaseId, RagFileReq request) {
@@ -222,4 +238,55 @@ public List<SearchResp.SearchResult> retrieve(RetrieveReq request) {
222238
});
223239
return searchResults;
224240
}
225-
}
241+
242+
/**
243+
* 向知识库添加文件的时候,将相关数据集加入血缘图
244+
*
245+
* @param knowledgeBase 知识库
246+
* @param files 数据集中选择的文件
247+
*/
248+
private void updateLineageGraph(KnowledgeBase knowledgeBase, List<AddFilesReq.FileInfo> files) {
249+
LineageNode kbNode = lineageService.getNodeById(knowledgeBase.getId());
250+
if (kbNode == null) {
251+
kbNode = new LineageNode();
252+
kbNode.setId(knowledgeBase.getId());
253+
kbNode.setNodeType(NodeType.KNOWLEDGE_BASE);
254+
kbNode.setName(knowledgeBase.getName());
255+
kbNode.setDescription(knowledgeBase.getDescription());
256+
}
257+
258+
// 获取所有唯一的数据集ID
259+
Set<String> datasetIds = files.stream()
260+
.map(fileInfo -> {
261+
DatasetFile datasetFile = datasetFileRepository.getById(fileInfo.id());
262+
return datasetFile != null ? datasetFile.getDatasetId() : null;
263+
})
264+
.filter(Objects::nonNull)
265+
.collect(Collectors.toSet());
266+
267+
// 为每个数据集创建血缘关系
268+
for (String datasetId : datasetIds) {
269+
Dataset dataset = datasetRepository.getById(datasetId);
270+
if (dataset == null) continue;
271+
272+
// 创建源数据集节点
273+
LineageNode datasetNode = new LineageNode();
274+
datasetNode.setId(dataset.getId());
275+
datasetNode.setNodeType(NodeType.DATASET);
276+
datasetNode.setName(dataset.getName());
277+
datasetNode.setDescription(dataset.getDescription());
278+
279+
// 创建血缘边
280+
LineageEdge edge = new LineageEdge();
281+
edge.setProcessId(knowledgeBase.getId());
282+
edge.setName("");
283+
edge.setEdgeType(EdgeType.KNOWLEDGE_BASE);
284+
edge.setDescription("Add the files from dataset to the knowledge base.");
285+
edge.setFromNodeId(dataset.getId());
286+
edge.setToNodeId(knowledgeBase.getId());
287+
288+
// 生成血缘图
289+
lineageService.generateGraph(datasetNode, edge, kbNode);
290+
}
291+
}
292+
}

backend/shared/domain-common/src/main/java/com/datamate/common/domain/enums/EdgeType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ public enum EdgeType {
1010
DATA_CLEANING,
1111
DATA_LABELING,
1212
DATA_SYNTHESIS,
13-
DATA_RATIO
13+
DATA_RATIO,
14+
KNOWLEDGE_BASE,
1415
}

runtime/datamate-python/app/module/generation/interface/generation_api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,10 @@ async def export_synthesis_task_to_dataset(
463463
- 仅写入文件,不再创建数据集。
464464
"""
465465
exporter = SynthesisDatasetExporter(db)
466+
generation = GenerationService(db)
466467
try:
467468
dataset = await exporter.export_task_to_dataset(task_id, dataset_id)
469+
await generation.add_synthesis_to_graph(db, task_id, dataset_id)
468470
except SynthesisExportError as e:
469471
logger.error(
470472
"Failed to export synthesis task %s to dataset %s: %s",

runtime/datamate-python/app/module/generation/service/generation_service.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88
from sqlalchemy import select
99
from sqlalchemy.ext.asyncio import AsyncSession
1010

11+
from app.db.models.base_entity import LineageNode, LineageEdge
1112
from app.db.models.data_synthesis import (
1213
DataSynthInstance,
1314
DataSynthesisFileInstance,
1415
DataSynthesisChunkInstance,
1516
SynthesisData,
1617
)
17-
from app.db.models.dataset_management import DatasetFiles
18+
from app.db.models.dataset_management import DatasetFiles, Dataset
1819
from app.db.session import logger
1920
from app.module.generation.schema.generation import Config, SyntheConfig
2021
from app.module.generation.service.prompt import (
@@ -26,6 +27,8 @@
2627
from app.module.shared.util.model_chat import extract_json_substring
2728
from app.module.shared.llm import LLMFactory
2829
from app.module.system.service.common_service import get_model_by_id
30+
from app.module.shared.common.lineage import LineageService
31+
from app.module.shared.schema import NodeType, EdgeType
2932

3033

3134
def _filter_docs(split_docs, chunk_size):
@@ -657,3 +660,52 @@ async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> No
657660
file_task.processed_chunks = new_value
658661
await self.db.commit()
659662
await self.db.refresh(file_task)
663+
664+
async def add_synthesis_to_graph(self, db: AsyncSession, task_id: str, dest_dataset_id: str) -> None:
665+
"""记录数据合成血缘关系:源数据集 -> 合成数据集 via DATA_SYNTHESIS"""
666+
try:
667+
# 获取任务和目标数据集信息
668+
task = await self.db.get(DataSynthInstance, task_id)
669+
src_dataset_result = await db.execute(
670+
select(DatasetFiles.dataset_id)
671+
.join(DataSynthesisFileInstance, DatasetFiles.id == DataSynthesisFileInstance.source_file_id)
672+
.where(DataSynthesisFileInstance.synthesis_instance_id == task_id)
673+
.limit(1)
674+
)
675+
src_dataset_id = src_dataset_result.scalar_one_or_none()
676+
src_dataset = await self.db.get(Dataset, src_dataset_id)
677+
dst_dataset = await self.db.get(Dataset, dest_dataset_id)
678+
679+
if not task or not dst_dataset:
680+
logger.warning("Missing task or destination dataset for lineage graph")
681+
return
682+
683+
src_node = LineageNode(
684+
id=src_dataset.id,
685+
node_type=NodeType.DATASET.value,
686+
name=src_dataset.name,
687+
description=src_dataset.description
688+
)
689+
dest_node = LineageNode(
690+
id=dst_dataset.id,
691+
node_type=NodeType.DATASET.value,
692+
name=dst_dataset.name,
693+
description=dst_dataset.description
694+
)
695+
synthesis_edge = LineageEdge(
696+
process_id=task_id,
697+
name=task.name,
698+
edge_type=EdgeType.DATA_SYNTHESIS.value,
699+
description=task.description,
700+
from_node_id=src_node.id,
701+
to_node_id=dst_dataset.id
702+
)
703+
704+
# 生成血缘图
705+
lineage_service = LineageService(db=db)
706+
await lineage_service.generate_graph(src_node, synthesis_edge, dest_node)
707+
await self.db.commit()
708+
709+
logger.info(f"Added synthesis lineage: {src_node.name} -> {dest_dataset.name}")
710+
except Exception as exc:
711+
logger.error(f"Failed to add synthesis lineage: {exc}")

runtime/datamate-python/app/module/ratio/service/ratio_task.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111

1212
from app.core.logging import get_logger
13+
from app.db.models.base_entity import LineageNode, LineageEdge
1314
from app.db.models.ratio_task import RatioInstance, RatioRelation
1415
from app.db.models import Dataset, DatasetFiles
1516
from app.db.session import AsyncSessionLocal
1617
from app.module.dataset.schema.dataset_file import DatasetFileTag
17-
from app.module.shared.schema import TaskStatus
18+
from app.module.shared.common.lineage import LineageService
19+
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
1820
from app.module.ratio.schema.ratio_task import FilterCondition
1921

2022
logger = get_logger(__name__)
@@ -126,7 +128,12 @@ async def execute_dataset_ratio_task(instance_id: str) -> None:
126128
# Done
127129
instance.status = TaskStatus.COMPLETED.name
128130
logger.info(f"Dataset ratio execution completed: instance={instance_id}, files={added_count}, size={added_size}, {instance.status}")
129-
131+
await RatioTaskService._add_task_to_graph(
132+
session=session,
133+
src_relations=relations,
134+
task=instance,
135+
dst_dataset=target_ds,
136+
)
130137
except Exception as e:
131138
logger.exception(f"Dataset ratio execution failed for {instance_id}: {e}")
132139
try:
@@ -326,3 +333,50 @@ def get_all_tags(tags) -> list[dict]:
326333
for tag_data in file_tag.get_tags():
327334
all_tags.append(tag_data)
328335
return all_tags
336+
337+
@staticmethod
338+
async def _add_task_to_graph(
339+
session: AsyncSession,
340+
src_relations: List[RatioRelation],
341+
task: RatioInstance,
342+
dst_dataset: Dataset,
343+
) -> None:
344+
"""
345+
在比例抽取任务完成后,将数据集加入血缘图。
346+
ratio_task(DATASOURCE) -> dataset(DATASET) via DATA_RATIO edge
347+
"""
348+
try:
349+
if not src_relations:
350+
logger.warning("Source ratio relations is empty when building lineage graph")
351+
return
352+
353+
lineage_service = LineageService(db=session)
354+
dst_node = LineageNode(
355+
id=dst_dataset.id,
356+
node_type=NodeType.DATASET.value,
357+
name=dst_dataset.name,
358+
description=dst_dataset.description,
359+
)
360+
for rel in src_relations:
361+
ds: Optional[Dataset] = await session.get(Dataset, rel.source_dataset_id)
362+
src_node = LineageNode(
363+
id=rel.source_dataset_id,
364+
node_type=NodeType.DATASET.value,
365+
name=ds.name,
366+
description=ds.description,
367+
)
368+
ratio_edge = LineageEdge(
369+
process_id=task.id,
370+
name=task.name,
371+
edge_type=EdgeType.DATA_RATIO.value,
372+
description=task.description,
373+
from_node_id=rel.source_dataset_id,
374+
to_node_id=dst_node.id,
375+
)
376+
await lineage_service.generate_graph(src_node, ratio_edge, dst_node)
377+
logger.info("Add dataset lineage graph: %s -> %s -> %s", src_node.id, ratio_edge.id, dst_node.id)
378+
await session.commit()
379+
logger.info("Add dataset lineage graph success")
380+
except Exception as exc:
381+
logger.error("Failed to add dataset lineage graph: %s", exc)
382+
await session.rollback()

0 commit comments

Comments
 (0)