Skip to content

Commit 9a7f926

Browse files
committed
[DOP-25645] add tests
1 parent 4bae5cd commit 9a7f926

11 files changed

Lines changed: 101 additions & 29 deletions

File tree

data_rentgen/consumer/extractors/batch_extraction_result.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ def add_run(self, run: RunDTO):
136136

137137
def add_operation(self, operation: OperationDTO):
138138
operation.run = self.add_run(operation.run)
139+
if operation.sql_query:
140+
operation.sql_query = self.add_sql_query(operation.sql_query)
139141
return self._add(self._operations, operation)
140142

141143
def add_input(self, input_: InputDTO):

data_rentgen/consumer/extractors/batch_extractor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ def extract_run(self, event: OpenLineageRunEvent) -> None:
4949
def extract_operation(self, event: OpenLineageRunEvent) -> None:
5050
operation = extract_operation(event)
5151
self.result.add_operation(operation)
52-
# TODO here we don't have a query id, so we should store relation to query dto instead of only id
5352

5453
for input_dataset in event.inputs:
5554
input_dto, symlink_dtos = extract_input(operation, input_dataset)

data_rentgen/consumer/extractors/job.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from textwrap import dedent
54
from urllib.parse import urlparse
65

76
from data_rentgen.consumer.openlineage.job import OpenLineageJob
87
from data_rentgen.consumer.openlineage.run_facets import OpenLineageParentJob
9-
from data_rentgen.dto import JobDTO, JobTypeDTO, LocationDTO, SQLQueryDTO
8+
from data_rentgen.dto import JobDTO, JobTypeDTO, LocationDTO
109

1110

1211
def extract_parent_job(job: OpenLineageParentJob) -> JobDTO:
@@ -21,7 +20,6 @@ def extract_job(job: OpenLineageJob) -> JobDTO:
2120
name=job.name,
2221
location=extract_job_location(job),
2322
type=extract_job_type(job),
24-
sql_query=extract_job_sql_query(job),
2523
)
2624

2725

@@ -44,14 +42,3 @@ def extract_job_type(job: OpenLineageJob) -> JobTypeDTO | None:
4442
return JobTypeDTO(type=type_.upper())
4543

4644
return None
47-
48-
49-
def extract_job_sql_query(job: OpenLineageJob) -> SQLQueryDTO | None:
50-
"""
51-
SQL queries usually arrive as multiline strings, so we remove the common leading indentation and strip leading and trailing whitespace (including newlines).
52-
"""
53-
if job.facets.sql_query:
54-
query = str.strip(dedent(job.facets.sql_query.query))
55-
return SQLQueryDTO(query=query)
56-
57-
return None

data_rentgen/consumer/extractors/operation.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4+
from textwrap import dedent
5+
46
from data_rentgen.consumer.extractors.run import extract_parent_run, extract_run
7+
from data_rentgen.consumer.openlineage.job import OpenLineageJob
58
from data_rentgen.consumer.openlineage.run_event import (
69
OpenLineageRunEvent,
710
OpenLineageRunEventType,
811
)
9-
from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO
12+
from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO, SQLQueryDTO
1013

1114

