Skip to content

Commit cc39ae3

Browse files
committed
[DOP-25649] Create operations for Airflow tasks
1 parent 91d401e commit cc39ae3

19 files changed

Lines changed: 855 additions & 352 deletions

File tree

data_rentgen/consumer/extractors/batch_extractor.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ def is_operation(self, event: OpenLineageRunEvent) -> bool:
3838
return job_type_facet.jobType != "APPLICATION"
3939

4040
if job_type_facet.integration == "AIRFLOW":
41-
return False
41+
return job_type_facet.jobType == "TASK"
4242

4343
return has_lineage
4444

data_rentgen/consumer/extractors/operation.py

Lines changed: 51 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -6,40 +6,64 @@
66
OpenLineageRunEvent,
77
OpenLineageRunEventType,
88
)
9-
from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO
9+
from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO, RunDTO
1010

1111

1212
def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
13-
if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
14-
run = extract_parent_run(event.run.facets.parent)
15-
else:
16-
run = extract_run(event)
17-
18-
# in some cases, operation name may contain raw SELECT query with newlines
19-
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()
20-
# remove parent job name from operation name
21-
if operation_name.startswith(run.job.name) and operation_name != run.job.name:
22-
prefix = len(run.job.name) + 1
23-
operation_name = operation_name[prefix:]
24-
25-
type_: OperationTypeDTO = OperationTypeDTO.BATCH
26-
if event.job.facets.jobType and event.job.facets.jobType.processingType:
27-
type_ = OperationTypeDTO(event.job.facets.jobType.processingType)
13+
run = extract_operation_run(event)
2814

2915
operation = OperationDTO(
3016
id=event.run.runId, # type: ignore [arg-type]
3117
run=run,
32-
name=operation_name,
33-
type=type_,
18+
name=extract_operation_name(run, event),
19+
type=extract_operation_type(event),
3420
status=OperationStatusDTO(run.status),
3521
started_at=run.started_at,
3622
ended_at=run.ended_at,
3723
)
3824
enrich_operation_status(operation, event)
39-
enrich_operation_description(operation, event)
25+
enrich_operation_details(operation, event)
4026
return operation
4127

4228

29+
def extract_operation_run(event: OpenLineageRunEvent) -> RunDTO:
    """Pick the run an operation should be attached to.

    For events from the Spark integration that carry a ``parent`` run facet,
    the run is built from that facet via ``extract_parent_run``. In every
    other case the run is built from the event itself via ``extract_run``.
    """
    parent_facet = event.run.facets.parent
    if not parent_facet:
        return extract_run(event)

    job_type = event.job.facets.jobType
    if job_type and job_type.integration == "SPARK":
        return extract_parent_run(parent_facet)

    return extract_run(event)
34+
35+
36+
def extract_operation_type(event: OpenLineageRunEvent) -> OperationTypeDTO:
    """Return the operation type declared by the event's ``jobType`` facet.

    Falls back to ``OperationTypeDTO.BATCH`` when the facet is missing or
    its ``processingType`` is empty.
    """
    job_type = event.job.facets.jobType
    if not job_type or not job_type.processingType:
        return OperationTypeDTO.BATCH

    return OperationTypeDTO(job_type.processingType)
41+
42+
43+
def extract_operation_name(run: RunDTO, event: OpenLineageRunEvent) -> str | None:
    """Derive the operation name from an OpenLineage run event.

    Args:
        run: Run the operation belongs to. ``run.job.name`` is the prefix
            stripped from Spark names and the fallback name for other
            integrations.
        event: OpenLineage run event being processed.

    Returns:
        * Spark: ``event.job.name`` collapsed to a single line, with a leading
          ``"<run.job.name>."`` removed when present.
        * Airflow: ``task_id`` from the ``airflow`` run facet, or ``None`` when
          that facet is absent.
        * Anything else (including a missing ``jobType`` facet): ``run.job.name``.
    """
    job_type = event.job.facets.jobType
    integration = job_type.integration if job_type else None

    if integration == "SPARK":
        # in some cases, operation name may contain raw SELECT query with newlines. use spaces instead
        operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()

        # Spark execution name is "applicationName.operationName". Strip the prefix only
        # when it is followed by the "." separator: a bare startswith() check would also
        # match unrelated names sharing the same leading characters (job "etl" vs
        # "etl_daily.insert") and cut them in the middle.
        return operation_name.removeprefix(f"{run.job.name}.")

    if integration == "AIRFLOW":
        airflow_operator_details = event.run.facets.airflow
        if airflow_operator_details:
            return airflow_operator_details.task.task_id

        # for FINISHED/KILLED event we don't receive task facet.
        # keep existing name in DB instead of resetting it to "dag.task"
        return None

    return run.job.name
