Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data_rentgen/consumer/extractors/batch_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def is_operation(self, event: OpenLineageRunEvent) -> bool:
return job_type_facet.jobType != "APPLICATION"

if job_type_facet.integration == "AIRFLOW":
return False
return job_type_facet.jobType == "TASK"

return has_lineage

Expand Down
71 changes: 51 additions & 20 deletions data_rentgen/consumer/extractors/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,64 @@
OpenLineageRunEvent,
OpenLineageRunEventType,
)
from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO
from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO, RunDTO


def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
run = extract_parent_run(event.run.facets.parent)
else:
run = extract_run(event)

# in some cases, operation name may contain raw SELECT query with newlines
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()
# remove parent job name from operation name
if operation_name.startswith(run.job.name) and operation_name != run.job.name:
prefix = len(run.job.name) + 1
operation_name = operation_name[prefix:]

type_: OperationTypeDTO = OperationTypeDTO.BATCH
if event.job.facets.jobType and event.job.facets.jobType.processingType:
type_ = OperationTypeDTO(event.job.facets.jobType.processingType)
run = extract_operation_run(event)

operation = OperationDTO(
id=event.run.runId, # type: ignore [arg-type]
run=run,
name=operation_name,
type=type_,
name=extract_operation_name(run, event),
type=extract_operation_type(event),
status=OperationStatusDTO(run.status),
started_at=run.started_at,
ended_at=run.ended_at,
)
enrich_operation_status(operation, event)
enrich_operation_description(operation, event)
enrich_operation_details(operation, event)
return operation


def extract_operation_run(event: OpenLineageRunEvent) -> RunDTO:
if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
return extract_parent_run(event.run.facets.parent)

return extract_run(event)


def extract_operation_type(event: OpenLineageRunEvent) -> OperationTypeDTO:
if event.job.facets.jobType and event.job.facets.jobType.processingType:
return OperationTypeDTO(event.job.facets.jobType.processingType)

return OperationTypeDTO.BATCH


def extract_operation_name(run: RunDTO, event: OpenLineageRunEvent) -> str | None:
if event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
# in some cases, operation name may contain raw SELECT query with newlines. use spaces instead
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()

# Spark execution name is "applicationName.operationName". Strip prefix
if operation_name.startswith(run.job.name) and operation_name != run.job.name:
prefix = len(run.job.name) + 1
operation_name = operation_name[prefix:]

return operation_name

if event.job.facets.jobType and event.job.facets.jobType.integration == "AIRFLOW":
airflow_operator_details = event.run.facets.airflow
if airflow_operator_details:
return airflow_operator_details.task.task_id

# for FINISHED/KILLED event we don't receive task facet.
# keep existing name in DB instead of resetting it to "dag.task"
return None

return run.job.name


def enrich_operation_status(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO:
match event.eventType:
case OpenLineageRunEventType.START:
Expand All @@ -62,10 +86,17 @@ def enrich_operation_status(operation: OperationDTO, event: OpenLineageRunEvent)
return operation


def enrich_operation_description(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO:
def enrich_operation_details(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO:
spark_job_details = event.run.facets.spark_jobDetails
if spark_job_details:
operation.position = spark_job_details.jobId
operation.group = spark_job_details.jobGroup
operation.description = spark_job_details.jobDescription

airflow_operator_details = event.run.facets.airflow
if airflow_operator_details:
operation.description = airflow_operator_details.task.operator_class
operation.position = airflow_operator_details.taskInstance.map_index
if airflow_operator_details.task.task_group:
operation.group = airflow_operator_details.task.task_group.group_id
return operation
2 changes: 2 additions & 0 deletions data_rentgen/consumer/extractors/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
(
airflow_application_details.dag.owner is not None,
airflow_application_details.dag.owner != "airflow",
airflow_application_details.dag.owner != "***",
),
):
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]
Expand All @@ -227,6 +228,7 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
(
airflow_application_dag_details.dag.owner is not None,
airflow_application_dag_details.dag.owner != "airflow",
airflow_application_dag_details.dag.owner != "***",
),
):
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]
Expand Down
2 changes: 2 additions & 0 deletions data_rentgen/consumer/openlineage/run_facets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
OpenLineageAirflowDagRunFacet,
OpenLineageAirflowDagRunInfo,
OpenLineageAirflowDagRunType,
OpenLineageAirflowTaskGroupInfo,
OpenLineageAirflowTaskInfo,
OpenLineageAirflowTaskInstanceInfo,
OpenLineageAirflowTaskRunFacet,
Expand Down Expand Up @@ -34,6 +35,7 @@
"OpenLineageAirflowDagRunFacet",
"OpenLineageAirflowDagRunInfo",
"OpenLineageAirflowDagRunType",
"OpenLineageAirflowTaskGroupInfo",
"OpenLineageAirflowTaskInfo",
"OpenLineageAirflowTaskInstanceInfo",
"OpenLineageAirflowTaskRunFacet",
Expand Down
20 changes: 15 additions & 5 deletions data_rentgen/consumer/openlineage/run_facets/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class OpenLineageAirflowDagInfo(OpenLineageBase):
"""Airflow dag info.
See [Dag](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
See [Dag](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
"""

