Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions data_rentgen/consumer/extractors/batch_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ def add_events(self, events: list[OpenLineageRunEvent]) -> BatchExtractionResult
return self.result

def is_operation(self, event: OpenLineageRunEvent) -> bool:
    """Tell whether ``event`` should be handled as an operation.

    The decision is based on the job's ``jobType`` facet:

    * no facet at all - operation only if the event carries lineage;
    * ``SPARK`` integration - everything except ``APPLICATION`` jobs;
    * ``AIRFLOW`` integration - never an operation;
    * any other integration - operation only if the event carries lineage.

    :param event: run event to classify.
    :returns: ``True`` if the event should be treated as an operation.
    """
    # "lineage" here means that the event lists at least one input or output
    has_lineage = bool(event.inputs or event.outputs)

    job_type_facet = event.job.facets.jobType
    if not job_type_facet:
        # nothing describes the job type, so lineage is the only signal left
        return has_lineage

    if job_type_facet.integration == "SPARK":
        return job_type_facet.jobType != "APPLICATION"

    if job_type_facet.integration == "AIRFLOW":
        return False

    # unrecognised integration: same fallback as for a missing facet
    return has_lineage

def extract_run(self, event: OpenLineageRunEvent) -> None:
run = extract_run(event)
Expand Down
13 changes: 9 additions & 4 deletions data_rentgen/consumer/extractors/operation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.extractors.run import extract_parent_run
from data_rentgen.consumer.extractors.run import extract_parent_run, extract_run
from data_rentgen.consumer.openlineage.run_event import (
OpenLineageRunEvent,
OpenLineageRunEventType,
Expand All @@ -10,13 +10,15 @@


def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
# operation always has parent
run = extract_parent_run(event.run.facets.parent) # type: ignore[arg-type]
if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
run = extract_parent_run(event.run.facets.parent)
else:
run = extract_run(event)

# in some cases, operation name may contain raw SELECT query with newlines
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()
# remove parent job name from operation name
if operation_name.startswith(run.job.name):
if operation_name.startswith(run.job.name) and operation_name != run.job.name:
prefix = len(run.job.name) + 1
operation_name = operation_name[prefix:]

Expand All @@ -29,6 +31,9 @@ def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
run=run,
name=operation_name,
type=type_,
status=OperationStatusDTO(run.status),
started_at=run.started_at,
ended_at=run.ended_at,
)
enrich_operation_status(operation, event)
enrich_operation_description(operation, event)
Expand Down
12 changes: 10 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@
"tests.test_database.fixtures.alembic",
"tests.test_consumer.fixtures.consumer_app_settings",
"tests.test_consumer.fixtures.test_broker",
"tests.test_consumer.test_extractors.fixtures.extracted_dto",
"tests.test_consumer.test_extractors.fixtures.column_lineage_facets",
"tests.test_consumer.test_extractors.fixtures.io_raw",
"tests.test_consumer.test_extractors.fixtures.io_dto",
"tests.test_consumer.test_extractors.fixtures.spark_raw",
"tests.test_consumer.test_extractors.fixtures.spark_dto",
"tests.test_consumer.test_extractors.fixtures.airflow_raw",
"tests.test_consumer.test_extractors.fixtures.airflow_dto",
"tests.test_consumer.test_extractors.fixtures.unknown_raw",
"tests.test_consumer.test_extractors.fixtures.unknown_dto",
"tests.test_consumer.test_extractors.fixtures.column_lineage_raw",
"tests.test_server.fixtures.server_app_settings",
"tests.test_server.fixtures.test_server_app",
"tests.test_server.fixtures.test_client",
"tests.test_server.fixtures.keycloak",
"tests.test_server.fixtures.factories.address",
"tests.test_server.fixtures.factories.dataset",
"tests.test_server.fixtures.factories.job_type",
"tests.test_server.fixtures.factories.job",
"tests.test_server.fixtures.factories.lineage",
"tests.test_server.fixtures.factories.location",
Expand Down
10 changes: 5 additions & 5 deletions tests/test_consumer/resources/events_spark.jsonl

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions tests/test_consumer/resources/events_unknown.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"eventTime": "2024-07-05T09:06:29.462Z", "producer": "unknown://unknown", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "START", "run": {"runId": "01908224-8410-79a2-8de6-a769ad6944c9"}, "job": {"namespace": "unknown://unknown", "name": "somejob", "facets": {"jobType": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "NONE", "integration": "UNKNOWN", "jobType": "SOMETHING"}}}, "inputs": [], "outputs": []}
{"eventTime": "2024-07-05T09:07:09.849Z", "producer": "unknown://unknown", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "RUNNING", "run": {"runId": "01908224-8410-79a2-8de6-a769ad6944c9"}, "job": {"namespace": "unknown://unknown", "name": "somejob", "facets": {"jobType": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "NONE", "integration": "UNKNOWN", "jobType": "SOMETHING"}}}, "inputs": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "hive://test-hadoop:9083", "uri": "hive://test-hadoop:9083"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp", "description": "Business date"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}}, "inputFacets": {}}], "outputs": [{"namespace": "clickhouse://localhost:8123", "name": "mydb.myschema.mytable", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "clickhouse://localhost:8123", "uri": "clickhouse://localhost:8123"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}, "columnLineage": {"_producer": "unknown://unknown", "_schemaURL": 
"https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", "fields": {"dt": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "dt", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "customer_id": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "customer_id", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "total_spent": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "total_spent", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}}, "dataset": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "customer_id", "transformations": [{"type": "INDIRECT", "subtype": "JOIN", "description": "ON (DISCOUNTS.CUSTOMERS_ID=CUSTOMERS.ID)", "masking": false}]}]}, "lifecycleStateChange": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", "lifecycleStateChange": "OVERWRITE"}}}]}
{"eventTime": "2024-07-05T09:07:15.642Z", "producer": "unknown://unknown", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "COMPLETE", "run": {"runId": "01908224-8410-79a2-8de6-a769ad6944c9"}, "job": {"namespace": "unknown://unknown", "name": "somejob", "facets": {"jobType": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "NONE", "integration": "UNKNOWN", "jobType": "SOMETHING"}}}, "inputs": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "hive://test-hadoop:9083", "uri": "hive://test-hadoop:9083"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp", "description": "Business date"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}}, "inputFacets": {}}], "outputs": [{"namespace": "clickhouse://localhost:8123", "name": "mydb.myschema.mytable", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "clickhouse://localhost:8123", "uri": "clickhouse://localhost:8123"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}, "columnLineage": {"_producer": "unknown://unknown", "_schemaURL": 
"https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", "fields": {"dt": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "dt", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "customer_id": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "customer_id", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "total_spent": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "total_spent", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}}}, "lifecycleStateChange": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", "lifecycleStateChange": "OVERWRITE"}}, "outputFacets": {"outputStatistics": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet", "rowCount": 10000, "size": 5000000}}}]}
88 changes: 88 additions & 0 deletions tests/test_consumer/test_extractors/fixtures/airflow_dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from datetime import datetime, timezone
from uuid import UUID

