22
33import com .baomidou .mybatisplus .core .metadata .IPage ;
44import com .baomidou .mybatisplus .extension .plugins .pagination .Page ;
5+ import com .datamate .common .domain .enums .EdgeType ;
6+ import com .datamate .common .domain .enums .NodeType ;
7+ import com .datamate .common .domain .model .LineageEdge ;
8+ import com .datamate .common .domain .model .LineageNode ;
9+ import com .datamate .common .domain .service .LineageService ;
510import com .datamate .common .domain .utils .ChunksSaver ;
611import com .datamate .common .setting .application .SysParamApplicationService ;
712import com .datamate .datamanagement .interfaces .dto .*;
1722import com .datamate .datamanagement .infrastructure .persistence .repository .DatasetFileRepository ;
1823import com .datamate .datamanagement .infrastructure .persistence .repository .DatasetRepository ;
1924import com .datamate .datamanagement .interfaces .converter .DatasetConverter ;
20- import com .datamate .datamanagement .interfaces .dto .*;
2125import com .fasterxml .jackson .core .JsonProcessingException ;
2226import com .fasterxml .jackson .databind .ObjectMapper ;
2327import com .fasterxml .jackson .databind .SerializationFeature ;
@@ -54,6 +58,7 @@ public class DatasetApplicationService {
5458 private final CollectionTaskClient collectionTaskClient ;
5559 private final DatasetFileApplicationService datasetFileApplicationService ;
5660 private final SysParamApplicationService sysParamService ;
61+ private final LineageService lineageService ;
5762
5863 @ Value ("${datamate.data-management.base-path:/dataset}" )
5964 private String datasetBasePath ;
@@ -72,6 +77,8 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
7277 dataset .setTags (processTagNames (createDatasetRequest .getTags ()));
7378 }
7479 datasetRepository .save (dataset );
80+ // 记录血缘关系
81+ addDatasetToGraph (dataset , null );
7582
7683 //todo 需要解耦这块逻辑
7784 if (StringUtils .hasText (createDatasetRequest .getDataSource ())) {
@@ -81,6 +88,43 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
8188 return dataset ;
8289 }
8390
91+ private void addDatasetToGraph (Dataset dataset , CollectionTaskDetailResponse collection ) {
92+ LineageNode datasetNode = new LineageNode ();
93+ datasetNode .setId (dataset .getId ());
94+ datasetNode .setNodeType (NodeType .DATASET );
95+ datasetNode .setName (dataset .getName ());
96+ datasetNode .setDescription (dataset .getDescription ());
97+
98+ LineageNode collectionNode = null ;
99+ LineageEdge collectionEdge = null ;
100+ if (Objects .nonNull (collection )) {
101+ collectionNode = new LineageNode ();
102+ collectionNode .setId (collection .getId ());
103+ collectionNode .setName (collection .getName ());
104+ collectionNode .setDescription (collection .getDescription ());
105+ collectionNode .setNodeType (NodeType .DATASOURCE );
106+
107+ collectionEdge = new LineageEdge ();
108+ collectionEdge .setProcessId (collection .getId ());
109+ collectionEdge .setName (collection .getName ());
110+ collectionEdge .setEdgeType (EdgeType .DATA_COLLECTION );
111+ collectionEdge .setDescription (dataset .getDescription ());
112+ collectionEdge .setFromNodeId (collectionNode .getId ());
113+ collectionEdge .setToNodeId (datasetNode .getId ());
114+ }
115+ lineageService .generateGraph (collectionNode , collectionEdge , datasetNode );
116+ }
117+
118+ public DatasetLineage getDatasetLineage (String datasetId ) {
119+ Dataset dataset = datasetRepository .getById (datasetId );
120+ if (Objects .isNull (dataset )) {
121+ return new DatasetLineage ();
122+ }
123+ LineageNode datasetNode = lineageService .getNodeById (datasetId );
124+ String graphId = datasetNode .getGraphId ();
125+ return new DatasetLineage (lineageService .getNodesByGraphId (graphId ), lineageService .getEdgesByGraphId (graphId ));
126+ }
127+
84128 public String getDatasetPvcName () {
85129 return sysParamService .getParamByKey (DATASET_PVC_NAME );
86130 }
@@ -100,11 +144,11 @@ public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatase
100144 if (Objects .nonNull (updateDatasetRequest .getStatus ())) {
101145 dataset .setStatus (updateDatasetRequest .getStatus ());
102146 }
147+ datasetRepository .updateById (dataset );
103148 if (StringUtils .hasText (updateDatasetRequest .getDataSource ())) {
104149 // 数据源id不为空,使用异步线程进行文件扫盘落库
105150 processDataSourceAsync (dataset .getId (), updateDatasetRequest .getDataSource ());
106151 }
107- datasetRepository .updateById (dataset );
108152 return dataset ;
109153 }
110154
@@ -261,6 +305,8 @@ private List<String> getFilePaths(String dataSourceId, Dataset dataset) {
261305 log .warn ("Fail to get collection task detail, task ID: {}" , dataSourceId );
262306 return Collections .emptyList ();
263307 }
308+ // 记录血缘关系
309+ addDatasetToGraph (dataset , taskDetail );
264310 Path targetPath = Paths .get (taskDetail .getTargetPath ());
265311 if (!Files .exists (targetPath ) || !Files .isDirectory (targetPath )) {
266312 log .warn ("Target path not exists or is not a directory: {}" , taskDetail .getTargetPath ());
0 commit comments