1215
def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
@@ -32,7 +35,7 @@ def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
3235
name=operation_name,
3336
type=type_,
3437
status=OperationStatusDTO(run.status),
35-
sql_query=run.job.sql_query,
38+
sql_query=extract_sql_query(event.job),
3639
started_at=run.started_at,
3740
ended_at=run.ended_at,
3841
)
@@ -70,3 +73,14 @@ def enrich_operation_description(operation: OperationDTO, event: OpenLineageRunE
7073
operation.group = spark_job_details.jobGroup
7174
operation.description = spark_job_details.jobDescription
7275
return operation
76+
77+
78+
def extract_sql_query(job: OpenLineageJob) -> SQLQueryDTO | None:
79+
"""
80+
SQL queries usually arrive as multiline strings, so we remove the common leading indentation and strip leading and trailing whitespace (including newlines).
81+
"""
82+
if job.facets.sql:
83+
query = str.strip(dedent(job.facets.sql.query))
84+
return SQLQueryDTO(query=query)
85+
86+
return None

data_rentgen/consumer/openlineage/job_facets/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ class OpenLineageJobFacets(OpenLineageBase):
2828

2929
documentation: OpenLineageDocumentationJobFacet | None = None
3030
jobType: OpenLineageJobTypeJobFacet | None = None
31-
sql_query: OpenLineageSqlJobFacet | None = None
31+
sql: OpenLineageSqlJobFacet | None = None

data_rentgen/db/repositories/operation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ async def create_or_update_bulk(self, operations: list[OperationDTO]) -> None:
2828
"started_at": func.coalesce(insert_statement.excluded.started_at, Operation.started_at),
2929
"ended_at": func.coalesce(insert_statement.excluded.ended_at, Operation.ended_at),
3030
"description": func.coalesce(insert_statement.excluded.description, Operation.description),
31+
"sql_query_id": func.coalesce(insert_statement.excluded.sql_query_id, Operation.sql_query_id),
3132
"group": func.coalesce(insert_statement.excluded.group, Operation.group),
3233
"position": func.coalesce(insert_statement.excluded.position, Operation.position),
3334
},
3435
)
35-
3636
await self._session.execute(
3737
statement,
3838
[

data_rentgen/dto/job.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@
77

88
from data_rentgen.dto.job_type import JobTypeDTO
99
from data_rentgen.dto.location import LocationDTO
10-
from data_rentgen.dto.sql_query import SQLQueryDTO
1110

1211

1312
@dataclass
1413
class JobDTO:
1514
name: str
1615
location: LocationDTO
1716
type: JobTypeDTO | None = None
18-
sql_query: SQLQueryDTO | None = None
1917
id: int | None = field(default=None, compare=False)
2018

2119
@property
@@ -33,16 +31,9 @@ def merge(self, new: JobDTO) -> JobDTO:
3331
else:
3432
type_ = new.type or self.type
3533

36-
sql_query: SQLQueryDTO | None
37-
if new.sql_query and self.sql_query:
38-
sql_query = self.sql_query.merge(new.sql_query)
39-
else:
40-
sql_query = new.sql_query or self.sql_query
41-
4234
return JobDTO(
4335
location=self.location.merge(new.location),
4436
name=self.name,
4537
type=type_,
46-
sql_query=sql_query,
4738
id=new.id or self.id,
4839
)

tests/test_consumer/resources/events_spark.jsonl

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

tests/test_consumer/test_extractors/test_extractors_operation_spark.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
OpenLineageJobFacets,
1212
OpenLineageJobProcessingType,
1313
OpenLineageJobTypeJobFacet,
14+
OpenLineageSqlJobFacet,
1415
)
1516
from data_rentgen.consumer.openlineage.run import OpenLineageRun
1617
from data_rentgen.consumer.openlineage.run_event import (
@@ -29,6 +30,7 @@
2930
from data_rentgen.dto.location import LocationDTO
3031
from data_rentgen.dto.operation import OperationTypeDTO
3132
from data_rentgen.dto.run import RunDTO
33+
from data_rentgen.dto.sql_query import SQLQueryDTO
3234

3335

3436
def test_extractors_extract_operation_spark_job_no_details():
@@ -302,3 +304,62 @@ def test_extractors_extract_operation_spark_job_finished(
302304
started_at=None,
303305
ended_at=ended_at,
304306
)
307+
308+
309+
def test_extractors_extract_operation_spark_job_sql_query():
310+
now = datetime(2024, 7, 5, 9, 6, 29, 462000, tzinfo=timezone.utc)
311+
run_id = UUID("01908224-8410-79a2-8de6-a769ad6944c9")
312+
operation_id = UUID("01908225-1fd7-746b-910c-70d24f2898b1")
313+
314+
operation = OpenLineageRunEvent(
315+
eventType=OpenLineageRunEventType.START,
316+
eventTime=now,
317+
job=OpenLineageJob(
318+
namespace="anything",
319+
name="mysession.execute_some_command",
320+
facets=OpenLineageJobFacets(
321+
jobType=OpenLineageJobTypeJobFacet(
322+
processingType=OpenLineageJobProcessingType.BATCH,
323+
integration="SPARK",
324+
jobType="SQL_JOB",
325+
),
326+
sql=OpenLineageSqlJobFacet(query="select id, name from schema.table where id = 1"),
327+
),
328+
),
329+
run=OpenLineageRun(
330+
runId=operation_id,
331+
facets=OpenLineageRunFacets(
332+
parent=OpenLineageParentRunFacet(
333+
job=OpenLineageParentJob(
334+
namespace="anything",
335+
name="mysession",
336+
),
337+
run=OpenLineageParentRun(
338+
runId=run_id,
339+
),
340+
),
341+
),
342+
),
343+
)
344+
assert extract_operation(operation) == OperationDTO(
345+
id=operation_id,
346+
run=RunDTO(
347+
id=run_id,
348+
job=JobDTO(
349+
name="mysession",
350+
location=LocationDTO(
351+
type="unknown",
352+
name="anything",
353+
addresses={"unknown://anything"},
354+
),
355+
),
356+
),
357+
name="execute_some_command",
358+
type=OperationTypeDTO.BATCH,
359+
position=None,
360+
description=None,
361+
status=OperationStatusDTO.STARTED,
362+
sql_query=SQLQueryDTO(query="select id, name from schema.table where id = 1"),
363+
started_at=now,
364+
ended_at=None,
365+
)

tests/test_consumer/test_handlers/test_runs_handler_spark.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
Run,
2828
RunStatus,
2929
Schema,
30+
SQLQuery,
3031
)
3132

3233
RESOURCES_PATH = Path(__file__).parent.parent.joinpath("resources").resolve()
@@ -95,6 +96,15 @@ async def test_runs_handler_spark(
9596
assert application_run.running_log_url == "http://127.0.0.1:4040"
9697
assert application_run.persistent_log_url is None
9798

99+
sql_query = select(SQLQuery).order_by(SQLQuery.id)
100+
sql_query_scalars = await async_session.scalars(sql_query)
101+
sql_queries = sql_query_scalars.all()
102+
assert len(sql_queries) == 1
103+
104+
operation_sql_query = sql_queries[0]
105+
assert operation_sql_query.fingerprint == UUID("250fe871-527a-5bf7-b730-3f89b42cceb2")
106+
assert operation_sql_query.query == "select id, name from schema.table where id = 1"
107+
98108
operation_query = select(Operation).order_by(Operation.id)
99109
operation_scalars = await async_session.scalars(operation_query)
100110
operations = operation_scalars.all()
@@ -108,6 +118,7 @@ async def test_runs_handler_spark(
108118
assert job_operation.type == OperationType.BATCH
109119
assert job_operation.status == OperationStatus.SUCCEEDED
110120
assert job_operation.started_at == datetime(2024, 7, 5, 9, 6, 29, 462000, tzinfo=timezone.utc)
121+
assert job_operation.sql_query_id == operation_sql_query.id
111122
assert job_operation.ended_at == datetime(2024, 7, 5, 9, 7, 15, 642000, tzinfo=timezone.utc)
112123
assert job_operation.position == 3
113124
assert job_operation.description == "Hive -> Clickhouse"

0 commit comments

Comments
 (0)