import pytest

from data_rentgen.dto import (
JobDTO,
LocationDTO,
RunDTO,
RunStartReasonDTO,
RunStatusDTO,
UserDTO,
)
from data_rentgen.dto.job_type import JobTypeDTO


@pytest.fixture
def extracted_airflow_location() -> LocationDTO:
    """Location DTO that the Airflow job fixtures in this module are attached to."""
    location = LocationDTO(
        addresses={"http://airflow-host:8081"},
        name="airflow-host:8081",
        type="http",
    )
    return location


@pytest.fixture
def extracted_airflow_dag_job(
    extracted_airflow_location: LocationDTO,
) -> JobDTO:
    """Job DTO named ``mydag`` with type ``AIRFLOW_DAG`` at the Airflow location fixture."""
    dag_job_type = JobTypeDTO(type="AIRFLOW_DAG")
    return JobDTO(
        type=dag_job_type,
        location=extracted_airflow_location,
        name="mydag",
    )


@pytest.fixture
def extracted_airflow_task_job(
    extracted_airflow_location: LocationDTO,
) -> JobDTO:
    """Job DTO named ``mydag.mytask`` with type ``AIRFLOW_TASK`` at the Airflow location fixture."""
    task_job_type = JobTypeDTO(type="AIRFLOW_TASK")
    return JobDTO(
        type=task_job_type,
        location=extracted_airflow_location,
        name="mydag.mytask",
    )


@pytest.fixture
def extracted_airflow_dag_run(
    extracted_airflow_dag_job: JobDTO,
) -> RunDTO:
    """Succeeded, manually started run DTO of the ``extracted_airflow_dag_job`` fixture."""
    run_started = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_ended = datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc)
    run_owner = UserDTO(name="myuser", id=None)
    log_url = "http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00"

    return RunDTO(
        id=UUID("01908223-0782-79b8-9495-b1c38aaee839"),
        job=extracted_airflow_dag_job,
        status=RunStatusDTO.SUCCEEDED,
        start_reason=RunStartReasonDTO.MANUAL,
        started_at=run_started,
        ended_at=run_ended,
        user=run_owner,
        external_id="manual__2024-07-05T09:04:13:979349+00:00",
        persistent_log_url=log_url,
    )


@pytest.fixture
def extracted_airflow_task_run(
    extracted_airflow_task_job: JobDTO,
) -> RunDTO:
    """Succeeded, manually started run DTO (attempt ``"1"``) of the ``extracted_airflow_task_job`` fixture."""
    # the fixture uses the very same timestamp for both start and end of the run
    run_started = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_ended = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_owner = UserDTO(name="myuser", id=None)
    log_url = "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"

    return RunDTO(
        id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"),
        job=extracted_airflow_task_job,
        status=RunStatusDTO.SUCCEEDED,
        start_reason=RunStartReasonDTO.MANUAL,
        started_at=run_started,
        ended_at=run_ended,
        user=run_owner,
        external_id="manual__2024-07-05T09:04:13:979349+00:00",
        attempt="1",
        persistent_log_url=log_url,
    )
Loading