Skip to content

Commit 59de72c

Browse files
committed
[DOP-26770] Add StarRocks integration tests
1 parent 3acd79d commit 59de72c

20 files changed

Lines changed: 974 additions & 16 deletions

File tree

README.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ Currently we support consuming lineage from:
4141
* Apache Hive
4242
* Apache Flink
4343
* dbt
44+
* StarRocks (proprietary integration, part of MWS Data Engine)
4445

45-
**Note**: service is under active development, so it doesn't have stable API for now.
46+
**Note**: service is under active development, so API can be unstable.
4647

4748
Goals
4849
-----

data_rentgen/consumer/extractors/batch_extractor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
SparkExtractor,
1313
UnknownExtractor,
1414
)
15+
from data_rentgen.consumer.extractors.impl.starrocks import StarRocksExtractor
1516
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
1617

1718

@@ -31,6 +32,7 @@ def __init__(self) -> None:
3132
HiveExtractor(),
3233
FlinkExtractor(),
3334
DbtExtractor(),
35+
StarRocksExtractor(),
3436
]
3537
self.unknown_extractor = UnknownExtractor()
3638

data_rentgen/consumer/extractors/impl/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from data_rentgen.consumer.extractors.impl.hive import HiveExtractor
88
from data_rentgen.consumer.extractors.impl.interface import ExtractorInterface
99
from data_rentgen.consumer.extractors.impl.spark import SparkExtractor
10+
from data_rentgen.consumer.extractors.impl.starrocks import StarRocksExtractor
1011
from data_rentgen.consumer.extractors.impl.unknown import UnknownExtractor
1112

1213
__all__ = [
@@ -17,5 +18,6 @@
1718
"FlinkExtractor",
1819
"HiveExtractor",
1920
"SparkExtractor",
21+
"StarRocksExtractor",
2022
"UnknownExtractor",
2123
]

data_rentgen/consumer/extractors/impl/hive.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
8181
run=run,
8282
name=hive_query.queryId,
8383
description=hive_query.operationName,
84+
# no started_at == run.started_at
8485
type=self._extract_operation_type(event),
8586
sql_query=self._extract_sql_query(event),
8687
)
@@ -94,7 +95,7 @@ def _extract_output_type(
9495
) -> OutputTypeDTO:
9596
match operation.description:
9697
case None:
97-
return OutputTypeDTO(0)
98+
return OutputTypeDTO.UNKNOWN
9899
case value if value.startswith("CREATE"):
99100
return OutputTypeDTO.CREATE
100101
case value if value.startswith("ALTER"):
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# SPDX-FileCopyrightText: 2024-present MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from __future__ import annotations
4+
5+
from typing import cast
6+
7+
from data_rentgen.consumer.extractors.generic import GenericExtractor
8+
from data_rentgen.dto import (
9+
JobDTO,
10+
JobTypeDTO,
11+
OperationDTO,
12+
OutputTypeDTO,
13+
RunDTO,
14+
RunStatusDTO,
15+
UserDTO,
16+
)
17+
from data_rentgen.dto.sql_query import SQLQueryDTO
18+
from data_rentgen.openlineage.dataset import OpenLineageOutputDataset
19+
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
20+
from data_rentgen.openlineage.run_facets import (
21+
OpenLineageStarRocksSessionInfoRunFacet,
22+
)
23+
from data_rentgen.utils.uuid import extract_timestamp_from_uuid
24+
25+
26+
class StarRocksExtractor(GenericExtractor):
27+
def match(self, event: OpenLineageRunEvent) -> bool:
28+
return bool(event.job.facets.jobType and event.job.facets.jobType.integration == "STARROCKS")
29+
30+
def is_operation(self, event: OpenLineageRunEvent) -> bool:
31+
return bool(event.job.facets.jobType and event.job.facets.jobType.jobType == "QUERY")
32+
33+
def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO:
34+
# We treat queries as operations, and operations should be bound to run (session) for grouping.
35+
# So we create run artificially using starrocks_session facet
36+
starrocks_session = cast("OpenLineageStarRocksSessionInfoRunFacet", event.run.facets.starrocks_session)
37+
return RunDTO(
38+
id=starrocks_session.sessionId,
39+
job=JobDTO(
40+
name=f"{starrocks_session.user}@{starrocks_session.clientIp}",
41+
location=self._extract_job_location(event.job),
42+
type=JobTypeDTO(type="STARROCKS_SESSION"),
43+
),
44+
parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None,
45+
started_at=extract_timestamp_from_uuid(starrocks_session.sessionId),
46+
user=UserDTO(name=starrocks_session.user),
47+
)
48+
49+
def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent):
50+
if self.is_operation(event):
51+
# for query events we don't know session start time
52+
run.status = RunStatusDTO.STARTED
53+
return run
54+
55+
return super()._enrich_run_status(run, event)
56+
57+
def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
58+
run = self.extract_run(event)
59+
60+
operation = OperationDTO(
61+
id=event.run.runId,
62+
run=run,
63+
name=event.job.name,
64+
# no started_at == run.started_at
65+
type=self._extract_operation_type(event),
66+
sql_query=self._extract_sql_query(event),
67+
)
68+
self._enrich_operation_status(operation, event)
69+
return operation
70+
71+
def _extract_output_type( # noqa: PLR0911
72+
self,
73+
operation: OperationDTO,
74+
dataset: OpenLineageOutputDataset,
75+
) -> OutputTypeDTO:
76+
match operation.sql_query:
77+
case None:
78+
return OutputTypeDTO.UNKNOWN
79+
case SQLQueryDTO(query=query) if query.startswith("INSERT"):
80+
return OutputTypeDTO.APPEND
81+
case SQLQueryDTO(query=query) if query.startswith("CREATE"):
82+
return OutputTypeDTO.CREATE
83+
case SQLQueryDTO(query=query) if query.startswith("ALTER"):
84+
return OutputTypeDTO.ALTER
85+
case SQLQueryDTO(query=query) if query.startswith("DROP"):
86+
return OutputTypeDTO.DROP
87+
case SQLQueryDTO(query=query) if query.startswith("TRUNCATE"):
88+
return OutputTypeDTO.TRUNCATE
89+
return OutputTypeDTO.UNKNOWN