65+
66+
4367
def enrich_operation_status(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO:
4468
match event.eventType:
4569
case OpenLineageRunEventType.START:
@@ -62,10 +86,17 @@ def enrich_operation_status(operation: OperationDTO, event: OpenLineageRunEvent)
6286
return operation
6387

6488

65-
def enrich_operation_description(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO:
89+
def enrich_operation_details(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO:
    """Copy position, group and description from run facets onto ``operation``.

    The ``spark_jobDetails`` facet is applied first, then the ``airflow``
    facet. Either may be absent, in which case the corresponding fields are
    left as they were. ``operation`` is modified in place and also returned.
    """
    run_facets = event.run.facets

    if spark_details := run_facets.spark_jobDetails:
        operation.position = spark_details.jobId
        operation.group = spark_details.jobGroup
        operation.description = spark_details.jobDescription

    if airflow_details := run_facets.airflow:
        task_info = airflow_details.task
        operation.description = task_info.operator_class
        operation.position = airflow_details.taskInstance.map_index
        # group is only overwritten when the task actually sits inside a task group
        task_group = task_info.task_group
        if task_group:
            operation.group = task_group.group_id

    return operation

data_rentgen/consumer/extractors/run.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -218,6 +218,7 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
218218
(
219219
airflow_application_details.dag.owner is not None,
220220
airflow_application_details.dag.owner != "airflow",
221+
airflow_application_details.dag.owner != "***",
221222
),
222223
):
223224
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]
@@ -227,6 +228,7 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
227228
(
228229
airflow_application_dag_details.dag.owner is not None,
229230
airflow_application_dag_details.dag.owner != "airflow",
231+
airflow_application_dag_details.dag.owner != "***",
230232
),
231233
):
232234
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]

data_rentgen/consumer/openlineage/run_facets/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
OpenLineageAirflowDagRunFacet,
88
OpenLineageAirflowDagRunInfo,
99
OpenLineageAirflowDagRunType,
10+
OpenLineageAirflowTaskGroupInfo,
1011
OpenLineageAirflowTaskInfo,
1112
OpenLineageAirflowTaskInstanceInfo,
1213
OpenLineageAirflowTaskRunFacet,
@@ -34,6 +35,7 @@
3435
"OpenLineageAirflowDagRunFacet",
3536
"OpenLineageAirflowDagRunInfo",
3637
"OpenLineageAirflowDagRunType",
38+
"OpenLineageAirflowTaskGroupInfo",
3739
"OpenLineageAirflowTaskInfo",
3840
"OpenLineageAirflowTaskInstanceInfo",
3941
"OpenLineageAirflowTaskRunFacet",

data_rentgen/consumer/openlineage/run_facets/airflow.py

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@
1010

1111
class OpenLineageAirflowDagInfo(OpenLineageBase):
1212
"""Airflow dag info.
13-
See [Dag](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
13+
See [Dag](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
1414
"""
1515

1616
dag_id: str
@@ -30,7 +30,7 @@ class OpenLineageAirflowDagRunType(Enum):
3030

3131
class OpenLineageAirflowDagRunInfo(OpenLineageBase):
3232
"""Airflow dagRun info.
33-
See [DagRun](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
33+
See [DagRun](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
3434
"""
3535

3636
run_id: str
@@ -39,17 +39,27 @@ class OpenLineageAirflowDagRunInfo(OpenLineageBase):
3939
data_interval_end: datetime
4040

4141

42+
class OpenLineageAirflowTaskGroupInfo(OpenLineageBase):
    """Airflow task group info.

    Corresponds to the ``task_group`` object described in
    [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
    """

    # identifier of the task group
    group_id: str
48+
49+
4250
class OpenLineageAirflowTaskInfo(OpenLineageBase):
4351
"""Airflow task info.
44-
See [Task](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
52+
See [Task](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
4553
"""
4654

4755
task_id: str
56+
operator_class: str | None = None
57+
task_group: OpenLineageAirflowTaskGroupInfo | None = None
4858

4959

5060
class OpenLineageAirflowTaskInstanceInfo(OpenLineageBase):
5161
"""Airflow taskInstance info.
52-
See [TaskInstance](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
62+
See [TaskInstance](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
5363
"""
5464

5565
try_number: int
@@ -68,7 +78,7 @@ class OpenLineageAirflowDagRunFacet(OpenLineageRunFacet):
6878

6979
class OpenLineageAirflowTaskRunFacet(OpenLineageRunFacet):
7080
"""Run facet describing Airflow Task run.
71-
See [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
81+
See [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
7282
"""
7383

