Skip to content

Commit 5f824ad

Browse files
committed
feat: Add lineage service for ratio task
1 parent 6e84a23 commit 5f824ad

1 file changed

Lines changed: 56 additions & 2 deletions

File tree

runtime/datamate-python/app/module/ratio/service/ratio_task.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111

1212
from app.core.logging import get_logger
13+
from app.db.models.base_entity import LineageNode, LineageEdge
1314
from app.db.models.ratio_task import RatioInstance, RatioRelation
1415
from app.db.models import Dataset, DatasetFiles
1516
from app.db.session import AsyncSessionLocal
1617
from app.module.dataset.schema.dataset_file import DatasetFileTag
17-
from app.module.shared.schema import TaskStatus
18+
from app.module.shared.common.lineage import LineageService
19+
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
1820
from app.module.ratio.schema.ratio_task import FilterCondition
1921

2022
logger = get_logger(__name__)
@@ -126,7 +128,12 @@ async def execute_dataset_ratio_task(instance_id: str) -> None:
126128
# Done
127129
instance.status = TaskStatus.COMPLETED.name
128130
logger.info(f"Dataset ratio execution completed: instance={instance_id}, files={added_count}, size={added_size}, {instance.status}")
129-
131+
await RatioTaskService._add_task_to_graph(
132+
session=session,
133+
src_relations=relations,
134+
task=instance,
135+
dst_dataset=target_ds,
136+
)
130137
except Exception as e:
131138
logger.exception(f"Dataset ratio execution failed for {instance_id}: {e}")
132139
try:
@@ -326,3 +333,50 @@ def get_all_tags(tags) -> list[dict]:
326333
for tag_data in file_tag.get_tags():
327334
all_tags.append(tag_data)
328335
return all_tags
336+
337+
@staticmethod
338+
async def _add_task_to_graph(
339+
session: AsyncSession,
340+
src_relations: List[RatioRelation],
341+
task: RatioInstance,
342+
dst_dataset: Dataset,
343+
) -> None:
344+
"""
345+
在比例抽取任务完成后,将数据集加入血缘图。
346+
ratio_task(DATASOURCE) -> dataset(DATASET) via DATA_RATIO edge
347+
"""
348+
try:
349+
if not src_relations:
350+
logger.warning("Source ratio relations is empty when building lineage graph")
351+
return
352+
353+
lineage_service = LineageService(db=session)
354+
dst_node = LineageNode(
355+
id=dst_dataset.id,
356+
node_type=NodeType.DATASET.value,
357+
name=dst_dataset.name,
358+
description=dst_dataset.description,
359+
)
360+
for rel in src_relations:
361+
ds: Optional[Dataset] = await session.get(Dataset, rel.source_dataset_id)
362+
src_node = LineageNode(
363+
id=rel.source_dataset_id,
364+
node_type=NodeType.DATASET.value,
365+
name=ds.name,
366+
description=ds.description,
367+
)
368+
ratio_edge = LineageEdge(
369+
process_id=task.id,
370+
name=task.name,
371+
edge_type=EdgeType.DATA_RATIO.value,
372+
description=task.description,
373+
from_node_id=rel.source_dataset_id,
374+
to_node_id=dst_node.id,
375+
)
376+
await lineage_service.generate_graph(src_node, ratio_edge, dst_node)
377+
logger.info("Add dataset lineage graph: %s -> %s -> %s", src_node.id, ratio_edge.id, dst_node.id)
378+
await session.commit()
379+
logger.info("Add dataset lineage graph success")
380+
except Exception as exc:
381+
logger.error("Failed to add dataset lineage graph: %s", exc)
382+
await session.rollback()

0 commit comments

Comments
 (0)