dag_id: str
Expand All @@ -30,7 +30,7 @@ class OpenLineageAirflowDagRunType(Enum):

class OpenLineageAirflowDagRunInfo(OpenLineageBase):
"""Airflow dagRun info.
See [DagRun](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
See [DagRun](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
"""

run_id: str
Expand All @@ -39,17 +39,27 @@ class OpenLineageAirflowDagRunInfo(OpenLineageBase):
data_interval_end: datetime


class OpenLineageAirflowTaskGroupInfo(OpenLineageBase):
"""Airflow TaskGroup info.
See [task_group](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
"""

group_id: str


class OpenLineageAirflowTaskInfo(OpenLineageBase):
"""Airflow task info.
See [Task](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
See [Task](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
"""

task_id: str
operator_class: str | None = None
task_group: OpenLineageAirflowTaskGroupInfo | None = None


class OpenLineageAirflowTaskInstanceInfo(OpenLineageBase):
"""Airflow taskInstance info.
See [TaskInstance](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
See [TaskInstance](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
"""

try_number: int
Expand All @@ -68,7 +78,7 @@ class OpenLineageAirflowDagRunFacet(OpenLineageRunFacet):

class OpenLineageAirflowTaskRunFacet(OpenLineageRunFacet):
"""Run facet describing Airflow Task run.
See [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json).
See [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json).
"""

dag: OpenLineageAirflowDagInfo
Expand Down
1 change: 1 addition & 0 deletions data_rentgen/consumer/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async def runs_events_subscriber(
):
logger.info("Extracting events")
parsed, malformed = await extract_events(batch, logger)
logger.info("Parsed %r", parsed)

logger.info("Saving to database")
await save_to_db(parsed, session, logger)
Expand Down
1 change: 1 addition & 0 deletions data_rentgen/db/models/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class Operation(Base):
name: Mapped[str] = mapped_column(
String,
nullable=False,
default="unknown",
doc="Name of the operation, e.g. job name",
)

Expand Down
65 changes: 35 additions & 30 deletions data_rentgen/db/repositories/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime, timezone
from uuid import UUID

from sqlalchemy import Row, UnaryExpression, any_, func, select
from sqlalchemy import Row, UnaryExpression, any_, bindparam, func, select, update
from sqlalchemy.dialects.postgresql import insert

from data_rentgen.db.models import Operation, OperationStatus, OperationType
Expand All @@ -19,39 +19,44 @@ async def create_or_update_bulk(self, operations: list[OperationDTO]) -> None:
if not operations:
return

