Skip to content

Commit 2c0dee9

Browse files
committed
[DOP-25498] Do not restrict values of Job.type
1 parent 7a94ee2 commit 2c0dee9

34 files changed

Lines changed: 571 additions & 248 deletions

data_rentgen/consumer/extractors/batch_extraction_result.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
DatasetSymlinkDTO,
1111
InputDTO,
1212
JobDTO,
13+
JobTypeDTO,
1314
LocationDTO,
1415
OperationDTO,
1516
OutputDTO,
@@ -25,6 +26,7 @@
2526
ColumnLineageDTO,
2627
DatasetSymlinkDTO,
2728
JobDTO,
29+
JobTypeDTO,
2830
RunDTO,
2931
OperationDTO,
3032
InputDTO,
@@ -56,6 +58,7 @@ def __init__(self):
5658
self._locations: dict[tuple, LocationDTO] = {}
5759
self._datasets: dict[tuple, DatasetDTO] = {}
5860
self._dataset_symlinks: dict[tuple, DatasetSymlinkDTO] = {}
61+
self._job_types: dict[tuple, JobTypeDTO] = {}
5962
self._jobs: dict[tuple, JobDTO] = {}
6063
self._runs: dict[tuple, RunDTO] = {}
6164
self._operations: dict[tuple, OperationDTO] = {}
@@ -71,6 +74,7 @@ def __repr__(self):
7174
f"locations={len(self._locations)}, "
7275
f"datasets={len(self._datasets)}, "
7376
f"dataset_symlinks={len(self._dataset_symlinks)}, "
77+
f"job_types={len(self._job_types)}, "
7478
f"jobs={len(self._jobs)}, "
7579
f"runs={len(self._runs)}, "
7680
f"operations={len(self._operations)}, "
@@ -109,8 +113,13 @@ def add_dataset_symlink(self, dataset_symlink: DatasetSymlinkDTO):
109113
dataset_symlink.to_dataset = self.add_dataset(dataset_symlink.to_dataset)
110114
return self._add(self._dataset_symlinks, dataset_symlink)
111115

116+
def add_job_type(self, job_type: JobTypeDTO):
117+
return self._add(self._job_types, job_type)
118+
112119
def add_job(self, job: JobDTO):
113120
job.location = self.add_location(job.location)
121+
if job.type:
122+
job.type = self.add_job_type(job.type)
114123
return self._add(self._jobs, job)
115124

116125
def add_run(self, run: RunDTO):
@@ -171,9 +180,14 @@ def get_dataset_symlink(self, dataset_symlink_key: tuple) -> DatasetSymlinkDTO:
171180
dataset_symlink.to_dataset = self.get_dataset(dataset_symlink.to_dataset.unique_key)
172181
return dataset_symlink
173182

183+
def get_job_type(self, job_type_key: tuple) -> JobTypeDTO:
184+
return self._job_types[job_type_key]
185+
174186
def get_job(self, job_key: tuple) -> JobDTO:
175187
job = self._jobs[job_key]
176188
job.location = self.get_location(job.location.unique_key)
189+
if job.type:
190+
job.type = self.get_job_type(job.type.unique_key)
177191
return job
178192

179193
def get_run(self, run_key: tuple) -> RunDTO:
@@ -222,6 +236,9 @@ def datasets(self) -> list[DatasetDTO]:
222236
def dataset_symlinks(self) -> list[DatasetSymlinkDTO]:
223237
return list(map(self.get_dataset_symlink, self._dataset_symlinks))
224238

239+
def job_types(self) -> list[JobTypeDTO]:
240+
return list(map(self.get_job_type, self._job_types))
241+
225242
def jobs(self) -> list[JobDTO]:
226243
return list(map(self.get_job, self._jobs))
227244