7484
dag: OpenLineageAirflowDagInfo

data_rentgen/consumer/subscribers.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ async def runs_events_subscriber(
3434
):
3535
logger.info("Extracting events")
3636
parsed, malformed = await extract_events(batch, logger)
37+
logger.info("Parsed %r", parsed)
3738

3839
logger.info("Saving to database")
3940
await save_to_db(parsed, session, logger)

data_rentgen/db/models/operation.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ class Operation(Base):
7878
name: Mapped[str] = mapped_column(
7979
String,
8080
nullable=False,
81+
default="unknown",
8182
doc="Name of the operation, e.g. job name",
8283
)
8384

data_rentgen/db/repositories/operation.py

Lines changed: 35 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from datetime import datetime, timezone
66
from uuid import UUID
77

8-
from sqlalchemy import Row, UnaryExpression, any_, func, select
8+
from sqlalchemy import Row, UnaryExpression, any_, bindparam, func, select, update
99
from sqlalchemy.dialects.postgresql import insert
1010

1111
from data_rentgen.db.models import Operation, OperationStatus, OperationType
@@ -19,39 +19,44 @@ async def create_or_update_bulk(self, operations: list[OperationDTO]) -> None:
1919
if not operations:
2020
return
2121

22-
insert_statement = insert(Operation)
23-
statement = insert_statement.on_conflict_do_update(
24-
index_elements=[Operation.created_at, Operation.id],
25-
set_={
26-
"name": func.coalesce(insert_statement.excluded.name, Operation.name),
27-
"type": func.coalesce(insert_statement.excluded.type, Operation.type),
28-
"status": func.greatest(insert_statement.excluded.status, Operation.status),
29-
"started_at": func.coalesce(insert_statement.excluded.started_at, Operation.started_at),
30-
"ended_at": func.coalesce(insert_statement.excluded.ended_at, Operation.ended_at),
31-
"description": func.coalesce(insert_statement.excluded.description, Operation.description),
32-
"group": func.coalesce(insert_statement.excluded.group, Operation.group),
33-
"position": func.coalesce(insert_statement.excluded.position, Operation.position),
34-
},
22+
data = [
23+
{
24+
"id": operation.id,
25+
"created_at": extract_timestamp_from_uuid(operation.id),
26+
"run_id": operation.run.id,
27+
"name": operation.name,
28+
"type": OperationType(operation.type) if operation.type else None,
29+
"status": OperationStatus(operation.status),
30+
"started_at": operation.started_at,
31+
"ended_at": operation.ended_at,
32+
"description": operation.description,
33+
"group": operation.group,
34+
"position": operation.position,
35+
}
36+
for operation in operations
37+
]
38+
39+
# this replaces all null values with defaults
40+
await self._session.execute(
41+
insert(Operation).on_conflict_do_nothing(),
42+
data,
3543
)
3644

45+
# if value is still none, keep existing one
3746
await self._session.execute(
38-
statement,
39-
[
47+
update(Operation).values(
4048
{
41-
"id": operation.id,
42-
"created_at": extract_timestamp_from_uuid(operation.id),
43-
"run_id": operation.run.id,
44-
"name": operation.name,
45-
"type": OperationType(operation.type) if operation.type else None,
46-
"status": OperationStatus(operation.status),
47-
"started_at": operation.started_at,
48-
"ended_at": operation.ended_at,
49-
"description": operation.description,
50-
"group": operation.group,
51-
"position": operation.position,
52-
}
53-
for operation in operations
54-
],
49+
"name": func.coalesce(bindparam("name"), Operation.name),
50+
"type": func.coalesce(bindparam("type"), Operation.type),
51+
"status": func.greatest(bindparam("status"), Operation.status),
52+
"started_at": func.coalesce(bindparam("started_at"), Operation.started_at),
53+
"ended_at": func.coalesce(bindparam("ended_at"), Operation.ended_at),
54+
"description": func.coalesce(bindparam("description"), Operation.description),
55+
"group": func.coalesce(bindparam("group"), Operation.group),
56+
"position": func.coalesce(bindparam("position"), Operation.position),
57+
},
58+
),
59+
data,
5560
)
5661

5762
async def paginate(

data_rentgen/dto/operation.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,8 @@ class OperationStatusDTO(IntEnum):
3737
class OperationDTO:
3838
id: UUID
3939
run: RunDTO
40-
name: str
41-
type: OperationTypeDTO = OperationTypeDTO.BATCH
40+
name: str | None = None
41+
type: OperationTypeDTO | None = None
4242
position: int | None = None
4343
group: str | None = None
4444
description: str | None = None
Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Parse lineage from Airflow tasks

0 commit comments

Comments
 (0)