insert_statement = insert(Operation)
statement = insert_statement.on_conflict_do_update(
index_elements=[Operation.created_at, Operation.id],
set_={
"name": func.coalesce(insert_statement.excluded.name, Operation.name),
"type": func.coalesce(insert_statement.excluded.type, Operation.type),
"status": func.greatest(insert_statement.excluded.status, Operation.status),
"started_at": func.coalesce(insert_statement.excluded.started_at, Operation.started_at),
"ended_at": func.coalesce(insert_statement.excluded.ended_at, Operation.ended_at),
"description": func.coalesce(insert_statement.excluded.description, Operation.description),
"group": func.coalesce(insert_statement.excluded.group, Operation.group),
"position": func.coalesce(insert_statement.excluded.position, Operation.position),
},
data = [
{
"id": operation.id,
"created_at": extract_timestamp_from_uuid(operation.id),
"run_id": operation.run.id,
"name": operation.name,
"type": OperationType(operation.type) if operation.type else None,
"status": OperationStatus(operation.status),
"started_at": operation.started_at,
"ended_at": operation.ended_at,
"description": operation.description,
"group": operation.group,
"position": operation.position,
}
for operation in operations
]

# this replaces all null values with defaults
await self._session.execute(
insert(Operation).on_conflict_do_nothing(),
data,
)

# if value is still none, keep existing one
await self._session.execute(
statement,
[
update(Operation).values(
{
"id": operation.id,
"created_at": extract_timestamp_from_uuid(operation.id),
"run_id": operation.run.id,
"name": operation.name,
"type": OperationType(operation.type) if operation.type else None,
"status": OperationStatus(operation.status),
"started_at": operation.started_at,
"ended_at": operation.ended_at,
"description": operation.description,
"group": operation.group,
"position": operation.position,
}
for operation in operations
],
"name": func.coalesce(bindparam("name"), Operation.name),
"type": func.coalesce(bindparam("type"), Operation.type),
"status": func.greatest(bindparam("status"), Operation.status),
"started_at": func.coalesce(bindparam("started_at"), Operation.started_at),
"ended_at": func.coalesce(bindparam("ended_at"), Operation.ended_at),
"description": func.coalesce(bindparam("description"), Operation.description),
"group": func.coalesce(bindparam("group"), Operation.group),
"position": func.coalesce(bindparam("position"), Operation.position),
},
),
data,
)

async def paginate(
Expand Down
4 changes: 2 additions & 2 deletions data_rentgen/dto/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class OperationStatusDTO(IntEnum):
class OperationDTO:
id: UUID
run: RunDTO
name: str
type: OperationTypeDTO = OperationTypeDTO.BATCH
name: str | None = None
type: OperationTypeDTO | None = None
position: int | None = None
group: str | None = None
description: str | None = None
Expand Down
1 change: 1 addition & 0 deletions docs/changelog/next_release/210.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Parse lineage from Airflow tasks
53 changes: 53 additions & 0 deletions tests/test_consumer/test_extractors/fixtures/airflow_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
import pytest

from data_rentgen.dto import (
DatasetDTO,
InputDTO,
JobDTO,
LocationDTO,
OperationDTO,
OperationStatusDTO,
OperationTypeDTO,
OutputDTO,
OutputTypeDTO,
RunDTO,
RunStartReasonDTO,
RunStatusDTO,
SchemaDTO,
UserDTO,
)
from data_rentgen.dto.job_type import JobTypeDTO
Expand Down Expand Up @@ -86,3 +94,48 @@ def extracted_airflow_task_run(
"http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"
),
)


@pytest.fixture
def extracted_airflow_task_operation(
extracted_airflow_task_run: RunDTO,
) -> OperationDTO:
return OperationDTO(
id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"),
name="mytask",
description="BashOperator",
run=extracted_airflow_task_run,
status=OperationStatusDTO.SUCCEEDED,
type=OperationTypeDTO.BATCH,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
)


@pytest.fixture
def extracted_airflow_postgres_input(
extracted_airflow_task_operation: OperationDTO,
extracted_postgres_dataset: DatasetDTO,
extracted_dataset_schema: SchemaDTO,
) -> InputDTO:
return InputDTO(
operation=extracted_airflow_task_operation,
dataset=extracted_postgres_dataset,
schema=extracted_dataset_schema,
)


@pytest.fixture
def extracted_airflow_hive_output(
extracted_airflow_task_operation: OperationDTO,
extracted_hive_dataset: DatasetDTO,
extracted_dataset_schema: SchemaDTO,
) -> OutputDTO:
return OutputDTO(
type=OutputTypeDTO.CREATE,
operation=extracted_airflow_task_operation,
dataset=extracted_hive_dataset,
schema=extracted_dataset_schema,
num_rows=1_000_000,
num_bytes=1000 * 1024 * 1024,
)
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
),
task=OpenLineageAirflowTaskInfo(
task_id="mytask",
operator_class="BashOperator",
),
taskInstance=OpenLineageAirflowTaskInstanceInfo(
try_number=1,
Expand Down
Loading
Loading