diff --git a/data_rentgen/consumer/extractors/batch_extractor.py b/data_rentgen/consumer/extractors/batch_extractor.py index 77fe9ded..81a3180c 100644 --- a/data_rentgen/consumer/extractors/batch_extractor.py +++ b/data_rentgen/consumer/extractors/batch_extractor.py @@ -38,7 +38,7 @@ def is_operation(self, event: OpenLineageRunEvent) -> bool: return job_type_facet.jobType != "APPLICATION" if job_type_facet.integration == "AIRFLOW": - return False + return job_type_facet.jobType == "TASK" return has_lineage diff --git a/data_rentgen/consumer/extractors/operation.py b/data_rentgen/consumer/extractors/operation.py index c49ab3d2..61606862 100644 --- a/data_rentgen/consumer/extractors/operation.py +++ b/data_rentgen/consumer/extractors/operation.py @@ -6,40 +6,64 @@ OpenLineageRunEvent, OpenLineageRunEventType, ) -from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO +from data_rentgen.dto import OperationDTO, OperationStatusDTO, OperationTypeDTO, RunDTO def extract_operation(event: OpenLineageRunEvent) -> OperationDTO: - if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK": - run = extract_parent_run(event.run.facets.parent) - else: - run = extract_run(event) - - # in some cases, operation name may contain raw SELECT query with newlines - operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip() - # remove parent job name from operation name - if operation_name.startswith(run.job.name) and operation_name != run.job.name: - prefix = len(run.job.name) + 1 - operation_name = operation_name[prefix:] - - type_: OperationTypeDTO = OperationTypeDTO.BATCH - if event.job.facets.jobType and event.job.facets.jobType.processingType: - type_ = OperationTypeDTO(event.job.facets.jobType.processingType) + run = extract_operation_run(event) operation = OperationDTO( id=event.run.runId, # type: ignore [arg-type] run=run, - name=operation_name, - type=type_, + name=extract_operation_name(run, event), + type=extract_operation_type(event), status=OperationStatusDTO(run.status), started_at=run.started_at, ended_at=run.ended_at, ) enrich_operation_status(operation, event) - enrich_operation_description(operation, event) + enrich_operation_details(operation, event) return operation +def extract_operation_run(event: OpenLineageRunEvent) -> RunDTO: + if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK": + return extract_parent_run(event.run.facets.parent) + + return extract_run(event) + + +def extract_operation_type(event: OpenLineageRunEvent) -> OperationTypeDTO: + if event.job.facets.jobType and event.job.facets.jobType.processingType: + return OperationTypeDTO(event.job.facets.jobType.processingType) + + return OperationTypeDTO.BATCH + + +def extract_operation_name(run: RunDTO, event: OpenLineageRunEvent) -> str | None: + if event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK": + # in some cases, operation name may contain raw SELECT query with newlines. use spaces instead + operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip() + + # Spark execution name is "applicationName.operationName". Strip prefix + if operation_name.startswith(run.job.name) and operation_name != run.job.name: + prefix = len(run.job.name) + 1 + operation_name = operation_name[prefix:] + + return operation_name + + if event.job.facets.jobType and event.job.facets.jobType.integration == "AIRFLOW": + airflow_operator_details = event.run.facets.airflow + if airflow_operator_details: + return airflow_operator_details.task.task_id + + # for FINISHED/KILLED event we don't receive task facet. + # keep existing name in DB instead of resetting it to "dag.task" + return None + + return run.job.name + + def enrich_operation_status(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO: match event.eventType: case OpenLineageRunEventType.START: @@ -62,10 +86,17 @@ def enrich_operation_status(operation: OperationDTO, event: OpenLineageRunEvent) return operation -def enrich_operation_description(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO: +def enrich_operation_details(operation: OperationDTO, event: OpenLineageRunEvent) -> OperationDTO: spark_job_details = event.run.facets.spark_jobDetails if spark_job_details: operation.position = spark_job_details.jobId operation.group = spark_job_details.jobGroup operation.description = spark_job_details.jobDescription + + airflow_operator_details = event.run.facets.airflow + if airflow_operator_details: + operation.description = airflow_operator_details.task.operator_class + operation.position = airflow_operator_details.taskInstance.map_index + if airflow_operator_details.task.task_group: + operation.group = airflow_operator_details.task.task_group.group_id return operation diff --git a/data_rentgen/consumer/extractors/run.py b/data_rentgen/consumer/extractors/run.py index 7350135e..e60cc2d1 100644 --- a/data_rentgen/consumer/extractors/run.py +++ b/data_rentgen/consumer/extractors/run.py @@ -218,6 +218,7 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: ( airflow_application_details.dag.owner is not None, airflow_application_details.dag.owner != "airflow", + airflow_application_details.dag.owner != "***", ), ): run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type] @@ -227,6 +228,7 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: ( airflow_application_dag_details.dag.owner is not None, airflow_application_dag_details.dag.owner != "airflow", + airflow_application_dag_details.dag.owner != "***", ), ): run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type] diff --git a/data_rentgen/consumer/openlineage/run_facets/__init__.py b/data_rentgen/consumer/openlineage/run_facets/__init__.py index dafdf07b..12fc77ac 100644 --- a/data_rentgen/consumer/openlineage/run_facets/__init__.py +++ b/data_rentgen/consumer/openlineage/run_facets/__init__.py @@ -7,6 +7,7 @@ OpenLineageAirflowDagRunFacet, OpenLineageAirflowDagRunInfo, OpenLineageAirflowDagRunType, + OpenLineageAirflowTaskGroupInfo, OpenLineageAirflowTaskInfo, OpenLineageAirflowTaskInstanceInfo, OpenLineageAirflowTaskRunFacet, @@ -34,6 +35,7 @@ "OpenLineageAirflowDagRunFacet", "OpenLineageAirflowDagRunInfo", "OpenLineageAirflowDagRunType", + "OpenLineageAirflowTaskGroupInfo", "OpenLineageAirflowTaskInfo", "OpenLineageAirflowTaskInstanceInfo", "OpenLineageAirflowTaskRunFacet", diff --git a/data_rentgen/consumer/openlineage/run_facets/airflow.py b/data_rentgen/consumer/openlineage/run_facets/airflow.py index 131d2bf4..edfbc5ba 100644 --- a/data_rentgen/consumer/openlineage/run_facets/airflow.py +++ b/data_rentgen/consumer/openlineage/run_facets/airflow.py @@ -10,7 +10,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase): """Airflow dag info. - See [Dag](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json). + See [Dag](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json). """ dag_id: str @@ -30,7 +30,7 @@ class OpenLineageAirflowDagRunType(Enum): class OpenLineageAirflowDagRunInfo(OpenLineageBase): """Airflow dagRun info. - See [DagRun](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json). + See [DagRun](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json). """ run_id: str @@ -39,17 +39,27 @@ class OpenLineageAirflowDagRunInfo(OpenLineageBase): data_interval_end: datetime +class OpenLineageAirflowTaskGroupInfo(OpenLineageBase): + """Airflow TaskGroup info. + See [task_group](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json). + """ + + group_id: str + + class OpenLineageAirflowTaskInfo(OpenLineageBase): """Airflow task info. - See [Task](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json). + See [Task](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json). """ task_id: str + operator_class: str | None = None + task_group: OpenLineageAirflowTaskGroupInfo | None = None class OpenLineageAirflowTaskInstanceInfo(OpenLineageBase): """Airflow taskInstance info. - See [TaskInstance](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json). + See [TaskInstance](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json). """ try_number: int @@ -68,7 +78,7 @@ class OpenLineageAirflowDagRunFacet(OpenLineageRunFacet): class OpenLineageAirflowTaskRunFacet(OpenLineageRunFacet): """Run facet describing Airflow Task run. - See [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/1.9.0/airflow/providers/openlineage/facets/AirflowRunFacet.json). + See [AirflowRunFacet](https://github.com/apache/airflow/blob/providers-openlineage/2.2.0/providers/openlineage/src/airflow/providers/openlineage/facets/AirflowRunFacet.json). """ dag: OpenLineageAirflowDagInfo diff --git a/data_rentgen/consumer/subscribers.py b/data_rentgen/consumer/subscribers.py index 03e40770..b0037c0c 100644 --- a/data_rentgen/consumer/subscribers.py +++ b/data_rentgen/consumer/subscribers.py @@ -34,6 +34,7 @@ async def runs_events_subscriber( ): logger.info("Extracting events") parsed, malformed = await extract_events(batch, logger) + logger.info("Parsed %r", parsed) logger.info("Saving to database") await save_to_db(parsed, session, logger) diff --git a/data_rentgen/db/models/operation.py b/data_rentgen/db/models/operation.py index 0a6066a7..e45ee8fb 100644 --- a/data_rentgen/db/models/operation.py +++ b/data_rentgen/db/models/operation.py @@ -78,6 +78,7 @@ class Operation(Base): name: Mapped[str] = mapped_column( String, nullable=False, + default="unknown", doc="Name of the operation, e.g. job name", ) diff --git a/data_rentgen/db/repositories/operation.py b/data_rentgen/db/repositories/operation.py index 88c34fbb..0c462658 100644 --- a/data_rentgen/db/repositories/operation.py +++ b/data_rentgen/db/repositories/operation.py @@ -5,7 +5,7 @@ from datetime import datetime, timezone from uuid import UUID -from sqlalchemy import Row, UnaryExpression, any_, func, select +from sqlalchemy import Row, UnaryExpression, any_, bindparam, func, select, update from sqlalchemy.dialects.postgresql import insert from data_rentgen.db.models import Operation, OperationStatus, OperationType @@ -19,39 +19,44 @@ async def create_or_update_bulk(self, operations: list[OperationDTO]) -> None: if not operations: return - insert_statement = insert(Operation) - statement = insert_statement.on_conflict_do_update( - index_elements=[Operation.created_at, Operation.id], - set_={ - "name": func.coalesce(insert_statement.excluded.name, Operation.name), - "type": func.coalesce(insert_statement.excluded.type, Operation.type), - "status": func.greatest(insert_statement.excluded.status, Operation.status), - "started_at": func.coalesce(insert_statement.excluded.started_at, Operation.started_at), - "ended_at": func.coalesce(insert_statement.excluded.ended_at, Operation.ended_at), - "description": func.coalesce(insert_statement.excluded.description, Operation.description), - "group": func.coalesce(insert_statement.excluded.group, Operation.group), - "position": func.coalesce(insert_statement.excluded.position, Operation.position), - }, + data = [ + { + "id": operation.id, + "created_at": extract_timestamp_from_uuid(operation.id), + "run_id": operation.run.id, + "name": operation.name, + "type": OperationType(operation.type) if operation.type else None, + "status": OperationStatus(operation.status), + "started_at": operation.started_at, + "ended_at": operation.ended_at, + "description": operation.description, + "group": operation.group, + "position": operation.position, + } + for operation in operations + ] + + # this replaces all null values with defaults + await self._session.execute( + insert(Operation).on_conflict_do_nothing(), + data, ) + # if value is still none, keep existing one await self._session.execute( - statement, - [ + update(Operation).values( { - "id": operation.id, - "created_at": extract_timestamp_from_uuid(operation.id), - "run_id": operation.run.id, - "name": operation.name, - "type": OperationType(operation.type) if operation.type else None, - "status": OperationStatus(operation.status), - "started_at": operation.started_at, - "ended_at": operation.ended_at, - "description": operation.description, - "group": operation.group, - "position": operation.position, - } - for operation in operations - ], + "name": func.coalesce(bindparam("name"), Operation.name), + "type": func.coalesce(bindparam("type"), Operation.type), + "status": func.greatest(bindparam("status"), Operation.status), + "started_at": func.coalesce(bindparam("started_at"), Operation.started_at), + "ended_at": func.coalesce(bindparam("ended_at"), Operation.ended_at), + "description": func.coalesce(bindparam("description"), Operation.description), + "group": func.coalesce(bindparam("group"), Operation.group), + "position": func.coalesce(bindparam("position"), Operation.position), + }, + ), + data, ) async def paginate( diff --git a/data_rentgen/dto/operation.py b/data_rentgen/dto/operation.py index 8f95e41a..9dcd8f5f 100644 --- a/data_rentgen/dto/operation.py +++ b/data_rentgen/dto/operation.py @@ -37,8 +37,8 @@ class OperationStatusDTO(IntEnum): class OperationDTO: id: UUID run: RunDTO - name: str - type: OperationTypeDTO = OperationTypeDTO.BATCH + name: str | None = None + type: OperationTypeDTO | None = None position: int | None = None group: str | None = None description: str | None = None diff --git a/docs/changelog/next_release/210.feature.rst b/docs/changelog/next_release/210.feature.rst new file mode 100644 index 00000000..aab83cfc --- /dev/null +++ b/docs/changelog/next_release/210.feature.rst @@ -0,0 +1 @@ +Parse lineage from Airflow tasks \ No newline at end of file diff --git a/tests/test_consumer/test_extractors/fixtures/airflow_dto.py b/tests/test_consumer/test_extractors/fixtures/airflow_dto.py index 5c46e437..e02e9508 100644 --- a/tests/test_consumer/test_extractors/fixtures/airflow_dto.py +++ b/tests/test_consumer/test_extractors/fixtures/airflow_dto.py @@ -4,11 +4,19 @@ import pytest from data_rentgen.dto import ( + DatasetDTO, + InputDTO, JobDTO, LocationDTO, + OperationDTO, + OperationStatusDTO, + OperationTypeDTO, + OutputDTO, + OutputTypeDTO, RunDTO, RunStartReasonDTO, RunStatusDTO, + SchemaDTO, UserDTO, ) from data_rentgen.dto.job_type import JobTypeDTO @@ -86,3 +94,48 @@ def extracted_airflow_task_run( "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask" ), ) + + +@pytest.fixture +def extracted_airflow_task_operation( + extracted_airflow_task_run: RunDTO, +) -> OperationDTO: + return OperationDTO( + id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"), + name="mytask", + description="BashOperator", + run=extracted_airflow_task_run, + status=OperationStatusDTO.SUCCEEDED, + type=OperationTypeDTO.BATCH, + started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ) + + +@pytest.fixture +def extracted_airflow_postgres_input( + extracted_airflow_task_operation: OperationDTO, + extracted_postgres_dataset: DatasetDTO, + extracted_dataset_schema: SchemaDTO, +) -> InputDTO: + return InputDTO( + operation=extracted_airflow_task_operation, + dataset=extracted_postgres_dataset, + schema=extracted_dataset_schema, + ) + + +@pytest.fixture +def extracted_airflow_hive_output( + extracted_airflow_task_operation: OperationDTO, + extracted_hive_dataset: DatasetDTO, + extracted_dataset_schema: SchemaDTO, +) -> OutputDTO: + return OutputDTO( + type=OutputTypeDTO.CREATE, + operation=extracted_airflow_task_operation, + dataset=extracted_hive_dataset, + schema=extracted_dataset_schema, + num_rows=1_000_000, + num_bytes=1000 * 1024 * 1024, + ) diff --git a/tests/test_consumer/test_extractors/fixtures/airflow_raw.py b/tests/test_consumer/test_extractors/fixtures/airflow_raw.py index fad6c617..5d8a3d59 100644 --- a/tests/test_consumer/test_extractors/fixtures/airflow_raw.py +++ b/tests/test_consumer/test_extractors/fixtures/airflow_raw.py @@ -136,6 +136,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent: ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="BashOperator", ), taskInstance=OpenLineageAirflowTaskInstanceInfo( try_number=1, diff --git a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py index 7c39ec58..befd1d9c 100644 --- a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py @@ -1,13 +1,21 @@ import pytest from data_rentgen.consumer.extractors import BatchExtractor +from data_rentgen.consumer.openlineage.dataset import OpenLineageInputDataset, OpenLineageOutputDataset from data_rentgen.consumer.openlineage.run_event import ( OpenLineageRunEvent, ) from data_rentgen.dto import ( + DatasetDTO, + DatasetSymlinkDTO, + InputDTO, JobDTO, LocationDTO, + OperationDTO, + OutputDTO, RunDTO, + SchemaDTO, + UserDTO, ) @@ -25,7 +33,7 @@ ), ], ) -def test_extractors_extract_batch_airflow( +def test_extractors_extract_batch_airflow_without_lineage( airflow_dag_run_event_start: OpenLineageRunEvent, airflow_dag_run_event_stop: OpenLineageRunEvent, airflow_task_run_event_start: OpenLineageRunEvent, @@ -35,6 +43,7 @@ def test_extractors_extract_batch_airflow( extracted_airflow_task_job: JobDTO, extracted_airflow_dag_run: RunDTO, extracted_airflow_task_run: RunDTO, + extracted_airflow_task_operation: OperationDTO, input_transformation, ): events = [ @@ -52,10 +61,100 @@ def test_extractors_extract_batch_airflow( extracted_airflow_dag_run, extracted_airflow_task_run, ] + assert extracted.operations() == [extracted_airflow_task_operation] assert not extracted.datasets() assert not extracted.dataset_symlinks() assert not extracted.schemas() - assert not extracted.operations() assert not extracted.inputs() assert not extracted.outputs() + + +@pytest.mark.parametrize( + "input_transformation", + [ + # receiving data out of order does not change result + pytest.param( + list, + id="preserve order", + ), + pytest.param( + reversed, + id="reverse order", + ), + ], +) +def test_extractors_extract_batch_airflow_with_lineage( + airflow_dag_run_event_start: OpenLineageRunEvent, + airflow_dag_run_event_stop: OpenLineageRunEvent, + airflow_task_run_event_start: OpenLineageRunEvent, + airflow_task_run_event_stop: OpenLineageRunEvent, + postgres_input: OpenLineageInputDataset, + hdfs_output: OpenLineageOutputDataset, + hdfs_output_with_stats: OpenLineageOutputDataset, + extracted_airflow_dag_job: JobDTO, + extracted_airflow_task_job: JobDTO, + extracted_airflow_dag_run: RunDTO, + extracted_airflow_task_run: RunDTO, + extracted_airflow_task_operation: OperationDTO, + extracted_airflow_location: LocationDTO, + extracted_postgres_location: LocationDTO, + extracted_hdfs_location: LocationDTO, + extracted_hive_location: LocationDTO, + extracted_postgres_dataset: DatasetDTO, + extracted_hdfs_dataset: DatasetDTO, + extracted_hive_dataset: DatasetDTO, + extracted_hdfs_dataset_symlink: DatasetSymlinkDTO, + extracted_hive_dataset_symlink: DatasetSymlinkDTO, + extracted_dataset_schema: SchemaDTO, + extracted_user: UserDTO, + extracted_airflow_postgres_input: InputDTO, + extracted_airflow_hive_output: OutputDTO, + input_transformation, +): + events = [ + airflow_dag_run_event_start, + airflow_task_run_event_start.model_copy( + update={ + "inputs": [postgres_input], + "outputs": [hdfs_output], + }, + ), + airflow_task_run_event_stop.model_copy( + update={ + "inputs": [postgres_input], + "outputs": [hdfs_output_with_stats], + }, + ), + airflow_dag_run_event_stop, + ] + + extracted = BatchExtractor().add_events(input_transformation(events)) + + assert extracted.locations() == [ + extracted_airflow_location, + extracted_postgres_location, + extracted_hive_location, + extracted_hdfs_location, + ] + + assert extracted.jobs() == [extracted_airflow_dag_job, extracted_airflow_task_job] + assert extracted.users() == [extracted_user] + assert extracted.runs() == [extracted_airflow_dag_run, extracted_airflow_task_run] + assert extracted.operations() == [extracted_airflow_task_operation] + + assert extracted.datasets() == [ + extracted_postgres_dataset, + extracted_hive_dataset, + extracted_hdfs_dataset, + ] + + assert extracted.dataset_symlinks() == [ + extracted_hdfs_dataset_symlink, + extracted_hive_dataset_symlink, + ] + + # Both input & output schemas are the same + assert extracted.schemas() == [extracted_dataset_schema] + assert extracted.inputs() == [extracted_airflow_postgres_input] + assert extracted.outputs() == [extracted_airflow_hive_output] diff --git a/tests/test_consumer/test_extractors/test_extractors_operation_airflow.py b/tests/test_consumer/test_extractors/test_extractors_operation_airflow.py new file mode 100644 index 00000000..422e4afd --- /dev/null +++ b/tests/test_consumer/test_extractors/test_extractors_operation_airflow.py @@ -0,0 +1,266 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +from uuid6 import UUID + +from data_rentgen.consumer.extractors import extract_operation +from data_rentgen.consumer.openlineage.job import OpenLineageJob +from data_rentgen.consumer.openlineage.job_facets import ( + OpenLineageJobFacets, + OpenLineageJobProcessingType, + OpenLineageJobTypeJobFacet, +) +from data_rentgen.consumer.openlineage.run import OpenLineageRun +from data_rentgen.consumer.openlineage.run_event import ( + OpenLineageRunEvent, + OpenLineageRunEventType, +) +from data_rentgen.consumer.openlineage.run_facets import ( + OpenLineageRunFacets, +) +from data_rentgen.consumer.openlineage.run_facets.airflow import ( + OpenLineageAirflowDagInfo, + OpenLineageAirflowDagRunInfo, + OpenLineageAirflowDagRunType, + OpenLineageAirflowTaskGroupInfo, + OpenLineageAirflowTaskInfo, + OpenLineageAirflowTaskInstanceInfo, + OpenLineageAirflowTaskRunFacet, +) +from data_rentgen.dto import OperationDTO, OperationStatusDTO +from data_rentgen.dto.job import JobDTO +from data_rentgen.dto.job_type import JobTypeDTO +from data_rentgen.dto.location import LocationDTO +from data_rentgen.dto.operation import OperationTypeDTO +from data_rentgen.dto.run import RunDTO, RunStartReasonDTO, RunStatusDTO + + +def test_extractors_extract_operation_airflow(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + operation = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.BATCH, + integration="AIRFLOW", + jobType="TASK", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + operator_class="BashOperator", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_operation(operation) == OperationDTO( + id=run_id, + run=RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ), + name="mytask", + type=OperationTypeDTO.BATCH, + description="BashOperator", + group=None, + position=None, + status=OperationStatusDTO.SUCCEEDED, + started_at=None, + ended_at=now, + ) + + +def test_extractors_extract_operation_airflow_task_group(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + operation = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.BATCH, + integration="AIRFLOW", + jobType="TASK", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + operator_class="BashOperator", + task_group=OpenLineageAirflowTaskGroupInfo(group_id="mygroup"), + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_operation(operation) == OperationDTO( + id=run_id, + run=RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ), + name="mytask", + type=OperationTypeDTO.BATCH, + description="BashOperator", + group="mygroup", + position=None, + status=OperationStatusDTO.SUCCEEDED, + started_at=None, + ended_at=now, + ) + + +def test_extractors_extract_operation_airflow_map_index(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + operation = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask_10", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.BATCH, + integration="AIRFLOW", + jobType="TASK", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + operator_class="BashOperator", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + map_index=10, + ), + ), + ), + ), + ) + + assert extract_operation(operation) == OperationDTO( + id=run_id, + run=RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask_10", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ), + name="mytask", + type=OperationTypeDTO.BATCH, + description="BashOperator", + group=None, + position=10, + status=OperationStatusDTO.SUCCEEDED, + started_at=None, + ended_at=now, + ) diff --git a/tests/test_consumer/test_extractors/test_extractors_run.py b/tests/test_consumer/test_extractors/test_extractors_run_airflow.py similarity index 67% rename from tests/test_consumer/test_extractors/test_extractors_run.py rename to tests/test_consumer/test_extractors/test_extractors_run_airflow.py index b68d3963..f286ffd2 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run.py +++ b/tests/test_consumer/test_extractors/test_extractors_run_airflow.py @@ -28,8 +28,6 @@ OpenLineageProcessingEngineName, OpenLineageProcessingEngineRunFacet, OpenLineageRunFacets, - OpenLineageSparkApplicationDetailsRunFacet, - OpenLineageSparkDeployMode, ) from data_rentgen.consumer.openlineage.run_facets.airflow import ( OpenLineageAirflowDagRunFacet, @@ -45,111 +43,7 @@ from data_rentgen.dto.user import UserDTO -def test_extractors_extract_run_spark_app_yarn(): - now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") - run = OpenLineageRunEvent( - eventType=OpenLineageRunEventType.START, - eventTime=now, - job=OpenLineageJob( - namespace="yarn://cluster", - name="myjob", - facets=OpenLineageJobFacets( - jobType=OpenLineageJobTypeJobFacet( - processingType=OpenLineageJobProcessingType.NONE, - integration="SPARK", - jobType="APPLICATION", - ), - ), - ), - run=OpenLineageRun( - runId=run_id, - facets=OpenLineageRunFacets( - spark_applicationDetails=OpenLineageSparkApplicationDetailsRunFacet( - master="yarn", - appName="myapp", - applicationId="application_1234_5678", - deployMode=OpenLineageSparkDeployMode.CLIENT, - driverHost="localhost", - userName="myuser", - uiWebUrl="http://localhost:4040", - proxyUrl="http://yarn-proxy:8088/proxy/application_1234_5678,http://yarn-proxy:18088/proxy/application_1234_5678", - historyUrl="http://history-server:18080/history/application_1234_5678,http://history-server:18081/history/application_1234_5678", - ), - ), - ), - ) - assert extract_run(run) == RunDTO( - id=run_id, - job=JobDTO( - name="myjob", - location=LocationDTO(type="yarn", name="cluster", addresses={"yarn://cluster"}), - type=JobTypeDTO(type="SPARK_APPLICATION"), - ), - status=RunStatusDTO.STARTED, - started_at=now, - start_reason=None, - user=UserDTO(name="myuser"), - ended_at=None, - external_id="application_1234_5678", - attempt=None, - persistent_log_url="http://history-server:18080/history/application_1234_5678", - running_log_url="http://yarn-proxy:8088/proxy/application_1234_5678", - ) - - -def test_extractors_extract_run_spark_app_local(): - now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") - run = OpenLineageRunEvent( - eventType=OpenLineageRunEventType.RUNNING, - eventTime=now, - job=OpenLineageJob( - namespace="host://some.host.com", - name="myjob", - facets=OpenLineageJobFacets( - jobType=OpenLineageJobTypeJobFacet( - processingType=OpenLineageJobProcessingType.NONE, - integration="SPARK", - jobType="APPLICATION", - ), - ), - ), - run=OpenLineageRun( - runId=run_id, - facets=OpenLineageRunFacets( - spark_applicationDetails=OpenLineageSparkApplicationDetailsRunFacet( - master="local[4]", - appName="myapp", - applicationId="local-1234-5678", - deployMode=OpenLineageSparkDeployMode.CLIENT, - driverHost="localhost", - userName="myuser", - uiWebUrl="http://localhost:4040,http://localhost:4041", - ), - ), - ), - ) - - assert extract_run(run) == RunDTO( - id=run_id, - job=JobDTO( - name="myjob", - location=LocationDTO(type="host", name="some.host.com", addresses={"host://some.host.com"}), - type=JobTypeDTO(type="SPARK_APPLICATION"), - ), - status=RunStatusDTO.STARTED, - started_at=None, - start_reason=None, - user=UserDTO(name="myuser"), - external_id="local-1234-5678", - attempt=None, - persistent_log_url=None, - running_log_url="http://localhost:4040", - ) - - -def test_extractors_extract_run_airflow_dag_2_3_plus(): +def test_extractors_extract_run_airflow_dag_log_url_2_3_plus(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839") run = OpenLineageRunEvent( @@ -212,7 +106,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus(): ) -def test_extractors_extract_run_airflow_dag_2_x(): +def test_extractors_extract_run_airflow_dag_log_url_2_x(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839") run = OpenLineageRunEvent( @@ -275,7 +169,7 @@ def test_extractors_extract_run_airflow_dag_2_x(): ) -def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url(): +def test_extractors_extract_run_airflow_task_log_url_preserve_original(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") run = OpenLineageRunEvent( @@ -310,6 +204,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url(): ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="BashOperator", ), taskInstance=OpenLineageAirflowTaskInstanceInfo( try_number=1, @@ -347,7 +242,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url(): ) -def test_extractors_extract_run_airflow_task_2_9_plus(): +def test_extractors_extract_run_airflow_task_log_url_2_9_plus(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") run = OpenLineageRunEvent( @@ -382,6 +277,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus(): ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="BashOperator", ), taskInstance=OpenLineageAirflowTaskInstanceInfo( try_number=1, @@ -416,7 +312,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus(): ) -def test_extractors_extract_run_airflow_task_2_x(): +def test_extractors_extract_run_airflow_task_log_url_2_x(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") run = OpenLineageRunEvent( @@ -446,6 +342,7 @@ def test_extractors_extract_run_airflow_task_2_x(): ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="BashOperator", ), taskInstance=OpenLineageAirflowTaskInstanceInfo( try_number=1, @@ -480,7 +377,16 @@ def test_extractors_extract_run_airflow_task_2_x(): ) -def test_extractors_extract_run_airflow_dag_check_with_owner(): +@pytest.mark.parametrize( + ["owner", "extracted_user"], + [ + ("myuser", UserDTO(name="myuser")), + (None, None), + ("airflow", None), + ("***", None), + ], +) +def test_extractors_extract_run_airflow_dag_owner(owner: str, extracted_user: UserDTO | None): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") run = OpenLineageRunEvent( @@ -506,7 +412,7 @@ def test_extractors_extract_run_airflow_dag_check_with_owner(): openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner=owner), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -532,7 +438,7 @@ def test_extractors_extract_run_airflow_dag_check_with_owner(): status=RunStatusDTO.SUCCEEDED, started_at=None, start_reason=RunStartReasonDTO.MANUAL, - user=UserDTO(name="myuser"), + user=extracted_user, ended_at=now, external_id="manual__2024-07-05T09:04:13:979349+00:00", attempt=None, @@ -543,7 +449,16 @@ def test_extractors_extract_run_airflow_dag_check_with_owner(): ) -def test_extractors_extract_run_airflow_task_with_owner(): +@pytest.mark.parametrize( + ["owner", "extracted_user"], + [ + ("myuser", UserDTO(name="myuser")), + (None, None), + ("airflow", None), + ("***", None), + ], +) +def test_extractors_extract_run_airflow_task_owner(owner: str, extracted_user: UserDTO | None): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") run = OpenLineageRunEvent( @@ -564,7 +479,7 @@ def test_extractors_extract_run_airflow_task_with_owner(): runId=run_id, facets=OpenLineageRunFacets( airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner=owner), dagRun=OpenLineageAirflowDagRunInfo( run_id="scheduled__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.SCHEDULED, @@ -573,6 +488,7 @@ def test_extractors_extract_run_airflow_task_with_owner(): ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="BashOperator", ), taskInstance=OpenLineageAirflowTaskInstanceInfo( try_number=1, @@ -596,7 +512,7 @@ def test_extractors_extract_run_airflow_task_with_owner(): status=RunStatusDTO.SUCCEEDED, started_at=None, start_reason=RunStartReasonDTO.AUTOMATIC, - user=UserDTO(name="myuser"), + user=extracted_user, ended_at=now, external_id="scheduled__2024-07-05T09:04:13:979349+00:00", attempt="1", @@ -607,70 +523,7 @@ def test_extractors_extract_run_airflow_task_with_owner(): ) -def test_extractors_extract_run_airflow_dag_without_owner(): - now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") - run = OpenLineageRunEvent( - eventType=OpenLineageRunEventType.COMPLETE, - eventTime=now, - job=OpenLineageJob( - namespace="http://airflow-host:8081", - name="mydag", - facets=OpenLineageJobFacets( - jobType=OpenLineageJobTypeJobFacet( - processingType=OpenLineageJobProcessingType.BATCH, - integration="AIRFLOW", - jobType="DAG", - ), - ), - ), - run=OpenLineageRun( - runId=run_id, - facets=OpenLineageRunFacets( - processing_engine=OpenLineageProcessingEngineRunFacet( - version=Version("2.1.4"), - name=OpenLineageProcessingEngineName.AIRFLOW, - openlineageAdapterVersion=Version("1.10.0"), - ), - airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), - dagRun=OpenLineageAirflowDagRunInfo( - run_id="manual__2024-07-05T09:04:13:979349+00:00", - run_type=OpenLineageAirflowDagRunType.MANUAL, - data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - ), - ), - ), - ), - ) - - assert extract_run(run) == RunDTO( - id=run_id, - job=JobDTO( - name="mydag", - location=LocationDTO( - type="http", - name="airflow-host:8081", - addresses={"http://airflow-host:8081"}, - ), - type=JobTypeDTO(type="AIRFLOW_DAG"), - ), - status=RunStatusDTO.SUCCEEDED, - started_at=None, - start_reason=RunStartReasonDTO.MANUAL, - user=None, - ended_at=now, - external_id="manual__2024-07-05T09:04:13:979349+00:00", - attempt=None, - persistent_log_url=( - "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" - ), - running_log_url=None, - ) - - -def test_extractors_extract_run_airflow_task_without_owner(): +def test_extractors_extract_run_airflow_task_map_index(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") run = OpenLineageRunEvent( @@ -678,7 +531,7 @@ def test_extractors_extract_run_airflow_task_without_owner(): eventTime=now, job=OpenLineageJob( namespace="http://airflow-host:8081", - name="mydag.mytask", + name="mydag.mytask_10", facets=OpenLineageJobFacets( jobType=OpenLineageJobTypeJobFacet( processingType=OpenLineageJobProcessingType.BATCH, @@ -691,7 +544,7 @@ def test_extractors_extract_run_airflow_task_without_owner(): runId=run_id, facets=OpenLineageRunFacets( airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="scheduled__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.SCHEDULED, @@ -700,9 +553,11 @@ def test_extractors_extract_run_airflow_task_without_owner(): ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="BashOperator", ), taskInstance=OpenLineageAirflowTaskInstanceInfo( try_number=1, + map_index=10, ), ), ), @@ -712,7 +567,7 @@ def test_extractors_extract_run_airflow_task_without_owner(): assert extract_run(run) == RunDTO( id=run_id, job=JobDTO( - name="mydag.mytask", + name="mydag.mytask_10", location=LocationDTO( type="http", name="airflow-host:8081", @@ -732,110 +587,3 @@ def test_extractors_extract_run_airflow_task_without_owner(): ), running_log_url=None, ) - - -@pytest.mark.parametrize( - ["raw_job_type", "extracted_job_type"], - [ - (None, None), - ( - OpenLineageJobTypeJobFacet( - processingType=OpenLineageJobProcessingType.NONE, - integration="ABC", - ), - JobTypeDTO(type="ABC"), - ), - ( - OpenLineageJobTypeJobFacet( - processingType=OpenLineageJobProcessingType.NONE, - integration="ABC", - jobType="CDE", - ), - JobTypeDTO(type="ABC_CDE"), - ), - ], -) -def test_extractors_extract_run_unknown( - raw_job_type: OpenLineageJobTypeJobFacet | None, - extracted_job_type: JobTypeDTO | None, -): - now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") - run = OpenLineageRunEvent( - eventType=OpenLineageRunEventType.COMPLETE, - eventTime=now, - job=OpenLineageJob( - namespace="something", - name="myjob", - facets=OpenLineageJobFacets( - jobType=raw_job_type, - ), - ), - run=OpenLineageRun(runId=run_id), - ) - - assert extract_run(run) == RunDTO( - id=run_id, - job=JobDTO( - name="myjob", - type=extracted_job_type, - location=LocationDTO( - type="unknown", - name="something", - addresses={"unknown://something"}, - ), - ), - status=RunStatusDTO.SUCCEEDED, - started_at=None, - start_reason=None, - user=None, - ended_at=now, - external_id=None, - attempt=None, - persistent_log_url=None, - running_log_url=None, - ) - - -@pytest.mark.parametrize( - ["event_type", "expected_status"], - [ - (OpenLineageRunEventType.FAIL, RunStatusDTO.FAILED), - (OpenLineageRunEventType.ABORT, RunStatusDTO.KILLED), - (OpenLineageRunEventType.OTHER, RunStatusDTO.UNKNOWN), - ], -) -def test_extractors_extract_run_with_status( - event_type: OpenLineageRunEventType, - expected_status: RunStatusDTO, -): - now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") - run = OpenLineageRunEvent( - eventType=event_type, - eventTime=now, - job=OpenLineageJob(namespace="something", name="myjob"), - run=OpenLineageRun(runId=run_id), - ) - - ended_at = now if expected_status != RunStatusDTO.UNKNOWN else None - assert extract_run(run) == RunDTO( - id=run_id, - job=JobDTO( - name="myjob", - location=LocationDTO( - type="unknown", - name="something", - addresses={"unknown://something"}, - ), - ), - status=expected_status, - started_at=None, - start_reason=None, - user=None, - ended_at=ended_at, - external_id=None, - attempt=None, - persistent_log_url=None, - running_log_url=None, - ) diff --git a/tests/test_consumer/test_extractors/test_extractors_run_spark.py b/tests/test_consumer/test_extractors/test_extractors_run_spark.py new file mode 100644 index 00000000..dc28114d --- /dev/null +++ b/tests/test_consumer/test_extractors/test_extractors_run_spark.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +from uuid6 import UUID + +from data_rentgen.consumer.extractors import extract_run +from data_rentgen.consumer.openlineage.job import OpenLineageJob +from data_rentgen.consumer.openlineage.job_facets import ( + OpenLineageJobFacets, + OpenLineageJobProcessingType, + OpenLineageJobTypeJobFacet, +) +from data_rentgen.consumer.openlineage.run import OpenLineageRun +from data_rentgen.consumer.openlineage.run_event import ( + OpenLineageRunEvent, + OpenLineageRunEventType, +) +from data_rentgen.consumer.openlineage.run_facets import ( + OpenLineageRunFacets, + OpenLineageSparkApplicationDetailsRunFacet, + OpenLineageSparkDeployMode, +) +from data_rentgen.dto import ( + JobDTO, + JobTypeDTO, + LocationDTO, + RunDTO, + RunStatusDTO, +) +from data_rentgen.dto.user import UserDTO + + +def test_extractors_extract_run_spark_app_yarn(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.START, + eventTime=now, + job=OpenLineageJob( + namespace="yarn://cluster", + name="myjob", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.NONE, + integration="SPARK", + jobType="APPLICATION", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + spark_applicationDetails=OpenLineageSparkApplicationDetailsRunFacet( + master="yarn", + appName="myapp", + applicationId="application_1234_5678", + deployMode=OpenLineageSparkDeployMode.CLIENT, + driverHost="localhost", + userName="myuser", + uiWebUrl="http://localhost:4040", + proxyUrl="http://yarn-proxy:8088/proxy/application_1234_5678,http://yarn-proxy:18088/proxy/application_1234_5678", + historyUrl="http://history-server:18080/history/application_1234_5678,http://history-server:18081/history/application_1234_5678", + ), + ), + ), + ) + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="myjob", + location=LocationDTO(type="yarn", name="cluster", addresses={"yarn://cluster"}), + type=JobTypeDTO(type="SPARK_APPLICATION"), + ), + status=RunStatusDTO.STARTED, + started_at=now, + start_reason=None, + user=UserDTO(name="myuser"), + ended_at=None, + external_id="application_1234_5678", + attempt=None, + persistent_log_url="http://history-server:18080/history/application_1234_5678", + running_log_url="http://yarn-proxy:8088/proxy/application_1234_5678", + ) + + +def test_extractors_extract_run_spark_app_local(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.RUNNING, + eventTime=now, + job=OpenLineageJob( + namespace="host://some.host.com", + name="myjob", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.NONE, + integration="SPARK", + jobType="APPLICATION", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + spark_applicationDetails=OpenLineageSparkApplicationDetailsRunFacet( + master="local[4]", + appName="myapp", + applicationId="local-1234-5678", + deployMode=OpenLineageSparkDeployMode.CLIENT, + driverHost="localhost", + userName="myuser", + uiWebUrl="http://localhost:4040,http://localhost:4041", + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="myjob", + location=LocationDTO(type="host", name="some.host.com", addresses={"host://some.host.com"}), + type=JobTypeDTO(type="SPARK_APPLICATION"), + ), + status=RunStatusDTO.STARTED, + started_at=None, + start_reason=None, + user=UserDTO(name="myuser"), + external_id="local-1234-5678", + attempt=None, + persistent_log_url=None, + running_log_url="http://localhost:4040", + ) diff --git a/tests/test_consumer/test_extractors/test_extractors_run_unknown.py b/tests/test_consumer/test_extractors/test_extractors_run_unknown.py new file mode 100644 index 00000000..e01fbb2e --- /dev/null +++ b/tests/test_consumer/test_extractors/test_extractors_run_unknown.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest +from uuid6 import UUID + +from data_rentgen.consumer.extractors import extract_run +from data_rentgen.consumer.openlineage.job import OpenLineageJob +from data_rentgen.consumer.openlineage.job_facets import ( + OpenLineageJobFacets, + OpenLineageJobProcessingType, + OpenLineageJobTypeJobFacet, +) +from data_rentgen.consumer.openlineage.run import OpenLineageRun +from data_rentgen.consumer.openlineage.run_event import ( + OpenLineageRunEvent, + OpenLineageRunEventType, +) +from data_rentgen.dto import ( + JobDTO, + JobTypeDTO, + LocationDTO, + RunDTO, + RunStatusDTO, +) + + +@pytest.mark.parametrize( + ["raw_job_type", "extracted_job_type"], + [ + (None, None), + ( + OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.NONE, + integration="ABC", + ), + JobTypeDTO(type="ABC"), + ), + ( + OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.NONE, + integration="ABC", + jobType="CDE", + ), + JobTypeDTO(type="ABC_CDE"), + ), + ], +) +def test_extractors_extract_run_unknown( + raw_job_type: OpenLineageJobTypeJobFacet | None, + extracted_job_type: JobTypeDTO | None, +): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="something", + name="myjob", + facets=OpenLineageJobFacets( + jobType=raw_job_type, + ), + ), + run=OpenLineageRun(runId=run_id), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="myjob", + type=extracted_job_type, + location=LocationDTO( + type="unknown", + name="something", + addresses={"unknown://something"}, + ), + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=None, + user=None, + ended_at=now, + external_id=None, + attempt=None, + persistent_log_url=None, + running_log_url=None, + ) + + +@pytest.mark.parametrize( + ["event_type", "expected_status"], + [ + (OpenLineageRunEventType.FAIL, RunStatusDTO.FAILED), + (OpenLineageRunEventType.ABORT, RunStatusDTO.KILLED), + (OpenLineageRunEventType.OTHER, RunStatusDTO.UNKNOWN), + ], +) +def test_extractors_extract_run_with_status( + event_type: OpenLineageRunEventType, + expected_status: RunStatusDTO, +): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") + run = OpenLineageRunEvent( + eventType=event_type, + eventTime=now, + job=OpenLineageJob(namespace="something", name="myjob"), + run=OpenLineageRun(runId=run_id), + ) + + ended_at = now if expected_status != RunStatusDTO.UNKNOWN else None + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="myjob", + location=LocationDTO( + type="unknown", + name="something", + addresses={"unknown://something"}, + ), + ), + status=expected_status, + started_at=None, + start_reason=None, + user=None, + ended_at=ended_at, + external_id=None, + attempt=None, + persistent_log_url=None, + running_log_url=None, + ) diff --git a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py index f44cdd7f..5e71f17d 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py @@ -13,6 +13,8 @@ Job, Location, Operation, + OperationStatus, + OperationType, Run, RunStartReason, RunStatus, @@ -115,4 +117,16 @@ async def test_runs_handler_airflow( operation_query = select(Operation) operation_scalars = await async_session.scalars(operation_query) operations = operation_scalars.all() - assert not operations + + task_operation = operations[0] + assert task_operation.id == UUID("01908223-0782-7fc0-9d69-b1df9dac2c60") # same id and created_at + assert task_operation.created_at == datetime(2024, 7, 5, 9, 4, 12, 162000, tzinfo=timezone.utc) + assert task_operation.run_id == task_run.id + assert task_operation.name == "mytask" + assert task_operation.type == OperationType.BATCH + assert task_operation.status == OperationStatus.SUCCEEDED + assert task_operation.started_at == datetime(2024, 7, 5, 9, 4, 20, 783845, tzinfo=timezone.utc) + assert task_operation.ended_at == datetime(2024, 7, 5, 9, 7, 37, 858423, tzinfo=timezone.utc) + assert task_operation.description == "SSHOperator" + assert task_operation.position is None + assert task_operation.group is None diff --git a/tests/test_consumer/test_openlineage/test_run_event_airflow.py b/tests/test_consumer/test_openlineage/test_run_event_airflow.py index 06c12ffb..d37f304b 100644 --- a/tests/test_consumer/test_openlineage/test_run_event_airflow.py +++ b/tests/test_consumer/test_openlineage/test_run_event_airflow.py @@ -415,6 +415,7 @@ def test_run_event_airflow_task_start(): ), task=OpenLineageAirflowTaskInfo( task_id="mytask", + operator_class="SSHOperator", ), ), # unknown facets are ignored