data_rentgen/consumer/extractors/batch_extractor.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from data_rentgen.consumer.extractors.operation import extract_operation
99
from data_rentgen.consumer.extractors.output import extract_output
1010
from data_rentgen.consumer.extractors.run import extract_run
11-
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
1211
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
1312
from data_rentgen.dto import (
1413
DatasetDTO,
@@ -22,13 +21,25 @@ def __init__(self) -> None:
2221

2322
def add_events(self, events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
2423
for event in events:
25-
if event.job.facets.jobType and event.job.facets.jobType.jobType == OpenLineageJobType.JOB:
24+
if self.is_operation(event):
2625
self.extract_operation(event)
2726
else:
2827
self.extract_run(event)
29-
3028
return self.result
3129

30+
def is_operation(self, event: OpenLineageRunEvent) -> bool:
    """Tell whether ``event`` should be extracted as an operation.

    Only events coming from the Spark integration whose job type is
    anything other than ``APPLICATION`` are operations. Events without a
    ``jobType`` facet, and events from every other integration (Airflow
    included), are extracted as runs.

    ``integration`` and ``jobType`` are free-form strings, so they are
    compared case-insensitively.

    Args:
        event: Parsed OpenLineage run event.

    Returns:
        ``True`` if the event describes an operation, ``False`` if it
        describes a run.
    """
    job_type_facet = event.job.facets.jobType
    if not job_type_facet:
        return False

    # Every non-Spark integration (e.g. Airflow DAGs and tasks) produces runs.
    if job_type_facet.integration.upper() != "SPARK":
        return False

    # jobType is optional; a missing value is still "not APPLICATION",
    # so such Spark events keep being treated as operations.
    return (job_type_facet.jobType or "").upper() != "APPLICATION"
42+
3243
def extract_run(self, event: OpenLineageRunEvent) -> None:
3344
run = extract_run(event)
3445
self.result.add_run(run)

data_rentgen/consumer/extractors/job.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ def extract_job_location(job: OpenLineageJob | OpenLineageParentJob) -> Location
3636

3737
def extract_job_type(job: OpenLineageJob) -> JobTypeDTO | None:
3838
if job.facets.jobType:
39-
job_type = job.facets.jobType.jobType
4039
integration_type = job.facets.jobType.integration
41-
return JobTypeDTO(f"{integration_type}_{job_type}")
40+
job_type = job.facets.jobType.jobType
41+
type_ = f"{integration_type}_{job_type}" if job_type else integration_type
42+
return JobTypeDTO(type=type_.upper())
4243

4344
return None

data_rentgen/consumer/extractors/operation.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
2020
prefix = len(run.job.name) + 1
2121
operation_name = operation_name[prefix:]
2222

23+
type_: OperationTypeDTO = OperationTypeDTO.BATCH
24+
if event.job.facets.jobType and event.job.facets.jobType.processingType:
25+
type_ = OperationTypeDTO(event.job.facets.jobType.processingType)
26+
2327
operation = OperationDTO(
2428
id=event.run.runId, # type: ignore [arg-type]
2529
run=run,
2630
name=operation_name,
27-
type=OperationTypeDTO(event.job.facets.jobType.processingType) if event.job.facets.jobType else None,
31+
type=type_,
2832
)
2933
enrich_operation_status(operation, event)
3034
enrich_operation_description(operation, event)

data_rentgen/consumer/openlineage/job_facets/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,15 @@
77
OpenLineageDocumentationJobFacet,
88
)
99
from data_rentgen.consumer.openlineage.job_facets.job_type import (
10-
OpenLineageJobIntegrationType,
1110
OpenLineageJobProcessingType,
12-
OpenLineageJobType,
1311
OpenLineageJobTypeJobFacet,
1412
)
1513

1614
__all__ = [
1715
"OpenLineageDocumentationJobFacet",
1816
"OpenLineageJobFacet",
1917
"OpenLineageJobFacets",
20-
"OpenLineageJobIntegrationType",
2118
"OpenLineageJobProcessingType",
22-
"OpenLineageJobType",
2319
"OpenLineageJobTypeJobFacet",
2420
]
2521

data_rentgen/consumer/openlineage/job_facets/job_type.py

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,64 +3,24 @@
33

44
from enum import Enum
55

6-
from pydantic import field_validator
7-
86
from data_rentgen.consumer.openlineage.job_facets.base import OpenLineageJobFacet
97

108

11-
class OpenLineageJobIntegrationType(str, Enum):
12-
"""Integration where job is running.
13-
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
14-
"""
15-
16-
SPARK = "SPARK"
17-
AIRFLOW = "AIRFLOW"
18-
19-
def __str__(self) -> str:
20-
return self.value
21-
22-
23-
class OpenLineageJobType(str, Enum):
24-
"""Job type.
25-
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
26-
"""
27-
28-
APPLICATION = "APPLICATION"
29-
JOB = "JOB"
30-
DAG = "DAG"
31-
TASK = "TASK"
32-
33-
def __str__(self) -> str:
34-
return self.value
35-
36-
@classmethod
37-
def _missing_(cls, value):
38-
if value in {"SQL_JOB", "RDD_JOB"}:
39-
return cls.JOB
40-
return None
41-
42-
439
class OpenLineageJobProcessingType(str, Enum):
4410
"""Job processing type.
4511
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
4612
"""
4713

4814
BATCH = "BATCH"
4915
STREAMING = "STREAMING"
16+
NONE = "NONE"
5017

5118

5219
class OpenLineageJobTypeJobFacet(OpenLineageJobFacet):
5320
"""Job facet describing job type.
5421
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
5522
"""
5623

57-
integration: OpenLineageJobIntegrationType
58-
jobType: OpenLineageJobType
59-
processingType: OpenLineageJobProcessingType | None = None
60-
61-
@field_validator("processingType", mode="before")
62-
@classmethod
63-
def _validate_processing_type(cls, processing_type: str):
64-
if processing_type == "NONE":
65-
return None
66-
return processing_type
24+
processingType: OpenLineageJobProcessingType
25+
integration: str
26+
jobType: str | None = None

data_rentgen/consumer/subscribers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def extract_events(
6565
event = OpenLineageRunEventAdapter.validate_json(message.value)
6666
extractor.add_events([event])
6767
except (ValueError, TypeError):
68-
logger.error( # noqa: TRY400
68+
logger.exception(
6969
"Failed to parse message: ConsumerRecord(topic=%r, partition=%d, offset=%d)",
7070
message.topic,
7171
message.partition,
@@ -109,6 +109,12 @@ async def save_to_db(
109109
dataset_symlink = await unit_of_work.dataset_symlink.create_or_update(dataset_symlink_dto)
110110
dataset_symlink_dto.id = dataset_symlink.id
111111

112+
logger.debug("Creating job types")
113+
for job_type_dto in data.job_types():
114+
async with unit_of_work:
115+
job_type = await unit_of_work.job_type.get_or_create(job_type_dto)
116+
job_type_dto.id = job_type.id
117+
112118
logger.debug("Creating jobs")
113119
for job_dto in data.jobs():
114120
async with unit_of_work:
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Add job_type
4+
5+
Revision ID: 2d2fe3f2f348
6+
Revises: 976168ee4f16
7+
Create Date: 2025-04-25 15:09:17.556969
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "2d2fe3f2f348"
16+
down_revision = "976168ee4f16"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
# Introduce the job_type lookup table and turn the string column job.type
# into job.type_id, a BIGINT foreign key pointing at job_type.id.
22+
# Lookup table: id primary key plus a unique, non-null type name.
op.create_table(
23+
"job_type",
24+
sa.Column("id", sa.BigInteger(), nullable=False),
25+
sa.Column("type", sa.String(), nullable=False),
26+
sa.PrimaryKeyConstraint("id", name=op.f("pk__job_type")),
27+
sa.UniqueConstraint("type", name=op.f("uq__job_type__type")),
28+
)
29+
op.create_index(op.f("ix__job_type__type"), "job_type", ["type"], unique=False)
30+
# Seed rows with fixed ids; the CASE expression used for job.type below
# maps existing string values onto exactly these ids.
31+
op.execute(
32+
sa.text(
33+
"""
34+
INSERT INTO
35+
job_type (id, type)
36+
VALUES
37+
(0, 'UNKNOWN'),
38+
(1, 'SPARK_APPLICATION'),
39+
(2, 'AIRFLOW_DAG'),
40+
(3, 'AIRFLOW_TASK');
41+
""",
42+
),
43+
)
44+
# Ids 0-3 were inserted explicitly, so restart the id sequence at 4.
op.execute(sa.text("ALTER SEQUENCE job_type_id_seq RESTART WITH 4;"))
45+
# Lock job exclusively before its type column is rewritten.
46+
op.execute(sa.text("LOCK TABLE job IN ACCESS EXCLUSIVE MODE;"))
47+
op.drop_index("ix__job__type", table_name="job")
48+
# Rename type -> type_id and convert it to BIGINT. Any value other than
# the three seeded names falls into ELSE and becomes 0 (UNKNOWN).
op.alter_column(
49+
"job",
50+
"type",
51+
new_column_name="type_id",
52+
existing_type=sa.String(length=32),
53+
type_=sa.BigInteger(),
54+
nullable=False,
55+
postgresql_using="""
56+
CASE
57+
WHEN type = 'SPARK_APPLICATION'
58+
THEN 1
59+
WHEN type = 'AIRFLOW_DAG'
60+
THEN 2
61+
WHEN type = 'AIRFLOW_TASK'
62+
THEN 3
63+
ELSE 0
64+
END
65+
""",
66+
)
67+
op.create_index(op.f("ix__job__type_id"), "job", ["type_id"], unique=False)
68+
# Foreign key job.type_id -> job_type.id, declared with ON DELETE CASCADE.
op.create_foreign_key(
69+
op.f("fk__job__type_id__job_type"),
70+
"job",
71+
"job_type",
72+
["type_id"],
73+
["id"],
74+
ondelete="CASCADE",
75+
)
76+
77+
78+
def downgrade() -> None:
    """Revert ``job.type_id`` (foreign key to ``job_type``) to a string ``job.type`` column.

    Steps: drop the foreign key and its index, rename/convert the column
    back to ``VARCHAR(32)``, replace the stored ids with the matching
    ``job_type.type`` names, recreate the old index, then drop ``job_type``.
    """
    # Lock job exclusively before its type column is rewritten.
    op.execute(sa.text("LOCK TABLE job IN ACCESS EXCLUSIVE MODE;"))
    op.drop_constraint(op.f("fk__job__type_id__job_type"), "job", type_="foreignkey")
    op.drop_index(op.f("ix__job__type_id"), table_name="job")
    # Rename type_id -> type and convert it to VARCHAR(32). The column now
    # holds the former ids as text; the UPDATE below resolves them to names.
    op.alter_column(
        "job",
        "type_id",
        new_column_name="type",
        existing_type=sa.BigInteger(),
        type_=sa.String(length=32),
        nullable=False,
    )
    # job.type is a string at this point while job_type.id is BIGINT, so the
    # join needs an explicit cast: PostgreSQL has no "bigint = varchar"
    # operator and would reject the uncast comparison.
    # NOTE(review): job_type.type is an unbounded string; names longer than
    # 32 characters will not fit into the restored column - confirm whether
    # that needs handling before a downgrade is run on real data.
    op.execute(
        sa.text(
            """
            UPDATE job
            SET type = (
                SELECT job_type.type
                FROM job_type
                WHERE job_type.id = CAST(job.type AS BIGINT)
            );
            """,
        ),
    )
    op.create_index("ix__job__type", "job", ["type"], unique=False)

    op.drop_index(op.f("ix__job_type__type"), table_name="job_type")
    op.drop_table("job_type")

data_rentgen/db/models/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
)
1414
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
1515
from data_rentgen.db.models.input import Input
16-
from data_rentgen.db.models.job import Job, JobType
16+
from data_rentgen.db.models.job import Job
17+
from data_rentgen.db.models.job_type import JobType
1718
from data_rentgen.db.models.location import Location
1819
from data_rentgen.db.models.operation import Operation, OperationStatus, OperationType
1920
from data_rentgen.db.models.output import Output, OutputType

0 commit comments

Comments
 (0)