Skip to content

Commit 67a37d5

Browse files
committed
[DOP-25498] Make operation extraction more generic
1 parent 7a94ee2 commit 67a37d5

37 files changed

Lines changed: 1678 additions & 720 deletions

data_rentgen/consumer/extractors/batch_extractor.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from data_rentgen.consumer.extractors.operation import extract_operation
99
from data_rentgen.consumer.extractors.output import extract_output
1010
from data_rentgen.consumer.extractors.run import extract_run
11-
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
1211
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
1312
from data_rentgen.dto import (
1413
DatasetDTO,
@@ -22,13 +21,27 @@ def __init__(self) -> None:
2221

2322
def add_events(self, events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
2423
for event in events:
25-
if event.job.facets.jobType and event.job.facets.jobType.jobType == OpenLineageJobType.JOB:
24+
if self.is_operation(event):
2625
self.extract_operation(event)
2726
else:
2827
self.extract_run(event)
29-
3028
return self.result
3129

30+
def is_operation(self, event: OpenLineageRunEvent) -> bool:
31+
has_lineage = bool(event.inputs or event.outputs)
32+
33+
job_type_facet = event.job.facets.jobType
34+
if not job_type_facet:
35+
return has_lineage
36+
37+
if job_type_facet.integration == "SPARK":
38+
return job_type_facet.jobType != "APPLICATION"
39+
40+
if job_type_facet.integration == "AIRFLOW":
41+
return job_type_facet.jobType == "TASK"
42+
43+
return has_lineage
44+
3245
def extract_run(self, event: OpenLineageRunEvent) -> None:
3346
run = extract_run(event)
3447
self.result.add_run(run)

data_rentgen/consumer/extractors/job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from data_rentgen.consumer.openlineage.job import OpenLineageJob
77
from data_rentgen.consumer.openlineage.run_facets import OpenLineageParentJob
8-
from data_rentgen.dto import JobDTO, JobTypeDTO, LocationDTO
8+
from data_rentgen.dto import JobDTO, LocationDTO
99

1010

1111
def extract_parent_job(job: OpenLineageParentJob) -> JobDTO:
@@ -34,10 +34,10 @@ def extract_job_location(job: OpenLineageJob | OpenLineageParentJob) -> Location
3434
)
3535

3636

37-
def extract_job_type(job: OpenLineageJob) -> JobTypeDTO | None:
37+
def extract_job_type(job: OpenLineageJob) -> str | None:
3838
if job.facets.jobType:
3939
job_type = job.facets.jobType.jobType
4040
integration_type = job.facets.jobType.integration
41-
return JobTypeDTO(f"{integration_type}_{job_type}")
41+
return f"{integration_type}_{job_type}".upper()
4242

4343
return None

data_rentgen/consumer/extractors/operation.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from data_rentgen.consumer.extractors.run import extract_parent_run
4+
from data_rentgen.consumer.extractors.run import extract_parent_run, extract_run
55
from data_rentgen.consumer.openlineage.run_event import (
66
OpenLineageRunEvent,
77
OpenLineageRunEventType,
@@ -10,21 +10,27 @@
1010

1111

1212
def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
13-
# operation always has parent
14-
run = extract_parent_run(event.run.facets.parent) # type: ignore[arg-type]
13+
if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
14+
run = extract_parent_run(event.run.facets.parent)
15+
else:
16+
run = extract_run(event)
1517

1618
# in some cases, operation name may contain raw SELECT query with newlines
1719
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()
1820
# remove parent job name from operation name
19-
if operation_name.startswith(run.job.name):
21+
if operation_name != run.job.name and operation_name.startswith(run.job.name):
2022
prefix = len(run.job.name) + 1
2123
operation_name = operation_name[prefix:]
2224

25+
type_: OperationTypeDTO = OperationTypeDTO.BATCH
26+
if event.job.facets.jobType:
27+
type_ = OperationTypeDTO(event.job.facets.jobType.processingType)
28+
2329
operation = OperationDTO(
2430
id=event.run.runId, # type: ignore [arg-type]
2531
run=run,
2632
name=operation_name,
27-
type=OperationTypeDTO(event.job.facets.jobType.processingType) if event.job.facets.jobType else None,
33+
type=type_,
2834
)
2935
enrich_operation_status(operation, event)
3036
enrich_operation_description(operation, event)

data_rentgen/consumer/openlineage/job_facets/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,15 @@
77
OpenLineageDocumentationJobFacet,
88
)
99
from data_rentgen.consumer.openlineage.job_facets.job_type import (
10-
OpenLineageJobIntegrationType,
1110
OpenLineageJobProcessingType,
12-
OpenLineageJobType,
1311
OpenLineageJobTypeJobFacet,
1412
)
1513

1614
__all__ = [
1715
"OpenLineageDocumentationJobFacet",
1816
"OpenLineageJobFacet",
1917
"OpenLineageJobFacets",
20-
"OpenLineageJobIntegrationType",
2118
"OpenLineageJobProcessingType",
22-
"OpenLineageJobType",
2319
"OpenLineageJobTypeJobFacet",
2420
]
2521

data_rentgen/consumer/openlineage/job_facets/job_type.py

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,64 +3,24 @@
33

44
from enum import Enum
55

6-
from pydantic import field_validator
7-
86
from data_rentgen.consumer.openlineage.job_facets.base import OpenLineageJobFacet
97

108

11-
class OpenLineageJobIntegrationType(str, Enum):
12-
"""Integration where job is running.
13-
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
14-
"""
15-
16-
SPARK = "SPARK"
17-
AIRFLOW = "AIRFLOW"
18-
19-
def __str__(self) -> str:
20-
return self.value
21-
22-
23-
class OpenLineageJobType(str, Enum):
24-
"""Job type.
25-
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
26-
"""
27-
28-
APPLICATION = "APPLICATION"
29-
JOB = "JOB"
30-
DAG = "DAG"
31-
TASK = "TASK"
32-
33-
def __str__(self) -> str:
34-
return self.value
35-
36-
@classmethod
37-
def _missing_(cls, value):
38-
if value in {"SQL_JOB", "RDD_JOB"}:
39-
return cls.JOB
40-
return None
41-
42-
439
class OpenLineageJobProcessingType(str, Enum):
4410
"""Job processing type.
4511
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
4612
"""
4713

4814
BATCH = "BATCH"
4915
STREAMING = "STREAMING"
16+
NONE = "NONE"
5017

5118

5219
class OpenLineageJobTypeJobFacet(OpenLineageJobFacet):
5320
"""Job facet describing job type.
5421
See [JobTypeJobFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobTypeJobFacet.json).
5522
"""
5623

57-
integration: OpenLineageJobIntegrationType
58-
jobType: OpenLineageJobType
59-
processingType: OpenLineageJobProcessingType | None = None
60-
61-
@field_validator("processingType", mode="before")
62-
@classmethod
63-
def _validate_processing_type(cls, processing_type: str):
64-
if processing_type == "NONE":
65-
return None
66-
return processing_type
24+
processingType: OpenLineageJobProcessingType
25+
integration: str
26+
jobType: str | None = None

data_rentgen/consumer/subscribers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def extract_events(
6565
event = OpenLineageRunEventAdapter.validate_json(message.value)
6666
extractor.add_events([event])
6767
except (ValueError, TypeError):
68-
logger.error( # noqa: TRY400
68+
logger.exception(
6969
"Failed to parse message: ConsumerRecord(topic=%r, partition=%d, offset=%d)",
7070
message.topic,
7171
message.partition,
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Add job_type
4+
5+
Revision ID: 2d2fe3f2f348
6+
Revises: 976168ee4f16
7+
Create Date: 2025-04-25 15:09:17.556969
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "2d2fe3f2f348"
16+
down_revision = "976168ee4f16"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
op.create_table(
23+
"job_type",
24+
sa.Column("id", sa.BigInteger(), nullable=False),
25+
sa.Column("type", sa.String(), nullable=False),
26+
sa.PrimaryKeyConstraint("id", name=op.f("pk__job_type")),
27+
sa.UniqueConstraint("type", name=op.f("uq__job_type__type")),
28+
)
29+
op.create_index(op.f("ix__job_type__type"), "job_type", ["type"], unique=False)
30+
31+
op.execute(
32+
sa.text(
33+
"""
34+
INSERT INTO
35+
job_type (id, type)
36+
VALUES
37+
(0, 'UNKNOWN'),
38+
(1, 'SPARK_APPLICATION'),
39+
(2, 'AIRFLOW_DAG'),
40+
(3, 'AIRFLOW_TASK');
41+
""",
42+
),
43+
)
44+
op.execute(sa.text("ALTER SEQUENCE job_type_id_seq RESTART WITH 4;"))
45+
46+
op.execute(sa.text("LOCK TABLE job IN ACCESS EXCLUSIVE MODE;"))
47+
op.drop_index("ix__job__type", table_name="job")
48+
op.alter_column(
49+
"job",
50+
"type",
51+
new_column_name="type_id",
52+
existing_type=sa.String(length=32),
53+
type_=sa.BigInteger(),
54+
nullable=False,
55+
postgresql_using="""
56+
CASE
57+
WHEN type = 'SPARK_APPLICATION'
58+
THEN 1
59+
WHEN type = 'AIRFLOW_DAG'
60+
THEN 2
61+
WHEN type = 'AIRFLOW_TASK'
62+
THEN 3
63+
ELSE 0
64+
END
65+
""",
66+
)
67+
op.create_index(op.f("ix__job__type_id"), "job", ["type_id"], unique=False)
68+
69+
70+
def downgrade() -> None:
71+
op.execute(sa.text("LOCK TABLE job IN ACCESS EXCLUSIVE MODE;"))
72+
op.drop_index(op.f("ix__job__type_id"), table_name="job")
73+
op.alter_column(
74+
"job",
75+
"type_id",
76+
new_column_name="type",
77+
existing_type=sa.BigInteger(),
78+
type_=sa.String(length=32),
79+
nullable=False,
80+
)
81+
op.execute(
82+
sa.text(
83+
"""
84+
UPDATE job
85+
SET type = (SELECT job_type.type FROM job_type WHERE job_type.id = job.type);
86+
""",
87+
),
88+
)
89+
op.create_index("ix__job__type", "job", ["type"], unique=False)
90+
91+
op.drop_index(op.f("ix__job_type__type"), table_name="job_type")
92+
op.drop_table("job_type")

data_rentgen/db/models/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
)
1414
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
1515
from data_rentgen.db.models.input import Input
16-
from data_rentgen.db.models.job import Job, JobType
16+
from data_rentgen.db.models.job import Job
17+
from data_rentgen.db.models.job_type import JobType
1718
from data_rentgen.db.models.location import Location
1819
from data_rentgen.db.models.operation import Operation, OperationStatus, OperationType
1920
from data_rentgen.db.models.output import Output, OutputType

data_rentgen/db/models/job.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,15 @@
33

44
from __future__ import annotations
55

6-
from enum import Enum
7-
8-
from sqlalchemy import BigInteger, Computed, ForeignKey, Index, String, UniqueConstraint
6+
from sqlalchemy import BigInteger, Computed, ForeignKey, Index, String, UniqueConstraint, select
97
from sqlalchemy.dialects.postgresql import TSVECTOR
10-
from sqlalchemy.orm import Mapped, mapped_column, relationship
11-
from sqlalchemy_utils import ChoiceType
8+
from sqlalchemy.orm import Mapped, column_property, mapped_column, relationship
129

1310
from data_rentgen.db.models.base import Base
11+
from data_rentgen.db.models.job_type import JobType
1412
from data_rentgen.db.models.location import Location
1513

1614

17-
class JobType(str, Enum):
18-
AIRFLOW_DAG = "AIRFLOW_DAG"
19-
AIRFLOW_TASK = "AIRFLOW_TASK"
20-
SPARK_APPLICATION = "SPARK_APPLICATION"
21-
UNKNOWN = "UNKNOWN"
22-
23-
def __str__(self) -> str:
24-
return self.value
25-
26-
2715
class Job(Base):
2816
__tablename__ = "job"
2917
__table_args__ = (
@@ -49,12 +37,12 @@ class Job(Base):
4937
doc="Job name, e.g. Airflow DAG name + task name, or Spark applicationName",
5038
)
5139

52-
type: Mapped[JobType] = mapped_column(
53-
ChoiceType(JobType, impl=String(32)),
40+
type_id: Mapped[int] = mapped_column(
41+
BigInteger,
42+
ForeignKey("job_type.id", ondelete="CASCADE"),
5443
index=True,
5544
nullable=False,
56-
default=JobType.UNKNOWN,
57-
doc="Job type, e.g. AIRFLOW_DAG, AIRFLOW_TASK, SPARK_APPLICATION",
45+
doc="Job type",
5846
)
5947

6048
search_vector: Mapped[str] = mapped_column(
@@ -76,3 +64,6 @@ class Job(Base):
7664
deferred=True,
7765
doc="Full-text search vector",
7866
)
67+
68+
69+
Job.type = column_property(select(JobType.type).where(Job.type_id == JobType.id).scalar_subquery())

data_rentgen/db/models/job_type.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from __future__ import annotations
5+
6+
from sqlalchemy import BigInteger, String, UniqueConstraint
7+
from sqlalchemy.orm import Mapped, mapped_column
8+
9+
from data_rentgen.db.models.base import Base
10+
11+
12+
class JobType(Base):
13+
__tablename__ = "job_type"
14+
__table_args__ = (UniqueConstraint("type"),)
15+
16+
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
17+
18+
type: Mapped[str] = mapped_column(
19+
String,
20+
index=True,
21+
nullable=False,
22+
doc="Job type, e.g. SPARK_APPLICATION, AIRFLOW_DAG",
23+
)

0 commit comments

Comments
 (0)