data_rentgen/openlineage/run_facets/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from data_rentgen.openlineage.run_facets.spark_job import (
4545
OpenLineageSparkJobDetailsRunFacet,
4646
)
47+
from data_rentgen.openlineage.run_facets.starrocks_session import OpenLineageStarRocksSessionInfoRunFacet
4748

4849
__all__ = [
4950
"DataRentgenOperationInfoFacet",
@@ -74,6 +75,7 @@
7475
"OpenLineageSparkApplicationDetailsRunFacet",
7576
"OpenLineageSparkDeployMode",
7677
"OpenLineageSparkJobDetailsRunFacet",
78+
"OpenLineageStarRocksSessionInfoRunFacet",
7779
]
7880

7981

@@ -85,6 +87,8 @@ class OpenLineageRunFacets(OpenLineageBase):
8587
parent: OpenLineageParentRunFacet | None = None
8688
processing_engine: OpenLineageProcessingEngineRunFacet | None = None
8789
tags: OpenLineageRunTagsFacet | None = None
90+
nominalTime: OpenLineageNominalTimeRunFacet | None = None
91+
jobDependencies: OpenLineageJobDependenciesRunFacet | None = None
8892
dataRentgen_run: DataRentgenRunInfoFacet | None = None
8993
dataRentgen_operation: DataRentgenOperationInfoFacet | None = None
9094
spark_applicationDetails: OpenLineageSparkApplicationDetailsRunFacet | None = None
@@ -95,5 +99,4 @@ class OpenLineageRunFacets(OpenLineageBase):
9599
flink_job: OpenLineageFlinkJobDetailsRunFacet | None = None
96100
hive_query: OpenLineageHiveQueryInfoRunFacet | None = None
97101
hive_session: OpenLineageHiveSessionInfoRunFacet | None = None
98-
nominalTime: OpenLineageNominalTimeRunFacet | None = None
99-
jobDependencies: OpenLineageJobDependenciesRunFacet | None = None
102+
starrocks_session: OpenLineageStarRocksSessionInfoRunFacet | None = None
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# SPDX-FileCopyrightText: 2025-present MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from pydantic import UUID7, Field
5+
6+
from data_rentgen.openlineage.run_facets.base import OpenLineageRunFacet
7+
8+
9+
class OpenLineageStarRocksSessionInfoRunFacet(OpenLineageRunFacet):
10+
"""Run facet describing StarRocks session."""
11+
12+
user: str = Field(examples=["myuser"])
13+
sessionId: UUID7 = Field(examples=["019d455f-cf2a-7b1e-92d4-8f5c3e9a7b2d"])
14+
clientIp: str = Field(examples=["11.22.33.44"])

data_rentgen/utils/uuid.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import time
77
from datetime import datetime, timezone
88
from hashlib import sha1
9-
from typing import Any
109
from uuid import NAMESPACE_URL, uuid5
1110
from uuid import UUID as BaseUUID # noqa: N811
1211

@@ -111,13 +110,3 @@ def extract_timestamp_from_uuid(uuid: BaseUUID) -> datetime:
111110
msg = "Only UUIDv6+ are supported"
112111
raise ValueError(msg)
113112
return datetime.fromtimestamp(uuid.time / 1000, tz=timezone.utc)
114-
115-
116-
def uuid_version_validator(run_id: Any) -> NewUUID:
117-
if isinstance(run_id, str):
118-
run_id = NewUUID(run_id)
119-
if not run_id.version or run_id.version < 6: # noqa: PLR2004
120-
err_msg = f"Run ID: {run_id} is not valid uuid. Only UUIDv6+ are supported"
121-
raise ValueError(err_msg)
122-
return run_id
123-
return run_id
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add support for OpenLineage integration for StarRocks (proprietary, part of MWS Data Engine).

mddocs/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ Currently we support consuming lineage from:
1414
* Apache Hive
1515
* Apache Flink
1616
* dbt
17+
* StarRocks (proprietary integration, part of MWS Data Engine)
1718

18-
**Note**: service is under active development, so it doesn’t have stable API for now.
19+
**Note**: service is under active development, so API can be unstable.
1920

2021
# Goals
2122

0 commit comments

Comments
 (0)